How to use the oss2.to_bytes function in oss2

To help you get started, we’ve selected a few oss2 examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github aliyun / aliyun-oss-python-sdk / tests / test_chinese.py View on Github external
def test_put_get_list_delete(self):
        """Round-trip objects whose key and body contain Chinese text.

        The same key is exercised twice, once as a plain string literal and
        once as a ``u''`` literal. For each one the object is uploaded, read
        back and compared with ``to_bytes`` of the body, looked up in a listing
        filtered by the Chinese prefix, and finally deleted.
        """
        body = '中文内容'
        candidate_keys = ['中文!@#$%^&*()-=文件\x0C-1.txt', u'中文!@#$%^&*()-=文件\x0C-1.txt']

        for object_key in candidate_keys:
            self.bucket.put_object(object_key, body)

            # What comes back must equal the body converted with to_bytes.
            downloaded = self.bucket.get_object(object_key).read()
            self.assertEqual(downloaded, to_bytes(body))

            # The listing is compared against the to_string form of the key.
            expected_name = to_string(object_key)
            listed_names = [info.key for info in oss2.ObjectIterator(self.bucket, prefix='中文')]
            self.assertTrue(expected_name in listed_names)

            self.bucket.delete_object(object_key)
github aliyun / aliyun-oss-python-sdk / tests / test_mock_object.py View on Github external
def test_crypto_get_compact_deprecated_rsa(self, do_request):
        """Prepare key files and fixtures for an RSA client-side-encryption read.

        The visible part writes the compact private/public keys to PEM files in
        the current directory, loads a pre-encrypted fixture and its JSON
        metadata from ``tests/``, and builds a ``LocalRsaProvider`` pointing at
        the written key pair.

        NOTE(review): this snippet ends right after the provider is created; the
        part that uses ``do_request`` and makes assertions is not shown here.
        """
        # Presumably removes key files left behind by an earlier run - confirm
        # the exact semantics of utils.silently_remove.
        utils.silently_remove('./rsa-test.public_key.pem')
        utils.silently_remove('./rsa-test.private_key.pem')

        # Files are opened in binary mode, so the key material is converted to
        # bytes before being written.
        with open("./rsa-test.private_key.pem", 'wb') as f:
            f.write(oss2.to_bytes(private_key_compact))

        with open("./rsa-test.public_key.pem", 'wb') as f:
            f.write(oss2.to_bytes(public_key_compact))

        # 1024 * 1024 bytes of b'a'; presumably the plaintext the encrypted
        # fixture decrypts to - TODO confirm against the rest of the test.
        content = b'a' * 1024 * 1024
        encrypted_rsa_path = "tests/deprecated_encrypted_1MB_a_rsa"
        encrypted_meta_rsa_path = "tests/deprecated_encrypted_1MB_a_meta_rsa.json"

        # Raw encrypted payload, read as bytes.
        with open(encrypted_rsa_path, 'rb') as f:
            encrypted_content = f.read()

        # Metadata that accompanies the encrypted payload, parsed from JSON.
        with open(encrypted_meta_rsa_path, 'r') as f:
            meta = json.loads(f.read())

        key = random_string(10)
        # dir/key match the './rsa-test.*.pem' files written above.
        provider = oss2.LocalRsaProvider(dir='./', key='rsa-test')
github aliyun / aliyun-oss-python-sdk / tests / test_utils.py View on Github external
u = u'中文'

        self.assertEqual(u, oss2.to_unicode(u))
        self.assertEqual(u.encode('utf-8'), oss2.to_bytes(u))

        if is_py2:
            self.assertEqual(u.encode('utf-8'), oss2.to_string(u))

        if is_py3:
            self.assertEqual(u, oss2.to_string(u))

        # from bytes
        b = u.encode('utf-8')

        self.assertEqual(b.decode('utf-8'), oss2.to_unicode(b))
        self.assertEqual(b, oss2.to_bytes(b))

        if is_py2:
            self.assertEqual(b, oss2.to_string(b))

        if is_py3:
            self.assertEqual(b.decode('utf-8'), oss2.to_string(b))
github aliyun / aliyun-oss-python-sdk / unittests / common.py View on Github external
def random_bytes(n):
    """Return ``random_string(n)`` converted to bytes with ``oss2.to_bytes``."""
    text = random_string(n)
    return oss2.to_bytes(text)
