Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def cleanup_network(network):
    """
    Delete the supplied network, first deleting any contained subnets.

    A falsy ``network`` is ignored. Each subnet is removed through
    ``cleanup_subnet`` inside a ``cb_helpers.cleanup_action`` context
    (presumably so one failing subnet does not stop the rest -- confirm
    against ``cb_helpers``). The network itself is deleted in a ``finally``
    clause, so the delete is attempted even if the subnet pass raises.
    """
    if not network:
        return
    try:
        for subnet in network.subnets:
            # The context manager owns the call to the cleanup callable;
            # the ``with`` body is intentionally empty.
            with cb_helpers.cleanup_action(lambda: cleanup_subnet(subnet)):
                pass
    finally:
        network.delete()
        network.wait_for(
            [NetworkState.UNKNOWN],
            terminal_states=[NetworkState.ERROR])
def cleanup_subnet(subnet):
    """
    Delete ``subnet`` and then its parent network, asserting on both states.

    A falsy ``subnet`` is ignored. After each delete the resource is waited
    on and its state is asserted to be UNKNOWN.

    NOTE(review): ``self`` is not a parameter here; presumably this helper
    is nested inside a test method and closes over its ``self`` -- confirm.
    """
    if not subnet:
        return

    # Grab the parent before deleting the subnet.
    parent_net = subnet.network

    subnet.delete()
    subnet.wait_for(
        [SubnetState.UNKNOWN],
        terminal_states=[SubnetState.ERROR])
    subnet_gone = subnet.state == SubnetState.UNKNOWN
    subnet_msg = ("Subnet.state must be unknown after "
                  "a delete but got %s"
                  % subnet.state)
    self.assertTrue(subnet_gone, subnet_msg)

    parent_net.delete()
    parent_net.wait_for(
        [NetworkState.UNKNOWN],
        terminal_states=[NetworkState.ERROR])
    net_gone = parent_net.state == NetworkState.UNKNOWN
    net_msg = ("Network.state must be unknown after "
               "a delete but got %s"
               % parent_net.state)
    self.assertTrue(net_gone, net_msg)
def cleanup_net(net):
    """
    Delete ``net`` and assert that it ends up in the UNKNOWN state.

    A falsy ``net`` is ignored.

    NOTE(review): ``self`` is not a parameter here; presumably this helper
    is nested inside a test method and closes over its ``self`` -- confirm.
    """
    if not net:
        return

    net.delete()
    net.wait_for(
        [NetworkState.UNKNOWN],
        terminal_states=[NetworkState.ERROR])
    is_gone = net.state == NetworkState.UNKNOWN
    failure_msg = ("Network.state must be unknown after "
                   "a delete but got %s"
                   % net.state)
    self.assertTrue(is_gone, failure_msg)
def state(self):
    """
    Return the ``NetworkState`` for this network.

    UNKNOWN is returned when ``self._unknown_state`` is set, when the VPC's
    state has no entry in ``AWSNetwork._NETWORK_STATE_MAP``, or when
    reading the VPC state raises.
    """
    if not self._unknown_state:
        try:
            vpc_state = self._vpc.state
            return AWSNetwork._NETWORK_STATE_MAP.get(
                vpc_state, NetworkState.UNKNOWN)
        except Exception:
            # Ignore all exceptions when querying state
            pass
    return NetworkState.UNKNOWN
@property
def state(self):
    """
    Return the ``NetworkState`` for this network.

    UNKNOWN is returned when ``self._unknown_state`` is set, when the VPC's
    state has no entry in ``AWSNetwork._NETWORK_STATE_MAP``, or when
    reading the VPC state raises.
    """
    if not self._unknown_state:
        try:
            raw_state = self._vpc.state
            return AWSNetwork._NETWORK_STATE_MAP.get(
                raw_state, NetworkState.UNKNOWN)
        except Exception:
            # Ignore all exceptions when querying state
            pass
    return NetworkState.UNKNOWN
return subnets
def attach_gateway(self, gateway):
    """
    Set ``gateway`` as the external gateway network of this router.

    Calls ``update_router`` on the provider's ``os_conn`` with this
    object's ``id`` and ``gateway.id`` as ``ext_gateway_net_id``.
    """
    connection = self._provider.os_conn
    connection.update_router(self.id, ext_gateway_net_id=gateway.id)
def detach_gateway(self, gateway):
    """
    Remove the external gateway from this router.

    ``gateway`` is accepted but not used: the call only passes this
    object's ``id`` to ``remove_gateway_router``.
    """
    # TODO: OpenStack SDK Connection object doesn't appear to have a method
    # for detaching/clearing the external gateway.
    neutron_client = self._provider.neutron
    neutron_client.remove_gateway_router(self.id)
class OpenStackInternetGateway(BaseInternetGateway):
    """Internet gateway backed by an OpenStack network."""

    # Translation from the state of the backing network to a gateway state.
    GATEWAY_STATE_MAP = {
        NetworkState.AVAILABLE: GatewayState.AVAILABLE,
        NetworkState.DOWN: GatewayState.ERROR,
        NetworkState.ERROR: GatewayState.ERROR,
        NetworkState.PENDING: GatewayState.CONFIGURING,
        NetworkState.UNKNOWN: GatewayState.UNKNOWN
    }

    def __init__(self, provider, gateway_net):
        """
        Store the backing network and create the floating-IP sub-service.

        ``gateway_net`` may be an ``OpenStackNetwork`` wrapper, in which
        case its underlying ``_network`` object is stored instead.
        """
        super(OpenStackInternetGateway, self).__init__(provider)
        is_wrapper = isinstance(gateway_net, OpenStackNetwork)
        # pylint:disable=protected-access
        self._gateway_net = (
            gateway_net._network if is_wrapper else gateway_net)
        self._fips_container = OpenStackFloatingIPSubService(provider, self)
