Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_subsequent_client_is_cached_when_credentials_truthy(self):
    """
    Connection.client: repeated access creates the client only once
    when the request signer's credentials are truthy
    """
    with patch('pynamodb.connection.Connection.session') as session_mock:
        # Give the client returned by the mocked session truthy credentials.
        session_mock.create_client.return_value._request_signer._credentials = True
        conn = Connection()

        # Read the .client property twice; both reads must yield a client.
        for _ in range(2):
            self.assertIsNotNone(conn.client)

        # Exactly one matching create_client call should have been recorded.
        recorded_calls = session_mock.create_client.mock_calls
        expected_call = mock.call('dynamodb', 'us-east-1', endpoint_url=None, config=mock.ANY)
        self.assertEqual(recorded_calls.count(expected_call), 1)
def test_create_connection(self):
    """
    Connection(): construction with and without an explicit host
    """
    # Default construction.
    default_conn = Connection()
    self.assertIsNotNone(default_conn)

    # Construction with an explicit host; the client must be available too.
    host_conn = Connection(host='http://foohost')
    self.assertIsNotNone(host_conn.client)
    self.assertIsNotNone(host_conn)

    # repr() is expected to embed the connection's host.
    actual_repr = repr(host_conn)
    expected_repr = "Connection<{}>".format(host_conn.host)
    self.assertEqual(actual_repr, expected_repr)
status_code=503,
headers={},
raw='',
)
bad_response._content = 'not_json'.encode('utf-8')
prepared_request = AWSRequest('GET', 'http://lyft.com').prepare()
send_mock = client_mock._endpoint.http_session.send
send_mock.side_effect = [
bad_response,
botocore.exceptions.ReadTimeoutError(endpoint_url='http://lyft.com'),
bad_response,
deserializable_response,
]
c = Connection()
c._max_retry_attempts_exception = 3
c._create_prepared_request = mock.Mock()
c._create_prepared_request.return_value = prepared_request
c._make_api_call('DescribeTable', {'TableName': 'MyTable'})
self.assertEqual(len(send_mock.mock_calls), 4)
for call in send_mock.mock_calls:
self.assertEqual(call[1][0], prepared_request)
def test_get_item(self):
"""
Connection.get_item
"""
conn = Connection(self.region)
table_name = 'Thread'
with patch(PATCH_METHOD) as req:
req.return_value = DESCRIBE_TABLE_DATA
conn.describe_table(table_name)
with patch(PATCH_METHOD) as req:
req.return_value = GET_ITEM_DATA
item = conn.get_item(table_name, "Amazon DynamoDB", "How do I update multiple items?")
self.assertEqual(item, GET_ITEM_DATA)
with patch(PATCH_METHOD) as req:
req.side_effect = BotoCoreError
self.assertRaises(
GetError,
conn.get_item,
table_name,
def test_update_table(self):
    """
    Connection.update_table: request parameters and argument validation
    """
    # NOTE(review): 'ci-table' presumably equals self.test_table_name,
    # which is set outside this block -- confirm against the fixture.
    expected_params = {
        'TableName': 'ci-table',
        'ProvisionedThroughput': {
            'ReadCapacityUnits': 2,
            'WriteCapacityUnits': 2,
        },
    }
    with patch(PATCH_METHOD) as req:
        req.return_value = None
        conn = Connection(self.region)

        conn.update_table(self.test_table_name, read_capacity_units=2, write_capacity_units=2)

        # The second positional argument of the patched call carries the
        # operation parameters.
        sent_params = req.call_args[0][1]
        self.assertEqual(sent_params, expected_params)

        # Passing only read_capacity_units must raise ValueError.
        with self.assertRaises(ValueError):
            conn.update_table(self.test_table_name, read_capacity_units=2)
def test_batch_get_item(self):
"""
Connection.batch_get_item
"""
items = []
conn = Connection()
table_name = 'Thread'
for i in range(10):
items.append(
{"ForumName": "FooForum", "Subject": "thread-{}".format(i)}
)
with patch(PATCH_METHOD) as req:
req.return_value = DESCRIBE_TABLE_DATA
conn.describe_table(table_name)
with patch(PATCH_METHOD) as req:
req.side_effect = BotoCoreError
self.assertRaises(
GetError,
conn.batch_get_item,
table_name,
items,
def load_file(host, region, table_name, filename):
    """
    Batch-put every JSON line of ``filename`` into ``table_name``.

    Progress goes to the ``queue`` attribute of the current
    multiprocessing process: ``1`` for each item put, then ``'complete'``
    once the whole file has been read.

    :param host: host passed to ``Connection``
    :param region: region passed to ``Connection``
    :param table_name: table handed to ``BatchPutManager``
    :param filename: input path; a ``.gz`` suffix selects ``gzip.GzipFile``
    """
    # NOTE(review): the original layout lost its indentation; the per-item
    # placement of put(1) is reconstructed -- confirm against the source.
    progress_queue = multiprocessing.current_process().queue
    connection = Connection(host=host, region=region)

    # Gzipped input is read through GzipFile, anything else through open().
    opener = gzip.GzipFile if filename.endswith('.gz') else open

    with opener(filename, 'r') as infile, BatchPutManager(connection, table_name) as batch:
        for raw_line in infile:
            batch.put(json.loads(raw_line))
            progress_queue.put(1)

    progress_queue.put('complete')
def main(host, region, table_name, load_files, hash_key, range_key):
connection = Connection(host=host, region=region)
desc = connection.describe_table(table_name)
if desc is None:
print "Table does not exist - creating table '{}'...".format(table_name)
table_kwargs = {
'table_name': table_name
}
name, attr = parse_hash_key(hash_key)
table_kwargs[name] = attr
if range_key:
name, attr = parse_range_key(range_key)
table_kwargs[name] = attr
# Dynamically create table model
def load_part(filename):
    """
    Batch-put every JSON line of ``filename`` using settings from ``config``.

    Reads ``host``, ``region``, ``table_name`` and ``queue`` from the
    ``config`` mapping defined outside this function. Puts ``1`` on the
    queue for each item and ``'complete'`` once the file has been read.
    Any exception is caught and printed instead of propagating.

    :param filename: input path; a ``.gz`` suffix selects ``gzip.GzipFile``
    """
    # NOTE(review): the original layout lost its indentation; the per-item
    # placement of put(1) is reconstructed -- confirm against the source.
    try:
        connection = Connection(host=config['host'], region=config['region'])

        # Gzipped input is read through GzipFile, anything else through open().
        opener = gzip.GzipFile if filename.endswith('.gz') else open

        with opener(filename, 'r') as infile, \
                BatchPutManager(connection, config['table_name']) as batch:
            for raw_line in infile:
                batch.put(json.loads(raw_line))
                config['queue'].put(1)

        config['queue'].put('complete')
    except Exception as e:
        # Report the failure on stdout rather than letting it propagate.
        print('Unhandled exception: {0}'.format(e))