Changes from all commits (24 commits)
846560d  Adding completion time instead of commit time for Incremental queries… (vamshikrishnakyatham, Sep 26, 2025)
3260aa4  replacing commit time from requested to complete (vamshikrishnakyatham, Sep 27, 2025)
726b93d  adding complete time column for V2 relations for incremental queries (vamshikrishnakyatham, Sep 27, 2025)
76f093b  adding complete time column for V2 relations for incremental queries … (vamshikrishnakyatham, Sep 27, 2025)
a60096f  Merge remote-tracking branch 'upstream/master' into adding-completion… (vamshikrishnakyatham, Sep 27, 2025)
5ef7a9b  adding complete time column for V2 relations for incremental queries … (vamshikrishnakyatham, Sep 27, 2025)
349f4fd  adding complete time column for V2 relations for incremental queries … (vamshikrishnakyatham, Sep 27, 2025)
8e60409  Fix missing newline in MergeOnReadIncrementalRelationV2.scala (vamshikrishnakyatham, Sep 27, 2025)
320b578  Add missing newline at end of MergeOnReadIncrementalRelationV2.scala (vamshikrishnakyatham, Sep 27, 2025)
9e8a01e  adding complete time column for V2 relations for incremental queries … (vamshikrishnakyatham, Sep 27, 2025)
6690dc9  Merge remote-tracking branch 'refs/remotes/origin/adding-completion-t… (vamshikrishnakyatham, Sep 27, 2025)
c0e8ee8  blueprint for MoR - doesnt work currently (vamshikrishnakyatham, Sep 27, 2025)
cc57195  part phase (vamshikrishnakyatham, Sep 30, 2025)
0ee7b8f  MOR fail (vamshikrishnakyatham, Oct 1, 2025)
18a686b  test (vamshikrishnakyatham, Oct 1, 2025)
826acce  schema modifications on hudi (vamshikrishnakyatham, Oct 2, 2025)
b4ce002  Adding _hoodie_completion_time as part of output schema for increment… (vamshikrishnakyatham, Oct 2, 2025)
ca001d5  Adding _hoodie_completion_time as part of output schema for increment… (vamshikrishnakyatham, Oct 2, 2025)
e8674ec  Adding _hoodie_completion_time as part of output schema for increment… (vamshikrishnakyatham, Oct 2, 2025)
e29faf2  Adding _hoodie_completion_time as part of output schema for increment… (vamshikrishnakyatham, Oct 2, 2025)
f70ac82  Adding _hoodie_completion_time as part of output schema for increment… (vamshikrishnakyatham, Oct 2, 2025)
8573ba8  Adding _hoodie_completion_time as part of output schema for increment… (vamshikrishnakyatham, Oct 2, 2025)
a3094ae  scala style fix (vamshikrishnakyatham, Oct 2, 2025)
11cd775  conditionality fix proper (vamshikrishnakyatham, Oct 2, 2025)
@@ -131,7 +131,8 @@ public HoodieReaderContext<InternalRow> getContext() {
JavaConverters.asScalaBufferConverter(filters).asScala().toSeq(),
JavaConverters.asScalaBufferConverter(filters).asScala().toSeq(),
new HadoopStorageConfiguration(configurationBroadcast.getValue().value()),
tableConfigBroadcast.getValue());
tableConfigBroadcast.getValue(),
Option.empty());
} else {
throw new HoodieException("Cannot get the broadcast Spark Parquet reader.");
}
@@ -24,6 +24,7 @@
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
import org.apache.hudi.common.util.HoodieRecordUtils;
import org.apache.hudi.common.util.Option;
@@ -58,6 +59,13 @@ protected BaseSparkInternalRowReaderContext(StorageConfiguration<?> storageConfi
super(storageConfig, tableConfig, Option.empty(), Option.empty(), recordContext);
}

protected BaseSparkInternalRowReaderContext(StorageConfiguration<?> storageConfig,
HoodieTableConfig tableConfig,
Option<InstantRange> instantRange,
BaseSparkInternalRecordContext recordContext) {
super(storageConfig, tableConfig, instantRange, Option.empty(), recordContext);
}

@Override
public Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode mergeMode, String mergeStrategyId, String mergeImplClasses) {
// TODO(HUDI-7843):
@@ -27,9 +27,11 @@ import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.table.log.InstantRange
import org.apache.hudi.common.table.read.buffer.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.collection.{CachingIterator, ClosableIterator, Pair => HPair}
import org.apache.hudi.common.util.Option
import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, HoodieSparkParquetReader}
import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath}
import org.apache.hudi.util.CloseableInternalRowIterator
@@ -59,8 +61,9 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
filters: Seq[Filter],
requiredFilters: Seq[Filter],
storageConfiguration: StorageConfiguration[_],
tableConfig: HoodieTableConfig)
extends BaseSparkInternalRowReaderContext(storageConfiguration, tableConfig, SparkFileFormatInternalRecordContext.apply(tableConfig)) {
tableConfig: HoodieTableConfig,
instantRangeOpt: Option[InstantRange] = Option.empty())
extends BaseSparkInternalRowReaderContext(storageConfiguration, tableConfig, instantRangeOpt, SparkFileFormatInternalRecordContext.apply(tableConfig)) {
lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
private lazy val bootstrapSafeFilters: Seq[Filter] = filters.filter(filterIsSafeForBootstrap) ++ requiredFilters
private lazy val allFilters = filters ++ requiredFilters
@@ -58,6 +58,7 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
public static final String FILENAME_METADATA_FIELD = HoodieMetadataField.FILENAME_METADATA_FIELD.getFieldName();
public static final String OPERATION_METADATA_FIELD = HoodieMetadataField.OPERATION_METADATA_FIELD.getFieldName();
public static final String HOODIE_IS_DELETED_FIELD = "_hoodie_is_deleted";
public static final String COMMIT_COMPLETION_TIME_METADATA_FIELD = "_hoodie_commit_completion_time";
[Review comment from a Contributor] The metadata fields are typically persisted to the files. In this case it is just a field we add at query time, so maybe we can come up with a better name. You called this a virtual field in the description, so maybe something along those lines?
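A minimal sketch of the naming direction this comment suggests; the constant name below is purely illustrative and is not part of this PR:

    // Hypothetical rename: make the constant read as a query-time "virtual" field
    // rather than a persisted metadata field (name chosen for illustration only).
    public static final String COMPLETION_TIME_VIRTUAL_FIELD = "_hoodie_commit_completion_time";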

// If the ordering value is not set, this default order value is set and
// always treated as the commit time ordering.
public static final int DEFAULT_ORDERING_VALUE = 0;
@@ -31,6 +31,7 @@
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.read.buffer.PositionBasedFileGroupRecordBuffer;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.VisibleForTesting;
@@ -55,6 +56,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.avro.JsonProperties.NULL_VALUE;
import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchemaDedupNested;
import static org.apache.hudi.avro.AvroSchemaUtils.createNewSchemaFromFieldsWithReference;
import static org.apache.hudi.avro.AvroSchemaUtils.findNestedField;
@@ -85,6 +87,9 @@ public class FileGroupReaderSchemaHandler<T> {
protected final TypedProperties properties;
private final DeleteContext deleteContext;
private final HoodieTableMetaClient metaClient;
private final boolean shouldAddCompletionTime;
private final Map<String, String> commitTimeToCompletionTimeMap;
private final Schema requestedSchemaWithCompletionTime;
[Review comment from a Contributor] If the completion time is required, then the requested schema should be updated to include the completion time. I don't think we need a second instance variable.
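A rough sketch of what this comment suggests, reusing names already in the diff: fold the completion-time column into requestedSchema inside the constructor so the parallel requestedSchemaWithCompletionTime variable becomes unnecessary. This would replace the existing requestedSchema assignment; it is a sketch of the idea, not the final change.

    // Sketch only: widen the requested schema up front instead of keeping a
    // second requestedSchemaWithCompletionTime instance variable.
    Schema effectiveRequestedSchema = shouldAddCompletionTimeField
        ? addCompletionTimeField(requestedSchema)   // helper introduced in this PR
        : requestedSchema;
    this.requestedSchema = AvroSchemaCache.intern(effectiveRequestedSchema);
    this.shouldAddCompletionTime = shouldAddCompletionTimeField;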


public FileGroupReaderSchemaHandler(HoodieReaderContext<T> readerContext,
Schema tableSchema,
Expand All @@ -98,10 +103,24 @@ public FileGroupReaderSchemaHandler(HoodieReaderContext<T> readerContext,
this.requestedSchema = AvroSchemaCache.intern(requestedSchema);
this.hoodieTableConfig = metaClient.getTableConfig();
this.deleteContext = new DeleteContext(properties, tableSchema);
this.metaClient = metaClient;

boolean hasInstantRange = readerContext.getInstantRange().isPresent();
boolean shouldAddCompletionTimeField = !metaClient.isMetadataTable()
&& metaClient.getTableConfig() != null && metaClient.getTableConfig().getTableVersion() != null
[Review comment from a Contributor] The table config and version should always be non-null, so we can simplify this.

&& metaClient.getTableConfig().getTableVersion().greaterThanOrEquals(HoodieTableVersion.SIX)
[Review comment from a Contributor] The completion time is only available in version 8 and above, if I remember correctly.
(A sketch addressing this and the previous comment follows the condition below.)

&& hasInstantRange;
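Taking the two comments above together, the gate could be written more directly. A sketch, assuming table config and table version are always non-null and that instant completion times are only recorded from table version eight onward (HoodieTableVersion.EIGHT):

    // Sketch: drop the null checks and gate on table version eight, where the
    // timeline records a completion time for each instant.
    boolean shouldAddCompletionTimeField = !metaClient.isMetadataTable()
        && metaClient.getTableConfig().getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)
        && hasInstantRange;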

this.shouldAddCompletionTime = shouldAddCompletionTimeField;
this.requestedSchemaWithCompletionTime = shouldAddCompletionTimeField
? addCompletionTimeField(this.requestedSchema)
: this.requestedSchema;
this.commitTimeToCompletionTimeMap = this.shouldAddCompletionTime
? buildCompletionTimeMapping(metaClient)
: Collections.emptyMap();
this.requiredSchema = AvroSchemaCache.intern(prepareRequiredSchema(this.deleteContext));
this.internalSchema = pruneInternalSchema(requiredSchema, internalSchemaOpt);
this.internalSchemaOpt = getInternalSchemaOpt(internalSchemaOpt);
this.metaClient = metaClient;
}

public Schema getTableSchema() {
@@ -125,12 +144,63 @@ public Option<InternalSchema> getInternalSchemaOpt() {
}

public Option<UnaryOperator<T>> getOutputConverter() {
if (!AvroSchemaUtils.areSchemasProjectionEquivalent(requiredSchema, requestedSchema)) {
return Option.of(readerContext.getRecordContext().projectRecord(requiredSchema, requestedSchema));
Schema targetSchema = shouldAddCompletionTime ? requestedSchemaWithCompletionTime : requestedSchema;
UnaryOperator<T> projectionConverter = null;
UnaryOperator<T> completionTimeConverter = null;
boolean schemasEquivalent = AvroSchemaUtils.areSchemasProjectionEquivalent(requiredSchema, targetSchema);
if (!schemasEquivalent) {
projectionConverter = readerContext.getRecordContext().projectRecord(requiredSchema, targetSchema);
}
if (shouldAddCompletionTime) {
completionTimeConverter = getCompletionTimeTransformer();
}
if (projectionConverter != null && completionTimeConverter != null) {
final UnaryOperator<T> finalProjectionConverter = projectionConverter;
final UnaryOperator<T> finalCompletionTimeConverter = completionTimeConverter;
UnaryOperator<T> composed = t -> finalCompletionTimeConverter.apply(finalProjectionConverter.apply(t));
return Option.of(composed);
} else if (projectionConverter != null) {
return Option.of(projectionConverter);
} else if (completionTimeConverter != null) {
return Option.of(completionTimeConverter);
}
return Option.empty();
}

private UnaryOperator<T> getCompletionTimeTransformer() {
return record -> {
try {
Object commitTimeObj = readerContext.getRecordContext().getValue(
record,
requestedSchemaWithCompletionTime,
HoodieRecord.COMMIT_TIME_METADATA_FIELD
);
if (commitTimeObj == null) {
return record;
}
String commitTime = commitTimeObj.toString();
String completionTime = commitTimeToCompletionTimeMap.getOrDefault(commitTime, commitTime);
Schema.Field completionTimeField = requestedSchemaWithCompletionTime.getField(HoodieRecord.COMMIT_COMPLETION_TIME_METADATA_FIELD);
if (completionTimeField == null) {
return record;
}
int completionTimePos = completionTimeField.pos();
Object[] fieldValues = new Object[requestedSchemaWithCompletionTime.getFields().size()];
[Review comment from a Contributor] Is there a way to simply set the value in the existing object instead of creating a new one? At this point the record should have a null for the completion time, based on my understanding of the code.
(A sketch of an in-place update follows this method.)

for (int i = 0; i < fieldValues.length; i++) {
if (i == completionTimePos) {
fieldValues[i] = completionTime;
} else {
Schema.Field field = requestedSchemaWithCompletionTime.getFields().get(i);
fieldValues[i] = readerContext.getRecordContext().getValue(record, requestedSchemaWithCompletionTime, field.name());
}
}
return readerContext.getRecordContext().constructEngineRecord(requestedSchemaWithCompletionTime, fieldValues);
} catch (Exception e) {
return record;
}
};
}
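A sketch of the in-place alternative raised in the comment above: on the Spark path the engine record is typically an InternalRow, and when it is a mutable GenericInternalRow the completion-time slot (expected to be null at this point) could be set directly instead of copying every field into a new record. The Spark classes referenced here (GenericInternalRow, UTF8String) are not available to this engine-agnostic handler, so this is illustrative only:

    // Illustrative only: mutate the existing row rather than rebuilding it.
    if (record instanceof GenericInternalRow) {
      ((GenericInternalRow) record).update(completionTimePos, UTF8String.fromString(completionTime));
      return record;
    }
    // otherwise fall back to rebuilding the record as the current code does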

public DeleteContext getDeleteContext() {
return deleteContext;
}
@@ -172,12 +242,24 @@ Schema generateRequiredSchema(DeleteContext deleteContext) {
boolean hasInstantRange = readerContext.getInstantRange().isPresent();
//might need to change this if other queries than mor have mandatory fields
if (!readerContext.getHasLogFiles()) {
List<Schema.Field> addedFields = new ArrayList<>();
if (hasInstantRange && !findNestedField(requestedSchema, HoodieRecord.COMMIT_TIME_METADATA_FIELD).isPresent()) {
List<Schema.Field> addedFields = new ArrayList<>();
addedFields.add(getField(this.tableSchema, HoodieRecord.COMMIT_TIME_METADATA_FIELD));
return appendFieldsToSchemaDedupNested(requestedSchema, addedFields);
}
return requestedSchema;
if (shouldAddCompletionTime && !findNestedField(requestedSchemaWithCompletionTime, HoodieRecord.COMMIT_COMPLETION_TIME_METADATA_FIELD).isPresent()) {
[Review comment from a Contributor] I think the expectation is that the completion time is a top-level field, not nested, so we can simplify this.
(A simplified sketch follows this block.)

Schema unionSchema = Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING));
Schema.Field completionTimeField = new Schema.Field(
HoodieRecord.COMMIT_COMPLETION_TIME_METADATA_FIELD,
unionSchema,
"Completion time of the commit",
NULL_VALUE
);
addedFields.add(completionTimeField);
}
if (!addedFields.isEmpty()) {
return appendFieldsToSchemaDedupNested(requestedSchemaWithCompletionTime, addedFields);
}
return requestedSchemaWithCompletionTime;
}
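Following the comment above, if the completion-time column is guaranteed to be a top-level field, the nested lookup can become a plain Avro getField check. A sketch; createCompletionTimeField is a hypothetical helper that would wrap the repeated union-schema construction:

    // Sketch: top-level lookup instead of findNestedField, with the field
    // construction factored into one (hypothetical) helper.
    if (shouldAddCompletionTime
        && requestedSchemaWithCompletionTime.getField(HoodieRecord.COMMIT_COMPLETION_TIME_METADATA_FIELD) == null) {
      addedFields.add(createCompletionTimeField());
    }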

if (hoodieTableConfig.getRecordMergeMode() == RecordMergeMode.CUSTOM) {
@@ -190,16 +272,30 @@ Schema generateRequiredSchema(DeleteContext deleteContext) {
for (String field : getMandatoryFieldsForMerging(
hoodieTableConfig, this.properties, this.tableSchema, readerContext.getRecordMerger(),
deleteContext.hasBuiltInDeleteField(), deleteContext.getCustomDeleteMarkerKeyValue(), hasInstantRange)) {
if (!findNestedField(requestedSchema, field).isPresent()) {
if (!findNestedField(requestedSchemaWithCompletionTime, field).isPresent()) {
addedFields.add(getField(this.tableSchema, field));
}
}

if (hasInstantRange && !findNestedField(requestedSchemaWithCompletionTime, HoodieRecord.COMMIT_TIME_METADATA_FIELD).isPresent()) {
addedFields.add(getField(this.tableSchema, HoodieRecord.COMMIT_TIME_METADATA_FIELD));
}
if (shouldAddCompletionTime && !findNestedField(requestedSchemaWithCompletionTime, HoodieRecord.COMMIT_COMPLETION_TIME_METADATA_FIELD).isPresent()) {
Schema unionSchema = Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING));
Schema.Field completionTimeField = new Schema.Field(
HoodieRecord.COMMIT_COMPLETION_TIME_METADATA_FIELD,
unionSchema,
"Completion time of the commit",
NULL_VALUE
);
addedFields.add(completionTimeField);
}

if (addedFields.isEmpty()) {
return requestedSchema;
return requestedSchemaWithCompletionTime;
}

return appendFieldsToSchemaDedupNested(requestedSchema, addedFields);
return appendFieldsToSchemaDedupNested(requestedSchemaWithCompletionTime, addedFields);
}

private static String[] getMandatoryFieldsForMerging(HoodieTableConfig cfg,
@@ -308,4 +404,26 @@ private static Schema.Field getField(Schema schema, String fieldName) {
}
return foundFieldOpt.get();
}

private Map<String, String> buildCompletionTimeMapping(HoodieTableMetaClient metaClient) {
return metaClient.getCommitsTimeline().filterCompletedInstants().getInstants().stream()
[Review comment from a Contributor] This does not account for instants that are no longer in the active timeline. We'll need to use the CompletionTimeQueryView for this.
(A sketch follows this method.)

.collect(Collectors.toMap(
HoodieInstant::requestedTime,
instant -> instant.getCompletionTime() != null ? instant.getCompletionTime() : instant.requestedTime()
));
}
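A rough sketch of the direction the comment points to: resolve completion times through CompletionTimeQueryView so that instants already archived out of the active timeline still map to a completion time. The lookup method and its Option-based return used here are assumptions about that API, not verified against it:

    // Sketch only: CompletionTimeQueryView covers archived instants as well; the
    // method name and return type below are assumptions about the API.
    private String resolveCompletionTime(CompletionTimeQueryView completionTimeQueryView, String requestedTime) {
      return completionTimeQueryView.getCompletionTime(requestedTime).orElse(requestedTime);
    }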

private Schema addCompletionTimeField(Schema schema) {
if (findNestedField(schema, HoodieRecord.COMMIT_COMPLETION_TIME_METADATA_FIELD).isPresent()) {
return schema;
}
Schema unionSchema = Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING));
Schema.Field completionTimeField = new Schema.Field(
HoodieRecord.COMMIT_COMPLETION_TIME_METADATA_FIELD,
unionSchema,
"Completion time of the commit",
NULL_VALUE
);
return appendFieldsToSchemaDedupNested(schema, Collections.singletonList(completionTimeField));
}
}