Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_peeringdb_id(self):
    """
    Look up the PeeringDB record matching the IP addresses recorded for
    this IX.

    Returns the ID of the matching PeeringDB record, or 0 when the lookup
    yields nothing.
    """
    match = PeeringDB().get_ix_network_by_ip_address(
        ipv6_address=self.ipv6_address, ipv4_address=self.ipv4_address
    )
    # Guard clause: a falsy lookup result means there is no ID to report.
    if not match:
        return 0
    return match.id
def index_peer_records(self, request):
    """
    Trigger a forced peer records discovery through PeeringDB.

    The value returned by the discovery is sent back in a Response under
    the "peer-record-count" key. The `request` argument is not used.
    """
    discovered = PeeringDB().force_peer_records_discovery()
    return Response({"peer-record-count": discovered})
def find_potential_ix_peering_sessions(self):
"""
Saves an IP address list. Each IP address of the list is the address of a
potential peering session with the current AS on an Internet Exchange.
"""
# NOTE(review): this excerpt stops at the header of the inner loop, so the
# code that fills and stores the result list is not visible here.
# Accumulator for the potential IX peering sessions
potential_ix_peering_sessions = []
# Get common IX networks between us (settings.MY_ASN) and this AS
common = PeeringDB().get_common_ix_networks_for_asns(settings.MY_ASN, self.asn)
# Each entry is unpacked as a pair: our side and the peer's side
for us, peer in common:
# Collect the addresses the peer has on this network (IPv6 first, then
# IPv4), skipping any that are not set
peering_sessions = []
if peer.ipaddr6:
peering_sessions.append(peer.ipaddr6)
if peer.ipaddr4:
peering_sessions.append(peer.ipaddr4)
# Sessions already recorded for this AS whose IP address is one of the
# peer addresses collected above
known_sessions = InternetExchangePeeringSession.objects.filter(
autonomous_system=self, ip_address__in=peering_sessions
)
# Check if peer IP addresses are known sessions
for peering_session in peering_sessions:
# NOTE(review): loop body missing from this excerpt; presumably it compares
# each address against known_sessions -- confirm against the full source.
def is_peeringdb_valid(self):
    """
    Report whether the PeeringDB ID stored for this IX still resolves.

    Returns True when no PeeringDB ID is set, or when PeeringDB returns a
    record for the stored ID. Returns False when an ID is set but no
    record is found for it.
    """
    # Nothing to validate when no PeeringDB ID has been set.
    if not self.peeringdb_id:
        return True

    record = PeeringDB().get_ix_network(self.peeringdb_id)
    return bool(record)
