def unflatten_main_with_parser(parser, line, timezone, xml, id_name):
unflattened = OrderedDict()
for path, cell in line.items():
# Skip blank cells
if cell.cell_value is None or cell.cell_value == "":
continue
current_path = unflattened
path_list = [item.rstrip("[]") for item in str(path).split("/")]
for num, path_item in enumerate(path_list):
if isint(path_item):
if num == 0:
warn(
'Column "{}" has been ignored because it is a number.'.format(
path
),
DataErrorWarning,
)
continue
current_type = None
path_till_now = "/".join(
[item for item in path_list[: num + 1] if not isint(item)]
)
if parser:
current_type = parser.flattened.get(path_till_now)
            try:
                next_path_item = path_list[num + 1]
            except IndexError:
                next_path_item = ""

            # Quick solution to avoid casting of date as datetime in spreadsheet -> xml
            if xml:
                if type(cell.cell_value) == datetime.datetime and not next_path_item:
                    if "datetime" not in path:
                        current_type = "date"

            ## Array
            list_index = -1
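# --- Illustrative sketch (not part of flatten-tool): how a spreadsheet column
# heading is broken into path segments, mirroring the path_list logic above.
# The isint() helper here is an assumed stand-in for the library's own helper.
def isint(value):
    try:
        int(value)
        return True
    except (TypeError, ValueError):
        return False

heading = "tender/items[]/0/description"
path_list = [item.rstrip("[]") for item in str(heading).split("/")]
print(path_list)                            # ['tender', 'items', '0', 'description']
print([isint(item) for item in path_list])  # [False, False, True, False]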
def dict_to_xml(data, tagname, toplevel=True, nsmap=None):
if USING_LXML and ":" in tagname and not toplevel:
tagname = (
"{"
+ nsmap.get(tagname.split(":", 1)[0], "")
+ "}"
+ tagname.split(":", 1)[1]
)
try:
if USING_LXML:
el = ET.Element(tagname, nsmap=nsmap)
else:
el = ET.Element(tagname)
except ValueError as e:
warn(str(e), DataErrorWarning)
return
if USING_LXML:
data = sort_attributes(data)
for k, v in data.items():
if type(v) == list:
for item in v:
child_to_xml(el, k, item, nsmap=nsmap)
else:
child_to_xml(el, k, v, toplevel=toplevel, nsmap=nsmap)
return el
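# --- Illustrative sketch (not flatten-tool's dict_to_xml): the same
# dict-to-element pattern with plain ElementTree, skipping the lxml namespace
# and attribute handling above; simple_dict_to_xml is a hypothetical stand-in.
import xml.etree.ElementTree as ET
from collections import OrderedDict

def simple_dict_to_xml(data, tagname):
    el = ET.Element(tagname)
    for key, value in data.items():
        items = value if isinstance(value, list) else [value]
        for item in items:
            if isinstance(item, dict):
                el.append(simple_dict_to_xml(item, key))
            else:
                child = ET.SubElement(el, key)
                child.text = str(item)
    return el

root = simple_dict_to_xml(
    OrderedDict([("title", "Example"), ("item", [{"id": "1"}, {"id": "2"}])]),
    "release",
)
print(ET.tostring(root, encoding="unicode"))
# <release><title>Example</title><item><id>1</id></item><item><id>2</id></item></release>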
elif type_string in ("array", "array_array", "string_array", "number_array"):
value = str(value)
if type_string == "number_array":
try:
if "," in value:
return [
[Decimal(y) for y in x.split(",")] for x in value.split(";")
]
else:
return [Decimal(x) for x in value.split(";")]
except (TypeError, ValueError, InvalidOperation):
warn(
'Non-numeric value "{}" found in number array column, returning as string array instead.'.format(
value
),
DataErrorWarning,
)
if "," in value:
return [x.split(",") for x in value.split(";")]
else:
return value.split(";")
elif type_string == "string":
if type(value) == datetime.datetime:
return timezone.localize(value).isoformat()
return str(value)
elif type_string == "date":
if type(value) == datetime.datetime:
return value.date().isoformat()
return str(value)
elif type_string == "":
if type(value) == datetime.datetime:
return timezone.localize(value).isoformat()
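# --- Illustrative sketch of the array cell convention handled above: ";"
# separates top-level items and "," separates items of nested arrays.
# parse_number_array is a hypothetical helper, not the library's converter.
from decimal import Decimal, InvalidOperation

def parse_number_array(value):
    try:
        if "," in value:
            return [[Decimal(y) for y in x.split(",")] for x in value.split(";")]
        return [Decimal(x) for x in value.split(";")]
    except (TypeError, ValueError, InvalidOperation):
        # Fall back to a string array, as the warning branch above does.
        if "," in value:
            return [x.split(",") for x in value.split(";")]
        return value.split(";")

print(parse_number_array("1;2;3"))    # [Decimal('1'), Decimal('2'), Decimal('3')]
print(parse_number_array("1,2;3,4"))  # [[Decimal('1'), Decimal('2')], [Decimal('3'), Decimal('4')]]
print(parse_number_array("1,x;3,4"))  # [['1', 'x'], ['3', '4']]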
if json_filename is not None and root_json_dict is not None:
raise ValueError(
"Only one of json_file or root_json_dict should be supplied"
)
if json_filename:
with codecs.open(json_filename, encoding="utf-8") as json_file:
try:
self.root_json_dict = json.load(
json_file, object_pairs_hook=OrderedDict, parse_float=Decimal
)
except UnicodeError as err:
raise BadlyFormedJSONErrorUTF8(*err.args)
except ValueError as err:
raise BadlyFormedJSONError(*err.args)
else:
self.root_json_dict = root_json_dict
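# --- Minimal standalone sketch of the JSON-loading choices above:
# object_pairs_hook keeps key order as OrderedDict and parse_float reads
# numbers as Decimal, avoiding float rounding. Not the parser class itself.
import json
from collections import OrderedDict
from decimal import Decimal

doc = json.loads(
    '{"b": 1, "a": 0.1}',
    object_pairs_hook=OrderedDict,
    parse_float=Decimal,
)
print(doc)             # OrderedDict([('b', 1), ('a', Decimal('0.1'))])
print(type(doc["a"]))  # <class 'decimal.Decimal'>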
if preserve_fields:
# Extract fields to be preserved from input file (one path per line)
preserve_fields_all = []
preserve_fields_input = []
with open(preserve_fields) as preserve_fields_file:
for line in preserve_fields_file:
line = line.strip()
path_fields = line.rsplit("/", 1)
preserve_fields_all = (
preserve_fields_all + path_fields + [line.rstrip("/")]
)
preserve_fields_input = preserve_fields_input + [line.rstrip("/")]
if input_format is None:
raise Exception("You must specify an input format (may autodetect in future")
elif input_format not in INPUT_FORMATS:
raise Exception("The requested format is not available")
if metatab_name and base_json:
raise Exception("Not allowed to use base_json with metatab")
if root_is_list:
base = None
elif base_json:
with open(base_json) as fp:
base = json.load(fp, object_pairs_hook=OrderedDict)
else:
base = OrderedDict()
base_configuration = parse_sheet_configuration(
[item.strip() for item in default_configuration.split(",")]
)
cell_source_map_data = OrderedDict()
heading_source_map_data = OrderedDict()
if metatab_name and not root_is_list:
spreadsheet_input_class = INPUT_FORMATS[input_format]
spreadsheet_input = spreadsheet_input_class(
input_name=input_name,
timezone_name=timezone_name,
root_list_path="meta",
include_sheets=[metatab_name],
convert_titles=convert_titles,
vertical_orientation=metatab_vertical_orientation,
            id_name=id_name,
            xml=xml,
            use_configuration=False,
        )
        if metatab_schema:
            parser = SchemaParser(
                schema_filename=metatab_schema, disable_local_refs=disable_local_refs
            )
            parser.parse()
# This is misleading as it specifies the row number as the distance vertically
# and the horizontal 'letter' as a number.
# https://github.com/OpenDataServices/flatten-tool/issues/153
cells[header] = Cell(
line[header], (sheet_name, str(k + 1), j + 2, heading)
)
else:
cells[header] = Cell(
line[header],
(sheet_name, _get_column_letter(k + 1), j + 2, heading),
)
unflattened = unflatten_main_with_parser(
self.parser, cells, self.timezone, self.xml, self.id_name
)
if root_id_or_none not in main_sheet_by_ocid:
main_sheet_by_ocid[root_id_or_none] = TemporaryDict(
self.id_name, xml=self.xml
)
def inthere(unflattened, id_name):
if self.xml:
return unflattened[id_name]["text()"].cell_value
else:
return unflattened[id_name].cell_value
if (
self.id_name in unflattened
and inthere(unflattened, self.id_name)
in main_sheet_by_ocid[root_id_or_none]
):
if self.xml:
                        unflattened_id = unflattened.get(self.id_name)[
                            "text()"
                        ].cell_value
                    else:
                        unflattened_id = unflattened.get(self.id_name).cell_value
def list_as_dicts_to_temporary_dicts(unflattened, id_name, xml):
for key, value in list(unflattened.items()):
if isinstance(value, Cell):
continue
if hasattr(value, "items"):
if not value:
unflattened.pop(key)
list_as_dicts_to_temporary_dicts(value, id_name, xml)
if isinstance(value, ListAsDict):
temporarydict = TemporaryDict(id_name, xml=xml)
for index in sorted(value.keys()):
temporarydict.append(value[index])
unflattened[key] = temporarydict
return unflattened
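# --- Illustrative sketch only: the index-ordering step that the conversion
# above performs when it turns a ListAsDict (a dict keyed by list position)
# into an ordered sequence; indexed_dict_to_list is a hypothetical stand-in
# and omits the TemporaryDict/id_name handling.
def indexed_dict_to_list(value):
    return [value[index] for index in sorted(value.keys())]

print(indexed_dict_to_list({2: "c", 0: "a", 1: "b"}))  # ['a', 'b', 'c']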
def path_search(
nested_dict, path_list, id_fields=None, path=None, top=False, top_sheet=False
):
if not path_list:
return nested_dict
id_fields = id_fields or {}
parent_field = path_list[0]
path = parent_field if path is None else path + "/" + parent_field
if parent_field.endswith("[]") or top:
if parent_field.endswith("[]"):
parent_field = parent_field[:-2]
if parent_field not in nested_dict:
nested_dict[parent_field] = TemporaryDict(
keyfield=id_name, top_sheet=top_sheet, xml=xml # noqa
)
sub_sheet_id = id_fields.get(path + "/id")
if sub_sheet_id not in nested_dict[parent_field]:
nested_dict[parent_field][sub_sheet_id] = {}
return path_search(
nested_dict[parent_field][sub_sheet_id],
path_list[1:],
id_fields=id_fields,
path=path,
top_sheet=top_sheet,
)
else:
if parent_field not in nested_dict:
nested_dict[parent_field] = OrderedDict()
        return path_search(
            nested_dict[parent_field],
            path_list[1:],
            id_fields=id_fields,
            path=path,
            top_sheet=top_sheet,
        )
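# --- Illustrative sketch of the recursive descent in path_search above, with
# TemporaryDict replaced by plain OrderedDicts so only the nesting behaviour
# is visible; simple_path_search is a hypothetical stand-in.
from collections import OrderedDict

def simple_path_search(nested, path_list):
    if not path_list:
        return nested
    head = path_list[0].rstrip("[]")
    nested.setdefault(head, OrderedDict())
    return simple_path_search(nested[head], path_list[1:])

root = OrderedDict()
leaf = simple_path_search(root, ["tender", "items[]", "description"])
leaf["title"] = "example"
print(root["tender"]["items"]["description"]["title"])  # example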