Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_rewrite_kwargs(self):
data = {'project': 'my-project', 'foo': 'bar'}
expected_general = {'name': 'projects/my-project', 'foo': 'bar'}
actual_general = utils.rewrite_kwargs('general', data)
self.assertEqual(expected_general, actual_general)
data = {'project': 'my-project', 'foo': 'bar'}
expected_cloud_storage = {'foo': 'bar'}
actual_cloud_storage = utils.rewrite_kwargs('cloud', data,
module_name='storage')
self.assertEqual(expected_cloud_storage, actual_cloud_storage)
data = {'foo': 'bar'}
expected_no_change = {'foo': 'bar'}
actual_no_change = utils.rewrite_kwargs('cloud', data,
module_name='storage')
self.assertEqual(expected_no_change, actual_no_change)
data = {'foo': 'bar'}
expected_no_change = {'foo': 'bar'}
def test_rewrite_kwargs(self):
data = {'project': 'my-project', 'foo': 'bar'}
expected_general = {'name': 'projects/my-project', 'foo': 'bar'}
actual_general = utils.rewrite_kwargs('general', data)
self.assertEqual(expected_general, actual_general)
data = {'project': 'my-project', 'foo': 'bar'}
expected_cloud_storage = {'foo': 'bar'}
actual_cloud_storage = utils.rewrite_kwargs('cloud', data,
module_name='storage')
self.assertEqual(expected_cloud_storage, actual_cloud_storage)
data = {'foo': 'bar'}
expected_no_change = {'foo': 'bar'}
actual_no_change = utils.rewrite_kwargs('cloud', data,
module_name='storage')
self.assertEqual(expected_no_change, actual_no_change)
data = {'foo': 'bar'}
expected_no_change = {'foo': 'bar'}
actual_no_change = utils.rewrite_kwargs('general', data)
self.assertEqual(expected_no_change, actual_no_change)
data = {'project': 'my-project', 'foo': 'bar'}
expected_cloud_storage = {'foo': 'bar'}
actual_cloud_storage = utils.rewrite_kwargs('cloud', data,
module_name='storage')
self.assertEqual(expected_cloud_storage, actual_cloud_storage)
data = {'foo': 'bar'}
expected_no_change = {'foo': 'bar'}
actual_no_change = utils.rewrite_kwargs('cloud', data,
module_name='storage')
self.assertEqual(expected_no_change, actual_no_change)
data = {'foo': 'bar'}
expected_no_change = {'foo': 'bar'}
actual_no_change = utils.rewrite_kwargs('general', data)
self.assertEqual(expected_no_change, actual_no_change)
def test_rewrite_kwargs(self):
data = {'project': 'my-project', 'foo': 'bar'}
expected_general = {'name': 'projects/my-project', 'foo': 'bar'}
actual_general = utils.rewrite_kwargs('general', data)
self.assertEqual(expected_general, actual_general)
data = {'project': 'my-project', 'foo': 'bar'}
expected_cloud_storage = {'foo': 'bar'}
actual_cloud_storage = utils.rewrite_kwargs('cloud', data,
module_name='storage')
self.assertEqual(expected_cloud_storage, actual_cloud_storage)
data = {'foo': 'bar'}
expected_no_change = {'foo': 'bar'}
actual_no_change = utils.rewrite_kwargs('cloud', data,
module_name='storage')
self.assertEqual(expected_no_change, actual_no_change)
data = {'foo': 'bar'}
expected_no_change = {'foo': 'bar'}
actual_no_change = utils.rewrite_kwargs('general', data)
self.assertEqual(expected_no_change, actual_no_change)
def decorated_function(*args, **kwargs):
# Import here to avoid circular import issue
from cloudaux.gcp.auth import get_client
(conn_args, kwargs) = get_creds_from_kwargs(kwargs)
client_details, client = get_client(
service, service_type=service_type,
future_expiration_minutes=15, **conn_args)
if client_details:
kwargs = rewrite_kwargs(client_details['client_type'], kwargs,
client_details['module_name'])
kwargs['client'] = client
return f(*args, **kwargs)