Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_no_ops(self):
    """Check the DB-API calls that are accepted silently or rejected outright.

    A freshly created cursor must report ``rowcount == -1``.
    ``setinputsizes``, ``setoutputsize`` and ``commit`` are called and must
    not raise, while ``executemany`` and ``rollback`` must raise
    ``NotSupportedError``.
    """
    conn = self.connect()
    try:
        cursor = conn.cursor()
        try:
            self.assertEqual(cursor.rowcount, -1)
            cursor.setinputsizes([])
            cursor.setoutputsize(1, 'blah')
            with self.assertRaises(NotSupportedError):
                cursor.executemany('SELECT * FROM one_row', [])
            conn.commit()
            with self.assertRaises(NotSupportedError):
                conn.rollback()
        finally:
            # Close the cursor even when an assertion above fails.
            cursor.close()
    finally:
        # Same for the connection, so a failing test does not leak it.
        conn.close()
def test_no_ops(self):
    """Verify which DB-API calls do nothing and which are unsupported.

    Checks that a new cursor has ``rowcount == -1``, that
    ``setinputsizes``, ``setoutputsize`` and ``commit`` can be called
    without raising, and that ``executemany`` and ``rollback`` raise
    ``NotSupportedError``.
    """
    conn = self.connect()
    try:
        cursor = conn.cursor()
        try:
            self.assertEqual(cursor.rowcount, -1)
            cursor.setinputsizes([])
            cursor.setoutputsize(1, 'blah')
            with self.assertRaises(NotSupportedError):
                cursor.executemany('SELECT * FROM one_row', [])
            conn.commit()
            with self.assertRaises(NotSupportedError):
                conn.rollback()
        finally:
            # Always release the cursor, including on assertion failure.
            cursor.close()
    finally:
        # Always release the connection, including on assertion failure.
        conn.close()
