diff --git a/java/pom.xml b/java/pom.xml
index f8102ef..71138f4 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -51,6 +51,21 @@
             <version>${junit.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-hadoop-bundle</artifactId>
+            <version>1.15.1</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>3.4.1</version>
+            <scope>test</scope>
+        </dependency>
+
diff --git a/java/src/main/java/com/esamtrade/bucketbase/IBucket.java b/java/src/main/java/com/esamtrade/bucketbase/IBucket.java
index ecd6ec4..2e4fc14 100644
--- a/java/src/main/java/com/esamtrade/bucketbase/IBucket.java
+++ b/java/src/main/java/com/esamtrade/bucketbase/IBucket.java
@@ -7,7 +7,6 @@ import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -28,6 +27,8 @@ public interface IBucket {
 
     ObjectStream getObjectStream(PurePosixPath name) throws IOException;
 
+    long getSize(PurePosixPath name) throws IOException;
+
     List<PurePosixPath> listObjects(PurePosixPath prefix) throws IOException;
 
     ShallowListing shallowListObjects(PurePosixPath prefix) throws IOException;
diff --git a/java/src/main/java/com/esamtrade/bucketbase/MemoryBucket.java b/java/src/main/java/com/esamtrade/bucketbase/MemoryBucket.java
index 25eaab1..4714d0b 100644
--- a/java/src/main/java/com/esamtrade/bucketbase/MemoryBucket.java
+++ b/java/src/main/java/com/esamtrade/bucketbase/MemoryBucket.java
@@ -60,6 +60,12 @@ public ObjectStream getObjectStream(PurePosixPath name) throws FileNotFoundException {
         return new ObjectStream(new ByteArrayInputStream(content), name.toString());
     }
 
+    @Override
+    public long getSize(PurePosixPath name) throws IOException {
+        byte[] content = getObject(name);
+        return content.length;
+    }
+
     @Override
     public List<PurePosixPath> listObjects(PurePosixPath prefix) {
         splitPrefix(prefix); // validate prefix
diff --git a/java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java b/java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java
index f50cfea..3a72013 100644
--- a/java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java
+++ b/java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java
@@ -1,8 +1,10 @@
 package com.esamtrade.bucketbase;
 
 import org.apache.commons.codec.digest.DigestUtils;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.awscore.exception.AwsServiceException;
 import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.core.exception.SdkClientException;
 import software.amazon.awssdk.core.sync.RequestBody;
 import software.amazon.awssdk.core.sync.ResponseTransformer;
@@ -11,11 +13,13 @@
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.*;
 
+import java.io.BufferedInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Base64;
 import java.util.HashSet;
 import java.util.List;
@@ -24,20 +28,27 @@
 import java.util.stream.Collectors;
 
 public class S3Bucket extends BaseBucket {
-
+    private static final int DEFAULT_BUF_SIZE = 8 * 1024;
+    private static final int DEFAULT_UPLOAD_PART_SIZE = 5 * 1024 * 1024;
+    protected final int BUF_SIZE; // 8 KB by default
     protected S3Client s3Client;
     protected S3AsyncClient s3AsyncClient;
     protected String bucketName;
 
-    public S3Bucket(S3Client s3Client, S3AsyncClient s3AsyncClient, String bucketName) {
+    public S3Bucket(S3Client s3Client, S3AsyncClient s3AsyncClient, String bucketName, int bufSize) {
         this.s3Client = s3Client;
         this.s3AsyncClient = s3AsyncClient;
         this.bucketName = bucketName;
+        this.BUF_SIZE = bufSize;
     }
 
     public S3Bucket(String endpoint, String accessKey, String secretKey, String bucketName) {
-        this(createS3Client(endpoint, accessKey, secretKey), createS3AsyncClient(endpoint, accessKey, secretKey), bucketName);
+        this(createS3Client(endpoint, accessKey, secretKey), createS3AsyncClient(endpoint, accessKey, secretKey), bucketName, DEFAULT_BUF_SIZE);
+    }
+
+    public S3Bucket(String endpoint, String accessKey, String secretKey, String bucketName, int bufSize) {
+        this(createS3Client(endpoint, accessKey, secretKey), createS3AsyncClient(endpoint, accessKey, secretKey), bucketName, bufSize);
     }
 
     private static S3Client createS3Client(String endpoint, String accessKey, String secretKey) {
@@ -70,26 +81,40 @@ public void putObject(PurePosixPath name, byte[] content) {
     }
 
     @Override
-    public void putObjectStream(PurePosixPath name, InputStream stream) {
+    public void putObjectStream(PurePosixPath name, InputStream stream) throws IOException {
         String _name = validateName(name);
         try {
             uploadLargeStream(_name, stream);
         } catch (Exception e) {
-            throw new RuntimeException("Failed to upload object: " + _name, e);
+            throw new IOException("Failed to upload object: " + _name, e);
         } finally {
             s3AsyncClient.close();
         }
     }
 
-    private void uploadLargeStream(String key, InputStream inputStream) {
-        int partSize = 5 * 1024 * 1024; // 5 MB
+    private void uploadLargeStream(String key, InputStream inputStream) throws IOException {
+        // Note: if the stream holds less than 5 MB, S3 rejects the multipart upload with a 400 error ("upload is smaller than the minimum allowed object size"), hence the single-put fallbacks below
         List<CompletedPart> completedParts = new ArrayList<>();
-        byte[] buffer = new byte[partSize];
+        byte[] buffer = new byte[DEFAULT_UPLOAD_PART_SIZE];
         int bytesRead;
         int partNumber = 1;
 
-        // 1. Initiate the multipart upload
+        bytesRead = readUploadBuffer(inputStream, buffer);
+        if (bytesRead == 0) {
+            // Empty stream: create an empty object
+            this.putObject(PurePosixPath.from(key), new byte[0]);
+            return;
+        }
+
+        if (bytesRead < DEFAULT_UPLOAD_PART_SIZE) {
+            // Small file: use a regular putObject, since a multipart upload would be rejected
+            byte[] content = Arrays.copyOf(buffer, bytesRead);
+            this.putObject(PurePosixPath.from(key), content);
+            return;
+        }
+
+        // 1. Initiate the multipart upload for large files
        CreateMultipartUploadRequest createMultipartUploadRequest = CreateMultipartUploadRequest.builder()
                 .bucket(bucketName)
                 .key(key)
@@ -98,22 +123,16 @@ private void uploadLargeStream(String key, InputStream inputStream) {
         String uploadId = response.uploadId();
 
         try {
-            // 2. Read the input stream and upload each part
-            while ((bytesRead = inputStream.read(buffer)) != -1) {
-                byte[] bytesToUpload = (bytesRead < partSize) ? java.util.Arrays.copyOf(buffer, bytesRead) : buffer;
-                UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
-                        .bucket(bucketName)
-                        .key(key)
-                        .uploadId(uploadId)
-                        .partNumber(partNumber)
-                        .contentLength((long) bytesRead)
-                        .build();
-                AsyncRequestBody requestBody = AsyncRequestBody.fromBytes(bytesToUpload);
-                CompletableFuture<UploadPartResponse> uploadPartResponse = s3AsyncClient.uploadPart(uploadPartRequest, requestBody);
-                completedParts.add(CompletedPart.builder()
-                        .partNumber(partNumber)
-                        .eTag(uploadPartResponse.join().eTag())
-                        .build());
+            // Upload the first buffer we already read
+            CompletedPart firstPart = uploadPart(key, uploadId, partNumber, bytesRead, buffer);
+            completedParts.add(firstPart);
+            partNumber++;
+
+            // 2. Continue reading and uploading the remaining parts
+            while ((bytesRead = readUploadBuffer(inputStream, buffer)) != 0) {
+                byte[] bytesToUpload = (bytesRead < DEFAULT_UPLOAD_PART_SIZE) ? Arrays.copyOf(buffer, bytesRead) : buffer;
+                CompletedPart completedPart = uploadPart(key, uploadId, partNumber, bytesRead, bytesToUpload);
+                completedParts.add(completedPart);
                 partNumber++;
             }
@@ -133,10 +152,43 @@
                     .uploadId(uploadId)
                     .build();
             s3AsyncClient.abortMultipartUpload(abortMultipartUploadRequest).join();
-            throw new RuntimeException("Failed to upload object: " + key, e);
+            throw new IOException("Failed to upload object: " + key, e);
         }
     }
 
+    private int readUploadBuffer(InputStream inputStream, byte[] buffer) throws IOException {
+        int totalBytesRead = inputStream.read(buffer);
+
+        if (totalBytesRead == -1) {
+            return 0;
+        }
+
+        // Keep reading until the buffer is full or the stream ends, so that every part except the last reaches the full part size
+        if (totalBytesRead < DEFAULT_UPLOAD_PART_SIZE) {
+            int bytesRead;
+            while (totalBytesRead < DEFAULT_UPLOAD_PART_SIZE && (bytesRead = inputStream.read(buffer, totalBytesRead, DEFAULT_UPLOAD_PART_SIZE - totalBytesRead)) != -1) {
+                totalBytesRead += bytesRead;
+            }
+        }
+
+        return totalBytesRead;
+    }
+
+    private CompletedPart uploadPart(String key, String uploadId, int partNumber, long bytesRead, byte[] bytesToUpload) {
+        UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
+                .bucket(bucketName)
+                .key(key)
+                .uploadId(uploadId)
+                .partNumber(partNumber)
+                .contentLength(bytesRead)
+                .build();
+        AsyncRequestBody requestBody = AsyncRequestBody.fromBytes(bytesToUpload);
+        CompletableFuture<UploadPartResponse> uploadPartResponse = s3AsyncClient.uploadPart(uploadPartRequest, requestBody);
+        return CompletedPart.builder()
+                .partNumber(partNumber)
+                .eTag(uploadPartResponse.join().eTag())
+                .build();
+    }
+
     @Override
     public byte[] getObject(PurePosixPath name) throws IOException {
@@ -159,14 +211,29 @@ public ObjectStream getObjectStream(PurePosixPath name) throws IOException {
                     .key(name.toString())
                     .build();
             InputStream inputStream = s3Client.getObject(request, ResponseTransformer.toInputStream());
-            return new ObjectStream(inputStream, name.toString());
+            BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream, BUF_SIZE);
+            return new ObjectStream(bufferedInputStream, name.toString());
         } catch (NoSuchKeyException e) {
             throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
         }
     }
 
-    @Override
+    public long getSize(PurePosixPath name) throws IOException {
+        try {
+            HeadObjectRequest request = HeadObjectRequest.builder()
+                    .bucket(bucketName)
+                    .key(name.toString())
+                    .build();
+            HeadObjectResponse response = s3Client.headObject(request);
+            return response.contentLength();
+        } catch (NoSuchKeyException e) {
+            throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
+        } catch (AwsServiceException | SdkClientException e) {
+            throw new IOException("Failed to get object size: " + name, e);
+        }
+    }
+
     /**
      * Lists all objects in the S3 bucket with the given prefix.
      *
@@ -175,17 +242,15 @@
      * @param prefix The prefix to filter objects by.
      * @return A list of paths to the objects in the bucket.
      */
+    @Override
     public List<PurePosixPath> listObjects(PurePosixPath prefix) {
         splitPrefix(prefix); // validate prefix
-        List<PurePosixPath> result = new ArrayList<>();
         ListObjectsV2Request request = ListObjectsV2Request.builder()
                 .bucket(bucketName)
                 .prefix(prefix.toString())
                 .build();
-        List<PurePosixPath> results = s3Client.listObjectsV2Paginator(request).contents().stream().map(S3Object::key).map(PurePosixPath::from).toList();
-
-        return results;
+        return s3Client.listObjectsV2Paginator(request).contents().stream().map(S3Object::key).map(PurePosixPath::from).toList();
     }
 
     @Override
diff --git a/java/src/main/java/com/esamtrade/bucketbase/S3BucketSDKv1.java b/java/src/main/java/com/esamtrade/bucketbase/S3BucketSDKv1.java
index f35507e..ad5b6ce 100644
--- a/java/src/main/java/com/esamtrade/bucketbase/S3BucketSDKv1.java
+++ b/java/src/main/java/com/esamtrade/bucketbase/S3BucketSDKv1.java
@@ -1,5 +1,6 @@
 package com.esamtrade.bucketbase;
 
+import com.amazonaws.SdkClientException;
 import com.amazonaws.auth.AWSStaticCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.client.builder.AwsClientBuilder;
@@ -82,6 +83,19 @@ public ObjectStream getObjectStream(PurePosixPath name) throws IOException {
         return new ObjectStream(inputStream, name.toString());
     }
 
+    @Override
+    public long getSize(PurePosixPath name) throws IOException {
+        try {
+            return s3Client.getObjectMetadata(bucketName, name.toString()).getContentLength();
+        } catch (AmazonS3Exception e) {
+            if (e.getStatusCode() == 404)
+                throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
+            throw new IOException("Failed to get object metadata: " + e.getMessage(), e);
+        } catch (SdkClientException e) {
+            throw new IOException("Failed to get object metadata: " + e.getMessage(), e);
+        }
+    }
 
     /**
      * Retrieves a list of object paths stored in the bucket that match the given prefix.
diff --git a/java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java b/java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java
index 401513e..3512839 100644
--- a/java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java
+++ b/java/src/test/java/com/esamtrade/bucketbase/IBucketTester.java
@@ -1,11 +1,32 @@
 package com.esamtrade.bucketbase;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.concurrent.ForkJoinPool;
 import java.util.stream.IntStream;
@@ -15,6 +36,7 @@
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertIterableEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -24,7 +46,6 @@ public class IBucketTester {
     private static final List<String> INVALID_PREFIXES = List.of("/", "/dir", "star*1", "dir1/a\\file.txt", "at@gmail", "sharp#1", "dollar$1", "comma,");
     private final BaseBucket storage;
     private final String uniqueSuffix;
-    private final String PATH_WITH_2025_KEYS = "test-dir-with-2025-keys/";
 
     public IBucketTester(BaseBucket storage) {
         this.storage = storage;
@@ -52,7 +73,7 @@ public void testPutAndGetObject() throws IOException {
         String sContent = "Test content";
         storage.putObject(path, sContent.getBytes());
         retrievedContent = storage.getObject(path);
-        assertArrayEquals(retrievedContent, sContent.getBytes("utf-8"));
+        assertArrayEquals(retrievedContent, sContent.getBytes(StandardCharsets.UTF_8));
 
         // ByteArray content
         path = PurePosixPath.from(uniqueDir, "file1.ba");
@@ -65,7 +86,7 @@
         String stringPath = uniqueDir + "/file1.txt";
         storage.putObject(PurePosixPath.from(stringPath), sContent.getBytes());
         retrievedContent = storage.getObject(PurePosixPath.from(stringPath));
-        assertArrayEquals(retrievedContent, sContent.getBytes("utf-8"));
+        assertArrayEquals(retrievedContent, sContent.getBytes(StandardCharsets.UTF_8));
 
         // Non-existent path
         PurePosixPath nonExistentPath = PurePosixPath.from(uniqueDir, "inexistent.txt");
@@ -111,6 +132,81 @@ public void testPutAndGetObjectStream() throws IOException {
         assertThrows(FileNotFoundException.class, () -> storage.getObjectStream(nonExistentPath));
     }
 
+    public void testPutAndGetParquetObjectStream() throws IOException {
+        MessageType schema = Types.buildMessage()
+                .required(PrimitiveType.PrimitiveTypeName.INT32).named("int_field")
+                .named("TestSchema");
+
+        String uniqueDir = "dir" + uniqueSuffix;
+
+        // Small Parquet file, streamed in and read back
+        PurePosixPath path = PurePosixPath.from(uniqueDir, "file1.parquet");
+        int totalRows = 3;
+        InputStream inputStream = generateParquetOutput(totalRows);
+        storage.putObjectStream(path, inputStream);
+
+        int count = readRowsCountFromParquet(path, schema);
+        assertEquals(totalRows, count);
+    }
+
+    public void testPutAndGetMultiUploadLargeParquetObjectStream() throws IOException {
+        // Generates a ~6 MB Parquet file, so the S3 implementation has to issue more than one multipart upload part
+        MessageType schema = Types.buildMessage()
+                .required(PrimitiveType.PrimitiveTypeName.INT32).named("int_field")
+                .named("TestSchema");
+
+        String uniqueDir = "dir" + uniqueSuffix;
+
+        PurePosixPath path = PurePosixPath.from(uniqueDir, "file20.parquet");
+        int totalRows = 1_500_000;
+        InputStream inputStream = generateParquetOutput(totalRows);
+        storage.putObjectStream(path, inputStream);
+
+        int count = readRowsCountFromParquet(path, schema);
+        assertEquals(totalRows, count);
+    }
+
+    private int readRowsCountFromParquet(PurePosixPath path, MessageType schema) throws IOException {
+        ObjectStream objectStream = storage.getObjectStream(path);
+        assertTrue(objectStream.getStream().markSupported());
+        long size = storage.getSize(path);
+
+        InputFile inFile = new ParquetUtils.StreamInputFile(objectStream, size);
+        int count = 0;
+        try (ParquetFileReader reader = ParquetFileReader.open(inFile)) {
+            // use the actual file schema from the footer to build the record reader
+            var metadata = reader.getFooter().getFileMetaData();
+            MessageType fileSchema = metadata.getSchema();
+            PageReadStore pages;
+            while ((pages = reader.readNextRowGroup()) != null) {
+                MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema, fileSchema);
+                RecordReader<Group> rr = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
+                for (int i = 0, rows = (int) pages.getRowCount(); i < rows; i++) {
+                    Group g = rr.read();
+                    assertEquals(count, g.getInteger("int_field", 0));
+                    count++;
+                }
+            }
+        }
+        return count;
+    }
+
+    public void testGetSize() throws IOException {
+        String uniqueDir = "dir" + uniqueSuffix;
+
+        // Binary content
+        PurePosixPath path = PurePosixPath.from(uniqueDir, "file1.bin");
+        byte[] bContent = "Test\ncontent".getBytes();
+        ByteArrayInputStream byteStream = new ByteArrayInputStream(bContent);
+        storage.putObjectStream(path, byteStream);
+
+        long size = storage.getSize(path);
+        assertEquals(bContent.length, size);
+
+        assertThrows(FileNotFoundException.class, () -> storage.getSize(PurePosixPath.from(uniqueDir, "inexistent.txt")));
+    }
+
     public void testListObjects() throws IOException {
         String uniqueDir = "dir" + uniqueSuffix;
         storage.putObject(PurePosixPath.from(uniqueDir, "file1.txt"), "Content 1".getBytes());
@@ -147,27 +243,28 @@ public void testListObjectsWithOver1000keys() throws IOException {
     }
 
     private PurePosixPath ensureDirWith2025Keys() throws IOException {
+        final String PATH_WITH_2025_KEYS = "test-dir-with-2025-keys/";
         var pathWith2025Keys = new PurePosixPath(PATH_WITH_2025_KEYS);
         List<PurePosixPath> existingKeys = storage.listObjects(pathWith2025Keys);
         if (existingKeys.isEmpty()) {
             // Create the directory and add 2025 files
-            try (ForkJoinPool customThreadPool = new ForkJoinPool(50)) {
-                try {
-                    customThreadPool.submit(() ->
-                            IntStream.range(0, 2025).parallel().forEach(i -> {
-                                try {
-                                    var path = pathWith2025Keys.join("file" + i + ".txt");
-                                    storage.putObject(path, ("Content " + i).getBytes());
-                                } catch (IOException e) {
-                                    throw new RuntimeException(e);
-                                }
-                            })
-                    ).get();
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                } finally {
-                    customThreadPool.shutdown();
-                }
+            @SuppressWarnings("resource")
+            ForkJoinPool customThreadPool = new ForkJoinPool(50);
+            try {
+                customThreadPool.submit(() ->
+                        IntStream.range(0, 2025).parallel().forEach(i -> {
+                            try {
+                                var path = pathWith2025Keys.join("file" + i + ".txt");
+                                storage.putObject(path, ("Content " + i).getBytes());
+                            } catch (IOException e) {
+                                throw new RuntimeException(e);
+                            }
+                        })
+                ).get();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            } finally {
+                customThreadPool.shutdown();
             }
         }
         return pathWith2025Keys;
@@ -200,8 +297,8 @@ public void testShallowListObjects() throws IOException {
         ShallowListing shallowListing = storage.shallowListObjects(new PurePosixPath(uniqueDir));
         expectedObjects = List.of(new PurePosixPath(uniqueDir + "file1.txt"));
         expectedPrefixes = List.of(PurePosixPath.from(uniqueDir + "/"));
-        assertTrue(shallowListing.getObjects() instanceof List);
-        assertTrue(shallowListing.getPrefixes() instanceof List);
+        assertInstanceOf(List.class, shallowListing.getObjects());
+        assertInstanceOf(List.class, shallowListing.getPrefixes());
         assertIterableEquals(expectedObjects, shallowListing.getObjects());
         assertIterableEquals(expectedPrefixes, shallowListing.getPrefixes());
 
@@ -233,7 +330,7 @@ public void testRemoveObjects() throws IOException {
         List<PurePosixPath> result = storage.removeObjects(List.of(path1, path2, new PurePosixPath(uniqueDir + "/inexistent.file")));
 
         // Check that the files do not exist
-        assertTrue(result instanceof List);
+        assertInstanceOf(List.class, result);
         assertEquals(List.of(), result);
         assertFalse(storage.exists(path1));
         assertFalse(storage.exists(path2));
@@ -243,6 +340,51 @@
         // Check that the leftover empty directories are also removed, but the bucket may contain leftovers from the other test runs
         ShallowListing shallowListing = storage.shallowListObjects(new PurePosixPath(""));
         List<PurePosixPath> prefixes = shallowListing.getPrefixes();
-        assertFalse(prefixes.contains(uniqueDir + "/"));
+        List<String> prefixStrings = prefixes.stream().map(PurePosixPath::toString).toList();
+        assertFalse(prefixStrings.contains(uniqueDir + "/"));
+    }
+
+    private InputStream generateParquetOutput(int rows) throws IOException {
+        MessageType schema = Types.buildMessage()
+                .required(PrimitiveType.PrimitiveTypeName.INT32).named("int_field")
+                .named("TestSchema");
+
+        // enable writing
+        Configuration conf = new Configuration();
+        GroupWriteSupport.setSchema(schema, conf);
+
+        int pipeBuffer = 4 * 1024 * 1024; // 4 MB internal buffer
+        PipedInputStream inputStreamToReturn = new PipedInputStream(pipeBuffer);
+        PipedOutputStream pipedOutputStream = new PipedOutputStream(inputStreamToReturn);
+
+        // wrap the pipe in our PositionOutputStream
+        ParquetUtils.PositionOutputStreamWrapper posOutStream = new ParquetUtils.PositionOutputStreamWrapper(pipedOutputStream);
+        ParquetUtils.OutputFileWrapper outputFileWrapper = new ParquetUtils.OutputFileWrapper(posOutStream);
+
+        // start a thread that writes Parquet into the pipe
+        Thread writerThread = new Thread(() -> {
+            try (ParquetWriter<Group> writer = ExampleParquetWriter
+                    .builder(outputFileWrapper)
+                    .withConf(conf)
+                    .withWriteMode(org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE)
+                    .withType(schema)
+                    .build()) {
+
+                SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+                for (int i = 0; i < rows; i++) {
+                    writer.write(factory.newGroup().append("int_field", i));
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        writerThread.start();
+        try {
+            Thread.sleep(100); // give the writer a head start, so the pipe holds data before the caller starts reading
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+
+        return inputStreamToReturn;
     }
 }
diff --git a/java/src/test/java/com/esamtrade/bucketbase/MemoryBucketTest.java b/java/src/test/java/com/esamtrade/bucketbase/MemoryBucketTest.java
index 7f056cc..fd91799 100644
--- a/java/src/test/java/com/esamtrade/bucketbase/MemoryBucketTest.java
+++ b/java/src/test/java/com/esamtrade/bucketbase/MemoryBucketTest.java
@@ -26,6 +26,16 @@ public void testPutAndGetObject() throws IOException {
         tester.testPutAndGetObject();
     }
 
+    @Test
+    void testPutAndGetParquetObjectStream() throws IOException {
+        tester.testPutAndGetParquetObjectStream();
+    }
+
+    @Test
+    void testGetSize() throws IOException {
+        tester.testGetSize();
+    }
+
     @Test
     void putObjectAndGetObjectStream() throws IOException {
         tester.testPutAndGetObjectStream();
diff --git a/java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java b/java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java
new file mode 100644
index 0000000..4958aa7
--- /dev/null
+++ b/java/src/test/java/com/esamtrade/bucketbase/ParquetUtils.java
@@ -0,0 +1,181 @@
+package com.esamtrade.bucketbase;
+
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+import org.apache.parquet.io.SeekableInputStream;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class ParquetUtils {
+
+    static class StreamInputFile implements InputFile {
+        private final ObjectStream stream;
+        private final long length;
+
+        StreamInputFile(ObjectStream stream, long length) {
+            this.stream = stream;
+            this.length = length;
+        }
+
+        @Override
+        public long getLength() throws IOException {
+            return length;
+        }
+
+        @Override
+        public SeekableInputStream newStream() throws IOException {
+            return new SeekableInputStreamWrapper(stream.getStream());
+        }
+    }
+
+    public static class SeekableInputStreamWrapper extends SeekableInputStream {
+        private final InputStream in;
+        private long pos = 0;
+        private final int MARK_LIMIT = Integer.MAX_VALUE;
+
+        /**
+         * @param in the raw InputStream; it must support mark/reset (e.g. a BufferedInputStream over an S3 object stream)
+         */
+        public SeekableInputStreamWrapper(InputStream in) {
+            this.in = in;
+            in.mark(MARK_LIMIT);
+        }
+
+        @Override
+        public long getPos() {
+            return pos;
+        }
+
+        @Override
+        public void seek(long newPos) throws IOException {
+            if (newPos < 0) {
+                throw new IOException("Cannot seek to negative position: " + newPos);
+            }
+            // if going backwards, reset to the mark and re-mark
+            if (newPos < pos) {
+                in.reset();
+                in.mark(MARK_LIMIT);
+                pos = 0;
+            }
+            // skip forward to the desired position
+            long toSkip = newPos - pos;
+            while (toSkip > 0) {
+                long skipped = in.skip(toSkip);
+                if (skipped <= 0) {
+                    throw new EOFException("Unable to skip to position " + newPos);
+                }
+                toSkip -= skipped;
+                pos += skipped;
+            }
+        }
+
+        @Override
+        public int read() throws IOException {
+            int b = in.read();
+            if (b >= 0)
+                pos++;
+            return b;
+        }
+
+        @Override
+        public void readFully(byte[] b, int off, int len) throws IOException {
+            int total = 0;
+            while (total < len) {
+                int n = in.read(b, off + total, len - total);
+                if (n < 0)
+                    throw new EOFException("EOF before filling buffer");
+                total += n;
+            }
+            pos += len;
+        }
+
+        @Override
+        public void readFully(byte[] b) throws IOException {
+            readFully(b, 0, b.length);
+        }
+
+        @Override
+        public int read(ByteBuffer buf) throws IOException {
+            int toRead = buf.remaining();
+            byte[] tmp = new byte[toRead];
+            int n = in.read(tmp, 0, toRead);
+            if (n > 0) {
+                buf.put(tmp, 0, n);
+                pos += n;
+            }
+            return n;
+        }
+
+        @Override
+        public void readFully(ByteBuffer buf) throws IOException {
+            int toRead = buf.remaining();
+            byte[] tmp = new byte[toRead];
+            readFully(tmp, 0, toRead);
+            buf.put(tmp);
+        }
+
+        @Override
+        public void close() throws IOException {
+            in.close();
+        }
+    }
+
+    public static class PositionOutputStreamWrapper extends PositionOutputStream {
+        private final OutputStream out;
+        private long pos = 0;
+
+        public PositionOutputStreamWrapper(OutputStream out) {
+            this.out = out;
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            out.write(b);
+            pos++;
+        }
+
+        @Override
+        public long getPos() {
+            return pos;
+        }
+
+        @Override
+        public void close() throws IOException {
+            out.close();
+        }
+    }
+
+    // Hands Parquet the wrapped pipe stream whenever it asks to create the output file
+    public static class OutputFileWrapper implements OutputFile {
+        private final PositionOutputStreamWrapper posOut;
+
+        public OutputFileWrapper(PositionOutputStreamWrapper posOut) {
+            this.posOut = posOut;
+        }
+
+        @Override
+        public PositionOutputStream create(long blockSizeHint) {
+            return posOut;
+        }
+
+        @Override
+        public PositionOutputStream createOrOverwrite(long blockSizeHint) {
+            return posOut;
+        }
+
+        @Override
+        public boolean supportsBlockSize() {
+            return false;
+        }
+
+        @Override
+        public long defaultBlockSize() {
+            return 0;
+        }
+    }
+}
\ No newline at end of file
diff --git a/java/src/test/java/com/esamtrade/bucketbase/S3BucketSDKv1Test.java b/java/src/test/java/com/esamtrade/bucketbase/S3BucketSDKv1Test.java
index 5317c32..0e8a69d 100644
--- a/java/src/test/java/com/esamtrade/bucketbase/S3BucketSDKv1Test.java
+++ b/java/src/test/java/com/esamtrade/bucketbase/S3BucketSDKv1Test.java
@@ -3,6 +3,7 @@
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
@@ -40,6 +41,17 @@ void putObjectAndGetObjectStream() throws IOException {
         tester.testPutAndGetObjectStream();
     }
 
+    @Test
+    @Disabled("AWS SDK v1's S3ObjectInputStream does not support the mark/reset operations required for Parquet reading")
+    void testPutAndGetParquetObjectStream() throws IOException {
+        tester.testPutAndGetParquetObjectStream();
+    }
+
+    @Test
+    void testGetSize() throws IOException {
+        tester.testGetSize();
+    }
+
     @Test
     void getListObjects() throws IOException {
         tester.testListObjects();
diff --git a/java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java b/java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java
index 506523c..d711a42 100644
--- a/java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java
+++ b/java/src/test/java/com/esamtrade/bucketbase/S3BucketTest.java
@@ -9,7 +9,6 @@
 
 class S3BucketTest {
 
-    private S3Bucket bucket;
     private IBucketTester tester;
 
     @BeforeAll
@@ -21,7 +20,7 @@ public static void setUpClass() {
     public void setUp() {
         String accessKey = System.getenv("MINIO_ACCESS_KEY");
         String secretKey = System.getenv("MINIO_SECRET_KEY");
-        bucket = new S3Bucket("https://minio.esamtrade.vlada.ro", accessKey, secretKey, "minio-dev-tests");
+        S3Bucket bucket = new S3Bucket("https://minio.esamtrade.vlada.ro", accessKey, secretKey, "minio-dev-tests");
         tester = new IBucketTester(bucket);
     }
 
@@ -41,6 +40,21 @@ void putObjectAndGetObjectStream() throws IOException {
         tester.testPutAndGetObjectStream();
     }
 
+    @Test
+    void testPutAndGetParquetObjectStream() throws IOException {
+        tester.testPutAndGetParquetObjectStream();
+    }
+
+    @Test
+    void testPutAndGetMultiUploadObjectStream() throws IOException {
+        tester.testPutAndGetMultiUploadLargeParquetObjectStream();
+    }
+
+    @Test
+    void testGetSize() throws IOException {
+        tester.testGetSize();
+    }
+
     @Test
     void getListObjects() throws IOException {
         tester.testListObjects();
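
Aside: the reason S3Bucket.getObjectStream now wraps the response in a BufferedInputStream, and the reason the SDK v1 Parquet test is @Disabled, is the mark/reset trick in ParquetUtils.SeekableInputStreamWrapper: Parquet readers seek backwards (footer first, then row groups), and the wrapper emulates a backward seek by resetting to a mark at position 0 and re-skipping forward. A minimal, self-contained sketch of that trick (MarkResetSeekDemo is invented for illustration; the buffer size 64 plays the role of BUF_SIZE):

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

public class MarkResetSeekDemo {

    // InputStream.skip may skip fewer bytes than requested, so loop until done —
    // the same loop SeekableInputStreamWrapper.seek uses.
    static void skipFully(InputStream in, long toSkip) throws IOException {
        while (toSkip > 0) {
            long skipped = in.skip(toSkip);
            if (skipped <= 0)
                throw new EOFException("Unable to skip " + toSkip + " more bytes");
            toSkip -= skipped;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[256];
        for (int i = 0; i < data.length; i++)
            data[i] = (byte) i;

        // BufferedInputStream supports mark/reset; AWS SDK v1's S3ObjectInputStream does not,
        // which is why the v1 Parquet round-trip test above stays @Disabled
        InputStream in = new BufferedInputStream(new ByteArrayInputStream(data), 64);
        in.mark(Integer.MAX_VALUE);    // mark position 0, as the wrapper's constructor does

        skipFully(in, 200);            // forward seek, e.g. towards the Parquet footer
        System.out.println(in.read()); // 200

        in.reset();                    // backward "seek": return to the mark at position 0...
        in.mark(Integer.MAX_VALUE);    // ...re-mark, as SeekableInputStreamWrapper.seek does...
        skipFully(in, 10);             // ...then re-skip forward to the target position
        System.out.println(in.read()); // 10
    }
}

The trade-off of MARK_LIMIT = Integer.MAX_VALUE is that the buffered stream may retain everything read since the mark, so memory usage can approach the object size on backward-seek-heavy workloads — acceptable for these tests, but worth knowing before reusing the wrapper elsewhere.
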