How to use the pynamodb.connection.TableConnection function in pynamodb

To help you get started, we’ve selected a few pynamodb examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github pynamodb / PynamoDB / tests / test_table_connection.py View on Github external
def test_put_item(self):
        """
        TableConnection.put_item
        """
        conn = TableConnection(self.test_table_name)
        with patch(PATCH_METHOD) as req:
            req.return_value = DESCRIBE_TABLE_DATA
            conn.describe_table()

        with patch(PATCH_METHOD) as req:
            req.return_value = {}
            conn.put_item(
                'foo-key',
                range_key='foo-range-key',
                attributes={'ForumName': 'foo-value'}
            )
            params = {
                'ReturnConsumedCapacity': 'TOTAL',
                'TableName': self.test_table_name,
                'Item': {'ForumName': {'S': 'foo-value'}, 'Subject': {'S': 'foo-range-key'}}
            }
github pynamodb / PynamoDB / tests / test_table_connection.py View on Github external
def test_connection_session_set_credentials(self):
        conn = TableConnection(
            self.test_table_name,
            aws_access_key_id='access_key_id',
            aws_secret_access_key='secret_access_key')

        credentials = conn.connection.session.get_credentials()

        self.assertEqual(credentials.access_key, 'access_key_id')
        self.assertEqual(credentials.secret_key, 'secret_access_key')
github pynamodb / PynamoDB / tests / test_table_connection.py View on Github external
def test_create_connection(self):
        """
        TableConnection()
        """
        conn = TableConnection(self.test_table_name)
        self.assertIsNotNone(conn)
github pynamodb / PynamoDB / tests / test_table_connection.py View on Github external
def test_delete_item(self):
        """
        TableConnection.delete_item
        """
        conn = TableConnection(self.test_table_name)
        with patch(PATCH_METHOD) as req:
            req.return_value = DESCRIBE_TABLE_DATA
            conn.describe_table()

        with patch(PATCH_METHOD) as req:
            req.return_value = {}
            conn.delete_item(
                "Amazon DynamoDB",
                "How do I update multiple items?")
            params = {
                'ReturnConsumedCapacity': 'TOTAL',
                'Key': {
                    'ForumName': {
                        'S': 'Amazon DynamoDB'
                    },
                    'Subject': {
github pynamodb / PynamoDB / tests / test_table_connection.py View on Github external
def test_connection_session_set_credentials_with_session_token(self):
        conn = TableConnection(
            self.test_table_name,
            aws_access_key_id='access_key_id',
            aws_secret_access_key='secret_access_key',
            aws_session_token='session_token')

        credentials = conn.connection.session.get_credentials()

        self.assertEqual(credentials.access_key, 'access_key_id')
        self.assertEqual(credentials.secret_key, 'secret_access_key')
        self.assertEqual(credentials.token, 'session_token')
github pynamodb / PynamoDB / tests / test_table_connection.py View on Github external
def test_scan(self):
        """
        TableConnection.scan
        """
        conn = TableConnection(self.test_table_name)
        with patch(PATCH_METHOD) as req:
            req.return_value = DESCRIBE_TABLE_DATA
            conn.describe_table()
        with patch(PATCH_METHOD) as req:
            req.return_value = HttpOK(), {}
            conn.scan()
            params = {
                'ReturnConsumedCapacity': 'TOTAL',
                'TableName': self.test_table_name
            }
            self.assertEqual(req.call_args[0][1], params)
github pynamodb / PynamoDB / tests / test_table_connection.py View on Github external
params = {
                'ProvisionedThroughput': {
                    'WriteCapacityUnits': 2,
                    'ReadCapacityUnits': 2
                },
                'TableName': self.test_table_name
            }
            conn.update_table(
                read_capacity_units=2,
                write_capacity_units=2
            )
            self.assertEqual(req.call_args[0][1], params)

        with patch(PATCH_METHOD) as req:
            req.return_value = HttpOK(), None
            conn = TableConnection(self.test_table_name)

            global_secondary_index_updates = [
                {
                    "index_name": "foo-index",
                    "read_capacity_units": 2,
                    "write_capacity_units": 2
                }
            ]
            params = {
                'TableName': self.test_table_name,
                'ProvisionedThroughput': {
                    'ReadCapacityUnits': 2,
                    'WriteCapacityUnits': 2,
                },
                'GlobalSecondaryIndexUpdates': [
                    {
github pynamodb / PynamoDB / tests / test_table_connection.py View on Github external
def test_batch_get_item(self):
        """
        TableConnection.batch_get_item
        """
        items = []
        conn = TableConnection(self.test_table_name)
        for i in range(10):
            items.append(
                {"ForumName": "FooForum", "Subject": "thread-{}".format(i)}
            )
        with patch(PATCH_METHOD) as req:
            req.return_value = DESCRIBE_TABLE_DATA
            conn.describe_table()

        with patch(PATCH_METHOD) as req:
            req.return_value = {}
            conn.batch_get_item(
                items
            )
            params = {
                'ReturnConsumedCapacity': 'TOTAL',
                'RequestItems': {
github adamchainz / dynamodb_utils / ddb-loader.py View on Github external
if desc is None:
        print "Table does not exist - creating table '{}'...".format(table_name)

        table_kwargs = {
            'table_name': table_name
        }
        name, attr = parse_hash_key(hash_key)
        table_kwargs[name] = attr

        if range_key:
            name, attr = parse_range_key(range_key)
            table_kwargs[name] = attr

        # Dynamically create table model
        table = type('table', (Model,), table_kwargs)
        table.connection = TableConnection(table_name=table_name, host=host, region=region)
        table.create_table(
            read_capacity_units=1000,
            write_capacity_units=1000,
            wait=True
        )

    desc = connection.describe_table(table_name)

    queue = multiprocessing.Queue()
    pool = multiprocessing.Pool(
        initializer=load_init,
        initargs=(queue,)
    )
    for filename in load_files:
        pool.apply_async(load_file, [host, region, table_name, filename])