Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _rebuild_record(self, filename, store, bucket, key, upload_id, part_size=None, upload_context=None):
    """Build a 'ResumableUpload' record for ``filename`` and save it in ``store``.

    The record holds the file's absolute path, its size and modification
    time as currently reported by ``os.path``, the bucket name, the object
    key, the upload id and the part size. When ``upload_context`` is truthy,
    the fields of its ``content_crypto_material`` are added under the
    ``'content_crypto_material'`` key, with ``encrypted_key`` and
    ``encrypted_iv`` passed through ``utils.b64encode_as_string``.

    The record is stored under the key returned by
    ``store.make_store_key(bucket.bucket_name, key, <absolute path>)``.
    Nothing is returned.
    """
    full_path = os.path.abspath(filename)
    modified_at = os.path.getmtime(filename)
    byte_count = os.path.getsize(filename)

    entry = {
        'op_type': 'ResumableUpload',
        'upload_id': upload_id,
        'file_path': full_path,
        'size': byte_count,
        'mtime': modified_at,
        'bucket': bucket.bucket_name,
        'key': key,
        'part_size': part_size,
    }

    if upload_context:
        crypto = upload_context.content_crypto_material
        entry['content_crypto_material'] = {
            'wrap_alg': crypto.wrap_alg,
            'cek_alg': crypto.cek_alg,
            'encrypted_key': utils.b64encode_as_string(crypto.encrypted_key),
            'encrypted_iv': utils.b64encode_as_string(crypto.encrypted_iv),
            'mat_desc': crypto.mat_desc,
        }

    record_key = store.make_store_key(bucket.bucket_name, key, full_path)
    store.put(record_key, entry)
def _rebuild_record(self, filename, store, bucket, key, upload_id, part_size=None, upload_context=None):
    """Build a 'ResumableUpload' record for ``filename`` and save it in ``store``.

    The record holds the file's absolute path, its size and modification
    time as currently reported by ``os.path``, the bucket name, the object
    key, the upload id and the part size. When ``upload_context`` is truthy,
    the fields of its ``content_crypto_material`` are added under the
    ``'content_crypto_material'`` key, with ``encrypted_key`` and
    ``encrypted_iv`` passed through ``utils.b64encode_as_string``.

    The record is stored under the key returned by
    ``store.make_store_key(bucket.bucket_name, key, <absolute path>)``.
    Nothing is returned.
    """
    full_path = os.path.abspath(filename)
    modified_at = os.path.getmtime(filename)
    byte_count = os.path.getsize(filename)

    entry = {
        'op_type': 'ResumableUpload',
        'upload_id': upload_id,
        'file_path': full_path,
        'size': byte_count,
        'mtime': modified_at,
        'bucket': bucket.bucket_name,
        'key': key,
        'part_size': part_size,
    }

    if upload_context:
        crypto = upload_context.content_crypto_material
        entry['content_crypto_material'] = {
            'wrap_alg': crypto.wrap_alg,
            'cek_alg': crypto.cek_alg,
            'encrypted_key': utils.b64encode_as_string(crypto.encrypted_key),
            'encrypted_iv': utils.b64encode_as_string(crypto.encrypted_iv),
            'mat_desc': crypto.mat_desc,
        }

    record_key = store.make_store_key(bucket.bucket_name, key, full_path)
    store.put(record_key, entry)
def test_crypto_get(self, do_request):
    """Fetch an encrypted object through a mocked response and verify the result.

    Checks the request that was sent, the basic attributes of the result,
    and the client-side-encryption headers against the crypto material
    created by the provider.
    """
    payload = unittests.common.random_bytes(1023)
    key = random_string(10)

    provider = oss2.RsaProvider(key_pair=key_pair)
    material = provider.create_content_material()

    request_text, response_text = make_get_encrypted_object(key, payload, material)
    req_info = unittests.common.mock_response(do_request, response_text)
    result = unittests.common.bucket(provider).get_object(key)

    # Values the encryption headers of the result are expected to carry.
    expected_key = utils.b64encode_as_string(material.encrypted_key)
    expected_iv = utils.b64encode_as_string(material.encrypted_iv)
    expected_wrap_alg = material.wrap_alg
    expected_cek_alg = material.cek_alg

    self.assertRequest(req_info, request_text)
    self.assertEqual(int(result.headers['x-oss-meta-unencrypted-content-length']), len(payload))

    expected_attributes = (
        ('status', 200),
        ('request_id', '566B6BE93A7B8CFD53D4BAA3'),
        ('object_type', 'Normal'),
        ('content_type', 'text/plain'),
        ('etag', 'D80CF0E5BE2436514894D64B2BCFB2AE'),
        ('last_modified', 1449880553),
    )
    for attribute, expected in expected_attributes:
        self.assertEqual(getattr(result, attribute), expected)

    expected_headers = (
        ('x-oss-meta-client-side-encryption-key', expected_key),
        ('x-oss-meta-client-side-encryption-start', expected_iv),
        ('x-oss-meta-client-side-encryption-cek-alg', expected_cek_alg),
        ('x-oss-meta-client-side-encryption-wrap-alg', expected_wrap_alg),
    )
    for header, expected in expected_headers:
        self.assertEqual(result.headers[header], expected)
