Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def GetIpParents(self, query):
"""Return network tokens that contain IP in query.
Args:
query: an ip string ('10.1.1.1') or nacaddr.IP object
Returns:
A sorted list of unique parent tokens.
"""
base_parents = []
recursive_parents = []
# convert string to nacaddr, if arg is ipaddr then convert str() to nacaddr
if (not isinstance(query, nacaddr.IPv4) and
not isinstance(query, nacaddr.IPv6)):
if query[:1].isdigit():
query = nacaddr.IP(query)
# Get parent token for an IP
if isinstance(query, nacaddr.IPv4) or isinstance(query, nacaddr.IPv6):
for token in self.networks:
for item in self.networks[token].items:
item = item.split('#')[0].strip()
if not item[:1].isdigit():
continue
try:
supernet = nacaddr.IP(item, strict=False)
if supernet.supernet_of(query):
base_parents.append(token)
except ValueError:
# item was not an IP
pass
# Get parent token for another token
else:
base_parents = []
recursive_parents = []
# convert string to nacaddr, if arg is ipaddr then convert str() to nacaddr
if (not isinstance(query, nacaddr.IPv4) and
not isinstance(query, nacaddr.IPv6)):
if query[:1].isdigit():
query = nacaddr.IP(query)
# Get parent token for an IP
if isinstance(query, nacaddr.IPv4) or isinstance(query, nacaddr.IPv6):
for token in self.networks:
for item in self.networks[token].items:
item = item.split('#')[0].strip()
if not item[:1].isdigit():
continue
try:
supernet = nacaddr.IP(item, strict=False)
if supernet.supernet_of(query):
base_parents.append(token)
except ValueError:
# item was not an IP
pass
# Get parent token for another token
else:
for token in self.networks:
for item in self.networks[token].items:
item = item.split('#')[0].strip()
if item[:1].isalpha() and item == query:
base_parents.append(token)
# look for nested tokens
for bp in base_parents:
done = False
for token in self.networks:
if term.expiration <= exp_info_date:
logging.info('INFO: Term %s in policy %s>%s expires '
'in less than two weeks.', term.name, self.from_zone,
self.to_zone)
if term.expiration <= current_date:
logging.warn('WARNING: Term %s in policy %s>%s is expired.',
term.name, self.from_zone, self.to_zone)
continue
# SRX address books leverage network token names for IPs.
# When excluding addresses, we lose those distinct names so we need
# to create a new unique name based off the term name before excluding.
if term.source_address_exclude:
# If we have a naked source_exclude, we need something to exclude from
if not term.source_address:
term.source_address = [nacaddr.IP('0.0.0.0/0',
term.name.upper(),
term.name.upper())]
# Use the term name as the token & parent_token
new_src_parent_token = term.name.upper() + '_SRC_EXCLUDE'
new_src_token = new_src_parent_token
for i in term.source_address_exclude:
term.source_address = nacaddr.RemoveAddressFromList(
term.source_address, i)
for j in term.source_address:
j.token = new_src_token
j.parent_token = new_src_parent_token
if term.destination_address_exclude:
if not term.destination_address:
term.destination_address = [nacaddr.IP('0.0.0.0/0',
term.name.upper(),
# validate source address
if src == 'any':
self.src = src
else:
try:
self.src = nacaddr.IP(src)
except ValueError:
raise AddressError('bad source address: %s\n' % src)
# validate destination address
if dst == 'any':
self.dst = dst
else:
try:
self.dst = nacaddr.IP(dst)
except ValueError:
raise AddressError('bad destination address: %s\n' % dst)
if type(self.pol_obj) is not policy.Policy:
raise BadPolicy('Policy object is not valid.')
self.matches = []
self.exact_matches = []
for header, terms in self.pol_obj.filters:
filtername = header.target[0].options[0]
for term in terms:
possible = []
logging.debug('checking term: %s', term.name)
if not self._AddrInside(self.src, term.source_address):
logging.debug('srcaddr does not match')
continue
def get_nets_and_highest_prefix(ip, net_group, db):
    """Collect the networks in a group that contain an IP, plus the longest prefix.

    Args:
      ip: the address to look for; it is converted with nacaddr.IP().
      net_group: the name of the network object whose members are searched.
      db: network and service definitions, passed through to get_nets().

    Returns:
      A (highest_prefix_length, networks) tuple:
        highest_prefix_length: the largest prefixlen among the matching
          networks, or 0 when nothing matches.
        networks: a list holding str() of every matching network, in the
          order get_nets() yielded them.
    """
    target = nacaddr.IP(ip)
    # get_nets() is handed a one-element list; only the second field of its
    # first result is walked here.
    candidates = get_nets([net_group], db)[0][1]

    matching = []
    longest_prefix = 0
    for candidate in candidates:
        # Networks that do not contain the address are ignored entirely.
        if target not in candidate:
            continue
        matching.append(str(candidate))
        longest_prefix = max(longest_prefix, candidate.prefixlen)
    return longest_prefix, matching
options: the options sent to the script
db: network and service definitions
Returns:
meta, results :
((first object, second object, union of those two),
diff of those two network objects)
"""
t1, t2 = options.cmp
d1 = db.GetNet(t1)
d2 = db.GetNet(t2)
union = list(set(d1 + d2))
meta = (t1, t2, union)
results = []
for el in set(d1 + d2):
el = nacaddr.IP(el)
if el in d1 and el in d2:
results.append(str(el))
elif el in d1:
results.append(str(el))
elif el in d2:
results.append(str(el))
return meta, results
# validate source address
if src == 'any':
self.src = src
else:
try:
self.src = nacaddr.IP(src)
except ValueError:
raise AddressError('bad source address: %s\n' % src)
# validate destination address
if dst == 'any':
self.dst = dst
else:
try:
self.dst = nacaddr.IP(dst)
except ValueError:
raise AddressError('bad destination address: %s\n' % dst)
if type(self.pol_obj) is not policy.Policy:
raise BadPolicy('Policy object is not valid.')
self.matches = []
self.exact_matches = []
for header, terms in self.pol_obj.filters:
filtername = header.target[0].options[0]
for term in terms:
possible = []
logging.debug('checking term: %s', term.name)
if not self._AddrInside(self.src, term.source_address):
logging.debug('srcaddr does not match')
continue