
Commit 1e8bf4b

[Support] double buffer based async write in AppendWriteFunctionWithBufferSort
1 parent 4d95b2c commit 1e8bf4b
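
For context, this change applies a classic double-buffering scheme: the task thread appends records to an active in-memory sort buffer while a single background thread sorts and flushes the other buffer, and the two are swapped whenever the active one fills up. Below is a minimal, self-contained sketch of that pattern; the names (DoubleBufferWriter, drain, finish, etc.) are illustrative only and are not part of this commit.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Minimal sketch of the double-buffer async-write pattern (hypothetical names).
public class DoubleBufferWriter {
  private List<String> active = new ArrayList<>();
  private List<String> background = new ArrayList<>();
  private final ExecutorService flusher = Executors.newSingleThreadExecutor();
  private CompletableFuture<Void> inFlight = CompletableFuture.completedFuture(null);
  private final int capacity;

  public DoubleBufferWriter(int capacity) {
    this.capacity = capacity;
  }

  // Called on the task thread for every record.
  public void write(String record) {
    active.add(record);
    if (active.size() >= capacity) {
      swapAndFlushAsync();
    }
  }

  private void swapAndFlushAsync() {
    // Wait for the previous flush so the background buffer is free to reuse.
    inFlight.join();
    List<String> tmp = active;
    active = background;
    background = tmp;
    // Drain the now-background buffer off the task thread.
    inFlight = CompletableFuture.runAsync(() -> drain(background), flusher);
  }

  // Stands in for sort-and-send in the real operator.
  private void drain(List<String> buffer) {
    buffer.forEach(System.out::println);
    buffer.clear();
  }

  // Called at checkpoint / end of input: drain everything synchronously.
  public void finish() {
    inFlight.join();
    drain(active);
    flusher.shutdown();
  }
}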

File tree

2 files changed: +210 -16 lines changed

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java

Lines changed: 97 additions & 16 deletions
@@ -46,6 +46,11 @@
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 /**
@@ -61,7 +66,13 @@
 public class AppendWriteFunctionWithBufferSort<T> extends AppendWriteFunction<T> {
   private static final Logger LOG = LoggerFactory.getLogger(AppendWriteFunctionWithBufferSort.class);
   private final long writeBufferSize;
-  private transient BinaryInMemorySortBuffer buffer;
+  private transient BinaryInMemorySortBuffer activeBuffer;
+  private transient BinaryInMemorySortBuffer backgroundBuffer;
+  private transient ExecutorService asyncWriteExecutor;
+  private transient AtomicReference<CompletableFuture<Void>> asyncWriteTask;
+  private transient AtomicBoolean isBackgroundBufferBeingProcessed;
+  private transient GeneratedNormalizedKeyComputer keyComputer;
+  private transient GeneratedRecordComparator recordComparator;
 
   public AppendWriteFunctionWithBufferSort(Configuration config, RowType rowType) {
     super(config, rowType);
@@ -78,41 +89,55 @@ public void open(Configuration parameters) throws Exception {
     List<String> sortKeyList = Arrays.stream(sortKeys.split(",")).map(key -> key.trim()).collect(Collectors.toList());
     SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, sortKeyList.toArray(new String[0]));
     SortCodeGenerator codeGenerator = sortOperatorGen.createSortCodeGenerator();
-    GeneratedNormalizedKeyComputer keyComputer = codeGenerator.generateNormalizedKeyComputer("SortComputer");
-    GeneratedRecordComparator recordComparator = codeGenerator.generateRecordComparator("SortComparator");
+    this.keyComputer = codeGenerator.generateNormalizedKeyComputer("SortComputer");
+    this.recordComparator = codeGenerator.generateRecordComparator("SortComparator");
     MemorySegmentPool memorySegmentPool = MemorySegmentPoolFactory.createMemorySegmentPool(config);
-    this.buffer = BufferUtils.createBuffer(rowType,
+
+    this.activeBuffer = BufferUtils.createBuffer(rowType,
         memorySegmentPool,
         keyComputer.newInstance(Thread.currentThread().getContextClassLoader()),
         recordComparator.newInstance(Thread.currentThread().getContextClassLoader()));
-    LOG.info("{} is initialized successfully.", getClass().getSimpleName());
+    this.backgroundBuffer = BufferUtils.createBuffer(rowType,
+        MemorySegmentPoolFactory.createMemorySegmentPool(config),
+        keyComputer.newInstance(Thread.currentThread().getContextClassLoader()),
+        recordComparator.newInstance(Thread.currentThread().getContextClassLoader()));
+
+    this.asyncWriteExecutor = Executors.newSingleThreadExecutor(r -> {
+      Thread t = new Thread(r, "async-write-thread");
+      t.setDaemon(true);
+      return t;
+    });
+    this.asyncWriteTask = new AtomicReference<>(CompletableFuture.completedFuture(null));
+    this.isBackgroundBufferBeingProcessed = new AtomicBoolean(false);
+
+    LOG.info("{} is initialized successfully with double buffer.", getClass().getSimpleName());
   }
 
   @Override
   public void processElement(T value, Context ctx, Collector<RowData> out) throws Exception {
     RowData data = (RowData) value;
 
-    // 1.try to write data into memory pool
-    boolean success = buffer.write(data);
+    // try to write data into active buffer
+    boolean success = activeBuffer.write(data);
     if (!success) {
-      // 2. flushes the bucket if the memory pool is full
-      sortAndSend();
-      // 3. write the row again
-      success = buffer.write(data);
+      swapAndFlushAsync();
+      // write the row again to the new active buffer
+      success = activeBuffer.write(data);
       if (!success) {
         throw new HoodieException("Buffer is too small to hold a single record.");
       }
     }
 
-    if (buffer.size() >= writeBufferSize) {
-      sortAndSend();
+    if (activeBuffer.size() >= writeBufferSize) {
+      swapAndFlushAsync();
     }
   }
 
   @Override
   public void snapshotState() {
     try {
-      sortAndSend();
+      waitForAsyncWriteCompletion();
+      sortAndSend(activeBuffer);
     } catch (IOException e) {
       throw new HoodieIOException("Fail to sort and flush data in buffer during snapshot state.", e);
     }
@@ -122,20 +147,64 @@ public void snapshotState() {
   @Override
   public void endInput() {
     try {
-      sortAndSend();
+      waitForAsyncWriteCompletion();
+      sortAndSend(activeBuffer);
     } catch (IOException e) {
       throw new HoodieIOException("Fail to sort and flush data in buffer during endInput.", e);
     }
     super.endInput();
   }
 
+  /**
+   * Swaps the active and background buffers and triggers async flush of the background buffer.
+   */
+  private void swapAndFlushAsync() throws IOException {
+    waitForAsyncWriteCompletion();
+
+    // Swap buffers
+    BinaryInMemorySortBuffer temp = activeBuffer;
+    activeBuffer = backgroundBuffer;
+    backgroundBuffer = temp;
+
+    // Start async processing of the background buffer
+    if (!backgroundBuffer.isEmpty()) {
+      isBackgroundBufferBeingProcessed.set(true);
+      CompletableFuture<Void> newTask = CompletableFuture.runAsync(() -> {
+        try {
+          sortAndSend(backgroundBuffer);
+        } catch (IOException e) {
+          LOG.error("Error during async write", e);
+          throw new RuntimeException(e);
+        } finally {
+          isBackgroundBufferBeingProcessed.set(false);
+        }
+      }, asyncWriteExecutor);
+      asyncWriteTask.set(newTask);
+    }
+  }
+
+  /**
+   * Waits for any ongoing async write operation to complete.
+   */
+  private void waitForAsyncWriteCompletion() {
+    try {
+      CompletableFuture<Void> currentTask = asyncWriteTask.get();
+      if (currentTask != null) {
+        currentTask.join();
+      }
+    } catch (Exception e) {
+      LOG.error("Error waiting for async write completion", e);
+      throw new RuntimeException(e);
+    }
+  }
+
   /**
    * For append writing, the flushing can be triggered with the following conditions:
    * 1. Checkpoint trigger, in which the current remaining data in buffer are flushed and committed.
    * 2. Binary buffer is full.
    * 3. `endInput` is called for pipelines with a bounded source.
    */
-  private void sortAndSend() throws IOException {
+  private void sortAndSend(BinaryInMemorySortBuffer buffer) throws IOException {
     if (buffer.isEmpty()) {
       return;
     }
@@ -155,4 +224,16 @@ private void sortAndSend() throws IOException {
   private static void sort(BinaryInMemorySortBuffer dataBuffer) {
     new QuickSort().sort(dataBuffer);
   }
+
+  @Override
+  public void close() throws Exception {
+    try {
+      waitForAsyncWriteCompletion();
+      if (asyncWriteExecutor != null && !asyncWriteExecutor.isShutdown()) {
+        asyncWriteExecutor.shutdown();
+      }
+    } finally {
+      super.close();
+    }
+  }
 }
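
A note on the concurrency design above: the flush executor is single-threaded and swapAndFlushAsync joins the in-flight task before swapping, so at most one background flush ever runs and the task thread never touches a buffer while the flusher is draining it. The trade-off is that processElement blocks whenever the active buffer fills before the previous flush has finished.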

hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithBufferSort.java

Lines changed: 113 additions & 0 deletions
@@ -33,6 +33,7 @@
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
+
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -48,6 +49,7 @@
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Test cases for {@link AppendWriteFunctionWithBufferSort}.
@@ -175,6 +177,117 @@ public void testSortedResult() throws Exception {
     assertArrayEquals(expected.toArray(), filteredResult.toArray());
   }
 
+  @Test
+  public void testMultipleCheckpoints() throws Exception {
+    List<RowData> batch1 = Arrays.asList(
+        createRowData("uuid1", "Charlie", 35, "1970-01-01 00:00:01.123", "p1"),
+        createRowData("uuid2", "Alice", 25, "1970-01-01 00:00:01.124", "p1")
+    );
+
+    List<RowData> batch2 = Arrays.asList(
+        createRowData("uuid3", "Bob", 30, "1970-01-01 00:00:01.125", "p1"),
+        createRowData("uuid4", "Diana", 28, "1970-01-01 00:00:01.126", "p1")
+    );
+
+    TestHarness testHarness = TestWriteBase.TestHarness.instance()
+        .preparePipeline(tempFile, conf);
+
+    testHarness.consume(batch1).checkpoint(1);
+    testHarness.consume(batch2).checkpoint(2);
+    testHarness.endInput();
+
+    List<GenericRecord> actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1);
+    assertEquals(4, actualData.size());
+  }
+
+  @Test
+  public void testLargeDatasetWithMultipleFlushes() throws Exception {
+    this.conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 50L);
+
+    List<RowData> inputData = new ArrayList<>();
+    for (int i = 0; i < 500; i++) {
+      inputData.add(createRowData("uuid" + i, "Name" + (i % 10), i % 100, "1970-01-01 00:00:01.123", "p" + (i % 3)));
+    }
+
+    TestWriteBase.TestHarness.instance()
+        .preparePipeline(tempFile, conf)
+        .consume(inputData)
+        .endInput();
+
+    List<GenericRecord> actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 3);
+    assertEquals(500, actualData.size());
+  }
+
+  @Test
+  public void testSortStabilityWithDuplicateKeys() throws Exception {
+    List<RowData> inputData = Arrays.asList(
+        createRowData("uuid1", "Alice", 25, "1970-01-01 00:00:01.123", "p1"),
+        createRowData("uuid2", "Alice", 25, "1970-01-01 00:00:01.124", "p1"),
+        createRowData("uuid3", "Alice", 25, "1970-01-01 00:00:01.125", "p1"),
+        createRowData("uuid4", "Bob", 30, "1970-01-01 00:00:01.126", "p1")
+    );
+
+    TestWriteBase.TestHarness.instance()
+        .preparePipeline(tempFile, conf)
+        .consume(inputData)
+        .endInput();
+
+    List<GenericRecord> actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1);
+    assertEquals(4, actualData.size());
+
+    List<String> filteredResult = actualData.stream()
+        .map(TestData::filterOutVariablesWithoutHudiMetadata)
+        .collect(Collectors.toList());
+
+    assertTrue(filteredResult.get(0).contains("Alice"));
+    assertTrue(filteredResult.get(1).contains("Alice"));
+    assertTrue(filteredResult.get(2).contains("Alice"));
+    assertTrue(filteredResult.get(3).contains("Bob"));
+  }
+
+  @Test
+  public void testDifferentPartitions() throws Exception {
+    List<RowData> inputData = Arrays.asList(
+        createRowData("uuid1", "Alice", 25, "1970-01-01 00:00:01.123", "p1"),
+        createRowData("uuid2", "Bob", 30, "1970-01-01 00:00:01.124", "p2"),
+        createRowData("uuid3", "Charlie", 35, "1970-01-01 00:00:01.125", "p3"),
+        createRowData("uuid4", "Diana", 28, "1970-01-01 00:00:01.126", "p1")
+    );
+
+    TestWriteBase.TestHarness.instance()
+        .preparePipeline(tempFile, conf)
+        .consume(inputData)
+        .endInput();
+
+    List<GenericRecord> actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 3);
+    assertEquals(4, actualData.size());
+  }
+
+  @Test
+  public void testConcurrentWriteScenario() throws Exception {
+    this.conf.set(FlinkOptions.WRITE_BUFFER_SIZE, 20L);
+
+    List<RowData> inputData = new ArrayList<>();
+    for (int i = 0; i < 200; i++) {
+      inputData.add(createRowData("uuid" + i, "Name" + (i % 5), i % 50, "1970-01-01 00:00:01.123", "p1"));
+    }
+
+    TestHarness testHarness = TestWriteBase.TestHarness.instance()
+        .preparePipeline(tempFile, conf);
+
+    for (int i = 0; i < inputData.size(); i += 10) {
+      List<RowData> batch = inputData.subList(i, Math.min(i + 10, inputData.size()));
+      testHarness.consume(batch);
+      if (i % 50 == 0) {
+        testHarness.checkpoint(i / 50 + 1);
+      }
+    }
+    testHarness.endInput();
+
+    List<GenericRecord> actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1);
+    assertEquals(200, actualData.size());
+  }
+
   private GenericRowData createRowData(String uuid, String name, int age, String timestamp, String partition) {
     return GenericRowData.of(StringData.fromString(uuid), StringData.fromString(name),
         age, TimestampData.fromTimestamp(Timestamp.valueOf(timestamp)), StringData.fromString(partition));
