Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def find(self, label, property_key=None, property_value=None, limit=None):
    """ Iterate through a set of labelled nodes, optionally filtering
    by property key and value.

    This is a generator, so validation and the query itself only run
    once iteration starts.

    :param label: node label to match; must be non-empty
    :param property_key: optional property name to filter on
    :param property_value: value the property must equal; passed as the
        query parameter ``V`` and only used when `property_key` is given
    :param limit: optional maximum number of rows; a falsy value
        (``None``, ``0``) adds no LIMIT clause
    :return: yields the first column of each row, after updating that
        node's ``labels()`` with the second column
    :raise ValueError: if `label` is empty or `limit` is not a valid
        integer
    :raise TypeError: if `limit` is of a type ``int()`` cannot convert
    """
    if not label:
        raise ValueError("Empty label")
    from py2neo.cypher import cypher_escape
    if property_key is None:
        statement = "MATCH (n:%s) RETURN n,labels(n)" % cypher_escape(label)
        parameters = {}
    else:
        statement = "MATCH (n:%s {%s:{V}}) RETURN n,labels(n)" % (
            cypher_escape(label), cypher_escape(property_key))
        parameters = {"V": property_value}
    if limit:
        # The limit is spliced into the statement text (unlike label and
        # key it is not escaped), so force it to an integer first.
        statement += " LIMIT %d" % int(limit)
    cursor = self.cypher.run(statement, parameters)
    try:
        while cursor.move():
            node = cursor[0]
            node.labels().update(cursor[1])
            yield node
    finally:
        # Runs on exhaustion, on error, and when the consumer abandons
        # the generator early, so the cursor is always released.
        cursor.close()
def _merge_nodes(tx, p_label, p_key, labels, data):
    """ Merge a batch of nodes on a primary label and key, yielding the
    value of ``id(_)`` for each record the query returns.

    :param tx: object whose ``run(cypher, x=data)`` executes the query
    :param p_label: primary label used in the MERGE pattern
    :param p_key: primary property key used in the MERGE pattern
    :param labels: frozenset of labels to set on every merged node; may
        be empty, in which case no labels are set
    :param data: list of (p_value, properties)
    :return: generator of the first column of each returned record
    """
    assert isinstance(labels, frozenset)
    clauses = [
        "UNWIND $x AS data",
        "MERGE (_:%s {%s:data[0]})" % (cypher_escape(p_label), cypher_escape(p_key)),
    ]
    if labels:
        # With no labels the clause would render as a bare "SET _:",
        # which is not valid Cypher, so it is left out entirely.
        clauses.append("SET _:%s" % ":".join(cypher_escape(label) for label in sorted(labels)))
    clauses.append("SET _ = data[1]")
    clauses.append("RETURN id(_)")
    cypher = " ".join(clauses)
    for record in tx.run(cypher, x=data):
        yield record[0]
def merge(self, label, property_key=None, property_value=None, limit=None):
    """ Match or create a node by label and optional property and yield
    all matching nodes.

    This is a generator, so validation and the query itself only run
    once iteration starts.

    :param label: node label to merge on; must be non-empty
    :param property_key: optional textual property name to merge on
    :param property_value: value for `property_key`; required whenever a
        key is given
    :param limit: optional maximum number of rows; a falsy value
        (``None``, ``0``) adds no LIMIT clause
    :return: yields the first column of each collected record
    :raise ValueError: if `label` is empty, if a key is given without a
        value, or if `limit` is not a valid integer
    :raise TypeError: if `property_key` is not textual, or `limit` is of
        a type ``int()`` cannot convert
    """
    if not label:
        raise ValueError("Empty label")
    from py2neo.cypher import cypher_escape
    if property_key is None:
        statement = "MERGE (n:%s) RETURN n" % cypher_escape(label)
        parameters = {}
    elif not isinstance(property_key, string):
        raise TypeError("Property key must be textual")
    elif property_value is None:
        raise ValueError("Both key and value must be specified for a property")
    else:
        statement = "MERGE (n:%s {%s:{V}}) RETURN n" % (
            cypher_escape(label), cypher_escape(property_key))
        parameters = {"V": coerce_property(property_value)}
    if limit:
        # The limit is spliced into the statement text (unlike label and
        # key it is not escaped), so force it to an integer first.
        statement += " LIMIT %d" % int(limit)
    cursor = self.cypher.run(statement, parameters)
    for record in cursor.collect():
        yield record[0]
