Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_upload_from_string_w_bytes(self):
    """Exercise ``upload_from_string`` with a ``bytes`` payload.

    The fake connection is primed with three canned responses: one
    carrying the upload ``location``, one ``RESUME_INCOMPLETE`` and one
    ``OK``.  The assertions shown here check that exactly one HTTP
    request was recorded, that it was a ``POST``, and that its URI uses
    the ``http`` scheme.
    """
    from six.moves.http_client import OK
    from six.moves.urllib.parse import parse_qsl, urlsplit
    from google.cloud.streaming import http_wrapper

    blob_name = 'blob-name'
    upload_url = 'http://example.com/upload/name/key'
    payload = b'ABCDEF'

    # Canned (headers, body) pairs, in the order they are handed to the
    # fake connection.
    canned = (
        ({'status': OK, 'location': upload_url}, '{}'),
        ({'status': http_wrapper.RESUME_INCOMPLETE,
          'range': 'bytes 0-4'}, ''),
        ({'status': OK}, ''),
    )
    connection = _Connection(*canned)
    bucket = _Bucket(_Client(connection))
    blob = self._make_one(blob_name, bucket=bucket)
    # Relax the chunk-size multiple so a 5-byte chunk size can be set.
    blob._CHUNK_SIZE_MULTIPLE = 1
    blob.chunk_size = 5

    blob.upload_from_string(payload)

    requests = connection.http._requested
    self.assertEqual(len(requests), 1)
    first = requests[0]
    self.assertEqual(first['method'], 'POST')
    scheme, netloc, path, qs, _ = urlsplit(first['uri'])
    self.assertEqual(scheme, 'http')
def test_create_resumable_upload_session(self):
    """Check the URL returned by ``create_resumable_upload_session``.

    The fake connection answers with a single response whose
    ``location`` header is the upload URL; the method is expected to
    return that URL after issuing exactly one ``POST`` request.
    """
    from six.moves.http_client import OK
    from six.moves.urllib.parse import parse_qsl, urlsplit

    blob_name = 'blob-name'
    upload_url = 'http://example.com/upload/name/key'

    session_response = ({'status': OK, 'location': upload_url}, '{}')
    connection = _Connection(session_response)
    bucket = _Bucket(client=_Client(connection))
    blob = self._make_one(blob_name, bucket=bucket)

    self.assertEqual(blob.create_resumable_upload_session(), upload_url)

    requests = connection.http._requested
    self.assertEqual(len(requests), 1)
    first = requests[0]
    self.assertEqual(first['method'], 'POST')
    scheme, netloc, path, qs, _ = urlsplit(first['uri'])
def test_upload_from_file_w_slash_in_name(self):
    """Upload from a file object for a blob named ``'parent/child'``.

    The payload is written to a temporary file, re-opened for reading
    and passed to ``upload_from_file`` with ``rewind=True``.  The
    assertion shown here checks that the file position afterwards
    equals the payload length.
    """
    from six.moves.http_client import OK
    from six.moves.urllib.parse import parse_qsl, urlsplit
    from google.cloud._testing import _NamedTemporaryFile
    from google.cloud.streaming import http_wrapper

    blob_name = 'parent/child'
    upload_url = 'http://example.com/upload/name/parent%2Fchild'
    payload = b'ABCDEF'

    # Canned (headers, body) pairs, in the order they are handed to the
    # fake connection.
    canned = (
        ({'status': OK, 'location': upload_url}, '{}'),
        ({'status': http_wrapper.RESUME_INCOMPLETE,
          'range': 'bytes 0-4'}, ''),
        ({'status': OK}, ''),
    )
    connection = _Connection(*canned)
    bucket = _Bucket(_Client(connection))
    blob = self._make_one(blob_name, bucket=bucket)
    # Relax the chunk-size multiple so a 5-byte chunk size can be set.
    blob._CHUNK_SIZE_MULTIPLE = 1
    blob.chunk_size = 5

    with _NamedTemporaryFile() as temp:
        with open(temp.name, 'wb') as writer:
            writer.write(payload)

        with open(temp.name, 'rb') as reader:
            blob.upload_from_file(reader, rewind=True)
            # Checked while the handle is still open: tell() on a
            # closed file would raise.
            self.assertEqual(reader.tell(), len(payload))
def test_download_to_file_wo_media_link(self):
    """Download into a file object for a blob built without a media link.

    The fake connection's ``_responses`` is replaced with a single JSON
    reply carrying ``mediaLink``.  After ``download_to_file`` the test
    expects the buffer to hold ``b'abcdef'`` and ``blob.media_link`` to
    equal the link from that reply.
    """
    from io import BytesIO
    from six.moves.http_client import OK, PARTIAL_CONTENT

    blob_name = 'blob-name'
    media_link = 'http://example.com/media/'

    first_chunk = (
        {'status': PARTIAL_CONTENT, 'content-range': 'bytes 0-2/6'},
        b'abc',
    )
    second_chunk = (
        {'status': OK, 'content-range': 'bytes 3-5/6'},
        b'def',
    )
    connection = _Connection(first_chunk, second_chunk)
    # Only the 'reload' request hits on this side: the others are done
    # through the 'http' object.
    reload_headers = {'status': OK, 'content-type': 'application/json'}
    connection._responses = [(reload_headers, {"mediaLink": media_link})]

    bucket = _Bucket(_Client(connection))
    blob = self._make_one(blob_name, bucket=bucket)

    sink = BytesIO()
    blob.download_to_file(sink)

    self.assertEqual(sink.getvalue(), b'abcdef')
    self.assertEqual(blob.media_link, media_link)
