Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
Connection.describe_table
"""
with patch(PATCH_METHOD) as req:
req.return_value = DESCRIBE_TABLE_DATA
conn = Connection(self.region)
conn.describe_table(self.test_table_name)
self.assertEqual(req.call_args[0][1], {'TableName': 'ci-table'})
with self.assertRaises(TableDoesNotExist):
with patch(PATCH_METHOD) as req:
req.side_effect = ClientError({'Error': {'Code': 'ResourceNotFoundException', 'Message': 'Not Found'}}, "DescribeTable")
conn = Connection(self.region)
conn.describe_table(self.test_table_name)
with self.assertRaises(TableDoesNotExist):
with patch(PATCH_METHOD) as req:
req.side_effect = ValueError()
conn = Connection(self.region)
conn.describe_table(self.test_table_name)
async def exists(cls):
"""
Returns True if this table exists, False otherwise
"""
try:
await cls._get_connection().describe_table()
return True
except TableDoesNotExist:
return False
def exists(cls):
"""
Returns True if this table exists, False otherwise
"""
try:
cls._get_connection().describe_table()
return True
except TableDoesNotExist:
return False
def get_meta_table(self, table_name, refresh=False):
"""
Returns a MetaTable
"""
if table_name not in self._tables or refresh:
operation_kwargs = {
TABLE_NAME: table_name
}
try:
data = self.dispatch(DESCRIBE_TABLE, operation_kwargs)
self._tables[table_name] = MetaTable(data.get(TABLE_KEY))
except BotoCoreError as e:
six.raise_from(TableError("Unable to describe table: {}".format(e), e), None)
except ClientError as e:
if 'ResourceNotFound' in e.response['Error']['Code']:
six.raise_from(TableDoesNotExist(e.response['Error']['Message']), None)
else:
raise
return self._tables[table_name]
async def exists(cls):
"""
Returns True if this table exists, False otherwise
"""
try:
await cls._get_connection().describe_table()
return True
except TableDoesNotExist:
return False
def __init__(self, table_name):
msg = "Table does not exist: `{}`".format(table_name)
super(TableDoesNotExist, self).__init__(msg)