def drop_uniqueness_constraint(self, label, property_key):
    """ Drop the node uniqueness constraint that applies to the given
    label and property key.

    :param label: label the constraint is declared on
    :param property_key: property key the constraint is declared on
    """
    template = "DROP CONSTRAINT ON (_:{}) ASSERT _.{} IS UNIQUE"
    statement = template.format(cypher_escape(label), cypher_escape(property_key))
    # The statement is run only for its effect; the result is closed
    # straight away without being read.
    result = self.graph.run(statement)
    result.close()
def _create_nodes(tx, labels, data):
    """ Create a batch of nodes carrying the given labels, yielding the
    value of ``id(_)`` for each record the query returns.

    :param tx: object whose ``run(cypher, x=data)`` executes the query
    :param labels: frozenset of labels applied to every created node
    :param data: value bound to ``$x``; each unwound element is assigned
        to the new node with ``SET _ = data``
    :return: generator of the first column of each returned record
    """
    assert isinstance(labels, frozenset)
    # Sorting gives a stable label order in the generated statement.
    label_parts = [":" + cypher_escape(name) for name in sorted(labels)]
    statement = "UNWIND $x AS data CREATE (_%s) SET _ = data RETURN id(_)" % "".join(label_parts)
    for row in tx.run(statement, x=data):
        yield row[0]
def _merge_relationships(tx, r_type, data):
    """ Merge a batch of relationships of one type between nodes looked
    up by ID, yielding the value of ``id(_)`` for each record returned.

    :param tx: object whose ``run(cypher, x=data)`` executes the query
    :param r_type: relationship type, escaped before being placed in
        the statement
    :param data: list of (a_id, b_id, properties)
    :return: generator of the first column of each returned record
    """
    template = ("UNWIND $x AS data "
                "MATCH (a) WHERE id(a) = data[0] "
                "MATCH (b) WHERE id(b) = data[1] "
                "MERGE (a)-[_:%s]->(b) SET _ = data[2] RETURN id(_)")
    statement = template % cypher_escape(r_type)
    for row in tx.run(statement, x=data):
        yield row[0]
def _query_and_parameters(self, count=False):
""" A tuple of the Cypher query and parameters used to select
the nodes that match the criteria for this selection.
:return: Cypher query string
"""
clauses = ["MATCH (_%s)" % "".join(":%s" % cypher_escape(label) for label in self._labels)]
parameters = {}
if self._conditions:
conditions = []
for condition in self._conditions:
if isinstance(condition, tuple):
condition, param = condition
parameters.update(param)
conditions.append(condition)
clauses.append("WHERE %s" % " AND ".join(conditions))
if count:
clauses.append("RETURN count(_)")
else:
clauses.append("RETURN _")
if self._order_by:
clauses.append("ORDER BY %s" % (", ".join(self._order_by)))
if self._skip:
raise ValueError("Node %r does not belong to this graph" % n)
if n.identity is None:
raise ValueError("Node %r is not bound to a graph" % n)
def r_type_name(r):
try:
return r.__name__
except AttributeError:
return r
clauses = []
parameters = {}
if self._r_type is None:
relationship_detail = ""
elif is_collection(self._r_type):
relationship_detail = ":" + "|:".join(cypher_escape(r_type_name(t)) for t in self._r_type)
else:
relationship_detail = ":%s" % cypher_escape(r_type_name(self._r_type))
if not self._nodes:
clauses.append("MATCH (a)-[_" + relationship_detail + "]->(b)")
elif isinstance(self._nodes, Sequence):
if len(self._nodes) >= 1 and self._nodes[0] is not None:
start_node = Node.cast(self._nodes[0])
verify_node(start_node)
clauses.append("MATCH (a) WHERE id(a) = {x}")
parameters["x"] = start_node.identity
if len(self._nodes) >= 2 and self._nodes[1] is not None:
end_node = Node.cast(self._nodes[1])
verify_node(end_node)
clauses.append("MATCH (b) WHERE id(b) = {y}")
parameters["y"] = end_node.identity
if len(self._nodes) >= 3: