Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
}
],
'KeySchema': [
{
'KeyType': 'HASH',
'AttributeName': 'key1'
},
{
'KeyType': 'RANGE',
'AttributeName': 'key2'
}
]
}
with patch(PATCH_METHOD) as req:
req.side_effect = BotoCoreError
self.assertRaises(TableError, conn.create_table, self.test_table_name, **kwargs)
with patch(PATCH_METHOD) as req:
req.return_value = None
conn.create_table(
self.test_table_name,
**kwargs
)
self.assertEqual(req.call_args[0][1], params)
kwargs['global_secondary_indexes'] = [
{
'index_name': 'alt-index',
'key_schema': [
{
'KeyType': 'HASH',
'AttributeName': 'AltKey'
def get_item_attribute_map(self, table_name, attributes, item_key=ITEM, pythonic_key=True):
    """
    Build a DynamoDB-compatible AttributeValue map for ``attributes``.

    The table metadata is looked up through ``get_meta_table`` and the actual
    map construction is delegated to it; ``item_key`` and ``pythonic_key`` are
    forwarded unchanged.

    :param table_name: Name of the table whose metadata is used
    :param attributes: The attributes to convert
    :raises TableError: If no metadata is available for ``table_name``
    """
    meta_table = self.get_meta_table(table_name)
    if meta_table is not None:
        return meta_table.get_item_attribute_map(
            attributes, item_key=item_key, pythonic_key=pythonic_key
        )
    raise TableError("No such table {}".format(table_name))
global_secondary_indexes_list = []
for index in global_secondary_index_updates:
global_secondary_indexes_list.append({
UPDATE: {
INDEX_NAME: index.get(pythonic(INDEX_NAME)),
PROVISIONED_THROUGHPUT: {
READ_CAPACITY_UNITS: index.get(pythonic(READ_CAPACITY_UNITS)),
WRITE_CAPACITY_UNITS: index.get(pythonic(WRITE_CAPACITY_UNITS))
}
}
})
operation_kwargs[GLOBAL_SECONDARY_INDEX_UPDATES] = global_secondary_indexes_list
try:
return self.dispatch(UPDATE_TABLE, operation_kwargs)
except BOTOCORE_EXCEPTIONS as e:
six.raise_from(TableError("Failed to update table: {}".format(e), e), None)
def get_meta_table(self, table_name, refresh=False):
    """
    Return the MetaTable for ``table_name``.

    A cached entry in ``self._tables`` is returned directly unless ``refresh``
    is truthy; otherwise a DESCRIBE_TABLE operation is dispatched and its
    result is stored in the cache before being returned.

    :param table_name: Name of the table to look up
    :param refresh: When truthy, describe the table again even if it is cached
    :raises TableError: If dispatching fails with a ``BotoCoreError``
    :raises TableDoesNotExist: If the client error code contains 'ResourceNotFound'
    """
    # Fast path: cached and no refresh requested.
    if table_name in self._tables and not refresh:
        return self._tables[table_name]

    describe_kwargs = {TABLE_NAME: table_name}
    try:
        description = self.dispatch(DESCRIBE_TABLE, describe_kwargs)
        self._tables[table_name] = MetaTable(description.get(TABLE_KEY))
    except BotoCoreError as exc:
        six.raise_from(TableError("Unable to describe table: {}".format(exc), exc), None)
    except ClientError as exc:
        # Any client error other than a missing table propagates untouched.
        if 'ResourceNotFound' not in exc.response['Error']['Code']:
            raise
        six.raise_from(TableDoesNotExist(exc.response['Error']['Message']), None)
    return self._tables[table_name]
def get_attribute_type(self, table_name, attribute_name, value=None):
    """
    Look up the attribute type for ``attribute_name`` on ``table_name``.

    The lookup is delegated to the table metadata obtained from
    ``get_meta_table``.

    :param table_name: Name of the table whose metadata is consulted
    :param attribute_name: Name of the attribute to resolve
    :param value: The attribute value can be supplied just in case the type is already included
    :raises TableError: If no metadata is available for ``table_name``
    """
    meta_table = self.get_meta_table(table_name)
    if meta_table is not None:
        return meta_table.get_attribute_type(attribute_name, value=value)
    raise TableError("No such table {}".format(table_name))
return_consumed_capacity=None,
scan_index_forward=None,
select=None):
"""
Performs the Query operation and returns the result
"""
self._check_condition('range_key_condition', range_key_condition)
self._check_condition('filter_condition', filter_condition)
operation_kwargs = {TABLE_NAME: table_name}
name_placeholders = {}
expression_attribute_values = {}
tbl = await self.get_meta_table(table_name)
if tbl is None:
raise TableError("No such table: {}".format(table_name))
if index_name:
if not tbl.has_index_name(index_name):
raise ValueError("Table {} has no index: {}".format(table_name, index_name))
hash_keyname = tbl.get_index_hash_keyname(index_name)
if not hash_keyname:
raise ValueError("No hash key attribute for index: {}".format(index_name))
range_keyname = tbl.get_index_range_keyname(index_name)
else:
hash_keyname = tbl.hash_keyname
range_keyname = tbl.range_keyname
hash_condition_value = {
await self.get_attribute_type(table_name, hash_keyname, hash_key): self.parse_attribute(hash_key)
}
key_condition = getattr(Path([hash_keyname]), '__eq__')(hash_condition_value)
def get_identifier_map(self, table_name, hash_key, range_key=None, key=KEY):
    """
    Build the identifier map that is common to several operations.

    The table metadata is fetched through ``get_meta_table`` and the map is
    produced by it; ``range_key`` and ``key`` are forwarded unchanged.

    :param table_name: Name of the table whose metadata is used
    :param hash_key: The hash key value identifying the item
    :param range_key: Optional range key value
    :raises TableError: If no metadata is available for ``table_name``
    """
    meta_table = self.get_meta_table(table_name)
    if meta_table is not None:
        return meta_table.get_identifier_map(hash_key, range_key=range_key, key=key)
    raise TableError("No such table {}".format(table_name))