Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def clean_and_delete_bucket_by_prefix(bucket_prefix):
    """Clean and delete every bucket returned for ``bucket_prefix``.

    Buckets are listed through the service at ``OSS_ENDPOINT`` with
    ``prefix=bucket_prefix``; each one is then wrapped in a ``Bucket`` handle
    bound to that bucket's own extranet endpoint and handed to
    ``clean_and_delete_bucket``.
    """
    listing = oss2.Service(
        oss2.Auth(OSS_ID, OSS_SECRET), OSS_ENDPOINT
    ).list_buckets(prefix=bucket_prefix)

    for info in listing.buckets:
        # Address each bucket via its own endpoint rather than OSS_ENDPOINT.
        handle = oss2.Bucket(
            oss2.Auth(OSS_ID, OSS_SECRET), info.extranet_endpoint, info.name
        )
        clean_and_delete_bucket(handle)
# Fragment of a test body; the enclosing `def` (which would bind `self` and
# `app`) is outside this view and the original indentation has been lost.
# `assert_found` selects which User-Agent expectation do_request enforces.
assert_found = False
# Stand-in for oss2.Session.do_request (installed via patch.object below):
# checks the request's User-Agent header against `app`.
def do_request(session_self, req, timeout):
if assert_found:
# Expect `app` to occur somewhere in the User-Agent string.
self.assertTrue(req.headers['User-Agent'].find(app) >= 0)
else:
# Expect `app` to be absent from the User-Agent string.
self.assertTrue(req.headers['User-Agent'].find(app) < 0)
# NOTE(review): presumably still inside do_request, so every patched request
# ends in ClientError -- consistent with the assertRaises calls below; confirm.
raise oss2.exceptions.ClientError('intentional')
from unittest.mock import patch
with patch.object(oss2.Session, 'do_request', side_effect=do_request, autospec=True):
# Without app_name
assert_found = False
self.assertRaises(oss2.exceptions.ClientError, self.bucket.get_bucket_acl)
service = oss2.Service(oss2.Auth(OSS_ID, OSS_SECRET), OSS_ENDPOINT)
self.assertRaises(oss2.exceptions.ClientError, service.list_buckets)
# With app_name
assert_found = True
bucket = oss2.Bucket(oss2.Auth(OSS_ID, OSS_SECRET), OSS_ENDPOINT, OSS_BUCKET,
app_name=app)
self.assertRaises(oss2.exceptions.ClientError, bucket.get_bucket_acl)
service = oss2.Service(oss2.Auth(OSS_ID, OSS_SECRET), OSS_ENDPOINT,
app_name=app)
self.assertRaises(oss2.exceptions.ClientError, service.list_buckets)
def test_bucket_iterator(self):
    """Check that OSS_BUCKET shows up among the names from BucketIterator.

    The membership test is run twice with ``max_keys=2``: once against a
    lazy generator of names and once against a fully built list.
    """
    service = oss2.Service(oss2.Auth(OSS_ID, OSS_SECRET), OSS_ENDPOINT)

    # Lazy variant: membership is evaluated while the iterator is consumed.
    lazy_names = (entry.name for entry in oss2.BucketIterator(service, max_keys=2))
    self.assertTrue(OSS_BUCKET in lazy_names)

    # Eager variant: all names are collected before the membership test.
    all_names = [entry.name for entry in oss2.BucketIterator(service, max_keys=2)]
    self.assertTrue(OSS_BUCKET in all_names)
