Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __remove_sub_allocate_ip_pool(self):
    """Remove the sub-allocated IP pool range from the test gateway.

    Looks up the test gateway with the sys admin client, asks it to remove
    TestExtNet._gateway_sub_allocate_ip_pool_range from the external
    network named in the test configuration, and waits for the resulting
    task through the task monitor's wait_for_success.
    """
    gateway_record = Environment.get_test_gateway(
        TestExtNet._sys_admin_client)
    edge = Gateway(TestExtNet._sys_admin_client,
                   href=gateway_record.get('href'))
    network_name = TestExtNet._config['external_network']['name']
    pool_ranges = [TestExtNet._gateway_sub_allocate_ip_pool_range]
    removal_task = edge.remove_sub_allocated_ip_pools(network_name,
                                                      pool_ranges)
    monitor = TestExtNet._sys_admin_client.get_task_monitor()
    monitor.wait_for_success(task=removal_task)
def test_0025_remove_external_network(self):
    """Remove an external network from the gateway.

    Invokes the remove_external_network function of the gateway, asserts
    that the resulting task reports success, and then hands the external
    network to self._delete_external_network.
    """
    edge = Gateway(TestGateway._client, self._name,
                   TestGateway._gateway.get('href'))
    network_name = TestGateway._external_network2.get('name')
    removal_task = edge.remove_external_network(network_name)
    monitor = TestGateway._client.get_task_monitor()
    outcome = monitor.wait_for_success(task=removal_task)
    self.assertEqual(outcome.get('status'), TaskStatus.SUCCESS.value)
    # The network is deleted only after the gateway no longer references it.
    self._delete_external_network(TestGateway._external_network2)
def test_0095_add_firewall_rule(self):
    """Add a firewall rule to the gateway and check that it is listed.

    Invokes add_firewall_rule of the gateway with
    TestGateway._firewall_rule_name, then reads the rules back through
    get_firewall_rules and asserts that one of them carries that name.
    """
    edge = Gateway(TestGateway._client, self._name,
                   TestGateway._gateway.get('href'))
    edge.add_firewall_rule(TestGateway._firewall_rule_name)
    rules_resource = edge.get_firewall_rules()
    # Verify: any() stops at the first rule whose name matches.
    rule_present = any(
        rule['name'] == TestGateway._firewall_rule_name
        for rule in rules_resource.firewallRules.firewallRule)
    self.assertTrue(rule_present)
def test_0040_add_sub_allocated_ip_pools(self):
    """Add a sub-allocated IP pool to the gateway.

    Invokes add_sub_allocated_ip_pools of the gateway for the external
    network of the first entry returned by list_configure_ip_settings,
    using the 'gateway_sub_allocated_ip_range' value from the test
    configuration, and asserts that the resulting task reports success.
    """
    edge = Gateway(TestGateway._client, self._name,
                   TestGateway._gateway.get('href'))
    first_allocation = edge.list_configure_ip_settings()[0]
    network_name = first_allocation.get('external_network')
    network_config = TestGateway._config['external_network']
    pool_range = network_config['gateway_sub_allocated_ip_range']
    creation_task = edge.add_sub_allocated_ip_pools(network_name,
                                                    [pool_range])
    monitor = TestGateway._client.get_task_monitor()
    outcome = monitor.wait_for_success(task=creation_task)
    self.assertEqual(outcome.get('status'), TaskStatus.SUCCESS.value)
    # NOTE(review): a fresh Gateway object is built here but nothing visible
    # uses it afterwards; presumably a verification step is meant to follow
    # -- confirm against the full test file.
    edge = Gateway(TestGateway._client, self._name,
                   TestGateway._gateway.get('href'))
def test_002_get_dhcp_pool_info(self):
    """Get the details of a DHCP pool.

    Invokes get_pool_info of DhcpPool for the first pool returned by the
    test gateway's list_dhcp_pools, then asserts that the info is not
    empty and that its 'IPRange' equals TestDhcp._pool_ip_range.
    """
    gateway_record = Environment.get_test_gateway(TestDhcp._client)
    edge = Gateway(TestDhcp._client,
                   TestDhcp._name,
                   href=gateway_record.get('href'))
    first_pool_id = edge.list_dhcp_pools()[0]['ID']
    pool = DhcpPool(TestDhcp._client, self._name, first_pool_id)
    details = pool.get_pool_info()
    # Verify
    self.assertTrue(len(details) > 0)
    self.assertEqual(details['IPRange'], TestDhcp._pool_ip_range)
