Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_can_match_one_by_id(movie_graph):
    """Check that a node matched by its identity exposes the expected values.

    The node is first located by name to learn its identity; a second model,
    which declares a primary label but no primary key, is then matched with
    that identity.
    """
    # given: the identity of the node found by name
    by_name = Person.match(movie_graph, "Keanu Reeves").first()
    identity = by_name.__node__.identity

    # when: matching through a model bound to the "Person" label
    class PersonById(MovieGraphObject):
        __primarylabel__ = "Person"

        name = Property()
        year_of_birth = Property(key="born")

        acted_in = RelatedTo(Film)
        directed = RelatedTo("Film")
        produced = RelatedTo("test.fixtures.ogm.Film")

    by_id = PersonById.match(movie_graph, identity).first()

    # then: the object carries the same property values
    assert by_id.name == "Keanu Reeves"
    assert by_id.year_of_birth == 1964
# NOTE(review): class-body fragment -- the enclosing `class` header is not
# visible in this chunk (presumably a film/movie model; confirm).
title = Property()
# Declared with key "tagline" and default "Bit boring" (attribute name differs
# from the key).
tag_line = Property(key="tagline", default="Bit boring")
# Declared with key "released".
year_of_release = Property(key="released")
# Relationship declared from "Person" with the type "ACTED_IN".
actors = RelatedFrom("Person", "ACTED_IN")
def __init__(self, title):
    """Initialise the object with its ``title``."""
    self.title = title
class Person(MovieGraphObject):
    """Person model keyed by ``name``.

    ``year_of_birth`` is declared with the key ``"born"``. The three
    relationships all point at the film model, referenced by class object,
    by bare class name and by dotted path respectively.
    """

    __primarykey__ = "name"

    # Properties
    name = Property()
    year_of_birth = Property(key="born")

    # Outgoing relationships
    acted_in = RelatedTo(Film)
    directed = RelatedTo("Film")
    produced = RelatedTo("test.fixtures.ogm.Film")

    def __hash__(self):
        """Hash a person by its ``name``."""
        return hash(self.name)
class MacGuffin(MovieGraphObject):
    """Model that declares no properties or relationships of its own."""
class BaseThing(GraphObject):
    """Graph object with an explicit primary label and primary key."""

    # Explicit label / key names instead of the library defaults.
    __primarylabel__ = "MyLabel"
    __primarykey__ = "my_key"
# NOTE(review): tail of an initializer whose `def` line is not visible in this
# chunk; `seq`, `ts`, `method`, `url`, `projname`, `session` and `user` are
# presumably its parameters -- confirm against the full file.
self.seq = seq
self.ts = ts
self.method = method
self.url = url
# Identifier laid out as "<ClassName> [<seq> <ts>] <projname>.<session>.<user>".
self.uuid = "{} [{} {}] {}.{}.{}".format(type(self).__name__, seq, ts, projname, session, user)
class HTTPResponse(GraphObject):
    """Graph object model for an HTTP response, keyed by ``uuid``."""

    __primarykey__ = "uuid"

    uuid = Property()
    projname = Property()
    session = Property()
    user = Property()
    seq = Property()
    ts = Property()
    status = Property()

    Header = RelatedTo("HeaderList")
    # Any other node type could be used as the target here; the library
    # apparently does not enforce the type of related nodes.
    Body = RelatedTo("Body")

    def __init__(self, projname=None, session=None, user=None, seq=None, ts=None, status=None):
        """Populate the response's properties.

        Args:
            projname: Project name.
            session: Session identifier.
            user: User identifier.
            seq: Sequence number.
            ts: Timestamp.
            status: Response status.
        """
        self.projname = projname
        self.session = session
        self.user = user
        self.seq = seq
        self.ts = ts
        # `status` was accepted but never stored, so the declared property
        # always stayed empty.
        self.status = status
        # The primary key was never populated; build it with the same layout
        # the request-side initializer in this file uses.
        self.uuid = "{} [{} {}] {}.{}.{}".format(type(self).__name__, seq, ts, projname, session, user)
