Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def work():
    """Sample predecessors for every training batch from a Redis-backed graph.

    Connects to a ``redis_graph.RedisGraph`` named ``"sub_graph"`` on this
    machine (port 7430), then walks ``batch_info``. For each batch the batch
    samples are used as start nodes and, for each ``max_deg`` in ``samples``,
    ``graph.sample_predecessor`` is asked for predecessors and their edge ids.

    ``batch_info`` and ``samples`` are not parameters; they are read from an
    enclosing scope that is not visible here.

    NOTE(review): the visible lines end right after ``eids`` is rebuilt and
    several locals (``first``, ``batch_train_labels``, ``last_nodes``,
    ``eid2edges``) are never read, so this function presumably continues
    past this point -- confirm against the full file.
    """
    # Resolve this machine's own address; the Redis endpoint is expected to be
    # reachable on the local host at a fixed port.
    redis_configs = [{
        "host": socket.gethostbyname(socket.gethostname()),
        "port": 7430
    }, ]
    # NOTE(review): meaning of the third positional argument (64) is not
    # visible here -- check the RedisGraph constructor.
    graph = redis_graph.RedisGraph("sub_graph", redis_configs, 64)
    first = True
    for batch_train_samples, batch_train_labels in batch_info:
        start_nodes = batch_train_samples
        nodes = start_nodes
        eids = []
        # Maps each sampled edge id to its (source, destination) node pair.
        eid2edges = {}
        for max_deg in samples:
            pred, pred_eid = graph.sample_predecessor(
                start_nodes, max_degree=max_deg, return_eids=True)
            # pred / pred_eid are consumed in lockstep with start_nodes: one
            # iterable of sources and one of edge ids per destination node.
            for _dst, _srcs, _eids in zip(start_nodes, pred, pred_eid):
                for _src, _eid in zip(_srcs, _eids):
                    eid2edges[_eid] = (_src, _dst)
            # Keep the node collection from before this hop, then pair the old
            # collections with the new samples. These are nested two-element
            # lists, not flattened ones.
            last_nodes = nodes
            nodes = [nodes, pred]
            eids = [eids, pred_eid]