Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_poll(self, cursor):
query_id, _ = cursor.execute("SELECT * FROM one_row")
future = cursor.poll(query_id)
query_execution = future.result()
self.assertIn(query_execution.state, [AthenaQueryExecution.STATE_QUEUED,
AthenaQueryExecution.STATE_RUNNING,
AthenaQueryExecution.STATE_SUCCEEDED,
AthenaQueryExecution.STATE_FAILED,
AthenaQueryExecution.STATE_CANCELLED])
def _get_query_execution(self, query_id):
request = {'QueryExecutionId': query_id}
try:
response = retry_api_call(self._connection.client.get_query_execution,
config=self._retry_config,
logger=_logger,
**request)
except Exception as e:
_logger.exception('Failed to get query execution.')
raise_from(OperationalError(*e.args), e)
else:
return AthenaQueryExecution(response)
def _poll(self, query_id):
while True:
query_execution = self._get_query_execution(query_id)
if query_execution.state in [AthenaQueryExecution.STATE_SUCCEEDED,
AthenaQueryExecution.STATE_FAILED,
AthenaQueryExecution.STATE_CANCELLED]:
return query_execution
else:
time.sleep(self._poll_interval)