Skip to content
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 99 additions & 42 deletions src/nested_pandas/nestedframe/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@
FSSPEC_FILESYSTEMS = ("http", "https")
FSSPEC_BLOCK_SIZE = 32 * 1024

# Filesystems for which calling .is_dir() may be very slow and/or .iterdir()
# may yield non-parquet paths.
NO_ITERDIR_FILESYSTEMS = (
"http",
"https",
)


def read_parquet(
data: str | UPath | bytes,
Expand All @@ -42,13 +49,16 @@ def read_parquet(
it can be a single file name, directory name, or a remote path
(e.g., HTTP/HTTPS or S3). If a file-like object is passed, it
must support the ``read`` method. You can also pass a
``filesystem`` keyword argument with a ``pyarrow.fs`` object, which will
be passed along to the underlying file-reading method.
A file URL can also be a path to a directory that contains multiple
``filesystem`` keyword argument with a ``pyarrow.fs`` object, which
will be passed along to the underlying file-reading method.
A file URL can also be a path to a directory that contains multiple
partitioned parquet files. Both pyarrow and fastparquet support
paths to directories as well as file URLs. A directory path could be:
``file://localhost/path/to/tables`` or ``s3://bucket/partition_dir``.
If the path is to a single Parquet file, it will be loaded using
``file://localhost/path/to/tables/`` or ``s3://bucket/partition_dir/``
(note trailing slash for web locations, since it may be expensive
to test a path for being a directory). Directory reading is not
supported for HTTP(S). If the path is to a single Parquet file, it will
be loaded using
``fsspec.parquet.open_parquet_file``, which has optimized handling
for remote Parquet files.
columns : list, default=None
Expand Down Expand Up @@ -112,41 +122,7 @@ def read_parquet(
elif isinstance(reject_nesting, str):
reject_nesting = [reject_nesting]

# For single Parquet file paths, we want to use
# `fsspec.parquet.open_parquet_file`. But for any other usage
# (which includes file-like objects, directories and lists
# thereof), we want to defer to `pq.read_table`.

# At the end of this block, `table` will contain the data.

# NOTE: the test for _is_local_dir is sufficient, because we're
# preserving a path to pq.read_table, which can read local
# directories, but not remote directories. Remote directories
# cannot be read by either of these methods.
if isinstance(data, str | Path | UPath) and not _is_local_dir(path_to_data := UPath(data)):
storage_options = _get_storage_options(path_to_data)
filesystem = kwargs.get("filesystem")
if not filesystem:
_, filesystem = _transform_read_parquet_data_arg(path_to_data)
with fsspec.parquet.open_parquet_file(
str(path_to_data),
columns=columns,
storage_options=storage_options,
fs=filesystem,
engine="pyarrow",
) as parquet_file:
table = pq.read_table(parquet_file, columns=columns, **kwargs)
else:
# All other cases, including file-like objects, directories, and
# even lists of the foregoing.

# If `filesystem` is specified - use it, passing it as part of **kwargs
if kwargs.get("filesystem") is not None:
table = pq.read_table(data, columns=columns, **kwargs)
else:
# Otherwise convert with a special function
data, filesystem = _transform_read_parquet_data_arg(data)
table = pq.read_table(data, filesystem=filesystem, columns=columns, **kwargs)
table = _read_parquet_into_table(data, columns, **kwargs)

# Resolve partial loading of nested structures
# Using pyarrow to avoid naming conflicts from partial loading ("flux" vs "lc.flux")
Expand Down Expand Up @@ -206,14 +182,95 @@ def read_parquet(
return from_pyarrow(table, reject_nesting=reject_nesting, autocast_list=autocast_list)


def _is_local_dir(path_to_data: UPath):
def _read_parquet_into_table(
data: str | UPath | bytes,
columns: list[str] | None,
**kwargs,
) -> pa.Table:
# For single Parquet file paths, we want to use
# `fsspec.parquet.open_parquet_file`. But for any other usage
# (which includes file-like objects, local directories and lists
# thereof), we want to defer to `pq.read_table`.

# NOTE: the test for _is_local_dir is sufficient, because we're
# preserving a path to pq.read_table, which can read local
# directories, but not remote directories. Remote directories
# are supported separately via _read_parquet_directory.
# We don't support HTTP "directories", because 1) calling .is_dir()
# may be very expensive, because it downloads content first,
# 2) because .iter_dir() is likely to return a lot of "junk"
# besides of the actual parquet files.
if isinstance(data, str | Path | UPath) and not _is_local_dir(path_to_data := UPath(data)):
storage_options = _get_storage_options(path_to_data)
filesystem = kwargs.get("filesystem")
if not filesystem:
_, filesystem = _transform_read_parquet_data_arg(path_to_data)
# Will not detect HTTP(S) directories.
if _is_remote_dir(data, path_to_data):
return _read_remote_parquet_directory(
path_to_data, filesystem, storage_options, columns, **kwargs
)
with fsspec.parquet.open_parquet_file(
path_to_data.path,
columns=columns,
storage_options=storage_options,
fs=filesystem,
engine="pyarrow",
) as parquet_file:
return pq.read_table(parquet_file, columns=columns, **kwargs)

# All other cases, including file-like objects, directories, and
# even lists of the foregoing.

# If `filesystem` is specified - use it, passing it as part of **kwargs
if kwargs.get("filesystem") is not None:
return pq.read_table(data, columns=columns, **kwargs)

# Otherwise convert with a special function
data, filesystem = _transform_read_parquet_data_arg(data)
return pq.read_table(data, filesystem=filesystem, columns=columns, **kwargs)


def _is_local_dir(upath: UPath) -> bool:
"""Returns True if the given path refers to a local directory.

It's necessary to have this function, rather than simply checking
``UPath(p).is_dir()``, because ``UPath.is_dir`` can be quite
expensive in the case of a remote file path that isn't a directory.
"""
return path_to_data.protocol in ("", "file") and path_to_data.is_dir()
return upath.protocol in ("", "file") and upath.is_dir()


def _is_remote_dir(orig_data: str | Path | UPath, upath: UPath) -> bool:
# Iterating over HTTP(S) directories is very difficult, let's just not do that.
if upath.protocol in NO_ITERDIR_FILESYSTEMS:
return False
if str(orig_data).endswith("/"):
return True
return upath.is_dir()


def _read_remote_parquet_directory(
dir_upath: UPath, filesystem, storage_options, columns: list[str] | None, **kwargs
) -> pa.Table:
"""Read files one-by-one with fsspec.open_parquet_file and concat the result"""
tables = []
for upath in dir_upath.iterdir():
# Go recursively for filesystems which support file/directory identification with fsspec file
# handlers. This would work for e.g. S3, but not for HTTP(S).
if _is_remote_dir(upath, upath):
table = _read_remote_parquet_directory(upath, filesystem, storage_options, columns, **kwargs)
else:
with fsspec.parquet.open_parquet_file(
upath.path,
columns=columns,
storage_options=storage_options,
fs=filesystem,
engine="pyarrow",
) as parquet_file:
table = pq.read_table(parquet_file, columns=columns, **kwargs)
tables.append(table)
return pa.concat_tables(tables)


def _get_storage_options(path_to_data: UPath):
Expand Down