[SPARK-53323][CONNECT] Support df.asTable() for Arrow UDTF in Spark Connect #52320
Conversation
Otherwise, LGTM.
```python
    def test_arrow_udtf_with_table_argument_basic(self):
        super().test_arrow_udtf_with_table_argument_basic()

    # TODO(SPARK-53323): Support table arguments in Spark Connect Arrow UDTFs
    @unittest.skip("asTable() is not supported in Spark Connect")
    def test_arrow_udtf_with_table_argument_and_scalar(self):
        super().test_arrow_udtf_with_table_argument_and_scalar()
```
Let's remove these and add `pass` on the class body.
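The suggestion above can be sketched roughly as follows. The class and mixin names here are hypothetical stand-ins; in the real suite a shared test mixin is combined with a Connect-specific `TestCase`:

```python
import unittest

# Hypothetical stand-in for the shared test mixin in the real suite.
class ArrowUDTFTestsMixin:
    def test_arrow_udtf_with_table_argument_basic(self):
        pass  # the real mixin holds the actual assertions

# Before: every inherited test re-declared just to skip it.
class ArrowUDTFParityTestsBefore(ArrowUDTFTestsMixin, unittest.TestCase):
    @unittest.skip("asTable() is not supported in Spark Connect")
    def test_arrow_udtf_with_table_argument_basic(self):
        super().test_arrow_udtf_with_table_argument_basic()

# After (the reviewer's suggestion): drop the skip overrides and leave the
# class body as `pass`, so every mixin test runs unchanged under Connect.
class ArrowUDTFParityTestsAfter(ArrowUDTFTestsMixin, unittest.TestCase):
    pass
```

With the fix in this PR, the previously skipped tests pass under Spark Connect, so the overrides can simply be deleted.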
flattened_arrays = struct.flatten() | ||
field_names = [field.name for field in struct.type] | ||
flattened_batch = pa.RecordBatch.from_arrays( | ||
struct.flatten(), schema=pa.schema(struct.type) | ||
flattened_arrays, names=field_names |
Hmm, but why does it work for Spark Classic but not Spark Connect? This code path should be shared by both.
I checked the generated schema from both with pyarrow 15.0.2 and 20.0.0:

```python
print(f"{flattened_batch.schema} <=====> {pa.schema(struct.type)}")
```

and saw from the tests:

```
id: int64 <=====> id: int64 not null
```

or

```
id: int64 <=====> id: int64
```

Seems like the original schema is more accurate?
What changes were proposed in this pull request?
This PR supports `df.asTable()` for Arrow UDTFs in Spark Connect by correcting the schema creation in `ArrowStreamArrowUDTFSerializer.load_stream()`. The original code incorrectly used `pa.schema(struct.type)` to build the `RecordBatch` schema, instead of extracting the individual field names with `[field.name for field in struct.type]` and creating the `RecordBatch` from the flattened arrays. This ensures table arguments are passed to Arrow UDTFs as proper `pa.RecordBatch` objects.
Why are the changes needed?
This is a bug fix: without it, `df.asTable()` table arguments do not work for Arrow UDTFs in Spark Connect.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit tests.
Was this patch authored or co-authored using generative AI tooling?
No