11
11
import javax .crypto .Cipher ;
12
12
import java .nio .ByteBuffer ;
13
13
import java .security .GeneralSecurityException ;
14
+ import java .util .concurrent .atomic .AtomicBoolean ;
14
15
import java .util .concurrent .atomic .AtomicLong ;
15
16
16
17
public class CipherSubscriber implements Subscriber <ByteBuffer > {
17
18
private final AtomicLong contentRead = new AtomicLong (0 );
18
19
private final Subscriber <? super ByteBuffer > wrappedSubscriber ;
19
- private Cipher cipher ;
20
+ private final Cipher cipher ;
20
21
private final Long contentLength ;
21
- private boolean isLastPart ;
22
+ private final boolean isLastPart ;
23
+ private final int tagLength ;
24
+ private final AtomicBoolean finalBytesCalled = new AtomicBoolean (false );
22
25
23
26
private byte [] outputBuffer ;
24
27
25
28
CipherSubscriber (Subscriber <? super ByteBuffer > wrappedSubscriber , Long contentLength , CryptographicMaterials materials , byte [] iv , boolean isLastPart ) {
26
29
this .wrappedSubscriber = wrappedSubscriber ;
27
30
this .contentLength = contentLength ;
28
- cipher = materials .getCipher (iv );
31
+ this . cipher = materials .getCipher (iv );
29
32
this .isLastPart = isLastPart ;
33
+ this .tagLength = materials .algorithmSuite ().cipherTagLengthBytes ();
30
34
}
31
35
32
36
CipherSubscriber (Subscriber <? super ByteBuffer > wrappedSubscriber , Long contentLength , CryptographicMaterials materials , byte [] iv ) {
@@ -46,20 +50,48 @@ public void onNext(ByteBuffer byteBuffer) {
46
50
if (amountToReadFromByteBuffer > 0 ) {
47
51
byte [] buf = BinaryUtils .copyBytesFrom (byteBuffer , amountToReadFromByteBuffer );
48
52
outputBuffer = cipher .update (buf , 0 , amountToReadFromByteBuffer );
53
+
49
54
if (outputBuffer == null || outputBuffer .length == 0 ) {
50
55
// The underlying data is too short to fill in the block cipher.
51
56
// Note that while the JCE Javadoc specifies that the outputBuffer is null in this case,
52
57
// in practice SunJCE and ACCP return an empty buffer instead, hence checks for
53
58
// null OR length == 0.
54
- if (contentRead .get () = = contentLength ) {
59
+ if (contentRead .get () + tagLength > = contentLength ) {
55
60
// All content has been read, so complete to get the final bytes
56
- this .onComplete ();
61
+ finalBytes ();
62
+ return ;
57
63
}
58
64
// Otherwise, wait for more bytes. To avoid blocking,
59
65
// send an empty buffer to the wrapped subscriber.
60
66
wrappedSubscriber .onNext (ByteBuffer .allocate (0 ));
61
67
} else {
62
- wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
68
+ /*
69
+ Check if stream has read all expected content.
70
+ Once all content has been read, call `finalBytes`.
71
+
72
+ This determines that all content has been read by checking if
73
+ the amount of data read so far plus the tag length is at least the content length.
74
+ Once this is true, downstream will never call `request` again
75
+ (beyond the current request that is being responded to in this onNext invocation.)
76
+ As a result, this class can only call `wrappedSubscriber.onNext` one more time.
77
+ (Reactive streams require that downstream sends a `request(n)`
78
+ to indicate it is ready for more data, and upstream responds to that request by calling `onNext`.
79
+ The `n` in request is the maximum number of `onNext` calls that downstream
80
+ will allow upstream to make, and seems to always be 1 for the AsyncBodySubscriber.)
81
+ Since this class can only call `wrappedSubscriber.onNext` once,
82
+ it must send all remaining data in the next onNext call,
83
+ including the result of cipher.doFinal(), if applicable.
84
+ Calling `wrappedSubscriber.onNext` more than once for `request(1)`
85
+ violates the Reactive Streams specification and can cause exceptions downstream.
86
+ */
87
+ if (contentRead .get () + tagLength >= contentLength ) {
88
+ // All content has been read; complete the stream.
89
+ finalBytes ();
90
+ } else {
91
+ // Needs to read more data, so send the data downstream,
92
+ // expecting that downstream will continue to request more data.
93
+ wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
94
+ }
63
95
}
64
96
} else {
65
97
// Do nothing
@@ -91,21 +123,63 @@ public void onError(Throwable t) {
91
123
92
124
@ Override
93
125
public void onComplete () {
126
+ // In rare cases, e.g. when the last part of a low-level MPU has 0 length,
127
+ // onComplete will be called before onNext is called once.
128
+ if (contentRead .get () + tagLength <= contentLength ) {
129
+ finalBytes ();
130
+ }
131
+ wrappedSubscriber .onComplete ();
132
+ }
133
+
134
+ /**
135
+ * Finalize encryption, including calculating the auth tag for AES-GCM.
136
+ * As such this method MUST only be called once, which is enforced using
137
+ * `finalBytesCalled`.
138
+ */
139
+ private void finalBytes () {
140
+ if (!finalBytesCalled .compareAndSet (false , true )) {
141
+ // already called, don't repeat
142
+ return ;
143
+ }
144
+
145
+ // If this isn't the last part, skip doFinal and just send outputBuffer downstream.
146
+ // doFinal requires that all parts have been processed to compute the tag,
147
+ // so the tag will only be computed when the last part is processed.
94
148
if (!isLastPart ) {
95
- // If this isn't the last part, skip doFinal, we aren't done
96
- wrappedSubscriber .onComplete ();
149
+ wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
97
150
return ;
98
151
}
152
+
153
+ // If this is the last part, compute doFinal and include its result in the value sent downstream.
154
+ // The result of doFinal MUST be included with the bytes that were in outputBuffer in the final onNext call.
155
+ byte [] finalBytes ;
99
156
try {
100
- outputBuffer = cipher .doFinal ();
101
- // Send the final bytes to the wrapped subscriber
102
- wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
157
+ finalBytes = cipher .doFinal ();
103
158
} catch (final GeneralSecurityException exception ) {
159
+ // Even if doFinal fails, downstream still expects to receive the bytes that were in outputBuffer
160
+ wrappedSubscriber .onNext (ByteBuffer .wrap (outputBuffer ));
104
161
// Forward error, else the wrapped subscriber waits indefinitely
105
162
wrappedSubscriber .onError (exception );
106
163
throw new S3EncryptionClientSecurityException (exception .getMessage (), exception );
107
164
}
108
- wrappedSubscriber .onComplete ();
165
+
166
+ // Combine the bytes from outputBuffer and finalBytes into one onNext call.
167
+ // Downstream has requested one item in its request method, so this class can only call onNext once.
168
+ // This single onNext call must contain both the bytes from outputBuffer and the tag.
169
+ byte [] combinedBytes ;
170
+ if (outputBuffer != null && outputBuffer .length > 0 && finalBytes != null && finalBytes .length > 0 ) {
171
+ combinedBytes = new byte [outputBuffer .length + finalBytes .length ];
172
+ System .arraycopy (outputBuffer , 0 , combinedBytes , 0 , outputBuffer .length );
173
+ System .arraycopy (finalBytes , 0 , combinedBytes , outputBuffer .length , finalBytes .length );
174
+ } else if (outputBuffer != null && outputBuffer .length > 0 ) {
175
+ combinedBytes = outputBuffer ;
176
+ } else if (finalBytes != null && finalBytes .length > 0 ) {
177
+ combinedBytes = finalBytes ;
178
+ } else {
179
+ combinedBytes = new byte [0 ];
180
+ }
181
+
182
+ wrappedSubscriber .onNext (ByteBuffer .wrap (combinedBytes ));
109
183
}
110
184
111
185
}
0 commit comments