Commit 9b69b63
Add JavaBucketIndexPartitioner and integrate it with BUCKET_INDEX
1 parent d10310f commit 9b69b63

8 files changed: +334, -13 lines

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java

Lines changed: 1 addition & 10 deletions
@@ -39,9 +39,7 @@
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Properties;
-import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES;
 import static org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE;
@@ -766,17 +764,10 @@ public EngineType getEngineType() {
   private void validateBucketIndexConfig() {
     if (hoodieIndexConfig.getString(INDEX_TYPE).equalsIgnoreCase(HoodieIndex.IndexType.BUCKET.toString())) {
       // check the bucket index hash field
+      // TODO: How to disable this validation only for append-only streams?
       if (StringUtils.isNullOrEmpty(hoodieIndexConfig.getString(BUCKET_INDEX_HASH_FIELD))) {
         hoodieIndexConfig.setValue(BUCKET_INDEX_HASH_FIELD,
             hoodieIndexConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME));
-      } else {
-        boolean valid = Arrays
-            .stream(hoodieIndexConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME).split(","))
-            .collect(Collectors.toSet())
-            .containsAll(Arrays.asList(hoodieIndexConfig.getString(BUCKET_INDEX_HASH_FIELD).split(",")));
-        if (!valid) {
-          throw new HoodieIndexException("Bucket index key (if configured) must be subset of record key.");
-        }
       }
       // check the bucket num
       if (hoodieIndexConfig.getIntOrDefault(BUCKET_INDEX_NUM_BUCKETS) <= 0) {
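The net effect of this hunk: the subset check is gone, so a configured bucket hash field no longer has to be a subset of the record key fields, while an unset hash field still falls back to the record key. Below is a minimal sketch of a writer setup that relies on that fallback; the property keys are the ones behind BUCKET_INDEX_HASH_FIELD and RECORDKEY_FIELD_NAME, stated here as assumptions rather than taken from the commit:

import java.util.Properties;

public class BucketHashFieldFallbackSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("hoodie.index.type", "BUCKET");
    // The record key drives bucketing when no explicit hash field is given.
    props.setProperty("hoodie.datasource.write.recordkey.field", "uuid");
    // "hoodie.bucket.index.hash.field" is deliberately left unset:
    // validateBucketIndexConfig() now copies the record key field into it,
    // and no longer rejects hash fields outside the record key.
    System.out.println(props);
  }
}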

hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndexFactory.java

Lines changed: 11 additions & 0 deletions
@@ -25,6 +25,8 @@
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.bloom.HoodieBloomIndex;
 import org.apache.hudi.index.bloom.ListBasedHoodieBloomIndexHelper;
+import org.apache.hudi.index.bucket.HoodieConsistentBucketIndex;
+import org.apache.hudi.index.bucket.HoodieSimpleBucketIndex;
 import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex;
 import org.apache.hudi.index.simple.HoodieGlobalSimpleIndex;
 import org.apache.hudi.index.simple.HoodieSimpleIndex;
@@ -54,6 +56,15 @@ public static HoodieIndex createIndex(HoodieWriteConfig config) {
         return new HoodieInMemoryHashIndex(config);
       case BLOOM:
         return new HoodieBloomIndex(config, ListBasedHoodieBloomIndexHelper.getInstance());
+      case BUCKET:
+        switch (config.getBucketIndexEngineType()) {
+          case SIMPLE:
+            return new HoodieSimpleBucketIndex(config);
+          case CONSISTENT_HASHING:
+            return new HoodieConsistentBucketIndex(config);
+          default:
+            throw new HoodieIndexException("Unknown bucket index engine type: " + config.getBucketIndexEngineType());
+        }
       default:
         throw new HoodieIndexException("Unsupported index type " + config.getIndexType());
     }
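With the new BUCKET branch, a Java-engine write config can resolve to a bucket index through this factory. A hedged sketch of that path follows; the property keys, builder calls, and base path are assumptions based on Hudi's config conventions, not part of this commit:

import java.util.HashMap;
import java.util.Map;

import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.JavaHoodieIndexFactory;

public class BucketIndexFactorySketch {
  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("hoodie.index.type", "BUCKET");
    props.put("hoodie.index.bucket.engine", "SIMPLE"); // or CONSISTENT_HASHING
    props.put("hoodie.bucket.index.num.buckets", "8");

    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_bucket_table") // hypothetical base path
        .withProps(props)
        .build();

    // Before this commit the Java factory threw "Unsupported index type";
    // now this should resolve to a HoodieSimpleBucketIndex.
    HoodieIndex index = JavaHoodieIndexFactory.createIndex(config);
    System.out.println(index.getClass().getSimpleName());
  }
}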

hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java

Lines changed: 6 additions & 2 deletions
@@ -209,8 +209,8 @@ protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMet
   @SuppressWarnings("unchecked")
   protected Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr,
                                                               Partitioner partitioner) {
-    JavaUpsertPartitioner javaUpsertPartitioner = (JavaUpsertPartitioner) partitioner;
-    BucketInfo binfo = javaUpsertPartitioner.getBucketInfo(partition);
+    JavaPartitioner javaPartitioner = (JavaPartitioner) partitioner;
+    BucketInfo binfo = javaPartitioner.getBucketInfo(partition);
     BucketType btype = binfo.bucketType;
     try {
       if (btype.equals(BucketType.INSERT)) {
@@ -278,6 +278,10 @@ public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
     if (profile == null) {
       throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
     }
+    // Use bucket index partitioner if bucket index is enabled
+    if (table.getIndex() instanceof org.apache.hudi.index.bucket.HoodieBucketIndex) {
+      return new JavaBucketIndexPartitioner(profile, context, table, config);
+    }
     return new JavaUpsertPartitioner(profile, context, table, config);
   }
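These two hunks work as a pair: handleUpsertPartition now casts to the JavaPartitioner interface rather than the concrete JavaUpsertPartitioner, which is exactly what lets getUpsertPartitioner hand back a JavaBucketIndexPartitioner without breaking the upsert path.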

hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBucketIndexPartitioner.java

Lines changed: 174 additions & 0 deletions

@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.index.bucket.HoodieBucketIndex;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.hudi.common.model.WriteOperationType.INSERT_OVERWRITE;
+import static org.apache.hudi.common.model.WriteOperationType.INSERT_OVERWRITE_TABLE;
+
+/**
+ * Packs incoming records to be inserted into buckets (1 bucket = 1 partition) for Java engine.
+ * Equivalent to SparkBucketIndexPartitioner.
+ * TODO: Reduce duplicate code between Spark and Java version.
+ */
+public class JavaBucketIndexPartitioner implements JavaPartitioner {
+
+  private final int numBuckets;
+  private final String indexKeyField;
+  private final int totalPartitionPaths;
+  private final List<String> partitionPaths;
+  /**
+   * Helps get the partition id, partition id is partition offset + bucket id.
+   * The partition offset is a multiple of the bucket num.
+   */
+  private final Map<String, Integer> partitionPathOffset;
+  private final boolean isOverwrite;
+
+  /**
+   * Partition path and file groups in it pair. Decide the file group an incoming update should go to.
+   */
+  private final Map<String, Set<String>> updatePartitionPathFileIds;
+
+  /**
+   * Bucket number to bucket info mapping.
+   */
+  private final Map<Integer, BucketInfo> bucketInfoMap;
+
+  private final boolean isNonBlockingConcurrencyControl;
+
+  public JavaBucketIndexPartitioner(WorkloadProfile profile,
+                                    HoodieEngineContext context,
+                                    HoodieTable table,
+                                    HoodieWriteConfig config) {
+    if (!(table.getIndex() instanceof HoodieBucketIndex)) {
+      throw new HoodieException(
+          "Bucket index partitioner should only be used by BucketIndex other than "
+              + table.getIndex().getClass().getSimpleName());
+    }
+    this.numBuckets = ((HoodieBucketIndex) table.getIndex()).getNumBuckets();
+    this.indexKeyField = config.getBucketIndexHashField();
+    this.totalPartitionPaths = profile.getPartitionPaths().size();
+    this.partitionPaths = new ArrayList<>(profile.getPartitionPaths());
+    this.partitionPathOffset = new HashMap<>();
+    int i = 0;
+    for (Object partitionPath : profile.getPartitionPaths()) {
+      partitionPathOffset.put(partitionPath.toString(), i);
+      i += numBuckets;
+    }
+    this.updatePartitionPathFileIds = new HashMap<>();
+    this.bucketInfoMap = new HashMap<>();
+    assignUpdates(profile);
+    WriteOperationType operationType = profile.getOperationType();
+    this.isOverwrite = INSERT_OVERWRITE.equals(operationType) || INSERT_OVERWRITE_TABLE.equals(operationType);
+    this.isNonBlockingConcurrencyControl = config.isNonBlockingConcurrencyControl();
+
+    if (isOverwrite) {
+      ValidationUtils.checkArgument(!isNonBlockingConcurrencyControl,
+          "Insert overwrite is not supported with non-blocking concurrency control");
+    }
+  }
+
+  private void assignUpdates(WorkloadProfile profile) {
+    // Each update location gets tracked
+    Set<Map.Entry<String, WorkloadStat>> partitionStatEntries = profile.getInputPartitionPathStatMap()
+        .entrySet();
+    for (Map.Entry<String, WorkloadStat> partitionStat : partitionStatEntries) {
+      if (!updatePartitionPathFileIds.containsKey(partitionStat.getKey())) {
+        updatePartitionPathFileIds.put(partitionStat.getKey(), new HashSet<>());
+      }
+      for (Map.Entry<String, Pair<String, Long>> updateLocEntry :
+          partitionStat.getValue().getUpdateLocationToCount().entrySet()) {
+        updatePartitionPathFileIds.get(partitionStat.getKey()).add(updateLocEntry.getKey());
+      }
+    }
+  }
+
+  @Override
+  public int getNumPartitions() {
+    return totalPartitionPaths * numBuckets;
+  }
+
+  @Override
+  public int getPartition(Object key) {
+    Pair<HoodieKey, Option<HoodieRecordLocation>> keyLocation = (Pair<HoodieKey, Option<HoodieRecordLocation>>) key;
+    String partitionPath = keyLocation.getLeft().getPartitionPath();
+    Option<HoodieRecordLocation> location = keyLocation.getRight();
+    int bucketId = location.isPresent()
+        ? BucketIdentifier.bucketIdFromFileId(location.get().getFileId())
+        : BucketIdentifier.getBucketId(keyLocation.getLeft().getRecordKey(), indexKeyField, numBuckets);
+    return partitionPathOffset.get(partitionPath) + bucketId;
+  }
+
+  public BucketInfo getBucketInfo(int bucketNumber) {
+    return bucketInfoMap.computeIfAbsent(bucketNumber, k -> {
+      int bucketId = bucketNumber % numBuckets;
+      String partitionPath = partitionPaths.get(bucketNumber / numBuckets);
+      return getBucketInfo(bucketId, partitionPath);
+    });
+  }
+
+  protected BucketInfo getBucketInfo(int bucketId, String partitionPath) {
+    String bucketIdStr = BucketIdentifier.bucketIdStr(bucketId);
+    // Insert overwrite always generates new bucket file id
+    if (isOverwrite) {
+      return new BucketInfo(BucketType.INSERT, BucketIdentifier.newBucketFileIdPrefix(bucketIdStr), partitionPath);
+    }
+    Option<String> fileIdOption = Option.fromJavaOptional(updatePartitionPathFileIds
+        .getOrDefault(partitionPath, Collections.emptySet()).stream()
+        .filter(e -> e.startsWith(bucketIdStr))
+        .findFirst());
+    if (fileIdOption.isPresent()) {
+      return new BucketInfo(BucketType.UPDATE, fileIdOption.get(), partitionPath);
+    } else {
+      // Always write into log file instead of base file if using NB-CC
+      if (isNonBlockingConcurrencyControl) {
+        return new BucketInfo(BucketType.UPDATE, BucketIdentifier.newBucketFileIdForNBCC(bucketIdStr), partitionPath);
+      }
+      return new BucketInfo(BucketType.INSERT, BucketIdentifier.newBucketFileIdPrefix(bucketIdStr), partitionPath);
+    }
+  }
+
+  @Override
+  public List<String> getSmallFileIds() {
+    return Collections.emptyList();
+  }
+}
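To make the id scheme concrete, here is a small self-contained sketch of the offset arithmetic behind getPartition() and getNumPartitions(), with hypothetical partition paths and bucket count:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BucketPartitionLayoutSketch {
  public static void main(String[] args) {
    int numBuckets = 4;
    List<String> partitionPaths = Arrays.asList("2024/01/01", "2024/01/02");

    // Mirrors the constructor: each partition path gets an offset that is
    // a multiple of numBuckets, i.e. a contiguous block of partition ids.
    Map<String, Integer> partitionPathOffset = new HashMap<>();
    int i = 0;
    for (String p : partitionPaths) {
      partitionPathOffset.put(p, i);
      i += numBuckets;
    }

    // A record in "2024/01/02" hashed to bucket 2 lands in partition 4 + 2 = 6.
    int partitionId = partitionPathOffset.get("2024/01/02") + 2;
    System.out.println(partitionId); // 6
    System.out.println(partitionPaths.size() * numBuckets); // getNumPartitions() == 8
  }
}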
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaPartitioner.java

Lines changed: 29 additions & 0 deletions

@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import java.util.List;
+
+// TODO: Rename this to a better one?
+public interface JavaPartitioner extends Partitioner {
+  BucketInfo getBucketInfo(int bucketNumber);
+
+  public List<String> getSmallFileIds();
+}
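The interface pulls up exactly the two methods BaseJavaCommitActionExecutor.handleUpsertPartition needs: getBucketInfo to route a batch to its file group, and getSmallFileIds for small-file binning, which JavaBucketIndexPartitioner answers with an empty list since bucket layout, not file size, decides placement there.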

hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java

Lines changed: 3 additions & 1 deletion
@@ -51,7 +51,7 @@
 /**
  * Packs incoming records to be upserted, into buckets.
  */
-public class JavaUpsertPartitioner<T> implements Partitioner {
+public class JavaUpsertPartitioner<T> implements JavaPartitioner {
 
   private static final Logger LOG = LoggerFactory.getLogger(JavaUpsertPartitioner.class);
 
@@ -264,6 +264,7 @@ protected List<SmallFile> getSmallFiles(String partitionPath) {
     return smallFileLocations;
   }
 
+  @Override
   public BucketInfo getBucketInfo(int bucketNumber) {
     return bucketInfoMap.get(bucketNumber);
   }
@@ -336,6 +337,7 @@ protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, Hoodi
     return avgSize;
   }
 
+  @Override
   public List<String> getSmallFileIds() {
     return smallFiles.stream().map(smallFile -> smallFile.location.getFileId())
         .collect(Collectors.toList());

hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseJavaDeltaCommitActionExecutor.java

Lines changed: 5 additions & 0 deletions
@@ -31,6 +31,7 @@
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
 import org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor;
+import org.apache.hudi.table.action.commit.JavaBucketIndexPartitioner;
 import org.apache.hudi.table.action.commit.JavaUpsertPartitioner;
 import org.apache.hudi.table.action.commit.Partitioner;
 
@@ -64,6 +65,10 @@ protected BaseJavaDeltaCommitActionExecutor(HoodieEngineContext context,
 
   @Override
   public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
+    // Use bucket index partitioner if bucket index is enabled
+    if (table.getIndex() instanceof org.apache.hudi.index.bucket.HoodieBucketIndex) {
+      return new JavaBucketIndexPartitioner(profile, context, table, config);
+    }
     this.partitioner = (JavaUpsertPartitioner) super.getUpsertPartitioner(profile);
     return this.partitioner;
   }
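Taken together, the commit makes an end-to-end bucket-index write flow possible on the Java engine. A hedged sketch follows; HoodieJavaWriteClient and HoodieJavaEngineContext live in hudi-java-client, but the exact constructor signatures, the payload type, and the record preparation are assumptions, not part of this commit:

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.config.HoodieWriteConfig;

public class JavaBucketIndexWriteSketch {
  // bucketIndexConfig is assumed to carry hoodie.index.type=BUCKET
  public static void writeBatch(HoodieWriteConfig bucketIndexConfig,
                                List<HoodieRecord<HoodieAvroPayload>> records) {
    HoodieJavaEngineContext context = new HoodieJavaEngineContext(new Configuration());
    try (HoodieJavaWriteClient<HoodieAvroPayload> client =
             new HoodieJavaWriteClient<>(context, bucketIndexConfig)) {
      String instant = client.startCommit();
      // getUpsertPartitioner() now selects JavaBucketIndexPartitioner,
      // so records are routed to fixed buckets instead of size-based bins.
      List<WriteStatus> statuses = client.upsert(records, instant);
      client.commit(instant, statuses);
    }
  }
}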
