[FSTORE-1617] Integrate delta-rs HopsFS writer to hsfs python client #583
some feedback
python/hsfs/feature_group.py
Outdated
self._stream = True
if time_travel_format is None:
if engine.get_type() == "python":
if not online_enabled and not self._stream:
Suggested change:
- if not online_enabled and not self._stream:
+ if not self._stream:
Does having it online enabled change anything?
The problem is that, for now, we don't have a way to push data from offline to online, and delta-rs only writes to offline. According to Jim, the main use case for Delta is fast write/read from offline only.
@@ -2478,8 +2477,15 @@ def __init__(
else:
# initialized by user
# for python engine we always use stream feature group
if engine.get_type() == "python":
self._stream = True
if time_travel_format is None:
What if time_travel_format is HUDI? Won't it fail, since self._stream = True would not be set?
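One way to avoid this, sketched as a standalone function rather than the actual FeatureGroup.__init__ code: resolve the format first, then derive the stream flag from the resolved value, so that an explicit HUDI takes the same path as a defaulted one. The name resolve_storage_defaults and its parameters are hypothetical, and the rule that the Python engine falls back to HUDI is an assumption taken from this thread, not confirmed behavior.

```python
from typing import Optional, Tuple


def resolve_storage_defaults(
    engine_type: str,
    time_travel_format: Optional[str],
    stream: bool = False,
) -> Tuple[Optional[str], bool]:
    """Hypothetical helper (not part of hsfs): return (format, stream)."""
    fmt = time_travel_format.upper() if time_travel_format else None
    if engine_type == "python":
        if fmt is None:
            # Assumed default for the Python engine when nothing is specified.
            fmt = "HUDI"
        if fmt == "HUDI":
            # Applies whether HUDI was passed explicitly or filled in above,
            # so the stream flag is never skipped for HUDI.
            stream = True
    return fmt, stream
```

With this shape, the flag is set after the format is known, so the branch taken for time_travel_format=None cannot diverge from the branch taken for an explicit "HUDI".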
if isinstance(engine.get_instance(), engine.spark.Engine):
spark_session = engine.get_instance()._spark_session
spark_context = engine.get_instance()._spark_context
does this work if engine is python?
for delta_vacuum you did:
if isinstance(engine.get_instance(), engine.spark.Engine):
    spark_session = engine.get_instance()._spark_session
    spark_context = engine.get_instance()._spark_context
else:
    spark_session = None
    spark_context = None
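To keep both call sites consistent, the guard could live in one small helper. The sketch below is only illustrative: get_spark_handles is a hypothetical name, the two stand-in engine classes exist only for the demo, and it uses getattr with a default instead of the isinstance check so that it runs without hsfs installed.

```python
from typing import Any, Optional, Tuple


def get_spark_handles(engine_instance: Any) -> Tuple[Optional[Any], Optional[Any]]:
    """Hypothetical helper: (spark_session, spark_context), or (None, None)."""
    return (
        getattr(engine_instance, "_spark_session", None),
        getattr(engine_instance, "_spark_context", None),
    )


class FakeSparkEngine:
    """Stand-in for a Spark engine; only the two attributes matter here."""

    def __init__(self) -> None:
        self._spark_session = "session"
        self._spark_context = "context"


class FakePythonEngine:
    """Stand-in for a Python engine, which has no Spark attributes."""


spark_handles = get_spark_handles(FakeSparkEngine())
python_handles = get_spark_handles(FakePythonEngine())
```

Both the vacuum path and this path could then call the same helper, so the Python engine always receives None for both values instead of raising AttributeError.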
@@ -1487,6 +1488,63 @@ def test_save_dataframe_stream(self, mocker):
assert mock_python_engine_write_dataframe_kafka.call_count == 1
assert mock_python_engine_legacy_save_dataframe.call_count == 0

def test_save_dataframe_delta_time_travel_format(self, mocker):
This will need workflow tests. I haven't checked, but I think we should also have a test to make sure that if neither the time travel format nor stream is specified, we pick HUDI and enable stream by default.
Yeah, we have the workflow test test_feature_pipeline.py:
https://github.com/logicalclocks/loadtest/pull/609
This PR adds/fixes/changes...
JIRA Issue: -
Priority for Review: -
Related PRs: -
How Has This Been Tested?
Checklist For The Assigned Reviewer: