Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# indentation, for multi-line comments, ensures that subsquent lines
# are correctly alligned with the first line of the comment.
indentation = 0
if exclude:
# len('1.1.1.1/32 except;') == 21
indentation = 21 + self._DEFAULT_INDENT
else:
# len('1.1.1.1/32;') == 14
indentation = 14 + self._DEFAULT_INDENT
# length_eol is the width of the line; b/c of the addition of the space
# and the /* characters, it needs to be a little less than the actual width
# to keep from wrapping
length_eol = 77 - indentation
if isinstance(addr, (nacaddr.IPv4, nacaddr.IPv6, summarizer.DSMNet)):
if addr.text:
if line_length == 0:
# line_length of 0 means that we don't want to truncate the comment.
line_length = len(addr.text)
# There should never be a /* or */, but be safe and ignore those
# comments
if addr.text.find('/*') >= 0 or addr.text.find('*/') >= 0:
logging.debug('Malformed comment [%s] ignoring', addr.text)
else:
text = addr.text[:line_length]
comment = ' /*'
while text:
"""Take an ip string and return an object of the correct type.
Args:
ipaddress: the ip address.
comment: option comment field
token: option token name where this address was extracted from
Returns:
ipaddr.IPv4 or ipaddr.IPv6 object or raises ValueError.
Raises:
ValueError: if the string passed isn't either a v4 or a v6 address.
"""
a = ipaddr.IPNetwork(ipaddress)
if a.version == 4:
return IPv4(ipaddress, comment, token)
elif a.version == 6:
return IPv6(ipaddress, comment, token)
def _GetIpString(self, addr):
"""Formats the address object for printing in the ACL.
Args:
addr: str or ipaddr, address
Returns:
An address string suitable for the ACL.
"""
if isinstance(addr, nacaddr.IPv4) or isinstance(addr,
ipaddress.IPv4Network):
if addr.num_addresses > 1:
if self.platform == 'arista':
return addr.with_prefixlen
return '%s %s' % (addr.network_address, addr.hostmask)
return 'host %s' % (addr.network_address)
if isinstance(addr, nacaddr.IPv6) or isinstance(addr,
ipaddress.IPv6Network):
if addr.num_addresses > 1:
return addr.with_prefixlen
return 'host %s' % (addr.network_address)
# DSMO enabled
if isinstance(addr, summarizer.DSMNet):
return '%s %s' % summarizer.ToDottedQuad(addr, negate=True)
return addr
Args:
ip: the ip address.
comment: option comment field
token: option token name where this address was extracted from
strict: If strict should be used in ipaddress object.
Returns:
ipaddress.IPv4 or ipaddress.IPv6 object or raises ValueError.
Raises:
ValueError: if the string passed isn't either a v4 or a v6 address.
"""
imprecise_ip = ipaddress.ip_network(ip, strict=strict)
if imprecise_ip.version == 4:
return IPv4(ip, comment, token, strict=strict)
elif imprecise_ip.version == 6:
return IPv6(ip, comment, token, strict=strict)
Args:
filter_name: name of the filter
action: str, action
proto: str, protocl
saddr: str or ipaddress, source address
sport: str list or none, the source port
daddr: str or ipaddress, the destination address
dport: str list or none, the destination port
icmp_type: icmp-type numeric specification (if any)
option: list or none, optional, eg. 'logging' tokens.
Returns:
string of the cisco acl line, suitable for printing.
"""
# inet4
if isinstance(saddr, nacaddr.IPv4) or isinstance(saddr,
ipaddress.IPv4Network):
if saddr.num_addresses > 1:
saddr = '%s %s' % (saddr.network_address, saddr.netmask)
else:
saddr = 'host %s' % (saddr.network_address)
if isinstance(daddr, nacaddr.IPv4) or isinstance(daddr,
ipaddress.IPv4Network):
if daddr.num_addresses > 1:
daddr = '%s %s' % (daddr.network_address, daddr.netmask)
else:
daddr = 'host %s' % (daddr.network_address)
# inet6
if isinstance(saddr, nacaddr.IPv6) or isinstance(saddr,
ipaddress.IPv6Network):
if saddr.num_addresses > 1:
saddr = '%s/%s' % (saddr.network_address, saddr.prefixlen)
ret_str.append(str(next_verbatim[1]))
return '\n'.join(ret_str)
# protocol
if not self.term.protocol:
protocol = ['ip']
else:
protocol = [proto if proto in self.ALLOWED_PROTO_STRINGS
else self.PROTO_MAP.get(proto)
for proto in self.term.protocol]
# addresses
source_address = self.term.source_address
if not self.term.source_address:
source_address = [nacaddr.IPv4('0.0.0.0/0', token='any')]
source_address_set.add(source_address[0].parent_token)
destination_address = self.term.destination_address
if not self.term.destination_address:
destination_address = [nacaddr.IPv4('0.0.0.0/0', token='any')]
destination_address_set.add(destination_address[0].parent_token)
# ports
source_port = [()]
destination_port = [()]
if self.term.source_port:
source_port = self.term.source_port
if self.term.destination_port:
destination_port = self.term.destination_port
for saddr in source_address_set:
for daddr in destination_address_set:
for sport in source_port:
saddr = '%s%s%s' % (_XML_TABLE.get('srcIpv6Start'),
saddr.with_prefixlen,
_XML_TABLE.get('srcIpv6End'),)
else:
saddr = '%s%s%s' % (
_XML_TABLE.get('srcIpv6Start'),
saddr.network_address, _XML_TABLE.get('srcIpv6End'))
sources = '%s%s' %(sources, saddr)
sources = '%s%s' %(sources, '')
destinations = ''
if destination_addr:
destinations = ''
for daddr in destination_addr:
# inet4
if isinstance(daddr, nacaddr.IPv4):
if daddr.num_addresses > 1:
daddr = '%s%s%s' % (_XML_TABLE.get('destIpv4Start'),
daddr.with_prefixlen,
_XML_TABLE.get('destIpv4End'),)
else:
daddr = '%s%s%s' % (_XML_TABLE.get('destIpv4Start'),
daddr.network_address,
_XML_TABLE.get('destIpv4End'))
destinations = '%s%s' %(destinations, daddr)
# inet6
if isinstance(daddr, nacaddr.IPv6):
if daddr.num_addresses > 1:
daddr = '%s%s%s' % (_XML_TABLE.get('destIpv6Start'),
daddr.with_prefixlen,
_XML_TABLE.get('destIpv6End'),)
else:
'reject-with-tcp-rst': '-j REJECT --reject-with tcp-reset',
'next': '-j RETURN'
}
self.trackstate = trackstate
self.term = term # term object
self.filter = filter_name # actual name of filter
self.default_action = filter_action
self.options = []
self.af = af
self.verbose = verbose
if af == 'inet6':
self._all_ips = nacaddr.IPv6('::/0')
self._action_table['reject'] = ('-j REJECT --reject-with '
'icmp6-adm-prohibited')
else:
self._all_ips = nacaddr.IPv4('0.0.0.0/0')
self._action_table['reject'] = ('-j REJECT --reject-with '
'icmp-host-prohibited')
self.term_name = '%s_%s' % (self.filter[:1], self.term.name)