Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_no_ops(self):
    """Check that ``executemany`` on an AsyncCursor raises NotSupportedError.

    The cursor and the connection are released in ``finally`` clauses so
    they are closed even when the assertion fails or the cursor cannot be
    created.
    """
    conn = self.connect()
    try:
        cursor = conn.cursor(AsyncCursor)
        try:
            # Hand the callable and its arguments to assertRaises directly
            # instead of wrapping the call in a lambda.
            self.assertRaises(
                NotSupportedError, cursor.executemany,
                'SELECT * FROM one_row', [])
        finally:
            cursor.close()
    finally:
        conn.close()
def test_open_close(self):
    """Open a connection and an AsyncCursor, then leave both contexts.

    Nothing is executed on the cursor; the test passes as long as entering
    and exiting the two context managers raises no exception.
    """
    with contextlib.closing(self.connect()) as conn, \
            conn.cursor(AsyncCursor):
        pass
def wrapped_fn(self, *args, **kwargs):
    """Call the wrapped ``fn`` with a freshly opened AsyncCursor.

    A connection is obtained from ``self.connect()`` and closed on exit via
    ``contextlib.closing``; the cursor is used as a context manager for the
    duration of the call. The cursor is passed to ``fn`` right after
    ``self``, followed by the caller's positional and keyword arguments.
    The return value of ``fn`` is discarded.
    """
    with contextlib.closing(self.connect()) as connection, \
            connection.cursor(AsyncCursor) as async_cursor:
        fn(self, async_cursor, *args, **kwargs)
return wrapped_fn
def test_open_close(self):
    """Smoke-test opening and closing a connection and an AsyncCursor.

    The body is intentionally empty: only entering and leaving the two
    context managers is exercised.
    """
    with contextlib.closing(self.connect()) as conn, \
            conn.cursor(AsyncCursor):
        pass
def test_no_ops(self):
    """Check that ``executemany`` on an AsyncCursor raises NotSupportedError.

    Cleanup runs in ``finally`` clauses so that a failing assertion (or a
    failure while creating the cursor) does not leave the cursor or the
    connection open.
    """
    conn = self.connect()
    try:
        cursor = conn.cursor(AsyncCursor)
        try:
            # assertRaises invokes the callable with the given arguments,
            # so no lambda wrapper is needed.
            self.assertRaises(
                NotSupportedError, cursor.executemany,
                'SELECT * FROM one_row', [])
        finally:
            cursor.close()
    finally:
        conn.close()
def __init__(self, connection, s3_staging_dir, schema_name, work_group,
             poll_interval, encryption_option, kms_key, converter, formatter,
             retry_config, max_workers=(cpu_count() or 1) * 5,
             arraysize=CursorIterator.DEFAULT_FETCH_SIZE):
    """Initialise the cursor and its worker thread pool.

    Every argument from ``connection`` through ``retry_config`` is passed
    unchanged, by keyword, to the parent class initialiser.

    :param max_workers: size of the ``ThreadPoolExecutor`` created for this
        cursor; defaults to five times ``cpu_count()``, treating a falsy
        CPU count as 1.
    :param arraysize: stored on the instance as ``_arraysize``; defaults to
        ``CursorIterator.DEFAULT_FETCH_SIZE``.
    """
    # Collect the options handled by the base class in one place.
    base_options = dict(
        connection=connection,
        s3_staging_dir=s3_staging_dir,
        schema_name=schema_name,
        work_group=work_group,
        poll_interval=poll_interval,
        encryption_option=encryption_option,
        kms_key=kms_key,
        converter=converter,
        formatter=formatter,
        retry_config=retry_config,
    )
    super(AsyncCursor, self).__init__(**base_options)
    self._executor = ThreadPoolExecutor(max_workers=max_workers)
    self._arraysize = arraysize