Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
with patch(PATCH_METHOD) as req:
req.return_value = MODEL_TABLE_DATA
item = UserModel('foo', 'bar')
with patch(PATCH_METHOD) as req:
req.return_value = {}
self.assertRaises(item.DoesNotExist, item.refresh)
with patch(PATCH_METHOD) as req:
req.return_value = GET_MODEL_ITEM_DATA
item.picture = b'to-be-removed'
item.refresh()
self.assertEqual(
item.custom_user_name,
GET_MODEL_ITEM_DATA.get(ITEM).get('user_name').get(STRING_SHORT))
self.assertIsNone(item.picture)
def test_scan_limit_with_page_size(self):
    """
    A scan with limit=25 and page_size=10 issues three requests of ten
    and stops after the 25th item
    """
    with patch(PATCH_METHOD) as req:
        template = GET_MODEL_ITEM_DATA.get(ITEM)
        items = []
        for idx in range(30):
            entry = copy.copy(template)
            entry['user_id'] = {STRING_SHORT: 'id-{}'.format(idx)}
            items.append(entry)

        # Three mocked pages of ten items, each advertising a further page
        pages = []
        for start, marker in ((0, 'x'), (10, 'y'), (20, 'z')):
            pages.append({
                'Count': 10,
                'ScannedCount': 20,
                'Items': items[start:start + 10],
                'LastEvaluatedKey': {'user_id': marker},
            })
        req.side_effect = pages

        results_iter = UserModel.scan(limit=25, page_size=10)
        fetched = list(results_iter)

        self.assertEqual(len(fetched), 25)
        self.assertEqual(len(req.mock_calls), 3)
        # Every request must carry the page size, not the overall limit
        for call_index in range(3):
            self.assertEqual(req.mock_calls[call_index][1][1]['Limit'], 10)
        # The resume key is built from the 25th item, the last one yielded
        expected_key = {'user_id': items[24]['user_id']}
        self.assertEqual(results_iter.last_evaluated_key, expected_key)
def _update_futures(self):
    """
    Hands each entry of self._futures the ITEM payload of the result
    at the same position in self._results
    """
    for future_model, raw_result in zip(self._futures, self._results):
        item_data = raw_result.get(ITEM)
        future_model.update_with_raw_data(item_data)
self.pending_operations = []
if not len(put_items) and not len(delete_items):
return
data = await self.model._get_connection().batch_write_item(
put_items=put_items,
delete_items=delete_items
)
if data is None:
return
unprocessed_items = data.get(UNPROCESSED_ITEMS, {}).get(self.model.Meta.table_name)
while unprocessed_items:
put_items = []
delete_items = []
for item in unprocessed_items:
if PUT_REQUEST in item:
put_items.append(item.get(PUT_REQUEST).get(ITEM))
elif DELETE_REQUEST in item:
delete_items.append(item.get(DELETE_REQUEST).get(KEY))
log.info("Resending %s unprocessed keys for batch operation", len(unprocessed_items))
data = await self.model._get_connection().batch_write_item(
put_items=put_items,
delete_items=delete_items
)
unprocessed_items = data.get(UNPROCESSED_ITEMS, {}).get(self.model.Meta.table_name)
async def save(self, model, condition=None, return_values=None):
    """
    Queues a put built from `model`

    The operation kwargs are awaited from the model itself; `return_values`
    is forwarded as `return_values_on_condition_failure`. The model is also
    recorded in `_models_for_version_attribute_update`.
    """
    put_kwargs = await model.get_operation_kwargs_from_instance(
        key=ITEM,
        condition=condition,
        return_values_on_condition_failure=return_values,
    )
    self._put_items.append(put_kwargs)
    self._models_for_version_attribute_update.append(model)
def get_item_attribute_map(self, attributes, item_key=ITEM, pythonic_key=True):
    """
    Builds up a dynamodb compatible AttributeValue map

    :param attributes: mapping of attribute name to value; a dict value is
        passed through unchanged (e.g. ``{'S': 'value'}``), anything else is
        wrapped as ``{attribute_type: value}``
    :param item_key: key under which the attribute map is nested
    :param pythonic_key: accepted for interface compatibility; it does not
        affect the result
    :return: ``{item_key: {name: {type: value}, ...}}``
    """
    typed_attributes = {}
    for key, value in attributes.items():
        if isinstance(value, dict):
            # In this case, the user provided a mapping
            # {'key': {'S': 'value'}}
            typed_attributes[key] = value
        else:
            typed_attributes[key] = {self.get_attribute_type(key): value}
    return {item_key: typed_attributes}
def get_item_attribute_map(self, table_name, attributes, item_key=ITEM, pythonic_key=True):
    """
    Builds up a dynamodb compatible AttributeValue map for a table

    Delegates to the meta table looked up for `table_name`; raises
    TableError when that lookup returns None
    """
    meta_table = self.get_meta_table(table_name)
    if meta_table is not None:
        return meta_table.get_item_attribute_map(
            attributes, item_key=item_key, pythonic_key=pythonic_key)
    raise TableError("No such table {}".format(table_name))
def save(self, model, condition=None, return_values=None):
    """
    Queues a put built from `model`

    The operation kwargs come from the model itself; `return_values` is
    forwarded as `return_values_on_condition_failure`. The model is also
    recorded in `_models_for_version_attribute_update`.
    """
    put_kwargs = model.get_operation_kwargs_from_instance(
        key=ITEM,
        condition=condition,
        return_values_on_condition_failure=return_values,
    )
    self._put_items.append(put_kwargs)
    self._models_for_version_attribute_update.append(model)
def _update_futures(self):
    """
    Pairs self._futures with self._results positionally and updates each
    future with the ITEM entry of its result
    """
    paired = zip(self._futures, self._results)
    for pending, response in paired:
        pending.update_with_raw_data(response.get(ITEM))
actions=None,
condition=None,
consistent_read=None,
return_values=None,
return_consumed_capacity=None,
return_item_collection_metrics=None,
return_values_on_condition_failure=None):
self._check_condition('condition', condition)
operation_kwargs = {}
name_placeholders = {}
expression_attribute_values = {}
operation_kwargs[TABLE_NAME] = table_name
operation_kwargs.update(self.get_identifier_map(table_name, hash_key, range_key, key=key))
if attributes and operation_kwargs.get(ITEM) is not None:
attrs = self.get_item_attribute_map(table_name, attributes)
operation_kwargs[ITEM].update(attrs[ITEM])
if attributes_to_get is not None:
projection_expression = create_projection_expression(attributes_to_get, name_placeholders)
operation_kwargs[PROJECTION_EXPRESSION] = projection_expression
if condition is not None:
condition_expression = condition.serialize(name_placeholders, expression_attribute_values)
operation_kwargs[CONDITION_EXPRESSION] = condition_expression
if consistent_read is not None:
operation_kwargs[CONSISTENT_READ] = consistent_read
if return_values is not None:
operation_kwargs.update(self.get_return_values_map(return_values))
if return_values_on_condition_failure is not None:
operation_kwargs.update(self.get_return_values_on_condition_failure_map(return_values_on_condition_failure))
if return_consumed_capacity is not None:
operation_kwargs.update(self.get_consumed_capacity_map(return_consumed_capacity))