Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_put_get_list_delete(self):
    """Upload, read back, list and delete objects with non-ASCII keys.

    The keys combine CJK characters, punctuation and a form-feed
    character; one literal is plain and one carries the ``u`` prefix.
    """
    content = '中文内容'
    keys = ['中文!@#$%^&*()-=文件\x0C-1.txt', u'中文!@#$%^&*()-=文件\x0C-1.txt']

    for key in keys:
        self.bucket.put_object(key, content)

        # The downloaded body must equal to_bytes(content).
        downloaded = self.bucket.get_object(key).read()
        self.assertEqual(downloaded, to_bytes(content))

        # The key must appear when listing with the shared prefix.
        listing = oss2.ObjectIterator(self.bucket, prefix='中文')
        listed_keys = [info.key for info in listing]
        self.assertTrue(to_string(key) in listed_keys)

        self.bucket.delete_object(key)
def test_object_iterator(self):
    """List the payer bucket with and without the request-payer header."""
    # Without the request-payer header the listing must be rejected
    # with AccessDenied.
    denied = False
    try:
        for _ in oss2.ObjectIterator(self.payer_bucket):
            pass
    except oss2.exceptions.AccessDenied:
        denied = True
    self.assertEqual(denied, True)

    # With the request-payer header the listing must run to completion
    # without raising.
    payer_headers = {OSS_REQUEST_PAYER: "requester"}
    for _ in oss2.ObjectIterator(self.payer_bucket, headers=payer_headers):
        pass
# NOTE(review): fragment - the enclosing function, the code that produces
# `objects`, and the `try` matching the `except` below are outside this view.
# Delete every delete marker of the current listing result by key and version id.
for del_marker in objects.delete_marker:
bucket.delete_object(del_marker.key, params={'versionId': del_marker.versionid})
# When the listing was truncated, remember the continuation markers
# (presumably used by the next listing request - confirm against the caller).
is_truncated = objects.is_truncated
if is_truncated:
next_key_marker = objects.next_key_marker
next_versionid_marker = objects.next_versionid_marker
# NOTE(review): bare except silently discards whatever the try body raised.
except:
pass
# Abort every multipart upload reported for the bucket.
up_iter = oss2.MultipartUploadIterator(bucket)
for up in up_iter:
bucket.abort_multipart_upload(up.key, up.upload_id)
# Delete every object reported for the bucket.
obj_iter = oss2.ObjectIterator(bucket)
for obj in obj_iter:
bucket.delete_object(obj.key)
# Delete every live channel reported for the bucket.
for ch_iter in oss2.LiveChannelIterator(bucket):
bucket.delete_live_channel(ch_iter.name)
# Finally delete the bucket itself.
bucket.delete_bucket()
# NOTE(review): fragment - the test method header and the setup of `prefix`
# and `object_list` are outside this view.
dir_list = []
# Prepare files: 20 objects keyed prefix + random_string(16).
for i in range(20):
object_list.append(prefix + random_string(16))
self.bucket.put_object(object_list[-1], random_bytes(10))
# Prepare directories: 5 keys ending in '/', each holding one object.
for i in range(5):
dir_list.append(prefix + random_string(5) + '/')
self.bucket.put_object(dir_list[-1] + random_string(5), random_bytes(3))
# Verify: list under the prefix with delimiter '/' and max_keys=4, collecting
# entries flagged by is_prefix() as directories and the rest as objects.
objects_got = []
dirs_got = []
for info in oss2.ObjectIterator(self.bucket, prefix, delimiter='/', max_keys=4):
if info.is_prefix():
dirs_got.append(info.key)
else:
objects_got.append(info.key)
# The listed last_modified must match what head_object reports for the key.
result = self.bucket.head_object(info.key)
self.assertEqual(result.last_modified, info.last_modified)
# The listing is expected to come back in sorted order.
self.assertEqual(sorted(object_list), objects_got)
self.assertEqual(sorted(dir_list), dirs_got)
# NOTE(review): only object_list is passed to delete_keys here; the objects
# created under dir_list are not removed in the visible lines.
delete_keys(self.bucket, object_list)
# NOTE(review): fragment - the enclosing function starts outside this view and
# the trailing `else:` branch is cut off.
# File extension: the last dot-separated piece, or everything after the first
# dot when the name contains more than one dot.
dm_type = project_file.split('.')[-1]
if len(project_file.split('.')) >2:
dm_type = '.'.join(project_file.split('.')[1:])
package = '%s-%s.%s'% (package_name, version, dm_type)
project_path = '%s/%s/%s' % (dockerfile_path, dm_name, project_file)
# Only go to OSS when the project file is not already present locally.
if not os.path.exists(project_path):
try:
# Push a progress message onto redis_key and pass the same text to _flow_log.
Redis.lpush(redis_key, '%s package download from oss ......' %package)
_flow_log('%s package download from oss ......' %package)
auth = oss2.Auth(oss_id, oss_key)
bucket = oss2.Bucket(auth, oss_url, 'ops')
oss_project_path = None
try:
# Make sure the per-project directory exists.
if not os.path.exists('%s/%s' %(dockerfile_path,dm_name)):
os.mkdir('%s/%s' %(dockerfile_path,dm_name))
# Pick the first .war/.tar.gz/.jar object whose file name (underscores
# replaced by dashes) starts with package_name and contains version.
for obj in oss2.ObjectIterator(bucket):
if obj.key.endswith('.war') or obj.key.endswith('.tar.gz') or obj.key.endswith('.jar'):
obj_name = obj.key.split('/')[-1].replace('_','-')
if obj_name.startswith(package_name) and version in obj_name:
oss_project_path = obj.key
break
# A failure during the search is logged and not re-raised.
except Exception as e:
logging.error(e)
if oss_project_path:
# Try the download up to 3 times.
for i in range(3):
try:
oss2.resumable_download(bucket,oss_project_path, project_path)
break
# NOTE(review): bare except hides the reason each attempt failed.
except:
continue
# NOTE(review): indentation is lost, so this `else` may belong to the `for`
# (all attempts failed) or to the `if` (no matching object) - confirm.
else:
# NOTE(review): fragment - `f` is opened outside this view.
# Copy the content of object 'motto.txt' into the file object f.
shutil.copyfileobj(bucket.get_object('motto.txt'), f)
# Upload the local motto file to OSS under the cloud-side motto key.
# Note that the second argument of put_object() is a file object this time,
# whereas the previous upload passed a string.
# put_object() accepts both kinds of argument.
with open(oss2.to_unicode('本地座右铭.txt'), 'rb') as f:
bucket.put_object('云上座右铭.txt', f)
# The two lines above can also be written as the single call below.
bucket.put_object_from_file('云上座右铭.txt', '本地座右铭.txt')
# List up to 10 objects in the bucket and print each one's last-modified
# time and key.
for i, object_info in enumerate(oss2.ObjectIterator(bucket)):
print("{0} {1}".format(object_info.last_modified, object_info.key))
if i >= 9:
break
# Delete the object named motto.txt.
bucket.delete_object('motto.txt')
# Objects can also be deleted in a batch.
# Note: deleting motto.txt a second time does not raise an error.
bucket.batch_delete_objects(['motto.txt', '云上座右铭.txt'])
# Confirm that the object has been deleted.
assert not bucket.object_exists('motto.txt')
def _list_paths(self, prefix):
    """Yield the key of every object that ``ObjectIterator`` lists for ``prefix``."""
    import oss2

    listing = oss2.ObjectIterator(self.oss_service, prefix=prefix)
    for entry in listing:
        yield entry.key