Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if isinstance(element, tuple):
range_start, range_end = element
if range_start == range_end:
output.append('%d' % range_start)
else:
output.append('%d-%d' % (range_start, range_end))
else:
output.append(str(element))
if len(output) > 1:
# idiosyncrasy of nftables output: no leading space to trailing }
return '{ ' + ', '.join(output) + '}'
else:
return output[0]
class Nftables(aclgenerator.ACLGenerator):
"""nftables generator.
This class takes a policy object and renders the output into a syntax
which is understood by the nft interpreter.
"""
SUFFIX = '.nft'
_PLATFORM = 'nftables'
_TERM = Term
# https://wiki.nftables.org/wiki-nftables/index.php/Quick_reference-nftables_in_10_minutes#Tables
_VALID_ADDRESS_FAMILIES = {'inet': 'ip', 'inet6': 'ip6', 'mixed': 'inet'}
# https://wiki.nftables.org/wiki-nftables/index.php/Netfilter_hooks
_VALID_HOOK_NAMES = set(['prerouting', 'input', 'forward',
'output', 'postrouting'])
def _BuildTokens(self):
"""
ret_ports = []
for protocol in protocols:
if protocol in self._PROTOCOL_MAP:
return [str(self._PROTOCOL_MAP[protocol])]
for start_port, end_port in ports:
ret_ports.append('%s %s' %
(protocol.lower(), ' '.join(
str(x) for x in set([start_port, end_port]))))
return ret_ports
class Aruba(aclgenerator.ACLGenerator):
"""An Aruba policy object.
This class takes a policy object and renders the output (via __str__ method)
into a syntax which is understood by Aruba devices.
Args:
pol: policy.Policy object.
"""
SUFFIX = '.aacl'
_ACL_LINE_HEADER = 'ip access-list session'
def _BuildTokens(self):
"""Build supported tokens for platform.
"""
ret_ports = []
for protocol in protocols:
if protocol in self._PROTOCOL_MAP:
return [str(self._PROTOCOL_MAP[protocol])]
for start_port, end_port in ports:
ret_ports.append('%s %s' %
(protocol.lower(), ' '.join(
str(x) for x in set([start_port, end_port]))))
return ret_ports
class Aruba(aclgenerator.ACLGenerator):
"""An Aruba policy object.
This class takes a policy object and renders the output (via __str__ method)
into a syntax which is understood by Aruba devices.
Args:
pol: policy.Policy object.
"""
SUFFIX = '.aacl'
_ACL_LINE_HEADER = 'ip access-list session'
def _BuildTokens(self):
"""Build supported tokens for platform.
norm_ports = []
if norm_ports:
if len(norm_ports) == 1:
portstrings.append('--%sport %s' % (direction, norm_ports[0]))
else:
portstrings.append('-m multiport --%sports %s' %
(direction, ','.join(norm_ports)))
return portstrings
def _SetDefaultAction(self):
    """If term does not specify action, use filter default action.

    Reads self.term.action and self.default_action; returns nothing.
    """
    # NOTE(review): this branch runs only when self.term.action is falsy, yet
    # the assignment below indexes element [0] of that same value. If action
    # is an empty list this would raise IndexError -- TODO confirm the
    # intended assignment against the term/action types defined elsewhere.
    if not self.term.action:
        self.term.action[0].value = self.default_action
class Iptables(aclgenerator.ACLGenerator):
"""Generates filters and terms from provided policy object."""
_PLATFORM = 'iptables'
_DEFAULT_PROTOCOL = 'all'
SUFFIX = ''
_RENDER_PREFIX = None
_RENDER_SUFFIX = None
_DEFAULTACTION_FORMAT = '-P %s %s'
_DEFAULTACTION_FORMAT_CUSTOM_CHAIN = '-N %s'
_DEFAULT_ACTION = 'DROP'
_TERM = Term
_TERM_MAX_LENGTH = 24
_GOOD_FILTERS = ['INPUT', 'OUTPUT', 'FORWARD']
_GOOD_OPTIONS = ['nostate', 'abbreviateterms', 'truncateterms', 'noverbose']
def _BuildTokens(self):
if len(dest_addr_chunks) > 1:
rule['name'] = '%s-%d' % (rule['name'], i+1)
rule['destinationRanges'] = [str(daddr) for daddr in chunk]
rules.append(rule)
else:
rules.append(proto_dict)
# Sanity checking term name lengths.
long_rules = [rule['name'] for rule in rules if len(rule['name']) > 63]
if long_rules:
raise GceFirewallError(
'GCE firewall name ended up being too long: %s' % long_rules)
return rules
class GCE(aclgenerator.ACLGenerator):
    """A GCE firewall policy object."""

    _PLATFORM = 'gce'
    SUFFIX = '.gce'
    # Must be a one-element tuple: set(('inet')) iterates the *string* 'inet'
    # and yields {'i', 'n', 'e', 't'}, so 'inet' itself would never be a
    # member of the set.
    _SUPPORTED_AF = set(('inet',))
    # Supported is 63 but we need to account for dynamic updates when the term
    # is rendered (which can add proto and a counter).
    _TERM_MAX_LENGTH = 53
    _GOOD_DIRECTION = ['INGRESS', 'EGRESS']
    _OPTIONAL_SUPPORTED_KEYWORDS = set(['expiration',
                                        'destination_tag',
                                        'source_tag'])
