Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _sanitize_table_name(self, table_name):
try:
pathvalidate.validate_sqlite_table_name(table_name)
return table_name
except pathvalidate.ReservedNameError:
return "{:s}_{:s}".format(table_name, self.format_name)
except pathvalidate.InvalidCharError as e:
raise InvalidTableNameError(e)
try:
return [
self.__get_default_header(col_idx)
for col_idx in range(len(self._tabledata.rows[0]))
]
except IndexError:
raise DataError("header list and data body are empty")
attr_name_list = AttrList.sanitize(
super(SQLiteTableDataSanitizer, self)._normalize_headers()
)
try:
for attr_name in attr_name_list:
validate_sqlite_attr_name(attr_name)
except ReservedNameError:
pass
# duplicated attribute name handling ---
for key, count in Counter(attr_name_list).most_common():
if count <= 1:
continue
if self.__dup_col_handler == "error":
raise ValueError("duplicate column name: {}".format(key))
# rename duplicate headers
rename_target_idx_list = [i for i, attr in enumerate(attr_name_list) if attr == key][1:]
suffix_count = 0
for rename_target_idx in rename_target_idx_list:
while True:
suffix_count += 1
def _validate(self, value):
self._validate_null_string(value)
unicode_var_name = _preprocess(value)
if self._is_reserved_keyword(unicode_var_name):
raise ReservedNameError(
"{:s} is a reserved keyword by python".format(unicode_var_name),
reusable_name=False,
reserved_name=unicode_var_name,
)
match = self._invalid_var_name_re.search(unicode_var_name)
if match is not None:
raise InvalidCharError(
"invalid char found in the variable name: '{}'".format(re.escape(match.group()))
)
match = self._invalid_var_name_head_re.search(unicode_var_name)
if match is not None:
raise InvalidCharError(
"the first character of the variable name is invalid: '{}'".format(
re.escape(match.group())
self._invalid_var_name_head_re.search(replacement_text) is not None,
]
)
if is_require_remove_head:
sanitized_var_name = self._invalid_var_name_head_re.sub("", sanitized_var_name)
else:
match = self._invalid_var_name_head_re.search(sanitized_var_name)
if match is not None:
sanitized_var_name = match.end() * replacement_text + self._invalid_var_name_head_re.sub(
"", sanitized_var_name
)
try:
self._validate(sanitized_var_name)
except ReservedNameError as e:
if e.reusable_name is False:
sanitized_var_name += "_"
except NullNameError:
pass
return sanitized_var_name
def __sanitize_header_list(self):
new_header_list = []
for i, header in enumerate(self.header_list):
try:
pathvalidate.validate_sqlite_attr_name(header)
new_header = header
except pathvalidate.ReservedNameError as e:
rename_count = 0
while True:
new_header = "{:s}_rename{:d}".format(header, rename_count)
if all([
new_header not in self.header_list[i:],
new_header not in new_header_list,
]):
break
rename_count += 1
new_header_list.append(new_header)
self.__header_list = new_header_list
def _validate_header(self, header):
try:
pv.validate_sqlite_attr_name(header)
except (pv.NullNameError, pv.ReservedNameError):
pass
except pv.InvalidCharError as e:
raise InvalidHeaderNameError(e)