Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _ensure_local_neo4j_has_test_instance_data(neo4j_session):
    """
    Load the pre-transformed GCP instance fixture into the graph.

    Passes ``tests.data.gcp.compute.TRANSFORMED_GCP_INSTANCES`` to
    ``cartography.intel.gcp.compute.load_gcp_instances`` using the given
    session and ``TEST_UPDATE_TAG`` as the update tag.

    :param neo4j_session: session handed through to ``load_gcp_instances``.
    """
    fixture_instances = tests.data.gcp.compute.TRANSFORMED_GCP_INSTANCES
    cartography.intel.gcp.compute.load_gcp_instances(neo4j_session, fixture_instances, TEST_UPDATE_TAG)
def test_transform_and_load_gcp_instances_and_nics(neo4j_session):
"""
Ensure that we can correctly transform and load GCP instances.
"""
instance_responses = [tests.data.gcp.compute.GCP_LIST_INSTANCES_RESPONSE]
instance_list = cartography.intel.gcp.compute.transform_gcp_instances(instance_responses)
cartography.intel.gcp.compute.load_gcp_instances(neo4j_session, instance_list, TEST_UPDATE_TAG)
instance_id1 = 'projects/project-abc/zones/europe-west2-b/instances/instance-1-test'
instance_id2 = 'projects/project-abc/zones/europe-west2-b/instances/instance-1'
nic_query = """
MATCH(i:GCPInstance)-[r:NETWORK_INTERFACE]->(nic:GCPNetworkInterface)
OPTIONAL MATCH (i)-[:TAGGED]->(t:GCPNetworkTag)
RETURN i.id, i.zone_name, i.project_id, i.hostname, t.value, r.lastupdated, nic.nic_id, nic.private_ip
"""
objects = neo4j_session.run(nic_query)
actual_nodes = {
(
o['i.id'],
o['i.zone_name'],
o['i.project_id'],
o['nic.nic_id'],