Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def split_items(text):
    """Split *text* on ``)`` and strip numeric suffixes from the parts.

    Each fragment of the form ``<name> <number>`` (where the number may
    contain stray ``(`` characters) is reduced to just ``<name>``; any
    other fragment is kept verbatim.

    :param text: raw input; normalised via ``stringify`` first.
    :return: list of cleaned fragments (empty when *text* is empty/None).
    """
    items = []
    text = stringify(text)
    if text is None:
        return items
    for raw in text.split(')'):
        # Fragments without a space cannot have a suffix to strip.
        if ' ' not in raw:
            items.append(raw)
            continue
        cleaned, suffix = raw.split(' ', 1)
        suffix = suffix.replace('(', '')
        # Only discard the tail when it is a valid integer. int() raises
        # ValueError for non-numeric text, so catching the over-broad
        # Exception (as the original did) is unnecessary and can mask bugs.
        try:
            int(suffix)
            items.append(cleaned)
        except ValueError:
            items.append(raw)
    return items
def iter_value_entities(type_, value):
    """Yield ``(entity_id, prop)`` pairs for entities carrying *value*.

    Serves results from a cached Redis-style set when a degree marker is
    present; otherwise falls back to the underlying index iterator.
    """
    value = stringify(value)
    # Only grouped types are indexed; nothing to yield for missing values.
    if type_.group is None or value is None:
        return
    key = cache.object_key(type(type_), value)
    degree_key = cache.object_key(type(type_), value, 'deg1')
    degree = cache.get(degree_key)
    if degree is not None:
        # Cache hit: members are stored as "<qname>@<entity_id>" strings.
        for item in cache.kv.sscan_iter(key):
            qname, entity_id = item.decode('utf-8').split('@', 1)
            prop = model.get_qname(qname)
            yield entity_id, prop
    else:
        # Cache miss: stream from the source index while (apparently)
        # preparing a pipeline to re-populate the cached set.
        degree = 0
        pipe = cache.kv.pipeline()
        for entity_id, prop in _iter_value_entities(type_, value):
            yield entity_id, prop
            item = '@'.join((prop.qname, entity_id))
            # NOTE(review): `pipe`, `degree` and `item` are built here but
            # never written back or executed in the visible span -- the
            # cache write-back presumably follows below this excerpt;
            # confirm against the full file.
def fetch():
    """Rebuild ``../fingerprints/types.json`` from the remote CSV sheet.

    Downloads the CSV at ``CSV_URL``, maps each company-type ``Name`` to
    its ``Abbreviation`` (printing conflicting mappings as it goes), dumps
    each row of the local ELF code list for manual inspection, and writes
    the resulting mapping as JSON next to the package.
    """
    file_path = os.path.dirname(__file__)
    out_path = os.path.join(file_path, '..', 'fingerprints', 'types.json')
    types = {}
    # The original leaked the urlopen() handle; the HTTP response is a
    # context manager, so use `with` to guarantee it is closed.
    with urlopen(CSV_URL) as response:
        fh = io.TextIOWrapper(response, encoding='utf-8')
        for row in csv.DictReader(fh):
            name = stringify(row.get('Name'))
            abbr = stringify(row.get('Abbreviation'))
            if name is None or abbr is None:
                continue
            # Surface conflicting abbreviations; the last one wins.
            if name in types and types[name] != abbr:
                print(name, types[name], abbr)
            types[name] = abbr
    elf_path = os.path.join(file_path, 'elf-code-list.csv')
    with open(elf_path, 'r') as fh:
        for row in csv.DictReader(fh):
            pprint(dict(row))
    with open(out_path, 'w') as fh:
        json.dump({'types': types}, fh)
def items(self):
    """Iterate the request arguments as cleaned ``(key, value)`` pairs.

    Pagination offsets are skipped, as are arguments whose value
    normalises to ``None``.
    """
    for key, raw in self.args.items(multi=True):
        if key == 'offset':
            continue
        cleaned = stringify(raw, encoding='utf-8')
        if cleaned is None:
            continue
        yield key, cleaned
