Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def parse(cls, entity_id):
    """Break an entity ID into ``(plain_id, signature)``.

    The ID is first passed through ``registry.entity.clean``. If the cleaned
    value is ``None`` the result is ``(None, None)``. If the cleaned ID cannot
    be split into two parts on ``cls.SEP`` the result is ``(entity_id, None)``.
    """
    cleaned = registry.entity.clean(entity_id)
    if cleaned is None:
        return (None, None)
    try:
        # Split on the right-most separator only; the two-name unpacking
        # raises ValueError when the split does not produce two parts.
        base, signature = cleaned.rsplit(cls.SEP, 1)
    except ValueError:
        return (cleaned, None)
    return (base, signature)
return
prop = self.schema.properties[prop_name]
# Don't allow setting the reverse properties:
if prop.stub:
if quiet:
return
msg = gettext("Stub property (%s): %s")
raise InvalidData(msg % (self.schema, prop))
for value in value_list(values):
if not cleaned:
value = prop.type.clean(value, proxy=self, fuzzy=fuzzy)
if value is None:
continue
if prop.type == registry.entity and value == self.id:
msg = gettext("Self-relationship (%s): %s")
raise InvalidData(msg % (self.schema, prop))
# Somewhat hacky: limit the maximum size of any particular
# field to avoid overloading upstream aleph/elasticsearch.
value_size = len(value)
if prop.type.max_size is not None:
if self._size + value_size > prop.type.max_size:
# msg = "[%s] too large. Rejecting additional values."
# log.warning(msg, prop.name)
continue
self._size += value_size
if prop_name not in self._properties:
self._properties[prop_name] = set()
self._properties[prop_name].add(value)
def _normalize_data(data):
    """Replace entity-typed property values with entity IDs.

    Walks ``data['layout']['entities']``. For every property of an entity's
    schema whose type is ``registry.entity``, a non-empty value list is
    replaced by the result of ``get_entity_id`` for each value. The
    ``properties`` mappings are modified in place and the same ``data``
    object is returned.

    Raises:
        InvalidData: if ``model.get`` returns ``None`` for an entity's schema.
    """
    for item in data['layout']['entities']:
        schema_name = item.get('schema')
        schema = model.get(schema_name)
        if schema is None:
            raise InvalidData("Invalid schema %s" % schema_name)
        props = item.get('properties', {})
        for prop in schema.properties.values():
            if not prop.type == registry.entity:
                continue
            raw_values = ensure_list(props.get(prop.name))
            if not raw_values:
                # Nothing set for this property; leave it untouched.
                continue
            props[prop.name] = [get_entity_id(raw) for raw in raw_values]
    return data
from typing import Dict, Tuple, List, TextIO, Optional
from pymisp import MISPObject
from followthemoney.graph import Node, Edge
from followthemoney.types import registry
from followthemoney.proxy import EntityProxy
from followthemoney.export.graph import GraphExporter
# Default value for the exporter's ``edge_types`` argument: a one-element
# tuple holding the name of the registry's entity type.
DEFAULT_EDGE_TYPES: Tuple[str] = (registry.entity.name,)
class MISPExporter(GraphExporter):
    """``GraphExporter`` subclass that holds containers for ``MISPObject`` items."""

    def __init__(
        self, fh: Optional[TextIO] = None, edge_types: Tuple[str] = DEFAULT_EDGE_TYPES
    ):
        """Initialise the exporter with empty bookkeeping containers.

        Args:
            fh: optional text handle, stored on the instance as ``fh``.
            edge_types: forwarded unchanged to ``GraphExporter.__init__``.
        """
        super().__init__(edge_types=edge_types)
        self.fh: Optional[TextIO] = fh
        self.misp_objects: List[MISPObject] = []
        self._references_to_add: List[Tuple[MISPObject, str]] = []
        self._nodes_mapping: Dict[str, MISPObject] = {}

    def add_entity(self, proxy: EntityProxy, **kwargs):
        """Add ``proxy`` to ``self.graph``; extra keyword arguments are not used."""
        self.graph.add(proxy)