def make_get_encrypted_object(key, content, content_crypto_material, invalid_cek_alg='', ranges=None):
# Builds the request text for a GET of `key` and begins building the matching
# response text from `content` encrypted with the material's cipher.
# NOTE(review): the response template below is cut off in this excerpt, so the
# return value and any use of `ranges` are not visible here - confirm against
# the full source.
request_text = '''GET /{0} HTTP/1.1
Host: ming-oss-share.oss-cn-hangzhou.aliyuncs.com
Accept-Encoding: identity
Connection: keep-alive
date: Sat, 12 Dec 2015 00:35:53 GMT
User-Agent: aliyun-sdk-python/2.0.2(Windows/7/;3.3.3)
Accept: */*
authorization: OSS ZCDmm7TPZKHtx77j:PAedG7U86ZxQ2WTB+GdpSltoiTI='''.format(key)
# String forms (via utils.b64encode_as_string) of the material's encrypted key and IV.
encrypted_key = utils.b64encode_as_string(content_crypto_material.encrypted_key)
encrypted_iv = utils.b64encode_as_string(content_crypto_material.encrypted_iv)
wrap_alg = content_crypto_material.wrap_alg
cek_alg = content_crypto_material.cek_alg
# A truthy invalid_cek_alg replaces the cek_alg taken from the material.
if invalid_cek_alg:
cek_alg = invalid_cek_alg
# Encrypt the plaintext content with the material's cipher.
cipher = content_crypto_material.cipher
encrypted_content = cipher.encrypt(content)
response_text = '''HTTP/1.1 200 OK
Server: AliyunOSS
Date: Sat, 12 Dec 2015 00:35:53 GMT
Content-Type: text/plain
Content-Length: {0}
Connection: keep-alive
x-oss-request-id: 566B6BE93A7B8CFD53D4BAA3
Accept-Ranges: bytes
def make_put_encrypted_object(key, content, content_crypto_material):
# Prepares the pieces of a PUT request for an encrypted object: the content
# encrypted with the material's cipher, plus the material's key, IV and
# algorithm names.
# NOTE(review): the request template below is cut off in this excerpt; how the
# placeholders {0}..{6} are filled and what is returned is not visible here.
cipher = content_crypto_material.cipher
# String forms (via utils.b64encode_as_string) of the material's encrypted key and IV.
encrypted_key = utils.b64encode_as_string(content_crypto_material.encrypted_key)
encrypted_iv = utils.b64encode_as_string(content_crypto_material.encrypted_iv)
encrypted_content = cipher.encrypt(content)
wrap_alg = content_crypto_material.wrap_alg
cek_alg = content_crypto_material.cek_alg
request_text = '''PUT /{0} HTTP/1.1
Host: ming-oss-share.oss-cn-hangzhou.aliyuncs.com
Accept-Encoding: identity
Connection: keep-alive
Content-Length: {1}
x-oss-meta-client-side-encryption-wrap-alg: {2}
x-oss-meta-client-side-encryption-cek-alg: {3}
x-oss-meta-client-side-encryption-key: {4}
x-oss-meta-client-side-encryption-start: {5}
x-oss-meta-unencrypted-content-length: {6}
date: Sat, 12 Dec 2015 00:35:53 GMT
def make_put_encrypted_object(key, content, content_crypto_material):
# Prepares the pieces of a PUT request for an encrypted object: the content
# encrypted with the material's cipher, plus the material's key, IV and
# algorithm names.
# NOTE(review): the request template below is cut off in this excerpt; how the
# placeholders {0}..{6} are filled and what is returned is not visible here.
cipher = content_crypto_material.cipher
# String forms (via utils.b64encode_as_string) of the material's encrypted key and IV.
encrypted_key = utils.b64encode_as_string(content_crypto_material.encrypted_key)
encrypted_iv = utils.b64encode_as_string(content_crypto_material.encrypted_iv)
encrypted_content = cipher.encrypt(content)
wrap_alg = content_crypto_material.wrap_alg
cek_alg = content_crypto_material.cek_alg
request_text = '''PUT /{0} HTTP/1.1
Host: ming-oss-share.oss-cn-hangzhou.aliyuncs.com
Accept-Encoding: identity
Connection: keep-alive
Content-Length: {1}
x-oss-meta-client-side-encryption-wrap-alg: {2}
x-oss-meta-client-side-encryption-cek-alg: {3}
x-oss-meta-client-side-encryption-key: {4}
x-oss-meta-client-side-encryption-start: {5}
x-oss-meta-unencrypted-content-length: {6}
date: Sat, 12 Dec 2015 00:35:53 GMT
User-Agent: aliyun-sdk-python/2.0.2(Windows/7/;3.3.3)
# Generate a signed URL that expires after 60 seconds
url = bucket.sign_url('GET', 'motto.txt', 60)
print(url)
# Manually build a request that uses V2 signing
key = 'object-from-post.txt'
boundary = 'arbitraryboundaryvalue'
headers = {'Content-Type': 'multipart/form-data; boundary=' + boundary}
# Base64-encode a policy whose expiration is 60 seconds from now and whose only
# condition is that $key starts with the empty string.
encoded_policy = oss2.utils.b64encode_as_string(oss2.to_bytes('{ "expiration": "%s","conditions": [["starts-with", "$key", ""]]}'
% oss2.date_to_iso8601(datetime.datetime.utcfromtimestamp(int(time.time()) + 60))))
# HMAC-SHA256 of the encoded policy, keyed with the access key secret; the
# signature is the base64 string form of that digest.
digest = hmac.new(oss2.to_bytes(access_key_secret), oss2.to_bytes(encoded_policy), hashlib.sha256).digest()
signature = oss2.utils.b64encode_as_string(digest)
form_fields = {
'x-oss-signature-version': 'OSS2',
'x-oss-signature': signature,
'x-oss-access-key-id': access_key_id,
'policy': encoded_policy,
'key': key,
}
# Content of the object
content = 'file content for post object request'
# Append one multipart/form-data part per form field, each introduced by the boundary.
body = ''
for k, v in form_fields.items():
body += '--%s\r\nContent-Disposition: form-data; name="%s"\r\n\r\n%s\r\n' % (boundary, k, v)