# Tail of an SFTP download step: the Sftp client construction begins above
# this excerpt, so the leading arguments close that call.
    self._timeout,
    self._retry_count,
    self._port,
)
files = sftp.list_files(
    self._src_dir, self._dest_dir, re.compile(self._src_pattern)
)
# Stop cleanly when nothing matched and the step is configured to quit
if self._quit and len(files) == 0:
    self._logger.info("No file was found. Subsequent processing will be skipped.")
    return StepStatus.SUCCESSFUL_TERMINATION
self._logger.info("Files downloaded: %s" % files)
# cache downloaded file names for downstream steps
ObjectStore.put(self._step, files)
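Every snippet on this page ends by parking its result in ObjectStore so a later step in the same scenario can pick it up by key. A minimal, self-contained sketch of that put/get handoff, with a stand-in class (illustrative only; the real ObjectStore belongs to the library these steps come from and may differ in detail):

class ObjectStoreSketch:
    """Stand-in with the same put/get shape as the cache used above."""
    _cache = {}

    @classmethod
    def put(cls, key, value):
        cls._cache[key] = value

    @classmethod
    def get(cls, key):
        return cls._cache.get(key)

ObjectStoreSketch.put("sftp_download", ["a.csv", "b.csv"])
assert ObjectStoreSketch.get("sftp_download") == ["a.csv", "b.csv"]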
# Tail of an FTP download step, same shape as the SFTP variant above; the
# client construction begins above this excerpt.
    self._password,
    self._timeout,
    self._retry_count,
    self._port,
    self._tls,
)
files = ftp_util.list_files(
    self._src_dir, self._dest_dir, re.compile(self._src_pattern)
)
if self._quit and len(files) == 0:
    self._logger.info("No file was found. Subsequent processing will be skipped.")
    return StepStatus.SUCCESSFUL_TERMINATION
# cache downloaded file names for downstream steps
ObjectStore.put(self._step, files)
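The quit flag above turns an empty listing into a clean stop instead of an error. A small sketch of that control flow with a stand-in status enum (the real StepStatus constants come from the library; the names here just mirror the snippet):

from enum import Enum

class StepStatusSketch(Enum):
    SUCCESSFUL_TERMINATION = 0  # stand-in for the library's constant

def maybe_quit(quit_flag, files):
    # Mirror the snippets above: end the step cleanly when nothing was downloaded.
    if quit_flag and not files:
        return StepStatusSketch.SUCCESSFUL_TERMINATION
    return None

assert maybe_quit(True, []) is StepStatusSketch.SUCCESSFUL_TERMINATION
assert maybe_quit(False, []) is None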
# BigQuery helper: run a query (or a default full-table SELECT) and cache
# the resulting DataFrame.
def _save_to_cache(self):
    self._logger.info("Saving data to the in-memory cache")
    df = pandas.read_gbq(
        query="SELECT * FROM %s.%s" % (self._dataset, self._tblname)
        if self._query is None
        else self._query,
        dialect="standard",
        location=self._location,
        project_id=self._project_id,
        credentials=ServiceAccount.auth(self._credentials),
    )
    ObjectStore.put(self._key, df)
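The conditional expression passed to read_gbq falls back to a full-table SELECT when no explicit query is configured. Pulling that logic into a helper makes it easier to read and test; a sketch (the helper name is ours, not the library's):

def build_query(dataset, table, query=None):
    # Default to a full-table SELECT, mirroring the inline conditional above.
    return query if query is not None else "SELECT * FROM %s.%s" % (dataset, table)

assert build_query("ds", "tbl") == "SELECT * FROM ds.tbl"
assert build_query("ds", "tbl", "SELECT 1") == "SELECT 1"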
# GCS download step: validate parameters, then fetch every blob whose name
# fully matches src_pattern.
valid = EssentialParameters(self.__class__.__name__, [self._src_pattern])
valid()
client = Gcs.get_gcs_client(self._credentials)
bucket = client.get_bucket(self._bucket)
dl_files = []
r = re.compile(self._src_pattern)  # compile once, outside the loop
for blob in bucket.list_blobs(prefix=self._prefix, delimiter=self._delimiter):
    if not r.fullmatch(blob.name):
        continue
    dl_files.append(blob.name)
    blob.download_to_filename(
        os.path.join(self._dest_dir, os.path.basename(blob.name))
    )
# cache downloaded file names for downstream steps
ObjectStore.put(self._step, dl_files)
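re.fullmatch only accepts a blob name the pattern covers end to end, so src_pattern must describe the whole name (e.g. .*\.csv), not just a suffix. A quick self-contained illustration:

import re

pattern = re.compile(r".*\.csv")
names = ["data/2020.csv", "data/2020.csv.bak", "readme.txt"]
# fullmatch must consume the entire name, so the .bak file is rejected.
assert [n for n in names if pattern.fullmatch(n)] == ["data/2020.csv"]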
# CSV-to-storage step: the enclosing "if self._columns:" branch and its row
# loop begin above this excerpt; row is assumed to come from a dict-style
# reader such as csv.DictReader.
        # extract only the specified columns
        row_dict = {}
        for c in self._columns:
            if not row.get(c):
                continue
            row_dict[c] = row.get(c)
        self._s.save(row_dict)
else:
    # no column list configured: keep every column by zipping the header
    reader = csv.reader(f)
    header = next(reader, None)
    for row in reader:
        row_dict = dict(zip(header, row))
        self._s.save(row_dict)
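Both branches produce the same dict-per-row shape: the first assumes the reader already yields mappings, the second zips the header onto plain lists. A self-contained comparison of the two approaches on the same data:

import csv
import io

text = "id,name\n1,alpha\n2,beta\n"

# Branch 1: csv.DictReader yields mappings directly (what row.get(c) relies on).
rows_a = [dict(r) for r in csv.DictReader(io.StringIO(text))]

# Branch 2: plain reader plus zip against the header, as in the else-branch.
reader = csv.reader(io.StringIO(text))
header = next(reader, None)
rows_b = [dict(zip(header, row)) for row in reader]

assert rows_a == rows_b == [{"id": "1", "name": "alpha"}, {"id": "2", "name": "beta"}]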
def execute(self, *args):
    super().execute()
    valid = EssentialParameters(self.__class__.__name__, [self._key])
    valid()
    # BigQuery read step: run the configured query and cache the DataFrame
    df = pandas.read_gbq(
        query=self._get_query(),
        dialect="standard",
        location=self._location,
        project_id=self._project_id,
        credentials=ServiceAccount.auth(self._credentials),
    )
    ObjectStore.put(self._key, df)
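For completeness, the same read-then-cache pattern without the step machinery; a sketch assuming pandas-gbq is installed and default Google credentials are available (the function name, cache argument, and parameters are placeholders, not the library's API):

import pandas

def read_and_cache(cache, key, query, project_id, location=None):
    # Query BigQuery once and park the DataFrame, as execute() does above.
    df = pandas.read_gbq(
        query=query,
        dialect="standard",
        location=location,
        project_id=project_id,
        # credentials omitted: pandas-gbq falls back to default credentials
    )
    cache[key] = df
    return df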