def get_objects(self):
# NOTE(review): this excerpt stops at a dangling `else:`, so how `objects`
# is filled and what is returned are not visible here.
objects = []
# PeeringDB IDs already attached to a local InternetExchange
known_objects = []
api = PeeringDB()
for ix in InternetExchange.objects.all():
if ix.peeringdb_id:
known_objects.append(ix.peeringdb_id)
# IX networks PeeringDB lists for our own ASN; a falsy result is treated
# as an empty list
ix_networks = api.get_ix_networks_for_asn(settings.MY_ASN) or []
# Tracks how many times each slug has been seen, to de-duplicate slugs
slugs_occurences = {}
for ix_network in ix_networks:
# Only consider IX networks that are not already known locally
if ix_network.id not in known_objects:
slug = slugify(ix_network.name)
if slug in slugs_occurences:
# Slug already seen: bump its counter and append it as a suffix
slugs_occurences[slug] += 1
slug = "{}-{}".format(slug, slugs_occurences[slug])
else:
# NOTE(review): the `else` branch is cut off in this excerpt; presumably it
# initialises the counter for a first-seen slug -- confirm against the
# full source.
def get(self, request, asn):
# NOTE(review): this excerpt stops inside the `context` dict literal, so
# the remaining context keys and the response are not visible here.
# Resolve the AS from the URL's ASN, or answer with a 404
autonomous_system = get_object_or_404(AutonomousSystem, asn=asn)
# Contacts PeeringDB holds for this ASN
peeringdb_contacts = PeeringDB().get_autonomous_system_contacts(
autonomous_system.asn
)
# One entry per IX shared with this AS, each paired with the result of
# has_potential_ix_peering_sessions() for that IX
common_ix_and_sessions = []
for ix in autonomous_system.get_common_internet_exchanges():
common_ix_and_sessions.append(
{
"internet_exchange": ix,
"has_potential_ix_peering_sessions": autonomous_system.has_potential_ix_peering_sessions(
ix
),
}
)
context = {
"autonomous_system": autonomous_system,
"peeringdb_contacts": peeringdb_contacts,
def get_prefixes(self):
    """
    Ask PeeringDB for the prefixes of the IX network identified by this
    IX's PeeringDB ID and return the result as-is.
    """
    peeringdb = PeeringDB()
    return peeringdb.get_prefixes_for_ix_network(self.peeringdb_id)
def create_from_peeringdb(asn):
    """
    Return the AutonomousSystem for the given ASN, creating it from
    PeeringDB data when it does not exist locally yet.

    Returns None when PeeringDB has no record for the ASN. Otherwise the
    existing AutonomousSystem is returned, or a new one is built from the
    PeeringDB record (ASN, name, IRR AS-SET and max prefixes), saved and
    returned.
    """
    peeringdb_network = PeeringDB().get_autonomous_system(asn)

    if not peeringdb_network:
        return None

    try:
        return AutonomousSystem.objects.get(asn=peeringdb_network.asn)
    except AutonomousSystem.DoesNotExist:
        values = {
            "asn": peeringdb_network.asn,
            "name": peeringdb_network.name,
            "irr_as_set": peeringdb_network.irr_as_set,
            "ipv6_max_prefixes": peeringdb_network.info_prefixes6,
            "ipv4_max_prefixes": peeringdb_network.info_prefixes4,
        }
        autonomous_system = AutonomousSystem(**values)
        autonomous_system.save()

    # Hand the freshly created AS back to the caller; without this the
    # creation path would return None, just like the "no record" case.
    return autonomous_system
def get_common_internet_exchanges(self):
    """
    Return the InternetExchange objects we share with this AS, ordered by
    name and then slug.
    """
    # Pairs of IX networks that PeeringDB reports for our ASN and this AS
    shared_networks = PeeringDB().get_common_ix_networks_for_asns(
        settings.MY_ASN, self.asn
    )
    # Only our side of each pair is used to match local IX records
    our_network_ids = [ours.id for ours, _ in shared_networks]
    exchanges = InternetExchange.objects.filter(peeringdb_id__in=our_network_ids)
    return exchanges.order_by("name", "slug")
def synchronize_with_peeringdb(self):
    """
    Synchronize AS properties with those found in PeeringDB.

    Each property (name, IRR AS-SET, IPv6/IPv4 max prefixes) is only
    overwritten when its matching `*_peeringdb_sync` flag is set on this
    AS.

    Returns False when PeeringDB has no record for this ASN (nothing is
    changed or saved), True once the AS has been saved.
    """
    peeringdb_info = PeeringDB().get_autonomous_system(self.asn)

    # No record found, nothing to sync
    if not peeringdb_info:
        return False

    if self.name_peeringdb_sync:
        self.name = peeringdb_info.name
    if self.irr_as_set_peeringdb_sync:
        self.irr_as_set = peeringdb_info.irr_as_set
    if self.ipv6_max_prefixes_peeringdb_sync:
        self.ipv6_max_prefixes = peeringdb_info.info_prefixes6
    if self.ipv4_max_prefixes_peeringdb_sync:
        self.ipv4_max_prefixes = peeringdb_info.info_prefixes4

    # Save the new AS
    self.save()

    # Explicit success value, so callers can tell a completed sync apart
    # from the "no record" case above (an implicit None is falsy too).
    return True