Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self.assertTrue(results.is_truncated)
self.assertTrue(results.supports_total)
self.assertEqual(results.total_results, 4)
self.assertEqual(results.data, objects)
# A list with limit=2 and marker=2
results = ClientPagedResultList(self.provider, objects, 2, 2)
self.assertListEqual(results, list(itertools.islice(objects, 2, 4)))
self.assertEqual(results.marker, None)
self.assertFalse(results.is_truncated)
self.assertTrue(results.supports_total)
self.assertEqual(results.total_results, 4)
self.assertEqual(results.data, objects)
# A list with limit=2 and marker=3
results = ClientPagedResultList(self.provider, objects, 2, 3)
self.assertListEqual(results, list(itertools.islice(objects, 3, 4)))
self.assertFalse(results.is_truncated)
self.assertEqual(results.marker, None)
self.assertEqual(results.data, objects)
self.assertFalse(results.supports_server_paging, "Client paged result"
" lists should return False for server paging.")
def list(self, firewall, limit=None, marker=None):
    """
    List the rules of ``firewall``, leaving out the dummy rule.

    Each entry produced by ``firewall.delegate.iter_firewalls`` for the
    firewall's name and its network's name is wrapped in a
    ``GCPVMFirewallRule``. A rule for which ``is_dummy_rule()`` is true is
    stored on ``self._dummy_rule`` instead of being included in the result.

    :param firewall: the firewall whose rules are listed.
    :param limit: passed through to ``ClientPagedResultList``.
    :param marker: passed through to ``ClientPagedResultList``.
    :return: a ``ClientPagedResultList`` over the non-dummy rules.
    """
    listed_rules = []
    entries = firewall.delegate.iter_firewalls(
        firewall.name, firewall.network.name)
    for entry in entries:
        candidate = GCPVMFirewallRule(firewall, entry['id'])
        if not candidate.is_dummy_rule():
            listed_rules.append(candidate)
            continue
        # Keep a handle on the dummy rule rather than listing it.
        self._dummy_rule = candidate
    return ClientPagedResultList(
        self.provider, listed_rules, limit=limit, marker=marker)
def list(self, limit=None, marker=None):
    """
    List the key pairs found in the matching metadata items.

    Items are obtained from ``helpers.find_matching_metadata_items`` using
    ``GCPKeyPair.KP_TAG_REGEX``. The ``'value'`` of each item is parsed as
    JSON and its fields are used as keyword arguments for
    ``GCPKeyPair.GCPKeyInfo``, which is then wrapped in a ``GCPKeyPair``.

    :param limit: passed through to ``ClientPagedResultList``.
    :param marker: passed through to ``ClientPagedResultList``.
    :return: a ``ClientPagedResultList`` over the key pairs.
    """
    matching_items = helpers.find_matching_metadata_items(
        self.provider, GCPKeyPair.KP_TAG_REGEX)
    key_pairs = [
        GCPKeyPair(
            self.provider,
            GCPKeyPair.GCPKeyInfo(**json.loads(entry['value'])))
        for entry in matching_items
    ]
    return ClientPagedResultList(
        self.provider, key_pairs, limit=limit, marker=marker)
def list(self, network=None, limit=None, marker=None):
    """
    List subnets, optionally restricted to a single network.

    With a truthy ``network``, the subnets are those obtained by iterating
    over ``self`` whose ``network_id`` equals the requested network id.
    Otherwise every entry under ``'subnets'`` in the Neutron
    ``list_subnets()`` response is wrapped in an ``OpenStackSubnet``.

    :param network: an ``OpenStackNetwork`` (its ``id`` is used) or any
        other value, which is used directly as the network id.
    :param limit: passed through to ``ClientPagedResultList``.
    :param marker: passed through to ``ClientPagedResultList``.
    :return: a ``ClientPagedResultList`` over the selected subnets.
    """
    if not network:
        raw_subnets = self.provider.neutron.list_subnets().get(
            'subnets', [])
        subnets = [OpenStackSubnet(self.provider, raw)
                   for raw in raw_subnets]
    else:
        if isinstance(network, OpenStackNetwork):
            network_id = network.id
        else:
            network_id = network
        subnets = []
        for candidate in self:
            if network_id == candidate.network_id:
                subnets.append(candidate)
    return ClientPagedResultList(
        self.provider, subnets, limit=limit, marker=marker)
