Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_exists_hit(self):
    """Check ``blob.exists()`` is truthy once the blob is recorded on the bucket.

    The blob name is stored in ``bucket._blobs`` before ``exists()`` is
    called, and the connection is primed with a single OK reply.
    """
    from six.moves.http_client import OK

    blob_name = 'blob-name'
    # One canned (headers, payload) reply carrying an OK status.
    canned_reply = ({'status': OK}, b'')
    client = _Client(_Connection(canned_reply))
    bucket = _Bucket(client)
    blob = self._make_one(blob_name, bucket=bucket)

    # Record the blob on the bucket before asking whether it exists.
    bucket._blobs[blob_name] = 1
    self.assertTrue(blob.exists())
def _download_to_file_helper(self, chunk_size=None):
    """Download a blob served as two canned chunks into a ``BytesIO``.

    The connection is primed with ``b'abc'`` (partial-content status)
    followed by ``b'def'`` (OK status); the helper asserts that the file
    object ends up holding ``b'abcdef'``.

    :param chunk_size: (Optional) when not ``None``, assigned to
                       ``blob.chunk_size`` after ``_CHUNK_SIZE_MULTIPLE``
                       has been set to 1 on the blob.
    """
    from io import BytesIO
    from six.moves.http_client import OK
    from six.moves.http_client import PARTIAL_CONTENT

    blob_name = 'blob-name'
    media_link = 'http://example.com/media/'

    first_headers = {
        'status': PARTIAL_CONTENT,
        'content-range': 'bytes 0-2/6',
    }
    second_headers = {
        'status': OK,
        'content-range': 'bytes 3-5/6',
    }
    canned_replies = (
        (first_headers, b'abc'),
        (second_headers, b'def'),
    )
    connection = _Connection(*canned_replies)
    bucket = _Bucket(_Client(connection))
    blob = self._make_one(
        blob_name, bucket=bucket, properties={'mediaLink': media_link})

    if chunk_size is not None:
        # Drop the multiple to 1 before applying the requested chunk size.
        blob._CHUNK_SIZE_MULTIPLE = 1
        blob.chunk_size = chunk_size

    sink = BytesIO()
    blob.download_to_file(sink)
    self.assertEqual(sink.getvalue(), b'abcdef')
