-
Notifications
You must be signed in to change notification settings - Fork 2.4k
Add functional compat tests for Incremental Source #13954
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
@linliu-code can you take a look? |
...utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
Outdated
Show resolved
Hide resolved
...utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
Show resolved
Hide resolved
...utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
Outdated
Show resolved
Hide resolved
...utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
Outdated
Show resolved
Hide resolved
...utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
Show resolved
Hide resolved
...utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
Show resolved
Hide resolved
"Target should have all records from source"); | ||
|
||
// Phase 3: Upgrade target table from v6 to v9 | ||
HoodieDeltaStreamer.Config upgradeTargetConfig = TestHelpers.makeConfigForHudiIncrSrc( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to consider upgrade to v8, having some commit, and then upgrade to v9?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think am thinking to maybe just parametrize this test at least for the targetTable version.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is end to end functional test which will add up to test run time.
So, I was trying to be cautious.
most folks will likely either be in V6 or V9. so, we can leave it as is for now.
...utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
Outdated
Show resolved
Hide resolved
...utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
Show resolved
Hide resolved
String finalCheckpoint = finalCommitMetadata.get().getExtraMetadata().get(STREAMER_CHECKPOINT_KEY); | ||
|
||
// Checkpoint should have advanced from the pre-upgrade checkpoint | ||
assertNotEquals(checkpointBeforeUpgrade, finalCheckpoint, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we check if finalCheckpoint
should be larger than checkpointBeforeUpgrade
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let me add that
.load(targetTablePath) | ||
.count(); | ||
|
||
assertEquals(sourceRecordCountOriginal, targetRecordCountOriginal, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since we are doing bulk_insert, should be easy to do entire data equality right?
just drop meta fields from both table and compare the dataframes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok will do a df equality comparison to ensure data is same.
*/ | ||
@ParameterizedTest | ||
@EnumSource(value = HoodieTableVersion.class, names = {"EIGHT", "NINE"}) | ||
public void testIncrementalSourceWithSourceTableUpgrade(HoodieTableVersion targetUpgradeVersion) throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets rename the arg to sourceTableFinalTableVersion
its bit confusing as of now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do so
...utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
Show resolved
Hide resolved
targetMetaClient.reloadActiveTimeline(); | ||
assertEquals(HoodieTableVersion.SIX, targetMetaClient.getTableConfig().getTableVersion()); | ||
|
||
// Verify record counts match between source and target |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see the test is very monolith or rather fatter.
can we de-compose this into multiple smaller methods and re-use.
for eg, ingestion 100 records across 3 commits should be a private method.
- comparing num commits in timeline
- Comparing versions for a given table
- comparing data equality across source and target tables.
all these should be moved to private method and should be re-used
.load(targetTablePath) | ||
.count(); | ||
|
||
assertEquals(sourceRecordCount, targetRecordCount, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
data equality checks please
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will make this a util and call it in several places
.load(targetTablePath) | ||
.count(); | ||
|
||
assertEquals(sourceRecordCount, targetRecordCount, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets validate full data
you can cache the source data if need be to avoid triggering reads to hudi table everytime.
targetMetaClient, finalTargetInstant); | ||
assertTrue(finalCommitMetadata.isPresent()); | ||
|
||
// The first time after upgrading, target checkpoint read is still in v1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry, is this expected to be in V1 or V2?
and how are we validating the checkpoint format?
assertEquals(3, targetMetaClient.getActiveTimeline().getCommitsTimeline().countInstants(), | ||
"Target should have 3 commits (as it batches source table commits into target table commits) + upgrade commit"); | ||
|
||
// Clean up |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we move clean up to finally block
/** | ||
* Test incremental source functionality when source table is upgraded from v6 to v8/v9 | ||
* while target table remains at v6. This validates backward compatibility for cross-version | ||
* incremental sync scenarios. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we extend the test to also upgrade target table to V9 as last step. and add few more commits to source table and do one round of validation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok will try making that change.
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, targetTablePath); | ||
} | ||
|
||
@ParameterizedTest |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we extend the test to also upgrade source table to V9 as last step. and add few more commits to source table and do one round of validation.
Describe the issue this Pull Request addresses
Summary and Changelog
Impact
Risk Level
Documentation Update
Contributor's checklist