Added test for read/writing Parquet files as streams to IBucket #144
Conversation
Important: Review skipped
Review was skipped due to path filters.

⛔ Files ignored due to path filters (3)

CodeRabbit blocks several paths by default. You can override this behavior by explicitly including those paths in the path filters.

📝 Walkthrough

This update introduces enhanced support for Parquet file handling in the test suite and core S3 interaction logic. The S3 bucket class now supports configurable buffering for input streams, improving flexibility in stream handling. New test dependencies for Parquet and Hadoop are added to the Maven configuration. Test classes are extended with new methods to verify the ability to write and read Parquet-format data streams using the storage interface. A new utility class provides in-memory and stream-based wrappers for Parquet's input and output interfaces, enabling comprehensive in-memory Parquet testing.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Test as Test Method
    participant Bucket as IBucket Implementation
    participant Parquet as ParquetUtils
    participant S3 as S3Bucket (for S3 tests)
    Test->>Parquet: generateParquetOutput()
    Parquet-->>Test: InputStream with Parquet data
    Test->>Bucket: putObjectStream("file.parquet", InputStream)
    Bucket-->>Bucket: Store data (in-memory or S3)
    Test->>Bucket: getObjectStream("file.parquet")
    Bucket-->>Test: InputStream (buffered if S3Bucket)
    Test->>Parquet: ParquetFileReader(InputStream)
    Parquet-->>Test: Read and verify data rows
```
Actionable comments posted: 2
🔭 Outside diff range comments (1)

java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java (1)

Lines 80-89: 🛠️ Refactor suggestion / ⚠️ Potential issue: Do not close the shared S3AsyncClient inside every putObjectStream call.

`finally { s3AsyncClient.close(); }` shuts the client down after a single upload, rendering the S3Bucket instance unusable for any subsequent asynchronous operations (including multi-part uploads executed later in the same test run or in production code).

Move the shutdown logic to an explicit close() method on S3Bucket, or let the application lifecycle manage it.

```diff
@@
-        } finally {
-            s3AsyncClient.close();
-        }
+        }
```
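A minimal sketch of the suggested direction, assuming S3Bucket owns the client; the constructor and AutoCloseable wiring here are illustrative, not the project's actual code:

```java
import software.amazon.awssdk.services.s3.S3AsyncClient;

public class S3Bucket implements AutoCloseable {

    private final S3AsyncClient s3AsyncClient; // shared by all uploads and downloads

    public S3Bucket(S3AsyncClient s3AsyncClient) {
        this.s3AsyncClient = s3AsyncClient;
    }

    // putObjectStream(...) and other operations use s3AsyncClient but never close it

    @Override
    public void close() {
        s3AsyncClient.close(); // shut down exactly once, when the bucket itself is retired
    }
}
```

With this shape, the caller (or a DI container) decides when the client dies, and a single S3Bucket instance survives any number of uploads.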
🧹 Nitpick comments (2)

java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java (1)

Lines 35-48: Validate and rename the instance-level buffer size.

A negative or zero bufSize argument silently produces an unusable BufferedInputStream; add validation. BUF_SIZE is not a constant but an instance field, and Java conventions recommend lowerCamelCase for variables.

```diff
-    protected final int BUF_SIZE; // 8 KB by default
+    protected final int bufSize;  // validated, per-instance
@@
-        this.BUF_SIZE = bufSize;
+        if (bufSize <= 0) {
+            throw new IllegalArgumentException("bufSize must be positive");
+        }
+        this.bufSize = bufSize;
```

Remember to update usages (BUF_SIZE → bufSize) below.

java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java (1)

Lines 134-169: Use try-with-resources for ObjectStream to avoid leaks.

objectStream is opened but never closed if an assertion fails. Wrap it in a try-with-resources block:

```diff
-        ObjectStream objectStream = storage.getObjectStream(path);
-        assertTrue(objectStream.getStream().markSupported());
+        try (ObjectStream objectStream = storage.getObjectStream(path)) {
+            assertTrue(objectStream.getStream().markSupported());
             ...
+        }
```
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📒 Files selected for processing (7)

- java/pom.xml (1 hunks)
- java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java (3 hunks)
- java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java (4 hunks)
- java/src/test/java/com/esamtrade/bucketbase/MemoryBucketTest.java (1 hunks)
- java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java (1 hunks)
- java/src/test/java/com/esamtrade/bucketbase/S3BucketSDKv1Test.java (2 hunks)
- java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java (1 hunks)
🔇 Additional comments (7)

java/pom.xml (1)

Lines 55-67: Dependencies for Parquet testing look good! The addition of Parquet and Hadoop dependencies with test scope is appropriate for the new streaming tests. Using recent versions (Parquet 1.15.1 and Hadoop 3.4.1) ensures compatibility with modern features while keeping them isolated to the test environment.

java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java (1)

Lines 44-47: Test for Parquet streaming looks good! The new test method follows the established pattern of the class, delegating to the shared tester implementation. This maintains consistency while adding the new Parquet streaming functionality.

java/src/test/java/com/esamtrade/bucketbase/MemoryBucketTest.java (1)

Lines 29-32: Test implementation matches the established pattern! The added test follows the same pattern as in S3BucketTest, ensuring consistent testing of Parquet functionality across different bucket implementations.
java/src/test/java/com/esamtrade/bucketbase/S3BucketSDKv1Test.java (2)

Line 6: Import for the @Disabled annotation is appropriate. The import is correctly added to support the disabled test.

Lines 44-48: Well-documented disabled test for SDK V1. Good job properly documenting why this test is disabled, with a clear explanation of AWS SDK V1's S3ObjectInputStream limitations for Parquet reading. This approach preserves test consistency across implementations while acknowledging the technical constraint.
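For reference, the pattern being praised presumably looks something like this; a hedged sketch assuming JUnit 5, with illustrative method names and reason text:

```java
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

class S3BucketSDKv1Test {

    private IBucketTester tester; // shared tester, initialized elsewhere in the class

    @Test
    @Disabled("AWS SDK V1's S3ObjectInputStream lacks the mark/reset support Parquet reading needs")
    void testPutAndGetParquetObjectStream() {
        tester.testPutAndGetParquetObjectStream(); // delegates to the shared tester, like the other tests
    }
}
```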
java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java (1)

Lines 163-171: Re-mark the buffered stream with a generous read limit.

Downstream code (SeekableInputStreamWrapper) relies on InputStream.reset() to seek backwards. BufferedInputStream marks are invalidated once more than readLimit bytes have been read; passing 0 (as done later) causes reset() to fail after the first read.

Although the faulty code resides in SeekableInputStreamWrapper, you can protect yourself here by pre-marking the stream with a large limit:

```java
BufferedInputStream bis = new BufferedInputStream(inputStream, bufSize);
bis.mark(Integer.MAX_VALUE);
return new ObjectStream(bis, name.toString());
```

(See the dedicated comment in ParquetUtils.java for the actual fix.)

java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java (1)
Lines 305-342: Mark/reset support in the generated Parquet pipe may fail.

SeekableInputStreamWrapper resets to the previously set mark (see the separate comment), but the pipe created here is not wrapped in a BufferedInputStream and no mark is set. When tests later call objectStream.getStream().markSupported(), the result is true (because the S3 layer wraps it), yet seeking backwards on the returned stream will fail outside the test environment.

Consider wrapping inputStreamToReturn:

```diff
 PipedInputStream inputStreamToReturn = new PipedInputStream(pipeBuffer);
+inputStreamToReturn.mark(Integer.MAX_VALUE);
```
```java
public long getLength() throws IOException {
    return stream.getStream().available();
}

@Override
```
🛠️ Refactor suggestion: available() is not a reliable file length.

InputStream.available() returns the number of immediately readable bytes, not the total length. For S3 streams this is often 0, which will break Parquet's footer discovery.

Provide the length via ObjectStream.getSize() (if available) or buffer the stream to measure it once:

```java
public long getLength() {
    return stream.getSize(); // expose object size from bucket meta-data
}
```

If no size is available, document the limitation clearly.
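The second option, buffering once to learn the length, could look roughly like this; the helper class is hypothetical and only sensible for test-sized objects:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical helper: drains the stream once so the exact length is known
// up front. Fine for tests; too costly for large production objects.
final class MeasuredStream {
    private final byte[] data;

    MeasuredStream(InputStream in) throws IOException {
        this.data = in.readAllBytes(); // measure by buffering the whole stream
    }

    long getLength() {
        return data.length; // exact total length, not just the 'available' bytes
    }

    InputStream open() {
        return new ByteArrayInputStream(data); // re-readable, seek-friendly copy
    }
}
```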
✅ Addressed in commit 3fcf702
```java
public static class SeekableInputStreamWrapper extends SeekableInputStream {
    private final InputStream in;
    private long pos = 0;
    private final int markLimit = 0;

    /**
     * @param in the raw InputStream (e.g. S3ObjectInputStream)
     */
    public SeekableInputStreamWrapper(InputStream in) {
        this.in = in;
        in.mark(markLimit);
```
🛠️ Refactor suggestion: markLimit set to 0 breaks backward seek; increase and safeguard it.

SeekableInputStreamWrapper sets in.mark(markLimit) with markLimit = 0; after a single byte is read, reset() throws IOException ("Mark invalid"), which will surface when Parquet seeks backwards.

Fix:

```diff
-    private final int markLimit = 0;
+    private static final int MARK_LIMIT = Integer.MAX_VALUE; // allow full rewind
@@
-    public SeekableInputStreamWrapper(InputStream in) {
-        this.in = in;
-        in.mark(markLimit);
+    public SeekableInputStreamWrapper(InputStream in) {
+        // Ensure mark/reset capability
+        this.in = in.markSupported() ? in : new BufferedInputStream(in);
+        this.in.mark(MARK_LIMIT);
     }
```
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```java
public static class SeekableInputStreamWrapper extends SeekableInputStream {
    private final InputStream in;
    private long pos = 0;
    private static final int MARK_LIMIT = Integer.MAX_VALUE; // allow full rewind

    /**
     * @param in the raw InputStream (e.g. S3ObjectInputStream)
     */
    public SeekableInputStreamWrapper(InputStream in) {
        // Ensure mark/reset capability
        this.in = in.markSupported() ? in : new BufferedInputStream(in);
        this.in.mark(MARK_LIMIT);
```
Nice catch! Fixed
@asuiu Awesome! 🚀 That's going to make the Parquet implementation way more robust. Sometimes the simplest fixes prevent the biggest headaches later. Seeking backward properly is critical for Parquet reading operations.
(_/)
(•ᴗ•)
/ >🔍
Actionable comments posted: 0
🧹 Nitpick comments (2)
java/src/main/java/com/esamtrade/bucketbase/MemoryBucket.java (1)

Lines 63-67: Implementation looks good, but consider a potential inefficiency.

The implementation correctly returns the object's size by leveraging the existing getObject method. This approach is simple but loads the entire object into memory to determine its size, which could be inefficient for very large objects. Since this is a memory-based implementation primarily for testing (as noted in the class documentation), this is acceptable, but it's worth documenting the limitation.

```diff
 @Override
 public long getSize(PurePosixPath name) throws IOException {
+    // Note: this loads the entire object into memory to determine its size,
+    // which is acceptable for test scenarios but inefficient for large objects
     byte[] content = getObject(name);
     return content.length;
 }
```

java/src/main/java/com/esamtrade/bucketbase/S3BucketSDKv1.java (1)
Lines 86-98: Well-implemented S3 size retrieval with proper exception handling.

This implementation is efficient, as it retrieves only the metadata rather than downloading the entire object content. The exception handling is thorough, properly differentiating between a missing object (404) and other potential errors.

Consider adding braces around the single-line if statement at lines 92-93 for consistency with the codebase style and to prevent potential future bugs:

```diff
 catch (AmazonS3Exception e) {
     if (e.getStatusCode() == 404)
+    {
         throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
+    }
     throw new IOException("Failed to get object metadata: " + e.getMessage(), e);
 }
```
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📒 Files selected for processing (9)

- java/src/main/java/com/esamtrade/bucketbase/IBucket.java (1 hunks)
- java/src/main/java/com/esamtrade/bucketbase/MemoryBucket.java (1 hunks)
- java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java (4 hunks)
- java/src/main/java/com/esamtrade/bucketbase/S3BucketSDKv1.java (2 hunks)
- java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java (4 hunks)
- java/src/test/java/com/esamtrade/bucketbase/MemoryBucketTest.java (1 hunks)
- java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java (1 hunks)
- java/src/test/java/com/esamtrade/bucketbase/S3BucketSDKv1Test.java (2 hunks)
- java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (6)
- java/src/test/java/com/esamtrade/bucketbase/MemoryBucketTest.java
- java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java
- java/src/test/java/com/esamtrade/bucketbase/S3BucketSDKv1Test.java
- java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java
- java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java
- java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java
🔇 Additional comments (2)

java/src/main/java/com/esamtrade/bucketbase/IBucket.java (1)

Line 30: Interface extension looks good.

The new method getSize is a solid addition to the IBucket interface and provides a standardized way to retrieve object sizes. The method signature appropriately follows the same pattern as other interface methods with IOException handling.

java/src/main/java/com/esamtrade/bucketbase/S3BucketSDKv1.java (1)

Line 3: LGTM: required import for SDK exception handling.

The added import for SdkClientException is necessary for the appropriate exception handling in the new getSize method.
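For orientation, the new interface method presumably has a shape like the following, inferred from the MemoryBucket and S3BucketSDKv1 implementations discussed above; the Javadoc wording is illustrative:

```java
public interface IBucket {

    // ... existing stream and object operations ...

    /**
     * Returns the size in bytes of the stored object.
     *
     * @throws IOException if the object does not exist or its metadata cannot be read
     */
    long getSize(PurePosixPath name) throws IOException;
}
```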
and added testPutAndGetMultiUploadObjectStream to test S3 multipart upload with large files
Only some minor changes suggested here.
```java
CompletableFuture<UploadPartResponse> uploadPartResponse = s3AsyncClient.uploadPart(uploadPartRequest, requestBody);
return CompletedPart.builder()
        .partNumber(partNumber)
        .eTag(uploadPartResponse.join().eTag())
```
I think that if you join() here on the future response, you choose to perform the uploads sequentially. Is this the intention?
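For illustration, a hedged sketch of the non-blocking alternative the reviewer hints at: create every part's future first and join only once at the end, so the parts can transfer concurrently. The helper method, baseRequest, and the pre-read partBodies list are assumptions, not the PR's actual code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;

class ConcurrentPartUploader {

    // Sketch: all uploadPart futures are created before any join() happens,
    // so the SDK can run the transfers in parallel.
    static List<CompletedPart> uploadParts(S3AsyncClient s3AsyncClient,
                                           UploadPartRequest baseRequest,  // illustrative: carries bucket/key/uploadId
                                           List<byte[]> partBodies) {      // hypothetical: parts already read into memory
        List<CompletableFuture<CompletedPart>> futures = new ArrayList<>();
        for (int partNumber = 1; partNumber <= partBodies.size(); partNumber++) {
            final int pn = partNumber;
            UploadPartRequest request = baseRequest.toBuilder().partNumber(pn).build();
            futures.add(s3AsyncClient
                    .uploadPart(request, AsyncRequestBody.fromBytes(partBodies.get(pn - 1)))
                    .thenApply(resp -> CompletedPart.builder().partNumber(pn).eTag(resp.eTag()).build()));
        }
        // Single synchronization point: block only after all parts are in flight
        return futures.stream().map(CompletableFuture::join).toList();
    }
}
```

Whether this is worthwhile here depends on whether the part bodies can be materialized independently; when each part must be read sequentially from a single pipe, the join-per-part version may be intentional.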
```java
});
writerThread.start();
try {
    Thread.sleep(100);
```
Is this necessary?
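If the sleep is only there to give the writer thread time to start, one conventional alternative is an explicit hand-off instead of a timed pause; a sketch with illustrative names, assuming the writer can signal once it is running:

```java
import java.util.concurrent.CountDownLatch;

// Sketch: replace Thread.sleep(100) with a deterministic start signal.
CountDownLatch writerStarted = new CountDownLatch(1);
Thread writerThread = new Thread(() -> {
    writerStarted.countDown(); // signal as soon as the writer is actually running
    // ... write the Parquet bytes into the piped output stream ...
});
writerThread.start();
writerStarted.await(); // waits exactly as long as needed; declare or wrap InterruptedException
```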
```java
// wrap pipe in our PositionOutputStream
ParquetUtils.PositionOutputStreamWrapper posOutStream = new ParquetUtils.PositionOutputStreamWrapper(pipedOutputStream);
ParquetUtils.OutputFileWrapper OutputFileWrapper = new ParquetUtils.OutputFileWrapper(posOutStream);
```
Please consider renaming this to camelCase: outputFileWrapper
```xml
<!-- Parquet dependencies -->
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop-bundle</artifactId>
```
Isn't parquet-hadoop enough? Do we need the whole bundle?
This doesn't change the library itself (apart from a minor addition of buffering support in the S3 bucket); it just adds a functional test that writes and reads a Parquet file to and from buckets in a streaming manner.
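In outline, the round trip the new test exercises looks roughly like this; a hedged sketch assembled from the sequence diagram and review snippets above, with generateParquetOutput() and the path construction as assumed helpers rather than the test's literal code:

```java
// Hedged sketch of the functional test's round trip.
PurePosixPath path = new PurePosixPath("file.parquet");  // path construction is illustrative

InputStream parquetData = generateParquetOutput();       // produce Parquet bytes as a stream
storage.putObjectStream(path, parquetData);              // stream the file into the bucket

try (ObjectStream objectStream = storage.getObjectStream(path)) {
    // feed the stream through ParquetUtils' SeekableInputStreamWrapper
    // and read the rows back with ParquetFileReader to verify them
}
```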