Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_subgraph_repr(self):
    """ The repr of a subgraph built from two relationships should be
    a pair of brace-delimited groups: the nodes, then the relationships.
    """
    a = Node("Person", name="Alice")
    b = Node("Person", name="Bob")
    ab = Relationship(a, "TO", b)
    ba = Relationship(b, "FROM", a)
    s = ab | ba
    assert isinstance(s, Subgraph)
    r = repr(s)
    assert r.startswith("({")
    assert r.endswith("})")
    # Drop the outer "({" and "})", then split the node group from the
    # relationship group at the "}, {" boundary between them.
    nodes, _, relationships = r[2:-2].partition("}, {")
    items = [item.strip() for item in nodes.split(",")]
    assert len(items) == 2
    for item in items:
        assert re.match(r"\(:Person \{name: '(Alice|Bob)'\}\)", item)
    items = [item.strip() for item in relationships.split(",")]
    assert len(items) == 2
    # Validate every relationship rendered inside the subgraph repr; the
    # previous loop re-checked repr(ab) and never looked at the items.
    for item in items:
        assert re.match(r'\(.*\)-\[:(TO|FROM) \{\}\]->\(.*\)', item)
def test_can_create_nodes_and_relationship_3(self):
    """ Create two nodes and a relationship between them in a single
    transaction, building all three entities before any is created.
    """
    self.graph.delete_all()
    with self.graph.begin() as tx:
        alice = Node("Person", name="Alice")
        bob = Node("Person", name="Bob")
        knows = Relationship(alice, "KNOWS", bob, since=1999)
        for entity in (alice, bob, knows):
            tx.create(entity)
    # Every created entity should now point at this graph and carry an identity.
    for entity in (alice, bob, knows):
        self.assertEqual(entity.graph, self.graph)
        self.assertIsNotNone(entity.identity)
    assert knows.start_node == alice
    assert knows.end_node == bob
    assert order(self.graph) == 2
    assert size(self.graph) == 1
def test_can_create_nodes_and_relationship_1(self):
    """ Create two nodes, call ``tx.process()``, then build and create
    the relationship between them within the same transaction.
    """
    self.graph.delete_all()
    with self.graph.begin() as tx:
        alice = Node("Person", name="Alice")
        bob = Node("Person", name="Bob")
        for node in (alice, bob):
            tx.create(node)
        tx.process()
        # The relationship is only constructed after the nodes were processed.
        knows = Relationship(alice, "KNOWS", bob, since=1999)
        tx.create(knows)
    # Every created entity should now point at this graph and carry an identity.
    for entity in (alice, bob, knows):
        self.assertEqual(entity.graph, self.graph)
        self.assertIsNotNone(entity.identity)
    assert knows.start_node == alice
    assert knows.end_node == bob
    assert order(self.graph) == 2
    assert size(self.graph) == 1
def test_can_merge_relationship_that_does_not_exist(self):
    """ Merge a freshly built relationship and check that it and both
    of its nodes report this graph and an identity, and that the graph
    grows by two nodes and one relationship.
    """
    start = Node("Person", name="Alice")
    end = Node("Person", name="Bob")
    knows = Relationship(start, "KNOWS", end)
    # Snapshot the graph dimensions before merging.
    order_before = graph_order(self.graph)
    size_before = graph_size(self.graph)
    self.graph.merge(knows)
    for entity in (start, end, knows):
        self.assertEqual(entity.graph, self.graph)
        self.assertIsNotNone(entity.identity)
    assert self.graph.exists(start | end | knows)
    order_after = graph_order(self.graph)
    size_after = graph_size(self.graph)
    assert order_after == order_before + 2
    assert size_after == size_before + 1
