obstore delete_dir #3310
Conversation
Codecov Report ❌ Patch coverage is …
Additional details and impacted files:

```
@@           Coverage Diff            @@
##            main    #3310     +/-  ##
==========================================
+ Coverage   60.72%   60.74%   +0.01%
==========================================
  Files          78       78
  Lines        9408     9417       +9
==========================================
+ Hits         5713     5720       +7
- Misses       3695     3697       +2
```
cc @kylebarron
src/zarr/storage/_obstore.py (Outdated)
if prefix != "" and not prefix.endswith("/"): | ||
prefix += "/" | ||
|
||
keys = [(k,) async for k in self.list_prefix(prefix)] |
If you're not using the async iterator from the list, you could also just call `metas = await obs.list(self.store, prefix=prefix).collect_async()` and then extract the `path` from each dict. That might give a tiny bit less async overhead.
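A minimal sketch of that suggestion, using the zarr-internal `concurrent_map` helper and `config` object quoted later in this thread (illustrative only, not the exact diff):

```python
# Sketch: collect all ObjectMeta dicts eagerly, then pull out each "path"
# and delete concurrently. Helper names are the ones quoted elsewhere in
# this thread, not necessarily the merged code.
metas = await obs.list(self.store, prefix=prefix).collect_async()
keys = [(meta["path"],) for meta in metas]
await concurrent_map(keys, self.delete, limit=config.get("async.concurrency"))
```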
thanks for the suggestion, seems good to me.
if prefix != "" and not prefix.endswith("/"): | ||
prefix += "/" | ||
|
||
metas = await obs.list(self.store, prefix).collect_async() |
By the way, you could potentially make this faster by not needing to wait until the `list` fully finishes to start deleting files. You could do a concurrent map over each batch returned from the `list`. You have the tradeoff between needing to wait for the `list` to fully finish before starting deletes vs needing to run `concurrent_map` in batches instead of all at once. But note you can also customize the `chunk_size` of `list`. I don't know in practice which approach is better.
My initial motivation for this approach was that listing upfront is certainly less complex than interleaving list/delete (granted, the difference is small), while being similarly performant, in theory.
I ran a basic benchmark (courtesy of Claude) with this kind of impl and got these results (one run each; delete1 is this PR's eager `delete_dir`, delete2 is the batched `delete_dir2` below):

- `async.concurrency == 1000`: delete1 (6.9 s) vs. delete2 (10.45 s)
- `async.concurrency == 100`: delete1 (8.18 s) vs. delete2 (15.67 s)
- `async.concurrency == 20`: delete1 (32.37 s) vs. delete2 (30.61 s)
```python
async def delete_dir2(self, prefix: str) -> None:
    # docstring inherited
    import obstore as obs

    self._check_writable()
    if prefix != "" and not prefix.endswith("/"):
        prefix += "/"

    limit = config.get("async.concurrency")
    async for chunk in obs.list(self.store, prefix, chunk_size=1000):
        keys = [(x["path"],) for x in chunk]
        await concurrent_map(keys, self.delete, limit=limit)
```
```python
import asyncio
import time

import obstore as obs
import zarr
import zarr.storage

zarr.config.set({"async.concurrency": 1000})

TEST_BUCKET = "my-test-bucket"
remote = obs.store.S3Store(bucket=TEST_BUCKET)
store = zarr.storage.ObjectStore(remote)


async def fill_bucket(remote_store):
    """Fill bucket with 10000 files prefixed like 'c/0', 'c/1', etc."""
    print("Filling bucket with 10000 files...")

    async def put_file(i):
        key = f"c/{i}"
        data = f"test data for file {i}".encode("utf-8")
        await obs.put_async(remote_store, key, data)

    # Create tasks for concurrent uploads
    tasks = []
    for i in range(10000):
        tasks.append(put_file(i))
        if len(tasks) >= 100:  # Process in batches of 100
            await asyncio.gather(*tasks)
            print(f"Created {i + 1} files...")
            tasks = []

    # Process remaining tasks
    if tasks:
        await asyncio.gather(*tasks)
    print("Finished filling bucket with 10000 files")


async def test1():
    await fill_bucket(remote)
    start_time = time.time()
    await store.delete_dir("c")
    end_time = time.time()
    print(f"delete_dir took {end_time - start_time:.2f} seconds")


async def test2():
    await fill_bucket(remote)
    start_time = time.time()
    await store.delete_dir2("c")
    end_time = time.time()
    print(f"delete_dir2 took {end_time - start_time:.2f} seconds")


async def main():
    print("Running test1 (delete_dir)...")
    await test1()
    print("\nRunning test2 (delete_dir2)...")
    await test2()


if __name__ == "__main__":
    asyncio.run(main())
```
Assuming the alternate impl looks about right, I'd be biased towards the current impl.
These results were a bit surprising. I reran after setting `chunk_size=10_000` in the second impl, and both impls then had similar perf (~5 s). So it seems like calling `concurrent_map` as few times as possible is desirable. Though in effect, this strategy just converges to fetching all the keys upfront, it seems.
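Concretely, the rerun only changes the `chunk_size` argument in the `delete_dir2` loop shown above (a sketch of the assumed one-line change):

```python
# Same loop as delete_dir2 above, but with a larger chunk_size so that
# concurrent_map is invoked far fewer times per listing.
async for chunk in obs.list(self.store, prefix, chunk_size=10_000):
    keys = [(x["path"],) for x in chunk]
    await concurrent_map(keys, self.delete, limit=limit)
```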
Closes #3309
This approach will eagerly load all keys staged for deletion. I think this is the simplest way to go about this, though let me know if another approach involving batching is more desirable.
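A minimal sketch of that eager approach, assembled from the fragments quoted in the review above (`concurrent_map` and `config` are the zarr-internal helpers referenced in the thread; illustrative, not necessarily the exact merged diff):

```python
async def delete_dir(self, prefix: str) -> None:
    # docstring inherited
    import obstore as obs

    self._check_writable()
    if prefix != "" and not prefix.endswith("/"):
        prefix += "/"

    # Eagerly list every key under the prefix, then delete them concurrently.
    metas = await obs.list(self.store, prefix).collect_async()
    keys = [(meta["path"],) for meta in metas]
    await concurrent_map(keys, self.delete, limit=config.get("async.concurrency"))
```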
TODO:
- docs/user-guide/*.rst
- changes/