Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_string_credentials_user(self):
    """Initialize the project from a user credentials string and call an API.

    The contents of ``self.user_file`` are read and handed to
    ``project.initialize`` as a string (not a path). The test then requests
    the oauth2 v2 userinfo resource with the 'user' credentials and checks
    that the reply contains an 'email' key.
    """
    with open(self.user_file, 'r') as handle:
        project.initialize(_user=handle.read())

    oauth = get_service('oauth2', 'v2', 'user')
    userinfo_request = oauth.userinfo().get()
    profile = userinfo_request.execute()

    self.assertIn('email', profile)
    self.helper_refresh()
def test_string_credentials_service(self):
    """Initialize the project from a service credentials string and call an API.

    The contents of ``self.service_file`` are read and handed to
    ``project.initialize`` as a string (not a path). The test then lists
    projects through cloudresourcemanager v1 with the 'service' credentials
    and checks that the reply contains a 'projects' key.
    """
    with open(self.service_file, 'r') as handle:
        project.initialize(_service=handle.read())

    resource_manager = get_service('cloudresourcemanager', 'v1', 'service')
    list_request = resource_manager.projects().list()
    listing = list_request.execute()

    self.assertIn('projects', listing)
def object_delete(auth, path):
    """Delete a single object from a storage bucket.

    Args:
      auth: credentials selector, passed through to get_service.
      path: object location written as 'bucket:filename'. Only the first
        ':' separates the two parts; a path with no ':' raises ValueError.

    Returns:
      The result of executing the storage objects().delete(...) request.
    """
    bucket_name, object_name = path.split(':', 1)
    storage = get_service('storage', 'v1', auth)
    delete_request = storage.objects().delete(bucket=bucket_name, object=object_name)
    return delete_request.execute()
def object_put(auth, path, data, mimetype='application/octet-stream'):
    """Upload a stream to a storage bucket as a resumable, chunked upload.

    Chunks are sent until the API returns a response. A failed chunk is
    retried, but only a bounded number of times in a row, so a persistent
    server or transport failure surfaces as an exception instead of looping
    forever.

    Args:
      auth: credentials selector, passed through to get_service.
      path: destination written as 'bucket:filename'. Only the first ':'
        separates the two parts.
      data: stream handed to MediaIoBaseUpload.
      mimetype: content type recorded for the upload.

    Raises:
      HttpError: immediately for any status below 500, or for a 5xx status
        once the consecutive-failure limit is exceeded.
      httplib2.HttpLib2Error, IOError: once the consecutive-failure limit is
        exceeded.
    """
    bucket, filename = path.split(':', 1)
    service = get_service('storage', 'v1', auth)

    media = MediaIoBaseUpload(data, mimetype=mimetype, chunksize=CHUNKSIZE, resumable=True)
    request = service.objects().insert(bucket=bucket, name=filename, media_body=media)

    # Consecutive failed chunks tolerated before giving up.
    max_errors = 3

    response = None
    errors = 0
    while response is None:
        error = None
        try:
            status, response = request.next_chunk()
            if project.verbose and status:
                # Parenthesised single-argument form runs on Python 2 and 3.
                print("Uploaded %d%%." % int(status.progress() * 100))
        except HttpError as e:
            # Statuses below 500 are not retried; they are raised right away.
            if e.resp.status < 500:
                raise
            error = e
        except (httplib2.HttpLib2Error, IOError) as e:
            error = e

        # A successful chunk resets the streak; a failure extends it.
        errors = errors + 1 if error else 0
        if errors > max_errors:
            raise error
def object_get_chunks(auth, path, chunksize=CHUNKSIZE, encoding=None):
    """Download a storage object chunk by chunk, yielding each chunk.

    Args:
      auth: credentials selector, passed through to get_service.
      path: object location written as 'bucket:filename'. Only the first
        ':' separates the two parts.
      chunksize: chunk size handed to MediaIoBaseDownload.
      encoding: when set, each chunk is decoded with this codec and yielded
        as text; when falsy, the shared BytesIO buffer itself is yielded,
        rewound to position 0 and holding only the current chunk.

    Yields:
      Decoded text per chunk if encoding is given, otherwise the same BytesIO
      object on every iteration. The buffer is emptied when the generator
      resumes, so consume each chunk before asking for the next one.
    """
    import codecs  # local import: only needed when an encoding is requested

    bucket, filename = path.split(':', 1)
    service = get_service('storage', 'v1', auth)

    data = BytesIO()
    request = service.objects().get_media(bucket=bucket, object=filename)
    media = MediaIoBaseDownload(data, request, chunksize=chunksize)

    # An incremental decoder carries a multi-byte character that straddles a
    # chunk boundary over to the next chunk instead of failing on it.
    decoder = codecs.getincrementaldecoder(encoding)() if encoding else None

    done = False
    while not done:
        # Retrying is delegated to the client library via num_retries.
        progress, done = media.next_chunk(num_retries=3)
        if progress:
            print('Download %d%%' % int(progress.progress() * 100))

        data.seek(0)
        if decoder:
            yield decoder.decode(data.read(), final=done)
        else:
            yield data

        # Empty the buffer so a shorter following chunk is not returned with
        # stale bytes left over from this one.
        data.seek(0)
        data.truncate(0)