def _BuildTokens(self):
"""Build supported tokens for platform.
norm_ports = []
if norm_ports:
if len(norm_ports) == 1:
portstrings.append('--%sport %s' % (direction, norm_ports[0]))
else:
portstrings.append('-m multiport --%sports %s' %
(direction, ','.join(norm_ports)))
return portstrings
def _SetDefaultAction(self):
    """If term does not specify action, use filter default action.

    Reads self.term.action and self.default_action; returns nothing.
    """
    # NOTE(review): this branch runs only when self.term.action is falsy, yet
    # the assignment below indexes element [0] of that same value. If action
    # is an empty list this would raise IndexError -- TODO confirm the
    # intended assignment against the term/action types defined elsewhere.
    if not self.term.action:
        self.term.action[0].value = self.default_action
class Iptables(aclgenerator.ACLGenerator):
"""Generates filters and terms from provided policy object."""
_PLATFORM = 'iptables'
_DEFAULT_PROTOCOL = 'all'
SUFFIX = ''
_RENDER_PREFIX = None
_RENDER_SUFFIX = None
_DEFAULTACTION_FORMAT = '-P %s %s'
_DEFAULTACTION_FORMAT_CUSTOM_CHAIN = '-N %s'
_DEFAULT_ACTION = 'DROP'
_TERM = Term
_TERM_MAX_LENGTH = 24
_GOOD_FILTERS = ['INPUT', 'OUTPUT', 'FORWARD']
def _BuildTokens(self):
"""Build supported tokens for platform.
elif isinstance(el, int):
return str(el)
# type is a tuple below here
elif el[0] == el[1]:
return '%d' % el[0]
else:
return '%d-%d' % (el[0], el[1])
if len(group) > 1:
rval = '[ ' + ' '.join([_FormattedGroup(x) for x in group]) + ' ];'
else:
rval = _FormattedGroup(group[0]) + ';'
return rval
class Juniper(aclgenerator.ACLGenerator):
"""JCL rendering class.
This class takes a policy object and renders the output into a syntax
which is understood by juniper routers.
Args:
pol: policy.Policy object
"""
_PLATFORM = 'juniper'
_DEFAULT_PROTOCOL = 'ip'
_SUPPORTED_AF = set(('inet', 'inet6', 'bridge'))
_TERM = Term
SUFFIX = '.jcl'
def _BuildTokens(self):
daddr = 'net-group %s' % daddr
# fix ports
if sport:
sport = ' port-group %d-%d' % (sport[0], sport[1])
else:
sport = ''
if dport:
dport = ' port-group %d-%d' % (dport[0], dport[1])
else:
dport = ''
return (' %s %s %s%s %s%s' % (
action, proto, saddr, sport, daddr, dport)).rstrip()
class Cisco(aclgenerator.ACLGenerator):
"""A cisco policy object."""
_PLATFORM = 'cisco'
_DEFAULT_PROTOCOL = 'ip'
SUFFIX = '.acl'
# Protocols should be emitted as numbers.
_PROTO_INT = True
_TERM_REMARK = True
def _BuildTokens(self):
"""Build supported tokens for platform.
Returns:
tuple containing both supported tokens and sub tokens
"""
supported_tokens, supported_sub_tokens = super(Cisco, self)._BuildTokens()
port_list = []
for port_tuple in ports:
if port_tuple[0] == port_tuple[1]:
port_list.append(str(port_tuple[0]))
else:
port_list.append('%s:%s' % (port_tuple[0], port_tuple[1]))
return '{ %s }' % (
' '.join(list(collections.OrderedDict.fromkeys(port_list))))
def _SetDefaultAction(self):
    """If term does not specify action, use filter default action.

    Reads self.term.action and self.default_action; returns nothing.
    """
    # NOTE(review): this branch runs only when self.term.action is falsy, yet
    # the assignment below indexes element [0] of that same value. If action
    # is an empty list this would raise IndexError -- TODO confirm the
    # intended assignment against the term/action types defined elsewhere.
    if not self.term.action:
        self.term.action[0].value = self.default_action
class PacketFilter(aclgenerator.ACLGenerator):
"""Generates filters and terms from provided policy object."""
_DEF_MAX_LENGTH = 31
_PLATFORM = 'packetfilter'
_DEFAULT_PROTOCOL = 'all'
SUFFIX = '.pf'
_TERM = Term
def _BuildTokens(self):
"""Build supported tokens for platform.
Returns:
tuple containing both supported tokens and sub tokens
"""
supported_tokens, supported_sub_tokens = super(
PacketFilter, self)._BuildTokens()
ret_str: an array of strings that will eventually be joined to form
the string output for the term.
"""
pass
def _HandlePostRule(self, ret_str):
    """Perform any port-cartesian product transforms on the ret_str array.

    This implementation is a no-op: the body is a bare `pass`, so ret_str is
    left untouched and the method returns None.

    Args:
      ret_str: an array of strings that will eventually be joined to form
        the string output for the term.
    """
    # NOTE(review): presumably a hook for variants that need to rewrite the
    # rule strings -- confirm against the callers, which are not visible here.
    pass
class WindowsGenerator(aclgenerator.ACLGenerator):
"""Generates filters and terms from provided policy object."""
_PLATFORM = 'windows'
_DEFAULT_PROTOCOL = 'all'
SUFFIX = '.bat'
_RENDER_PREFIX = None
_DEFAULT_ACTION = 'block'
_TERM = Term
_GOOD_AFS = ['inet', 'inet6']
def _BuildTokens(self):
"""Build supported tokens for platform.
Returns:
tuple containing both supported tokens and sub tokens