obstore delete_dir #3310

Open · wants to merge 8 commits into main

1 change: 1 addition & 0 deletions changes/3310.feature.rst
@@ -0,0 +1 @@
Add obstore implementation of delete_dir.
13 changes: 13 additions & 0 deletions src/zarr/storage/_obstore.py
@@ -13,6 +13,7 @@
    Store,
    SuffixByteRequest,
)
from zarr.core.common import concurrent_map
from zarr.core.config import config

if TYPE_CHECKING:
@@ -196,6 +197,18 @@ async def delete(self, key: str) -> None:
        with contextlib.suppress(FileNotFoundError):
            await obs.delete_async(self.store, key)

    async def delete_dir(self, prefix: str) -> None:
        # docstring inherited
        import obstore as obs

        self._check_writable()
        if prefix != "" and not prefix.endswith("/"):
            prefix += "/"

        metas = await obs.list(self.store, prefix).collect_async()
@kylebarron (Contributor) commented on this line, Aug 1, 2025:

By the way, you could potentially make this faster by not waiting for the list to fully finish before you start deleting files: do a concurrent map over each batch returned from the list. The tradeoff is waiting for the list to complete before any deletes start vs. running concurrent_map in batches instead of all at once. Note that you can also customize the chunk_size of list.

I don't know in practice which approach is better.

@slowjazz (Contributor, Author) replied:

My initial motivation for this approach was that listing upfront is certainly less complex than interleaving list/delete (granted, the difference is small), while being similarly performant, in theory.

I ran a basic benchmark (courtesy of Claude) with this kind of impl and got these results (one run each):

  • async.concurrency==1000: delete_dir (6.9s) vs. delete_dir2 (10.45s)
  • async.concurrency==100: delete_dir (8.18s) vs. delete_dir2 (15.67s)
  • async.concurrency==20: delete_dir (32.37s) vs. delete_dir2 (30.61s)
    async def delete_dir2(self, prefix: str) -> None:
        # docstring inherited
        import obstore as obs

        self._check_writable()
        if prefix != "" and not prefix.endswith("/"):
            prefix += "/"

        limit = config.get("async.concurrency")
        async for chunk in obs.list(self.store, prefix, chunk_size=1000):
            keys = [(x["path"],) for x in chunk]
            await concurrent_map(keys, self.delete, limit=limit)

Benchmark script:

import zarr
import zarr.storage
import obstore as obs
import time
import asyncio

zarr.config.set({"async.concurrency": 1000})

TEST_BUCKET = "my-test-bucket"

remote = obs.store.S3Store(bucket=TEST_BUCKET)
store = zarr.storage.ObjectStore(remote)


async def fill_bucket(remote_store):
    """Fill bucket with 10000 files prefixed like 'c/0', 'c/1', etc."""
    print("Filling bucket with 10000 files...")

    async def put_file(i):
        key = f"c/{i}"
        data = f"test data for file {i}".encode("utf-8")
        await obs.put_async(remote_store, key, data)

    # Create tasks for concurrent uploads
    tasks = []
    for i in range(10000):
        tasks.append(put_file(i))
        if len(tasks) >= 100:  # Process in batches of 100
            await asyncio.gather(*tasks)
            print(f"Created {i + 1} files...")
            tasks = []

    # Process remaining tasks
    if tasks:
        await asyncio.gather(*tasks)
        print("Finished filling bucket with 10000 files")


async def test1():
    await fill_bucket(remote)
    start_time = time.time()
    await store.delete_dir("c")
    end_time = time.time()
    print(f"delete_dir took {end_time - start_time:.2f} seconds")


async def test2():
    await fill_bucket(remote)
    start_time = time.time()
    await store.delete_dir2("c")
    end_time = time.time()
    print(f"delete_dir2 took {end_time - start_time:.2f} seconds")


async def main():
    print("Running test1 (delete_dir)...")
    await test1()

    print("\nRunning test2 (delete_dir2)...")
    await test2()


if __name__ == "__main__":
    asyncio.run(main())

Assuming the alternate impl looks about right, I'd be biased towards the current impl.

@slowjazz (Contributor, Author) replied on Aug 1, 2025:

These results were a bit surprising. I reran after setting chunk_size=10_000 in the second impl, and with that change both impls had similar performance (~5s). So it seems desirable to call concurrent_map as few times as possible, though in effect that strategy just converges to fetching all the keys upfront.
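
(For reference, a minimal sketch of that rerun, assuming the only change from the delete_dir2 shown earlier is the chunk_size argument passed to obs.list:)

    async def delete_dir2(self, prefix: str) -> None:
        # docstring inherited
        import obstore as obs

        self._check_writable()
        if prefix != "" and not prefix.endswith("/"):
            prefix += "/"

        limit = config.get("async.concurrency")
        # A 10_000-entry chunk spans the whole 10,000-file benchmark prefix,
        # so concurrent_map ends up being called essentially once.
        async for chunk in obs.list(self.store, prefix, chunk_size=10_000):
            keys = [(x["path"],) for x in chunk]
            await concurrent_map(keys, self.delete, limit=limit)

In the 10,000-file benchmark this means the whole prefix arrives in roughly one chunk, so the deletes still start only after (nearly) the full listing is available.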

keys = [(m["path"],) for m in metas]
await concurrent_map(keys, self.delete, limit=config.get("async.concurrency"))

    @property
    def supports_partial_writes(self) -> bool:
        # docstring inherited
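
For context, here is a rough usage sketch of the new method (not part of the diff). It assumes obstore's in-memory MemoryStore and the cpu buffer module used by the test file below; treat it as illustrative rather than as the project's documented API.

import asyncio

import obstore as obs
import zarr.storage
from zarr.core.buffer import cpu


async def main() -> None:
    # Wrap an in-memory obstore store in zarr's ObjectStore adapter,
    # as the benchmark above does with an S3Store.
    store = zarr.storage.ObjectStore(obs.store.MemoryStore())

    buf = cpu.Buffer.from_bytes(b"\x01\x02\x03\x04")
    await store.set("foo/1", buf)
    await store.set("foo/2", buf)

    # delete_dir removes every key under the "foo/" prefix.
    await store.delete_dir("foo")
    assert not await store.exists("foo/1")
    assert not await store.exists("foo/2")


asyncio.run(main())
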
11 changes: 11 additions & 0 deletions tests/test_store/test_object.py
@@ -90,6 +90,17 @@ async def test_store_getsize_prefix(self, store: ObjectStore) -> None:
        total_size = await store.getsize_prefix("c")
        assert total_size == len(buf) * 2

    async def test_store_delete(self, store: ObjectStore) -> None:
        assert store.supports_deletes
        buf = cpu.Buffer.from_bytes(b"\x01\x02\x03\x04")
        await store.set("foo/1", buf)
        await store.set("foo/2", buf)
        await store.delete("foo/1")
        assert not await store.exists("foo/1")
        assert await store.exists("foo/2")
        await store.delete_dir("foo")  # FileNotFoundErrors are suppressed
        assert not await store.exists("foo/2")


@pytest.mark.slow_hypothesis
def test_zarr_hierarchy():