diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index a88e1fec5cfc3..c6c1b3d2b871c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -80,6 +80,7 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -253,7 +254,12 @@ private DataStream<MergeOnReadInputSplit> addFileDistributionStrategy(SingleOutp
     } else if (OptionsResolver.isAppendMode(conf)) {
       return source.partitionCustom(new StreamReadAppendPartitioner(conf.get(FlinkOptions.READ_TASKS)), new StreamReadAppendKeySelector());
     } else {
-      return source.keyBy(MergeOnReadInputSplit::getFileId);
+      return source.keyBy(new KeySelector<MergeOnReadInputSplit, String>() {
+        @Override
+        public String getKey(MergeOnReadInputSplit split) throws Exception {
+          return split.getFileId();
+        }
+      });
     }
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
index 78ba30207353b..5df02bae6e064 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadMonitoringFunction.java
@@ -466,7 +466,6 @@ public void testStopWithSavepointAndRestore() throws Exception {
   public void testCheckpointRestoreWithLimit() throws Exception {
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
     conf.set(FlinkOptions.READ_SPLITS_LIMIT, 2);
-    conf.set(FlinkOptions.READ_STREAMING_CHECK_INTERVAL, 1);
     StreamReadMonitoringFunction function = TestUtils.getMonitorFunc(conf);
     OperatorSubtaskState state;
     try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function)) {
@@ -483,6 +482,7 @@ public void testCheckpointRestoreWithLimit() throws Exception {
       assertTrue(sourceContext.splits.stream().allMatch(split -> split.getInstantRange().isPresent()),
           "All instants should have range limit");
     }
+    conf.set(FlinkOptions.READ_SPLITS_LIMIT, Integer.MAX_VALUE);
     TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
     StreamReadMonitoringFunction function2 = TestUtils.getMonitorFunc(conf);
    try (AbstractStreamOperatorTestHarness<MergeOnReadInputSplit> harness = createHarness(function2)) {
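For context, both the old and the new form in HoodieTableSource key the split stream by file ID, so splits belonging to the same file group are routed to the same downstream read task; the change only spells the key extraction out as an anonymous KeySelector instead of the method reference MergeOnReadInputSplit::getFileId, making the key type explicit to Flink's type extraction. Below is a minimal, self-contained sketch of the same keying pattern, assuming a hypothetical Split POJO and class name in place of the Hudi types (this is not the Hudi code itself):

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByFileIdSketch {

  /** Hypothetical stand-in for MergeOnReadInputSplit: only carries a file id. */
  public static class Split {
    public String fileId;

    public Split() {
    }

    public Split(String fileId) {
      this.fileId = fileId;
    }

    public String getFileId() {
      return fileId;
    }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Split> splits = env.fromElements(
        new Split("file-1"), new Split("file-2"), new Split("file-1"));

    // Explicit anonymous KeySelector: the key type (String) is carried by the
    // class's generic signature, functionally equivalent to keyBy(Split::getFileId).
    KeyedStream<Split, String> keyed = splits.keyBy(new KeySelector<Split, String>() {
      @Override
      public String getKey(Split split) {
        return split.getFileId();
      }
    });

    // Records with the same file id hash to the same parallel subtask.
    keyed.print();

    env.execute("keyBy-file-id-sketch");
  }
}
```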