diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
index 12b3256c07f1c..9952f47b34b98 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java
@@ -39,9 +39,7 @@
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Properties;
-import java.util.stream.Collectors;
 import static org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES;
 import static org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE;
@@ -766,17 +764,10 @@ public EngineType getEngineType() {
   private void validateBucketIndexConfig() {
     if (hoodieIndexConfig.getString(INDEX_TYPE).equalsIgnoreCase(HoodieIndex.IndexType.BUCKET.toString())) {
       // check the bucket index hash field
+      // TODO: How to disable this validation only for append-only streams?
       if (StringUtils.isNullOrEmpty(hoodieIndexConfig.getString(BUCKET_INDEX_HASH_FIELD))) {
         hoodieIndexConfig.setValue(BUCKET_INDEX_HASH_FIELD,
             hoodieIndexConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME));
-      } else {
-        boolean valid = Arrays
-            .stream(hoodieIndexConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME).split(","))
-            .collect(Collectors.toSet())
-            .containsAll(Arrays.asList(hoodieIndexConfig.getString(BUCKET_INDEX_HASH_FIELD).split(",")));
-        if (!valid) {
-          throw new HoodieIndexException("Bucket index key (if configured) must be subset of record key.");
-        }
       }
       // check the bucket num
       if (hoodieIndexConfig.getIntOrDefault(BUCKET_INDEX_NUM_BUCKETS) <= 0) {
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndexFactory.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndexFactory.java
index e400c0b4e8bf5..40c692a6ce5de 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndexFactory.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/index/JavaHoodieIndexFactory.java
@@ -25,6 +25,8 @@
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.bloom.HoodieBloomIndex;
 import org.apache.hudi.index.bloom.ListBasedHoodieBloomIndexHelper;
+import org.apache.hudi.index.bucket.HoodieConsistentBucketIndex;
+import org.apache.hudi.index.bucket.HoodieSimpleBucketIndex;
 import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex;
 import org.apache.hudi.index.simple.HoodieGlobalSimpleIndex;
 import org.apache.hudi.index.simple.HoodieSimpleIndex;
@@ -54,6 +56,15 @@ public static HoodieIndex createIndex(HoodieWriteConfig config) {
       case INMEMORY:
         return new HoodieInMemoryHashIndex(config);
       case BLOOM:
         return new HoodieBloomIndex(config, ListBasedHoodieBloomIndexHelper.getInstance());
+      case BUCKET:
+        switch (config.getBucketIndexEngineType()) {
+          case SIMPLE:
+            return new HoodieSimpleBucketIndex(config);
+          case CONSISTENT_HASHING:
+            return new HoodieConsistentBucketIndex(config);
+          default:
+            throw new HoodieIndexException("Unknown bucket index engine type: " + config.getBucketIndexEngineType());
+        }
       default:
         throw new HoodieIndexException("Unsupported index type " + config.getIndexType());
     }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
index 092daf2cead25..460869a718415 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
@@ -209,8 +209,8 @@ protected Map> getPartitionToReplacedFileIds(HoodieWriteMet
   @SuppressWarnings("unchecked")
   protected Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr, Partitioner partitioner) {
-    JavaUpsertPartitioner javaUpsertPartitioner = (JavaUpsertPartitioner) partitioner;
-    BucketInfo binfo = javaUpsertPartitioner.getBucketInfo(partition);
+    JavaPartitioner javaPartitioner = (JavaPartitioner) partitioner;
+    BucketInfo binfo = javaPartitioner.getBucketInfo(partition);
     BucketType btype = binfo.bucketType;
     try {
       if (btype.equals(BucketType.INSERT)) {
@@ -278,6 +278,10 @@ public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
     if (profile == null) {
       throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
     }
+    // Use bucket index partitioner if bucket index is enabled
+    if (table.getIndex() instanceof org.apache.hudi.index.bucket.HoodieBucketIndex) {
+      return new JavaBucketIndexPartitioner(profile, context, table, config);
+    }
     return new JavaUpsertPartitioner(profile, context, table, config);
   }
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBucketIndexPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBucketIndexPartitioner.java
new file mode 100644
index 0000000000000..f3a526db4c5f8
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaBucketIndexPartitioner.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.index.bucket.HoodieBucketIndex;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.WorkloadProfile;
+import org.apache.hudi.table.WorkloadStat;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.hudi.common.model.WriteOperationType.INSERT_OVERWRITE;
+import static org.apache.hudi.common.model.WriteOperationType.INSERT_OVERWRITE_TABLE;
+
+/**
+ * Packs incoming records to be inserted into buckets (1 bucket = 1 partition) for the Java engine.
+ * Equivalent to SparkBucketIndexPartitioner.
+ * TODO: Reduce duplicate code between the Spark and Java versions.
+ */
+public class JavaBucketIndexPartitioner implements JavaPartitioner {
+
+  private final int numBuckets;
+  private final String indexKeyField;
+  private final int totalPartitionPaths;
+  private final List<String> partitionPaths;
+  /**
+   * Helps get the partition id; the partition id is partition offset + bucket id.
+   * The partition offset is a multiple of the bucket num.
+   */
+  private final Map<String, Integer> partitionPathOffset;
+  private final boolean isOverwrite;
+
+  /**
+   * Partition path and file groups in it pair. Decides the file group an incoming update should go to.
+   */
+  private final Map<String, Set<String>> updatePartitionPathFileIds;
+
+  /**
+   * Bucket number to bucket info mapping.
+   */
+  private final Map<Integer, BucketInfo> bucketInfoMap;
+
+  private final boolean isNonBlockingConcurrencyControl;
+
+  public JavaBucketIndexPartitioner(WorkloadProfile profile,
+                                    HoodieEngineContext context,
+                                    HoodieTable table,
+                                    HoodieWriteConfig config) {
+    if (!(table.getIndex() instanceof HoodieBucketIndex)) {
+      throw new HoodieException(
+          "Bucket index partitioner should only be used by BucketIndex other than "
+              + table.getIndex().getClass().getSimpleName());
+    }
+    this.numBuckets = ((HoodieBucketIndex) table.getIndex()).getNumBuckets();
+    this.indexKeyField = config.getBucketIndexHashField();
+    this.totalPartitionPaths = profile.getPartitionPaths().size();
+    this.partitionPaths = new ArrayList<>(profile.getPartitionPaths());
+    this.partitionPathOffset = new HashMap<>();
+    int i = 0;
+    for (Object partitionPath : profile.getPartitionPaths()) {
+      partitionPathOffset.put(partitionPath.toString(), i);
+      i += numBuckets;
+    }
+    this.updatePartitionPathFileIds = new HashMap<>();
+    this.bucketInfoMap = new HashMap<>();
+    assignUpdates(profile);
+    WriteOperationType operationType = profile.getOperationType();
+    this.isOverwrite = INSERT_OVERWRITE.equals(operationType) || INSERT_OVERWRITE_TABLE.equals(operationType);
+    this.isNonBlockingConcurrencyControl = config.isNonBlockingConcurrencyControl();
+
+    if (isOverwrite) {
+      ValidationUtils.checkArgument(!isNonBlockingConcurrencyControl,
+          "Insert overwrite is not supported with non-blocking concurrency control");
+    }
+  }
+
+  private void assignUpdates(WorkloadProfile profile) {
+    // Each update location gets tracked
+    Set<Map.Entry<String, WorkloadStat>> partitionStatEntries = profile.getInputPartitionPathStatMap()
+        .entrySet();
+    for (Map.Entry<String, WorkloadStat> partitionStat : partitionStatEntries) {
+      if (!updatePartitionPathFileIds.containsKey(partitionStat.getKey())) {
+        updatePartitionPathFileIds.put(partitionStat.getKey(), new HashSet<>());
+      }
+      for (Map.Entry<String, Pair<String, Long>> updateLocEntry :
+          partitionStat.getValue().getUpdateLocationToCount().entrySet()) {
+        updatePartitionPathFileIds.get(partitionStat.getKey()).add(updateLocEntry.getKey());
+      }
+    }
+  }
+
+  @Override
+  public int getNumPartitions() {
+    return totalPartitionPaths * numBuckets;
+  }
+
+  @Override
+  public int getPartition(Object key) {
+    Pair<HoodieKey, Option<HoodieRecordLocation>> keyLocation = (Pair<HoodieKey, Option<HoodieRecordLocation>>) key;
+    String partitionPath = keyLocation.getLeft().getPartitionPath();
+    Option<HoodieRecordLocation> location = keyLocation.getRight();
+    int bucketId = location.isPresent()
+        ? BucketIdentifier.bucketIdFromFileId(location.get().getFileId())
+        : BucketIdentifier.getBucketId(keyLocation.getLeft().getRecordKey(), indexKeyField, numBuckets);
+    return partitionPathOffset.get(partitionPath) + bucketId;
+  }
+
+  public BucketInfo getBucketInfo(int bucketNumber) {
+    return bucketInfoMap.computeIfAbsent(bucketNumber, k -> {
+      int bucketId = bucketNumber % numBuckets;
+      String partitionPath = partitionPaths.get(bucketNumber / numBuckets);
+      return getBucketInfo(bucketId, partitionPath);
+    });
+  }
+
+  protected BucketInfo getBucketInfo(int bucketId, String partitionPath) {
+    String bucketIdStr = BucketIdentifier.bucketIdStr(bucketId);
+    // Insert overwrite always generates new bucket file id
+    if (isOverwrite) {
+      return new BucketInfo(BucketType.INSERT, BucketIdentifier.newBucketFileIdPrefix(bucketIdStr), partitionPath);
+    }
+    Option<String> fileIdOption = Option.fromJavaOptional(updatePartitionPathFileIds
+        .getOrDefault(partitionPath, Collections.emptySet()).stream()
+        .filter(e -> e.startsWith(bucketIdStr))
+        .findFirst());
+    if (fileIdOption.isPresent()) {
+      return new BucketInfo(BucketType.UPDATE, fileIdOption.get(), partitionPath);
+    } else {
+      // Always write into log file instead of base file if using NB-CC
+      if (isNonBlockingConcurrencyControl) {
+        return new BucketInfo(BucketType.UPDATE, BucketIdentifier.newBucketFileIdForNBCC(bucketIdStr), partitionPath);
+      }
+      return new BucketInfo(BucketType.INSERT, BucketIdentifier.newBucketFileIdPrefix(bucketIdStr), partitionPath);
+    }
+  }
+
+  @Override
+  public List<String> getSmallFileIds() {
+    return Collections.emptyList();
+  }
+}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaPartitioner.java
new file mode 100644
index 0000000000000..7ec8bdcdfebb0
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaPartitioner.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import java.util.List;
+
+// TODO: Rename this to a better one?
+public interface JavaPartitioner extends Partitioner {
+  BucketInfo getBucketInfo(int bucketNumber);
+
+  public List<String> getSmallFileIds();
+}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java
index 4d3c78676dca4..0d51b0187f3f7 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java
@@ -51,7 +51,7 @@
 /**
  * Packs incoming records to be upserted, into buckets.
  */
-public class JavaUpsertPartitioner implements Partitioner {
+public class JavaUpsertPartitioner implements JavaPartitioner {
 
   private static final Logger LOG = LoggerFactory.getLogger(JavaUpsertPartitioner.class);
@@ -264,6 +264,7 @@ protected List getSmallFiles(String partitionPath) {
     return smallFileLocations;
   }
+  @Override
   public BucketInfo getBucketInfo(int bucketNumber) {
     return bucketInfoMap.get(bucketNumber);
   }
@@ -336,6 +337,7 @@ protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, Hoodi
     return avgSize;
   }
+  @Override
   public List<String> getSmallFileIds() {
     return smallFiles.stream().map(smallFile -> smallFile.location.getFileId())
         .collect(Collectors.toList());
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseJavaDeltaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseJavaDeltaCommitActionExecutor.java
index f144c1576eb90..bd745fa1f1fd7 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseJavaDeltaCommitActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/deltacommit/BaseJavaDeltaCommitActionExecutor.java
@@ -31,6 +31,7 @@
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
 import org.apache.hudi.table.action.commit.BaseJavaCommitActionExecutor;
+import org.apache.hudi.table.action.commit.JavaBucketIndexPartitioner;
 import org.apache.hudi.table.action.commit.JavaUpsertPartitioner;
 import org.apache.hudi.table.action.commit.Partitioner;
@@ -64,6 +65,10 @@ protected BaseJavaDeltaCommitActionExecutor(HoodieEngineContext context,
   @Override
   public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
+    // Use bucket index partitioner if bucket index is enabled
+    if (table.getIndex() instanceof org.apache.hudi.index.bucket.HoodieBucketIndex) {
+      return new JavaBucketIndexPartitioner(profile, context, table, config);
+    }
     this.partitioner = (JavaUpsertPartitioner) super.getUpsertPartitioner(profile);
     return this.partitioner;
   }
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientBucketIndex.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientBucketIndex.java
new file mode 100644
index 0000000000000..67fdbee0be186
--- /dev/null
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientBucketIndex.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client.functional;
+
+import org.apache.hudi.client.HoodieJavaWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.testutils.GenericRecordValidationTestUtils;
+import org.apache.hudi.testutils.HoodieJavaClientTestHarness;
+
+import org.apache.avro.generic.GenericRecord;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests the bucket index with the Java write client.
+ */
+public class TestHoodieJavaClientBucketIndex extends HoodieJavaClientTestHarness {
+
+  @Override
+  protected HoodieTableType getTableType() {
+    return HoodieTableType.MERGE_ON_READ;
+  }
+
+  @Test
+  public void testWriteData() throws Exception {
+    HoodieWriteConfig config = getConfigBuilder()
+        .withIndexConfig(HoodieIndexConfig.newBuilder()
+            .withIndexType(HoodieIndex.IndexType.BUCKET)
+            .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE)
+            .withBucketNum("8")
+            .withIndexKeyField("_row_key")
+            .build())
+        .build();
+    HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
+
+    int totalRecords = 20;
+    List<HoodieRecord> records = dataGen.generateInserts("001", totalRecords);
+
+    // Insert totalRecords records
+    List<WriteStatus> writeStatuses = writeData(writeClient, records, true);
+    long numFileGroups = writeStatuses.stream().map(WriteStatus::getFileId).distinct().count();
+    assertTrue(numFileGroups > 0, "Should create file groups");
+
+    // Verify records written
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    Map<String, GenericRecord> recordMap = GenericRecordValidationTestUtils.getRecordsMap(config, storageConf, dataGen);
+    assertEquals(totalRecords, recordMap.size(), "Should have written " + totalRecords + " records");
+
+    // Upsert the same set of records; the number of records should remain the same (deduplication)
+    writeData(writeClient, dataGen.generateUpdates("002", totalRecords), true);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    recordMap = GenericRecordValidationTestUtils.getRecordsMap(config, storageConf, dataGen);
+    assertEquals(totalRecords, recordMap.size(), "Should still have " + totalRecords + " records after deduplication");
+
+    // Upsert a new set of records, and validate the total number of records
+    writeData(writeClient, dataGen.generateInserts("003", totalRecords), true);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    recordMap = GenericRecordValidationTestUtils.getRecordsMap(config, storageConf, dataGen);
+    assertEquals(totalRecords * 2, recordMap.size(), "Should have " + (totalRecords * 2) + " total records");
+  }
+
+  private List<WriteStatus> writeData(HoodieJavaWriteClient writeClient, List<HoodieRecord> records, boolean doCommit) {
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    String commitTime = writeClient.startCommit(HoodieActiveTimeline.DELTA_COMMIT_ACTION);
+    List<WriteStatus> writeStatuses = writeClient.upsert(records, commitTime);
+    org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses);
+
+    if (doCommit) {
+      boolean success = writeClient.commit(commitTime, writeStatuses, Option.empty());
+      assertTrue(success, "Commit should succeed");
+    }
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    return writeStatuses;
+  }
+}
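
For reviewers who want to try the change out, the sketch below is not part of the patch; it restates, as user-facing code, the configuration and write sequence that TestHoodieJavaClientBucketIndex exercises. The class name, the `basePath`/`schemaStr` placeholders, and the fact that the caller builds the `HoodieJavaWriteClient` and supplies the `records` are assumptions; only the builder calls and the startCommit/upsert/commit sequence come from the diff itself.

```java
import java.util.List;

import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;

/**
 * Hedged usage sketch (not part of the patch): builds a write config that opts the Java
 * client into the simple bucket index, then runs one delta commit with it.
 */
public class BucketIndexJavaClientExample {

  public static HoodieWriteConfig bucketIndexConfig(String basePath, String schemaStr) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)      // placeholder: base path of the target table
        .withSchema(schemaStr)   // placeholder: Avro schema of the incoming records
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withIndexType(HoodieIndex.IndexType.BUCKET)                          // dispatched by JavaHoodieIndexFactory
            .withBucketIndexEngineType(HoodieIndex.BucketIndexEngineType.SIMPLE)  // -> HoodieSimpleBucketIndex
            .withBucketNum("8")                                                   // file groups per partition path
            .withIndexKeyField("_row_key")                                        // hash field used by BucketIdentifier
            .build())
        .build();
  }

  public static List<WriteStatus> writeOnce(HoodieJavaWriteClient writeClient, List<HoodieRecord> records) {
    // Same sequence as TestHoodieJavaClientBucketIndex#writeData
    String instantTime = writeClient.startCommit(HoodieActiveTimeline.DELTA_COMMIT_ACTION);
    List<WriteStatus> statuses = writeClient.upsert(records, instantTime);
    writeClient.commit(instantTime, statuses, Option.empty());
    return statuses;
  }
}
```

With this configuration the upsert partitioner returned by BaseJavaCommitActionExecutor (and the delta-commit executor) is a JavaBucketIndexPartitioner: each record is routed to partition number `partitionPathOffset(partitionPath) + bucketId`, so a workload touching P partition paths exposes P * numBuckets write partitions.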