def GetIpParents(self, query):
"""Return network tokens that contain IP in query.
Args:
query: an ip string ('10.1.1.1') or nacaddr.IP object
Returns:
A sorted list of unique parent tokens.
"""
base_parents = []
recursive_parents = []
# convert string to nacaddr, if arg is ipaddr then convert str() to nacaddr
if (not isinstance(query, nacaddr.IPv4) and
not isinstance(query, nacaddr.IPv6)):
if query[:1].isdigit():
query = nacaddr.IP(query)
# Get parent token for an IP
if isinstance(query, nacaddr.IPv4) or isinstance(query, nacaddr.IPv6):
for token in self.networks:
for item in self.networks[token].items:
item = item.split('#')[0].strip()
if not item[:1].isdigit():
continue
try:
supernet = nacaddr.IP(item, strict=False)
if supernet.supernet_of(query):
base_parents.append(token)
except ValueError:
# item was not an IP
pass
Args:
query: an ip string ('10.1.1.1') or nacaddr.IP object
Returns:
A sorted list of unique parent tokens.
"""
base_parents = []
recursive_parents = []
# convert string to nacaddr, if arg is ipaddr then convert str() to nacaddr
if (not isinstance(query, nacaddr.IPv4) and
not isinstance(query, nacaddr.IPv6)):
if query[:1].isdigit():
query = nacaddr.IP(query)
# Get parent token for an IP
if isinstance(query, nacaddr.IPv4) or isinstance(query, nacaddr.IPv6):
for token in self.networks:
for item in self.networks[token].items:
item = item.split('#')[0].strip()
if not item[:1].isdigit():
continue
try:
supernet = nacaddr.IP(item, strict=False)
if supernet.supernet_of(query):
base_parents.append(token)
except ValueError:
# item was not an IP
pass
# Get parent token for another token
else:
for token in self.networks:
for item in self.networks[token].items:
def _GetIpString(self, addr):
  """Render an address object as the text used for it in the ACL.

  Args:
    addr: str or ipaddr, address

  Returns:
    An address string suitable for the ACL. A value that matches none of the
    recognised address types is returned unchanged.
  """
  # IPv4: single addresses use the 'host' form; networks use either
  # prefix-length notation (arista) or network address plus hostmask.
  if isinstance(addr, (nacaddr.IPv4, ipaddress.IPv4Network)):
    if addr.num_addresses <= 1:
      return 'host %s' % (addr.network_address)
    if self.platform == 'arista':
      return addr.with_prefixlen
    return '%s %s' % (addr.network_address, addr.hostmask)

  # IPv6: networks are always written with their prefix length.
  if isinstance(addr, (nacaddr.IPv6, ipaddress.IPv6Network)):
    if addr.num_addresses <= 1:
      return 'host %s' % (addr.network_address)
    return addr.with_prefixlen

  # DSMO enabled
  if isinstance(addr, summarizer.DSMNet):
    return '%s %s' % summarizer.ToDottedQuad(addr, negate=True)

  # NOTE(review): anything else (e.g. a plain string) is passed through
  # verbatim with no validation here, so the caller must ensure it is
  # already valid ACL text.
  return addr
else:
saddr = 'host %s' % (saddr.network_address)
if isinstance(daddr, nacaddr.IPv4) or isinstance(daddr,
ipaddress.IPv4Network):
if daddr.num_addresses > 1:
daddr = '%s %s' % (daddr.network_address, daddr.netmask)
else:
daddr = 'host %s' % (daddr.network_address)
# inet6
if isinstance(saddr, nacaddr.IPv6) or isinstance(saddr,
ipaddress.IPv6Network):
if saddr.num_addresses > 1:
saddr = '%s/%s' % (saddr.network_address, saddr.prefixlen)
else:
saddr = 'host %s' % (saddr.network_address)
if isinstance(daddr, nacaddr.IPv6) or isinstance(daddr,
ipaddress.IPv6Network):
if daddr.num_addresses > 1:
daddr = '%s/%s' % (daddr.network_address, daddr.prefixlen)
else:
daddr = 'host %s' % (daddr.network_address)
# fix ports
if not sport:
sport = ''
elif sport[0] != sport[1]:
sport = ' range %s %s' % (cisco.PortMap.GetProtocol(sport[0], proto),
cisco.PortMap.GetProtocol(sport[1], proto))
else:
sport = ' eq %s' % (cisco.PortMap.GetProtocol(sport[0], proto))
if not dport:
self._action_table = {
'accept': '-j ACCEPT',
'deny': '-j DROP',
'reject': '-j REJECT --reject-with icmp-host-prohibited',
'reject-with-tcp-rst': '-j REJECT --reject-with tcp-reset',
'next': '-j RETURN'
}
self.trackstate = trackstate
self.term = term # term object
self.filter = filter_name # actual name of filter
self.default_action = filter_action
self.options = []
self.af = af
self.verbose = verbose
if af == 'inet6':
self._all_ips = nacaddr.IPv6('::/0')
self._action_table['reject'] = ('-j REJECT --reject-with '
'icmp6-adm-prohibited')
else:
self._all_ips = nacaddr.IPv4('0.0.0.0/0')
self._action_table['reject'] = ('-j REJECT --reject-with '
'icmp-host-prohibited')
self.term_name = '%s_%s' % (self.filter[:1], self.term.name)
"""
# inet4
if isinstance(saddr, nacaddr.IPv4) or isinstance(saddr,
ipaddress.IPv4Network):
if saddr.num_addresses > 1:
saddr = '%s %s' % (saddr.network_address, saddr.netmask)
else:
saddr = 'host %s' % (saddr.network_address)
if isinstance(daddr, nacaddr.IPv4) or isinstance(daddr,
ipaddress.IPv4Network):
if daddr.num_addresses > 1:
daddr = '%s %s' % (daddr.network_address, daddr.netmask)
else:
daddr = 'host %s' % (daddr.network_address)
# inet6
if isinstance(saddr, nacaddr.IPv6) or isinstance(saddr,
ipaddress.IPv6Network):
if saddr.num_addresses > 1:
saddr = '%s/%s' % (saddr.network_address, saddr.prefixlen)
else:
saddr = 'host %s' % (saddr.network_address)
if isinstance(daddr, nacaddr.IPv6) or isinstance(daddr,
ipaddress.IPv6Network):
if daddr.num_addresses > 1:
daddr = '%s/%s' % (daddr.network_address, daddr.prefixlen)
else:
daddr = 'host %s' % (daddr.network_address)
# fix ports
if not sport:
sport = ''
elif sport[0] != sport[1]:
Args:
ipaddress: the ip address.
comment: option comment field
token: option token name where this address was extracted from
Returns:
ipaddr.IPv4 or ipaddr.IPv6 object or raises ValueError.
Raises:
ValueError: if the string passed isn't either a v4 or a v6 address.
"""
a = ipaddr.IPNetwork(ipaddress)
if a.version == 4:
return IPv4(ipaddress, comment, token)
elif a.version == 6:
return IPv6(ipaddress, comment, token)
prefixlen_diff: Prefix length difference.
Returns:
An IPv4 object
Raises:
PrefixlenDiffInvalidError: Raised when prefixlen - prefixlen_diff results
in a negative number.
"""
if self.prefixlen == 0:
return self
if self.prefixlen - prefixlen_diff < 0:
raise PrefixlenDiffInvalidError(
'current prefixlen is %d, cannot have a prefixlen_diff of %d' % (
self.prefixlen, prefixlen_diff))
ret_addr = IPv6(ipaddr.IPv6Network.supernet(self, prefixlen_diff),
comment=self.text, token=self.token)
return ret_addr
self._action_table = {
'accept': '-j ACCEPT',
'deny': '-j DROP',
'reject': '-j REJECT --reject-with icmp-host-prohibited',
'reject-with-tcp-rst': '-j REJECT --reject-with tcp-reset',
'next': '-j RETURN'
}
self.trackstate = trackstate
self.term = term # term object
self.filter = filter_name # actual name of filter
self.default_action = filter_action
self.options = []
self.af = af
self.verbose = verbose
if af == 'inet6':
self._all_ips = nacaddr.IPv6('::/0')
self._action_table['reject'] = ('-j REJECT --reject-with '
'icmp6-adm-prohibited')
else:
self._all_ips = nacaddr.IPv4('0.0.0.0/0')
self._action_table['reject'] = ('-j REJECT --reject-with '
'icmp-host-prohibited')
self.term_name = '%s_%s' % (self.filter[:1], self.term.name)
ip: the ip address.
comment: option comment field
token: option token name where this address was extracted from
strict: If strict should be used in ipaddress object.
Returns:
ipaddress.IPv4 or ipaddress.IPv6 object or raises ValueError.
Raises:
ValueError: if the string passed isn't either a v4 or a v6 address.
"""
imprecise_ip = ipaddress.ip_network(ip, strict=strict)
if imprecise_ip.version == 4:
return IPv4(ip, comment, token, strict=strict)
elif imprecise_ip.version == 6:
return IPv6(ip, comment, token, strict=strict)