@property
def id(self):
cb_vol = OpenStackVolume(self._provider, os_vol)
return cb_vol
class OpenStackNetwork(BaseNetwork):
    """Network resource wrapping an OpenStack network record."""

    # Ref: https://github.com/openstack/neutron/blob/master/neutron/plugins/
    #      common/constants.py
    # Translation from status strings to ``NetworkState`` values.
    _NETWORK_STATE_MAP = {
        'PENDING_CREATE': NetworkState.PENDING,
        'PENDING_UPDATE': NetworkState.PENDING,
        'PENDING_DELETE': NetworkState.PENDING,
        'CREATED': NetworkState.PENDING,
        'INACTIVE': NetworkState.PENDING,
        'DOWN': NetworkState.DOWN,
        'ERROR': NetworkState.ERROR,
        'ACTIVE': NetworkState.AVAILABLE
    }

    def __init__(self, provider, network):
        """Store the raw ``network`` record and create the sub-services."""
        super(OpenStackNetwork, self).__init__(provider)
        self._network = network
        self._gateway_service = OpenStackGatewaySubService(provider, self)
        self._subnet_svc = OpenStackSubnetSubService(provider, self)

    @property
    def id(self):
        """Return the ``'id'`` entry of the raw record, or ``None``."""
        network_id = self._network.get('id', None)
        return network_id

    @property
    def name(self):
        """Return the network name, which is the same value as ``id``."""
        return self.id

    def refresh(self):
        """
        Re-query the provider for this network and update the raw record.

        When the lookup returns nothing, the existing record's ``'status'``
        is set to ``NetworkState.UNKNOWN`` instead.
        """
        latest = self._provider.networking.networks.get(self.id)
        if not latest:
            # network no longer exists
            self._network['status'] = NetworkState.UNKNOWN
            return
        # pylint:disable=protected-access
        self._network = latest._network
# pylint:disable=protected-access
conn = self._provider._connect_ec2_region(region_name=self.id)
zones = (conn.meta.client.describe_availability_zones()
.get('AvailabilityZones', []))
return [AWSPlacementZone(self._provider, zone.get('ZoneName'),
self.id)
for zone in zones]
class AWSNetwork(BaseNetwork):
    """Network resource wrapping an AWS VPC object."""

    # Ref:
    # docs.aws.amazon.com/AWSEC2/latest/APIReference/API_DescribeVpcs.html
    # Translation from VPC state strings to ``NetworkState`` values.
    _NETWORK_STATE_MAP = {
        'pending': NetworkState.PENDING,
        'available': NetworkState.AVAILABLE,
    }

    def __init__(self, provider, network):
        """Store the VPC object and create the gateway container."""
        super(AWSNetwork, self).__init__(provider)
        self._vpc = network
        self._gtw_container = AWSGatewayContainer(provider, self)
        # Starts out False; nothing in this class body sets it to True.
        self._unknown_state = False

    @property
    def id(self):
        """Return the ``id`` attribute of the wrapped VPC object."""
        vpc_id = self._vpc.id
        return vpc_id

    @property
    def name(self):
        """Return the network name, which is the same value as ``id``."""
        return self.id
def attach_gateway(self, gateway):
    """
    Set ``gateway`` as the external gateway network of this router.

    Calls ``update_router`` on the provider's ``os_conn`` with this
    object's ``id`` and ``gateway.id`` as ``ext_gateway_net_id``.
    """
    os_connection = self._provider.os_conn
    os_connection.update_router(self.id, ext_gateway_net_id=gateway.id)
def detach_gateway(self, gateway):
    """
    Remove the external gateway from this router.

    ``gateway`` is accepted but not used: the call only passes this
    object's ``id`` to ``remove_gateway_router``.
    """
    # TODO: OpenStack SDK Connection object doesn't appear to have a method
    # for detaching/clearing the external gateway.
    client = self._provider.neutron
    client.remove_gateway_router(self.id)
class OpenStackInternetGateway(BaseInternetGateway):
    """Internet gateway backed by an OpenStack network."""

    # Translation from the state of the backing network to a gateway state.
    GATEWAY_STATE_MAP = {
        NetworkState.AVAILABLE: GatewayState.AVAILABLE,
        NetworkState.DOWN: GatewayState.ERROR,
        NetworkState.ERROR: GatewayState.ERROR,
        NetworkState.PENDING: GatewayState.CONFIGURING,
        NetworkState.UNKNOWN: GatewayState.UNKNOWN
    }

    def __init__(self, provider, gateway_net):
        """
        Store the backing network and create the floating-IP sub-service.

        ``gateway_net`` may be an ``OpenStackNetwork`` wrapper, in which
        case its underlying ``_network`` object is stored instead.
        """
        super(OpenStackInternetGateway, self).__init__(provider)
        wrapped = isinstance(gateway_net, OpenStackNetwork)
        # pylint:disable=protected-access
        self._gateway_net = gateway_net._network if wrapped else gateway_net
        self._fips_container = OpenStackFloatingIPSubService(provider, self)

    @property
    def id(self):
        """Return the ``'id'`` entry of the backing network, or ``None``."""
        gateway_id = self._gateway_net.get('id', None)
        return gateway_id