Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_model_common_schema(self):
assert model.common_schema("Thing", "Thing") == "Thing"
assert model.common_schema("Thing", "Person") == "Person"
assert model.common_schema("Person", "Thing") == "Person"
assert model.common_schema("LegalEntity", "Company") == "Company"
assert model.common_schema("Interval", "Ownership") == "Ownership"
# assert model.common_schema("LegalEntity", "Asset") == "Company"
with assert_raises(InvalidData):
model.common_schema("Person", "Directorship")
with assert_raises(InvalidData):
model.common_schema("Person", "Company")
with assert_raises(InvalidData):
model.common_schema("Membership", "Thing")
def test_has(self):
proxy = EntityProxy.from_dict(model, ENTITY)
assert not proxy.has("birthPlace")
proxy.add("birthPlace", "Inferno")
assert proxy.has("birthPlace")
assert 1 == len(proxy.get("birthPlace"))
proxy.add("birthPlace", "Hell")
assert 2 == len(proxy.get("birthPlace"))
proxy.set("birthPlace", "Inferno")
assert 1 == len(proxy.get("birthPlace"))
with assert_raises(InvalidData):
proxy.set("banana", "fruit")
assert not proxy.set("banana", "fruit", quiet=True)
with assert_raises(InvalidData):
proxy.has("banana")
assert not proxy.has("banana", quiet=True)
# Don't allow setting the reverse properties:
if prop.stub:
if quiet:
return
msg = gettext("Stub property (%s): %s")
raise InvalidData(msg % (self.schema, prop))
for value in value_list(values):
if not cleaned:
value = prop.type.clean(value, proxy=self, fuzzy=fuzzy)
if value is None:
continue
if prop.type == registry.entity and value == self.id:
msg = gettext("Self-relationship (%s): %s")
raise InvalidData(msg % (self.schema, prop))
# Somewhat hacky: limit the maximum size of any particular
# field to avoid overloading upstream aleph/elasticsearch.
value_size = len(value)
if prop.type.max_size is not None:
if self._size + value_size > prop.type.max_size:
# msg = "[%s] too large. Rejecting additional values."
# log.warning(msg, prop.name)
continue
self._size += value_size
if prop_name not in self._properties:
self._properties[prop_name] = set()
self._properties[prop_name].add(value)
"""Validate a dataset against the given schema.
This will also drop keys which are not present as properties.
"""
errors = {}
properties = ensure_dict(data.get("properties"))
for name, prop in self.properties.items():
values = ensure_list(properties.get(name))
error = prop.validate(values)
if error is None and not len(values):
if prop.name in self.required:
error = gettext("Required")
if error is not None:
errors[name] = error
if len(errors):
msg = gettext("Entity validation failed")
raise InvalidData(msg, errors={"properties": errors})
def schema_index(schema, version):
"""Convert a schema object to an index name."""
if schema.abstract:
raise InvalidData("Cannot index abstract schema: %s" % schema)
name = 'entity-%s' % schema.name.lower()
return index_name(name, version=version)
"""Compare two entities and return a match score."""
left = model.get_proxy(left)
right = model.get_proxy(right)
if right.schema not in list(left.schema.matchable_schemata):
return 0
schema = model.common_schema(left.schema, right.schema)
score = compare_names(left, right) * NAMES_WEIGHT
score += compare_countries(left, right) * COUNTRIES_WEIGHT
for name, prop in schema.properties.items():
weight = MATCH_WEIGHTS.get(prop.type, 0)
if weight == 0 or not prop.matchable:
continue
try:
left_values = left.get(name)
right_values = right.get(name)
except InvalidData:
continue
if not len(left_values) or not len(right_values):
continue
prop_score = prop.type.compare_sets(left_values, right_values)
score += prop_score * weight
return score
def add(self, prop, values, cleaned=False, quiet=False, fuzzy=False):
"""Add the given value(s) to the property if they are not empty."""
prop_name = self._prop_name(prop, quiet=quiet)
if prop_name is None:
return
prop = self.schema.properties[prop_name]
# Don't allow setting the reverse properties:
if prop.stub:
if quiet:
return
msg = gettext("Stub property (%s): %s")
raise InvalidData(msg % (self.schema, prop))
for value in value_list(values):
if not cleaned:
value = prop.type.clean(value, proxy=self, fuzzy=fuzzy)
if value is None:
continue
if prop.type == registry.entity and value == self.id:
msg = gettext("Self-relationship (%s): %s")
raise InvalidData(msg % (self.schema, prop))
# Somewhat hacky: limit the maximum size of any particular
# field to avoid overloading upstream aleph/elasticsearch.
value_size = len(value)
if prop.type.max_size is not None:
if self._size + value_size > prop.type.max_size:
# msg = "[%s] too large. Rejecting additional values."
def _normalize_data(data):
"""Turn entities in properties into entity ids"""
entities = data['layout']['entities']
for obj in entities:
schema = model.get(obj.get('schema'))
if schema is None:
raise InvalidData("Invalid schema %s" % obj.get('schema'))
properties = obj.get('properties', {})
for prop in schema.properties.values():
if prop.type == registry.entity:
values = ensure_list(properties.get(prop.name))
if values:
properties[prop.name] = []
for value in values:
entity_id = get_entity_id(value)
properties[prop.name].append(entity_id)
return data