def list(self, limit=None, marker=None):
    """
    List the inbound and outbound rules of ``self.firewall``.

    Entries of ``ip_permissions`` become ``AWSVMFirewallRule`` objects with
    ``TrafficDirection.INBOUND``; entries of ``ip_permissions_egress``
    become rules with ``TrafficDirection.OUTBOUND``. Inbound rules come
    first in the result.

    :param limit: passed through to ``ClientPagedResultList``.
    :param marker: passed through to ``ClientPagedResultList``.
    :return: a ``ClientPagedResultList`` over all rules.
    """
    # pylint:disable=protected-access
    rules = []
    for permission in self.firewall._vm_firewall.ip_permissions:
        rules.append(AWSVMFirewallRule(
            self.firewall, TrafficDirection.INBOUND, permission))
    for permission in self.firewall._vm_firewall.ip_permissions_egress:
        rules.append(AWSVMFirewallRule(
            self.firewall, TrafficDirection.OUTBOUND, permission))
    return ClientPagedResultList(
        self._provider, rules, limit=limit, marker=marker)
filter = 'network eq %s' % network.resource_url
if zone:
region_name = self._zone_to_region(zone)
else:
region_name = self.provider.region_name
subnets = []
response = (self.provider
.gcp_compute
.subnetworks()
.list(project=self.provider.project_name,
region=region_name,
filter=filter)
.execute())
for subnet in response.get('items', []):
subnets.append(GCPSubnet(self.provider, subnet))
return ClientPagedResultList(self.provider, subnets,
limit=limit, marker=marker)
def list(self, limit=None, marker=None):
    """
    List the networks returned by Neutron.

    Every truthy entry under ``'networks'`` in the ``list_networks()``
    response is wrapped in an ``OpenStackNetwork``. A response without a
    ``'networks'`` key produces an empty result.

    :param limit: passed through to ``ClientPagedResultList``.
    :param marker: passed through to ``ClientPagedResultList``.
    :return: a ``ClientPagedResultList`` over the networks.
    """
    # Default to an empty list so a response lacking the 'networks' key
    # gives an empty listing instead of a TypeError when iterating None.
    raw_networks = self.provider.neutron.list_networks().get(
        'networks', [])
    networks = [OpenStackNetwork(self.provider, network)
                for network in raw_networks if network]
    return ClientPagedResultList(self.provider, networks,
                                 limit=limit, marker=marker)
def list(self, firewall, limit=None, marker=None):
    """
    List the rules attached to ``firewall``.

    Each entry of the firewall's ``security_group_rules`` is wrapped in an
    ``OpenStackVMFirewallRule`` bound to that firewall.

    :param firewall: the firewall whose rules are listed.
    :param limit: passed through to ``ClientPagedResultList``.
    :param marker: passed through to ``ClientPagedResultList``.
    :return: a ``ClientPagedResultList`` over the rules.
    """
    # pylint:disable=protected-access
    rules = []
    for raw_rule in firewall._vm_firewall.security_group_rules:
        rules.append(OpenStackVMFirewallRule(firewall, raw_rule))
    return ClientPagedResultList(
        self.provider, rules, limit=limit, marker=marker)
def list(self, limit=None, marker=None):
    """
    List the VM firewalls known to the provider.

    Each item produced by ``os_conn.network.security_groups()`` is wrapped
    in an ``OpenStackVMFirewall``.

    :param limit: passed through to ``ClientPagedResultList``.
    :param marker: passed through to ``ClientPagedResultList``.
    :return: a ``ClientPagedResultList`` over the firewalls.
    """
    firewalls = []
    for group in self.provider.os_conn.network.security_groups():
        firewalls.append(OpenStackVMFirewall(self.provider, group))
    return ClientPagedResultList(
        self.provider, firewalls, limit=limit, marker=marker)