Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@gcp_conn('gce')
def list_networks(client=None, **kwargs):
    """
    List networks through the client's ``networks()`` resource.

    :param client: client object to use (presumably supplied by the
        ``gcp_conn`` decorator -- confirm against its definition).
    :param kwargs: forwarded unchanged to ``gce_list``.
    :rtype: ``list``
    """
    networks_service = client.networks()
    return gce_list(service=networks_service, **kwargs)
@gcp_conn('iam')
def get_iam_policy(client=None, **kwargs):
    """
    Fetch the IAM policy bindings of a service account.

    service_account='string'

    :param client: client object to use.
    :returns: the ``'bindings'`` entry of the response, or ``None`` when
        the response has no such entry.
    :raises KeyError: if ``service_account`` is not passed as a keyword.
    """
    account = kwargs.pop('service_account')
    service_accounts = client.projects().serviceAccounts()
    policy = service_accounts.getIamPolicy(resource=account).execute()
    # TODO(supertom): err handling, check if 'bindings' is correct
    if 'bindings' not in policy:
        return None
    return policy['bindings']
@gcp_conn('gcs')
def get_bucket(client=None, **kwargs):
    """
    Look up a single bucket by name.

    The bucket name is read from ``kwargs['Bucket']`` and handed to
    ``client.lookup_bucket``.

    :param client: client object to use.
    :type client: Google Cloud Storage client
    :returns: whatever ``client.lookup_bucket`` returns for that name
    :rtype: ``object``
    :raises KeyError: if ``Bucket`` is not passed as a keyword.
    """
    bucket_name = kwargs['Bucket']
    return client.lookup_bucket(bucket_name)
@gcp_conn('gce')
def get_subnetwork(client=None, **kwargs):
    """
    Fetch a single subnetwork.

    Reads ``project``, ``Subnetwork`` and ``Region`` from ``kwargs`` and
    passes them to the ``get`` call of the client's ``subnetworks()``
    resource.

    :param client: client object to use.
    :returns: the result of executing the ``get`` request.
    :raises KeyError: if any of the three keywords is missing.
    """
    subnetworks = client.subnetworks()
    request = subnetworks.get(
        project=kwargs['project'],
        subnetwork=kwargs['Subnetwork'],
        region=kwargs['Region'],
    )
    return request.execute()
@gcp_conn('iam')
def get_serviceaccount_keys(client=None, **kwargs):
    """
    List the keys of a service account.

    service_account='string'

    The ``service_account`` keyword is renamed to ``name`` before the
    remaining keywords are forwarded to ``service_list``.

    :param client: client object to use.
    :returns: whatever ``service_list`` returns with ``key_name='keys'``.
    :raises KeyError: if ``service_account`` is not passed as a keyword.
    """
    kwargs['name'] = kwargs.pop('service_account')
    keys_service = client.projects().serviceAccounts().keys()
    return service_list(keys_service, key_name='keys', **kwargs)
@gcp_conn('gce')
def list_subnetworks(client=None, **kwargs):
    """
    List subnetworks through the client's ``subnetworks()`` resource.

    :param client: client object to use; only its ``subnetworks()``
        resource is accessed here.
    :param kwargs: forwarded unchanged to ``gce_list_aggregated``.
    :returns: whatever ``gce_list_aggregated`` returns with
        ``key_name='subnetworks'``.
    :rtype: ``list``
    """
    subnetworks_service = client.subnetworks()
    return gce_list_aggregated(service=subnetworks_service,
                               key_name='subnetworks', **kwargs)
@gcp_conn('iam')
def list_serviceaccounts(client=None, **kwargs):
    """
    List service accounts.

    :param client: client object to use.
    :param kwargs: forwarded unchanged to ``service_list``.
    :returns: whatever ``service_list`` returns with
        ``key_name='accounts'``.
    """
    accounts_service = client.projects().serviceAccounts()
    return service_list(service=accounts_service, key_name='accounts',
                        **kwargs)
# NOTE(review): an earlier function in this file is also named
# ``get_iam_policy`` (it reads a service account's policy). If both really
# live in one module, this definition rebinds the name -- confirm.
@gcp_conn('crm')
def get_iam_policy(client=None, **kwargs):
    """
    Fetch the IAM policy for the resource named in ``kwargs['resource']``.

    :param client: client object to use; only ``projects().getIamPolicy``
        is called on it.
    :returns: the result of executing the ``getIamPolicy`` request.
    :raises KeyError: if ``resource`` is not passed as a keyword.
    """
    # body={} workaround for https://github.com/googleapis/google-api-python-client/issues/713
    request = client.projects().getIamPolicy(resource=kwargs['resource'],
                                             body={})
    return request.execute()
@gcp_conn('gcs')
def list_buckets(client=None, **kwargs):
    """
    List buckets for a project.

    Every keyword argument is forwarded to ``client.list_buckets``; each
    item it yields is returned as that item's ``__dict__``.

    :param client: client object to use.
    :type client: Google Cloud Storage client
    :returns: list of dictionary representation of Bucket
    :rtype: ``list`` of ``dict``
    """
    return [bucket.__dict__ for bucket in client.list_buckets(**kwargs)]
@gcp_conn('iam')
def get_serviceaccount(client=None, **kwargs):
    """
    Fetch a single service account.

    service_account='string'

    :param client: client object to use.
    :returns: the result of executing the ``get`` request with
        ``name`` set to the ``service_account`` keyword.
    :raises KeyError: if ``service_account`` is not passed as a keyword.
    """
    account_name = kwargs.pop('service_account')
    request = client.projects().serviceAccounts().get(name=account_name)
    return request.execute()