Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_csv_export(self):
    """Export ENTITIES with the Neo4J CSV exporter and inspect one output file.

    Every entity is written with an extra "test" column value. After
    ``finalize()`` the file behind the handle of the last written entity's
    schema is read back, and its header row and first data row are checked.
    """
    exporter = Neo4JCSVExporter(
        self.outdir, extra=["source"], edge_types=edge_types()
    )
    for entity in ENTITIES:
        entity = model.get_proxy(entity)
        exporter.write(entity, extra=["test"])
        # Remember the path now; the handle itself is only used for its name.
        fh, _ = exporter.handles[entity.schema]
        outfile = fh.name
    exporter.finalize()

    # Read everything inside a context manager so the handle is closed
    # even when one of the assertions below fails.
    with open(outfile, "r") as fh:
        rows = list(csv.reader(fh))

    headers = rows[0]
    assert ":TYPE" in headers, headers
    assert ":START_ID" in headers, headers
    assert ":END_ID" in headers, headers
    assert "id" in headers, headers
    assert "date" in headers, headers

    data = rows[1]
    assert "OWNERSHIP" in data, data
    assert "2003-04-01" in data, data
# NOTE(review): fragment of a test body -- the enclosing `def` line and the
# code that first builds `match` are not part of this excerpt.
# With a canonical entity attached, the match id equals the canonical id and
# entity_id equals the id of the matched entity.
assert match.id == match.canonical.id, match
assert match.entity_id == "ent", match
assert match.entity_id == match.entity.id, match
# The private `_score` is unset here, yet the public `score` yields a value.
assert match._score is None, match
assert match.score is not None, match
# Second case: remove the embedded "canonical" dict from a copy of SAMPLE and
# pass only its id along as "profile_id".
sample2 = deepcopy(SAMPLE)
canon = sample2.pop("canonical")
sample2["profile_id"] = canon.get("id")
match = Match(model, sample2)
assert match.id == "can", match
assert match.canonical is None, match
assert match.entity is not None, match
# Assigning a canonical proxy afterwards changes the reported id.
canon["id"] = "banana"
match.canonical = model.get_proxy(canon)
assert match.id == "banana", match
# Assigning the entity changes entity_id, and repr() shows the new id.
match.entity = model.get_proxy(canon)
assert match.entity_id == "banana", match
assert "banana" in repr(match), repr(match)
def test_rdf_export(self):
    """Export ENTITY through RDFExporter and count the lines written.

    The exporter writes into a file opened in "w+" mode at ``self.temp``;
    after ``finalize()`` the same handle is rewound and read back, and the
    test expects exactly 8 lines.
    """
    # The context manager closes the temp file even if the export raises.
    with open(self.temp, "w+") as fh:
        entity = model.get_proxy(ENTITY)
        exporter = RDFExporter(fh)
        exporter.write(entity)
        exporter.finalize()
        # Rewind so the content just written can be read from the same handle.
        fh.seek(0)
        data = fh.readlines()
    assert len(data) == 8, len(data)
def test_graph(self):
    """Project the statements of ENTITY into a DiGraph and check its size.

    The entity's own statements are expected to give 8 edges and 9 nodes.
    An inverted name statement then brings the edge count to 9, and a
    zero-weight statement is added last with no follow-up assertion.
    """
    graph = DiGraph()
    entity = model.get_proxy(ENTITY)
    root = entity.node
    self.assertEqual(str(root), root.id)

    for statement in entity.statements:
        statement.to_digraph(graph)
    self.assertEqual(graph.number_of_edges(), 8)
    self.assertEqual(graph.number_of_nodes(), 9)
    self.assertIn(root.id, graph.nodes)

    name_prop = model.get_qname('Thing:name')
    inverted = Statement(
        Node(registry.name, 'Bob'), name_prop, entity.id, inverted=True
    )
    inverted.to_digraph(graph)
    self.assertEqual(graph.number_of_edges(), 9)

    weightless = Statement(root, name_prop, 'Blub', weight=0)
    weightless.to_digraph(graph)
