feat(yaml): add schema unification for Flatten transform #35672


Closed
wants to merge 9 commits

Conversation

liferoad
Contributor

Implement schema merging for Flatten transform to handle PCollections with different schemas. The unified schema contains all fields from input PCollections, making fields optional to handle missing values. Added a test case to verify the behavior.

Fixes #35666
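The merging rule the description sketches can be written as a small standalone helper. This is hypothetical illustration code, not the PR's implementation, and `unify_schemas` is an invented name: every field from any input schema appears in the output wrapped in Optional, and fields whose types conflict across inputs collapse to Optional[Any].

```python
from typing import Any, Optional


def unify_schemas(schemas):
    """Merge the field dicts of several schemas into one unified schema.

    Every field from any input appears in the output, wrapped in Optional
    (inputs missing the field produce None); fields whose types conflict
    across inputs collapse to Optional[Any].
    """
    all_fields = {}
    for schema in schemas:
        for name, typ in schema.items():
            if name in all_fields and all_fields[name] != Optional[typ]:
                # Same field seen with a different type: widen to Any.
                all_fields[name] = Optional[Any]
            else:
                all_fields[name] = Optional[typ]
    return all_fields
```

For example, flattening inputs with schemas `{'id': int, 'price': float}` and `{'id': int, 'name': str}` yields `{'id': Optional[int], 'price': Optional[float], 'name': Optional[str]}`.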


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

liferoad added 5 commits July 23, 2025 18:43
…nal types

Extract inner types from Optional when unifying schemas to properly handle type unions. Also improve code readability by breaking long lines and clarifying comments.
Fix type resolution for nested generic types by properly extracting inner types when comparing field types. This ensures correct type hints are generated for optional fields in YAML provider.
…ma unification

Handle list types more carefully during schema unification to avoid unsupported Union types. Also ensure iterable values are properly converted to lists when needed for schema compatibility.
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

liferoad added 3 commits July 24, 2025 08:05
… tests

add comprehensive test cases for schema unification in Flatten transform

# Merge all field names and types, making them optional
all_fields = {}
for schema in schemas:
Contributor

Should we check that there aren't conflicting types here (e.g. pcoll1 wants 'foo': int, pcoll2 wants 'foo': str)?

Contributor

Ideally this would yield a Union type.

Contributor Author

I have the tests below to validate that this works with _unify_field_types by treating them as Optional[Any], to simplify the logic for Flatten, given that the Union could otherwise become a very long list (e.g., Optional[Union[int, str, list, ...]]). It would probably be very hard to handle the nested structures.

Contributor

Do we actually need to handle nested structures? Could we just say given:

pcoll1: {'foo': TypeA}
pcoll2: {'foo': TypeB}

outPcoll: {'foo': Union[TypeA, TypeB]}

and ignore the nested representations?
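The reviewer's Union-based alternative can be spelled out as a sketch. This is hypothetical code with an invented name (`unify_as_union`); the PR deliberately avoids Union types:

```python
from typing import Optional, Union


def unify_as_union(existing_type, field_type):
    """Keep both concrete types in a flat Union instead of widening to
    Any; any nested representations inside TypeA/TypeB are left alone."""
    if existing_type == field_type:
        return Optional[existing_type]
    return Optional[Union[existing_type, field_type]]
```

Under this rule, unifying int with str gives Optional[Union[int, str]] rather than Optional[Any].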

Contributor Author

_unify_field_types, for now at least, does not use Union. Whenever two types differ, it uses Optional[Any]. I have some concerns about how accurately we need to infer the schema (e.g., stop at the list level as you suggest, or just do the simplest thing my PR does). I also think we should support specifying the schema explicitly, in which case it would make no sense for us to unify the schemas with our own rules.

Contributor

Assigning reviewers:

R: @claudevdm for label python.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).



Comment on lines +938 to +949
existing_inner = (
    existing_type.__args__[0] if hasattr(existing_type, '__args__') and
    len(existing_type.__args__) == 1 else existing_type)
field_inner = (
    field_type.__args__[0] if hasattr(field_type, '__args__') and
    len(field_type.__args__) == 1 else field_type)

# Handle type unification more carefully
if existing_inner == Any or field_inner == Any:
  return Optional[Any]
elif existing_inner == field_inner:
  return Optional[existing_inner]
Contributor

Will this logic unify Iterable[str], str to Optional[str] since Dict also has args of length 1? I think we want to actually check if the outer type is Optional
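The pitfall raised here is easy to reproduce with the standard typing introspection helpers. This is an illustrative check, independent of the PR code:

```python
import collections.abc
from typing import Iterable, Optional, Union, get_args, get_origin

# Optional[T] is Union[T, None], so it has TWO type arguments; a
# len(__args__) == 1 test never unwraps an Optional.
assert get_args(Optional[int]) == (int, type(None))
assert get_origin(Optional[int]) is Union

# Single-parameter generics like Iterable[str] have exactly ONE type
# argument, so the same test wrongly "unwraps" them to the element type.
assert get_args(Iterable[str]) == (str,)
assert get_origin(Iterable[str]) is collections.abc.Iterable
```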

Contributor Author

The current way is to prioritize List, so it should be Optional[Iterable[str]]. _unify_element_with_schema does this conversion.

Contributor

I see - could you add a comment here explaining that this function expects all iterables to already be coerced to lists? I agree we do that correctly below, but it is probably a little brittle to rely on this without being explicit about the API

Comment on lines +969 to +974
elif existing_is_list:
  # Existing is list, keep it as list type
  return Optional[existing_inner]
elif field_is_list:
  # New field is list, use list type
  return Optional[field_inner]
Contributor

Why is this not just Optional[Union[existing_inner, field_inner]]? Isn't either list or single element valid?

Contributor

Ah, I see we're trying to avoid Union types. Probably this just needs to be encoded as Any then, right?

Contributor Author

That is what line 978 does.

Contributor

Right, but if I'm unifying List[int] and int, right now it unifies to Optional[List[int]], right? But that isn't right if I'm flattening {foo: 1} and {foo: [1,2,3]}
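The coercion the author mentions (_unify_element_with_schema) is not shown in this thread; a minimal sketch of the idea, with an invented helper name, might look like:

```python
def coerce_to_list(value):
    """Wrap scalar values in a single-element list so that rows like
    {'foo': 1} and {'foo': [1, 2, 3]} both fit a List-typed unified
    field; None (a missing field) is passed through unchanged."""
    if value is None or isinstance(value, list):
        return value
    return [value]
```

Applied to each element before type checking, {'foo': 1} becomes {'foo': [1]}, so unifying toward Optional[List[int]] no longer misclassifies scalar rows.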

        existing_type, field_type)
    else:
      # Make field optional since not all PCollections may have it
      all_fields[field_name] = Optional[field_type]
Contributor

Could we keep track of when one of these schema difference conditions is hit and warn?

@liferoad
Contributor Author

/gemini review

Contributor

@gemini-code-assist bot left a comment

Code Review

An excellent addition of schema unification for the Flatten transform. The implementation is well-structured and the accompanying tests are thorough, covering a good range of scenarios.

I've identified a critical issue in the type unification logic that could lead to incorrect schema inference. I've provided a detailed explanation and a suggested replacement for the affected method. Additionally, I've included a minor suggestion to improve the readability of the new test code by reducing duplication.

Once the critical issue is addressed, this will be a solid contribution.

Comment on lines +935 to +978
def _unify_field_types(self, existing_type, field_type):
  """Unify two field types, handling Optional and List types."""
  # Extract inner types from Optional if needed
  existing_inner = (
      existing_type.__args__[0] if hasattr(existing_type, '__args__') and
      len(existing_type.__args__) == 1 else existing_type)
  field_inner = (
      field_type.__args__[0] if hasattr(field_type, '__args__') and
      len(field_type.__args__) == 1 else field_type)

  # Handle type unification more carefully
  if existing_inner == Any or field_inner == Any:
    return Optional[Any]
  elif existing_inner == field_inner:
    return Optional[existing_inner]
  else:
    # Check for list types and prioritize them over other types
    from apache_beam.typehints import typehints as th
    existing_is_list = (
        hasattr(existing_inner, '__origin__') and
        existing_inner.__origin__ in (list, th.List))
    field_is_list = (
        hasattr(field_inner, '__origin__') and
        field_inner.__origin__ in (list, th.List))

    if existing_is_list and field_is_list:
      # Both are list types, unify their element types
      existing_elem = existing_inner.__args__[
          0] if existing_inner.__args__ else Any
      field_elem = field_inner.__args__[0] if field_inner.__args__ else Any
      if existing_elem == field_elem:
        return Optional[th.List[existing_elem]]
      else:
        return Optional[th.List[Any]]
    elif existing_is_list:
      # Existing is list, keep it as list type
      return Optional[existing_inner]
    elif field_is_list:
      # New field is list, use list type
      return Optional[field_inner]
    else:
      # Neither is a list, use Any to avoid unsupported Union
      # types in schema translation
      return Optional[Any]
Contributor

critical

The current implementation of _unify_field_types has several issues that can lead to incorrect schema unification and potential runtime errors:

  1. The logic to extract inner types from Optional is incorrect. It uses len(type.__args__) == 1, which is false for typing.Optional[T] (which is an alias for Union[T, NoneType], having 2 type arguments).
  2. The same logic incorrectly treats list-like types such as th.List[T] as their element type T, because th.List[T].__args__ has a length of 1.
  3. The *_is_list checks are unreliable because they operate on these incorrectly "unwrapped" types.

This can lead to incorrect schema inference, for example, treating a list field as a primitive, or failing to correctly unify list types.

I suggest replacing the method with a more robust implementation using typing.get_origin and typing.get_args for type introspection. This will correctly handle Optional types and list types.

from typing import get_origin, get_args

    def _unify_field_types(self, existing_type, field_type):
      """Unify two field types, handling Optional and List types."""

      existing_origin = get_origin(existing_type) or existing_type
      field_origin = get_origin(field_type) or field_type

      existing_inner = get_args(existing_type)[0] if existing_origin is Optional else existing_type
      field_inner = get_args(field_type)[0] if field_origin is Optional else field_type

      if existing_inner == Any or field_inner == Any:
        return Optional[Any]
      elif existing_inner == field_inner:
        return Optional[existing_inner]
      else:
        existing_is_list = existing_origin in (list, th.List)
        field_is_list = field_origin in (list, th.List)

        if existing_is_list and field_is_list:
          existing_elem = get_args(existing_inner)[0] if get_args(existing_inner) else Any
          field_elem = get_args(field_inner)[0] if get_args(field_inner) else Any
          if existing_elem == field_elem:
            return Optional[th.List[existing_elem]]
          else:
            return Optional[th.List[Any]]
        elif existing_is_list:
          return Optional[existing_inner]
        elif field_is_list:
          return Optional[field_inner]
        else:
          return Optional[Any]

Comment on lines +681 to +709
def check_result(actual):
  expected_ids = {1, 2, 3, 4, 5}
  actual_ids = {
      getattr(row, 'id', row.get('id') if hasattr(row, 'get') else None)
      for row in actual
  }
  assert actual_ids == expected_ids, (
      f"Expected IDs {expected_ids}, got {actual_ids}")

  # Check that all rows have required fields
  for row in actual:
    row_id = getattr(
        row, 'id', row.get('id') if hasattr(row, 'get') else None)
    name = getattr(
        row, 'name', row.get('name') if hasattr(row, 'get') else None)
    assert row_id is not None, f"Missing id field in row {row}"
    assert name is not None, f"Missing name field in row {row}"
    # Optional fields should be present but may be None/empty
    price = getattr(
        row, 'price', row.get('price') if hasattr(row, 'get') else None)
    categories = getattr(
        row,
        'categories',
        row.get('categories') if hasattr(row, 'get') else None)
    assert price is not None or row_id == 3, \
        f"Missing price field in row {row}"
    assert categories is not None or row_id == 4, \
        f"Missing categories field in row {row}"

Contributor

medium

The check_result function contains repetitive logic to access fields from row objects, which can be either beam.Row or dict. This makes the code harder to read and maintain.

To improve this, you can introduce a helper function within check_result to abstract away the field access logic. This will reduce duplication and make the test assertions cleaner and more readable.

Suggested change
Before:

def check_result(actual):
  expected_ids = {1, 2, 3, 4, 5}
  actual_ids = {
      getattr(row, 'id', row.get('id') if hasattr(row, 'get') else None)
      for row in actual
  }
  assert actual_ids == expected_ids, (
      f"Expected IDs {expected_ids}, got {actual_ids}")
  # Check that all rows have required fields
  for row in actual:
    row_id = getattr(
        row, 'id', row.get('id') if hasattr(row, 'get') else None)
    name = getattr(
        row, 'name', row.get('name') if hasattr(row, 'get') else None)
    assert row_id is not None, f"Missing id field in row {row}"
    assert name is not None, f"Missing name field in row {row}"
    # Optional fields should be present but may be None/empty
    price = getattr(
        row, 'price', row.get('price') if hasattr(row, 'get') else None)
    categories = getattr(
        row,
        'categories',
        row.get('categories') if hasattr(row, 'get') else None)
    assert price is not None or row_id == 3, \
        f"Missing price field in row {row}"
    assert categories is not None or row_id == 4, \
        f"Missing categories field in row {row}"

After:

def check_result(actual):
  def get_field(row, field):
    return getattr(
        row, field, row.get(field) if hasattr(row, 'get') else None)

  expected_ids = {1, 2, 3, 4, 5}
  actual_ids = {get_field(row, 'id') for row in actual}
  assert actual_ids == expected_ids, (
      f"Expected IDs {expected_ids}, got {actual_ids}")
  for row in actual:
    row_id = get_field(row, 'id')
    name = get_field(row, 'name')
    assert row_id is not None, f"Missing id field in row {row}"
    assert name is not None, f"Missing name field in row {row}"
    price = get_field(row, 'price')
    categories = get_field(row, 'categories')
    assert price is not None or row_id == 3, \
        f"Missing price field in row {row}"
    assert categories is not None or row_id == 4, \
        f"Missing categories field in row {row}"

@liferoad
Contributor Author

Switched to #35728 to simplify the logic.

@liferoad closed this Jul 29, 2025

Successfully merging this pull request may close these issues.

[Bug]: YAML Flatten incorrectly drops fields when input PCollections' schema are different