Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def validate(self, text, **kwargs):
text = sanitize_text(text)
if text is None:
return False
return self.ID_RE.match(text) is not None
def clean(self, obj, **kwargs):
if not isinstance(obj, str):
obj = self.pack(obj)
else:
obj = sanitize_text(obj)
return obj
def __init__(self, model, data, key_prefix=None, cleaned=True):
data = dict(data)
properties = data.pop("properties", {})
if not cleaned:
properties = ensure_dict(properties)
self.schema = model.get(data.pop("schema", None))
if self.schema is None:
raise InvalidData(gettext("No schema for entity."))
self.key_prefix = key_prefix
self.id = data.pop("id", None)
if not cleaned:
self.id = sanitize_text(self.id)
self.context = data
self._properties = {}
self._size = 0
for key, value in properties.items():
if key not in self.schema.properties:
continue
if not cleaned:
self.add(key, value, cleaned=cleaned, quiet=True)
else:
self._properties[key] = set(value)
def make_sheet(self, title, headers):
sheet = self.workbook.create_sheet(title=title)
sheet.freeze_panes = "A2"
sheet.sheet_properties.filterMode = True
cells = []
for header in headers:
header = sanitize_text(header)
cell = WriteOnlyCell(sheet, value=header)
cell.font = self.HEADER_FONT
cell.fill = self.HEADER_FILL
cells.append(cell)
sheet.append(cells)
return sheet
def validate(self, obj, **kwargs):
"""Check if a thing is a valid date."""
obj = sanitize_text(obj)
if obj is None:
return False
return self.DATE_RE.match(obj) is not None
def clean(self, text: Any, **kwargs):
"""Create a more clean, but still user-facing version of an
instance of the type."""
text = sanitize_text(text)
if text is not None:
return self.clean_text(text, **kwargs)
def __init__(self, query, data, prop):
self.query = query
data = deepcopy(data)
self.data = data
self.prop = prop
self.name = prop.name
self.type = prop.type
self.refs = keys_values(data, "column", "columns")
self.literals = keys_values(data, "literal", "literals")
self.join = data.pop("join", None)
self.split = data.pop("split", None)
self.entity = data.pop("entity", None)
self.required = data.pop("required", False)
self.template = sanitize_text(data.pop("template", None))
self.replacements = {}
if self.template is not None:
# this is hacky, trying to generate refs from template
for ref in self.FORMAT_PATTERN.findall(self.template):
self.refs.append(ref)
self.replacements["{{%s}}" % ref] = ref
def emit_row_dicts(self, table, rows, headers=None):
csv_path = self.make_work_file(table.id)
row_count = 0
with open(csv_path, 'w', encoding='utf-8') as fp:
csv_writer = csv.writer(fp, dialect='unix')
for row in rows:
if headers is None:
headers = list(row.keys())
values = [sanitize_text(row.get(h)) for h in headers]
csv_writer.writerow(values)
self.manager.emit_text_fragment(table, values, row_count)
row_count += 1
if row_count > 0:
csv_hash = self.manager.store(csv_path, mime_type=CSV)
table.set('csvHash', csv_hash)
table.set('rowCount', row_count + 1)
table.set('columns', registry.json.pack(headers))
def ingest(self, file_path, entity):
entity.schema = model.get('PlainText')
text = self.read_file_decoded(entity, file_path)
text = sanitize_text(text)
entity.set('bodyText', text)
try:
for card in vobject.readComponents(text):
self.ingest_card(entity, card)
except (ParseError, UnicodeDecodeError) as err:
raise ProcessingException('Cannot parse vcard: %s' % err) from err