123 changes: 83 additions & 40 deletions src/nested_pandas/nestedframe/io.py
@@ -42,13 +42,15 @@ def read_parquet(
it can be a single file name, directory name, or a remote path
(e.g., HTTP/HTTPS or S3). If a file-like object is passed, it
must support the ``read`` method. You can also pass a
``filesystem`` keyword argument with a ``pyarrow.fs`` object, which
will be passed along to the underlying file-reading method.
A file URL can also be a path to a directory that contains multiple
partitioned parquet files. Both pyarrow and fastparquet support
paths to directories as well as file URLs. A directory path could be:
``file://localhost/path/to/tables/`` or ``s3://bucket/partition_dir/``
(note the trailing slash for remote locations, since it may be
expensive to test a remote path for being a directory). If the path
is to a single Parquet file, it will be loaded using
``fsspec.parquet.open_parquet_file``, which has optimized handling
for remote Parquet files.
columns : list, default=None
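
# A hedged illustration of the path-handling rules documented above; the
# file and bucket names are hypothetical, and we assume read_parquet is
# importable from nested_pandas as defined in this module.
import nested_pandas as npd

nf = npd.read_parquet("data/objects.parquet")  # single local file
nf = npd.read_parquet("s3://bucket/partition_dir/")  # remote directory: note the trailing slash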
@@ -112,41 +114,7 @@ def read_parquet(
elif isinstance(reject_nesting, str):
reject_nesting = [reject_nesting]

    table = _read_parquet_into_table(data, columns, **kwargs)

# Resolve partial loading of nested structures
# Using pyarrow to avoid naming conflicts from partial loading ("flux" vs "lc.flux")
Expand Down Expand Up @@ -206,6 +174,51 @@ def read_parquet(
return from_pyarrow(table, reject_nesting=reject_nesting, autocast_list=autocast_list)
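
# To make the partial-loading step above concrete, a hedged sketch: the file
# and the nested column layout ("lc" with sub-column "flux") are hypothetical,
# mirroring the naming used in the comment.
import nested_pandas as npd

# Requesting "lc.flux" loads only that sub-column of the nested "lc" column;
# resolving names through pyarrow avoids a clash between a top-level "flux"
# and the nested "lc.flux".
nf = npd.read_parquet("objects.parquet", columns=["ra", "lc.flux"])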


def _read_parquet_into_table(
    data: str | Path | UPath | bytes,
    columns: list[str] | None,
    **kwargs,
) -> pa.Table:
    # For single Parquet file paths, we want to use
    # `fsspec.parquet.open_parquet_file`. But for any other usage
    # (which includes file-like objects, local directories, and lists
    # thereof), we want to defer to `pq.read_table`.

    # NOTE: the test for _is_local_dir is sufficient, because we're
    # preserving a path to pq.read_table, which can read local
    # directories, but not remote directories. Remote directories
    # are supported separately via _read_remote_parquet_directory.
    if isinstance(data, str | Path | UPath) and not _is_local_dir(path_to_data := UPath(data)):
        storage_options = _get_storage_options(path_to_data)
        filesystem = kwargs.get("filesystem")
        if not filesystem:
            _, filesystem = _transform_read_parquet_data_arg(path_to_data)
        # Check the original string, because UPath may strip a trailing "/"
        if str(data).endswith("/"):
            return _read_remote_parquet_directory(path_to_data, filesystem, storage_options, columns, **kwargs)
        with fsspec.parquet.open_parquet_file(
            path_to_data.path,
            columns=columns,
            storage_options=storage_options,
            fs=filesystem,
            engine="pyarrow",
        ) as parquet_file:
            if _is_fh_a_dir(parquet_file):
                return _read_remote_parquet_directory(
                    path_to_data, filesystem, storage_options, columns, **kwargs
                )
            return pq.read_table(parquet_file, columns=columns, **kwargs)

    # All other cases, including file-like objects, local directories, and
    # even lists of the foregoing.

    # If `filesystem` is specified, use it, passing it as part of **kwargs.
    if kwargs.get("filesystem") is not None:
        return pq.read_table(data, columns=columns, **kwargs)

    # Otherwise, derive the filesystem (and a normalized data argument)
    # with a helper.
    data, filesystem = _transform_read_parquet_data_arg(data)
    return pq.read_table(data, filesystem=filesystem, columns=columns, **kwargs)
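
# Hedged examples of which branch of the dispatch above each argument shape
# takes; all paths are hypothetical.
table = _read_parquet_into_table("s3://bucket/data.parquet", columns=None)  # remote file
table = _read_parquet_into_table("s3://bucket/partition_dir/", columns=None)  # trailing slash: remote directory
table = _read_parquet_into_table("/path/to/tables", columns=None)  # local directory: pq.read_table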


def _is_local_dir(path_to_data: UPath):
"""Returns True if the given path refers to a local directory.

@@ -216,6 +229,36 @@ def _is_local_dir(path_to_data: UPath):
return path_to_data.protocol in ("", "file") and path_to_data.is_dir()
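
# Hedged expectations for the helper above; paths are hypothetical.
_is_local_dir(UPath("/path/to/tables"))  # True, provided the directory exists
_is_local_dir(UPath("s3://bucket/partition_dir/"))  # False: "s3" is not a local protocol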


def _is_fh_a_dir(reader):
    """Check whether the fsspec.parquet.open_parquet_file output tells us that the path is a directory."""
    if not hasattr(reader, "details"):
        return False
    if "type" not in reader.details:
        return False
    return reader.details["type"] == "directory"
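
# fsspec file handles typically expose the filesystem's info() dict as
# ``.details``; a minimal sketch of the check above, using a hypothetical
# stand-in object.
class _FakeReader:
    def __init__(self, details):
        self.details = details

assert _is_fh_a_dir(_FakeReader({"type": "directory"}))
assert not _is_fh_a_dir(_FakeReader({"type": "file"}))
assert not _is_fh_a_dir(object())  # no .details attribute at all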


def _read_remote_parquet_directory(
    dir_upath: UPath, filesystem, storage_options, columns: list[str] | None, **kwargs
) -> pa.Table:
    """Read files one-by-one with fsspec.parquet.open_parquet_file and concat the result."""
    tables = []
    for upath in dir_upath.iterdir():
        with fsspec.parquet.open_parquet_file(
            upath.path,
            columns=columns,
            storage_options=storage_options,
            fs=filesystem,
            engine="pyarrow",
        ) as parquet_file:
            # Recurse into subdirectories on filesystems that support them,
            # e.g. S3, but not HTTP(S).
            if _is_fh_a_dir(parquet_file):
                table = _read_remote_parquet_directory(
                    upath, filesystem, storage_options, columns, **kwargs
                )
            else:
                table = pq.read_table(parquet_file, columns=columns, **kwargs)
            tables.append(table)
    return pa.concat_tables(tables)
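
# A hedged usage sketch, assuming a hypothetical partitioned layout such as
# s3://bucket/partition_dir/part-0.parquet, part-1.parquet, ...
upath = UPath("s3://bucket/partition_dir/")
_, fs = _transform_read_parquet_data_arg(upath)
table = _read_remote_parquet_directory(upath, fs, _get_storage_options(upath), columns=None)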


def _get_storage_options(path_to_data: UPath):
"""Get storage options for fsspec.parquet.open_parquet_file.
