Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_auth_no_credentials(self):
assert ServiceAccount.auth(None) is None
is_inserted: if the data is already inserted or not
if_exists: replace or append
"""
df = pandas.DataFrame(self._format_insert_data(insert_rows))
if is_inserted is True:
# if_exists after the first insert execution
if_exists = self._APPEND
dest_tbl = self._dataset + "." + self._tblname
self._logger.info("Start insert %s rows to %s" % (len(insert_rows), dest_tbl))
df.to_gbq(
dest_tbl,
project_id=self._project_id,
if_exists=if_exists,
table_schema=self._table_schema,
location=self._location,
credentials=ServiceAccount.auth(self._credentials),
)
def get_gcs_client(credentials):
return storage.Client(credentials=ServiceAccount.auth(credentials))
def execute(self, *args):
super().execute()
valid = EssentialParameters(self.__class__.__name__, [self._key])
valid()
df = pandas.read_gbq(
query=self._get_query(),
dialect="standard",
location=self._location,
project_id=self._project_id,
credentials=ServiceAccount.auth(self._credentials),
)
ObjectStore.put(self._key, df)
is_inserted: if the data is already inserted or not
if_exists: replace or append
"""
df = pandas.DataFrame(self.__format_insert_data(insert_rows))
if is_inserted is True:
# if_exists after the first insert execution
if_exists = self.APPEND
dest_tbl = self._dataset + "." + self._tblname
self._logger.info("Start insert %s rows to %s" % (len(insert_rows), dest_tbl))
df.to_gbq(
dest_tbl,
project_id=self._project_id,
if_exists=if_exists,
table_schema=self._table_schema,
location=self._location,
credentials=ServiceAccount.auth(self._credentials),
)
def get_firestore_client(credentials):
return firestore.Client(credentials=ServiceAccount.auth(credentials))
def _save_to_cache(self):
self._logger.info("Save data to on memory")
df = pandas.read_gbq(
query="SELECT * FROM %s.%s" % (self._dataset, self._tblname)
if self._query is None
else self._query,
dialect="standard",
location=self._location,
project_id=self._project_id,
credentials=ServiceAccount.auth(self._credentials),
)
ObjectStore.put(self._key, df)
def get_bigquery_client(credentials):
"""
get bigquery client object
Args:
credentials: gcp service account json
"""
return bigquery.Client(credentials=ServiceAccount.auth(credentials))