Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_load_gcp_projects(neo4j_session):
    """
    Load the sample GCP project data and check that the GCPProject nodes found in the graph carry the expected
    id and project number.
    """
    cartography.intel.gcp.crm.load_gcp_projects(neo4j_session, tests.data.gcp.crm.GCP_PROJECTS, TEST_UPDATE_TAG)

    # Same query text as a plain literal so its content does not depend on source indentation.
    project_query = "\nMATCH (d:GCPProject) return d.id, d.projectnumber\n"
    records = neo4j_session.run(project_query)
    found_projects = {(record['d.id'], record['d.projectnumber']) for record in records}

    wanted_projects = {
        ("sample-id-232323", "sample-number-121212"),
    }
    assert found_projects == wanted_projects
def test_parse_compute_full_uri_to_partial_uri():
    """
    Check that _parse_compute_full_uri_to_partial_uri() maps full compute API URIs (a regional subnetwork, a zonal
    disk, and a global network) to the expected partial URIs.
    """
    # Each entry pairs a full URI with the partial URI the parser is expected to return for it.
    uri_cases = [
        (
            'https://www.googleapis.com/compute/v1/projects/project-abc/regions/europe-west2/subnetworks/default',
            'projects/project-abc/regions/europe-west2/subnetworks/default',
        ),
        (
            'https://www.googleapis.com/compute/v1/projects/project-abc/zones/europe-west2-b/disks/instance-1',
            'projects/project-abc/zones/europe-west2-b/disks/instance-1',
        ),
        (
            'https://www.googleapis.com/compute/v1/projects/project-abc/global/networks/default',
            'projects/project-abc/global/networks/default',
        ),
    ]
    for full_uri, expected_partial_uri in uri_cases:
        parsed = cartography.intel.gcp.compute._parse_compute_full_uri_to_partial_uri(full_uri)
        assert parsed == expected_partial_uri
def _ensure_local_neo4j_has_test_firewall_data(neo4j_session):
    """
    Load the transformed firewall test data into the given neo4j session via load_gcp_ingress_firewalls().
    """
    load_firewalls = cartography.intel.gcp.compute.load_gcp_ingress_firewalls
    load_firewalls(neo4j_session, tests.data.gcp.compute.TRANSFORMED_FW_LIST, TEST_UPDATE_TAG)
def test_transform_gcp_vpcs():
    """
    Run transform_gcp_vpcs() on the sample VPC response and verify that it yields exactly one VPC whose
    partial_uri and flattened routing mode have the expected values.
    """
    transformed = cartography.intel.gcp.compute.transform_gcp_vpcs(VPC_RESPONSE)
    assert len(transformed) == 1

    only_vpc = transformed[0]
    # Checked in the listed order: the partial URI first, then the routing mode.
    expected_fields = {
        'partial_uri': 'projects/project-abc/global/networks/default',
        'routing_config_routing_mode': 'REGIONAL',
    }
    for field_name, expected_value in expected_fields.items():
        assert only_vpc[field_name] == expected_value
def build_default_sync():
    """
    Construct the default cartography sync: a Sync object with the stock intelligence-module stages registered
    in their standard order.

    :rtype: cartography.sync.Sync
    :return: The default cartography sync object.
    """
    default_sync = Sync()
    # Stages are passed as (name, callable) pairs in the order listed here.
    default_stages = [
        ('create-indexes', cartography.intel.create_indexes.run),
        ('aws', cartography.intel.aws.start_aws_ingestion),
        ('gcp', cartography.intel.gcp.start_gcp_ingestion),
        ('gsuite', cartography.intel.gsuite.start_gsuite_ingestion),
        ('crxcavator', cartography.intel.crxcavator.start_extension_ingestion),
        ('analysis', cartography.intel.analysis.run),
    ]
    default_sync.add_stages(default_stages)
    return default_sync