def sheets_batch_update(auth, sheet_url_or_name, data):
    """Send a batchUpdate request to a spreadsheet.

    Args:
      auth: credentials selector, passed to get_service and sheets_id.
      sheet_url_or_name: spreadsheet reference resolved by sheets_id.
      data: request body forwarded unchanged to spreadsheets().batchUpdate.

    Returns:
      None; the result of API_Retry is not returned.
    """
    sheets = get_service('sheets', 'v4', auth)
    spreadsheet_id = sheets_id(auth, sheet_url_or_name)
    batch_request = sheets.spreadsheets().batchUpdate(spreadsheetId=spreadsheet_id, body=data)
    API_Retry(batch_request)
def sheets_get(auth, sheet_url_or_name):
    """Fetch a spreadsheet resource, or None when its id cannot be resolved.

    Args:
      auth: credentials selector, passed to sheets_id and get_service.
      sheet_url_or_name: spreadsheet reference resolved by sheets_id.

    Returns:
      The result of API_Retry on spreadsheets().get(...), or None when
      sheets_id returns a falsy value.
    """
    spreadsheet_id = sheets_id(auth, sheet_url_or_name)

    # Nothing to fetch: skip building the service altogether.
    if not spreadsheet_id:
        return None

    sheets = get_service('sheets', 'v4', auth)
    get_request = sheets.spreadsheets().get(spreadsheetId=spreadsheet_id)
    return API_Retry(get_request)
def bucket_access(auth, project, name, role, emails=None, groups=None, services=None, domains=None):
    """Grant a role on a bucket to users, groups, services and domains.

    One bucketAccessControls insert is issued per entity, in the order
    emails, groups, services, domains.

    Args:
      auth: credentials selector, passed through to get_service.
      project: accepted for interface compatibility; not used in this function.
      name: bucket name, used both in the request and in the body.
      role: role placed in each access control body.
      emails: iterable of addresses, each sent as 'user-<value>'.
      groups: iterable of groups, each sent as 'group-<value>'.
      services: iterable of service identities, each sent as 'user-<value>'.
      domains: iterable of domains, each sent as 'domain-<value>'.
    """
    service = get_service('storage', 'v1', auth)

    # Lists are built explicitly: on Python 3 map() returns an iterator, and
    # iterators cannot be concatenated with '+'.
    entities = (
        ['user-%s' % e for e in emails or ()]
        + ['group-%s' % e for e in groups or ()]
        + ['user-%s' % e for e in services or ()]
        + ['domain-%s' % e for e in domains or ()]
    )

    for entity in entities:
        body = {
            "kind": "storage#bucketAccessControl",
            "bucket": name,
            "entity": entity,
            "role": role,
        }
        API_Retry(service.bucketAccessControls().insert(bucket=name, body=body))
def sheets_clear(auth, sheet_url, sheet_tab, sheet_range):
    """Clear the values in a range of a spreadsheet tab.

    Args:
      auth: credentials selector, passed to get_service and sheets_id.
      sheet_url: spreadsheet reference resolved by sheets_id.
      sheet_tab: tab name, combined with sheet_range by sheets_tab_range.
      sheet_range: cell range, combined with sheet_tab by sheets_tab_range.
    """
    if project.verbose:
        # Single formatted argument prints the same text on Python 2 and 3.
        print('SHEETS CLEAR %s %s %s' % (sheet_url, sheet_tab, sheet_range))

    service = get_service('sheets', 'v4', auth)
    # NOTE(review): auth is passed here to match the other sheets_id call
    # sites (sheets_batch_update, sheets_get); confirm against the
    # sheets_id signature.
    sheet_id = sheets_id(auth, sheet_url)
    API_Retry(service.spreadsheets().values().clear(
        spreadsheetId=sheet_id,
        range=sheets_tab_range(sheet_tab, sheet_range),
        body={}))
def bucket_get(auth, name):
    """Fetch a bucket resource, retrying on 403/500/503 responses.

    Args:
      auth: credentials selector, passed through to get_service.
      name: bucket name.

    Returns:
      The result of buckets().get(...).execute(). None when the API answers
      404, and also None when every attempt ends in 403, 500 or 503 (the
      outcome callers already received for those statuses before retries
      were added).

    Raises:
      HttpError: for any status other than 404, 403, 500 and 503.
    """
    service = get_service('storage', 'v1', auth)

    attempts = 3
    for attempt in range(attempts):
        try:
            return service.buckets().get(bucket=name).execute()
        except HttpError as e:
            if e.resp.status == 404:
                return None
            if e.resp.status not in (403, 500, 503):
                raise
            # Wait before the next attempt; no point sleeping after the last.
            if attempt < attempts - 1:
                sleep(5)

    return None