@@ -183,6 +183,7 @@
import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR;
import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
import static org.apache.hudi.config.HoodieErrorTableConfig.ERROR_TABLE_PERSIST_SOURCE_RDD;
import static org.apache.hudi.config.HoodieWriteConfig.STREAMER_CHECKPOINT_KEY;
import static org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -3376,6 +3377,333 @@ void testErrorTableSourcePersist(WriteOperationType writeOperationType, boolean
assertRecordCount(950, tableBasePath, sqlContext);
}

/**
* Test incremental source functionality when source table is upgraded from v6 to v8/v9
* while target table remains at v6. This validates backward compatibility for cross-version
* incremental sync scenarios.
Contributor: Can we extend the test to also upgrade the target table to v9 as a last step, add a few more commits to the source table, and do one more round of validation?

Contributor Author: OK, will try making that change.

*/
@ParameterizedTest
@EnumSource(value = HoodieTableVersion.class, names = {"EIGHT", "NINE"})
public void testIncrementalSourceWithSourceTableUpgrade(HoodieTableVersion targetUpgradeVersion) throws Exception {
Contributor: Let's rename the arg to sourceTableFinalTableVersion; it's a bit confusing as of now.

Contributor Author: Will do so.

// Create unique paths for both tables
String sourceTablePath = basePath + "/source_table_v6_to_v" + targetUpgradeVersion.versionCode();
String targetTablePath = basePath + "/target_table_v6_" + targetUpgradeVersion.versionCode();

// Phase 1: Create source table at v6 with initial commits
HoodieDeltaStreamer.Config sourceConfig = TestHelpers.makeConfig(
sourceTablePath, WriteOperationType.BULK_INSERT);
sourceConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "=" + HoodieTableVersion.SIX.versionCode());
sourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + "=false");
sourceConfig.sourceLimit = 100;

// Initialize source table with first commit
HoodieDeltaStreamer sourceStreamer = new HoodieDeltaStreamer(sourceConfig, jsc);
sourceStreamer.sync();

// Add 2 more commits to source table (total 3 commits)
sourceConfig.operation = WriteOperationType.BULK_INSERT;
for (int i = 0; i < 2; i++) {
// Create fresh config to avoid conflicts
HoodieDeltaStreamer.Config newSourceConfig = TestHelpers.makeConfig(
sourceTablePath, WriteOperationType.BULK_INSERT);
newSourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + "=false");
newSourceConfig.sourceLimit = 100;
sourceStreamer = new HoodieDeltaStreamer(newSourceConfig, jsc);
sourceStreamer.sync();
}

// Verify source has 3 commits and is at v6
HoodieTableMetaClient sourceMetaClient = HoodieTableMetaClient.builder()
.setConf(context.getStorageConf())
.setBasePath(sourceTablePath)
.build();
assertEquals(3, sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
assertEquals(HoodieTableVersion.SIX, sourceMetaClient.getTableConfig().getTableVersion());

// Phase 2: Setup target table at v6 and sync first 3 commits
HoodieDeltaStreamer.Config targetConfig = TestHelpers.makeConfigForHudiIncrSrc(
sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT, false, null);
targetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "=" + HoodieTableVersion.SIX.versionCode());
targetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + "=false");
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");

// Sync all 3 commits from source to target, one by one
HoodieDeltaStreamer targetStreamer = new HoodieDeltaStreamer(targetConfig, jsc);
targetStreamer.sync();

// Verify checkpoint is established in V1 format
HoodieTableMetaClient targetMetaClient = HoodieTableMetaClient.builder()
.setConf(context.getStorageConf())
.setBasePath(targetTablePath)
.build();
HoodieInstant lastTargetInstant = targetMetaClient.getActiveTimeline().lastInstant().get();
Option<HoodieCommitMetadata> commitMetadata = HoodieClientTestUtils.getCommitMetadataForInstant(
targetMetaClient, lastTargetInstant);
assertTrue(commitMetadata.isPresent());
// Checkpoint should be in V1 format
assertTrue(commitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
String checkpointBeforeUpgrade = commitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);

// Verify record counts match between source and target
long sourceRecordCountOriginal = sqlContext.read()
.format("org.apache.hudi")
.load(sourceTablePath)
.count();
long targetRecordCountOriginal = sqlContext.read()
.format("org.apache.hudi")
.load(targetTablePath)
.count();

assertEquals(sourceRecordCountOriginal, targetRecordCountOriginal,
Contributor: Since we are doing bulk_insert, it should be easy to do full data equality, right? Just drop the meta fields from both tables and compare the dataframes.

Contributor Author: OK, will do a dataframe equality comparison to ensure the data is the same.

"Target should have all records from source");

// Phase 3: Upgrade source table from v6 to target version
HoodieDeltaStreamer.Config upgradeConfig = TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
upgradeConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "=" + targetUpgradeVersion.versionCode());
upgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + "=true");
upgradeConfig.sourceLimit = 100;

// This sync will trigger the upgrade
sourceStreamer = new HoodieDeltaStreamer(upgradeConfig, jsc);
sourceStreamer.sync();

// Verify source table is now at target version - create fresh metaclient after upgrade
sourceMetaClient = HoodieTableMetaClient.builder()
.setConf(HoodieTestUtils.getDefaultStorageConf())
.setBasePath(sourceTablePath)
.build();
assertEquals(targetUpgradeVersion, sourceMetaClient.getTableConfig().getTableVersion());

// Phase 4: Add 2 more commits to upgraded source table
// After upgrade, don't specify version - let it use the existing table version
for (int i = 0; i < 2; i++) {
HoodieDeltaStreamer.Config postUpgradeConfig = TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
postUpgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + "=false");
postUpgradeConfig.sourceLimit = 100;
sourceStreamer = new HoodieDeltaStreamer(postUpgradeConfig, jsc);
sourceStreamer.sync();
}

// Verify source now has 6 total commits
sourceMetaClient = HoodieTableMetaClient.reload(sourceMetaClient);
assertEquals(6, sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());

// Phase 5: Resume incremental sync from target table (still at v6)
// Create base config following existing test patterns
HoodieDeltaStreamer.Config resumeTargetConfig = TestHelpers.makeConfigForHudiIncrSrc(
sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT, false, null);
resumeTargetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "=" + HoodieTableVersion.SIX.versionCode());
resumeTargetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + "=false");
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");

// This should successfully pull all remaining commits from upgraded source
targetStreamer = new HoodieDeltaStreamer(resumeTargetConfig, jsc);
targetStreamer.sync();

// Phase 6: Validate data integrity and checkpoint continuity
targetMetaClient.reloadActiveTimeline();
assertEquals(HoodieTableVersion.SIX, targetMetaClient.getTableConfig().getTableVersion());

// Verify record counts match between source and target
Contributor: The test is quite monolithic. Can we decompose it into multiple smaller methods and reuse them? For example, ingesting 100 records across 3 commits should be a private method. The following should also be moved to private methods and reused:

  • comparing the number of commits in the timeline
  • comparing the table version of a given table
  • comparing data equality across the source and target tables

long sourceRecordCount = sqlContext.read()
.format("org.apache.hudi")
.load(sourceTablePath)
.count();
long targetRecordCount = sqlContext.read()
.format("org.apache.hudi")
.load(targetTablePath)
.count();

assertEquals(sourceRecordCount, targetRecordCount,
Contributor: Data equality checks, please.

Contributor Author: Will make this a util and call it in several places.

"Target should have all records from source despite version difference");

// Verify checkpoint was properly updated in target
HoodieInstant finalTargetInstant = targetMetaClient.getActiveTimeline().lastInstant().get();
Option<HoodieCommitMetadata> finalCommitMetadata = HoodieClientTestUtils.getCommitMetadataForInstant(
targetMetaClient, finalTargetInstant);
assertTrue(finalCommitMetadata.isPresent());
// Target still uses V1 checkpoint format
assertTrue(finalCommitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
String finalCheckpoint = finalCommitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);

// Checkpoint should have advanced from the pre-upgrade checkpoint
assertTrue(Long.parseLong(finalCheckpoint) > Long.parseLong(checkpointBeforeUpgrade),
"Final checkpoint should be greater than checkpoint before upgrade");

// Verify target has correct number of commits
assertEquals(2, targetMetaClient.getActiveTimeline().getCommitsTimeline().countInstants(),
"Target should have 2 commits (as it batches 3 source table commits into one target table commit)");

// Clean up
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, sourceTablePath);
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, targetTablePath);
}
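
// A sketch of the smaller reusable helpers the reviewer asks for above (illustrative only, not
// part of this diff). Method names are hypothetical; the bodies assume the existing test fields
// (jsc, context) and the TestHelpers utilities already used in this class.
private void ingestCommits(String tablePath, int numCommits) throws Exception {
  // Each iteration writes one bulk_insert commit of up to 100 records into the given table.
  for (int i = 0; i < numCommits; i++) {
    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tablePath, WriteOperationType.BULK_INSERT);
    cfg.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + "=false");
    cfg.sourceLimit = 100;
    new HoodieDeltaStreamer(cfg, jsc).sync();
  }
}

private void assertCommitCountAndTableVersion(String tablePath, int expectedCommits, HoodieTableVersion expectedVersion) {
  // Re-reads the table's meta client so the assertions see the latest timeline and table config.
  HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
      .setConf(context.getStorageConf())
      .setBasePath(tablePath)
      .build();
  assertEquals(expectedCommits, metaClient.getActiveTimeline().getCommitsTimeline().countInstants());
  assertEquals(expectedVersion, metaClient.getTableConfig().getTableVersion());
}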

@ParameterizedTest
Contributor: Can we extend the test to also upgrade the source table to v9 as a last step, add a few more commits to the source table, and do one more round of validation?

@EnumSource(value = HoodieTableVersion.class, names = {"EIGHT", "NINE"})
public void testIncrementalSourceWithTargetTableUpgrade(HoodieTableVersion targetUpgradeVersion) throws Exception {
Contributor: Javadoc similar to the other test would be nice.

Contributor: Let's fix the arg name similar to the other test.

// Create unique paths for both tables
String sourceTablePath = basePath + "/source_table_v6_target_upgrade_" + targetUpgradeVersion.versionCode();
String targetTablePath = basePath + "/target_table_v6_to_v" + targetUpgradeVersion.versionCode();

// Phase 1: Create source table at v6 with initial commits
HoodieDeltaStreamer.Config sourceConfig = TestHelpers.makeConfig(
sourceTablePath, WriteOperationType.BULK_INSERT);
sourceConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "=" + HoodieTableVersion.SIX.versionCode());
sourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + "=false");
sourceConfig.sourceLimit = 100;

// Initialize source table with first commit
HoodieDeltaStreamer sourceStreamer = new HoodieDeltaStreamer(sourceConfig, jsc);
sourceStreamer.sync();

// Add 2 more commits to source table (total 3 commits)
sourceConfig.operation = WriteOperationType.BULK_INSERT;
for (int i = 0; i < 2; i++) {
// Create fresh config to avoid conflicts
HoodieDeltaStreamer.Config newSourceConfig = TestHelpers.makeConfig(
sourceTablePath, WriteOperationType.BULK_INSERT);
newSourceConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + "=false");
newSourceConfig.sourceLimit = 100;
sourceStreamer = new HoodieDeltaStreamer(newSourceConfig, jsc);
sourceStreamer.sync();
}

// Verify source has 3 commits and is at v6
HoodieTableMetaClient sourceMetaClient = HoodieTableMetaClient.builder()
.setConf(context.getStorageConf())
.setBasePath(sourceTablePath)
.build();
assertEquals(3, sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
assertEquals(HoodieTableVersion.SIX, sourceMetaClient.getTableConfig().getTableVersion());

// Phase 2: Setup target table at v6 and sync first 3 commits
HoodieDeltaStreamer.Config targetConfig = TestHelpers.makeConfigForHudiIncrSrc(
sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT, false, null);
targetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "=" + HoodieTableVersion.SIX.versionCode());
targetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + "=false");
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
targetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");

// Sync all 3 commits from source to target
HoodieDeltaStreamer targetStreamer = new HoodieDeltaStreamer(targetConfig, jsc);
targetStreamer.sync();

// Verify checkpoint is established in V1 format
HoodieTableMetaClient targetMetaClient = HoodieTableMetaClient.builder()
.setConf(context.getStorageConf())
.setBasePath(targetTablePath)
.build();
HoodieInstant lastTargetInstant = targetMetaClient.getActiveTimeline().lastInstant().get();
Option<HoodieCommitMetadata> commitMetadata = HoodieClientTestUtils.getCommitMetadataForInstant(
targetMetaClient, lastTargetInstant);
assertTrue(commitMetadata.isPresent());
// Checkpoint should be in V1 format
assertTrue(commitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY_V1));
String checkpointBeforeUpgrade = commitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY_V1);

// Verify record counts match between source and target
Contributor: Same comment as in the test above regarding data validation.

long sourceRecordCountOriginal = sqlContext.read()
.format("org.apache.hudi")
.load(sourceTablePath)
.count();
long targetRecordCountOriginal = sqlContext.read()
.format("org.apache.hudi")
.load(targetTablePath)
.count();

assertEquals(sourceRecordCountOriginal, targetRecordCountOriginal,
"Target should have all records from source");

// Phase 3: Upgrade target table from v6 to target version
HoodieDeltaStreamer.Config upgradeTargetConfig = TestHelpers.makeConfigForHudiIncrSrc(
Contributor: Do we need to consider upgrading to v8, adding some commits, and then upgrading to v9?

Contributor Author: I am thinking of parametrizing this test, at least for the target table version.

Contributor: This is an end-to-end functional test, which will add to the test run time, so I was trying to be cautious. Most folks will likely be on either v6 or v9, so we can leave it as is for now.

sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT, false, null);
upgradeTargetConfig.configs.add(HoodieWriteConfig.WRITE_TABLE_VERSION.key() + "=" + targetUpgradeVersion.versionCode());
upgradeTargetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + "=true");
upgradeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
upgradeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
upgradeTargetConfig.allowCommitOnNoCheckpointChange = true; // Allow commit even with no new data to trigger upgrade

// This sync will trigger the upgrade of target table and create a new commit
targetStreamer = new HoodieDeltaStreamer(upgradeTargetConfig, jsc);
targetStreamer.sync();

// Verify target table is now at target version - create fresh metaclient after upgrade
targetMetaClient = HoodieTableMetaClient.builder()
.setConf(HoodieTestUtils.getDefaultStorageConf())
.setBasePath(targetTablePath)
.build();
assertEquals(targetUpgradeVersion, targetMetaClient.getTableConfig().getTableVersion());

// Phase 4: Add 2 more commits to source table (keeping it at v6)
for (int i = 0; i < 2; i++) {
HoodieDeltaStreamer.Config postTargetUpgradeConfig = TestHelpers.makeConfig(sourceTablePath, WriteOperationType.BULK_INSERT);
postTargetUpgradeConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + "=false");
postTargetUpgradeConfig.sourceLimit = 100;
sourceStreamer = new HoodieDeltaStreamer(postTargetUpgradeConfig, jsc);
sourceStreamer.sync();
}

// Verify source now has 5 total commits and is still at v6
sourceMetaClient = HoodieTableMetaClient.reload(sourceMetaClient);
assertEquals(5, sourceMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
assertEquals(HoodieTableVersion.SIX, sourceMetaClient.getTableConfig().getTableVersion());

// Phase 5: Resume incremental sync from upgraded target table (now at target version)
// Create base config for resuming sync
HoodieDeltaStreamer.Config resumeTargetConfig = TestHelpers.makeConfigForHudiIncrSrc(
sourceTablePath, targetTablePath, WriteOperationType.BULK_INSERT, false, null);
resumeTargetConfig.configs.add(HoodieWriteConfig.AUTO_UPGRADE_VERSION.key() + "=false");
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.num_instants=3");
resumeTargetConfig.configs.add("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy=READ_UPTO_LATEST_COMMIT");
resumeTargetConfig.allowCommitOnNoCheckpointChange = true; // Allow commit even with no new data to trigger upgrade

// This should successfully pull all remaining commits from v6 source to upgraded target
targetStreamer = new HoodieDeltaStreamer(resumeTargetConfig, jsc);
targetStreamer.sync();

// Phase 6: Validate data integrity and checkpoint continuity
targetMetaClient.reloadActiveTimeline();
assertEquals(targetUpgradeVersion, targetMetaClient.getTableConfig().getTableVersion());

// Verify record counts match between source and target
long sourceRecordCount = sqlContext.read()
.format("org.apache.hudi")
.load(sourceTablePath)
.count();
long targetRecordCount = sqlContext.read()
.format("org.apache.hudi")
.load(targetTablePath)
.count();

assertEquals(sourceRecordCount, targetRecordCount,
Contributor: Let's validate the full data. You can cache the source data if need be, to avoid triggering reads of the Hudi table every time.

"Target should have all records from source despite version difference");

// Verify checkpoint was properly updated and migrated to V2 format after upgrade
HoodieInstant finalTargetInstant = targetMetaClient.getActiveTimeline().lastInstant().get();
Option<HoodieCommitMetadata> finalCommitMetadata = HoodieClientTestUtils.getCommitMetadataForInstant(
targetMetaClient, finalTargetInstant);
assertTrue(finalCommitMetadata.isPresent());

// The first time after upgrading, target checkpoint read is still in v1
Contributor: Sorry, is this expected to be in V1 or V2? And how are we validating the checkpoint format?

assertTrue(finalCommitMetadata.get().getExtraMetadata().containsKey(STREAMER_CHECKPOINT_KEY));
String finalCheckpoint = finalCommitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY);

// Checkpoint should have advanced from the pre-upgrade checkpoint
assertTrue(Long.parseLong(finalCheckpoint) > Long.parseLong(checkpointBeforeUpgrade),
"Final checkpoint should be greater than checkpoint before upgrade");

// Verify target has correct number of commits
assertEquals(3, targetMetaClient.getActiveTimeline().getCommitsTimeline().countInstants(),
"Target should have 3 commits (as it batches source table commits into target table commits, plus the upgrade commit)");

// Clean up
Contributor: Can we move the cleanup to a finally block?

UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, sourceTablePath);
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, targetTablePath);
}
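
// A sketch of the finally-based cleanup the reviewer asks for above (structure only, not part of
// this diff; paths are hypothetical and the test body is elided). It assumes the enclosing test
// method declares throws Exception, as the tests above do, so the deletes can run even when an
// assertion in the middle of the test fails.
String sourceTablePath = basePath + "/source_table_cleanup_sketch";
String targetTablePath = basePath + "/target_table_cleanup_sketch";
try {
  // ... run the ingestion, upgrade, and validation phases shown above ...
} finally {
  // Cleanup always runs, regardless of the test outcome.
  UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, sourceTablePath);
  UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, targetTablePath);
}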

private Set<String> getAllFileIDsInTable(String tableBasePath, Option<String> partition) {
HoodieTableMetaClient metaClient = createMetaClient(jsc, tableBasePath);
final HoodieTableFileSystemView fsView = HoodieTableFileSystemView.fileListingBasedFileSystemView(context, metaClient, metaClient.getCommitsAndCompactionTimeline());