def test_get_gcs_client_no_credentials(self):
    assert Gcs.get_gcs_client(None) == storage.Client()
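The test above only exercises the no-credentials path. For context, a minimal sketch of what a helper like Gcs.get_gcs_client could look like, assuming it wraps google.cloud.storage.Client and optionally loads a service account key file; the project's actual helper may differ:

from google.cloud import storage
from google.oauth2 import service_account


class Gcs:
    @staticmethod
    def get_gcs_client(credentials):
        # No key file given: fall back to application default credentials.
        if credentials is None:
            return storage.Client()
        # Otherwise build the client from the given service account key file path.
        key = service_account.Credentials.from_service_account_file(credentials)
        return storage.Client(credentials=key, project=key.project_id)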
path = "%s-%s" % (StringUtil().random_str(self._RANDOM_STR_LENGTH), ymd_hms,)
prefix = "%s/%s/%s" % (self._dataset, self._tblname, path)
gbq_client = BigQuery.get_bigquery_client(self._credentials)
if self._dataset and self._tblname:
table_ref = gbq_client.dataset(self._dataset).table(self._tblname)
elif self._dataset and not self._tblname:
tmp_tbl = (
"tmp_"
+ StringUtil().random_str(self._RANDOM_STR_LENGTH)
+ "_"
+ ymd_hms
)
table_ref = gbq_client.dataset(self._dataset).table(tmp_tbl)
gcs_client = Gcs.get_gcs_client(self._credentials)
gcs_bucket = gcs_client.get_bucket(self._bucket)
# extract job config settings
ext_job_config = BigQuery.get_extract_job_config()
ext_job_config.compression = BigQuery.get_compression_type()
ext = ".csv"
if self._filename:
_, ext = os.path.splitext(self._filename)
support_ext = [".csv", ".json"]
if ext not in support_ext:
raise InvalidParameter("%s is not supported as filename." % ext)
ext_job_config.destination_format = BigQuery.get_destination_format(ext)
comp_format_and_ext = {"GZIP": ".gz"}
comp_ext = comp_format_and_ext.get(str(BigQuery.get_compression_type()))
if self._filename:
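For reference, a self-contained sketch of the same idea using the google-cloud-bigquery client directly, without the project's helper classes; the dataset, table, and bucket names below are made-up placeholders:

from google.cloud import bigquery

client = bigquery.Client()

# Hypothetical source table and destination URI; replace with real identifiers.
table_ref = client.dataset("my_dataset").table("my_table")
destination_uri = "gs://my-bucket/my_dataset/my_table/export-*.csv.gz"

job_config = bigquery.ExtractJobConfig()
job_config.destination_format = bigquery.DestinationFormat.CSV
job_config.compression = bigquery.Compression.GZIP

# Run the extract job and block until it finishes.
extract_job = client.extract_table(table_ref, destination_uri, job_config=job_config)
extract_job.result()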
def execute(self, *args):
    super().execute()

    # _src_dir and _src_pattern are required for an upload step.
    valid = EssentialParameters(
        self.__class__.__name__, [self._src_dir, self._src_pattern]
    )
    valid()

    gcs_client = Gcs.get_gcs_client(self._credentials)
    bucket = gcs_client.get_bucket(self._bucket)
    files = super().get_target_files(self._src_dir, self._src_pattern)
    self._logger.info("Upload files %s" % files)
    for file in files:
        self._logger.info("Start upload %s" % file)
        # Upload each local file to <dest_dir>/<basename> in the bucket.
        blob = bucket.blob(os.path.join(self._dest_dir, os.path.basename(file)))
        blob.upload_from_filename(file)
        self._logger.info("Finish upload %s" % file)
def execute(self, *args):
    super().execute()

    # _src_pattern is required for a download step.
    valid = EssentialParameters(self.__class__.__name__, [self._src_pattern])
    valid()

    client = Gcs.get_gcs_client(self._credentials)
    bucket = client.get_bucket(self._bucket)
    r = re.compile(self._src_pattern)
    dl_files = []
    for blob in bucket.list_blobs(prefix=self._prefix, delimiter=self._delimiter):
        # Download only the objects whose full name matches the pattern.
        if not r.fullmatch(blob.name):
            continue
        dl_files.append(blob.name)
        blob.download_to_filename(
            os.path.join(self._dest_dir, os.path.basename(blob.name))
        )

    # Record the downloaded object names for later steps.
    ObjectStore.put(self._step, dl_files)
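For reference, the same list-filter-download flow written directly against google-cloud-storage; the bucket name, prefix, pattern, and destination directory are placeholders:

import os
import re

from google.cloud import storage

client = storage.Client()
bucket = client.get_bucket("my-bucket")  # hypothetical bucket name

pattern = re.compile(r".*\.csv")
dest_dir = "/tmp/in"

# List objects under a prefix and download those whose names match the pattern.
for blob in bucket.list_blobs(prefix="src_dir/"):
    if not pattern.fullmatch(blob.name):
        continue
    blob.download_to_filename(os.path.join(dest_dir, os.path.basename(blob.name)))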