Skip to content

Commit 077dd35

Browse files
authored
Merge branch 'apache:main' into cloudwatch
2 parents 157fccd + d22078f commit 077dd35

File tree

9 files changed

+204
-142
lines changed

9 files changed

+204
-142
lines changed

.github/workflows/nightly.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ jobs:
2525
if: github.repository_owner == 'apache'
2626
strategy:
2727
matrix:
28-
flink: [1.19-SNAPSHOT, 1.20-SNAPSHOT]
28+
flink: [1.20-SNAPSHOT]
2929
java: [ '8, 11, 17']
3030
uses: ./.github/workflows/common.yml
3131
with:
@@ -38,7 +38,7 @@ jobs:
3838
python_test:
3939
strategy:
4040
matrix:
41-
flink: [1.19-SNAPSHOT, 1.20-SNAPSHOT]
41+
flink: [1.20-SNAPSHOT]
4242
uses: apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
4343
with:
4444
flink_version: ${{ matrix.flink }}

.github/workflows/push_pr.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ jobs:
2626
uses: ./.github/workflows/common.yml
2727
strategy:
2828
matrix:
29-
flink: [1.19.1, 1.20.0]
29+
flink: [1.20.0]
3030
java: [ '8, 11, 17']
3131
with:
3232
flink_version: ${{ matrix.flink }}
@@ -38,7 +38,7 @@ jobs:
3838
python_test:
3939
strategy:
4040
matrix:
41-
flink: [1.19.0, 1.20.0]
41+
flink: [1.20.0]
4242
uses: apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
4343
with:
4444
flink_version: ${{ matrix.flink }}

flink-connector-aws/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
2727
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
2828
import org.apache.flink.connector.base.sink.writer.ElementConverter;
29+
import org.apache.flink.connector.base.sink.writer.ResultHandler;
2930
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
3031
import org.apache.flink.metrics.Counter;
3132
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
@@ -45,7 +46,6 @@
4546
import java.util.List;
4647
import java.util.Properties;
4748
import java.util.concurrent.CompletableFuture;
48-
import java.util.function.Consumer;
4949

5050
import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
5151
import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
@@ -171,7 +171,7 @@ private static FirehoseAsyncClient createFirehoseClient(
171171

172172
@Override
173173
protected void submitRequestEntries(
174-
List<Record> requestEntries, Consumer<List<Record>> requestResult) {
174+
List<Record> requestEntries, ResultHandler<Record> resultHandler) {
175175

176176
PutRecordBatchRequest batchRequest =
177177
PutRecordBatchRequest.builder()
@@ -185,11 +185,11 @@ protected void submitRequestEntries(
185185
future.whenComplete(
186186
(response, err) -> {
187187
if (err != null) {
188-
handleFullyFailedRequest(err, requestEntries, requestResult);
188+
handleFullyFailedRequest(err, requestEntries, resultHandler);
189189
} else if (response.failedPutCount() > 0) {
190-
handlePartiallyFailedRequest(response, requestEntries, requestResult);
190+
handlePartiallyFailedRequest(response, requestEntries, resultHandler);
191191
} else {
192-
requestResult.accept(Collections.emptyList());
192+
resultHandler.complete();
193193
}
194194
});
195195
}
@@ -205,35 +205,37 @@ public void close() {
205205
}
206206

207207
private void handleFullyFailedRequest(
208-
Throwable err, List<Record> requestEntries, Consumer<List<Record>> requestResult) {
208+
Throwable err, List<Record> requestEntries, ResultHandler<Record> resultHandler) {
209209

210210
numRecordsOutErrorsCounter.inc(requestEntries.size());
211-
boolean isFatal = FIREHOSE_EXCEPTION_HANDLER.consumeIfFatal(err, getFatalExceptionCons());
211+
boolean isFatal =
212+
FIREHOSE_EXCEPTION_HANDLER.consumeIfFatal(
213+
err, resultHandler::completeExceptionally);
212214
if (isFatal) {
213215
return;
214216
}
215217

216218
if (failOnError) {
217-
getFatalExceptionCons()
218-
.accept(new KinesisFirehoseException.KinesisFirehoseFailFastException(err));
219+
resultHandler.completeExceptionally(
220+
new KinesisFirehoseException.KinesisFirehoseFailFastException(err));
219221
return;
220222
}
221223

222224
LOG.warn(
223225
"KDF Sink failed to write and will retry {} entries to KDF",
224226
requestEntries.size(),
225227
err);
226-
requestResult.accept(requestEntries);
228+
resultHandler.retryForEntries(requestEntries);
227229
}
228230

229231
private void handlePartiallyFailedRequest(
230232
PutRecordBatchResponse response,
231233
List<Record> requestEntries,
232-
Consumer<List<Record>> requestResult) {
234+
ResultHandler<Record> resultHandler) {
233235
numRecordsOutErrorsCounter.inc(response.failedPutCount());
234236
if (failOnError) {
235-
getFatalExceptionCons()
236-
.accept(new KinesisFirehoseException.KinesisFirehoseFailFastException());
237+
resultHandler.completeExceptionally(
238+
new KinesisFirehoseException.KinesisFirehoseFailFastException());
237239
return;
238240
}
239241

@@ -248,6 +250,6 @@ private void handlePartiallyFailedRequest(
248250
}
249251
}
250252

251-
requestResult.accept(failedRequestEntries);
253+
resultHandler.retryForEntries(failedRequestEntries);
252254
}
253255
}

flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
2626
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
2727
import org.apache.flink.connector.base.sink.writer.ElementConverter;
28+
import org.apache.flink.connector.base.sink.writer.ResultHandler;
2829
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
2930
import org.apache.flink.connector.base.sink.writer.strategy.AIMDScalingStrategy;
3031
import org.apache.flink.connector.base.sink.writer.strategy.CongestionControlRateLimitingStrategy;
@@ -48,7 +49,6 @@
4849
import java.util.List;
4950
import java.util.Properties;
5051
import java.util.concurrent.CompletableFuture;
51-
import java.util.function.Consumer;
5252

5353
import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
5454
import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier;
@@ -199,7 +199,7 @@ private static RateLimitingStrategy buildRateLimitingStrategy(
199199
@Override
200200
protected void submitRequestEntries(
201201
List<PutRecordsRequestEntry> requestEntries,
202-
Consumer<List<PutRecordsRequestEntry>> requestResult) {
202+
ResultHandler<PutRecordsRequestEntry> resultHandler) {
203203

204204
PutRecordsRequest batchRequest =
205205
PutRecordsRequest.builder()
@@ -213,11 +213,11 @@ protected void submitRequestEntries(
213213
future.whenComplete(
214214
(response, err) -> {
215215
if (err != null) {
216-
handleFullyFailedRequest(err, requestEntries, requestResult);
216+
handleFullyFailedRequest(err, requestEntries, resultHandler);
217217
} else if (response.failedRecordCount() > 0) {
218-
handlePartiallyFailedRequest(response, requestEntries, requestResult);
218+
handlePartiallyFailedRequest(response, requestEntries, resultHandler);
219219
} else {
220-
requestResult.accept(Collections.emptyList());
220+
resultHandler.complete();
221221
}
222222
});
223223
}
@@ -230,15 +230,15 @@ protected long getSizeInBytes(PutRecordsRequestEntry requestEntry) {
230230
private void handleFullyFailedRequest(
231231
Throwable err,
232232
List<PutRecordsRequestEntry> requestEntries,
233-
Consumer<List<PutRecordsRequestEntry>> requestResult) {
233+
ResultHandler<PutRecordsRequestEntry> resultHandler) {
234234
LOG.warn(
235235
"KDS Sink failed to write and will retry {} entries to KDS",
236236
requestEntries.size(),
237237
err);
238238
numRecordsOutErrorsCounter.inc(requestEntries.size());
239239

240-
if (isRetryable(err)) {
241-
requestResult.accept(requestEntries);
240+
if (isRetryable(err, resultHandler)) {
241+
resultHandler.retryForEntries(requestEntries);
242242
}
243243
}
244244

@@ -250,15 +250,15 @@ public void close() {
250250
private void handlePartiallyFailedRequest(
251251
PutRecordsResponse response,
252252
List<PutRecordsRequestEntry> requestEntries,
253-
Consumer<List<PutRecordsRequestEntry>> requestResult) {
253+
ResultHandler<PutRecordsRequestEntry> resultHandler) {
254254
LOG.warn(
255255
"KDS Sink failed to write and will retry {} entries to KDS",
256256
response.failedRecordCount());
257257
numRecordsOutErrorsCounter.inc(response.failedRecordCount());
258258

259259
if (failOnError) {
260-
getFatalExceptionCons()
261-
.accept(new KinesisStreamsException.KinesisStreamsFailFastException());
260+
resultHandler.completeExceptionally(
261+
new KinesisStreamsException.KinesisStreamsFailFastException());
262262
return;
263263
}
264264
List<PutRecordsRequestEntry> failedRequestEntries =
@@ -271,17 +271,19 @@ private void handlePartiallyFailedRequest(
271271
}
272272
}
273273

274-
requestResult.accept(failedRequestEntries);
274+
resultHandler.retryForEntries(failedRequestEntries);
275275
}
276276

277-
private boolean isRetryable(Throwable err) {
277+
private boolean isRetryable(
278+
Throwable err, ResultHandler<PutRecordsRequestEntry> resultHandler) {
278279

279-
if (!KINESIS_FATAL_EXCEPTION_CLASSIFIER.isFatal(err, getFatalExceptionCons())) {
280+
if (!KINESIS_FATAL_EXCEPTION_CLASSIFIER.isFatal(
281+
err, resultHandler::completeExceptionally)) {
280282
return false;
281283
}
282284
if (failOnError) {
283-
getFatalExceptionCons()
284-
.accept(new KinesisStreamsException.KinesisStreamsFailFastException(err));
285+
resultHandler.completeExceptionally(
286+
new KinesisStreamsException.KinesisStreamsFailFastException(err));
285287
return false;
286288
}
287289

flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
2626
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
2727
import org.apache.flink.connector.base.sink.writer.ElementConverter;
28+
import org.apache.flink.connector.base.sink.writer.ResultHandler;
2829
import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider;
2930
import org.apache.flink.connector.dynamodb.util.PrimaryKeyBuilder;
3031
import org.apache.flink.metrics.Counter;
@@ -45,12 +46,10 @@
4546

4647
import java.util.ArrayList;
4748
import java.util.Collection;
48-
import java.util.Collections;
4949
import java.util.HashMap;
5050
import java.util.List;
5151
import java.util.Map;
5252
import java.util.concurrent.CompletableFuture;
53-
import java.util.function.Consumer;
5453

5554
import static java.util.Collections.singletonMap;
5655
import static org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier;
@@ -160,7 +159,7 @@ public DynamoDbSinkWriter(
160159
@Override
161160
protected void submitRequestEntries(
162161
List<DynamoDbWriteRequest> requestEntries,
163-
Consumer<List<DynamoDbWriteRequest>> requestResultConsumer) {
162+
ResultHandler<DynamoDbWriteRequest> resultHandler) {
164163

165164
List<WriteRequest> items = new ArrayList<>();
166165

@@ -190,17 +189,17 @@ protected void submitRequestEntries(
190189
future.whenComplete(
191190
(response, err) -> {
192191
if (err != null) {
193-
handleFullyFailedRequest(err, requestEntries, requestResultConsumer);
192+
handleFullyFailedRequest(err, requestEntries, resultHandler);
194193
} else if (!CollectionUtil.isNullOrEmpty(response.unprocessedItems())) {
195-
handlePartiallyUnprocessedRequest(response, requestResultConsumer);
194+
handlePartiallyUnprocessedRequest(response, resultHandler);
196195
} else {
197-
requestResultConsumer.accept(Collections.emptyList());
196+
resultHandler.complete();
198197
}
199198
});
200199
}
201200

202201
private void handlePartiallyUnprocessedRequest(
203-
BatchWriteItemResponse response, Consumer<List<DynamoDbWriteRequest>> requestResult) {
202+
BatchWriteItemResponse response, ResultHandler<DynamoDbWriteRequest> resultHandler) {
204203
List<DynamoDbWriteRequest> unprocessed = new ArrayList<>();
205204

206205
for (WriteRequest writeRequest : response.unprocessedItems().get(tableName)) {
@@ -211,32 +210,33 @@ private void handlePartiallyUnprocessedRequest(
211210
numRecordsSendErrorsCounter.inc(unprocessed.size());
212211
numRecordsSendPartialFailure.inc(unprocessed.size());
213212

214-
requestResult.accept(unprocessed);
213+
resultHandler.retryForEntries(unprocessed);
215214
}
216215

217216
private void handleFullyFailedRequest(
218217
Throwable err,
219218
List<DynamoDbWriteRequest> requestEntries,
220-
Consumer<List<DynamoDbWriteRequest>> requestResult) {
219+
ResultHandler<DynamoDbWriteRequest> resultHandler) {
221220
LOG.warn(
222221
"DynamoDB Sink failed to persist and will retry {} entries.",
223222
requestEntries.size(),
224223
err);
225224
numRecordsSendErrorsCounter.inc(requestEntries.size());
226225

227-
if (isRetryable(err.getCause())) {
228-
requestResult.accept(requestEntries);
226+
if (isRetryable(err.getCause(), resultHandler)) {
227+
resultHandler.retryForEntries(requestEntries);
229228
}
230229
}
231230

232-
private boolean isRetryable(Throwable err) {
231+
private boolean isRetryable(Throwable err, ResultHandler<DynamoDbWriteRequest> resultHandler) {
233232
// isFatal() is really isNotFatal()
234-
if (!DYNAMODB_FATAL_EXCEPTION_CLASSIFIER.isFatal(err, getFatalExceptionCons())) {
233+
if (!DYNAMODB_FATAL_EXCEPTION_CLASSIFIER.isFatal(
234+
err, resultHandler::completeExceptionally)) {
235235
return false;
236236
}
237237
if (failOnError) {
238-
getFatalExceptionCons()
239-
.accept(new DynamoDbSinkException.DynamoDbSinkFailFastException(err));
238+
resultHandler.completeExceptionally(
239+
new DynamoDbSinkException.DynamoDbSinkFailFastException(err));
240240
return false;
241241
}
242242

0 commit comments

Comments
 (0)