Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def post_db_init(self):
"""Set up the labels on the graph"""
GraphNode.post_db_init(self)
@classmethod
def meta_key_attributes(cls):
"""
Return our key attributes in decreasing order of significance
:return: [str]
"""
return ["macaddr", "scope", "domain"]
def post_db_init(self):
"""Set up the labels on the graph"""
GraphNode.post_db_init(self)
@registergraphclass
class IPaddrNode(GraphNode):
"""An object that represents a v4 or v6 IP address without a port - characterized by its
IP address. They are always represented in the database in ipv6 format.
It is must be unique within the 'subnet' passed as a parameter
"""
StoreHostNames = True
def __init__(self, ipaddr, domain, subnet):
"""
Construct an IPaddrNode - validating our parameters
:param ipaddr: Union(str, pyIPaddr): the IP address for this node
:param subnet: Subnet: The subnet this IP address is on...
"""
GraphNode.__init__(self, domain=domain)
if isinstance(ipaddr, str):
@classmethod
def meta_key_attributes(cls):
"""Return our key attributes in order of significance"""
return ["ipport", "domain"]
def format_ipport(self):
"""
Format the ip and port into our key field
Note that we make the port the most significant part of the key - which
should allow some more interesting queries.
"""
return "%s_%s_%s" % (self.port, self.protocol, self.ipaddr)
@registergraphclass
class ProcessNode(GraphNode):
"""A node representing a running process in a host"""
# R0913: Too many arguments (9/7)
# pylint: disable=R0913
def __init__(
self,
domain,
processname,
host,
pathname,
argv,
uid,
gid,
cwd,
roles=None,
is_monitored=False,
def delete_an_array_item(currarray, itemtodel):
"""Function to delete an item from an array of strings (like for roles)"""
if isinstance(itemtodel, (tuple, list)):
for item in itemtodel:
currarray = delete_an_array_item(currarray, item)
return currarray
assert isinstance(itemtodel, str)
if itemtodel is not None and itemtodel in currarray:
currarray = currarray.remove(itemtodel)
if len(currarray) == 0:
currarray = [""] # Limitation of Neo4j
return currarray
@registergraphclass
class BPRules(GraphNode):
"""Class defining best practice rules"""
def __init__(self, bp_class, json, rulesetname):
GraphNode.__init__(self, domain="metadata")
self.bp_class = bp_class
self.rulesetname = rulesetname
self.json = json
self._jsonobj = pyConfigContext(json)
def jsonobj(self):
"""Return the JSON object corresponding to our rules"""
return self._jsonobj
@classmethod
def meta_key_attributes(cls):
"""Return our key attributes in order of significance"""
self.rulesetname = rulesetname
self.basisrules = basisrules
if self.basisrules is None:
return
query = CMAconsts.QUERY_RULESET_RULES
parent = CMAdb.store.load_cypher_node(query, BPRuleSet, params={"name": basisrules})
CMAdb.store.relate_new(self, CMAconsts.REL_basedon, parent)
@classmethod
def meta_key_attributes(cls):
"""Return our key attributes in order of significance"""
return ["rulesetname"]
@registergraphclass
class NICNode(GraphNode):
"""
An object that represents a NIC - characterized by its MAC address
We handily ignore the fact that a single NIC can have multiple MAC addresses...
One of the problematic issues about NICs is that in fact, they can be duplicated.
Theory says otherwise "mostly" but obvious examples are the loopback device and
virtual NICs. There is no mechanism to insure that virtual NICs aren't duplicated
within an enterprise. So the practical matter is that they can show up multiple times,
and the same NIC can show up in mulitple contexts - confusing the issue.
And what is the issue? The issue is that we want each NIC (real or virtual) to appear exactly
once in our database.
There are currently three different contexts in which we might find a NIC:
a) It might be a NIC attached to one of our Drones. It might be real, or it might
def _json_escape(stringthing):
"""Escape this string according to JSON string escaping rules"""
stringthing = GraphNode.REESC.sub(r"\\\\", stringthing)
stringthing = GraphNode.REQUOTE.sub(r"\"", stringthing)
return stringthing
def _json_escape(stringthing):
"""Escape this string according to JSON string escaping rules"""
stringthing = GraphNode.REESC.sub(r"\\\\", stringthing)
stringthing = GraphNode.REQUOTE.sub(r"\"", stringthing)
return stringthing
of the fact that Neo4j restricts what kind of objects we can
have as Node properties.
"""
attrstodump = []
for attr in Store.safe_attrs(self):
if includemap is not None and attr not in includemap:
continue
if excludemap is not None and attr in excludemap:
continue
attrstodump.append(attr)
ret = "{"
comma = ""
attrstodump.sort()
for attr in attrstodump:
ret += '%s"%s": %s' % (comma, attr, GraphNode._json_elem(getattr(self, attr)))
comma = ","
ret += "}"
return ret
self.rulesetname = rulesetname
self.json = json
self._jsonobj = pyConfigContext(json)
def jsonobj(self):
"""Return the JSON object corresponding to our rules"""
return self._jsonobj
@classmethod
def meta_key_attributes(cls):
"""Return our key attributes in order of significance"""
return ["bp_class", "rulesetname"]
@registergraphclass
class BPRuleSet(GraphNode):
"""Class defining best practice rule sets"""
def __init__(self, rulesetname, basisrules=None):
GraphNode.__init__(self, domain="metadata")
self.rulesetname = rulesetname
self.basisrules = basisrules
if self.basisrules is None:
return
query = CMAconsts.QUERY_RULESET_RULES
parent = CMAdb.store.load_cypher_node(query, BPRuleSet, params={"name": basisrules})
CMAdb.store.relate_new(self, CMAconsts.REL_basedon, parent)
@classmethod
def meta_key_attributes(cls):
"""Return our key attributes in order of significance"""
return ["rulesetname"]
def node_to_class(neo_node):
"""
Return the class that corresponds to this Py2neo Neo4j Node object
:param neo_node: py2neo.Neo4j.node
:return:
"""
return GraphNode.str_to_class(str(neo_node["nodetype"]))