# NOTE(review): class-body fragment -- the enclosing `class` header is not
# visible in this chunk. The property names suggest per-file metadata
# (permission bits, digests, type detection) -- confirm against the full file.
size = Property()
setuid = Property()
setgid = Property()
sticky = Property()
md5sum = Property()
sha1sum = Property()
sha256sum = Property()
sha512sum = Property()
inode_type = Property()
file_type = Property()
mime_type = Property()
checksec = Property()
# ELF security
relro = Property()
canary = Property()
nx = Property()
pie = Property()
rpath = Property()
runpath = Property()
symbols = Property()
fortify_source = Property()
fortified = Property()
fortifyable = Property()
# PE security
dynamic_base = Property()
no_seh = Property()
guard_cf = Property()
force_integrity = Property()
nx_compat = Property()
high_entropy_va = Property()
signed = Property()
cat_filepath = Property()
class SeleneseCommand(GraphObject):
    """Graph object model for a Selenese command, keyed by ``uuid``."""

    __primarykey__ = "uuid"

    uuid = Property()
    projname = Property()
    session = Property()
    user = Property()
    seq = Property()
    ts = Property()
    command = Property()
    target = Property()
    value = Property()

    Command = RelatedTo("DataValue")
    Target = RelatedTo("DataValue")
    Value = RelatedTo("DataValue")
    Next = RelatedTo("SeleneseCommand")
    Caused = RelatedTo("HTTPRequest")

    def __init__(self, projname=None, session=None, user=None, seq=None, ts=None, command=None, target=None, value=None):
        """Populate the command's properties.

        Args:
            projname: Project name.
            session: Session identifier.
            user: User identifier.
            seq: Sequence number.
            ts: Timestamp.
            command: Command name.
            target: Command target.
            value: Command value.
        """
        self.projname = projname
        self.session = session
        self.user = user
        self.seq = seq
        self.ts = ts
        self.command = command
        # `target` and `value` were accepted but never stored, so the
        # declared properties always stayed empty.
        self.target = target
        self.value = value
        # The primary key was never populated; build it with the same layout
        # the request-side initializer in this file uses.
        self.uuid = "{} [{} {}] {}.{}.{}".format(type(self).__name__, seq, ts, projname, session, user)
def __init__(self, projname=None, url=None):
    """Store ``projname`` and ``url`` and assign a random UUID string."""
    self.projname = projname
    self.url = url
    fresh_id = uuid4()
    self.uuid = str(fresh_id)
class QueryString(GraphObject):
    """Graph object model for a query string, keyed by a random ``uuid``."""

    __primarykey__ = "uuid"

    uuid = Property()
    projname = Property()
    qs = Property()

    Parameter = RelatedTo("KeyValuePair")

    def __init__(self, projname=None, qs=None):
        """Store ``projname`` and ``qs`` and assign a random UUID string."""
        self.projname = projname
        self.qs = qs
        fresh_id = uuid4()
        self.uuid = str(fresh_id)
class HeaderList(GraphObject):
    """Graph object model for a header list, keyed by a random ``uuid``."""

    __primarykey__ = "uuid"

    uuid = Property()
    projname = Property()

    def __init__(self, projname=None, statement=None):
        """Store ``projname`` and ``statement`` and assign a random UUID string."""
        self.projname = projname
        # NOTE(review): no `stmt` Property is declared on this class, so this
        # is presumably a plain instance attribute rather than a stored node
        # property -- confirm whether that is intended.
        self.stmt = statement
        fresh_id = uuid4()
        self.uuid = str(fresh_id)
