Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if self._PREJUMP_FORMAT:
ret_str.append(self._PREJUMP_FORMAT.substitute(filter=self.filter,
term=self.term_name))
if self.verbose:
if self.term.owner:
self.term.comment.append('Owner: %s' % self.term.owner)
# reformat long comments, if needed
#
# iptables allows individual comments up to 256 chars.
# But our generator will limit a single comment line to < 120, using:
# max = 119 - 27 (static chars in comment command) - [length of term name]
comment_max_width = 92 - len(self.term_name)
if comment_max_width < 40:
comment_max_width = 40
comments = aclgenerator.WrapWords(self.term.comment, comment_max_width)
# append comments to output
if comments and comments[0]:
for line in comments:
if not line:
continue # iptables-restore does not like 0-length comments.
# term comments
# Strip out quotes as iptables cant have nested quotes
ret_str.append(self._COMMENT_FORMAT.substitute(
filter=self.filter,
term=self.term_name,
comment=str(line).replace('\"', '')))
# Unsupported configuration; in the case of 'accept' or 'next', we
# skip the rule. In other cases, we blow up (raise an exception)
# to ensure that this is not considered valid configuration.
if self.term.source_prefix or self.term.destination_prefix:
# Verify platform specific terms. Skip whole term if platform does not
# match.
if self.term.platform:
if self._PLATFORM not in self.term.platform:
return ''
if self.term.platform_exclude:
if self._PLATFORM in self.term.platform_exclude:
return ''
ret_str = []
self._SetDefaultAction()
# Create a new term
ret_str.append('\n# term %s' % self.term.name)
comments = aclgenerator.WrapWords(self.term.comment, 80)
# append comments to output
if comments and comments[0]:
for line in comments:
ret_str.append('# %s' % str(line))
if str(self.term.action[0]) not in self._ACTION_TABLE:
raise aclgenerator.UnsupportedFilterError('%s %s %s %s' % (
'\n', self.term.name, self.term.action[0],
'action not currently supported.'))
if self.direction and str(self.direction) not in self._DIRECTION_TABLE:
raise aclgenerator.UnsupportedFilterError('%s %s %s %s' % (
'\n', self.term.name, self.term.direction,
'direction not currently supported.'))
# protocol
if self.term.protocol:
# ADDRESSBOOK
target.extend(self._GenerateAddressBook())
# POLICIES
target.IndentAppend(1, '/*')
target.extend(aclgenerator.AddRepositoryTags(self.INDENT * 1))
target.IndentAppend(1, '*/')
target.IndentAppend(1, 'replace: policies {')
for (header, terms, filter_options) in self.srx_policies:
if self._NOVERBOSE not in filter_options[4:]:
target.IndentAppend(2, '/*')
target.extend([self.INDENT * 2 + line for line in
aclgenerator.WrapWords(header.comment,
self._MAX_HEADER_COMMENT_LENGTH)])
target.IndentAppend(2, '*/')
# ZONE DIRECTION
if filter_options[1] == 'all' and filter_options[3] == 'all':
target.IndentAppend(2, 'global {')
else:
target.IndentAppend(2, 'from-zone ' + filter_options[1] +
' to-zone ' + filter_options[3] + ' {')
# GROUPS
if header.apply_groups:
target.IndentAppend(3, JunipersrxList('apply-groups',
header.apply_groups))
# GROUPS EXCEPT
if header.apply_groups_except:
# Verify platform specific terms. Skip whole term if platform does not
# match.
if self.term.platform:
if self.platform not in self.term.platform:
return ''
if self.term.platform_exclude:
if self.platform in self.term.platform_exclude:
return ''
source_address_set = set()
destination_address_set = set()
ret_str = ['\n']
if self.verbose:
ret_str.append(' remark %s' % self.term.name)
comments = aclgenerator.WrapWords(self.term.comment,
self.COMMENT_MAX_WIDTH)
if comments and comments[0]:
for comment in comments:
ret_str.append(' remark %s' % str(comment))
# Term verbatim output - this will skip over normal term creation
# code by returning early. Warnings provided in policy.py.
if self.term.verbatim:
for next_verbatim in self.term.verbatim:
if next_verbatim[0] == self._PLATFORM:
ret_str.append(str(next_verbatim[1]))
return '\n'.join(ret_str)
# protocol
if not self.term.protocol:
protocol = ['ip']
def __str__(self):
target = []
pretty_platform = '%s%s' % (self._PLATFORM[0].upper(), self._PLATFORM[1:])
if self._RENDER_PREFIX:
target.append(self._RENDER_PREFIX)
for (header, filter_name, filter_type, default_action, terms
) in self.iptables_policies:
# Add comments for this filter
target.append('# %s %s Policy' % (pretty_platform,
header.FilterName(self._PLATFORM)))
# reformat long text comments, if needed
comments = aclgenerator.WrapWords(header.comment, 70)
if comments and comments[0]:
for line in comments:
target.append('# %s' % line)
target.append('#')
# add the p4 tags
target.extend(aclgenerator.AddRepositoryTags('# '))
target.append('# ' + filter_type)
if filter_name in self._GOOD_FILTERS:
if default_action:
target.append(self._DEFAULTACTION_FORMAT % (filter_name,
default_action))
elif self._PLATFORM == 'speedway':
# always specify the default filter states for speedway,
# if default action policy not specified for iptables, do nothing.
target.append(
"""Render config output from this term object."""
# Verify platform specific terms. Skip whole term if platform does not
# match.
if self.term.platform:
if 'srx' not in self.term.platform:
return ''
if self.term.platform_exclude:
if 'srx' in self.term.platform_exclude:
return ''
ret_str = IndentList(JuniperSRX.INDENT)
# COMMENTS
comment_max_width = 68
if self.term.owner and self.verbose:
self.term.comment.append('Owner: %s' % self.term.owner)
comments = aclgenerator.WrapWords(self.term.comment, comment_max_width)
if comments and comments[0] and self.verbose:
ret_str.IndentAppend(3, '/*')
for line in comments:
ret_str.IndentAppend(3, line)
ret_str.IndentAppend(3, '*/')
ret_str.IndentAppend(3, 'policy ' + self.term.name + ' {')
ret_str.IndentAppend(4, 'match {')
# SOURCE-ADDRESS
if self.term.source_address:
saddr_check = set()
for saddr in self.term.source_address:
saddr_check.add(saddr.parent_token)
saddr_check = sorted(saddr_check)
ret_str.IndentAppend(5, JunipersrxList('source-address', saddr_check))
else:
term_af = self.AF_MAP.get(self.filter_type)
if self.term.verbatim:
for next_verbatim in self.term.verbatim:
if next_verbatim.value[0] == _PLATFORM and next_verbatim.value[1]:
ret_str.append('%s%s' % (self._IDENT, next_verbatim.value[1]))
return '\n'.join(t for t in ret_str if t)
comments = self.term.comment[:]
if self.term.owner:
comments.append('Owner: %s' % self.term.owner)
if comments:
for line in aclgenerator.WrapWords(comments,
self._COMMENT_LINE_LENGTH):
ret_str.append('%s%s %s' % (self._IDENT, _COMMENT_MARKER, line))
src_addr_token = ''
dst_addr_token = ''
if self._SOURCE_IS_USER_OPT_STR in self.term.option:
src_addr_token = self._USER_STR
else:
if self.term.source_address:
src_addr = self.term.GetAddressOfVersion('source_address',
term_af)
if not src_addr:
return ''
src_netdest_id = '%s%s' % (self.term.name.lower(),
self.filter_name,
action,
addr.network_address,
addr.hostmask,
self.logstring,
self.dscpstring))
else:
ret_str.append('access-list %s %s %s%s%s' % (self.filter_name,
action,
'any',
self.logstring,
self.dscpstring))
else:
if self.verbose:
ret_str.append(' remark ' + self.term.name)
comments = aclgenerator.WrapWords(self.term.comment,
self.COMMENT_MAX_WIDTH)
if comments and comments[0]:
for comment in comments:
ret_str.append(' remark ' + str(comment))
action = _ACTION_TABLE.get(str(self.term.action[0]))
if v4_addresses:
for addr in v4_addresses:
if addr.prefixlen == 32:
ret_str.append(' %s host %s%s%s' % (action,
addr.network_address,
self.logstring,
self.dscpstring))
elif self.platform == 'arista':
ret_str.append(' %s %s/%s%s%s' % (action,
addr.network_address,