Invokes the edit_rate_limits of the gateway.
"""
gateway_obj = Gateway(TestGateway._client, self._name,
TestGateway._gateway.get('href'))
ip_allocations = gateway_obj.list_configure_ip_settings()
ip_allocation = ip_allocations[0]
ext_network = ip_allocation.get('external_network')
config = dict()
config[ext_network] = [self._rate_limit_start, self._rate_limit_end]
task = gateway_obj.edit_rate_limits(config)
result = TestGateway._client.get_task_monitor().wait_for_success(
task=task)
self.assertEqual(result.get('status'), TaskStatus.SUCCESS.value)
gateway_obj = Gateway(TestGateway._client, self._name,
TestGateway._gateway.get('href'))
for gateway_inf in \
gateway_obj.get_resource()\
.Configuration.GatewayInterfaces.GatewayInterface:
if gateway_inf.Name == ext_network:
self.assertEqual(self._rate_limit_start,
gateway_inf.InRateLimit.text)
self.assertEqual(self._rate_limit_end,
gateway_inf.OutRateLimit.text)
gateway = vdc.create_gateway_api_version_30(
TestIpSecVpn._gateway_name, [ext_config['name']])
elif float(api_version) == float(ApiVersion.VERSION_31.value):
gateway = vdc.create_gateway_api_version_31(
TestIpSecVpn._gateway_name,
[ext_config['name']],
should_create_as_advanced=True)
elif float(api_version) >= float(ApiVersion.VERSION_32.value):
gateway = vdc.create_gateway_api_version_32(
TestIpSecVpn._gateway_name, [ext_config['name']],
should_create_as_advanced=True)
TestIpSecVpn._client.get_task_monitor(). \
wait_for_success(task=gateway.Tasks.Task[0])
TestIpSecVpn._gateway_href = gateway.get('href')
TestIpSecVpn._gateway_obj = Gateway(TestIpSecVpn._client,
href=TestIpSecVpn._gateway_href)
TestIpSecVpn._gateway_resource = TestIpSecVpn. \
_gateway_obj.get_resource()
TestNatRule._name,
href=gateway.get('href'))
ip_allocations = gateway_obj.list_configure_ip_settings()
ip_allocation = ip_allocations[0]
ext_network = ip_allocation.get('external_network')
config = TestNatRule._config['external_network']
gateway_sub_allocated_ip_range = \
config['gateway_sub_allocated_ip_range']
ip_range_list = [gateway_sub_allocated_ip_range]
task = gateway_obj.add_sub_allocated_ip_pools(ext_network,
ip_range_list)
result = TestNatRule._client.get_task_monitor().wait_for_success(
task=task)
self.assertEqual(result.get('status'), TaskStatus.SUCCESS.value)
gateway_obj = Gateway(TestNatRule._client,
TestNatRule._name,
href=gateway.get('href'))
subnet_participation = self.__get_subnet_participation(
gateway_obj.get_resource(), ext_network)
ip_ranges = gateway_obj.get_sub_allocate_ip_ranges_element(
subnet_participation)
self.__validate_ip_range(ip_ranges, gateway_sub_allocated_ip_range)
def update_gw(self):
    """Update the name, description and HA flag of an edge gateway.

    Reads 'gateway_name', 'new_gateway_name', 'description' and
    'ha_enabled' from self.params, resolves the gateway through
    self.get_gateway, and runs the task returned by Gateway.edit_gateway
    via self.execute_task.

    :return: a dict with 'changed' (bool) plus 'msg' on success, or
        'warnings' when EntityNotFoundException is raised while looking up
        or updating the gateway ('changed' then stays False).
    :rtype: dict
    """
    response = {'changed': False}
    gateway_name = self.params.get('gateway_name')
    new_gateway_name = self.params.get('new_gateway_name')
    description = self.params.get('description')
    ha_enabled = self.params.get('ha_enabled')

    try:
        gateway_record = self.get_gateway(gateway_name)
        # Direct attribute lookup instead of scanning every (key, value)
        # pair; yields None when the record carries no 'href', exactly as
        # the scan's initial value did.
        edge_gateway_href = gateway_record.get('href')
        gateway = Gateway(
            self.client, name=gateway_name, href=edge_gateway_href)
        update_task = gateway.edit_gateway(newname=new_gateway_name,
                                           desc=description,
                                           ha=ha_enabled)
        self.execute_task(update_task)
        msg = "Edge Gateway {0} has been updated with {1}"
        response['msg'] = msg.format(gateway_name, new_gateway_name)
        response['changed'] = True
    except EntityNotFoundException:
        # A missing gateway is reported as a warning, not raised.
        msg = 'Edge Gateway {0} is not present'
        response['warnings'] = msg.format(gateway_name)

    return response
def __find_element(self, type, object_type, value, group_type):
    """Find element in the properties using group type.

    Lists the firewall objects of the parent gateway and searches the
    'prop' entries of every object whose 'name' equals ``value``.

    :param str type: It can be source/destination
    :param dict object_type: object types
    :param str value: name of the firewall object to match
    :param str group_type: group type. e.g., groupingObjectId

    :return: the result of create_element(group_type, <property value>)
        for the first property named ``group_type`` found on a matching
        object, or None when nothing matches.
    """
    edge = Gateway(self.client, resource=self.parent)
    for candidate in edge.list_firewall_objects(type, object_type):
        if candidate.get('name') != value:
            continue
        for entry in candidate.get('prop'):
            if entry.get('name') == group_type:
                return create_element(group_type, entry.get('value'))
    # No object/property pair matched.
    return None