Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_cancel(self, cursor):
def cancel(c):
time.sleep(randint(1, 5))
c.cancel()
with ThreadPoolExecutor(max_workers=1) as executor:
executor.submit(cancel, cursor)
self.assertRaises(DatabaseError, lambda: cursor.execute("""
SELECT a.a * rand(), b.a * rand()
def test_description_failed(self, cursor):
    """Check that ``cursor.description`` is ``None`` after a failing execute.

    The statement passed to ``cursor.execute`` is presumably invalid SQL
    (TODO confirm against the backend).  A ``DatabaseError`` raised by the
    call is tolerated; any other exception propagates and fails the test.
    """
    statement = 'blah_blah'
    try:
        cursor.execute(statement)
    except DatabaseError:
        # Deliberately ignored: only the cursor state afterwards is checked.
        pass
    description = cursor.description
    self.assertIsNone(description)
def _execute(self, operation, parameters=None, work_group=None, s3_staging_dir=None,
             cache_size=0):
    """Return the query execution id for ``operation``.

    The operation is rendered with ``parameters`` by ``self._formatter`` and
    logged at debug level.  ``self._find_previous_query_id`` is consulted
    first; when it yields an id, that id is returned without starting a new
    execution.  Otherwise ``start_query_execution`` is invoked on the
    connection's client through ``retry_api_call``.

    :param operation: statement handed to the formatter.
    :param parameters: values handed to the formatter with the statement.
    :param work_group: forwarded to the request builder and the lookup.
    :param s3_staging_dir: forwarded to the request builder.
    :param cache_size: forwarded to ``_find_previous_query_id``
        (exact semantics are not visible in this block).
    :return: the found id, or the ``'QueryExecutionId'`` entry of the
        response (``None`` when the response lacks that key).
    :raises DatabaseError: when starting the execution raises any exception;
        the original exception is chained as the cause.
    """
    query = self._formatter.format(operation, parameters)
    _logger.debug(query)
    # Built before the lookup, mirroring the established statement order.
    request = self._build_start_query_execution_request(query, work_group, s3_staging_dir)

    previous_id = self._find_previous_query_id(query, work_group, cache_size)
    if previous_id is not None:
        return previous_id

    try:
        response = retry_api_call(self._connection.client.start_query_execution,
                                  config=self._retry_config,
                                  logger=_logger,
                                  **request)
        return response.get('QueryExecutionId', None)
    except Exception as e:
        _logger.exception('Failed to execute query.')
        raise_from(DatabaseError(*e.args), e)