Skip to content

Conversation

vamshikrishnakyatham
Copy link
Contributor

Describe the issue this Pull Request addresses

Filtering for Hudi incremental queries on tables with version >= 6 depends on commit completion time, but this information is not present in the query output for users. Currently, only requested times are outputted, which leads to inconsistent and incorrect filtering results. Users cannot provide proper filter times because the actual completion times used for filtering are not visible in the results.

fixes: #14036

Summary and Changelog

Summary:

  1. Users can now filter based on actual commit completion times instead of inconsistent requested times
  2. New _hoodie_commit_completion_time virtual column provides visibility into the actual times used for filtering
  3. This eliminates inconsistencies between internal filtering logic and user visible timestamps

Changelog:

  1. Added _hoodie_commit_completion_time virtual column for Hudi tables version >= 6
  2. Extended V2 factory classes (HoodieCopyOnWriteIncrementalHadoopFsRelationFactoryV2, HoodieMergeOnReadIncrementalHadoopFsRelationFactoryV2) to include completion time field in schema
  3. Supported for file group reader optimized queries (both MoR and CoW) and base CoW

Impact

  1. New virtual column _hoodie_commit_completion_time available in incremental query results for tables version >= 6
  2. No breaking changes - existing _hoodie_commit_time column remains unchanged
  3. Backward compatible - only affects V2 incremental relations

Risk Level

low

Documentation Update

Doc, API update with new _hoodie_commit_completion_time virtual column

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

vamshikrishnakyatham and others added 19 commits September 26, 2025 01:46
… V2 as filtering is based off of completion time here
Add missing newline at the end of the file.
…ime-instead-of-commit-time-for-V2' into adding-completion-time-instead-of-commit-time-for-V2
@github-actions github-actions bot added the size:L PR with lines of changes in (300, 1000] label Oct 2, 2025
@hudi-bot
Copy link

hudi-bot commented Oct 2, 2025

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

public static final String FILENAME_METADATA_FIELD = HoodieMetadataField.FILENAME_METADATA_FIELD.getFieldName();
public static final String OPERATION_METADATA_FIELD = HoodieMetadataField.OPERATION_METADATA_FIELD.getFieldName();
public static final String HOODIE_IS_DELETED_FIELD = "_hoodie_is_deleted";
public static final String COMMIT_COMPLETION_TIME_METADATA_FIELD = "_hoodie_commit_completion_time";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The metadata fields are typically persisted to the files. In this case it is just a field we add at query time so maybe we can come up with a better name. You called this a virtual field in the description so maybe something along those lines?

boolean hasInstantRange = readerContext.getInstantRange().isPresent();
boolean shouldAddCompletionTimeField = !metaClient.isMetadataTable()
&& metaClient.getTableConfig() != null && metaClient.getTableConfig().getTableVersion() != null
&& metaClient.getTableConfig().getTableVersion().greaterThanOrEquals(HoodieTableVersion.SIX)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The completion time is only available in version 8 and above if I remember correctly


boolean hasInstantRange = readerContext.getInstantRange().isPresent();
boolean shouldAddCompletionTimeField = !metaClient.isMetadataTable()
&& metaClient.getTableConfig() != null && metaClient.getTableConfig().getTableVersion() != null
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The table config and version should always be non-null so we can simplify this

}

private Map<String, String> buildCompletionTimeMapping(HoodieTableMetaClient metaClient) {
return metaClient.getCommitsTimeline().filterCompletedInstants().getInstants().stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not account for instants that are no longer in the active timeline. We'll need to use the CompletionTimeQueryView for this

private final HoodieTableMetaClient metaClient;
private final boolean shouldAddCompletionTime;
private final Map<String, String> commitTimeToCompletionTimeMap;
private final Schema requestedSchemaWithCompletionTime;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the completion time is required, then the requested schema should be updated to include the completion time. I don't think we need a second instance variable.

return record;
}
int completionTimePos = completionTimeField.pos();
Object[] fieldValues = new Object[requestedSchemaWithCompletionTime.getFields().size()];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to simply set the value in the existing object instead of creating a new one? At this point the record should have a null for the completion time based on my understanding of the code.

return appendFieldsToSchemaDedupNested(requestedSchema, addedFields);
}
return requestedSchema;
if (shouldAddCompletionTime && !findNestedField(requestedSchemaWithCompletionTime, HoodieRecord.COMMIT_COMPLETION_TIME_METADATA_FIELD).isPresent()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the expectation is that the completion time is a top level field, not nested, so we can simplify this

StructType(baseSchema.fields :+ completionTimeField)
}

private def extendAvroSchemaForCompletionTime(baseAvroSchema: Schema): Schema = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This repeats code in another section of this PR. Can we move this into a common utility?

override protected def getMandatoryFields: Seq[String] = {
val baseMandatory = Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++
orderingFields ++ partitionColumnsToRead
if (!metaClient.isMetadataTable && metaClient.getTableConfig.getTableVersion.greaterThanOrEquals(HoodieTableVersion.SIX)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be version 8 here as well

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

size:L PR with lines of changes in (300, 1000]

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Addition of virtual _hoodie_commit_completion_time column which is used for Incremental Queries

3 participants