14
14
import xml .etree .ElementTree
15
15
from requests import Request , Session , ConnectionError , Timeout
16
16
from datetime import datetime
17
- from six .moves .urllib .parse import quote , unquote , urlencode
17
+ from six .moves .urllib .parse import quote , unquote , urlencode , urlparse
18
18
from six import text_type , binary_type
19
19
from hashlib import md5
20
20
from .streambody import StreamBody
@@ -241,7 +241,7 @@ class CosS3Client(object):
241
241
__built_in_sessions = None # 内置的静态连接池,多个Client间共享使用
242
242
__built_in_pid = 0
243
243
244
- def __init__ (self , conf , retry = 1 , session = None ):
244
+ def __init__ (self , conf , retry = 3 , session = None ):
245
245
"""初始化client对象
246
246
247
247
:param conf(CosConfig): 用户的配置.
@@ -250,6 +250,7 @@ def __init__(self, conf, retry=1, session=None):
250
250
"""
251
251
self ._conf = conf
252
252
self ._retry = retry # 重试的次数,分片上传时可适当增大
253
+ self ._retry_exe_times = 0 # 重试已执行次数
253
254
254
255
if session is None :
255
256
if not CosS3Client .__built_in_sessions :
@@ -299,6 +300,14 @@ def handle_built_in_connection_pool_by_pid(self):
299
300
def get_conf(self):
    """Return the configuration object held by this client."""
    conf = self._conf
    return conf
303
+
304
def get_retry_exe_times(self):
    """Return the number of retries this client has executed so far."""
    executed = self._retry_exe_times
    return executed
307
+
308
def inc_retry_exe_times(self):
    """Increase the executed-retry counter of this client by one."""
    self._retry_exe_times = self._retry_exe_times + 1
