Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_drop_nones(self):
    """remap() with a None-rejecting visit drops None entries at every depth."""
    source = {'a': 1, 'b': None, 'c': [3, None, 4, None]}
    expected = {'a': 1, 'c': [3, 4]}

    def keep_non_none(path, key, value):
        # Returning False from a visit callback drops the item entirely.
        return value is not None

    assert remap(source, visit=keep_non_none) == expected

    # A list of nothing but Nones must remap to an empty (falsy) list.
    all_nones = [None] * 100
    assert not remap(all_nones, keep_non_none)
if obj.project is None:
obj.reload("id", "project", "data")
project = obj.project.fetch()
ctx = {
"cid": str(obj.id),
"title": project.title,
"references": project.references,
"landing_page": f"/{project.id}/",
"more": f"/{obj.id}",
}
ctx["descriptions"] = project.description.strip().split(".", 1)
authors = [a.strip() for a in project.authors.split(",") if a]
ctx["authors"] = {"main": authors[0], "etal": authors[1:]}
ctx["data"] = j2h.convert(
json=remap(obj.data, visit=visit, enter=enter),
table_attributes='class="table is-narrow is-fullwidth has-background-light"',
)
return html_minify(render_template(f"card_{fmt}.html", **ctx))
else:
raise UnknownFieldError
"blurb",
"peek",
"designation",
"visible",
"metadata",
]
for attribute in attributes:
if attribute in dataset_attrs:
value = dataset_attrs[attribute]
if attribute == "metadata":
def remap_objects(p, k, obj):
    # remap() visit callback: rehydrate serialized MetadataFile dicts
    # into model.MetadataFile instances; pass everything else through.
    is_metadata_file = (
        isinstance(obj, dict)
        and obj.get("model_class") == "MetadataFile"
    )
    if is_metadata_file:
        return (k, model.MetadataFile(dataset=hda, uuid=obj["uuid"]))
    return (k, obj)
value = remap(value, remap_objects)
setattr(hda, attribute, value)
handle_dataset_object_edit(hda)
self._flush()
else:
metadata = dataset_attrs['metadata']
model_class = dataset_attrs.get("model_class", "HistoryDatasetAssociation")
if model_class == "HistoryDatasetAssociation":
# Create dataset and HDA.
dataset_instance = model.HistoryDatasetAssociation(name=dataset_attrs['name'],
extension=dataset_attrs['extension'],
info=dataset_attrs['info'],
blurb=dataset_attrs['blurb'],
peek=dataset_attrs['peek'],
Args:
value: Value that needs recursive conversion to a UTF-8 encoded
string.
Returns:
A UTF-8 encoded string representation of value.
"""
def visit(path, key, value):
    """remap() visit callback that makes values JSON-friendly.

    Decodes bytes/bytearray to UTF-8 text (undecodable bytes become the
    replacement character rather than raising) and stringifies UUIDs;
    all other values pass through unchanged.
    """
    if isinstance(value, (bytes, bytearray)):
        return key, value.decode("UTF-8", errors="replace")
    if isinstance(value, uuid.UUID):
        return key, str(value)
    return key, value
return iterutils.remap(value, visit=visit)
if isinstance(value, dict):
return value, ItemsView(value)
else:
return default_enter(path, key, value)
def visit(p, k, v):
    """Debug visit callback: print each visited item, then keep it unchanged.

    Args:
        p: Tuple of keys giving the path from the root to this item.
        k: Key of the current item.
        v: Value of the current item.

    Returns:
        The (key, value) pair unchanged, so remap() keeps every item as-is.
    """
    # BUG FIX: the original began with ``print(**kwargs)``, but no ``kwargs``
    # name exists in this scope, so every call raised NameError. Removed.
    print('visit path: {}'.format(p))
    print('visit key: {}'.format(k))
    print('visit value: {}'.format(v))
    return k, v
orig = {'a1': {'b1': 1, 'b2': 2}}
new = remap(orig, visit=visit, enter=enter, test='test')
print('ORIG ID: {}'.format(id(orig)))
print('NEW: {}'.format(new))
print('NEW ID: {}'.format(id(new)))
# data_model_core_dict_names = data_model_core_dict_names
)
# data, is_data, page_n_max = raw[0], raw[1], raw[3]
app_log.info("••• is_data : %s ", is_data )
### operations if there is data
if is_data :
count_results = len(data)
app_log.info("••• data[0] : \n %s " , pformat(data[0]) )
### rewrite field names as understandable ones --> replace field_oid by field_name
# cf : https://sedimental.org/remap.html
data = remap( data, lambda p, k, v: ( data_model_custom_dict[k][u"field_name"], v) if k in data_model_custom_dict else (k, v))
### include _id
if "_id" not in ignore_fields_list :
for d in data :
d["_id"] = str(d["_id"])
else :
count_results = 0
data = "no data for this query"
### add header to tell user which level auth he/she gets to get
full_json = {
"status" : "ok",
Args:
scan_result: Scan result to be remapped and formatted.
Returns:
Remapped and formatted scan result.
"""
empty_lambda = lambda p, k, v: v != "" and v != [] and v != {}
def snake(path, key, value):
    """remap() visit callback: convert string keys to snake_case."""
    if isinstance(key, int):
        # List indices carry no casing information; pass them through.
        return (key, value)
    return (inflection.underscore(key), value)
if self.log_field_case == "snake":
remapped = iterutils.remap(scan_result, empty_lambda)
return iterutils.remap(remapped, visit=snake)
return iterutils.remap(scan_result, empty_lambda)
query = self.sa_session.query(model.Job, *used_ids).filter(and_(*conditions)).options(joinedload("parameters"))
for job in query.all():
# We found a job that is equal in terms of tool_id, user, state and input datasets,
# but to be able to verify that the parameters match we need to modify all instances of
# dataset_ids (HDA, LDDA, HDCA) in the incoming param_dump to point to those used by the
# possibly equivalent job, which may have been run on copies of the original input data.
job_input_ids = {}
if isinstance(job, tuple):
# If there are any input datasets job will be a tuple
job, current_jobs_data_ids = job[0], job[1:]
for src, requested_id, used_id in zip(data_types, requested_ids, current_jobs_data_ids):
if src not in job_input_ids:
job_input_ids[src] = {requested_id: used_id}
else:
job_input_ids[src][requested_id] = used_id
new_param_dump = remap(param_dump, visit=replace_dataset_ids)
# new_param_dump has its dataset ids remapped to those used by the job.
# We now ask if the remapped job parameters match the current job.
job_parameter_conditions = [model.Job.id == job.id]
for k, v in new_param_dump.items():
a = aliased(model.JobParameter)
job_parameter_conditions.append(and_(
a.job_id == job.id,
a.name == k,
a.value == json.dumps(v)
))
query = self.sa_session.query(model.Job).filter(*job_parameter_conditions)
if query.first() is None:
continue
n_parameters = 0
# Verify that equivalent jobs had the same number of job parameters
# We skip chrominfo, dbkey, __workflow_invocation_uuid__ and identifier
Args:
scan_result: Scan result to be remapped and formatted.
Returns:
Remapped and formatted scan result.
"""
empty_lambda = lambda p, k, v: v != "" and v != [] and v != {}
def snake(path, key, value):
    """remap() visit callback: snake_case every non-integer key."""
    # Integer keys are list indices and have no casing to convert.
    new_key = key if isinstance(key, int) else inflection.underscore(key)
    return (new_key, value)
if self.log_field_case == "snake":
remapped = iterutils.remap(scan_result, empty_lambda)
return iterutils.remap(remapped, visit=snake)
return iterutils.remap(scan_result, empty_lambda)