Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_transform_and_load_subnets(neo4j_session):
"""
Ensure we can transform and load subnets.
"""
subnet_res = tests.data.gcp.compute.VPC_SUBNET_RESPONSE
subnet_list = cartography.intel.gcp.compute.transform_gcp_subnets(subnet_res)
cartography.intel.gcp.compute.load_gcp_subnets(neo4j_session, subnet_list, TEST_UPDATE_TAG)
query = """
MATCH(subnet:GCPSubnet)
RETURN subnet.id, subnet.region, subnet.gateway_address, subnet.ip_cidr_range, subnet.private_ip_google_access,
subnet.vpc_partial_uri
"""
nodes = neo4j_session.run(query)
actual_nodes = {
(
n['subnet.id'],
n['subnet.region'],
n['subnet.gateway_address'],
n['subnet.ip_cidr_range'],
n['subnet.private_ip_google_access'],
n['subnet.vpc_partial_uri'],
def test_parse_compute_full_uri_to_partial_uri():
subnet_uri = 'https://www.googleapis.com/compute/v1/projects/project-abc/regions/europe-west2/subnetworks/default'
inst_uri = 'https://www.googleapis.com/compute/v1/projects/project-abc/zones/europe-west2-b/disks/instance-1'
vpc_uri = 'https://www.googleapis.com/compute/v1/projects/project-abc/global/networks/default'
assert cartography.intel.gcp.compute._parse_compute_full_uri_to_partial_uri(subnet_uri) == \
'projects/project-abc/regions/europe-west2/subnetworks/default'
assert cartography.intel.gcp.compute._parse_compute_full_uri_to_partial_uri(inst_uri) == \
'projects/project-abc/zones/europe-west2-b/disks/instance-1'
assert cartography.intel.gcp.compute._parse_compute_full_uri_to_partial_uri(vpc_uri) == \
'projects/project-abc/global/networks/default'
def _sync_single_project(neo4j_session, resources, project_id, gcp_update_tag, common_job_parameters):
"""
Handles graph sync for a single GCP project.
:param neo4j_session: The Neo4j session
:param resources: namedtuple of the GCP resource objects
:param project_id: The project ID number to sync. See the `projectId` field in
https://cloud.google.com/resource-manager/reference/rest/v1/projects
:param gcp_update_tag: The timestamp value to set our new Neo4j nodes with
:param common_job_parameters: Other parameters sent to Neo4j
:return: Nothing
"""
compute.sync(neo4j_session, resources.compute, project_id, gcp_update_tag, common_job_parameters)
storage.sync_gcp_buckets(neo4j_session, resources.storage, project_id, gcp_update_tag, common_job_parameters)