Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def list(self, limit=None, marker=None):
# pylint:disable=protected-access
rules = [AWSVMFirewallRule(self.firewall,
TrafficDirection.INBOUND, r)
for r in self.firewall._vm_firewall.ip_permissions]
rules = rules + [
AWSVMFirewallRule(
self.firewall, TrafficDirection.OUTBOUND, r)
for r in self.firewall._vm_firewall.ip_permissions_egress]
return ClientPagedResultList(self._provider, rules,
limit=limit, marker=marker)
def create(self, direction, protocol=None, from_port=None,
to_port=None, cidr=None, src_dest_fw=None):
src_dest_fw_id = (
src_dest_fw.id if isinstance(src_dest_fw, AWSVMFirewall)
else src_dest_fw)
# pylint:disable=protected-access
ip_perm_entry = AWSVMFirewallRule._construct_ip_perms(
protocol, from_port, to_port, cidr, src_dest_fw_id)
# Filter out empty values to please Boto
ip_perms = [trim_empty_params(ip_perm_entry)]
try:
if direction == TrafficDirection.INBOUND:
# pylint:disable=protected-access
self.firewall._vm_firewall.authorize_ingress(
IpPermissions=ip_perms)
elif direction == TrafficDirection.OUTBOUND:
# pylint:disable=protected-access
self.firewall._vm_firewall.authorize_egress(
IpPermissions=ip_perms)
else:
raise InvalidValueException("direction", direction)
self.firewall.refresh()
def __init__(self, parent_fw, direction, rule):
self._direction = direction
super(AWSVMFirewallRule, self).__init__(parent_fw, rule)
# cache id
md5 = hashlib.md5()
md5.update(self._name.encode('ascii'))
self._id = md5.hexdigest()
def list(self, limit=None, marker=None):
# pylint:disable=protected-access
rules = [AWSVMFirewallRule(self.firewall,
TrafficDirection.INBOUND, r)
for r in self.firewall._vm_firewall.ip_permissions]
rules = rules + [
AWSVMFirewallRule(
self.firewall, TrafficDirection.OUTBOUND, r)
for r in self.firewall._vm_firewall.ip_permissions_egress]
return ClientPagedResultList(self._provider, rules,
limit=limit, marker=marker)
try:
if direction == TrafficDirection.INBOUND:
# pylint:disable=protected-access
self.firewall._vm_firewall.authorize_ingress(
IpPermissions=ip_perms)
elif direction == TrafficDirection.OUTBOUND:
# pylint:disable=protected-access
self.firewall._vm_firewall.authorize_egress(
IpPermissions=ip_perms)
else:
raise InvalidValueException("direction", direction)
self.firewall.refresh()
return AWSVMFirewallRule(self.firewall, direction, ip_perm_entry)
except ClientError as ec2e:
if ec2e.response['Error']['Code'] == "InvalidPermission.Duplicate":
return AWSVMFirewallRule(
self.firewall, direction, ip_perm_entry)
else:
raise ec2e
# Filter out empty values to please Boto
ip_perms = [trim_empty_params(ip_perm_entry)]
try:
if direction == TrafficDirection.INBOUND:
# pylint:disable=protected-access
self.firewall._vm_firewall.authorize_ingress(
IpPermissions=ip_perms)
elif direction == TrafficDirection.OUTBOUND:
# pylint:disable=protected-access
self.firewall._vm_firewall.authorize_egress(
IpPermissions=ip_perms)
else:
raise InvalidValueException("direction", direction)
self.firewall.refresh()
return AWSVMFirewallRule(self.firewall, direction, ip_perm_entry)
except ClientError as ec2e:
if ec2e.response['Error']['Code'] == "InvalidPermission.Duplicate":
return AWSVMFirewallRule(
self.firewall, direction, ip_perm_entry)
else:
raise ec2e