Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_query_execution(self, cursor):
    """Check the metadata returned by ``cursor.query_execution``.

    Submits a simple SELECT through ``cursor.execute`` (which hands back a
    query id and a future), resolves the result set, then resolves the
    query-execution future for the same id and verifies each of its
    fields. Finally the result set and the query execution are checked to
    agree on database, query id and query text.
    """
    sql = 'SELECT * FROM one_row'
    query_id, execute_future = cursor.execute(sql)
    result_set = execute_future.result()
    execution = cursor.query_execution(query_id).result()

    # Identity and status of the finished query.
    self.assertEqual(execution.database, SCHEMA)
    self.assertIsNotNone(execution.query_id)
    self.assertEqual(execution.query, sql)
    self.assertEqual(execution.statement_type, AthenaQueryExecution.STATEMENT_TYPE_DML)
    self.assertEqual(execution.state, AthenaQueryExecution.STATE_SUCCEEDED)
    self.assertIsNone(execution.state_change_reason)

    # Both timestamps must be populated datetime objects.
    for name in ('completion_date_time', 'submission_date_time'):
        timestamp = getattr(execution, name)
        self.assertIsNotNone(timestamp)
        self.assertIsInstance(timestamp, datetime)

    # Statistics and output location only need to be present.
    for name in ('data_scanned_in_bytes', 'execution_time_in_millis', 'output_location'):
        self.assertIsNotNone(getattr(execution, name))

    # The test expects no encryption settings on the execution.
    for name in ('encryption_option', 'kms_key'):
        self.assertIsNone(getattr(execution, name))

    self.assertEqual(execution.work_group, 'primary')

    # The result set must describe the same query as the execution record.
    for name in ('database', 'query_id', 'query'):
        self.assertEqual(getattr(result_set, name), getattr(execution, name))
def test_fetchone(self, cursor):
    """Check ``fetchone`` and the cursor's query metadata for a one-row query.

    After executing the SELECT, the cursor's ``rownumber`` is expected to
    move from 0 to 1 once the single row ``(1,)`` is fetched, and a further
    ``fetchone`` is expected to return ``None``. The metadata attributes
    exposed on the cursor itself are then verified.
    """
    cursor.execute('SELECT * FROM one_row')

    # Row-by-row consumption of the single-row result.
    self.assertEqual(cursor.rownumber, 0)
    first_row = cursor.fetchone()
    self.assertEqual(first_row, (1,))
    self.assertEqual(cursor.rownumber, 1)
    exhausted = cursor.fetchone()
    self.assertIsNone(exhausted)

    # Identity and status of the finished query.
    self.assertEqual(cursor.database, SCHEMA)
    for name in ('query_id', 'query'):
        self.assertIsNotNone(getattr(cursor, name))
    self.assertEqual(cursor.statement_type, AthenaQueryExecution.STATEMENT_TYPE_DML)
    self.assertEqual(cursor.state, AthenaQueryExecution.STATE_SUCCEEDED)
    self.assertIsNone(cursor.state_change_reason)

    # Both timestamps must be populated datetime objects.
    for name in ('completion_date_time', 'submission_date_time'):
        timestamp = getattr(cursor, name)
        self.assertIsNotNone(timestamp)
        self.assertIsInstance(timestamp, datetime)

    # Statistics and output location only need to be present.
    for name in ('data_scanned_in_bytes', 'execution_time_in_millis', 'output_location'):
        self.assertIsNotNone(getattr(cursor, name))

    # The test expects no encryption settings on the cursor.
    for name in ('encryption_option', 'kms_key'):
        self.assertIsNone(getattr(cursor, name))

    self.assertEqual(cursor.work_group, 'primary')
**request)
query_ids = response.get('QueryExecutionIds', None)
if not query_ids:
break # no queries left to check
next_token = response.get('NextToken', None)
query_executions = retry_api_call(
self.connection._client.batch_get_query_execution,
config=self._retry_config,
logger=_logger,
QueryExecutionIds=query_ids
).get('QueryExecutions', [])
for execution in query_executions:
if (
execution['Query'] == query and
execution['Status']['State'] == AthenaQueryExecution.STATE_SUCCEEDED and
execution['StatementType'] == AthenaQueryExecution.STATEMENT_TYPE_DML
):
query_id = execution['QueryExecutionId']
break
if query_id or next_token is None:
break
except Exception:
_logger.warning('Failed to check the cache. Moving on without cache.')
return query_id