
Added test for read/writing Parquet files as streams to IBucket #144

Open: wants to merge 6 commits into master

15 changes: 15 additions & 0 deletions java/pom.xml
@@ -51,6 +51,21 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>

<!-- Parquet dependencies -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop-bundle</artifactId>

Contributor:

Isn't parquet-hadoop enough? Do we need the whole bundle?

<version>1.15.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.4.1</version>
<scope>test</scope>
</dependency>

</dependencies>
<build>
<plugins>
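
A note on what these test dependencies enable: both parquet-hadoop (the reviewer's leaner suggestion) and the bundle ship the org.apache.parquet.io.OutputFile abstraction, which lets a test write Parquet entirely in memory and then stream the bytes into a bucket. A minimal sketch, assuming a hypothetical helper named InMemoryOutputFile that is not part of this PR:

import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;

import java.io.ByteArrayOutputStream;

// Hypothetical helper: an OutputFile that collects Parquet output in memory,
// so the resulting bytes can be pushed through IBucket.putObjectStream.
class InMemoryOutputFile implements OutputFile {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    @Override
    public PositionOutputStream create(long blockSizeHint) {
        return new PositionOutputStream() {
            @Override
            public long getPos() {
                return buffer.size(); // position is simply the byte count written so far
            }

            @Override
            public void write(int b) {
                buffer.write(b);
            }
        };
    }

    @Override
    public PositionOutputStream createOrOverwrite(long blockSizeHint) {
        return create(blockSizeHint);
    }

    @Override
    public boolean supportsBlockSize() {
        return false; // an in-memory target has no native block size
    }

    @Override
    public long defaultBlockSize() {
        return 0;
    }

    byte[] toByteArray() {
        return buffer.toByteArray();
    }
}

Since getPos() is derived from the bytes written, the test needs no temporary files and no Hadoop FileSystem setup.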
3 changes: 2 additions & 1 deletion java/src/main/java/com/esamtrade/bucketbase/IBucket.java
@@ -7,7 +7,6 @@
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -28,6 +27,8 @@ public interface IBucket {

ObjectStream getObjectStream(PurePosixPath name) throws IOException;

long getSize(PurePosixPath name) throws IOException;

List<PurePosixPath> listObjects(PurePosixPath prefix) throws IOException;

ShallowListing shallowListObjects(PurePosixPath prefix) throws IOException;
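
The new getSize method is what Parquet's read path needs: the org.apache.parquet.io.InputFile contract requires getLength() so the reader can locate the footer at the end of the file. A minimal sketch of that bridge, assuming a hypothetical BucketInputFile that buffers the whole object (a production version would issue ranged reads instead):

import org.apache.parquet.io.DelegatingSeekableInputStream;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;

import java.io.ByteArrayInputStream;
import java.io.IOException;

// Hypothetical bridge from IBucket to Parquet's InputFile contract.
class BucketInputFile implements InputFile {
    private final IBucket bucket;
    private final PurePosixPath path;

    BucketInputFile(IBucket bucket, PurePosixPath path) {
        this.bucket = bucket;
        this.path = path;
    }

    @Override
    public long getLength() throws IOException {
        return bucket.getSize(path); // the method added by this PR
    }

    @Override
    public SeekableInputStream newStream() throws IOException {
        byte[] content = bucket.getObject(path); // simplest approach: buffer the whole object
        ByteArrayInputStream in = new ByteArrayInputStream(content);
        return new DelegatingSeekableInputStream(in) {
            @Override
            public long getPos() {
                return content.length - in.available(); // position derived from remaining bytes
            }

            @Override
            public void seek(long newPos) {
                in.reset();      // ByteArrayInputStream resets to offset 0
                in.skip(newPos); // then skip forward to the target position
            }
        };
    }
}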
6 changes: 6 additions & 0 deletions java/src/main/java/com/esamtrade/bucketbase/MemoryBucket.java
@@ -60,6 +60,12 @@ public ObjectStream getObjectStream(PurePosixPath name) throws FileNotFoundException
return new ObjectStream(new ByteArrayInputStream(content), name.toString());
}

@Override
public long getSize(PurePosixPath name) throws IOException {
byte[] content = getObject(name);
return content.length;
}

@Override
public List<PurePosixPath> listObjects(PurePosixPath prefix) {
splitPrefix(prefix); // validate prefix
129 changes: 97 additions & 32 deletions java/src/main/java/com/esamtrade/bucketbase/S3Bucket.java
@@ -1,8 +1,10 @@
package com.esamtrade.bucketbase;

import com.amazonaws.SdkClientException;
import org.apache.commons.codec.digest.DigestUtils;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.core.sync.ResponseTransformer;
@@ -11,11 +13,13 @@
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.*;

import java.io.BufferedInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashSet;
import java.util.List;
@@ -24,20 +28,27 @@
import java.util.stream.Collectors;

public class S3Bucket extends BaseBucket {

private final static int DEFAULT_BUF_SIZE = 8 * 1024;
private final static int DEFAULT_UPLOAD_PART_SIZE = 5 * 1024 * 1024;
protected final int BUF_SIZE; // 8 KB by default
protected S3Client s3Client;
protected S3AsyncClient s3AsyncClient;
protected String bucketName;


public S3Bucket(S3Client s3Client, S3AsyncClient s3AsyncClient, String bucketName) {
public S3Bucket(S3Client s3Client, S3AsyncClient s3AsyncClient, String bucketName, int bufSize) {
this.s3Client = s3Client;
this.s3AsyncClient = s3AsyncClient;
this.bucketName = bucketName;
this.BUF_SIZE = bufSize;
}

public S3Bucket(String endpoint, String accessKey, String secretKey, String bucketName) {
this(createS3Client(endpoint, accessKey, secretKey), createS3AsyncClient(endpoint, accessKey, secretKey), bucketName);
this(createS3Client(endpoint, accessKey, secretKey), createS3AsyncClient(endpoint, accessKey, secretKey), bucketName, DEFAULT_BUF_SIZE);
}

public S3Bucket(String endpoint, String accessKey, String secretKey, String bucketName, int bufSize) {
this(createS3Client(endpoint, accessKey, secretKey), createS3AsyncClient(endpoint, accessKey, secretKey), bucketName, bufSize);
}

private static S3Client createS3Client(String endpoint, String accessKey, String secretKey) {
@@ -70,26 +81,40 @@ public void putObject(PurePosixPath name, byte[] content) {
}

@Override
public void putObjectStream(PurePosixPath name, InputStream stream) {
public void putObjectStream(PurePosixPath name, InputStream stream) throws IOException {
String _name = validateName(name);
try {
uploadLargeStream(_name, stream);
} catch (Exception e) {
throw new RuntimeException("Failed to upload object: " + _name, e);
throw new IOException("Failed to upload object: " + _name, e);
} finally {
s3AsyncClient.close();
}
}


private void uploadLargeStream(String key, InputStream inputStream) {
int partSize = 5 * 1024 * 1024; // 5 MB
private void uploadLargeStream(String key, InputStream inputStream) throws IOException {
// Note that if the input stream contains less than 5 MB, S3 rejects the multipart upload with a 400 error (upload is smaller than the minimum allowed object size)
List<CompletedPart> completedParts = new ArrayList<>();
byte[] buffer = new byte[partSize];
byte[] buffer = new byte[DEFAULT_UPLOAD_PART_SIZE];
int bytesRead;
int partNumber = 1;

// Read the first chunk to decide between a simple put and a multipart upload
bytesRead = readUploadBuffer(inputStream, buffer);
if (bytesRead == 0) {
// Empty stream, create empty object
this.putObject(PurePosixPath.from(key), new byte[0]);
return;
}

if (bytesRead < DEFAULT_UPLOAD_PART_SIZE) {
byte[] content = Arrays.copyOf(buffer, bytesRead);
// Small file, use regular putObject to avoid multipart upload being rejected
this.putObject(PurePosixPath.from(key), content);
return;
}

// 1. Initiate the multipart upload for large files
CreateMultipartUploadRequest createMultipartUploadRequest = CreateMultipartUploadRequest.builder()
.bucket(bucketName)
.key(key)
@@ -98,22 +123,16 @@ private void uploadLargeStream(String key, InputStream inputStream) {
String uploadId = response.uploadId();

try {
// 2. Read the input stream and upload each part
while ((bytesRead = inputStream.read(buffer)) != -1) {
byte[] bytesToUpload = (bytesRead < partSize) ? java.util.Arrays.copyOf(buffer, bytesRead) : buffer;
UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
.bucket(bucketName)
.key(key)
.uploadId(uploadId)
.partNumber(partNumber)
.contentLength((long) bytesRead)
.build();
AsyncRequestBody requestBody = AsyncRequestBody.fromBytes(bytesToUpload);
CompletableFuture<UploadPartResponse> uploadPartResponse = s3AsyncClient.uploadPart(uploadPartRequest, requestBody);
completedParts.add(CompletedPart.builder()
.partNumber(partNumber)
.eTag(uploadPartResponse.join().eTag())
.build());
// Upload the first buffer we already read
CompletedPart firstPart = uploadPart(key, uploadId, partNumber, bytesRead, buffer);
completedParts.add(firstPart);
partNumber++;

// 2. Continue reading and uploading remaining parts
while ((bytesRead = readUploadBuffer(inputStream, buffer)) != 0) {
byte[] bytesToUpload = (bytesRead < DEFAULT_UPLOAD_PART_SIZE) ? java.util.Arrays.copyOf(buffer, bytesRead) : buffer;
CompletedPart completedPart = uploadPart(key, uploadId, partNumber, bytesRead, bytesToUpload);
completedParts.add(completedPart);
partNumber++;
}

@@ -133,10 +152,43 @@ private void uploadLargeStream(String key, InputStream inputStream) {
.uploadId(uploadId)
.build();
s3AsyncClient.abortMultipartUpload(abortMultipartUploadRequest).join();
throw new RuntimeException("Failed to upload object: " + key, e);
throw new IOException("Failed to upload object: " + key, e);
}
}

private int readUploadBuffer(InputStream inputStream, byte[] buffer) throws IOException {
int totalBytesRead = inputStream.read(buffer);

if (totalBytesRead == -1) {
return 0;
}

if (totalBytesRead < DEFAULT_UPLOAD_PART_SIZE) {
int bytesRead;
while (totalBytesRead < DEFAULT_UPLOAD_PART_SIZE && (bytesRead = inputStream.read(buffer, totalBytesRead, DEFAULT_UPLOAD_PART_SIZE - totalBytesRead)) != -1) {
totalBytesRead += bytesRead;
}
}

return totalBytesRead;
}

private CompletedPart uploadPart(String key, String uploadId, int partNumber, long bytesRead, byte[] bytesToUpload) {
UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
.bucket(bucketName)
.key(key)
.uploadId(uploadId)
.partNumber(partNumber)
.contentLength(bytesRead)
.build();
AsyncRequestBody requestBody = AsyncRequestBody.fromBytes(bytesToUpload);
CompletableFuture<UploadPartResponse> uploadPartResponse = s3AsyncClient.uploadPart(uploadPartRequest, requestBody);
return CompletedPart.builder()
.partNumber(partNumber)
.eTag(uploadPartResponse.join().eTag())

Contributor:

I think that if you join() here on the future response, you choose to perform the uploads sequentially. Is this the intention?

.build();
}
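
On the Contributor's join() question above: joining inside uploadPart does make the parts upload sequentially. If parallelism is the goal, one possible reshape (a sketch only, not this PR's code) launches every part upload, collects the futures, and joins once before completing the multipart upload:

// Sketch: concurrent variant of the part-upload loop. Each part needs its own
// byte[] copy because the shared read buffer is reused between iterations.
private List<CompletedPart> uploadPartsConcurrently(String key, String uploadId,
                                                    InputStream in, byte[] buffer) throws IOException {
    List<CompletableFuture<CompletedPart>> pending = new ArrayList<>();
    int partNumber = 1;
    int bytesRead;
    while ((bytesRead = readUploadBuffer(in, buffer)) != 0) {
        final int part = partNumber++;
        byte[] copy = Arrays.copyOf(buffer, bytesRead);
        UploadPartRequest request = UploadPartRequest.builder()
                .bucket(bucketName)
                .key(key)
                .uploadId(uploadId)
                .partNumber(part)
                .contentLength((long) bytesRead)
                .build();
        pending.add(s3AsyncClient.uploadPart(request, AsyncRequestBody.fromBytes(copy))
                .thenApply(resp -> CompletedPart.builder().partNumber(part).eTag(resp.eTag()).build()));
    }
    return pending.stream().map(CompletableFuture::join).collect(Collectors.toList());
}

The tradeoff is memory: every in-flight part holds its own ~5 MB copy until its future completes.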


@Override
public byte[] getObject(PurePosixPath name) throws IOException {
@@ -159,14 +211,29 @@ public ObjectStream getObjectStream(PurePosixPath name) throws IOException {
.key(name.toString())
.build();
InputStream inputStream = s3Client.getObject(request, ResponseTransformer.toInputStream());
return new ObjectStream(inputStream, name.toString());
BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream, BUF_SIZE);
return new ObjectStream(bufferedInputStream, name.toString());
} catch (NoSuchKeyException e) {
throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
}
}


@Override
public long getSize(PurePosixPath name) throws IOException {
try {
HeadObjectRequest request = HeadObjectRequest.builder()
.bucket(bucketName)
.key(name.toString())
.build();
HeadObjectResponse response = s3Client.headObject(request);
return response.contentLength();
} catch (NoSuchKeyException e) {
throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
} catch (AwsServiceException | SdkClientException e) {
throw new IOException("Failed to get object size: " + name, e);
}
}

/**
* Lists all objects in the S3 bucket with the given prefix.
*
@@ -175,17 +242,15 @@ public ObjectStream getObjectStream(PurePosixPath name) throws IOException {
* @param prefix The prefix to filter objects by.
* @return A list of paths to the objects in the bucket.
*/
@Override
public List<PurePosixPath> listObjects(PurePosixPath prefix) {
splitPrefix(prefix); // validate prefix
List<PurePosixPath> result = new ArrayList<>();
ListObjectsV2Request request = ListObjectsV2Request.builder()
.bucket(bucketName)
.prefix(prefix.toString())
.build();

List<PurePosixPath> results = s3Client.listObjectsV2Paginator(request).contents().stream().map(S3Object::key).map(PurePosixPath::from).toList();

return results;
return s3Client.listObjectsV2Paginator(request).contents().stream().map(S3Object::key).map(PurePosixPath::from).toList();
}

@Override
14 changes: 14 additions & 0 deletions java/src/main/java/com/esamtrade/bucketbase/S3BucketSDKv1.java
@@ -1,5 +1,6 @@
package com.esamtrade.bucketbase;

import com.amazonaws.SdkClientException;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
@@ -82,6 +83,19 @@ public ObjectStream getObjectStream(PurePosixPath name) throws IOException {
return new ObjectStream(inputStream, name.toString());
}

@Override
public long getSize(PurePosixPath name) throws IOException {
try {
return s3Client.getObjectMetadata(bucketName, name.toString()).getContentLength();
}
catch (AmazonS3Exception e) {
if (e.getStatusCode() == 404)
throw new FileNotFoundException("Object " + name + " not found in S3 bucket " + bucketName);
throw new IOException("Failed to get object metadata: " + e.getMessage(), e);
} catch (SdkClientException e) {
throw new IOException("Failed to get object metadata: " + e.getMessage(), e);
}
}

/**
* Retrieves a list of object paths stored in the bucket that match the given prefix.
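
Taken together, the round-trip test the PR title describes could look roughly like this sketch against MemoryBucket, reusing the hypothetical InMemoryOutputFile and BucketInputFile helpers from above:

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

import java.io.ByteArrayInputStream;

class ParquetRoundTripSketch {
    public static void main(String[] args) throws Exception {
        MemoryBucket bucket = new MemoryBucket(); // assumes a no-arg constructor
        PurePosixPath path = PurePosixPath.from("tests/roundtrip.parquet");

        // Write one record into an in-memory Parquet file, then stream it into the bucket.
        MessageType schema = MessageTypeParser.parseMessageType("message test { required int32 id; }");
        InMemoryOutputFile out = new InMemoryOutputFile();
        try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(out)
                .withType(schema)
                .withConf(new Configuration())
                .build()) {
            writer.write(new SimpleGroupFactory(schema).newGroup().append("id", 42));
        }
        bucket.putObjectStream(path, new ByteArrayInputStream(out.toByteArray()));

        // Read it back through the InputFile bridge and check the footer's record count.
        try (ParquetFileReader reader = ParquetFileReader.open(new BucketInputFile(bucket, path))) {
            if (reader.getRecordCount() != 1) {
                throw new AssertionError("expected exactly one record");
            }
        }
    }
}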