Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_country_codes(self):
    """Exercise cleaning, validation, hinting and RDF output for codes."""
    countries = registry.country

    # Cleaning lower-cases a recognised two-letter code.
    self.assertEqual(countries.clean("DE"), "de")

    # Validation accepts these two-letter codes ...
    self.assertTrue(countries.validate("DE"))
    self.assertTrue(countries.validate("XK"))
    self.assertTrue(countries.validate("EU"))

    # ... and rejects a three-letter code, empty/None/non-string input
    # and the code "SU".
    self.assertFalse(countries.validate("DEU"))
    self.assertFalse(countries.validate(""))
    self.assertFalse(countries.validate(None))
    self.assertFalse(countries.validate(4))
    self.assertFalse(countries.validate("SU"))

    self.assertEqual(countries.country_hint("eu"), "eu")
    # Use the TestCase assertion instead of a bare ``assert`` so the check
    # is not stripped under ``python -O`` and reports like its siblings.
    self.assertIn("iso-3166-1:eu", countries.rdf("eu"))
def test_country_names(self):
    """Check that country names (not just codes) are cleaned to codes."""
    clean = registry.country.clean

    # Missing or unknown input cleans to None.
    self.assertEqual(clean(None), None)
    self.assertEqual(clean("Takatukaland", guess=False), None)

    # Known names map onto their codes.
    self.assertEqual(clean("Germany"), "de")
    # The check for the misspelling 'Germani' is currently disabled:
    # self.assertEqual(clean('Germani'), 'de')
    self.assertEqual(clean("Soviet Union"), "suhh")
party.add('ogrnCode', number)
continue
schema, attr = REGISTRATIONS.get(type_)
party.schema = model.common_schema(party.schema, schema)
if len(attr):
party.add(attr, number)
for feature in profile.findall(qpath('Feature')):
feature_type = deref(doc, 'FeatureType', feature.get('FeatureTypeID'))
attr, schema = FEATURES.get(feature_type)
party.schema = model.common_schema(party.schema, schema)
if len(attr):
value = parse_feature(doc, feature)
if isinstance(value, tuple):
value, country_code = value
if party.schema.get(attr).type == registry.country:
value = country_code
else:
party.add('country', country_code)
party.add(attr, value, quiet=True)
emitter.emit(party)
emitter.log.info("[%s] %s", party.schema.name, party.caption)
def check_country_code(value):
    """Check that ``value`` resolves to a valid country code.

    The input is passed through ``registry.country.clean`` and the cleaned
    result is checked with ``registry.country.validate``.

    Returns:
        ``True`` when the cleaned value validates.

    Raises:
        ValueError: when the cleaned value does not validate. The message
            quotes the value as supplied by the caller.
    """
    cleaned = registry.country.clean(value)
    if not registry.country.validate(cleaned):
        msg = gettext('Invalid country code: %s')
        # Report the caller's input rather than the cleaned value: cleaning
        # may collapse rejected input (e.g. to None), which would hide what
        # was actually sent. The 1-tuple keeps tuple inputs from being
        # unpacked as multiple format arguments.
        raise ValueError(msg % (value,))
    return True
def update(self, data):
    """Merge metadata fields from ``data`` into ``self.meta``.

    ``countries`` and ``languages`` are coerced to lists with
    ``ensure_list`` and each entry is cleaned through the matching registry
    type; when ``data`` lacks the key, an empty list is stored. Every other
    property falls back to the value already held in ``self.meta`` when
    ``data`` does not supply it. All stored values pass through
    ``sanitize_text`` and ``flag_modified(self, 'meta')`` is called at the
    end.

    The caller's ``data`` mapping is not modified.
    """
    props = ('title', 'summary', 'author', 'crawler', 'source_url',
             'file_name', 'mime_type', 'headers', 'date', 'authored_at',
             'modified_at', 'published_at', 'retrieved_at', 'languages',
             'countries', 'keywords')

    # Keep the cleaned lists local instead of writing them back into the
    # caller's dict.
    cleaned = {
        'countries': [registry.country.clean(val)
                      for val in ensure_list(data.get('countries', []))],
        'languages': [registry.language.clean(val)
                      for val in ensure_list(data.get('languages', []))],
    }

    for prop in props:
        if prop in cleaned:
            text = cleaned[prop]
        else:
            text = data.get(prop, self.meta.get(prop))
        if isinstance(text, list):
            self.meta[prop] = [sanitize_text(txt) for txt in text]
        else:
            self.meta[prop] = sanitize_text(text)

    # NOTE(review): indentation was lost in the source; this call is assumed
    # to run once after the loop rather than per property - confirm.
    flag_modified(self, 'meta')