SOURCE_KEY_HASH).rstrip().decode('ascii')
DEST_KEY = b'90123456789012345678901234567890' # 32 bytes
DEST_KEY_B64 = base64.b64encode(DEST_KEY).rstrip().decode('ascii')
DEST_KEY_HASH = hashlib.sha256(DEST_KEY).digest()
DEST_KEY_HASH_B64 = base64.b64encode(
DEST_KEY_HASH).rstrip().decode('ascii')
BLOB_NAME = 'blob'
TOKEN = 'TOKEN'
RESPONSE = {
'totalBytesRewritten': 42,
'objectSize': 42,
'done': True,
'resource': {'etag': 'DEADBEEF'},
}
response = ({'status': OK}, RESPONSE)
connection = _Connection(response)
client = _Client(connection)
bucket = _Bucket(client=client)
source = self._make_one(
BLOB_NAME, bucket=bucket, encryption_key=SOURCE_KEY)
dest = self._make_one(BLOB_NAME, bucket=bucket,
encryption_key=DEST_KEY)
token, rewritten, size = dest.rewrite(source, token=TOKEN)
self.assertIsNone(token)
self.assertEqual(rewritten, 42)
self.assertEqual(size, 42)
kw = connection._requested
self.assertEqual(len(kw), 1)
self.assertEqual(kw[0]['method'], 'POST')
def test_generate_signed_url_w_slash_in_name(self):
BLOB_NAME = 'parent/child'
EXPIRATION = '2014-10-16T20:34:37.000Z'
connection = _Connection()
client = _Client(connection)
bucket = _Bucket(client)
blob = self._make_one(BLOB_NAME, bucket=bucket)
URI = ('http://example.com/abucket/a-blob-name?Signature=DEADBEEF'
'&Expiration=2014-10-16T20:34:37.000Z')
SIGNER = _Signer()
with mock.patch('google.cloud.storage.blob.generate_signed_url',
new=SIGNER):
signed_url = blob.generate_signed_url(EXPIRATION)
self.assertEqual(signed_url, URI)
EXPECTED_ARGS = (_Connection.credentials,)
EXPECTED_KWARGS = {
'api_access_endpoint': 'https://storage.googleapis.com',
'expiration': EXPIRATION,
def test_compose_wo_content_type_set(self):
    """``compose`` must raise ``ValueError`` for this destination.

    The destination and both sources are created with only a name and
    a bucket (no other properties are supplied).
    """
    connection = _Connection()
    bucket = _Bucket(client=_Client(connection))

    sources = [
        self._make_one(name, bucket=bucket)
        for name in ('source-1', 'source-2')
    ]
    destination = self._make_one('destinaton', bucket=bucket)

    with self.assertRaises(ValueError):
        destination.compose(sources=sources)
def test_rewrite_other_bucket_other_name_no_encryption_partial(self):
from six.moves.http_client import OK
SOURCE_BLOB = 'source'
DEST_BLOB = 'dest'
DEST_BUCKET = 'other-bucket'
TOKEN = 'TOKEN'
RESPONSE = {
'totalBytesRewritten': 33,
'objectSize': 42,
'done': False,
'rewriteToken': TOKEN,
'resource': {'etag': 'DEADBEEF'},
}
response = ({'status': OK}, RESPONSE)
connection = _Connection(response)
client = _Client(connection)
source_bucket = _Bucket(client=client)
source_blob = self._make_one(SOURCE_BLOB, bucket=source_bucket)
dest_bucket = _Bucket(client=client, name=DEST_BUCKET)
dest_blob = self._make_one(DEST_BLOB, bucket=dest_bucket)
token, rewritten, size = dest_blob.rewrite(source_blob)
self.assertEqual(token, TOKEN)
self.assertEqual(rewritten, 33)
self.assertEqual(size, 42)
kw = connection._requested
self.assertEqual(len(kw), 1)
self.assertEqual(kw[0]['method'], 'POST')
PATH = '/b/name/o/%s/rewriteTo/b/%s/o/%s' % (
content_type_arg=None,
expected_content_type=None):
from six.moves.http_client import OK
from six.moves.urllib.parse import parse_qsl
from six.moves.urllib.parse import urlsplit
from google.cloud._testing import _NamedTemporaryFile
from google.cloud.streaming import http_wrapper
BLOB_NAME = 'blob-name'
UPLOAD_URL = 'http://example.com/upload/name/key'
DATA = b'ABCDEF'
loc_response = {'status': OK, 'location': UPLOAD_URL}
chunk1_response = {'status': http_wrapper.RESUME_INCOMPLETE,
'range': 'bytes 0-4'}
chunk2_response = {'status': OK}
connection = _Connection(
(loc_response, '{}'),
(chunk1_response, ''),
(chunk2_response, ''),
)
client = _Client(connection)
bucket = _Bucket(client)
blob = self._make_one(BLOB_NAME, bucket=bucket,
properties=properties)
blob._CHUNK_SIZE_MULTIPLE = 1
blob.chunk_size = 5
with _NamedTemporaryFile(suffix='.jpeg') as temp:
with open(temp.name, 'wb') as file_obj:
file_obj.write(DATA)
blob.upload_from_filename(temp.name,
content_type=content_type_arg)
def test_exists_miss(self):
    """``exists()`` is falsy when the connection replies ``NOT_FOUND``."""
    from six.moves.http_client import NOT_FOUND

    missing_name = 'nonesuch'
    connection = _Connection(({'status': NOT_FOUND}, b''))
    bucket = _Bucket(_Client(connection))
    blob = self._make_one(missing_name, bucket=bucket)

    self.assertFalse(blob.exists())