def test_can_cast_from_tuple_of_entities(self):
    """ Cast a (node, relationship, node) tuple to a relationship and
    check that the result has no graph or identity and keeps the same
    start node, type and end node.
    """
    start = Node()
    end = Node()
    original = Relationship(start, "TO", end)
    result = Relationship.cast((start, original, end))
    self.assertIsInstance(result, Relationship)
    self.assertIsNone(result.graph)
    self.assertIsNone(result.identity)
    assert result.start_node == start
    assert result.type == "TO"
    assert result.end_node == end
def test_walkable_repr(self):
    """ The repr of a walkable should render its nodes and relationships
    in walk order, with arrows showing each relationship's direction.
    """
    alice, bob, carol, dave = (
        Node("Person", name=name)
        for name in ("Alice", "Bob", "Carol", "Dave")
    )
    # Alternating node / relationship sequence; the middle relationship
    # starts at Carol and ends at Bob, so it is walked against its direction.
    steps = [
        alice, Relationship(alice, "LOVES", bob),
        bob, Relationship(carol, "HATES", bob),
        carol, Relationship(carol, "KNOWS", dave),
        dave,
    ]
    walk = Walkable(steps)
    assert repr(walk) == "(Alice)-[:LOVES {}]->(Bob)<-[:HATES {}]-(Carol)-[:KNOWS {}]->(Dave)"
def test_can_reverse_iterate_path_relationships(self):
    """ Reversing a path should yield its relationships from the last
    one back to the first.
    """
    # given
    hates = Relationship(self.carol, "HATES", self.bob)
    path = Path(self.alice, "LOVES", self.bob, hates, self.carol, "KNOWS", self.dave)
    # when
    actual = list(reversed(path))
    # then
    expected = [
        Relationship(self.carol, "KNOWS", self.dave),
        Relationship(self.carol, "HATES", self.bob),
        Relationship(self.alice, "LOVES", self.bob),
    ]
    assert actual == expected
def create(self, abstract):
    """ Append a job that creates the node or relationship described
    by the given abstract entity.

    :param abstract: node or relationship
    :return: :class:`.Job` as returned by ``append``
    :raise TypeError: if the cast entity is neither a node nor a
        relationship
    """
    entity = cast(abstract)
    if isinstance(entity, Node):
        return self.append(CreateNodeJob(**entity))
    if not isinstance(entity, Relationship):
        raise TypeError(entity)
    # Resolve both endpoints before building the relationship job.
    start = self.resolve(entity.start_node())
    end = self.resolve(entity.end_node())
    job = CreateRelationshipJob(start, entity.type(), end, **entity)
    return self.append(job)
def __init__(self, graph):
    """ Keep a reference to the graph and start with an empty index
    mapping for each of ``Node`` and ``Relationship``.

    :param graph: graph stored on this instance
    """
    self.graph = graph
    # One separate, initially empty dict per entity class.
    self._indexes = {entity_cls: {} for entity_cls in (Node, Relationship)}
def _create_in_index(self, cls, index, key, value, abstract, query=None):
    """ Append a POST request that creates an entity within an index
    under the given key/value pair.

    :param cls: ``Node`` or ``Relationship``
    :param index: index passed to ``self._index`` together with ``cls``
    :param key: index entry key
    :param value: index entry value
    :param abstract: abstract node or relationship to create
    :param query: optional query passed on to ``self._uri_for``
    :return: result of ``self.append_post``
    :raise TypeError: if ``cls`` is neither ``Node`` nor ``Relationship``
    """
    uri = self._uri_for(self._index(cls, index), query=query)
    if cls is not Node and cls is not Relationship:
        raise TypeError(cls)
    payload = {"key": key, "value": value}
    if cls is Node:
        payload["properties"] = dict(cast_node(abstract))
    else:
        rel = cast_relationship(abstract)
        # NOTE(review): start/type/end are read from `abstract` itself,
        # not from the cast relationship -- confirm this is intended.
        payload["start"] = self._uri_for(abstract.start_node())
        payload["type"] = str(abstract.type())
        payload["end"] = self._uri_for(abstract.end_node())
        payload["properties"] = dict(rel)
    return self.append_post(uri, payload)