# NOTE(review): excerpt from the middle of a test body -- the enclosing
# ``def`` line is not visible, and ``OK``, ``os`` and ``time`` (used below)
# must be bound by lines outside this view.
from six.moves.http_client import PARTIAL_CONTENT
from google.cloud._testing import _NamedTemporaryFile
BLOB_NAME = 'blob-name'
# Encryption key handed to the blob; HEADER_KEY_VALUE is its base64 form.
KEY = b'aa426195405adee2c8081bb9e7e74b19'
HEADER_KEY_VALUE = 'YWE0MjYxOTU0MDVhZGVlMmM4MDgxYmI5ZTdlNzRiMTk='
# presumably the base64-encoded SHA-256 digest of KEY -- TODO confirm
HEADER_KEY_HASH_VALUE = 'V3Kwe46nKc3xLv96+iJ707YfZfFvlObta8TQcx2gpm0='
# Two canned replies: bytes 0-2 (partial content), then bytes 3-5 (OK) of 6.
chunk1_response = {'status': PARTIAL_CONTENT,
'content-range': 'bytes 0-2/6'}
chunk2_response = {'status': OK,
'content-range': 'bytes 3-5/6'}
connection = _Connection(
(chunk1_response, b'abc'),
(chunk2_response, b'def'),
)
client = _Client(connection)
bucket = _Bucket(client)
MEDIA_LINK = 'http://example.com/media/'
# 'updated' is supplied so ``blob.updated`` can be read further down.
properties = {'mediaLink': MEDIA_LINK,
'updated': '2014-12-06T13:13:50.690Z'}
blob = self._make_one(BLOB_NAME, bucket=bucket, properties=properties,
encryption_key=KEY)
# A chunk size of 3 matches the length of each canned payload.
blob._CHUNK_SIZE_MULTIPLE = 1
blob.chunk_size = 3
# Download into a named temporary file, then read back what was written.
with _NamedTemporaryFile() as temp:
blob.download_to_filename(temp.name)
with open(temp.name, 'rb') as file_obj:
wrote = file_obj.read()
# Capture the file's mtime and the blob's ``updated`` time as epoch values.
# NOTE(review): ``wrote``, ``mtime`` and ``updatedTime`` are not used in the
# visible lines; the checks on them presumably follow this excerpt.
mtime = os.path.getmtime(temp.name)
updatedTime = time.mktime(blob.updated.timetuple())
# NOTE(review): excerpt from the middle of a test body -- the enclosing
# ``def`` line is not visible, and ``OK`` (used below) must be bound by a
# line outside this view.
from google.cloud._testing import _NamedTemporaryFile
from google.cloud.streaming import http_wrapper
# The blob name contains a slash; UPLOAD_URL carries it as ``%2F``.
BLOB_NAME = 'parent/child'
UPLOAD_URL = 'http://example.com/upload/name/parent%2Fchild'
DATA = b'ABCDEF'
# Three canned replies: one with a 'location' entry, one with
# RESUME_INCOMPLETE status, and a final OK.
loc_response = {'status': OK, 'location': UPLOAD_URL}
chunk1_response = {'status': http_wrapper.RESUME_INCOMPLETE,
'range': 'bytes 0-4'}
chunk2_response = {'status': OK}
connection = _Connection(
(loc_response, '{}'),
(chunk1_response, ''),
(chunk2_response, ''),
)
client = _Client(connection)
bucket = _Bucket(client)
blob = self._make_one(BLOB_NAME, bucket=bucket)
blob._CHUNK_SIZE_MULTIPLE = 1
blob.chunk_size = 5
# Write DATA to a temporary file, then reopen it for reading and upload it.
with _NamedTemporaryFile() as temp:
with open(temp.name, 'wb') as file_obj:
file_obj.write(DATA)
with open(temp.name, 'rb') as file_obj:
blob.upload_from_file(file_obj, rewind=True)
# The file position after the upload must equal the payload length.
self.assertEqual(file_obj.tell(), len(DATA))
# Exactly one request is expected, with 5 redirections and DATA as its body.
rq = connection.http._requested
self.assertEqual(len(rq), 1)
self.assertEqual(rq[0]['redirections'], 5)
self.assertEqual(rq[0]['body'], DATA)
# NOTE(review): excerpt from the middle of a test body -- the enclosing
# ``def`` line is not visible, and ``OK``, ``properties`` and
# ``content_type_arg`` (used below) must be bound outside this view.
from google.cloud._testing import _NamedTemporaryFile
from google.cloud.streaming import http_wrapper
BLOB_NAME = 'blob-name'
UPLOAD_URL = 'http://example.com/upload/name/key'
DATA = b'ABCDEF'
# Three canned replies: one with a 'location' entry, one with
# RESUME_INCOMPLETE status, and a final OK.
loc_response = {'status': OK, 'location': UPLOAD_URL}
chunk1_response = {'status': http_wrapper.RESUME_INCOMPLETE,
'range': 'bytes 0-4'}
chunk2_response = {'status': OK}
connection = _Connection(
(loc_response, '{}'),
(chunk1_response, ''),
(chunk2_response, ''),
)
client = _Client(connection)
bucket = _Bucket(client)
blob = self._make_one(BLOB_NAME, bucket=bucket,
properties=properties)
blob._CHUNK_SIZE_MULTIPLE = 1
blob.chunk_size = 5
# The temporary file is created with a '.jpeg' suffix, filled with DATA,
# and then uploaded by name.
with _NamedTemporaryFile(suffix='.jpeg') as temp:
with open(temp.name, 'wb') as file_obj:
file_obj.write(DATA)
blob.upload_from_filename(temp.name,
content_type=content_type_arg)
# Exactly one POST request is expected.
rq = connection.http._requested
self.assertEqual(len(rq), 1)
self.assertEqual(rq[0]['method'], 'POST')
# NOTE(review): ``uri`` is not used in the visible lines; the checks on it
# presumably follow this excerpt.
uri = rq[0]['uri']
# NOTE(review): excerpt from the middle of a test body -- the enclosing
# ``def`` line is not visible, and ``base64`` (used below) must be bound by
# a line outside this view.
import hashlib
from six.moves.http_client import OK
BLOB_NAME = 'blob-name'
BLOB_KEY = b'01234567890123456789012345678901' # 32 bytes
# Base64 text forms of the key and of its SHA-256 digest.
# NOTE(review): neither value is used in the visible lines; the checks on
# them presumably follow this excerpt.
BLOB_KEY_B64 = base64.b64encode(BLOB_KEY).rstrip().decode('ascii')
BLOB_KEY_HASH = hashlib.sha256(BLOB_KEY).digest()
BLOB_KEY_HASH_B64 = base64.b64encode(
BLOB_KEY_HASH).rstrip().decode('ascii')
STORAGE_CLASS = u'NEARLINE'
# Canned reply body: a 'resource' entry carrying the new storage class.
RESPONSE = {
'resource': {'storageClass': STORAGE_CLASS},
}
response = ({'status': OK}, RESPONSE)
connection = _Connection(response)
client = _Client(connection)
bucket = _Bucket(client=client)
blob = self._make_one(
BLOB_NAME, bucket=bucket, encryption_key=BLOB_KEY)
blob.update_storage_class('NEARLINE')
self.assertEqual(blob.storage_class, 'NEARLINE')
# Exactly one POST is expected, to a rewriteTo path whose source and
# destination are the same blob name.
kw = connection._requested
self.assertEqual(len(kw), 1)
self.assertEqual(kw[0]['method'], 'POST')
PATH = '/b/name/o/%s/rewriteTo/b/name/o/%s' % (BLOB_NAME, BLOB_NAME)
self.assertEqual(kw[0]['path'], PATH)
# The recorded request must not carry a 'query_params' key, and its data
# must hold only the storage class.
self.assertNotIn('query_params', kw[0])
SENT = {'storageClass': STORAGE_CLASS}
self.assertEqual(kw[0]['data'], SENT)
chunk_size=5,
status=None):
# NOTE(review): this excerpt starts in the middle of a ``def`` signature;
# the function name and earlier parameters are not visible. ``properties``
# and ``content_type_arg`` (used below) are presumably among them -- confirm.
from six.moves.http_client import OK
# NOTE(review): parse_qsl and urlsplit are not used in the visible lines;
# presumably they are needed past the end of this excerpt.
from six.moves.urllib.parse import parse_qsl
from six.moves.urllib.parse import urlsplit
from google.cloud._testing import _NamedTemporaryFile
BLOB_NAME = 'blob-name'
DATA = b'ABCDEF'
# Fall back to an OK status when the caller did not supply one.
if status is None:
status = OK
# A single canned reply with the chosen status and an empty JSON body.
response = {'status': status}
connection = _Connection(
(response, b'{}'),
)
client = _Client(connection)
bucket = _Bucket(client)
blob = self._make_one(BLOB_NAME, bucket=bucket, properties=properties)
blob._CHUNK_SIZE_MULTIPLE = 1
blob.chunk_size = chunk_size
# Write DATA to a temporary file, then reopen it for reading and upload it.
with _NamedTemporaryFile() as temp:
with open(temp.name, 'wb') as file_obj:
file_obj.write(DATA)
with open(temp.name, 'rb') as file_obj:
blob.upload_from_file(file_obj, rewind=True,
content_type=content_type_arg)
# Exactly one POST request is expected.
rq = connection.http._requested
self.assertEqual(len(rq), 1)
self.assertEqual(rq[0]['method'], 'POST')
# NOTE(review): excerpt from the middle of a test body -- the enclosing
# ``def`` line is not visible, and ``OK``, ``mock`` and
# ``_NamedTemporaryFile`` (used below) must be bound outside this view.
from google.cloud.streaming import http_wrapper
BLOB_NAME = 'blob-name'
UPLOAD_URL = 'http://example.com/upload/name/key'
DATA = b'ABCDEF'
# Three canned replies: one with a 'location' entry, one with
# RESUME_INCOMPLETE status, and a final OK.
loc_response = {'status': OK, 'location': UPLOAD_URL}
chunk1_response = {'status': http_wrapper.RESUME_INCOMPLETE,
'range': 'bytes 0-4'}
chunk2_response = {'status': OK}
# Need valid JSON on last response, since resumable.
connection = _Connection(
(loc_response, b''),
(chunk1_response, b''),
(chunk2_response, b'{}'),
)
client = _Client(connection)
bucket = _Bucket(client)
blob = self._make_one(BLOB_NAME, bucket=bucket)
# The chunk size (5) is smaller than the 6-byte payload.
blob._CHUNK_SIZE_MULTIPLE = 1
blob.chunk_size = 5
# Set the threshold low enough that we force a resumable upload.
patch = mock.patch(
'google.cloud.streaming.transfer.RESUMABLE_UPLOAD_THRESHOLD',
new=5)
# With the threshold patched, write DATA to a temporary file, reopen it
# for reading, and upload it.
with patch:
with _NamedTemporaryFile() as temp:
with open(temp.name, 'wb') as file_obj:
file_obj.write(DATA)
with open(temp.name, 'rb') as file_obj:
blob.upload_from_file(file_obj, rewind=True)
def test_create_resumable_upload_session_args(self):
# Calls ``create_resumable_upload_session`` with an explicit content type,
# size and origin, and checks that it returns the URL carried in the canned
# reply's 'location' entry after issuing exactly one POST.
from six.moves.http_client import OK
BLOB_NAME = 'blob-name'
UPLOAD_URL = 'http://example.com/upload/name/key'
CONTENT_TYPE = 'text/plain'
SIZE = 1024
ORIGIN = 'http://google.com'
# Single canned reply whose 'location' entry is the expected session URL.
loc_response = {'status': OK, 'location': UPLOAD_URL}
connection = _Connection(
(loc_response, '{}'),
)
client = _Client(connection)
bucket = _Bucket(client=client)
blob = self._make_one(BLOB_NAME, bucket=bucket)
resumable_url = blob.create_resumable_upload_session(
content_type=CONTENT_TYPE,
size=SIZE,
origin=ORIGIN)
self.assertEqual(resumable_url, UPLOAD_URL)
rq = connection.http._requested
self.assertEqual(len(rq), 1)
self.assertEqual(rq[0]['method'], 'POST')
# Title-case the header names and stringify the values.
# NOTE(review): ``headers`` is not used in the visible lines; the test
# presumably continues past the end of this excerpt.
headers = {
key.title(): str(value) for key, value in rq[0]['headers'].items()}