Skip to content

Commit a34b347

Browse files
committed
Merge branch 'main' of github.com:aws/amazon-s3-encryption-client-java into inst-file-put
2 parents ed761d9 + 99cce95 commit a34b347

File tree

10 files changed

+718
-26
lines changed

10 files changed

+718
-26
lines changed

CHANGELOG.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,18 @@
11
# Changelog
22

3+
## [3.3.3](https://github.com/aws/aws-s3-encryption-client-java/compare/v3.3.2...v3.3.3) (2025-05-05)
4+
5+
### Fixes
6+
7+
* fix CipherSubscriber to only call onNext once per request ([#456](https://github.com/aws/aws-s3-encryption-client-java/issues/456)) ([646b735](https://github.com/aws/aws-s3-encryption-client-java/commit/646b735b052ced18a5c01f9d369ac6a81c8e2ce1))
8+
9+
## [3.3.2](https://github.com/aws/aws-s3-encryption-client-java/compare/v3.3.1...v3.3.2) (2025-04-16)
10+
11+
### Fixes
12+
13+
* add builders to S3EncryptionClientException class ([#450](https://github.com/aws/aws-s3-encryption-client-java/issues/450)) ([647c809](https://github.com/aws/aws-s3-encryption-client-java/commit/647c809a0e0cc44abdd0c9bd192a3c78d29fdc71))
14+
* allow CipherSubscriber to determine if the part is last part ([#453](https://github.com/aws/aws-s3-encryption-client-java/issues/453)) ([12355a1](https://github.com/aws/aws-s3-encryption-client-java/commit/12355a11e29e81ebc3c903aaa7caee5a271a0ea8))
15+
316
## [3.3.1](https://github.com/aws/aws-s3-encryption-client-java/compare/v3.3.0...v3.3.1) (2025-01-24)
417

518
### Fixes

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>software.amazon.encryption.s3</groupId>
88
<artifactId>amazon-s3-encryption-client-java</artifactId>
9-
<version>3.3.1</version>
9+
<version>3.3.3</version>
1010
<packaging>jar</packaging>
1111

1212
<name>Amazon S3 Encryption Client</name>

src/main/java/software/amazon/encryption/s3/internal/CipherSubscriber.java

Lines changed: 86 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,22 +11,26 @@
1111
import javax.crypto.Cipher;
1212
import java.nio.ByteBuffer;
1313
import java.security.GeneralSecurityException;
14+
import java.util.concurrent.atomic.AtomicBoolean;
1415
import java.util.concurrent.atomic.AtomicLong;
1516

1617
public class CipherSubscriber implements Subscriber<ByteBuffer> {
1718
private final AtomicLong contentRead = new AtomicLong(0);
1819
private final Subscriber<? super ByteBuffer> wrappedSubscriber;
19-
private Cipher cipher;
20+
private final Cipher cipher;
2021
private final Long contentLength;
21-
private boolean isLastPart;
22+
private final boolean isLastPart;
23+
private final int tagLength;
24+
private final AtomicBoolean finalBytesCalled = new AtomicBoolean(false);
2225

2326
private byte[] outputBuffer;
2427

2528
CipherSubscriber(Subscriber<? super ByteBuffer> wrappedSubscriber, Long contentLength, CryptographicMaterials materials, byte[] iv, boolean isLastPart) {
2629
this.wrappedSubscriber = wrappedSubscriber;
2730
this.contentLength = contentLength;
28-
cipher = materials.getCipher(iv);
31+
this.cipher = materials.getCipher(iv);
2932
this.isLastPart = isLastPart;
33+
this.tagLength = materials.algorithmSuite().cipherTagLengthBytes();
3034
}
3135

3236
CipherSubscriber(Subscriber<? super ByteBuffer> wrappedSubscriber, Long contentLength, CryptographicMaterials materials, byte[] iv) {
@@ -46,20 +50,48 @@ public void onNext(ByteBuffer byteBuffer) {
4650
if (amountToReadFromByteBuffer > 0) {
4751
byte[] buf = BinaryUtils.copyBytesFrom(byteBuffer, amountToReadFromByteBuffer);
4852
outputBuffer = cipher.update(buf, 0, amountToReadFromByteBuffer);
53+
4954
if (outputBuffer == null || outputBuffer.length == 0) {
5055
// The underlying data is too short to fill in the block cipher.
5156
// Note that while the JCE Javadoc specifies that the outputBuffer is null in this case,
5257
// in practice SunJCE and ACCP return an empty buffer instead, hence checks for
5358
// null OR length == 0.
54-
if (contentRead.get() == contentLength) {
59+
if (contentRead.get() + tagLength >= contentLength) {
5560
// All content has been read, so complete to get the final bytes
56-
this.onComplete();
61+
finalBytes();
62+
return;
5763
}
5864
// Otherwise, wait for more bytes. To avoid blocking,
5965
// send an empty buffer to the wrapped subscriber.
6066
wrappedSubscriber.onNext(ByteBuffer.allocate(0));
6167
} else {
62-
wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer));
68+
/*
69+
Check if stream has read all expected content.
70+
Once all content has been read, call `finalBytes`.
71+
72+
This determines that all content has been read by checking if
73+
the amount of data read so far plus the tag length is at least the content length.
74+
Once this is true, downstream will never call `request` again
75+
(beyond the current request that is being responded to in this onNext invocation.)
76+
As a result, this class can only call `wrappedSubscriber.onNext` one more time.
77+
(Reactive streams require that downstream sends a `request(n)`
78+
to indicate it is ready for more data, and upstream responds to that request by calling `onNext`.
79+
The `n` in request is the maximum number of `onNext` calls that downstream
80+
will allow upstream to make, and seems to always be 1 for the AsyncBodySubscriber.)
81+
Since this class can only call `wrappedSubscriber.onNext` once,
82+
it must send all remaining data in the next onNext call,
83+
including the result of cipher.doFinal(), if applicable.
84+
Calling `wrappedSubscriber.onNext` more than once for `request(1)`
85+
violates the Reactive Streams specification and can cause exceptions downstream.
86+
*/
87+
if (contentRead.get() + tagLength >= contentLength) {
88+
// All content has been read; complete the stream.
89+
finalBytes();
90+
} else {
91+
// Needs to read more data, so send the data downstream,
92+
// expecting that downstream will continue to request more data.
93+
wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer));
94+
}
6395
}
6496
} else {
6597
// Do nothing
@@ -91,21 +123,63 @@ public void onError(Throwable t) {
91123

92124
@Override
93125
public void onComplete() {
126+
// In rare cases, e.g. when the last part of a low-level MPU has 0 length,
127+
// onComplete will be called before onNext is called once.
128+
if (contentRead.get() + tagLength <= contentLength) {
129+
finalBytes();
130+
}
131+
wrappedSubscriber.onComplete();
132+
}
133+
134+
/**
135+
* Finalize encryption, including calculating the auth tag for AES-GCM.
136+
* As such this method MUST only be called once, which is enforced using
137+
* `finalBytesCalled`.
138+
*/
139+
private void finalBytes() {
140+
if (!finalBytesCalled.compareAndSet(false, true)) {
141+
// already called, don't repeat
142+
return;
143+
}
144+
145+
// If this isn't the last part, skip doFinal and just send outputBuffer downstream.
146+
// doFinal requires that all parts have been processed to compute the tag,
147+
// so the tag will only be computed when the last part is processed.
94148
if (!isLastPart) {
95-
// If this isn't the last part, skip doFinal, we aren't done
96-
wrappedSubscriber.onComplete();
149+
wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer));
97150
return;
98151
}
152+
153+
// If this is the last part, compute doFinal and include its result in the value sent downstream.
154+
// The result of doFinal MUST be included with the bytes that were in outputBuffer in the final onNext call.
155+
byte[] finalBytes;
99156
try {
100-
outputBuffer = cipher.doFinal();
101-
// Send the final bytes to the wrapped subscriber
102-
wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer));
157+
finalBytes = cipher.doFinal();
103158
} catch (final GeneralSecurityException exception) {
159+
// Even if doFinal fails, downstream still expects to receive the bytes that were in outputBuffer
160+
wrappedSubscriber.onNext(ByteBuffer.wrap(outputBuffer));
104161
// Forward error, else the wrapped subscriber waits indefinitely
105162
wrappedSubscriber.onError(exception);
106163
throw new S3EncryptionClientSecurityException(exception.getMessage(), exception);
107164
}
108-
wrappedSubscriber.onComplete();
165+
166+
// Combine the bytes from outputBuffer and finalBytes into one onNext call.
167+
// Downstream has requested one item in its request method, so this class can only call onNext once.
168+
// This single onNext call must contain both the bytes from outputBuffer and the tag.
169+
byte[] combinedBytes;
170+
if (outputBuffer != null && outputBuffer.length > 0 && finalBytes != null && finalBytes.length > 0) {
171+
combinedBytes = new byte[outputBuffer.length + finalBytes.length];
172+
System.arraycopy(outputBuffer, 0, combinedBytes, 0, outputBuffer.length);
173+
System.arraycopy(finalBytes, 0, combinedBytes, outputBuffer.length, finalBytes.length);
174+
} else if (outputBuffer != null && outputBuffer.length > 0) {
175+
combinedBytes = outputBuffer;
176+
} else if (finalBytes != null && finalBytes.length > 0) {
177+
combinedBytes = finalBytes;
178+
} else {
179+
combinedBytes = new byte[0];
180+
}
181+
182+
wrappedSubscriber.onNext(ByteBuffer.wrap(combinedBytes));
109183
}
110184

111185
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package software.amazon.encryption.s3.internal;
2+
3+
import software.amazon.awssdk.services.s3.model.ChecksumType;
4+
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
5+
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
6+
import java.time.Instant;
7+
import java.util.Map;
8+
9+
public class ConvertSDKRequests {
10+
11+
public static CreateMultipartUploadRequest convert(PutObjectRequest request) {
12+
13+
final CreateMultipartUploadRequest.Builder output = CreateMultipartUploadRequest.builder();
14+
request
15+
.toBuilder()
16+
.sdkFields()
17+
.forEach(f -> {
18+
final Object value = f.getValueOrDefault(request);
19+
if (value != null) {
20+
switch (f.memberName()) {
21+
case "ACL":
22+
output.acl((String) value);
23+
break;
24+
case "Bucket":
25+
output.bucket((String) value);
26+
break;
27+
case "BucketKeyEnabled":
28+
output.bucketKeyEnabled((Boolean) value);
29+
break;
30+
case "CacheControl":
31+
output.cacheControl((String) value);
32+
break;
33+
case "ChecksumAlgorithm":
34+
output.checksumAlgorithm((String) value);
35+
break;
36+
case "ChecksumType":
37+
output.checksumType((ChecksumType) value);
38+
case "ContentDisposition":
39+
assert value instanceof String;
40+
output.contentDisposition((String) value);
41+
break;
42+
case "ContentEncoding":
43+
output.contentEncoding((String) value);
44+
break;
45+
case "ContentLanguage":
46+
output.contentLanguage((String) value);
47+
break;
48+
case "ContentType":
49+
output.contentType((String) value);
50+
break;
51+
case "ExpectedBucketOwner":
52+
output.expectedBucketOwner((String) value);
53+
break;
54+
case "Expires":
55+
output.expires((Instant) value);
56+
break;
57+
case "GrantFullControl":
58+
output.grantFullControl((String) value);
59+
break;
60+
case "GrantRead":
61+
output.grantRead((String) value);
62+
break;
63+
case "GrantReadACP":
64+
output.grantReadACP((String) value);
65+
break;
66+
case "GrantWriteACP":
67+
output.grantWriteACP((String) value);
68+
break;
69+
case "Key":
70+
output.key((String) value);
71+
break;
72+
case "Metadata":
73+
// The PutObjectRequest.builder().metadata(value)
74+
// only takes Map<String, String> therefore it should not be possible
75+
// to get here with anything other than a Map<String, String>
76+
// This may be overkill, but this map should be small
77+
// so the performance hit to verify this is worth the correctness.
78+
if (!isStringStringMap(value)) {
79+
throw new IllegalArgumentException("Metadata must be a Map<String, String>");
80+
}
81+
@SuppressWarnings("unchecked")
82+
Map<String, String> metadata = (Map<String, String>) value;
83+
output.metadata(metadata);
84+
break;
85+
case "ObjectLockLegalHoldStatus":
86+
output.objectLockLegalHoldStatus((String) value);
87+
break;
88+
case "ObjectLockMode":
89+
output.objectLockMode((String) value);
90+
break;
91+
case "ObjectLockRetainUntilDate":
92+
output.objectLockRetainUntilDate((Instant) value);
93+
break;
94+
case "RequestPayer":
95+
output.requestPayer((String) value);
96+
break;
97+
case "ServerSideEncryption":
98+
output.serverSideEncryption((String) value);
99+
break;
100+
case "SSECustomerAlgorithm":
101+
output.sseCustomerAlgorithm((String) value);
102+
break;
103+
case "SSECustomerKey":
104+
output.sseCustomerKey((String) value);
105+
break;
106+
case "SSEKMSEncryptionContext":
107+
output.ssekmsEncryptionContext((String) value);
108+
break;
109+
case "SSEKMSKeyId":
110+
output.ssekmsKeyId((String) value);
111+
break;
112+
case "StorageClass":
113+
output.storageClass((String) value);
114+
break;
115+
case "Tagging":
116+
output.tagging((String) value);
117+
break;
118+
case "WebsiteRedirectLocation":
119+
output.websiteRedirectLocation((String) value);
120+
break;
121+
default:
122+
// Rather than silently dropping the value,
123+
// we loudly signal that we don't know how to handle this field.
124+
throw new IllegalArgumentException("Unknown PutObjectRequest field " + f.locationName() + ".");
125+
}
126+
}
127+
});
128+
return output
129+
// OverrideConfiguration is not as SDKField but still needs to be supported
130+
.overrideConfiguration(request.overrideConfiguration().orElse(null))
131+
.build();
132+
}
133+
134+
private static boolean isStringStringMap(Object value) {
135+
if (!(value instanceof Map)) {
136+
return false;
137+
}
138+
Map<?, ?> map = (Map<?, ?>) value;
139+
return map.entrySet().stream()
140+
.allMatch(entry -> entry != null
141+
&& ((Map.Entry<?, ?>) entry).getKey() instanceof String
142+
&& ((Map.Entry<?, ?>) entry).getValue() instanceof String);
143+
}
144+
}

src/main/java/software/amazon/encryption/s3/internal/UploadObjectObserver.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import software.amazon.awssdk.services.s3.S3AsyncClient;
88
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
99
import software.amazon.awssdk.services.s3.model.CompletedPart;
10-
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
1110
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
1211
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
1312
import software.amazon.awssdk.services.s3.model.SdkPartType;
@@ -42,20 +41,10 @@ public UploadObjectObserver init(PutObjectRequest req,
4241
this.es = es;
4342
return this;
4443
}
45-
46-
protected CreateMultipartUploadRequest newCreateMultipartUploadRequest(
47-
PutObjectRequest request) {
48-
return CreateMultipartUploadRequest.builder()
49-
.bucket(request.bucket())
50-
.key(request.key())
51-
.metadata(request.metadata())
52-
.overrideConfiguration(request.overrideConfiguration().orElse(null))
53-
.build();
54-
}
55-
44+
5645
public String onUploadCreation(PutObjectRequest req) {
5746
CreateMultipartUploadResponse res =
58-
s3EncryptionClient.createMultipartUpload(newCreateMultipartUploadRequest(req));
47+
s3EncryptionClient.createMultipartUpload(ConvertSDKRequests.convert(req));
5948
return this.uploadId = res.uploadId();
6049
}
6150

src/test/java/software/amazon/encryption/s3/S3AsyncEncryptionClientTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -729,6 +729,8 @@ public void copyObjectTransparentlyAsync() {
729729
* Test which artificially limits the size of buffers using {@link TinyBufferAsyncRequestBody}.
730730
* This tests edge cases where network conditions result in buffers with length shorter than
731731
* the cipher's block size.
732+
* Note that TinyAsyncRequestBody is not fully spec-compliant, and will cause IllegalStateExceptions
733+
* to be logged when debug logging is enabled.
732734
* @throws IOException
733735
*/
734736
@Test

0 commit comments

Comments
 (0)