("gcs://bucket/file.json", GCSFileSystem),
],
)
def test_protocol_usage(self, filepath, instance_type):
data_set = JSONDataSet(filepath=filepath)
assert isinstance(data_set._fs, instance_type)
assert str(data_set._filepath) == data_set._fs._strip_protocol(filepath)
assert isinstance(data_set._filepath, PurePosixPath)
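The assertions above rely on fsspec's protocol stripping. A minimal sketch of that behaviour, assuming anonymous access and a hypothetical object path:

from gcsfs import GCSFileSystem

fs = GCSFileSystem(token="anon")  # anonymous filesystem handle
# _strip_protocol drops the "gcs://" / "gs://" prefix, leaving the bucket-relative path
assert fs._strip_protocol("gcs://bucket/file.json") == "bucket/file.json"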
def __init__(self):
self.USI_GOOGLE_CLOUD_PROJECT = getenv('USI_GOOGLE_CLOUD_PROJECT')
self.USI_GOOGLE_CLOUD_BUCKET = getenv('USI_GOOGLE_CLOUD_BUCKET')
self.USI_GOOGLE_CLOUD_PROCESSED = getenv('USI_GOOGLE_CLOUD_PROCESSED')
self.USI_GOOGLE_CLOUD_UNPROCESSED = getenv(
'USI_GOOGLE_CLOUD_UNPROCESSED')
self.USI_GOOGLE_CLOUD_SERVICE_ACCOUNT = getenv(
'USI_GOOGLE_CLOUD_SERVICE_ACCOUNT')
self.fs = gcsfs.GCSFileSystem(
project=self.USI_GOOGLE_CLOUD_PROJECT, token=self.USI_GOOGLE_CLOUD_SERVICE_ACCOUNT)
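The resulting filesystem handle can then be used for ordinary bucket operations. A minimal sketch, assuming the class above is named Config (the real name is not shown in the snippet), the environment variables are set, and a hypothetical object layout:

config = Config()  # hypothetical class name for the snippet above
# list objects under the unprocessed prefix of the configured bucket
unprocessed_files = config.fs.ls(
    config.USI_GOOGLE_CLOUD_BUCKET + '/' + config.USI_GOOGLE_CLOUD_UNPROCESSED)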
Here you can find all available arguments:
https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.to_csv.html
All defaults are preserved, except for "index", which is set to False.
version: If specified, should be an instance of
``kedro.io.core.Version``. If its ``load`` attribute is
None, the latest version will be loaded. If its ``save``
attribute is None, save version will be autogenerated.
project: The GCP project. If not specified, then the default is inferred
by a remote request.
https://cloud.google.com/resource-manager/docs/creating-managing-projects
gcsfs_args: Extra arguments to pass into ``GCSFileSystem``. See
https://gcsfs.readthedocs.io/en/latest/api.html#gcsfs.core.GCSFileSystem
"""
_credentials = deepcopy(credentials) or {}
_gcsfs_args = deepcopy(gcsfs_args) or {}
_gcs = gcsfs.GCSFileSystem(project=project, token=_credentials, **_gcsfs_args)
path = _gcs._strip_protocol(filepath) # pylint: disable=protected-access
path = PurePosixPath("{}/{}".format(bucket_name, path) if bucket_name else path)
super().__init__(
filepath=path,
version=version,
exists_function=_gcs.exists,
glob_function=_gcs.glob,
load_args=load_args,
save_args=save_args,
)
self._gcs = _gcs
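Given the docstring above, a typical instantiation passes pandas options through save_args and GCS options through credentials/gcsfs_args. A minimal sketch, assuming this __init__ belongs to a CSV-on-GCS dataset class (the class name is not shown) and using hypothetical paths and credentials:

from kedro.io import Version

data_set = CSVGCSDataSet(                                   # hypothetical class name
    filepath="gcs://my-bucket/data/cars.csv",               # hypothetical bucket/path
    save_args={"index": False},                             # mirrors the documented default
    version=Version(load=None, save=None),                  # load latest, autogenerate save version
    credentials={"token": "/path/to/service-account.json"},
    gcsfs_args={"cache_timeout": 120},
)
data_set.save(df)  # df: a pandas DataFrame assumed to exist elsewhere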
def __init__(self, path='.', gcs=None, nfiles=10, **fsargs):
if gcs is None:
# minimum block size: still read on 5MB boundaries.
self.gcs = GCSFileSystem(block_size=30 * 2 ** 20,
cache_timeout=6000, **fsargs)
else:
self.gcs = gcs
self.cache = SmallChunkCacher(self.gcs, nfiles=nfiles)
self.write_cache = {}
self.counter = 0
self.root = path
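The large block_size and long cache_timeout trade memory for fewer round trips when serving many small reads. A minimal sketch of reading through such a filesystem, assuming a hypothetical bucket and object:

from gcsfs import GCSFileSystem

gcs = GCSFileSystem(block_size=30 * 2 ** 20, cache_timeout=6000)
with gcs.open("my-bucket/logs/app.log", "rb") as f:  # hypothetical object
    header = f.read(1024)  # data is fetched and buffered in block_size chunks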
.master(MASTER_URL)
.config('spark.cassandra.connection.host', MORPHL_SERVER_IP_ADDRESS)
.config('spark.cassandra.auth.username', MORPHL_CASSANDRA_USERNAME)
.config('spark.cassandra.auth.password', MORPHL_CASSANDRA_PASSWORD)
.config('spark.sql.shuffle.partitions', 16)
.getOrCreate())
log4j = spark_session.sparkContext._jvm.org.apache.log4j
log4j.LogManager.getRootLogger().setLevel(log4j.Level.ERROR)
save_options_usi_csv_features_raw_p_df = {
'keyspace': MORPHL_CASSANDRA_KEYSPACE,
'table': 'usi_csv_features_raw_p'
}
fs = gcsfs.GCSFileSystem(
project=USI_GOOGLE_CLOUD_PROJECT, token=USI_GOOGLE_CLOUD_SERVICE_ACCOUNT)
auth_provider = PlainTextAuthProvider(
username=MORPHL_CASSANDRA_USERNAME,
password=MORPHL_CASSANDRA_PASSWORD
)
cluster = Cluster(
[MORPHL_SERVER_IP_ADDRESS], auth_provider=auth_provider)
spark_session_cass = cluster.connect(MORPHL_CASSANDRA_KEYSPACE)
prep_stmt_predictions_statistics = spark_session_cass.prepare(
'INSERT INTO usi_csv_files (always_zero, day_of_data_capture, is_processed) VALUES (0, ?, false)'
)
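The prepared statement above has a single bind parameter, the day of data capture. A minimal sketch of executing it, where DAY_OF_DATA_CAPTURE is assumed to be defined elsewhere in the script (e.g. read from an environment variable):

# insert a row marking the given day's CSV file as not yet processed
spark_session_cass.execute(prep_stmt_predictions_statistics, [DAY_OF_DATA_CAPTURE])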
save_args: Additional saving options for `pyarrow.parquet.write_table`.
Here you can find all available arguments:
https://arrow.apache.org/docs/python/generated/pyarrow.parquet.write_table.html?highlight=write_table#pyarrow.parquet.write_table
version: If specified, should be an instance of
``kedro.io.core.Version``. If its ``load`` attribute is
None, the latest version will be loaded. If its ``save``
attribute is None, save version will be autogenerated.
project: The GCP project. If not specified, then the default is inferred
by a remote request.
https://cloud.google.com/resource-manager/docs/creating-managing-projects
gcsfs_args: Extra arguments to pass into ``GCSFileSystem``. See
https://gcsfs.readthedocs.io/en/latest/api.html#gcsfs.core.GCSFileSystem
"""
_credentials = deepcopy(credentials) or {}
_gcsfs_args = deepcopy(gcsfs_args) or {}
_gcs = gcsfs.GCSFileSystem(project=project, token=_credentials, **_gcsfs_args)
path = _gcs._strip_protocol(filepath) # pylint: disable=protected-access
path = PurePosixPath("{}/{}".format(bucket_name, path) if bucket_name else path)
super().__init__(
filepath=path,
version=version,
exists_function=_gcs.exists,
glob_function=_gcs.glob,
load_args=load_args,
save_args=save_args,
)
self._gcs = _gcs
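As with the CSV variant above, this dataset can be constructed directly; save_args are forwarded to pyarrow.parquet.write_table. A minimal sketch, assuming a Parquet-on-GCS dataset class name (not shown in the snippet) and hypothetical paths:

data_set = ParquetGCSDataSet(                               # hypothetical class name
    filepath="gcs://my-bucket/data/trips.parquet",          # hypothetical path
    save_args={"compression": "snappy"},                    # passed to pyarrow.parquet.write_table
    project="my-gcp-project",                               # hypothetical GCP project id
    credentials={"token": "/path/to/service-account.json"},
)
df = data_set.load()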
default file
Args:
parameters (dict): Parameters for the dataset to be processed. Must
include keys "gs_bucket" (the name of the FQ bucket file path,
for example `"gs://X/Y/Z/data.zarr"`) and "volume_name", which
is the name of the volume in the zarr file (e.g. "raw")
Returns:
None
"""
self.parameters = parameters
self.ingest_job = self.parameters.pop("ingest_job")
self.gs_bucket = self.parameters["gs_bucket"]
self.volume_name = self.parameters["volume_name"]
Zg = zarr.group(store=GCSFileSystem(token="cache").get_mapper(self.gs_bucket))
self.vol = Zg[self.volume_name]
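Once the group is opened, self.vol behaves like a NumPy-style array backed by the bucket, so volumes can be read by slicing. A minimal sketch with hypothetical coordinates, as it might appear in another method of the same class:

import numpy as np

# read a small sub-volume; only the zarr chunks covering this slice are fetched from GCS
sub_volume = self.vol[0:64, 0:64, 0:64]
assert isinstance(sub_volume, np.ndarray)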