Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
person.test3 = {test3}
"""
for node in data:
test = node[0]
test2 = node[1]
test3 = node[2]
neo4j_session.run(
ingest_nodes,
test=test,
test2=test2,
test3=test3,
)
query_directory = "tests/data/test_update_detectors/test_detector"
state_serializer = StateSchema()
shortcut_serializer = ShortcutSchema()
storage = FileSystem
file_1 = str(datetime.datetime(2019, 1, 1, 0, 0, 2)) + ".json"
file_2 = str(datetime.datetime(2019, 1, 1, 0, 0, 1)) + ".json"
get_query_state(neo4j_session, query_directory, state_serializer, storage, file_1)
add_shortcut(FileSystem(), ShortcutSchema(), query_directory, "most-recent", file_1)
detector_1_data = FileSystem.load(os.path.join(query_directory, file_1))
detector_1 = state_serializer.load(detector_1_data)
detector_2_data = FileSystem.load(os.path.join(query_directory, file_2))
detector_2 = state_serializer.load(detector_2_data)
assert detector_1.name == detector_2.name
assert detector_1.validation_query == detector_2.validation_query
filename = "1.json"
shortcut_path = directory + '/shortcut.json'
cli.main([
"add-shortcut",
"--query-directory",
directory,
"--shortcut",
alias,
"--file",
filename,
])
shortcut_data = FileSystem.load(shortcut_path)
shortcut = ShortcutSchema().load(shortcut_data)
assert shortcut.shortcuts[alias] == filename
shortcut.shortcuts.pop(alias)
shortcut_data = ShortcutSchema().dump(shortcut)
FileSystem.write(shortcut_data, shortcut_path)
# Scenario: ask the CLI's add-shortcut subcommand to register the alias
# "2.json" (an alias with the same form as a state file name) for "1.json".
cli = CLI(prog="cartography-detectdrift")
directory = "tests/data/test_cli_detectors/detector"
alias = "2.json"
filename = "1.json"
# Invoke the CLI exactly as a user would, with the argv list for add-shortcut.
cli.main([
"add-shortcut",
"--query-directory",
directory,
"--shortcut",
alias,
"--file",
filename,
])
# Reload shortcut.json from the query directory and deserialize it.
shortcut_path = directory + '/shortcut.json'
shortcut_data = FileSystem.load(shortcut_path)
shortcut = ShortcutSchema().load(shortcut_data)
# The alias is expected NOT to have been stored: looking it up raises KeyError.
# NOTE(review): presumably add_shortcut refuses aliases that collide with an
# existing file name in the directory -- confirm against add_shortcut.
with pytest.raises(KeyError):
shortcut.shortcuts[alias]
# Scenario: ask the CLI's add-shortcut subcommand to register the alias
# "test_shortcut" for the file "3.json".
# NOTE(review): presumably "3.json" does not exist in the test directory,
# which is why no shortcut is expected to be recorded -- confirm.
cli = CLI(prog="cartography-detectdrift")
directory = "tests/data/test_cli_detectors/detector"
alias = "test_shortcut"
filename = "3.json"
shortcut_path = os.path.join(directory, "shortcut.json")
# Invoke the CLI exactly as a user would, with the argv list for add-shortcut.
cli.main([
"add-shortcut",
"--query-directory",
directory,
"--shortcut",
alias,
"--file",
filename,
])
# Reload shortcut.json and deserialize it to inspect the stored aliases.
shortcut_data = FileSystem.load(shortcut_path)
shortcut = ShortcutSchema().load(shortcut_data)
# The alias must be absent from the stored shortcuts: lookup raises KeyError.
with pytest.raises(KeyError):
shortcut.shortcuts[alias]
"--query-directory",
directory,
"--shortcut",
alias_2,
"--file",
alias,
])
shortcut_data = FileSystem.load(shortcut_path)
shortcut = ShortcutSchema().load(shortcut_data)
assert shortcut.shortcuts[alias] == filename
assert shortcut.shortcuts[alias_2] == filename
# Return shortcut back to its original state.
shortcut.shortcuts.pop(alias)
shortcut.shortcuts.pop(alias_2)
shortcut_data = ShortcutSchema().dump(shortcut)
FileSystem.write(shortcut_data, shortcut_path)
"--shortcut",
alias,
"--file",
filename,
])
cli.main([
"add-shortcut",
"--query-directory",
directory,
"--shortcut",
alias_2,
"--file",
alias,
])
shortcut_data = FileSystem.load(shortcut_path)
shortcut = ShortcutSchema().load(shortcut_data)
assert shortcut.shortcuts[alias] == filename
assert shortcut.shortcuts[alias_2] == filename
# Return shortcut back to its original state.
shortcut.shortcuts.pop(alias)
shortcut.shortcuts.pop(alias_2)
shortcut_data = ShortcutSchema().dump(shortcut)
FileSystem.write(shortcut_data, shortcut_path)
# Scenario: register the alias "test_shortcut" for "1.json" through the CLI's
# add-shortcut subcommand and check that it is written to shortcut.json.
cli = CLI(prog="cartography-detectdrift")
directory = "tests/data/test_cli_detectors/detector"
alias = "test_shortcut"
filename = "1.json"
shortcut_path = directory + '/shortcut.json'
# Invoke the CLI exactly as a user would, with the argv list for add-shortcut.
cli.main([
"add-shortcut",
"--query-directory",
directory,
"--shortcut",
alias,
"--file",
filename,
])
# Reload shortcut.json and verify the alias now maps to the file name.
shortcut_data = FileSystem.load(shortcut_path)
shortcut = ShortcutSchema().load(shortcut_data)
assert shortcut.shortcuts[alias] == filename
# Return shortcut back to its original state: drop the alias that was just
# added and write the serialized result back to the same path.
shortcut.shortcuts.pop(alias)
shortcut_data = ShortcutSchema().dump(shortcut)
FileSystem.write(shortcut_data, shortcut_path)
def run_drift_detection(config):
try:
if not valid_directory(config.query_directory):
logger.error("Invalid Drift Detection Directory")
return
state_serializer = StateSchema()
shortcut_serializer = ShortcutSchema()
shortcut_data = FileSystem.load(os.path.join(config.query_directory, "shortcut.json"))
shortcut = shortcut_serializer.load(shortcut_data)
start_state_data = FileSystem.load(
os.path.join(
config.query_directory, shortcut.shortcuts.get(
config.start_state,
config.start_state,
),
),
)
start_state = state_serializer.load(start_state_data)
end_state_data = FileSystem.load(
os.path.join(
config.query_directory, shortcut.shortcuts.get(
config.end_state,
config.end_state,
)
else:
logger.error(
(
"Unable to auth to Neo4j, an error occurred: '%s'. driftdetect attempted to connect to Neo4j "
"with a username and password. Check your Neo4j server settings to see if the username and "
"password provided to driftdetect are valid credentials."
),
e,
)
return
with neo4j_driver.session() as session:
filename = '.'.join([str(i) for i in time.gmtime()] + ["json"])
state_serializer = StateSchema()
shortcut_serializer = ShortcutSchema()
for query_directory in FileSystem.walk(config.drift_detection_directory):
try:
get_query_state(session, query_directory, state_serializer, FileSystem, filename)
add_shortcut(FileSystem, shortcut_serializer, query_directory, 'most-recent', filename)
except ValidationError as err:
msg = "Unable to create State for directory {}, with data \n{}".format(
query_directory,
err.messages,
)
logger.exception(msg)
except KeyError as err:
msg = f"Could not find {err} field in state template for directory {query_directory}."
logger.exception(msg)
except FileNotFoundError as err:
logger.exception(err)
except neobolt.exceptions.CypherSyntaxError as err:
def run_add_shortcut(config):
    """
    Command-line entry point for add_shortcut; failures are logged here.

    An invalid query directory is reported with an error log and nothing is
    added. A ValidationError raised while adding the shortcut is logged
    together with its messages and the query directory.

    :type config: Config Object
    :param config: Config of adding shortcut (query_directory, shortcut and filename are read)
    :return: None
    """
    query_directory = config.query_directory
    if valid_directory(query_directory):
        try:
            add_shortcut(FileSystem, ShortcutSchema(), query_directory, config.shortcut, config.filename)
        except ValidationError as validation_error:
            # logger.exception records the traceback alongside the message.
            template = "Could not load shortcut file from json file {} in query directory {}."
            logger.exception(template.format(validation_error.messages, query_directory))
    else:
        logger.error("Invalid Drift Detection Directory")