class QueryString(GraphObject):
    """Graph object model for a query string, keyed by a random ``uuid``.

    NOTE(review): a class with this name is defined more than once in this
    file; if both live in the same module the later definition wins --
    confirm the duplication is intended.
    """

    __primarykey__ = "uuid"

    uuid = Property()
    projname = Property()
    qs = Property()

    Parameter = RelatedTo("KeyValuePair")

    def __init__(self, projname=None, qs=None):
        """Store ``projname`` and ``qs`` and assign a random UUID string."""
        self.projname = projname
        self.qs = qs
        generated = str(uuid4())
        self.uuid = generated
class SQLQuery(GraphObject):
    """Graph object model for an SQL query, keyed by ``uuid``."""

    __primarykey__ = "uuid"

    uuid = Property()
    projname = Property()
def __init__(self, projname, dm_type, session, user, seq, ts, message):
    """Initialise the base part, then store the event fields and its uuid.

    NOTE(review): the ``super(Event, self)`` call implies this method belongs
    to a class named ``Event`` whose header is not visible in this chunk --
    confirm against the full file.
    """
    super(Event, self).__init__(projname, dm_type)
    self.session = session
    self.user = user
    self.seq = seq
    self.ts = ts
    self.message = message
    # Identifier laid out as "<dm_type> - <projname>.<session>.<user>.<seq>.<ts>".
    uuid_parts = (dm_type, projname, session, user, seq, ts)
    self.uuid = "{} - {}.{}.{}.{}.{}".format(*uuid_parts)
class AbstractEvent(BasicNode):
    """Abstract observation of a dynamic trace."""

    operation = Property()
    seq = Property()
    message = Property()

    IsFollowedBy = RelatedTo("AbstractEvent")
    # Cross-tier causality.
    Caused = RelatedTo("AbstractEvent")
    # Intra-tier causality.
    IsGeneratedBy = RelatedTo("AbstractEvent")
    Abstracts = RelatedTo("Event")

    def __init__(self, projname, dm_type, operation, seq, message):
        """Initialise the base node, then store the event fields and its uuid."""
        super(AbstractEvent, self).__init__(projname, dm_type)
        self.operation = operation
        self.seq = seq
        self.message = message
        # Identifier laid out as "<dm_type> - <projname>.<operation>.<seq>".
        uuid_parts = (dm_type, projname, operation, seq)
        self.uuid = "{} - {}.{}.{}".format(*uuid_parts)
from uuid import uuid4
from GenericElements import KeyValuePair,DataValue
from ApplicationDataLevelSQL import SQLQuery
from ApplicationDataLevelSession import PHPSession
class HTTPRequest(GraphObject):
    """Graph object model for an HTTP request, keyed by ``uuid``."""

    __primarykey__ = "uuid"

    # Properties
    uuid = Property()
    projname = Property()
    session = Property()
    user = Property()
    seq = Property()
    ts = Property()
    method = Property()
    url = Property()

    # Relationships
    URL = RelatedTo("URL")
    Header = RelatedTo("HeaderList")
    ABSTRACTSTO = RelatedTo("AbstractHTTPRequest")
    # Any other node type could be used as the target here; the library
    # apparently does not enforce the type of related nodes.
    Body = RelatedTo("Body")
    Next = RelatedTo("HTTPRequest")
    Transaction = RelatedTo("HTTPResponse")
    Caused = RelatedTo(["SQLQuery", "PHPSession"])
# NOTE(review): orphaned statement -- the enclosing method, which must supply
# `pos`, is not visible in this chunk.
self.pos = pos
"""
**************************
DATA FLOW
**************************
"""
class Variable(BasicNode):
"""
This represent a basic value
"""
session = Property()
user = Property()
seq = Property()
name = Property()
value = Property()
syntype = Property()
semtype = Property()
proptype = Property()
PropagatesTo = RelatedTo(["Variable"]) # COMMENT: here we list all nodes that can be connected in chains
HasName = RelatedTo(["PTTerminalNode"])
HasValue = RelatedTo(["PTTerminalNode"])
BelongsTo = RelatedTo(["DFAState", "Event"])
def __init__(self, projname, dm_type, session, user, seq, name, value):
super(Variable, self).__init__(projname, dm_type)
self.session = session