Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
rule['priority'] = priority_index + i
rule['match']['config']['srcIpRanges'] = [str(saddr) for saddr in chunk]
rules.append(rule)
# TODO(robankeny@): Review this log entry to make it cleaner/more useful.
# Right now, it prints the entire term which might be huge
if len(source_addr_chunks) > 1:
logging.debug('Current term [%s] was split into %d sub-terms since '
'_MAX_IP_RANGES_PER_TERM was exceeded',
str(term_dict), len(source_addr_chunks))
return rules
class CloudArmor(aclgenerator.ACLGenerator):
"""A CloudArmor policy object."""
_PLATFORM = 'cloudarmor'
SUFFIX = '.gca'
_SUPPORTED_AF = set(('inet', 'inet6', 'mixed'))
# Maximum number of rules that a CloudArmor policy can contain
_MAX_RULES_PER_POLICY = 200
# Warn user when rule count exceeds this number
_RULECOUNT_WARN_THRESHOLD = 190
# Maps indiviudal filter options to their index positions in the POL header
_FILTER_OPTIONS_MAP = {'filter_type': 0}
def _BuildTokens(self):
elif isinstance(el, int):
return str(el)
# type is a tuple below here
elif el[0] == el[1]:
return '%d' % el[0]
else:
return '%d-%d' % (el[0], el[1])
if len(group) > 1:
rval = '[ ' + ' '.join([_FormattedGroup(x) for x in group]) + ' ];'
else:
rval = _FormattedGroup(group[0]) + ';'
return rval
class Juniper(aclgenerator.ACLGenerator):
"""JCL rendering class.
This class takes a policy object and renders the output into a syntax
which is understood by juniper routers.
Args:
pol: policy.Policy object
"""
_PLATFORM = 'juniper'
_DEFAULT_PROTOCOL = 'ip'
_SUPPORTED_AF = set(('inet', 'inet6', 'bridge'))
_TERM = Term
SUFFIX = '.jcl'
def _BuildTokens(self):
# Verify platform specific terms. Skip whole term if platform does not
# match.
if self.term.platform:
if self._PLATFORM not in self.term.platform:
return ''
if self.term.platform_exclude:
if self._PLATFORM in self.term.platform_exclude:
return ''
ret_str = []
self._SetDefaultAction()
# Create a new term
ret_str.append('\n# term %s' % self.term.name)
comments = aclgenerator.WrapWords(self.term.comment, 80)
# append comments to output
if comments and comments[0]:
for line in comments:
ret_str.append('# %s' % str(line))
if str(self.term.action[0]) not in self._ACTION_TABLE:
raise aclgenerator.UnsupportedFilterError('%s %s %s %s' % (
'\n', self.term.name, self.term.action[0],
'action not currently supported.'))
if self.direction and str(self.direction) not in self._DIRECTION_TABLE:
raise aclgenerator.UnsupportedFilterError('%s %s %s %s' % (
'\n', self.term.name, self.term.direction,
'direction not currently supported.'))
# protocol
if self.term.protocol:
_TERM_ADDRESS_LIMIT = 256
# Firewall rule name has to match specific RE:
# The first character must be a lowercase letter, and all following characters
# must be a dash, lowercase letter, or digit, except the last character, which
# cannot be a dash.
# Details: https://cloud.google.com/compute/docs/reference/latest/firewalls
_TERM_NAME_RE = re.compile(r'^[a-z]([-a-z0-9]*[a-z0-9])?$')
# Protocols allowed by name from:
# https://cloud.google.com/vpc/docs/firewalls#protocols_and_ports
_ALLOW_PROTO_NAME = frozenset(
['tcp', 'udp', 'icmp', 'esp', 'ah', 'ipip', 'sctp'])
# Any protocol not in _ALLOW_PROTO_NAME must be passed by number.
ALWAYS_PROTO_NUM = set(aclgenerator.Term.PROTO_MAP.keys()) - _ALLOW_PROTO_NAME
def __init__(self, term):
super(Term, self).__init__(term)
self.term = term
self._validateDirection()
if self.term.source_address_exclude and not self.term.source_address:
raise GceFirewallError(
'GCE firewall does not support address exclusions without a source '
'address list.')
if (not self.term.source_address and
not self.term.source_tag) and self.term.direction == 'INGRESS':
raise GceFirewallError(
'GCE firewall needs either to specify source address or source tags.')
if self.term.source_port:
raise GceFirewallError(