How to use the oss2.ObjectIterator function in oss2

To help you get started, we’ve selected a few oss2 examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github aliyun / aliyun-oss-python-sdk / tests / test_chinese.py View on Github external
def test_put_get_list_delete(self):
        for key in ['中文!@#$%^&*()-=文件\x0C-1.txt', u'中文!@#$%^&*()-=文件\x0C-1.txt']:
            content = '中文内容'

            self.bucket.put_object(key, content)
            self.assertEqual(self.bucket.get_object(key).read(), to_bytes(content))

            self.assertTrue(to_string(key) in list(info.key for info in oss2.ObjectIterator(self.bucket, prefix='中文')))

            self.bucket.delete_object(key)
github aliyun / aliyun-oss-python-sdk / tests / test_object_request_payment.py View on Github external
def test_object_iterator(self):
        # ObjectIterator without payer setting, should be failed.
        access_err_flag = False
        try:
            obj_iter = oss2.ObjectIterator(self.payer_bucket)
            for obj in obj_iter:
                pass
        except oss2.exceptions.AccessDenied:
            access_err_flag = True

        self.assertEqual(access_err_flag, True)

        # ObjectIterator with payer setting, should be failed.
        headers = dict()
        headers[OSS_REQUEST_PAYER] = "requester"
        obj_iter = oss2.ObjectIterator(self.payer_bucket, headers=headers)
        for obj in obj_iter:
            pass
github aliyun / aliyun-oss-python-sdk / tests / common.py View on Github external
for del_marker in objects.delete_marker:
                    bucket.delete_object(del_marker.key, params={'versionId': del_marker.versionid})
                is_truncated = objects.is_truncated
                if is_truncated:
                    next_key_marker = objects.next_key_marker
                    next_versionid_marker = objects.next_versionid_marker
    except:
        pass

    # list all upload_parts to delete
    up_iter = oss2.MultipartUploadIterator(bucket)
    for up in up_iter:
        bucket.abort_multipart_upload(up.key, up.upload_id)

    # list all objects to delete
    obj_iter = oss2.ObjectIterator(bucket)
    for obj in obj_iter:
        bucket.delete_object(obj.key)
    
    # list all live channels to delete
    for ch_iter in oss2.LiveChannelIterator(bucket):
        bucket.delete_live_channel(ch_iter.name)

    # delete_bucket
    bucket.delete_bucket()
github aliyun / aliyun-oss-python-sdk / tests / test_iterator.py View on Github external
dir_list = []

        # 准备文件
        for i in range(20):
            object_list.append(prefix + random_string(16))
            self.bucket.put_object(object_list[-1], random_bytes(10))

        # 准备目录
        for i in range(5):
            dir_list.append(prefix + random_string(5) + '/')
            self.bucket.put_object(dir_list[-1] + random_string(5), random_bytes(3))

        # 验证
        objects_got = []
        dirs_got = []
        for info in oss2.ObjectIterator(self.bucket, prefix, delimiter='/', max_keys=4):
            if info.is_prefix():
                dirs_got.append(info.key)
            else:
                objects_got.append(info.key)

                result = self.bucket.head_object(info.key)
                self.assertEqual(result.last_modified, info.last_modified)

        self.assertEqual(sorted(object_list), objects_got)
        self.assertEqual(sorted(dir_list), dirs_got)

        delete_keys(self.bucket, object_list)
github wylok / sparrow / module / k8s_resource.py View on Github external
dm_type = project_file.split('.')[-1]
        if len(project_file.split('.')) >2:
            dm_type = '.'.join(project_file.split('.')[1:])
        package = '%s-%s.%s'% (package_name, version, dm_type)
        project_path = '%s/%s/%s' % (dockerfile_path, dm_name, project_file)
        if not os.path.exists(project_path):
            try:
                Redis.lpush(redis_key, '%s package download from oss ......' %package)
                _flow_log('%s package download from oss ......' %package)
                auth = oss2.Auth(oss_id, oss_key)
                bucket = oss2.Bucket(auth, oss_url, 'ops')
                oss_project_path = None
                try:
                    if not os.path.exists('%s/%s' %(dockerfile_path,dm_name)):
                        os.mkdir('%s/%s' %(dockerfile_path,dm_name))
                    for obj in oss2.ObjectIterator(bucket):
                        if obj.key.endswith('.war') or obj.key.endswith('.tar.gz') or obj.key.endswith('.jar'):
                            obj_name = obj.key.split('/')[-1].replace('_','-')
                            if obj_name.startswith(package_name) and version in obj_name:
                                oss_project_path = obj.key
                                break
                except Exception as e:
                    logging.error(e)
                if oss_project_path:
                    #尝试3次下载
                    for i in range(3):
                        try:
                            oss2.resumable_download(bucket,oss_project_path, project_path)
                            break
                        except:
                            continue
                else:
github aliyun / aliyun-oss-python-sdk / examples / object_basic.py View on Github external
shutil.copyfileobj(bucket.get_object('motto.txt'), f)


# 把本地文件 “座右铭.txt” 上传到OSS,新的Object叫做 “我的座右铭.txt”
# 注意到,这次put_object()的第二个参数是file object;而上次上传是一个字符串。
# put_object()能够识别不同的参数类型
with open(oss2.to_unicode('本地座右铭.txt'), 'rb') as f:
    bucket.put_object('云上座右铭.txt', f)


# 上面两行代码,也可以用下面的一行代码来实现
bucket.put_object_from_file('云上座右铭.txt', '本地座右铭.txt')


# 列举Bucket下10个Object,并打印它们的最后修改时间、文件名
for i, object_info in enumerate(oss2.ObjectIterator(bucket)):
    print("{0} {1}".format(object_info.last_modified, object_info.key))

    if i >= 9:
        break


# 删除名为motto.txt的Object
bucket.delete_object('motto.txt')

# 也可以批量删除
# 注意:重复删除motto.txt,并不会报错
bucket.batch_delete_objects(['motto.txt', '云上座右铭.txt'])


# 确认Object已经被删除了
assert not bucket.object_exists('motto.txt')
github iterative / dvc / dvc / remote / oss.py View on Github external
def _list_paths(self, prefix):
        import oss2

        for blob in oss2.ObjectIterator(self.oss_service, prefix=prefix):
            yield blob.key