# Fragment of a test body with lost indentation. It uses `do_request`, `app`
# and `self`, none of which are defined within this fragment.
from unittest.mock import patch
# While patched, oss2.Session.do_request calls are routed to `do_request`.
with patch.object(oss2.Session, 'do_request', side_effect=do_request, autospec=True):
# Without app_name
assert_found = False
self.assertRaises(oss2.exceptions.ClientError, self.bucket.get_bucket_acl)
service = oss2.Service(oss2.Auth(OSS_ID, OSS_SECRET), OSS_ENDPOINT)
self.assertRaises(oss2.exceptions.ClientError, service.list_buckets)
# With app_name
assert_found = True
bucket = oss2.Bucket(oss2.Auth(OSS_ID, OSS_SECRET), OSS_ENDPOINT, OSS_BUCKET,
app_name=app)
self.assertRaises(oss2.exceptions.ClientError, bucket.get_bucket_acl)
service = oss2.Service(oss2.Auth(OSS_ID, OSS_SECRET), OSS_ENDPOINT,
app_name=app)
self.assertRaises(oss2.exceptions.ClientError, service.list_buckets)
def oss_check(self, bucket_name, default_endpoint, access_key_id, access_key_secret):
    """Return the endpoint to use for ``bucket_name``, checking access first.

    If ``bucket_name`` has an entry in ``self.bucket_endpoints``, that
    endpoint is probed with ``get_bucket_acl`` and returned. Otherwise the
    buckets are listed through ``default_endpoint`` (filtered by
    ``prefix=bucket_name``) and the endpoint comes from ``self.get_endpoint``
    using the location of the bucket whose name matches exactly.

    Raises:
        AuthenticationFailed: when the probe or the listing raises an OSS
            error, or when no listed bucket is named ``bucket_name``.
    """

    def error_fields(err):
        # Shared tail of the failure messages built below.
        return (err.request_id, unicode(err.status), err.code, err.message)

    if bucket_name in self.bucket_endpoints:
        endpoint = self.bucket_endpoints[bucket_name]
        try:
            # Only the success of the call matters; its result is not used.
            oss2.Bucket(
                oss2.Auth(access_key_id, access_key_secret),
                endpoint,
                bucket_name,
                connect_timeout=5.0,
                app_name=defaults.app_name
            ).get_bucket_acl()
        except oss2.exceptions.OssError as e:
            raise AuthenticationFailed(
                "access bucket:%s using specified "
                "endpoint:%s failed. request_id:%s, status:%s, code:%s, message:%s"
                % ((bucket_name, endpoint) + error_fields(e))
            )
        return endpoint

    try:
        listing = oss2.Service(
            oss2.Auth(access_key_id, access_key_secret),
            default_endpoint,
            app_name=defaults.app_name
        ).list_buckets(prefix=bucket_name)
    except oss2.exceptions.AccessDenied as e:
        raise AuthenticationFailed(
            "can't list buckets, check your access_key.request_id:%s, status:%s, code:%s, message:%s"
            % error_fields(e)
        )
    except oss2.exceptions.OssError as e:
        raise AuthenticationFailed(
            "list buckets error. request_id:%s, status:%s, code:%s, message:%s"
            % error_fields(e)
        )

    for candidate in listing.buckets:
        # The listing is prefix-filtered, so skip names that merely share the prefix.
        if candidate.name != bucket_name:
            continue
        return self.get_endpoint(
            bucket_name,
            candidate.location.decode('utf-8'),
            access_key_id,
            access_key_secret
        )

    raise AuthenticationFailed("can't find the bucket %s when list buckets." % bucket_name)
def get_oss_client(credentials, region=None):
    """Build an ``oss2.Service`` client for ``region``.

    The endpoint is ``oss-<region>.aliyuncs.com``, or
    ``oss-cn-hangzhou.aliyuncs.com`` when ``region`` is falsy. Any exception
    raised while building the client is passed to ``print_exception`` and
    ``None`` is returned instead.
    """
    try:
        key_id = credentials.credentials.access_key_id
        key_secret = credentials.credentials.access_key_secret
        auth = oss2.Auth(key_id, key_secret)

        if region:
            endpoint = 'oss-{}.aliyuncs.com'.format(region)
        else:
            endpoint = 'oss-cn-hangzhou.aliyuncs.com'

        return oss2.Service(auth, endpoint=endpoint)
    except Exception as e:
        # Best effort: report the failure and signal it with None.
        print_exception(e)
        return None
def __init__(self):
    """Load the OSS settings and bind ``self.bucket``.

    Reads the credentials, endpoint, bucket name and CNAME through
    ``self._get_config``, builds the ``Auth`` and ``Service`` objects, then
    either creates the bucket (when its name is not in the listing) or
    fetches it and passes it through ``self._check_bucket_acl``. If that
    step raises ``AccessDenied``, the bucket is fetched directly.
    """
    read = self._get_config

    self.access_key_id = read('OSS_ACCESS_KEY_ID')
    self.access_key_secret = read('OSS_ACCESS_KEY_SECRET')
    raw_end_point = read('OSS_END_POINT')
    self.end_point = _normalize_endpoint(raw_end_point.strip())
    self.bucket_name = read('OSS_BUCKET_NAME')
    self.cname = read('OSS_CNAME')

    self.auth = Auth(self.access_key_id, self.access_key_secret)
    self.service = Service(self.auth, self.end_point)

    try:
        bucket_exists = self.bucket_name in self._list_bucket(self.service)
        if bucket_exists:
            # Existing bucket: let _check_bucket_acl adjust its ACL if needed.
            self.bucket = self._check_bucket_acl(self._get_bucket(self.auth))
        else:
            # Bucket is missing from the listing: create it.
            self.bucket = self._create_bucket(self.auth)
    except AccessDenied:
        # When a RAM access policy is enabled, listing and creating buckets
        # is not permitted, so just bind to the configured bucket.
        self.bucket = self._get_bucket(self.auth)