github aliyun / aliyun-oss-python-sdk / tests / test_crc64_combine.py View on Github external
def test_crc64_combine(self):
        """Compute CRCs of two byte strings separately, combined, and concatenated.

        ``crc1`` and ``crc2`` are the CRCs of ``string_a`` and ``string_b`` taken
        independently; ``crc_combine`` merges them with the function returned by
        ``mkCombineFun``; ``crc64_c`` is fed the concatenation of both strings.

        NOTE(review): this snippet ends before any assertion is made; presumably
        the full test compares ``crc_combine`` with ``crc64_c.crcValue``.
        """
        # The same polynomial and final XOR mask are passed to both crcmod and
        # the combine function.
        _POLY = 0x142F0E1EBA9EA3693
        _XOROUT = 0XFFFFFFFFFFFFFFFF

        string_a = oss2.to_bytes('12345')
        string_b = oss2.to_bytes('67890')

        combine_fun = oss2.crc64_combine.mkCombineFun(_POLY, 0, True, _XOROUT)

        # CRC of the first part on its own.
        crc64_a = crcmod.Crc(_POLY, initCrc=0, xorOut=_XOROUT)
        crc64_a.update(string_a)
        crc1 = crc64_a.crcValue

        # CRC of the second part on its own.
        crc64_b = crcmod.Crc(_POLY, initCrc=0, xorOut=_XOROUT)
        crc64_b.update(string_b)
        crc2 = crc64_b.crcValue

        # Combining takes the two CRCs plus the length of the second part.
        crc_combine = combine_fun(crc1, crc2, len(string_b))

        # CRC computed directly over the concatenated data.
        crc64_c = crcmod.Crc(_POLY, initCrc=0, xorOut=_XOROUT)
        crc64_c.update(string_a + string_b)
github aliyun / aliyun-oss-python-sdk / tests / test_utils.py View on Github external
def __fake_response(self, status, error_body):
        """Build a response object carrying ``error_body`` and a forced status.

        ``error_body`` is stored as an object under a fresh key, fetched back,
        and the ``resp`` attribute of the fetch result is returned after its
        ``status`` has been overwritten with ``status``.
        """
        object_key = self.random_key()
        payload = oss2.to_bytes(error_body)
        self.bucket.put_object(object_key, payload)

        response = self.bucket.get_object(object_key).resp
        # Overwrite whatever status the fetch produced with the requested one.
        response.status = status
        return response
github aliyun / aliyun-oss-python-sdk / tests / test_mock_object.py View on Github external
Connection: keep-alive
x-oss-request-id: 566B6BE93A7B8CFD53D4BAA3
Accept-Ranges: bytes
ETag: "D80CF0E5BE2436514894D64B2BCFB2AE"
x-oss-meta-client-side-encryption-wrap-alg: {1}
x-oss-meta-client-side-encryption-cek-alg: {2}
x-oss-meta-client-side-encryption-key: {3}
x-oss-meta-client-side-encryption-start: {4}
x-oss-meta-unencrypted-content-length: {5}
Last-Modified: Sat, 12 Dec 2015 00:35:53 GMT
x-oss-object-type: Normal

'''.format(len(encrypted_content), wrap_alg, cek_alg, encrypted_key, encrypted_iv, len(encrypted_content))

    io = BytesIO()
    io.write(oss2.to_bytes(response_text))
    io.write(encrypted_content)

    response_text = io.getvalue()

    return request_text, response_text
github aliyun / aliyun-oss-python-sdk / unittests / common.py View on Github external
def make_tempfile(self, content):
        """Create a temporary file containing ``content`` and return its path.

        ``content`` is converted with ``oss2.to_bytes`` and written in binary
        mode to a file created by ``tempfile.mkstemp`` (suffix ``test-upload``).
        The path is appended to ``self.temp_files`` and returned.

        The data is written through a file object instead of a raw
        ``os.write`` call: ``os.write`` may write fewer bytes than requested,
        and the ``with`` block closes the descriptor even if the write raises.
        """
        data = oss2.to_bytes(content)
        fd, pathname = tempfile.mkstemp(suffix='test-upload')

        # Record the path before writing so it is tracked even if the write
        # fails (presumably temp_files is what later cleanup walks - confirm).
        self.temp_files.append(pathname)

        # fdopen takes ownership of fd; leaving the block flushes and closes it.
        with os.fdopen(fd, 'wb') as tmp:
            tmp.write(data)

        return pathname
github aliyun / aliyun-oss-python-sdk / examples / upload.py View on Github external
# Make sure the parameters above have all been filled in correctly.
# A '<' still present in a value presumably means a placeholder was never
# replaced; the assertion message is Chinese for "Please set the parameter:".
for param in (access_key_id, access_key_secret, bucket_name, endpoint):
    assert '<' not in param, '请设置参数:' + param


# Create the Bucket object; all object-related APIs are reached through it.
bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name)


def random_string(n):
    """Return ``n`` characters picked at random from the lowercase ASCII letters."""
    alphabet = string.ascii_lowercase
    picks = [random.choice(alphabet) for _ in range(n)]
    return ''.join(picks)

# Generate a local file to test with. The file content is of type bytes.
filename = random_string(32) + '.txt'
content = oss2.to_bytes(random_string(1024 * 1024))

with open(filename, 'wb') as fileobj:
    fileobj.write(content)


# Section banner (bare string kept as in the original): "resumable upload".
"""
断点续传上传
"""

# Resumable upload, case 1: the file is fairly small (smaller than
# oss2.defaults.multipart_threshold), so what is actually used is
# oss2.Bucket.put_object.
oss2.resumable_upload(bucket, 'remote-normal.txt', filename)

# Resumable upload, case 2: for demonstration purposes the optional
# multipart_threshold argument is given explicitly to make sure a multipart
# upload is used.
oss2.resumable_upload(bucket, 'remote-multipart.txt', filename, multipart_threshold=100 * 1024)