Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
af=self.text_af))
return ''
if self.enable_dsmo:
source_address = summarizer.Summarize(source_address)
else:
# source address not set
source_address = ['any']
# destination address
if self.term.destination_address:
destination_address = self.term.GetAddressOfVersion(
'destination_address', self.af)
destination_address_exclude = self.term.GetAddressOfVersion(
'destination_address_exclude', self.af)
if destination_address_exclude:
destination_address = nacaddr.ExcludeAddrs(
destination_address,
destination_address_exclude)
if not destination_address:
logging.debug(self.NO_AF_LOG_ADDR.substitute(term=self.term.name,
direction='destination',
af=self.text_af))
return ''
if self.enable_dsmo:
destination_address = summarizer.Summarize(destination_address)
else:
# destination address not set
destination_address = ['any']
# options
opts = [str(x) for x in self.term.option]
if ((self.PROTO_MAP['tcp'] in protocol or 'tcp' in protocol)
exclude_saddr: source address exclude list of the term
term_daddr: destination address list of the term
exclude_daddr: destination address exclude list of the term
Returns:
tuple containing source address list, source exclude address list,
destination address list, destination exclude address list in
that order
"""
# source address
term_saddr_excluded = []
if not term_saddr:
term_saddr = [self._all_ips]
if exclude_saddr:
term_saddr_excluded.extend(nacaddr.ExcludeAddrs(term_saddr,
exclude_saddr))
# destination address
term_daddr_excluded = []
if not term_daddr:
term_daddr = [self._all_ips]
if exclude_daddr:
term_daddr_excluded.extend(nacaddr.ExcludeAddrs(term_daddr,
exclude_daddr))
# Just to be safe, always have a result of at least 1 to avoid * by zero
# returning incorrect results (10src*10dst=100, but 10src*0dst=0, not 10)
bailout_count = len(exclude_saddr) + len(exclude_daddr) + (
(len(self.term.source_address) or 1) *
(len(self.term.destination_address) or 1))
exclude_count = ((len(term_saddr_excluded) or 1) *
"""
# source address
term_saddr_excluded = []
if not term_saddr:
term_saddr = [self._all_ips]
if exclude_saddr:
term_saddr_excluded.extend(nacaddr.ExcludeAddrs(term_saddr,
exclude_saddr))
# destination address
term_daddr_excluded = []
if not term_daddr:
term_daddr = [self._all_ips]
if exclude_daddr:
term_daddr_excluded.extend(nacaddr.ExcludeAddrs(term_daddr,
exclude_daddr))
# Just to be safe, always have a result of at least 1 to avoid * by zero
# returning incorrect results (10src*10dst=100, but 10src*0dst=0, not 10)
bailout_count = len(exclude_saddr) + len(exclude_daddr) + (
(len(self.term.source_address) or 1) *
(len(self.term.destination_address) or 1))
exclude_count = ((len(term_saddr_excluded) or 1) *
(len(term_daddr_excluded) or 1))
# Use bailout jumps for excluded addresses if it results in fewer output
# lines than nacaddr.ExcludeAddrs() method.
if exclude_count < bailout_count:
exclude_saddr = []
exclude_daddr = []
if term_saddr_excluded:
if term_saddr_excluded:
term_saddr = term_saddr_excluded
if term_daddr_excluded:
term_daddr = term_daddr_excluded
# With many sources and destinations, iptables needs to generate the
# cartesian product of sources and destinations. If there are no
# exclude rules, this can instead be written as exclude [0/0 -
# srcs], exclude [0/0 - dsts].
v4_src_count = len([x for x in term_saddr if x.version == 4])
v4_dst_count = len([x for x in term_daddr if x.version == 4])
v6_src_count = len([x for x in term_saddr if x.version == 6])
v6_dst_count = len([x for x in term_daddr if x.version == 6])
num_pairs = v4_src_count * v4_dst_count + v6_src_count * v6_dst_count
if num_pairs > 100:
new_exclude_source = nacaddr.ExcludeAddrs([self._all_ips], term_saddr)
new_exclude_dest = nacaddr.ExcludeAddrs([self._all_ips], term_daddr)
# Invert the shortest list that does not already have exclude addresses
if len(new_exclude_source) < len(new_exclude_dest) and not exclude_saddr:
if len(new_exclude_source) + len(term_daddr) < num_pairs:
exclude_saddr = new_exclude_source
term_saddr = [self._all_ips]
elif not exclude_daddr:
if len(new_exclude_dest) + len(term_saddr) < num_pairs:
exclude_daddr = new_exclude_dest
term_daddr = [self._all_ips]
term_saddr = [x for x in term_saddr
if x.version == self.AF_MAP[self.af]]
exclude_saddr = [x for x in exclude_saddr
if x.version == self.AF_MAP[self.af]]
term_daddr = [x for x in term_daddr
if x.version == self.AF_MAP[self.af]]
af=self.text_af))
return ''
if self.enable_dsmo:
source_address = summarizer.Summarize(source_address)
else:
# source address not set
source_address = ['any']
# destination address
if self.term.destination_address:
destination_address = self.term.GetAddressOfVersion(
'destination_address', self.af)
destination_address_exclude = self.term.GetAddressOfVersion(
'destination_address_exclude', self.af)
if destination_address_exclude:
destination_address = nacaddr.ExcludeAddrs(
destination_address,
destination_address_exclude)
if not destination_address:
logging.debug(self.NO_AF_LOG_ADDR.substitute(term=self.term.name,
direction='destination',
af=self.text_af))
return ''
if self.enable_dsmo:
destination_address = summarizer.Summarize(destination_address)
else:
# destination address not set
destination_address = ['any']
# options
opts = [str(x) for x in self.term.option]
if ((self.PROTO_MAP['tcp'] in protocol or 'tcp' in protocol)
def _CalculateAddrs(self, addr_list, addr_exclude_list):
addr_list = [addr for addr in addr_list
if addr.version == self.AF_MAP[self.af]]
if addr_exclude_list:
if not addr_list:
addr_list = [self.all_ips]
addr_list = nacaddr.ExcludeAddrs(addr_list, addr_exclude_list)
return addr_list
protocol = ['ip']
elif self.term.protocol == ['hopopt']:
protocol = ['hbh']
elif self.proto_int:
protocol = [proto if proto in self.ALLOWED_PROTO_STRINGS
else self.PROTO_MAP.get(proto)
for proto in self.term.protocol]
else:
protocol = self.term.protocol
# source address
if self.term.source_address:
source_address = self.term.GetAddressOfVersion('source_address', self.af)
source_address_exclude = self.term.GetAddressOfVersion(
'source_address_exclude', self.af)
if source_address_exclude:
source_address = nacaddr.ExcludeAddrs(
source_address,
source_address_exclude)
if not source_address:
logging.debug(self.NO_AF_LOG_ADDR.substitute(term=self.term.name,
direction='source',
af=self.text_af))
return ''
if self.enable_dsmo:
source_address = summarizer.Summarize(source_address)
else:
# source address not set
source_address = ['any']
# destination address
if self.term.destination_address:
destination_address = self.term.GetAddressOfVersion(