Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import datetime
import re
from string import Template # pylint: disable=g-importing-member
from capirca.lib import aclgenerator
from capirca.lib import nacaddr
from absl import logging
class Term(aclgenerator.Term):
"""Generate Iptables policy terms."""
# Validate that term does not contain any fields we do not
# support. This prevents us from thinking that our output is
# correct in cases where we've omitted fields from term.
_PLATFORM = 'iptables'
_POSTJUMP_FORMAT = None
_PREJUMP_FORMAT = Template('-A $filter -j $term')
_TERM_FORMAT = Template('-N $term')
_COMMENT_FORMAT = Template('-A $term -m comment --comment "$comment"')
_FILTER_TOP_FORMAT = Template('-A $term')
_LOG_FORMAT = Template('-j LOG --log-prefix $term')
_PROTO_TABLE = {
'icmpv6': '-p ipv6-icmp',
'icmp': '-p icmp',
'tcp': '-p tcp',
if not port:
continue
port_key = '%s-%s' % (port[0], port[1])
if port_key not in ports:
ports[port_key] = True
ret_str.append('object-group port %s' % port_key)
if port[0] != port[1]:
ret_str.append(' range %d %d' % (port[0], port[1]))
else:
ret_str.append(' eq %d' % port[0])
ret_str.append('exit\n')
return '\n'.join(ret_str)
class Term(aclgenerator.Term):
"""A single ACL Term."""
ALLOWED_PROTO_STRINGS = ['eigrp', 'gre', 'icmp', 'igmp', 'igrp', 'ip',
'ipinip', 'nos', 'pim', 'tcp', 'udp',
'sctp', 'ahp']
COMMENT_MAX_WIDTH = 70
def __init__(self, term, af=4, proto_int=True, enable_dsmo=False,
term_remark=True, platform='cisco', verbose=True):
super(Term, self).__init__(term)
self.term = term
self.proto_int = proto_int
self.options = []
self.enable_dsmo = enable_dsmo
self.term_remark = term_remark
self.platform = platform
self.verbose = verbose
Args:
port_num: integer representing the port number.
proto: string representing proto (tcp, udp, etc).
platform: string representing platform (cisco, arista)
Returns:
A name of the protocol or the port number that was provided.
"""
try:
port_map = PortMap._PORT_MAP[platform][proto]
return port_map[port_num]
except KeyError:
return port_num
class Term(aclgenerator.Term):
"""A single ACL Term."""
ALLOWED_PROTO_STRINGS = ['eigrp', 'gre', 'icmp', 'igmp', 'igrp', 'ip',
'ipinip', 'nos', 'pim', 'tcp', 'udp',
'sctp', 'ahp']
COMMENT_MAX_WIDTH = 70
def __init__(self, term, af=4, proto_int=True, enable_dsmo=False,
term_remark=True, platform='cisco', verbose=True):
super(Term, self).__init__(term)
self.term = term
self.proto_int = proto_int
self.options = []
self.enable_dsmo = enable_dsmo
self.term_remark = term_remark
self.platform = platform
self.verbose = verbose
# Firewall rule name has to match specific RE:
# The first character must be a lowercase letter, and all following characters
# must be a dash, lowercase letter, or digit, except the last character, which
# cannot be a dash.
# Details: https://cloud.google.com/compute/docs/reference/latest/firewalls
_TERM_NAME_RE = re.compile(r'^[a-z]([-a-z0-9]*[a-z0-9])?$')
# Protocols allowed by name from:
# https://cloud.google.com/vpc/docs/firewalls#protocols_and_ports
_ALLOW_PROTO_NAME = frozenset(
['tcp', 'udp', 'icmp', 'esp', 'ah', 'ipip', 'sctp',
'all' # Needed for default deny, do not use in policy file.
])
# Any protocol not in _ALLOW_PROTO_NAME must be passed by number.
ALWAYS_PROTO_NUM = set(aclgenerator.Term.PROTO_MAP.keys()) - _ALLOW_PROTO_NAME
def __init__(self, term):
super(Term, self).__init__(term)
self.term = term
self._validateDirection()
if self.term.source_address_exclude and not self.term.source_address:
raise GceFirewallError(
'GCE firewall does not support address exclusions without a source '
'address list.')
if (not self.term.source_address and
not self.term.source_tag) and self.term.direction == 'INGRESS':
raise GceFirewallError(
'GCE firewall needs either to specify source address or source tags.')
if self.term.source_port:
raise GceFirewallError(
class UnsupportedNsxvAccessListError(Error):
  """Raised when we are given an access list that is not named."""
class NsxvAclTermError(Error):
  """Raised when an nsxv access list contains a problem."""
class NsxvDuplicateTermError(Error):
  """Raised when a duplicate is encountered."""
class Term(aclgenerator.Term):
"""Creates a single ACL Term for Nsxv."""
def __init__(self, term, filter_type, applied_to=None, af=4):
  """Record the term and the parameters used to render it.

  Args:
    term: the term object to render; stored as self.term.
    filter_type: stored unchanged as self.filter_type.
    applied_to: optional value stored as self.applied_to (default None).
    af: address family; must be 4 or 6.

  Raises:
    AssertionError: if af is neither 4 nor 6 (only while assertions are
      enabled).
  """
  # NOTE(review): the aclgenerator.Term initializer is not invoked here --
  # confirm that this is intentional.
  self.term = term
  # The caller is expected to have validated the address family already, so
  # this is only a guard against programming errors.
  assert af in (4, 6)
  self.af, self.filter_type, self.applied_to = af, filter_type, applied_to
def __str__(self):
"""Convert term to a rule string.
Returns:
A rule as a string.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import datetime
import re
from string import Template # pylint: disable=g-importing-member
from capirca.lib import aclgenerator
from capirca.lib import nacaddr
from absl import logging
class Term(aclgenerator.Term):
"""Generate Iptables policy terms."""
# Validate that term does not contain any fields we do not
# support. This prevents us from thinking that our output is
# correct in cases where we've omitted fields from term.
_PLATFORM = 'iptables'
_POSTJUMP_FORMAT = None
_PREJUMP_FORMAT = Template('-A $filter -j $term')
_TERM_FORMAT = Template('-N $term')
_COMMENT_FORMAT = Template('-A $term -m comment --comment "$comment"')
_FILTER_TOP_FORMAT = Template('-A $term')
_LOG_FORMAT = Template('-j LOG --log-prefix $term')
_PROTO_TABLE = {
'icmpv6': '-p ipv6-icmp',
'icmp': '-p icmp',
'tcp': '-p tcp',
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import datetime
from string import Template
from capirca.lib import aclgenerator
from capirca.lib import nacaddr
from absl import logging
CMD_PREFIX = 'netsh ipsec static add '
class Term(aclgenerator.Term):
"""Generate generic windows policy terms."""
_PLATFORM = 'windows'
_COMMENT_FORMAT = Template(': $comment')
# filter rules
_ACTION_TABLE = {}
def __init__(self, term, filter_name, filter_action, af='inet'):
"""Setup a new term.
Args:
term: A policy.Term object to represent in windows_ipsec.
filter_name: The name of the filter chain to attach the term to.
filter_action: The default action of the filter.
# limitations under the License.
#
"""Demo generator for capirca."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import datetime
import logging
from capirca.lib import aclgenerator
class Term(aclgenerator.Term):
"""Used to create an individual term.
The __str__ method must be implemented.
Args: term policy.Term object
This is created to be a demo.
"""
_ACTIONS = {'accept': 'allow',
'deny': 'discard',
'reject': 'say go away to',
'next': 'pass it onto the next term',
'reject-with-tcp-rst': 'reset'
}
def __init__(self, term, term_type):
from absl import logging
class Error(Exception):
  """Base class for the error types defined here."""
class InvalidTargetOption(Error):
  """Signals that a target specification is invalid."""
class InvalidAddressFamily(Error):
  """Signals that an address family specification is invalid."""
class Term(aclgenerator.Term):
"""Representation of an individual nftables term.
This is mostly useful for the __str__() method.
"""
_PLATFORM = 'nftables'
_ACTIONS = {'accept': 'accept',
'deny': 'drop',
'reject': 'reject',
'next': 'continue',
'reject-with-tcp-rst': 'reject with tcp reset'}
MAX_CHARACTERS = 128
def __init__(self, term, af):
"""Setup a new nftables term.
'established',
'first-fragment',
'is-fragment',
'initial',
'rst',
'sample',
'tcp-established',
},
'action': {
'accept',
'deny',
'next',
'reject',
'reject-with-tcp-rst',
},
'icmp_type': set(Term.ICMP_TYPE[4].keys() + Term.ICMP_TYPE[6].keys())
}
return supported_tokens, supported_sub_tokens