Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
test2=test2,
test3=test3,
)
query_directory = "tests/data/test_update_detectors/test_detector"
state_serializer = StateSchema()
shortcut_serializer = ShortcutSchema()
storage = FileSystem
file_1 = str(datetime.datetime(2019, 1, 1, 0, 0, 2)) + ".json"
file_2 = str(datetime.datetime(2019, 1, 1, 0, 0, 1)) + ".json"
get_query_state(neo4j_session, query_directory, state_serializer, storage, file_1)
add_shortcut(FileSystem(), ShortcutSchema(), query_directory, "most-recent", file_1)
detector_1_data = FileSystem.load(os.path.join(query_directory, file_1))
detector_1 = state_serializer.load(detector_1_data)
detector_2_data = FileSystem.load(os.path.join(query_directory, file_2))
detector_2 = state_serializer.load(detector_2_data)
assert detector_1.name == detector_2.name
assert detector_1.validation_query == detector_2.validation_query
assert detector_1.properties == detector_2.properties
assert detector_1.results == detector_2.results
shortcut_data = FileSystem.load(os.path.join(query_directory, "shortcut.json"))
shortcut = shortcut_serializer.load(shortcut_data)
assert shortcut.shortcuts['most-recent'] == file_1
shortcut_data = shortcut_serializer.dump(shortcut)
FileSystem.write(shortcut_data, os.path.join(query_directory, "shortcut.json"))
"""
cli = CLI(prog="cartography-detectdrift")
directory = "tests/data/test_cli_detectors/detector"
alias = "2.json"
filename = "1.json"
cli.main([
"add-shortcut",
"--query-directory",
directory,
"--shortcut",
alias,
"--file",
filename,
])
shortcut_path = directory + '/shortcut.json'
shortcut_data = FileSystem.load(shortcut_path)
shortcut = ShortcutSchema().load(shortcut_data)
with pytest.raises(KeyError):
shortcut.shortcuts[alias]
query_directory = "tests/data/test_update_detectors/test_detector"
state_serializer = StateSchema()
shortcut_serializer = ShortcutSchema()
storage = FileSystem
file_1 = str(datetime.datetime(2019, 1, 1, 0, 0, 2)) + ".json"
file_2 = str(datetime.datetime(2019, 1, 1, 0, 0, 1)) + ".json"
get_query_state(neo4j_session, query_directory, state_serializer, storage, file_1)
add_shortcut(FileSystem(), ShortcutSchema(), query_directory, "most-recent", file_1)
detector_1_data = FileSystem.load(os.path.join(query_directory, file_1))
detector_1 = state_serializer.load(detector_1_data)
detector_2_data = FileSystem.load(os.path.join(query_directory, file_2))
detector_2 = state_serializer.load(detector_2_data)
assert detector_1.name == detector_2.name
assert detector_1.validation_query == detector_2.validation_query
assert detector_1.properties == detector_2.properties
assert detector_1.results == detector_2.results
shortcut_data = FileSystem.load(os.path.join(query_directory, "shortcut.json"))
shortcut = shortcut_serializer.load(shortcut_data)
assert shortcut.shortcuts['most-recent'] == file_1
shortcut_data = shortcut_serializer.dump(shortcut)
FileSystem.write(shortcut_data, os.path.join(query_directory, "shortcut.json"))
get_query_state(neo4j_session, query_directory, state_serializer, storage, file_1)
add_shortcut(FileSystem(), ShortcutSchema(), query_directory, "most-recent", file_1)
detector_1_data = FileSystem.load(os.path.join(query_directory, file_1))
detector_1 = state_serializer.load(detector_1_data)
detector_2_data = FileSystem.load(os.path.join(query_directory, file_2))
detector_2 = state_serializer.load(detector_2_data)
assert detector_1.name == detector_2.name
assert detector_1.validation_query == detector_2.validation_query
assert detector_1.properties == detector_2.properties
assert detector_1.results == detector_2.results
shortcut_data = FileSystem.load(os.path.join(query_directory, "shortcut.json"))
shortcut = shortcut_serializer.load(shortcut_data)
assert shortcut.shortcuts['most-recent'] == file_1
shortcut_data = shortcut_serializer.dump(shortcut)
FileSystem.write(shortcut_data, os.path.join(query_directory, "shortcut.json"))
def test_nonexistent_shortcuts():
    """
    Runs the `add-shortcut` CLI command mapping the alias "test_shortcut" to
    "3.json", then checks that the alias is absent from the stored shortcuts.
    """
    query_dir = "tests/data/test_cli_detectors/detector"
    shortcut_name = "test_shortcut"
    target_file = "3.json"

    argv = [
        "add-shortcut",
        "--query-directory", query_dir,
        "--shortcut", shortcut_name,
        "--file", target_file,
    ]
    CLI(prog="cartography-detectdrift").main(argv)

    # NOTE(review): presumably "3.json" does not exist in the fixture directory,
    # which would be why the alias is never recorded -- confirm against the fixtures.
    raw_shortcuts = FileSystem.load(os.path.join(query_dir, "shortcut.json"))
    loaded = ShortcutSchema().load(raw_shortcuts)

    # Looking up the alias must fail: it should not have been saved.
    with pytest.raises(KeyError):
        loaded.shortcuts[shortcut_name]
mock_session = MagicMock()
mock_boltstatementresult = MagicMock()
results = [
{key_1: "1", key_2: "8"},
{key_1: "2", key_2: "9"},
{key_1: "3", key_2: "10"},
{key_1: "4", key_2: "11"},
{key_1: "5", key_2: "12"},
{key_1: "6", key_2: "13"},
{key_1: "7", key_2: "14"},
]
mock_boltstatementresult.__getitem__.side_effect = results.__getitem__
mock_boltstatementresult.__iter__.side_effect = results.__iter__
mock_session.run.return_value = mock_boltstatementresult
data = FileSystem.load("tests/data/detectors/test_multiple_expectations.json")
state_old = StateSchema().load(data)
state_new = State(state_old.name, state_old.validation_query, state_old.properties, [])
get_state(mock_session, state_new)
state_new.properties = state_old.properties
drifts = compare_states(state_old, state_new)
mock_session.run.assert_called_with(state_new.validation_query)
assert ["7", "14"] in drifts
def test_basic_drift_detection():
    """
    Tests that drift detection works.

    Loads the states stored in 1.json and 2.json, runs perform_drift_detection
    on them and checks one expected entry in each of the two returned collections.
    """
    def read_state(path):
        # Read the raw file first, then deserialize it with a fresh schema.
        raw = FileSystem.load(path)
        return StateSchema().load(raw)

    before = read_state("tests/data/test_cli_detectors/detector/1.json")
    after = read_state("tests/data/test_cli_detectors/detector/2.json")

    drift = perform_drift_detection(before, after)
    added, removed = drift

    assert ['36', '37', ['38', '39', '40']] in added
    assert ['7', '14', ['21', '28', '35']] in removed
def run_drift_detection(config):
try:
if not valid_directory(config.query_directory):
logger.error("Invalid Drift Detection Directory")
return
state_serializer = StateSchema()
shortcut_serializer = ShortcutSchema()
shortcut_data = FileSystem.load(os.path.join(config.query_directory, "shortcut.json"))
shortcut = shortcut_serializer.load(shortcut_data)
start_state_data = FileSystem.load(
os.path.join(
config.query_directory, shortcut.shortcuts.get(
config.start_state,
config.start_state,
),
),
)
start_state = state_serializer.load(start_state_data)
end_state_data = FileSystem.load(
os.path.join(
config.query_directory, shortcut.shortcuts.get(
config.end_state,
config.end_state,
),
logger.error("Invalid Drift Detection Directory")
return
state_serializer = StateSchema()
shortcut_serializer = ShortcutSchema()
shortcut_data = FileSystem.load(os.path.join(config.query_directory, "shortcut.json"))
shortcut = shortcut_serializer.load(shortcut_data)
start_state_data = FileSystem.load(
os.path.join(
config.query_directory, shortcut.shortcuts.get(
config.start_state,
config.start_state,
),
),
)
start_state = state_serializer.load(start_state_data)
end_state_data = FileSystem.load(
os.path.join(
config.query_directory, shortcut.shortcuts.get(
config.end_state,
config.end_state,
),
),
)
end_state = state_serializer.load(end_state_data)
new_results, missing_results = perform_drift_detection(start_state, end_state)
report_drift(new_results, missing_results, end_state.name, end_state.properties)
except ValidationError as err:
msg = "Unable to create DriftStates from files {},{} for \n{} in directory {}.".format(
config.start_state,
config.end_state,
err.messages,
config.query_directory,