How to use the pynamodb.constants.ITEM constant in pynamodb

To help you get started, we’ve selected a few pynamodb examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github pynamodb / PynamoDB / tests / test_model.py View on Github external
"""
        with patch(PATCH_METHOD) as req:
            req.return_value = MODEL_TABLE_DATA
            item = UserModel('foo', 'bar')

        with patch(PATCH_METHOD) as req:
            req.return_value = {}
            self.assertRaises(item.DoesNotExist, item.refresh)

        with patch(PATCH_METHOD) as req:
            req.return_value = GET_MODEL_ITEM_DATA
            item.picture = b'to-be-removed'
            item.refresh()
            self.assertEqual(
                item.custom_user_name,
                GET_MODEL_ITEM_DATA.get(ITEM).get('user_name').get(STRING_SHORT))
            self.assertIsNone(item.picture)
github pynamodb / PynamoDB / tests / test_model.py View on Github external
def test_scan_limit_with_page_size(self):
        # Scan with limit=25 and page_size=10 against three mocked pages of
        # ten items each: 25 results are expected, from three requests that
        # each carry Limit == 10.
        with patch(PATCH_METHOD) as req:
            template = GET_MODEL_ITEM_DATA.get(ITEM)
            items = []
            for idx in range(30):
                entry = copy.copy(template)
                entry['user_id'] = {STRING_SHORT: 'id-{}'.format(idx)}
                items.append(entry)

            # One mocked response per page; the markers only need to be
            # truthy placeholders for LastEvaluatedKey.
            pages = [items[start:start + 10] for start in range(0, 30, 10)]
            req.side_effect = [
                {'Count': 10, 'ScannedCount': 20, 'Items': page, 'LastEvaluatedKey': {'user_id': marker}}
                for page, marker in zip(pages, 'xyz')
            ]

            results_iter = UserModel.scan(limit=25, page_size=10)
            results = list(results_iter)

            self.assertEqual(len(results), 25)
            self.assertEqual(len(req.mock_calls), 3)
            # Every request must have been issued with the page size as Limit.
            for mock_call in req.mock_calls:
                self.assertEqual(mock_call[1][1]['Limit'], 10)
            # The iterator's key should point at the 25th item handed out,
            # not at the mocked page marker.
            expected_key = {'user_id': items[24]['user_id']}
            self.assertEqual(results_iter.last_evaluated_key, expected_key)
github MyMusicTaste / InPynamoDB / inpynamodb / transactions.py View on Github external
def _update_futures(self):
        """
        Feed each entry of ``self._results`` into the model at the same
        position in ``self._futures``.
        """
        # zip pairs models and results positionally and stops at the shorter one.
        for future_model, result in zip(self._futures, self._results):
            raw_item = result.get(ITEM)
            future_model.update_with_raw_data(raw_item)
github MyMusicTaste / InPynamoDB / inpynamodb / models.py View on Github external
self.pending_operations = []
        if not len(put_items) and not len(delete_items):
            return
        data = await self.model._get_connection().batch_write_item(
            put_items=put_items,
            delete_items=delete_items
        )
        if data is None:
            return
        unprocessed_items = data.get(UNPROCESSED_ITEMS, {}).get(self.model.Meta.table_name)
        while unprocessed_items:
            put_items = []
            delete_items = []
            for item in unprocessed_items:
                if PUT_REQUEST in item:
                    put_items.append(item.get(PUT_REQUEST).get(ITEM))
                elif DELETE_REQUEST in item:
                    delete_items.append(item.get(DELETE_REQUEST).get(KEY))
            log.info("Resending %s unprocessed keys for batch operation", len(unprocessed_items))
            data = await self.model._get_connection().batch_write_item(
                put_items=put_items,
                delete_items=delete_items
            )
            unprocessed_items = data.get(UNPROCESSED_ITEMS, {}).get(self.model.Meta.table_name)
github MyMusicTaste / InPynamoDB / inpynamodb / transactions.py View on Github external
async def save(self, model, condition=None, return_values=None):
        """
        Queue a put for ``model``.

        Awaits the model's operation kwargs (keyed under ITEM), appends them
        to ``self._put_items`` and records the model in
        ``self._models_for_version_attribute_update``.

        :param model: The model instance to save
        :param condition: Optional condition forwarded to the model
        :param return_values: Forwarded as ``return_values_on_condition_failure``
        """
        request_options = dict(
            key=ITEM,
            condition=condition,
            return_values_on_condition_failure=return_values,
        )
        put_kwargs = await model.get_operation_kwargs_from_instance(**request_options)
        self._put_items.append(put_kwargs)
        self._models_for_version_attribute_update.append(model)
github pynamodb / PynamoDB / pynamodb / connection / base.py View on Github external
    def get_item_attribute_map(self, attributes, item_key=ITEM, pythonic_key=True):
        """
        Builds up a dynamodb compatible AttributeValue map
        """
        if pythonic_key:
            item_key = item_key
        attr_map = {
            item_key: {}
        }
        for key, value in attributes.items():
            # In this case, the user provided a mapping
            # {'key': {'S': 'value'}}
            if isinstance(value, dict):
                attr_map[item_key][key] = value
            else:
                attr_map[item_key][key] = {
                    self.get_attribute_type(key): value
github pynamodb / PynamoDB / pynamodb / connection / base.py View on Github external
    def get_item_attribute_map(self, table_name, attributes, item_key=ITEM, pythonic_key=True):
        """
        Builds up a dynamodb compatible AttributeValue map

        Delegates to the meta table returned by ``get_meta_table(table_name)``.

        :param table_name: Name of the table whose meta table is used
        :param attributes: Attributes passed through to the meta table
        :param item_key: Passed through as ``item_key`` (defaults to ITEM)
        :param pythonic_key: Passed through as ``pythonic_key``
        :raises TableError: if ``get_meta_table`` returns None for ``table_name``
        """
        meta_table = self.get_meta_table(table_name)
        if meta_table is not None:
            return meta_table.get_item_attribute_map(
                attributes, item_key=item_key, pythonic_key=pythonic_key)
        raise TableError("No such table {}".format(table_name))
github pynamodb / PynamoDB / pynamodb / transactions.py View on Github external
def save(self, model, condition=None, return_values=None):
        """
        Queue a put for ``model``.

        Builds the model's operation kwargs (keyed under ITEM), appends them
        to ``self._put_items`` and records the model in
        ``self._models_for_version_attribute_update``.

        :param model: The model instance to save
        :param condition: Optional condition forwarded to the model
        :param return_values: Forwarded as ``return_values_on_condition_failure``
        """
        request_options = {
            'key': ITEM,
            'condition': condition,
            'return_values_on_condition_failure': return_values,
        }
        put_kwargs = model.get_operation_kwargs_from_instance(**request_options)
        self._put_items.append(put_kwargs)
        self._models_for_version_attribute_update.append(model)
github pynamodb / PynamoDB / pynamodb / transactions.py View on Github external
def _update_futures(self):
        """
        Update every model in ``self._futures`` with the raw item data taken
        from the entry at the same position in ``self._results``.
        """
        # Positional pairing; zip stops at the shorter of the two sequences.
        for pending_model, response in zip(self._futures, self._results):
            item_data = response.get(ITEM)
            pending_model.update_with_raw_data(item_data)
github pynamodb / PynamoDB / pynamodb / connection / base.py View on Github external
actions=None,
                             condition=None,
                             consistent_read=None,
                             return_values=None,
                             return_consumed_capacity=None,
                             return_item_collection_metrics=None,
                             return_values_on_condition_failure=None):
        self._check_condition('condition', condition)

        operation_kwargs = {}
        name_placeholders = {}
        expression_attribute_values = {}

        operation_kwargs[TABLE_NAME] = table_name
        operation_kwargs.update(self.get_identifier_map(table_name, hash_key, range_key, key=key))
        if attributes and operation_kwargs.get(ITEM) is not None:
            attrs = self.get_item_attribute_map(table_name, attributes)
            operation_kwargs[ITEM].update(attrs[ITEM])
        if attributes_to_get is not None:
            projection_expression = create_projection_expression(attributes_to_get, name_placeholders)
            operation_kwargs[PROJECTION_EXPRESSION] = projection_expression
        if condition is not None:
            condition_expression = condition.serialize(name_placeholders, expression_attribute_values)
            operation_kwargs[CONDITION_EXPRESSION] = condition_expression
        if consistent_read is not None:
            operation_kwargs[CONSISTENT_READ] = consistent_read
        if return_values is not None:
            operation_kwargs.update(self.get_return_values_map(return_values))
        if return_values_on_condition_failure is not None:
            operation_kwargs.update(self.get_return_values_on_condition_failure_map(return_values_on_condition_failure))
        if return_consumed_capacity is not None:
            operation_kwargs.update(self.get_consumed_capacity_map(return_consumed_capacity))