def to_dict(self):
    """Serialise this record (presumably a user/role) to a plain dict."""
    data = self.to_dict_dates()
    data['id'] = stringify(self.id)
    data['type'] = self.type
    data['name'] = self.name
    data['label'] = self.label
    data['email'] = self.email
    data['locale'] = self.locale
    data['api_key'] = self.api_key
    data['is_admin'] = self.is_admin
    data['is_muted'] = self.is_muted
    data['has_password'] = self.has_password
    return data
def ingest(self, file_path, entity, **kwargs):
    """Main execution step of an ingestor."""
    file_path = ensure_path(file_path)
    # Record the on-disk file size once, if not already known.
    if file_path.is_file() and not entity.has('fileSize'):
        entity.add('fileSize', file_path.stat().st_size)
    try:
        # Pick the best ingestor for this file and hand processing over.
        handler = self.auction(file_path, entity)
        log.info("Ingestor [%r]: %s", entity, handler.__name__)
        self.delegate(handler, file_path, entity)
        entity.set('processingStatus', self.STATUS_SUCCESS)
    except ProcessingException as exc:
        # Mark the entity failed and keep the error for display.
        entity.set('processingStatus', self.STATUS_FAILURE)
        entity.set('processingError', stringify(exc))
        log.error("[%r] Failed to process: %s", entity, exc)
    finally:
        # Finalisation runs on success and failure alike.
        self.finalize(entity)
def generate(text, keep_order=False, keep_brackets=False):
    """Normalise *text* towards a company-name fingerprint (excerpt).

    :param text: raw name; normalised via ``stringify`` first.
    :param keep_order: unused in the visible span -- presumably controls
        token ordering further down; confirm against the full function.
    :param keep_brackets: when False, bracketed fragments are removed.
    """
    text = stringify(text)
    if text is None:
        return
    # this needs to happen before the replacements
    text = text.lower()
    text = clean_entity_name(text)
    if not keep_brackets:
        # Remove any text in brackets
        # This is meant to handle names of companies which include
        # the jurisdiction, like: Turtle Management (Seychelles) Ltd.
        text = BRACKETED.sub(WS, text)
    # Super hard-core string scrubbing
    text = clean_strict(text)
    text = replace_types(text)
    # NOTE(review): the visible code ends without returning the processed
    # text -- tokenisation and the return statement presumably follow in
    # the full file; confirm this excerpt is truncated.
def to_dict(self):
    """Serialise this record (presumably a search alert) to a plain dict."""
    data = self.to_dict_dates()
    data['id'] = stringify(self.id)
    data['query'] = self.query
    data['normalized'] = self.normalized
    data['role_id'] = stringify(self.role_id)
    data['notified_at'] = self.notified_at
    return data
# NOTE(review): this is the interior of an unseen method (likely an OCR
# text-extraction routine) -- `self`, `data`, `languages` and `key` are
# bound above this excerpt, and the original indentation was lost, so the
# two paths below (cache hit, then the OCR fallback) cannot be reliably
# re-nested. Confirm against the full file before editing.
if text is not None:
    log.info('OCR: %s chars cached', len(text))
return stringify(text)
# --- OCR fallback path (reached when the cache lookup above missed) ---
if not hasattr(settings, '_ocr_service'):
    # Lazily build the OCR backend once: Google Vision when configured,
    # otherwise the local engine.
    if settings.OCR_VISION_API:
        settings._ocr_service = GoogleOCRService()
    else:
        settings._ocr_service = LocalOCRService()
text = settings._ocr_service.extract_text(data, languages=languages)
# Store the freshly extracted text so later calls hit the cache path.
self.set_cache_value(key, text)
if text is not None:
    log.info('OCR: %s chars (from %s bytes)',
             len(text), len(data))
return stringify(text)
def to_dict(self):
    """Serialise this record (presumably a cross-reference match) to a dict."""
    data = self.to_dict_dates()
    data['id'] = stringify(self.id)
    data['score'] = self.score
    data['entity_id'] = stringify(self.entity_id)
    data['collection_id'] = stringify(self.collection_id)
    data['match_id'] = stringify(self.match_id)
    data['match_collection_id'] = stringify(self.match_collection_id)
    return data