# on as many of these matches as we can, then build a regression
# model which properly weights the value of a matching property
# based upon its type.
# Weight constant associated with fingerprint comparison.
# NOTE(review): presumably applied to the fingerprint score when scoring a
# pair of entities; the use site is not visible here - confirm.
FP_WEIGHT = 0.6
# Weight per property type, keyed by the registry type object.
# NOTE(review): a weight of 0 presumably means properties of that type do
# not contribute to the match score - confirm against the scoring code.
MATCH_WEIGHTS = {
registry.text: 0,
registry.name: 0, # because we already compare fingerprints
registry.identifier: 0.4,
registry.url: 0.1,
registry.email: 0.3,
registry.ip: 0.1,
registry.iban: 0.3,
registry.address: 0.2,
registry.date: 0.3,
registry.phone: 0.1,
registry.country: 0.1,
registry.language: 0.1,
}
def compare(left, right):
"""Compare two entities and return number between 0 and 1.
Returned number indicates probability that two entities are the same.
"""
left_schema = model.get(left.get('schema'))
right_schema = model.get(right.get('schema'))
if right_schema not in list(left_schema.matchable_schemata):
return 0
schema = model.precise_schema(left_schema, right_schema)
score = compare_fingerprints(left, right)
left_properties = left.get('properties', {})
right_properties = right.get('properties', {})
def check_country_code(value):
    """Check that ``value`` resolves to a valid country code.

    The input is passed through ``registry.country.clean`` and the cleaned
    result is checked with ``registry.country.validate``.

    Returns:
        ``True`` when the cleaned value validates.

    Raises:
        ValueError: when the cleaned value does not validate. The message
            quotes the value as supplied by the caller.
    """
    cleaned = registry.country.clean(value)
    if not registry.country.validate(cleaned):
        msg = gettext('Invalid country code: %s')
        # Report the caller's input rather than the cleaned value: cleaning
        # may collapse rejected input (e.g. to None), which would hide what
        # was actually sent. The 1-tuple keeps tuple inputs from being
        # unpacked as multiple format arguments.
        raise ValueError(msg % (value,))
    return True
proxy = self.model.make_entity(self.schema)
proxy.id = self.compute_key(record)
if proxy.id is None:
return
# THIS IS HACKY
# Some of the converters, e.g. for phone numbers, work better if they
# know the country which the number is from. In order to provide that
# detail, we are first running country fields, then making the data
# from that accessible to phone and address parsers.
for prop in self.properties:
if prop.prop.type == registry.country:
prop.map(proxy, record, entities)
for prop in self.properties:
if prop.prop.type != registry.country:
prop.map(proxy, record, entities)
for prop in self.properties:
if prop.required and not proxy.has(prop.prop):
# This is a bit weird, it flags fields to be required in
# the mapping, not in the model. Basically it means: if
# this row of source data doesn't have that field, then do
# not map it again.
return
return proxy
def countries(self):
    """Return the ``countries`` entry of ``self.meta`` after it has been
    passed through ``registry.country.normalize_set``."""
    return registry.country.normalize_set(self.meta.get('countries'))
def update(self, data, authz):
    """Apply the editable fields found in ``data`` to this object.

    Fields absent from ``data`` keep their current value. URL fields are
    passed through ``stringify`` when not None; countries and languages are
    coerced to lists and cleaned through the matching registry type.

    Args:
        data: mapping of submitted field values.
        authz: authorization object; only ``authz.is_admin`` is read here.
    """
    self.label = data.get('label', self.label)
    self.summary = data.get('summary', self.summary)
    self.publisher = data.get('publisher', self.publisher)

    # The three URL fields share the same treatment: take the submitted
    # value (or keep the current one) and stringify it unless it is None.
    for field in ('publisher_url', 'info_url', 'data_url'):
        url = data.get(field, getattr(self, field))
        if url is not None:
            url = stringify(url)
        setattr(self, field, url)

    countries = ensure_list(data.get('countries', self.countries))
    self.countries = [registry.country.clean(val) for val in countries]
    languages = ensure_list(data.get('languages', self.languages))
    self.languages = [registry.language.clean(val) for val in languages]

    # Some fields are editable only by admins in order to have
    # a strict separation between source evidence and case
    # material.
    # NOTE(review): indentation was lost in the source; the creator
    # reassignment below is assumed to belong to this admin-only branch -
    # confirm.
    if authz.is_admin:
        self.category = data.get('category', self.category)
        self.casefile = as_bool(data.get('casefile'),
                                default=self.casefile)
        creator_data = ensure_dict(data.get('creator'))
        creator_id = data.get('creator_id', creator_data.get('id'))
        creator = Role.by_id(creator_id)
        if creator is not None:
            self.creator = creator