302
311
303
312
def get_auth (self , Method , Bucket , Key , Expired = 300 , Headers = {}, Params = {}, SignHost = None , UseCiEndPoint = False ):
304
313
"""获取签名
@@ -342,11 +351,11 @@ def get_auth(self, Method, Bucket, Key, Expired=300, Headers={}, Params={}, Sign
342
351
auth = CosS3Auth (self ._conf , Key , Params , Expired , SignHost )
343
352
return auth (r ).headers ['Authorization' ]
344
353
345
def should_switch_domain(self, url, headers=None):
    """Decide whether a retry of ``url`` should be sent to the alternate domain.

    Returns True only when all of the following hold:

    * ``headers`` does not contain the ``x-cos-request-id`` key;
    * ``self._conf._auto_switch_domain_on_retry`` is truthy;
    * the hostname of ``url`` matches the
      ``<name>-<digits>.cos.<region>.myqcloud.com`` pattern below.

    :param url(string): URL of the request being retried.
    :param headers(dict): response headers; None (the default) is treated as
        an empty mapping, e.g. when no response was received.
    :return(bool): True if the domain should be switched, otherwise False.
    """
    # Avoid a shared mutable default; None means "no headers available".
    if headers is None:
        headers = {}
    if 'x-cos-request-id' in headers:
        return False
    if not self._conf._auto_switch_domain_on_retry:
        return False

    host = urlparse(url).hostname
    if not host:
        # urlparse yields hostname None for URLs without a network location
        # (e.g. a missing scheme); re.match(pattern, None) would raise TypeError.
        return False

    matched = re.match(
        r'^([a-z0-9-]+-[0-9]+\.)(cos\.[a-z]+-[a-z]+(-[a-z]+)?(-1)?)\.(myqcloud\.com)$',
        host,
    )
    return matched is not None
352
361
@@ -375,7 +384,6 @@ def send_request(self, method, url, bucket=None, timeout=30, cos_request=True, c
375
384
kwargs ['headers' ] = format_values (kwargs ['headers' ])
376
385
377
386
file_position = None
378
- domain_switched = False # 重试时如果要切换域名, 只切换一次
379
387
if 'data' in kwargs :
380
388
body = kwargs ['data' ]
381
389
if hasattr (body , 'tell' ) and hasattr (body , 'seek' ) and hasattr (body , 'read' ):
@@ -402,9 +410,11 @@ def send_request(self, method, url, bucket=None, timeout=30, cos_request=True, c
402
410
if j != 0 :
403
411
if client_can_retry (file_position , ** kwargs ):
404
412
kwargs ['headers' ]['x-cos-sdk-retry' ] = 'true' # SDK重试标记
413
+ self .inc_retry_exe_times ()
405
414
time .sleep (j )
406
415
else :
407
416
break
417
+ logger .debug ("send request: url: {}, headers: {}" .format (url , kwargs ['headers' ]))
408
418
if method == 'POST' :
409
419
res = self ._session .post (url , timeout = timeout , proxies = self ._conf ._proxies , ** kwargs )
410
420
elif method == 'GET' :
@@ -415,34 +425,27 @@ def send_request(self, method, url, bucket=None, timeout=30, cos_request=True, c
415
425
res = self ._session .delete (url , timeout = timeout , proxies = self ._conf ._proxies , ** kwargs )
416
426
elif method == 'HEAD' :
417
427
res = self ._session .head (url , timeout = timeout , proxies = self ._conf ._proxies , ** kwargs )
428
+ logger .debug ("recv response: status_code: {}, headers: {}" .format (res .status_code , res .headers ))
418
429
if res .status_code < 400 : # 2xx和3xx都认为是成功的
419
430
if res .status_code == 301 or res .status_code == 302 or res .status_code == 307 :
420
- if j < self ._retry and self .should_switch_domain (domain_switched , res .headers ):
431
+ if j < self ._retry and self .should_switch_domain (url , res .headers ):
421
432
url = switch_hostname_for_url (url )
422
- domain_switched = True
423
433
continue
424
434
return res
425
435
elif res .status_code < 500 : # 4xx 不重试
426
- if j < self ._retry and self .should_switch_domain (domain_switched , res .headers ):
427
- url = switch_hostname_for_url (url )
428
- domain_switched = True
429
- continue
430
436
break
431
437
else :
432
- if j < self ._retry and self .should_switch_domain (domain_switched , res .headers ):
438
+ if j == ( self ._retry - 1 ) and self .should_switch_domain (url , res .headers ):
433
439
url = switch_hostname_for_url (url )
434
- domain_switched = True
435
- continue
436
- else :
437
- break
440
+ continue
438
441
except Exception as e : # 捕获requests抛出的如timeout等客户端错误,转化为客户端错误
442
+ logger .debug ("recv exception: {}" .format (e ))
439
443
# 记录每次请求的exception
440
444
exception_log = 'url:%s, retry_time:%d exception:%s' % (url , j , str (e ))
441
445
exception_logbuf .append (exception_log )
442
446
if j < self ._retry and (isinstance (e , ConnectionError ) or isinstance (e , Timeout )): # 只重试网络错误
443
- if self .should_switch_domain (domain_switched ):
447
+ if j == ( self ._retry - 1 ) and self . should_switch_domain (url ):
444
448
url = switch_hostname_for_url (url )
445
- domain_switched = True
446
449
continue
447
450
logger .exception (exception_logbuf ) # 最终重试失败, 输出前几次重试失败的exception
448
451
raise CosClientError (str (e ))
@@ -524,6 +527,7 @@ def get_object(self, Bucket, Key, KeySimplifyCheck=True, **kwargs):
524
527
525
528
:param Bucket(string): 存储桶名称.
526
529
:param Key(string): COS路径.
530
+ :param KeySimplifyCheck(bool): 是否对Key进行posix路径语义归并检查
527
531
:param kwargs(dict): 设置下载的headers.
528
532
:return(dict): 下载成功返回的结果,包含Body对应的StreamBody,可以获取文件流或下载文件到本地.
529
533
@@ -4033,7 +4037,7 @@ def _check_all_upload_parts(self, bucket, key, uploadid, local_path, parts_num,
4033
4037
already_exist_parts [part_num ] = part ['ETag' ]
4034
4038
return True
4035
4039
4036
- def download_file (self , Bucket , Key , DestFilePath , PartSize = 20 , MAXThread = 5 , EnableCRC = False , progress_callback = None , DumpRecordDir = None , KeySimplifyCheck = True , ** Kwargs ):
4040
+ def download_file (self , Bucket , Key , DestFilePath , PartSize = 20 , MAXThread = 5 , EnableCRC = False , progress_callback = None , DumpRecordDir = None , KeySimplifyCheck = True , DisableTempDestFilePath = False , ** Kwargs ):
4037
4041
"""小于等于20MB的文件简单下载,大于20MB的文件使用续传下载
4038
4042
4039
4043
:param Bucket(string): 存储桶名称.
@@ -4042,6 +4046,9 @@ def download_file(self, Bucket, Key, DestFilePath, PartSize=20, MAXThread=5, Ena
4042
4046
:param PartSize(int): 分块下载的大小设置,单位为MB.
4043
4047
:param MAXThread(int): 并发下载的最大线程数.
4044
4048
:param EnableCRC(bool): 校验下载文件与源文件是否一致
4049
+ :param DumpRecordDir(string): 指定保存断点信息的文件路径
4050
+ :param KeySimplifyCheck(bool): 是否对Key进行posix路径语义归并检查
4051
+ :param DisableTempDestFilePath(bool): 简单下载写入目标文件时,不使用临时文件
4045
4052
:param kwargs(dict): 设置请求headers.
4046
4053
"""
4047
4054
logger .debug ("Start to download file, bucket: {0}, key: {1}, dest_filename: {2}, part_size: {3}MB,\
@@ -4058,9 +4065,9 @@ def download_file(self, Bucket, Key, DestFilePath, PartSize=20, MAXThread=5, Ena
4058
4065
head_headers ['VersionId' ] = Kwargs ['VersionId' ]
4059
4066
object_info = self .head_object (Bucket , Key , ** head_headers )
4060
4067
file_size = int (object_info ['Content-Length' ])
4061
- if file_size <= 1024 * 1024 * 20 :
4068
+ if file_size <= 1024 * 1024 * PartSize :
4062
4069
response = self .get_object (Bucket , Key , KeySimplifyCheck , ** Kwargs )
4063
- response ['Body' ].get_stream_to_file (DestFilePath )
4070
+ response ['Body' ].get_stream_to_file (DestFilePath , DisableTempDestFilePath )
4064
4071
return
4065
4072
4066
4073
# 支持回调查看进度
0 commit comments