Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_put_item(self):
    """
    TableConnection.put_item

    Verifies that put_item builds the expected low-level PutItem request
    (hash key, range key, and extra attributes serialized to DynamoDB types).
    """
    conn = TableConnection(self.test_table_name)
    # Prime the connection's cached table description with a mocked
    # DescribeTable response so put_item knows the key schema.
    with patch(PATCH_METHOD) as req:
        req.return_value = DESCRIBE_TABLE_DATA
        conn.describe_table()
    with patch(PATCH_METHOD) as req:
        req.return_value = {}
        conn.put_item(
            'foo-key',
            range_key='foo-range-key',
            attributes={'ForumName': 'foo-value'}
        )
        params = {
            'ReturnConsumedCapacity': 'TOTAL',
            'TableName': self.test_table_name,
            'Item': {'ForumName': {'S': 'foo-value'}, 'Subject': {'S': 'foo-range-key'}}
        }
        # FIX: the expected params were constructed but never checked,
        # so the test could not fail. Assert like the sibling tests do.
        self.assertEqual(req.call_args[0][1], params)
def test_connection_session_set_credentials(self):
    """Credentials passed to TableConnection must reach the underlying session."""
    table_conn = TableConnection(
        self.test_table_name,
        aws_access_key_id='access_key_id',
        aws_secret_access_key='secret_access_key')
    resolved = table_conn.connection.session.get_credentials()
    self.assertEqual('access_key_id', resolved.access_key)
    self.assertEqual('secret_access_key', resolved.secret_key)
def test_create_connection(self):
    """Constructing a TableConnection yields a non-None object."""
    self.assertIsNotNone(TableConnection(self.test_table_name))
# NOTE(review): source indentation has been lost in this file, and the body of
# this test is cut off mid-dict: the expected 'Subject' key has no value and
# the final assertEqual against req.call_args is missing. Recover the complete
# method from the upstream test suite before relying on it.
def test_delete_item(self):
"""
TableConnection.delete_item
"""
conn = TableConnection(self.test_table_name)
# Prime the connection's cached table schema via a mocked DescribeTable call.
with patch(PATCH_METHOD) as req:
req.return_value = DESCRIBE_TABLE_DATA
conn.describe_table()
with patch(PATCH_METHOD) as req:
req.return_value = {}
conn.delete_item(
"Amazon DynamoDB",
"How do I update multiple items?")
# Expected low-level DeleteItem request parameters (truncated below).
params = {
'ReturnConsumedCapacity': 'TOTAL',
'Key': {
'ForumName': {
'S': 'Amazon DynamoDB'
},
'Subject': {
def test_connection_session_set_credentials_with_session_token(self):
    """A session token supplied to TableConnection must reach the session too."""
    table_conn = TableConnection(
        self.test_table_name,
        aws_access_key_id='access_key_id',
        aws_secret_access_key='secret_access_key',
        aws_session_token='session_token')
    resolved = table_conn.connection.session.get_credentials()
    self.assertEqual('access_key_id', resolved.access_key)
    self.assertEqual('secret_access_key', resolved.secret_key)
    self.assertEqual('session_token', resolved.token)
def test_scan(self):
    """
    TableConnection.scan

    A bare scan() should issue a Scan request naming only the table.
    """
    conn = TableConnection(self.test_table_name)
    # Seed the cached table description through a mocked DescribeTable call.
    with patch(PATCH_METHOD) as req:
        req.return_value = DESCRIBE_TABLE_DATA
        conn.describe_table()
    with patch(PATCH_METHOD) as req:
        req.return_value = HttpOK(), {}
        conn.scan()
        expected = {
            'ReturnConsumedCapacity': 'TOTAL',
            'TableName': self.test_table_name
        }
        self.assertEqual(expected, req.call_args[0][1])
# NOTE(review): orphaned fragment -- the enclosing `def` (apparently a
# test_update_table method, given the UpdateTable calls below) is missing
# from this chunk, `conn`/`req` are bound in the lost enclosing scope, and
# the GlobalSecondaryIndexUpdates expectation is cut off mid-dict at the end.
# Recover the complete method from the upstream test suite.
params = {
'ProvisionedThroughput': {
'WriteCapacityUnits': 2,
'ReadCapacityUnits': 2
},
'TableName': self.test_table_name
}
conn.update_table(
read_capacity_units=2,
write_capacity_units=2
)
# Verify the UpdateTable call carried the expected provisioned throughput.
self.assertEqual(req.call_args[0][1], params)
with patch(PATCH_METHOD) as req:
req.return_value = HttpOK(), None
conn = TableConnection(self.test_table_name)
# Requested GSI throughput update, expressed in the wrapper's kwargs style.
global_secondary_index_updates = [
{
"index_name": "foo-index",
"read_capacity_units": 2,
"write_capacity_units": 2
}
]
params = {
'TableName': self.test_table_name,
'ProvisionedThroughput': {
'ReadCapacityUnits': 2,
'WriteCapacityUnits': 2,
},
'GlobalSecondaryIndexUpdates': [
{
# NOTE(review): indentation lost and body truncated -- the expected
# 'RequestItems' mapping has no contents and the final assertion is missing.
# Recover the complete method from the upstream test suite.
def test_batch_get_item(self):
"""
TableConnection.batch_get_item
"""
items = []
conn = TableConnection(self.test_table_name)
# Build ten hash/range key pairs to request in a single batch.
for i in range(10):
items.append(
{"ForumName": "FooForum", "Subject": "thread-{}".format(i)}
)
# Seed the cached table description through a mocked DescribeTable call.
with patch(PATCH_METHOD) as req:
req.return_value = DESCRIBE_TABLE_DATA
conn.describe_table()
with patch(PATCH_METHOD) as req:
req.return_value = {}
conn.batch_get_item(
items
)
# Expected low-level BatchGetItem parameters (truncated below).
params = {
'ReturnConsumedCapacity': 'TOTAL',
'RequestItems': {
# Create the table when it does not already exist, then fan the data files
# out across a multiprocessing pool for parallel loading.
#
# NOTE(review): original indentation was lost; this nesting (table creation
# inside the `if desc is None:` branch, pool setup after it) is reconstructed
# from the code's logic -- confirm against the original script.
# `desc`, `table_name`, `hash_key`, `range_key`, `host`, `region`,
# `connection`, `load_files`, `load_init` and `load_file` are all defined
# earlier in the script (outside this chunk).
if desc is None:
    # FIX: bare `print` is a SyntaxError on Python 3; the parenthesized call
    # form below produces identical output on both Python 2 and 3.
    print("Table does not exist - creating table '{}'...".format(table_name))
    table_kwargs = {
        'table_name': table_name
    }
    name, attr = parse_hash_key(hash_key)
    table_kwargs[name] = attr
    if range_key:
        name, attr = parse_range_key(range_key)
        table_kwargs[name] = attr
    # Dynamically create table model
    table = type('table', (Model,), table_kwargs)
    table.connection = TableConnection(table_name=table_name, host=host, region=region)
    table.create_table(
        read_capacity_units=1000,
        write_capacity_units=1000,
        wait=True
    )
    # Refresh the description now that the table exists.
    desc = connection.describe_table(table_name)
queue = multiprocessing.Queue()
pool = multiprocessing.Pool(
    initializer=load_init,
    initargs=(queue,)
)
# Dispatch one async load task per input file.
for filename in load_files:
    pool.apply_async(load_file, [host, region, table_name, filename])