Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, project_id: str, bucket_name: str,
service_account_file: str):
self._bucket_name = bucket_name
self._client = storage.Client(project=project_id,
credentials=service_account.Credentials.\
from_service_account_file(service_account_file))
self._fs = GCSFileSystem(token=service_account_file,
check_connection=True)
# The chunk size for uploading
chunk_size = 1024*1024*5
# GCP Service account credentials.
gcp_credentials = service_account.Credentials.from_service_account_file(
gcp_configs.get("service_account_file"))
# The Google Cloud Storage client.
gcs_client = storage.Client(
project=gcp_configs.get("project_id"),
credentials=gcp_credentials)
# The GCSFileSystem interface
gcs_fs = GCSFileSystem(
token=gcp_configs.get("service_account_file"),
check_connection=True)
def get_object(bucket_name, blob_name):
"""Get a JSON object from bucket as a Python dictionary.
Args:
bucket_name: the name of the bucket.
blob_name: the name of the blob object relative to the bucket.
Returns: a Python dictionary.
"""
blob = gcs_client.bucket(bucket_name).get_blob(blob_name)
if blob is None:
raise ValueError("Cannot find blob {}/{}".format(bucket_name, blob_name))