
Commit b2a8eb4

put_file: support concurrent multipart uploads with max_concurrency
1 parent 74f4d95 commit b2a8eb4

1 file changed: +54 -19 lines changed

s3fs/core.py  (+54 -19)
@@ -235,6 +235,12 @@ class S3FileSystem(AsyncFileSystem):
     session : aiobotocore AioSession object to be used for all connections.
         This session will be used inplace of creating a new session inside S3FileSystem.
        For example: aiobotocore.session.AioSession(profile='test_user')
+    max_concurrency : int (None)
+        If given, the maximum number of concurrent transfers to use for a
+        multipart upload. Defaults to 1 (multipart uploads will be done sequentially).
+        Note that when used in conjunction with ``S3FileSystem.put(batch_size=...)``
+        the result will be a maximum of ``max_concurrency * batch_size`` concurrent
+        transfers.
 
     The following parameters are passed on to fsspec:
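
For context, a minimal usage sketch of the new option (the bucket and file names are placeholders, not from the commit):

import s3fs

# Allow up to 8 parts of each multipart upload to be transferred in parallel.
fs = s3fs.S3FileSystem(max_concurrency=8)
fs.put("local/big-file.bin", "my-bucket/big-file.bin")

# Combined with fsspec's batch_size, the upper bound on in-flight transfers
# is max_concurrency * batch_size (here 8 * 4 = 32).
fs.put(["a.bin", "b.bin"], "my-bucket/", batch_size=4)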
@@ -282,6 +288,7 @@ def __init__(
         cache_regions=False,
         asynchronous=False,
         loop=None,
+        max_concurrency=None,
         **kwargs,
     ):
         if key and username:
@@ -319,6 +326,7 @@ def __init__(
         self.cache_regions = cache_regions
         self._s3 = None
         self.session = session
+        self.max_concurrency = max_concurrency
 
     @property
     def s3(self):
@@ -1140,7 +1148,7 @@ async def _pipe_file(self, path, data, chunksize=50 * 2**20, **kwargs):
         self.invalidate_cache(path)
 
     async def _put_file(
-        self, lpath, rpath, callback=_DEFAULT_CALLBACK, chunksize=50 * 2**20, **kwargs
+        self, lpath, rpath, callback=_DEFAULT_CALLBACK, chunksize=50 * 2**20, max_concurrency=None, **kwargs
     ):
         bucket, key, _ = self.split_path(rpath)
         if os.path.isdir(lpath):
@@ -1169,24 +1177,15 @@ async def _put_file(
                 mpu = await self._call_s3(
                     "create_multipart_upload", Bucket=bucket, Key=key, **kwargs
                 )
-
-                out = []
-                while True:
-                    chunk = f0.read(chunksize)
-                    if not chunk:
-                        break
-                    out.append(
-                        await self._call_s3(
-                            "upload_part",
-                            Bucket=bucket,
-                            PartNumber=len(out) + 1,
-                            UploadId=mpu["UploadId"],
-                            Body=chunk,
-                            Key=key,
-                        )
-                    )
-                    callback.relative_update(len(chunk))
-
+                out = await self._upload_part_concurrent(
+                    bucket,
+                    key,
+                    mpu,
+                    f0,
+                    callback=callback,
+                    chunksize=chunksize,
+                    max_concurrency=max_concurrency,
+                )
                 parts = [
                     {"PartNumber": i + 1, "ETag": o["ETag"]} for i, o in enumerate(out)
                 ]
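
Since _put_file now accepts max_concurrency directly, the setting can presumably also be overridden per call rather than only at construction time (assuming fsspec forwards extra keyword arguments from put down to _put_file), e.g.:

fs.put("local/big-file.bin", "my-bucket/big-file.bin", max_concurrency=4)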
@@ -1201,6 +1200,42 @@ async def _put_file(
             self.invalidate_cache(rpath)
             rpath = self._parent(rpath)
 
+    async def _upload_part_concurrent(
+        self, bucket, key, mpu, f0, callback=_DEFAULT_CALLBACK, chunksize=50 * 2**20, max_concurrency=None
+    ):
+        max_concurrency = max_concurrency or self.max_concurrency
+        if max_concurrency is None or max_concurrency < 1:
+            max_concurrency = 1
+
+        async def _upload_chunk(chunk, part_number):
+            result = await self._call_s3(
+                "upload_part",
+                Bucket=bucket,
+                PartNumber=part_number,
+                UploadId=mpu["UploadId"],
+                Body=chunk,
+                Key=key,
+            )
+            callback.relative_update(len(chunk))
+            return result
+
+        out = []
+        while True:
+            chunks = []
+            for i in range(max_concurrency):
+                chunk = f0.read(chunksize)
+                if chunk:
+                    chunks.append(chunk)
+            if not chunks:
+                break
+            if len(chunks) > 1:
+                out.extend(
+                    await asyncio.gather(*[_upload_chunk(chunk, len(out) + i) for i, chunk in enumerate(chunks, 1)])
+                )
+            else:
+                out.append(await _upload_chunk(chunk, len(out) + 1))
+        return out
+
     async def _get_file(
         self, rpath, lpath, callback=_DEFAULT_CALLBACK, version_id=None
     ):
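
For illustration, a self-contained sketch (not part of the commit) of the batch-and-gather pattern the new helper uses; fake_upload stands in for the real _call_s3("upload_part", ...) request:

import asyncio
import io

async def fake_upload(chunk, part_number):
    # Stand-in for the S3 upload_part request.
    await asyncio.sleep(0)
    return {"PartNumber": part_number, "Size": len(chunk)}

async def upload_all(f0, chunksize=5, max_concurrency=3):
    out = []
    while True:
        # Read up to max_concurrency chunks, then upload them concurrently.
        chunks = [c for c in (f0.read(chunksize) for _ in range(max_concurrency)) if c]
        if not chunks:
            break
        out.extend(
            await asyncio.gather(
                *[fake_upload(c, len(out) + i) for i, c in enumerate(chunks, 1)]
            )
        )
    return out

# Six parts: five 5-byte chunks and one 1-byte chunk, numbered 1..6.
print(asyncio.run(upload_all(io.BytesIO(b"abcdefghijklmnopqrstuvwxyz"))))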
