Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_crypto_init_multipart_invalid_part_size(self):
crypto_bucket = random.choice([self.rsa_crypto_bucket, self.kms_crypto_bucket])
key = self.random_key()
data_size = 1024 * 100
part_size = random.randint(1, 15) + 1024 * 100
context = models.MultipartUploadCryptoContext(data_size, part_size)
# not align to block_size
self.assertRaises(oss2.exceptions.ClientError, crypto_bucket.init_multipart_upload, key, upload_context=context)
# part size is small than 100*1024
part_size = random.randint(1, 1024 * 100 - 1)
context.part_size = part_size
self.assertRaises(oss2.exceptions.ClientError, crypto_bucket.init_multipart_upload, key, upload_context=context)
def test_get_object_to_file_incomplete_download(self):
file_size = 123 * 3 + 1
key, filename, content = self.__prepare(self.bucket, file_size)
with patch.object(oss2.Bucket, 'get_object',
side_effect=partial(mock_get_object, content_length=file_size),
autospec=True):
try:
self.bucket.get_object_to_file(key, filename)
except oss2.exceptions.InconsistentError as e:
self.assertTrue(e.request_id)
self.assertEqual(e.body, 'InconsistentError: IncompleteRead from source')
except:
self.assertTrue(False)
def test_make_exception(self):
body = 'bad body'
e = make_exception(self.__fake_response(400, body))
self.assertTrue(isinstance(e, oss2.exceptions.ServerError))
self.assertEqual(e.status, 400)
self.assertEqual(e.body, oss2.to_bytes(body))
body = '<code>NoSuchKey</code>中文和控制字符'
e = make_exception(self.__fake_response(404, body))
self.assertTrue(isinstance(e, oss2.exceptions.NoSuchKey))
self.assertEqual(e.status, 404)
self.assertEqual(e.code, 'NoSuchKey')
def oss_check(self, bucket_name, default_endpoint, access_key_id, access_key_secret):
if bucket_name in self.bucket_endpoints:
endpoint = self.bucket_endpoints[bucket_name]
try:
bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name, connect_timeout=5.0, app_name=defaults.app_name)
res = bucket.get_bucket_acl()
except oss2.exceptions.OssError as e:
raise AuthenticationFailed("access bucket:%s using specified \
endpoint:%s failed. request_id:%s, status:%s, code:%s, message:%s" % (bucket_name, endpoint, e.request_id, unicode(e.status), e.code, e.message))
return endpoint
try:
service = oss2.Service(oss2.Auth(access_key_id, access_key_secret), default_endpoint, app_name=defaults.app_name)
res = service.list_buckets(prefix=bucket_name)
except oss2.exceptions.AccessDenied as e:
raise AuthenticationFailed("can't list buckets, check your access_key.request_id:%s, status:%s, code:%s, message:%s"% (e.request_id, unicode(e.status), e.code, e.message))
except oss2.exceptions.OssError as e:
raise AuthenticationFailed("list buckets error. request_id:%s, status:%s, code:%s, message:%s" % (e.request_id, unicode(e.status), e.code, e.message))
bucket_list = res.buckets
for bucket in bucket_list:
if bucket.name == bucket_name:
endpoint = self.get_endpoint(bucket_name, bucket.location.decode('utf-8'), access_key_id, access_key_secret)
return endpoint
raise AuthenticationFailed("can't find the bucket %s when list buckets." % bucket_name)
def getObj(self,filename,record_date):
'''获取str对象'''
try:
object_stream = self.bucket.get_object('%s/%s/%s'%(self.base_dir,record_date,filename))
#print('[Success] Get obj success!')
return object_stream.read().decode()
except oss2.exceptions.NoSuchKey as e:
return json.dumps({'0.0029790401458740234':'[Error] OSS录像文件不存在!'})
except oss2.exceptions.ServerError as e:
return json.dumps({'0.0029790401458740234':'[Error] 请检查[KEY][SECRET][存储桶]是否正确!'})
except oss2.exceptions.AccessDenied as e:
return json.dumps({'0.0029790401458740234':'[Error] 操作拒绝,请检查key是否有权限上传!'})
except Exception as e:
return json.dumps({'0.0029790401458740234':'[Error]--->%s'%e})
def bucket_exists(self):
import oss2
try:
self.bucket.get_bucket_acl()
return True
except oss2.exceptions.NoSuchBucket:
return False
def _query(self, filename):
"""Get the file size"""
import oss2
key_name = self.key_prefix + filename
try:
file_size = self.bucket.get_object_meta(key_name).content_length
except oss2.exceptions.NoSuchKey :
file_size = -1
return {'size': file_size}
def getObj(self,filename,record_date):
'''获取str对象'''
try:
object_stream = self.bucket.get_object('%s/%s/%s'%(self.base_dir,record_date,filename))
#print('[Success] Get obj success!')
return object_stream.read().decode()
except oss2.exceptions.NoSuchKey as e:
return json.dumps({'0.0029790401458740234':'[Error] OSS录像文件不存在!'})
except oss2.exceptions.ServerError as e:
return json.dumps({'0.0029790401458740234':'[Error] 请检查[KEY][SECRET][存储桶]是否正确!'})
except oss2.exceptions.AccessDenied as e:
return json.dumps({'0.0029790401458740234':'[Error] 操作拒绝,请检查key是否有权限上传!'})
except Exception as e:
return json.dumps({'0.0029790401458740234':'[Error]--->%s'%e})
def getObj(self,filename,record_date):
'''获取str对象'''
try:
object_stream = self.bucket.get_object('%s/%s/%s'%(self.base_dir,record_date,filename))
#print('[Success] Get obj success!')
return object_stream.read().decode()
except oss2.exceptions.NoSuchKey as e:
return json.dumps({'0.0029790401458740234':'[Error] OSS录像文件不存在!'})
except oss2.exceptions.ServerError as e:
return json.dumps({'0.0029790401458740234':'[Error] 请检查[KEY][SECRET][存储桶]是否正确!'})
except oss2.exceptions.AccessDenied as e:
return json.dumps({'0.0029790401458740234':'[Error] 操作拒绝,请检查key是否有权限上传!'})
except Exception as e:
return json.dumps({'0.0029790401458740234':'[Error]--->%s'%e})