From 8f0f85f0d296f2f47add38272ac7566c51baf5f6 Mon Sep 17 00:00:00 2001
From: Peter Rowlands
Date: Mon, 5 Feb 2024 16:33:59 +0900
Subject: [PATCH 1/2] put_file: support concurrent multipart uploads with max_concurrency

---
 s3fs/core.py | 91 +++++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 72 insertions(+), 19 deletions(-)

diff --git a/s3fs/core.py b/s3fs/core.py
index 11053005..f17c7573 100644
--- a/s3fs/core.py
+++ b/s3fs/core.py
@@ -235,6 +235,12 @@ class S3FileSystem(AsyncFileSystem):
     session : aiobotocore AioSession object to be used for all connections.
         This session will be used inplace of creating a new session inside S3FileSystem.
         For example: aiobotocore.session.AioSession(profile='test_user')
+    max_concurrency : int (None)
+        If given, the maximum number of concurrent transfers to use for a
+        multipart upload. Defaults to 1 (multipart uploads will be done sequentially).
+        Note that when used in conjunction with ``S3FileSystem.put(batch_size=...)``
+        the result will be a maximum of ``max_concurrency * batch_size`` concurrent
+        transfers.

     The following parameters are passed on to fsspec:

@@ -282,6 +288,7 @@ def __init__(
         cache_regions=False,
         asynchronous=False,
         loop=None,
+        max_concurrency=None,
         **kwargs,
     ):
         if key and username:
@@ -319,6 +326,7 @@ def __init__(
         self.cache_regions = cache_regions
         self._s3 = None
         self.session = session
+        self.max_concurrency = max_concurrency

     @property
     def s3(self):
@@ -1140,7 +1148,13 @@ async def _pipe_file(self, path, data, chunksize=50 * 2**20, **kwargs):
         self.invalidate_cache(path)

     async def _put_file(
-        self, lpath, rpath, callback=_DEFAULT_CALLBACK, chunksize=50 * 2**20, **kwargs
+        self,
+        lpath,
+        rpath,
+        callback=_DEFAULT_CALLBACK,
+        chunksize=50 * 2**20,
+        max_concurrency=None,
+        **kwargs,
     ):
         bucket, key, _ = self.split_path(rpath)
         if os.path.isdir(lpath):
@@ -1169,24 +1183,15 @@ async def _put_file(
             mpu = await self._call_s3(
                 "create_multipart_upload", Bucket=bucket, Key=key, **kwargs
             )
-
-            out = []
-            while True:
-                chunk = f0.read(chunksize)
-                if not chunk:
-                    break
-                out.append(
-                    await self._call_s3(
-                        "upload_part",
-                        Bucket=bucket,
-                        PartNumber=len(out) + 1,
-                        UploadId=mpu["UploadId"],
-                        Body=chunk,
-                        Key=key,
-                    )
-                )
-                callback.relative_update(len(chunk))
-
+            out = await self._upload_part_concurrent(
+                bucket,
+                key,
+                mpu,
+                f0,
+                callback=callback,
+                chunksize=chunksize,
+                max_concurrency=max_concurrency,
+            )
             parts = [
                 {"PartNumber": i + 1, "ETag": o["ETag"]} for i, o in enumerate(out)
             ]
@@ -1201,6 +1206,54 @@ async def _put_file(
             self.invalidate_cache(rpath)
             rpath = self._parent(rpath)

+    async def _upload_part_concurrent(
+        self,
+        bucket,
+        key,
+        mpu,
+        f0,
+        callback=_DEFAULT_CALLBACK,
+        chunksize=50 * 2**20,
+        max_concurrency=None,
+    ):
+        max_concurrency = max_concurrency or self.max_concurrency
+        if max_concurrency is None or max_concurrency < 1:
+            max_concurrency = 1
+
+        async def _upload_chunk(chunk, part_number):
+            result = await self._call_s3(
+                "upload_part",
+                Bucket=bucket,
+                PartNumber=part_number,
+                UploadId=mpu["UploadId"],
+                Body=chunk,
+                Key=key,
+            )
+            callback.relative_update(len(chunk))
+            return result
+
+        out = []
+        while True:
+            chunks = []
+            for i in range(max_concurrency):
+                chunk = f0.read(chunksize)
+                if chunk:
+                    chunks.append(chunk)
+            if not chunks:
+                break
+            if len(chunks) > 1:
+                out.extend(
+                    await asyncio.gather(
+                        *[
+                            _upload_chunk(chunk, len(out) + i)
+                            for i, chunk in enumerate(chunks, 1)
+                        ]
+                    )
+                )
+            else:
+                out.append(await _upload_chunk(chunk, len(out) + 1))
+        return out
+
     async def _get_file(
         self, rpath, lpath, callback=_DEFAULT_CALLBACK, version_id=None
     ):
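To make the control flow of the new _upload_part_concurrent helper easier to follow, here is a minimal, self-contained sketch of the same batching pattern: read up to max_concurrency chunks, upload the batch in a single asyncio.gather, and keep part numbers sequential across batches. The names upload_parts_concurrently, read_chunk and fake_upload are invented for this illustration and are not part of s3fs.

import asyncio
import io


async def upload_parts_concurrently(read_chunk, upload_part, max_concurrency=4):
    # Read up to max_concurrency chunks, upload them together, and repeat until
    # the reader is exhausted. Part numbers stay sequential across batches.
    results = []
    while True:
        chunks = [c for c in (read_chunk() for _ in range(max_concurrency)) if c]
        if not chunks:
            break
        results.extend(
            await asyncio.gather(
                *(upload_part(len(results) + i, c) for i, c in enumerate(chunks, 1))
            )
        )
    return results


async def _demo():
    data = io.BytesIO(b"x" * 1000)

    async def fake_upload(part_number, chunk):
        # Stand-in for the real S3 upload_part call; returns a fake ETag.
        await asyncio.sleep(0)
        return {"PartNumber": part_number, "ETag": f"etag-{part_number}-{len(chunk)}"}

    # A 300-byte chunk size gives parts of 300, 300, 300 and 100 bytes,
    # uploaded two at a time.
    return await upload_parts_concurrently(
        lambda: data.read(300), fake_upload, max_concurrency=2
    )


print(asyncio.run(_demo()))

Running the sketch prints four fake parts, confirming that part numbers remain in order even though each pair of uploads runs concurrently.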
From f8ed021b7748788eb198851adb19b8e256397dea Mon Sep 17 00:00:00 2001
From: Peter Rowlands
Date: Mon, 19 Feb 2024 14:25:31 +0900
Subject: [PATCH 2/2] review updates

---
 s3fs/core.py | 26 +++++++++++++++-----------
 1 file changed, 15 insertions(+), 11 deletions(-)

diff --git a/s3fs/core.py b/s3fs/core.py
index f17c7573..7240d1c9 100644
--- a/s3fs/core.py
+++ b/s3fs/core.py
@@ -235,12 +235,14 @@ class S3FileSystem(AsyncFileSystem):
     session : aiobotocore AioSession object to be used for all connections.
         This session will be used inplace of creating a new session inside S3FileSystem.
         For example: aiobotocore.session.AioSession(profile='test_user')
-    max_concurrency : int (None)
-        If given, the maximum number of concurrent transfers to use for a
-        multipart upload. Defaults to 1 (multipart uploads will be done sequentially).
-        Note that when used in conjunction with ``S3FileSystem.put(batch_size=...)``
-        the result will be a maximum of ``max_concurrency * batch_size`` concurrent
-        transfers.
+    max_concurrency : int (1)
+        The maximum number of concurrent transfers to use per file for multipart
+        upload (``put()``) operations. Defaults to 1 (sequential). When used in
+        conjunction with ``S3FileSystem.put(batch_size=...)`` the maximum number of
+        simultaneous connections is ``max_concurrency * batch_size``. We may extend
+        this parameter to affect ``pipe()``, ``cat()`` and ``get()``. Increasing this
+        value will result in higher memory usage during multipart upload operations (by
+        ``max_concurrency * chunksize`` bytes per file).

     The following parameters are passed on to fsspec:

@@ -288,7 +290,7 @@ def __init__(
         cache_regions=False,
         asynchronous=False,
         loop=None,
-        max_concurrency=None,
+        max_concurrency=1,
         **kwargs,
     ):
         if key and username:
@@ -326,6 +328,8 @@ def __init__(
         self.cache_regions = cache_regions
         self._s3 = None
         self.session = session
+        if max_concurrency < 1:
+            raise ValueError("max_concurrency must be >= 1")
         self.max_concurrency = max_concurrency

     @property
@@ -1183,7 +1187,7 @@ async def _put_file(
             mpu = await self._call_s3(
                 "create_multipart_upload", Bucket=bucket, Key=key, **kwargs
             )
-            out = await self._upload_part_concurrent(
+            out = await self._upload_file_part_concurrent(
                 bucket,
                 key,
                 mpu,
@@ -1206,7 +1210,7 @@ async def _put_file(
             self.invalidate_cache(rpath)
             rpath = self._parent(rpath)

-    async def _upload_part_concurrent(
+    async def _upload_file_part_concurrent(
         self,
         bucket,
         key,
@@ -1217,8 +1221,8 @@ async def _upload_part_concurrent(
         max_concurrency=None,
     ):
         max_concurrency = max_concurrency or self.max_concurrency
-        if max_concurrency is None or max_concurrency < 1:
-            max_concurrency = 1
+        if max_concurrency < 1:
+            raise ValueError("max_concurrency must be >= 1")

         async def _upload_chunk(chunk, part_number):
             result = await self._call_s3(
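With the review updates applied, the feature is driven by the public max_concurrency constructor argument, with an optional per-call override on put(). A usage sketch follows, assuming standard fsspec kwarg forwarding from put() to _put_file(); the bucket and file names are placeholders, and real AWS credentials are required to actually run it.

import s3fs

# Up to 4 parts of a single file are uploaded concurrently; memory use grows by
# roughly max_concurrency * chunksize bytes per file, as the docstring notes.
fs = s3fs.S3FileSystem(max_concurrency=4)
fs.put("large-local-file.bin", "my-bucket/large-remote-file.bin")

# Combined with batch_size, up to max_concurrency * batch_size simultaneous
# connections may be open (here: 4 * 2 = 8).
fs.put(["a.bin", "b.bin"], "my-bucket/uploads/", batch_size=2)

# The per-call value, forwarded to _put_file, takes precedence when given.
fs.put("huge.bin", "my-bucket/huge.bin", max_concurrency=8)

# S3FileSystem(max_concurrency=0) now raises ValueError rather than silently
# falling back to sequential uploads.

Raising ValueError for values below 1, rather than clamping, surfaces configuration mistakes early, while the default of 1 keeps the existing sequential behaviour unchanged.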