import os

from datapackage import Package, Resource


def update_package_descriptor():
    """Merge the resource descriptors in ``resources/`` into datapackage.json."""
    p = Package("datapackage.json")
    for f in os.listdir("resources"):
        path = os.path.join("resources", f)
        r = Resource(path)
        p.add_resource(r.descriptor)
        p.commit()
        os.remove(path)
    os.rmdir("resources")
    p.save("datapackage.json")
os.chdir(copied_root)
for r in sequence_resources:
    write_sequences(
        r.name + ".csv", dfs[r.name].loc[temporal.index], replace=True
    )

# write temporal information from clustering
temporal.to_csv(
    "data/temporal.csv",
    header=True,
    sep=";",
    date_format="%Y-%m-%dT%H:%M:%SZ",
)

# add metadata for the new temporal information
r = Resource({"path": "data/temporal.csv"})
r.infer()
r.descriptor[
    "description"
] = "Temporal selection based on skipped timesteps. Skipped n={}".format(n)

# update the metadata of the copied package
cp = Package("datapackage.json")
cp.descriptor["name"] = copied_package_name
cp.descriptor["resources"].append(r.descriptor)
cp.commit()
cp.save("datapackage.json")

# switch back to the old working directory
os.chdir(cwd)
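
# Hedged sketch (not part of the snippet above): because the CSV was written
# with a ';' separator, the resource descriptor can carry a CSV dialect per the
# Tabular Data Resource spec so that infer()/read() parse it correctly.
from datapackage import Resource

temporal_resource = Resource({
    "path": "data/temporal.csv",       # assumed to exist
    "dialect": {"delimiter": ";"},     # non-default separator used above
})
temporal_resource.infer()
print(temporal_resource.schema.field_names)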
["defs_df"]['COLUMN_NAME'])
# Create a data package to contain our resources, based on the template
# JSON file that we have already prepared as an input.
pkg = datapackage.Package(os.path.join(input_dir, "datapackage.json"))
for res in resources:
    # Convert the definitions to a dictionary of field descriptions
    field_desc = resources[res]["defs_df"].set_index(
        'COLUMN_NAME').to_dict()['FIELD_DESCRIPTION']
    # Set the description attribute of the fields in the schema using the
    # field descriptions.
    for field in resources[res]["json"]["schema"]["fields"]:
        field['description'] = field_desc[field['name']]
    resources[res]["resource"] = datapackage.Resource(
        descriptor=resources[res]["json"])
    # Make sure we didn't miss or rename any fields accidentally
    json_fields = resources[res]["resource"].schema.field_names
    defs_fields = list(resources[res]["defs_df"]['COLUMN_NAME'])
    data_fields = list(resources[res]['data_df'].columns)
    assert json_fields == defs_fields, "json vs. defs missing field: {}".format(
        set(json_fields).symmetric_difference(set(defs_fields)))
    assert data_fields == defs_fields, "data vs. defs missing field: {}".format(
        set(data_fields).symmetric_difference(set(defs_fields)))
    resources[res]["resource"].infer()
    resources[res]["resource"].commit()
    # Clean up the integer NA values in the data before outputting:
    for field in resources[res]["resource"].schema.field_names:
        if resources[res]["resource"].schema.get_field(field).type == 'integer':
            pass  # integer NA cleanup; see the sketch below
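
# Hypothetical illustration of the integer-NA cleanup referenced above: pandas
# stores missing integers as float NaN, which serializes as "1.0"/"" noise, so
# cast integer-typed columns to the nullable Int64 dtype before writing.
import pandas as pd

df = pd.DataFrame({"plant_id": [1, None, 3], "name": ["a", "b", "c"]})
for col in ["plant_id"]:               # columns typed 'integer' in the schema
    df[col] = df[col].astype("Int64")  # keeps missing values as <NA>
df.to_csv("data/example.csv", index=False)  # <NA> is written as an empty field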
def schema_validator(resource, iterator,
                     field_names=None, on_error=None):
    if on_error is None:
        on_error = raise_exception
    on_error = wrap_handler(on_error)

    if isinstance(resource, Resource):
        schema: Schema = resource.schema
        assert schema is not None
        resource = resource.descriptor
    else:
        schema: Schema = Schema(resource.get('schema', {}))

    if field_names is None:
        field_names = [f.name for f in schema.fields]
    schema_fields = [f for f in schema.fields if f.name in field_names]

    for i, row in enumerate(iterator):
        field = None
        try:
            for field in schema_fields:
                row[field.name] = field.cast_value(row.get(field.name))
        except CastError as e:
            # Drop the row unless the handler asks to keep it anyway
            if not on_error(resource['name'], row, i, e, field):
                continue
        yield row
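
# Self-contained illustration of the per-field casting loop above, using only
# tableschema (the field names and rows here are made up):
from tableschema import Schema, exceptions

schema = Schema({"fields": [{"name": "id", "type": "integer"},
                            {"name": "name", "type": "string"}]})
rows = [{"id": "1", "name": "alpha"}, {"id": "oops", "name": "beta"}]

for i, row in enumerate(rows):
    try:
        for field in schema.fields:
            row[field.name] = field.cast_value(row.get(field.name))
    except exceptions.CastError as exc:
        print(f"row {i}: could not cast field {field.name!r}: {exc}")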
def process_datapackage(self, dp: Package):
    name = self.name
    if name is None:
        name = 'res_{}'.format(len(dp.resources) + 1)
    # Build a CSV resource backed by the iterable this processor produces
    self.res = Resource(dict(
        name=name,
        path='{}.csv'.format(name)
    ), storage=iterable_storage(self.handle_iterable()))
    self.res.infer()
    if self.exc is not None:
        raise self.exc
    dp.descriptor.setdefault('resources', []).append(self.res.descriptor)
    return dp
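
# Hedged sketch (separate from the processor above): datapackage 1.x can also
# describe data held in memory via the inline "data" property, which is handy
# when a resource is generated on the fly rather than read from a CSV path.
from datapackage import Package, Resource

rows = [{"id": 1, "value": 3.5}, {"id": 2, "value": 4.25}]
res = Resource({"name": "res_1", "data": rows})
res.infer()

dp = Package()
dp.descriptor.setdefault("resources", []).append(res.descriptor)
dp.commit()
print(dp.resource_names)  # ['res_1']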
unpartitioned_tables = get_unpartioned_tables(
    [table_name], datapkg_settings)
data_sources = data_sources_from_tables(unpartitioned_tables)
descriptor['sources'] = get_source_metadata(
    data_sources, datapkg_settings)
descriptor['start_date'] = \
    get_date_from_sources(descriptor['sources'], 'start_date')
descriptor['end_date'] = \
    get_date_from_sources(descriptor['sources'], 'end_date')

if partitions:
    for part in partitions.keys():
        if part in table_name:
            descriptor['group'] = part

resource = datapackage.Resource(descriptor)
if resource.valid:
    logger.debug(f"{table_name} is a valid resource")
if not resource.valid:
    logger.info(resource)
    raise AssertionError(
        f"""
        Invalid tabular data resource: {resource.name}

        Errors:
        {resource.errors}
        """
    )
return descriptor
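
# Minimal illustration of the .valid / .errors check used above: a descriptor
# with neither "path" nor "data" is rejected by the data-resource profile.
import datapackage

bad = datapackage.Resource({"name": "example"})  # missing 'path'/'data'
print(bad.valid)  # False
for error in bad.errors:
    print(error)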
"""
# Where the CSV file holding the data is, relative to datapackage.json.
# This is the value that has to be embedded in the data package.
csv_relpath = os.path.join('data', f'{table_name}.csv')
# We need to access the file to calculate hash and size too:
csv_abspath = os.path.join(os.path.abspath(pkg_dir), csv_relpath)

# pull the skeleton of the descriptor from the megadata file
descriptor = pudl.helpers.pull_resource_from_megadata(table_name)
descriptor['path'] = csv_relpath
descriptor['bytes'] = os.path.getsize(csv_abspath)
descriptor['hash'] = pudl.output.export.hash_csv(csv_abspath)
descriptor['created'] = (datetime.datetime.utcnow()
                         .replace(microsecond=0).isoformat() + 'Z')

resource = datapackage.Resource(descriptor)
if resource.valid:
    logger.debug(f"{table_name} is a valid resource")
if not resource.valid:
    raise AssertionError(
        f"""
        Invalid tabular data resource: {resource.name}

        Errors:
        {resource.errors}
        """
    )
return descriptor
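
# Hypothetical stand-ins for the PUDL helpers used above (hash_csv etc.), to
# show how the bytes / hash / created fields can be computed with the stdlib:
import datetime
import hashlib
import os


def file_md5(path, blocksize=65536):
    """Return the MD5 hex digest of a file (hypothetical stand-in for hash_csv)."""
    digest = hashlib.md5()
    with open(path, "rb") as f:
        for block in iter(lambda: f.read(blocksize), b""):
            digest.update(block)
    return digest.hexdigest()


csv_abspath = "data/example.csv"  # assumed to exist
descriptor = {
    "bytes": os.path.getsize(csv_abspath),
    "hash": file_md5(csv_abspath),
    "created": datetime.datetime.utcnow().replace(microsecond=0).isoformat() + "Z",
}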
descriptor['hash'] = hash_csv(csv_abspath)

# If omitted, licenses are inherited from the containing data package.
descriptor["licenses"] = [pudl.constants.licenses['cc-by-4.0'], ]

data_sources = \
    pudl.helpers.data_sources_from_tables([table.name, ])
# descriptor["sources"] = \
#     [pudl.constants.data_sources[src] for src in data_sources]
descriptor["sources"] = []
for src in data_sources:
    if src in pudl.constants.data_sources:
        descriptor["sources"].append(
            {"title": src, "path": pc.base_data_urls[src]})

resource = datapackage.Resource(descriptor)
if not resource.valid:
    raise AssertionError(
        f"""
        Invalid tabular data resource: {resource.name}

        Errors:
        {resource.errors}
        """
    )
return descriptor
def _load_raw_data(self, resource_name):
    """Extract raw data from a resource.

    :param resource_name: name of the resource to read.
    """
    # Instantiating the resource again as a simple `Resource` ensures that
    # ``data`` will be returned as bytes.
    upcast_resource = datapackage.Resource(
        self.__resources[resource_name].descriptor,
        base_path=self.__base_path)
    return upcast_resource.raw_read()
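
# Usage sketch for the method above (package path and resource name assumed):
# re-wrap a resource as a plain Resource and read its bytes with raw_read().
import datapackage

pkg = datapackage.Package("datapackage.json")
res = pkg.get_resource("example")  # None if no such resource
if res is not None:
    raw = datapackage.Resource(res.descriptor,
                               base_path=pkg.base_path).raw_read()
    print(len(raw), "bytes")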
# If omitted, licenses are inherited from the containing data package.
descriptor["licenses"] = [{
    "name": "CC-BY-4.0",
    "title": "Creative Commons Attribution 4.0",
    "path": "https://creativecommons.org/licenses/by/4.0/"
}]

# This should also include the table-specific data sources.
descriptor["sources"] = [{
    "title": "Public Utility Data Liberation Project (PUDL)",
    "path": "https://catalyst.coop/public-utility-data-liberation/",
    "email": "pudl@catalyst.coop",
}]

resource = datapackage.Resource(descriptor)
if not resource.valid:
    raise AssertionError(
        f"""
        Invalid tabular data resource: {resource.name}

        Errors:
        {resource.errors}
        """
    )
return descriptor