Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_valid_directory():
"""
Tests valid directory function.
"""
config = UpdateConfig("tests", "localhost")
assert valid_directory(config.drift_detection_directory)
config.drift_detection_directory = "temp"
assert not valid_directory(config.drift_detection_directory)
def test_valid_directory():
"""
Tests valid directory function.
"""
config = UpdateConfig("tests", "localhost")
assert valid_directory(config.drift_detection_directory)
config.drift_detection_directory = "temp"
assert not valid_directory(config.drift_detection_directory)
def run_get_states(config):
"""
Handles neo4j errors and then updates detectors.
:type config: Config Object
:param config: Config Object from CLI
:return:
"""
if not valid_directory(config.drift_detection_directory):
logger.error("Invalid Drift Detection Directory")
return
neo4j_auth = None
if config.neo4j_user or config.neo4j_password:
neo4j_auth = (config.neo4j_user, config.neo4j_password)
try:
neo4j_driver = GraphDatabase.driver(
config.neo4j_uri,
auth=neo4j_auth,
)
except neobolt.exceptions.ServiceUnavailable as e:
logger.debug("Error occurred during Neo4j connect.", exc_info=True)
logger.error(
(
"Unable to connect to Neo4j using the provided URI '%s', an error occurred: '%s'. Make sure the "
"Neo4j server is running and accessible from your network."
def run_drift_detection(config):
try:
if not valid_directory(config.query_directory):
logger.error("Invalid Drift Detection Directory")
return
state_serializer = StateSchema()
shortcut_serializer = ShortcutSchema()
shortcut_data = FileSystem.load(os.path.join(config.query_directory, "shortcut.json"))
shortcut = shortcut_serializer.load(shortcut_data)
start_state_data = FileSystem.load(
os.path.join(
config.query_directory, shortcut.shortcuts.get(
config.start_state,
config.start_state,
),
),
)
start_state = state_serializer.load(start_state_data)
end_state_data = FileSystem.load(
def run_add_shortcut(config):
"""
Runs add_shortcut from the command line. Does error handling.
:type config: Config Object
:param config: Config of adding shortcut
:return:
"""
if not valid_directory(config.query_directory):
logger.error("Invalid Drift Detection Directory")
return
try:
add_shortcut(FileSystem, ShortcutSchema(), config.query_directory, config.shortcut, config.filename)
except ValidationError as err:
msg = "Could not load shortcut file from json file {} in query directory {}.".format(
err.messages,
config.query_directory,
)
logger.exception(msg)