def test_cancel(self, cursor):
def cancel(c):
time.sleep(randint(1, 5))
c.cancel()
with ThreadPoolExecutor(max_workers=1) as executor:
executor.submit(cancel, cursor)
self.assertRaises(DatabaseError, lambda: cursor.execute("""
SELECT a.a * rand(), b.a * rand()
def test_fetch_no_data(self, cursor):
    """Each fetch method must raise ``ProgrammingError`` when called on ``cursor`` here.

    No query is executed on ``cursor`` inside this test before fetching.
    """
    for fetch_name in ('fetchone', 'fetchmany', 'fetchall'):
        fetch = getattr(cursor, fetch_name)
        self.assertRaises(ProgrammingError, fetch)
def test_poll(self, cursor):
    """Poll a submitted query and check that it reports one of the listed states."""
    # execute() is unpacked as a pair here; only the query id is needed.
    query_id, _unused = cursor.execute("SELECT * FROM one_row")
    polled = cursor.poll(query_id).result()
    observed_state = polled.state
    accepted_states = [
        AthenaQueryExecution.STATE_QUEUED,
        AthenaQueryExecution.STATE_RUNNING,
        AthenaQueryExecution.STATE_SUCCEEDED,
        AthenaQueryExecution.STATE_FAILED,
        AthenaQueryExecution.STATE_CANCELLED,
    ]
    self.assertIn(observed_state, accepted_states)
def test_as_pandas(self, cursor):
    """Convert the ``one_row`` result to a DataFrame and check data and metadata."""
    frame = cursor.execute('SELECT * FROM one_row').as_pandas()

    # Exactly one row and one column are expected.
    self.assertEqual(frame.shape[0], 1)
    self.assertEqual(frame.shape[1], 1)
    values = [(record['number_of_rows'],) for _, record in frame.iterrows()]
    self.assertEqual(values, [(1,)])

    # Query metadata exposed on the cursor after execution.
    for attr in ('query_id', 'query'):
        self.assertIsNotNone(getattr(cursor, attr))
    self.assertEqual(cursor.state, AthenaQueryExecution.STATE_SUCCEEDED)
    self.assertIsNone(cursor.state_change_reason)
    for attr in ('completion_date_time', 'submission_date_time'):
        self.assertIsNotNone(getattr(cursor, attr))
        self.assertIsInstance(getattr(cursor, attr), datetime)
    for attr in ('data_scanned_in_bytes', 'execution_time_in_millis', 'output_location'):
        self.assertIsNotNone(getattr(cursor, attr))
def execute_other_thread():
    """Open a separate connection, run the ``one_row`` query and return all rows.

    The connection is closed via ``contextlib.closing`` and the cursor via
    its own context manager once the rows have been fetched.
    """
    with contextlib.closing(connect(schema_name=SCHEMA)) as connection, \
            connection.cursor() as cur:
        cur.execute('SELECT * FROM one_row')
        rows = cur.fetchall()
        return rows
def connect(self, work_group=None):
    """Return a new pyathena connection for ``SCHEMA``.

    ``work_group`` is forwarded unchanged to ``pyathena.connect``.
    """
    # Imported locally, as in the original, under an alias that does not
    # collide with this method's own name.
    from pyathena import connect as open_connection
    connection = open_connection(schema_name=SCHEMA, work_group=work_group)
    return connection
def test_fetchone(self, cursor):
    """Fetch the single row of ``one_row`` and check the cursor's query metadata."""
    cursor.execute('SELECT * FROM one_row')

    # rownumber is expected to move from 0 to 1 after one fetch, and a
    # second fetch is expected to return None.
    self.assertEqual(cursor.rownumber, 0)
    first_row = cursor.fetchone()
    self.assertEqual(first_row, (1,))
    self.assertEqual(cursor.rownumber, 1)
    second_row = cursor.fetchone()
    self.assertIsNone(second_row)

    # Query metadata exposed on the cursor.
    self.assertEqual(cursor.database, SCHEMA)
    for attr in ('query_id', 'query'):
        self.assertIsNotNone(getattr(cursor, attr))
    self.assertEqual(cursor.statement_type, AthenaQueryExecution.STATEMENT_TYPE_DML)
    self.assertEqual(cursor.state, AthenaQueryExecution.STATE_SUCCEEDED)
    self.assertIsNone(cursor.state_change_reason)
    for attr in ('completion_date_time', 'submission_date_time'):
        self.assertIsNotNone(getattr(cursor, attr))
        self.assertIsInstance(getattr(cursor, attr), datetime)
    for attr in ('data_scanned_in_bytes', 'execution_time_in_millis', 'output_location'):
        self.assertIsNotNone(getattr(cursor, attr))
    for attr in ('encryption_option', 'kms_key'):
        self.assertIsNone(getattr(cursor, attr))
    self.assertEqual(cursor.work_group, 'primary')
def test_fetchone(self, cursor):
    """Fetch the single row of ``one_row`` from a future-based result set.

    ``cursor.execute`` is unpacked as ``(query_id, future)``; the result set
    comes from ``future.result()`` and all checks run against it.
    """
    _, pending = cursor.execute('SELECT * FROM one_row')
    rs = pending.result()

    # rownumber is expected to move from 0 to 1 after one fetch, and a
    # second fetch is expected to return None.
    self.assertEqual(rs.rownumber, 0)
    first_row = rs.fetchone()
    self.assertEqual(first_row, (1,))
    self.assertEqual(rs.rownumber, 1)
    second_row = rs.fetchone()
    self.assertIsNone(second_row)

    # Query metadata exposed on the result set.
    for attr in ('query_id', 'query'):
        self.assertIsNotNone(getattr(rs, attr))
    self.assertEqual(rs.state, AthenaQueryExecution.STATE_SUCCEEDED)
    self.assertIsNone(rs.state_change_reason)
    for attr in ('completion_date_time', 'submission_date_time'):
        self.assertIsNotNone(getattr(rs, attr))
        self.assertIsInstance(getattr(rs, attr), datetime)
    for attr in ('data_scanned_in_bytes', 'execution_time_in_millis', 'output_location'):
        self.assertIsNotNone(getattr(rs, attr))