Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
collection_endpoint = {}
classes_endpoint = {}
collection = 0
classes = 0
print("split entrypoint into 2 types of endpoints collection and classes")
for obj in url_node.properties:
if obj != "@id" and obj != "@type" and obj != "@context":
if obj in api_doc.collections.keys():
collection = 1
# print ("collection ddddd",new_obj)
collection_endpoint[obj] = str(url_node.properties[obj])
else:
classes = 1
classes_endpoint[obj] = str(url_node.properties[obj])
if collection == 1:
collection_node = Node(
label="id",
alias="collection_endpoint",
properties=collection_endpoint)
redis_graph.add_node(collection_node)
edge = Edge(url_node, "has_collection_endpoint", collection_node)
redis_graph.add_edge(edge)
# redis_graph.commit()
print("collection endpoint node ", collection_node)
endpointCollection(collection_node, url_node)
# redis_graph.commit()
if classes == 1:
classes_node = Node(
label="id",
alias="classes_endpoint",
properties=classes_endpoint)
redis_graph.add_node(classes_node)
if obj["@type"] == remove_vocab(obj1["@id"]):
obj_properties["methods"] = str(
get_method(obj1["supportedOperation"]))
obj_properties_classlist = get_property(
obj1["supportedProperty"])
obj_properties["class_property"] = str(
obj_properties_classlist)
for obj2 in obj1["supportedProperty"]:
if isinstance(new_file1[obj2["title"]], str):
member[obj2["title"]] = str(
new_file1[obj2["title"]].replace("'", ""))
member["@id"] = str(obj["@id"])
member["@type"] = str(obj["@type"])
obj_properties["property"] = str(member)
objects_node = Node(
label="id",
alias=str(member_alias),
properties=obj_properties)
redis_graph.add_node(objects_node)
edge = Edge(obj_collection_node, "has_" +
str(obj_collection_node.properties["title"]),
objects_node)
redis_graph.add_edge(edge)
print("commiting collection object", member_alias)
redis_graph.commit()
print(
"property of obj which can be class",
obj_properties["class_property"],
"collection",
obj1["@id"])
connect_nodes(url_node, "has" + member_alias, objects_node)
def get_endpoints(entrypoint_obj):
print("creating entrypoint node")
url_node = Node(label="id", alias="Entrypoint", properties=entrypoint_obj)
redis_graph.add_node(url_node)
print("commiting")
redis_graph.commit()
print("entrypoint node ", url_node)
get_apistructure(entrypoint_obj, url_node)
if obj1["@id"] == obj:
properties = []
if obj1["supportedOperation"]:
objects_properties["methods"] = str(
get_method(obj1["supportedOperation"]))
if obj1["supportedProperty"]:
for obj2 in obj1["supportedProperty"]:
properties.append(obj2["title"])
objects_properties["property"] = str(properties)
obj_properties_classlist = get_property(
obj1["supportedProperty"])
objects_properties["class_property"] = str(
obj_properties_classlist)
print(objects_node.alias + remove_vocab(obj))
obj_alias = str(objects_node.alias + remove_vocab(obj)).lower()
obj_node = Node(
label="id",
alias=obj_alias,
properties=objects_properties)
redis_graph.add_node(obj_node)
print("commiting objects_property", obj_node)
redis_graph.commit()
print("property node", obj_node)
edge = Edge(objects_node, "has" + remove_vocab(obj), obj_node)
redis_graph.add_edge(edge)
print("commiting objects_property edge", obj_node)
redis_graph.commit()
print("property node", obj_node)
connect_nodes(url_node, "has" +
str(objects_node.alias +
remove_vocab(obj)), obj_node)
if obj_properties_classlist:
for obj in new_dict:
if obj != "@id":
if not isinstance(new_dict[obj], list) and not isinstance(
new_dict[obj], dict):
new_obj = remove_special_character(obj)
# new_obj = ''.join(e for e in obj if e.isalnum())
idd = source_node.label + "_" + new_obj
if new_dict[obj]:
s = remove_special_character(new_dict[obj]).lower()
else:
s = new_dict[obj]
if s is None:
node_id = "none"
else:
node_id = remove_special_character(s)
node = Node(
label=idd,
alias=node_id,
properties={
str(idd): str(s)})
redis_graph.add_node(node)
edge = Edge(source_node, new_obj, node)
redis_graph.add_edge(edge)
elif obj == "supportedOperation":
if new_dict[obj]:
get_supp_oper(new_dict[obj], source_node)
else:
edge = Edge(source_node, obj, null_node)
redis_graph.add_edge(edge)
elif obj == "supportedProperty":
def get_prop(new_dict, source_node):
predicate = "property"
idd = source_node.label + "_" + "property_id"
s = remove_special_character(new_dict["@id"]).lower()
node_id = s
node_prop_id = Node(
label=idd,
alias=node_id,
properties={
str(idd): str(s)})
redis_graph.add_node(node_prop_id)
edge = Edge(source_node, predicate, node_prop_id)
redis_graph.add_edge(edge)
for obj in new_dict:
if obj != "@id":
if not isinstance(new_dict[obj], list) and not isinstance(
new_dict[obj], dict):
new_obj = remove_special_character(obj)
idd = source_node.label + "_" + new_obj
if new_dict[obj] and not isinstance(new_dict[obj], type(True)):
s = remove_special_character(new_dict[obj]).lower()
else:
def get_supp_oper(new_file, source_node):
predicate = "supportedoperation"
count = 1
for obj in new_file:
idd = source_node.label + "_id1" + str(count)
node_id = remove_special_character(obj["@id"])
count += 1
node_oper = Node(
label=idd,
alias=node_id,
properties={
idd: str(node_id)})
redis_graph.add_node(node_oper)
edge = Edge(source_node, predicate, node_oper)
redis_graph.add_edge(edge)
get_supp_oper_value(obj, node_oper)
for obj in new_dict:
if obj != "@id":
if not isinstance(new_dict[obj], list) and not isinstance(
new_dict[obj], dict):
new_obj = remove_special_character(obj)
idd = source_node.label + "_" + new_obj
if new_dict[obj]:
s = remove_special_character(new_dict[obj]).lower()
else:
s = new_dict[obj]
if s is None or s == "delete":
node_id = "none"
else:
node_id = s
node = Node(
label=idd,
alias=node_id,
properties={
str(idd): str(s)})
redis_graph.add_node(node)
edge = Edge(source_node, new_obj, node)
redis_graph.add_edge(edge)
elif obj == "statusCodes":
if new_dict[obj]:
get_status_code(new_dict[obj], source_node)
else:
edge = Edge(source_node, obj, null_node)
redis_graph.add_edge(edge)
def check_url(new_file, url):
response, content = http.request(url, "GET")
link = response.get('link')
match = APIDOC_RE.match(link)
api_doc = match.groups()[0]
if api_doc != url:
print url, "not match", api_doc
new_url = api_doc
return final_file(new_url)
else:
prop = {"vocabid1": url}
print api_doc, "yes match", url
print "now we have to create the graph for ", url
node_id = remove_special_character(new_file["@id"])
node_url = Node(
label="id1",
alias=node_id,
properties={
"id1": str(
new_file["@id"])})
redis_graph.add_node(node_url)
get_type(new_file, node_url)
return get_supp_class(node_url, "supportedclass",
new_file["supportedClass"])
def get_endpoints(self,api_doc, redis_connection):
"""Create node for entrypoint"""
print("creating entrypoint node")
entrypoint_properties = {}
entrypoint_properties["@id"] = str("vocab:Entrypoint")
entrypoint_properties["url"] = str(
api_doc.entrypoint.url) + str(api_doc.entrypoint.api)
entrypoint_properties["supportedOperation"] = "GET"
entrypoint_node = Node(
label="id",
alias="Entrypoint",
properties=entrypoint_properties)
self.redis_graph.add_node(entrypoint_node)
return self.get_apistructure(entrypoint_node, api_doc)