def _add_reverse(self, data, other):
    """Return the reverse property named by ``data``, creating it if absent.

    If ``self.get`` already knows a property with that name it is returned
    unchanged. Otherwise ``data`` is updated in place to describe a stub
    entity-typed property pointing back at ``other``, a ``Property`` is built
    from it, generated, registered in ``self.properties`` and returned.

    Raises:
        InvalidModel: if ``data`` has no ``"name"`` entry (or it is ``None``).
    """
    name = data.get("name")
    if name is None:
        raise InvalidModel("Unnamed reverse: %s" % other)

    existing = self.get(name)
    if existing is not None:
        return existing

    stub_spec = {
        "type": registry.entity.name,
        "reverse": {"name": other.name},
        "range": other.schema.name,
        "stub": True,
    }
    data.update(stub_spec)
    # Keep an explicit "hidden" setting; otherwise inherit it from ``other``.
    data["hidden"] = data.get("hidden", other.hidden)

    reverse = Property(self, name, data)
    reverse.generate()
    self.properties[name] = reverse
    return reverse
def is_entity(self):
    """Tell whether ``self.type`` equals the registry's entity type."""
    matches_entity_type = self.type == registry.entity
    return matches_entity_type
def _to_edge(proxy, edge_types):
    """Yield nodes and an edge for each source/target pair of ``proxy``.

    For every pair from ``proxy.edgepairs()`` this yields, in order: a node
    for the source, a node for the target, and an edge between them carrying
    the attributes from ``_to_attributes`` and the proxy's schema name.
    """
    attributes = _to_attributes(proxy, edge_types)
    schema = proxy.schema
    source_prop = schema.get(schema.edge_source)
    target_prop = schema.get(schema.edge_target)
    for source_id, target_id in proxy.edgepairs():
        source_ref = registry.entity.rdf(source_id).n3()
        yield _make_node({'id': source_ref}, source_prop.range.name)
        target_ref = registry.entity.rdf(target_id).n3()
        yield _make_node({'id': target_ref}, target_prop.range.name)
        yield _make_edge(source_ref, target_ref, attributes, schema.name)
def entity_references(entity, authz):
    """Yield ``(property, count)`` for properties that reference ``entity``.

    Builds one facet per entity-typed model property whose range the entity's
    schema satisfies (``schema.is_a``), runs them through
    ``_filters_faceted_query``, and yields the ``model.get_qname`` result with
    its total for each facet whose total is greater than zero.
    """
    schema = model.get(entity.get('schema'))
    group = registry.entity.group
    facets = []
    for prop in model.properties:
        # Skip properties that are not entity-typed or cannot point at this schema.
        if prop.type != registry.entity or not schema.is_a(prop.range):
            continue
        facets.append((
            entities_read_index(prop.schema),
            prop.qname,
            group,
            'properties.%s' % prop.name,
            entity.get('id'),
        ))
    counts = _filters_faceted_query(authz, facets)
    for qname, total in counts.items():
        if total > 0:
            yield (model.get_qname(qname), total)
def execute_mapping(query):
    """Print a statement and its inverse for mapped property values.

    For each entity from ``model.map_entities(query)``, the ``properties``
    entry is popped and iterated. A ``Statement`` is built and printed,
    followed by its ``invert()``, for every value of a property whose
    ``type.prefix`` is truthy.
    """
    for entity in model.map_entities(query):
        schema = model.get(entity['schema'])
        props = entity.pop('properties')
        ref = registry.entity.ref(entity)
        for prop_name, values in props.items():
            prop = schema.get(prop_name)
            for value in values:
                if not prop.type.prefix:
                    continue
                statement = Statement(ref, prop, value)
                print(statement)
                print(statement.invert())
def entity_references(entity, authz):
    """Count, per property, the other entities that refer to ``entity``.

    A facet tuple is collected for each model property that is entity-typed
    and whose range is satisfied by the entity's schema. The facets are passed
    to ``_filters_faceted_query`` together with ``authz``. Each result with a
    total above zero is yielded as ``(model.get_qname(qname), total)``.
    """
    schema = model.get(entity.get('schema'))
    group = registry.entity.group
    facets = []
    for prop in model.properties:
        if prop.type != registry.entity:
            continue
        if schema.is_a(prop.range):
            index = entities_read_index(prop.schema)
            field = 'properties.%s' % prop.name
            facet = (index, prop.qname, group, field, entity.get('id'))
            facets.append(facet)
    totals = _filters_faceted_query(authz, facets)
    for qname, total in totals.items():
        if not total > 0:
            continue
        yield (model.get_qname(qname), total)