import ntpath


def _get_remote_location(path, staging_location):
    """Get the remote location of the file.

    Args:
        path (str): raw path of the file
        staging_location (str): path to stage the file

    Returns:
        (str, bool): remote path of the file and whether staging is required
    """
    if is_gs_path(path):
        return path, False

    if staging_location is None:
        return "", True

    if not is_gs_path(staging_location):
        raise ValueError("Staging location must be in GCS")

    filename = ntpath.basename(path)
    return staging_location + "/" + filename, True
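# Hedged usage sketch: assuming is_gs_path() returns True only for gs:// URIs
# (as feast's GCS helpers do), staging resolution works as follows. The paths
# below are illustrative assumptions, not taken from the original source.
remote_path, require_staging = _get_remote_location(
    "/tmp/driver_features.csv", "gs://my-bucket/staging"
)
# remote_path == "gs://my-bucket/staging/driver_features.csv"
# require_staging is True, i.e. the local file still has to be uploaded;
# a path that is already in GCS is returned unchanged with require_staging False.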
def download_table_as_file(self, full_table_id, dest, staging_location,
                           file_type):
    """Download a BigQuery table as a file.

    Args:
        full_table_id (str): fully qualified BigQuery table id
        dest (str): destination file path
        staging_location (str): url to staging location (currently
            supports a folder in GCS)
        file_type (FileType): format of the downloaded file (CSV or JSON)

    Returns: (str) path to the downloaded file
    """
    if not staging_location:
        df = self.download_table_as_df(full_table_id)
        if file_type == FileType.CSV:
            df.to_csv(dest, index=False)
        elif file_type == FileType.JSON:
            df.to_json(dest, index=False)
        else:
            raise ValueError(
                "Only FileType: CSV and JSON are supported for "
                "download_table_as_file without staging location"
            )
        return dest

    if not is_gs_path(staging_location):
        raise ValueError("staging_uri must be a directory in GCS")

    shard_folder = self.__extract_table_to_shard_folder(
        full_table_id, staging_location, file_type)
    return gcs_folder_to_file(shard_folder, dest)
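# Hedged usage sketch: the method above is excerpted from a downloader class;
# the TableDownloader name, table id, and paths below are illustrative
# assumptions, not confirmed by the original source. Without a staging
# location the table is read into pandas and written locally; with a GCS
# staging folder it is exported through a BigQuery extract job first.
downloader = TableDownloader()
local_csv = downloader.download_table_as_file(
    "my_project.my_dataset.driver_features",
    dest="/tmp/driver_features.csv",
    staging_location="gs://my-bucket/staging",
    file_type=FileType.CSV,
)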
            Timestamp value to assign to all features in the dataset.
        serving_store (feast.sdk.resources.feature.DataStore): Defaults to None.
            Serving store to write the features in this instance to.
        warehouse_store (feast.sdk.resources.feature.DataStore): Defaults to None.
            Warehouse store to write the features in this instance to.
        job_options (dict): Defaults to empty dict. Additional job options.

    Returns:
        Importer: the importer for the dataset provided.
    """
    src_type = "file.csv"
    source_options = {}
    source_options["path"], require_staging = _get_remote_location(
        path, staging_location
    )
    if is_gs_path(path):
        df = gcs_to_df(path)
    else:
        df = pd.read_csv(path)
    schema, features = _detect_schema_and_feature(
        entity,
        owner,
        id_column,
        feature_columns,
        timestamp_column,
        timestamp_value,
        serving_store,
        warehouse_store,
        df,
    )
    iport_spec = _create_import(
        src_type, source_options, job_options, entity, schema
    )
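# Hedged usage sketch: the body above appears to be the tail of an
# Importer.from_csv() classmethod from the feast SDK. The call below assumes
# that signature (reconstructed from the parameter names used in the body);
# the file path, entity, owner, and column names are illustrative only.
importer = Importer.from_csv(
    path="driver_features.csv",
    entity="driver",
    owner="user@example.com",
    staging_location="gs://my-bucket/staging",
    id_column="driver_id",
    timestamp_column="event_timestamp",
)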
"""
Download a BigQuery table as Pandas Dataframe
Args:
full_table_id (src) : fully qualified BigQuery table id
staging_location: url to staging_location (currently
support a folder in GCS)
Returns: pandas.DataFrame: dataframe of the training dataset
"""
if not staging_location:
table = bigquery.TableReference.from_string(full_table_id)
rows = self.bqclient.list_rows(table)
return rows.to_dataframe(bqstorage_client=self.bqstorageclient)
if not is_gs_path(staging_location):
raise ValueError("staging_uri must be a directory in GCS")
shard_folder = self.__extract_table_to_shard_folder(
full_table_id, staging_location, DestinationFormat.CSV)
return gcs_folder_to_df(shard_folder)
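# Hedged usage sketch, reusing the illustrative `downloader` from above: with
# no staging location the rows are streamed via the BigQuery Storage API into
# a DataFrame; with a GCS staging folder the table is first extracted to
# sharded CSV files and then read back. The table id and path are assumptions.
df = downloader.download_table_as_df(
    "my_project.my_dataset.driver_features",
    staging_location="gs://my-bucket/staging",
)
print(df.head())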