def test_excel_bytesio(self):
    """Write ENTITY with ExcelExporter and check the in-memory output size.

    The buffer returned by ``get_bytesio()`` must hold more than 100 bytes.
    """
    proxy = model.get_proxy(ENTITY)
    xlsx = ExcelExporter(self.temp, extra=["source"])
    xlsx.write(proxy, extra=["test"])
    size = len(xlsx.get_bytesio().getvalue())
    assert size > 100
def test_entity_filename(self):
# Checks the file name entity_filename() derives for Document proxies.
# NOTE(review): the excerpt ends right after the third proxy is built; the
# assertion for the mimeType case is not visible here.
# Case 1: no properties at all -> the bare id is returned.
proxy = model.get_proxy({"id": "banana", "schema": "Document",})
file_name = entity_filename(proxy)
assert "banana" == file_name, file_name
# Case 2: an "extension" property of ".doc" -> "banana.doc".
proxy = model.get_proxy(
{
"id": "banana",
"schema": "Document",
"properties": {"extension": [".doc"],},
}
)
file_name = entity_filename(proxy)
assert "banana.doc" == file_name, file_name
# Case 3: only a "mimeType" property is set (expected result not shown).
proxy = model.get_proxy(
{
"id": "banana",
"schema": "Document",
"properties": {"mimeType": ["application/pdf"],},
}
)
# NOTE(review): fragment of a generator body -- the `def` line and the code
# that sets up `query`, `proxy` and `es` are not part of this excerpt.
# Stop without yielding anything when the query equals none_query().
if query == none_query():
return
# Wrap the query in a search body with size 100 that includes only the
# 'schema', 'properties' and 'collection_id' source fields.
query = {
'query': query,
'size': 100,
'_source': {'includes': ['schema', 'properties', 'collection_id']}
}
# The index to search is derived from the proxy schema's matchable schemata.
matchable = list(proxy.schema.matchable_schemata)
index = entities_read_index(schema=matchable)
result = es.search(index=index, body=query)
results = result.get('hits').get('hits')
for result in results:
# Hits for which unpack_result() returns None are skipped.
result = unpack_result(result)
if result is not None:
other = model.get_proxy(result)
score = compare(model, proxy, other)
# Only candidates scoring at least SCORE_CUTOFF are yielded, as a
# (score, collection_id, proxy) tuple.
if score >= SCORE_CUTOFF:
yield score, result.get('collection_id'), other
def read_entity(stream):
    """Read one JSON-encoded line from *stream* and decode it.

    Returns ``None`` when ``readline()`` yields nothing. A decoded mapping
    that contains a "schema" key is turned into a proxy via
    ``model.get_proxy``; any other decoded value is returned unchanged.
    """
    raw = stream.readline()
    if raw:
        payload = json.loads(raw)
        looks_like_entity = is_mapping(payload) and "schema" in payload
        return model.get_proxy(payload) if looks_like_entity else payload
    return None
def expand_entity(entity):
    """Yield the statements that describe an entity.

    *entity* may be an entity mapping or an entity ID; anything that is not
    a mapping is looked up with ``get_entity``. Nothing is yielded when no
    entity is available. For entities whose schema is a Thing, an inferred
    ``sameAs`` statement is also yielded for every ``xref_item`` result,
    weighted by its score.
    """
    record = entity if is_mapping(entity) else get_entity(entity)
    if record is None:
        return

    proxy = model.get_proxy(record)
    yield from proxy.statements

    # TODO: factor out inference
    thing = model.get(Entity.THING)
    if not proxy.schema.is_a(thing):
        return
    same_as = thing.get("sameAs")
    for score, _collection_id, other in xref_item(proxy):
        yield Statement(
            proxy.node, same_as, other.id, weight=score, inferred=True
        )
def to_proxy(self):
    """Build an entity proxy from this object's id, schema and data.

    The object's ``name`` is added to the proxy's 'name' property and
    'indexUpdatedAt' is set from ``updated_at`` before the proxy is returned.
    """
    payload = {
        'id': self.id,
        'schema': self.schema,
        'properties': self.data,
    }
    proxy = model.get_proxy(payload)
    proxy.add('name', self.name)
    proxy.set('indexUpdatedAt', self.updated_at)
    return proxy