Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def wrapped_fn(self, *args, **kwargs):
    """Call the wrapped ``fn`` with a freshly opened ``PandasCursor``.

    A connection is obtained from ``self.connect()`` and a cursor is
    created from it; ``fn`` then receives ``self``, that cursor, and any
    extra positional/keyword arguments. The result of ``fn`` is discarded,
    so this wrapper always returns ``None``.
    """
    # A single two-item ``with`` is equivalent to nesting: the cursor
    # context is exited first, then ``closing`` calls ``close()`` on the
    # connection, whether or not ``fn`` raises.
    with contextlib.closing(self.connect()) as connection, \
            connection.cursor(PandasCursor) as pandas_cursor:
        fn(self, pandas_cursor, *args, **kwargs)
return wrapped_fn
def test_no_ops(self):
    """Check that ``executemany`` raises ``NotSupportedError``.

    The statement is issued with an empty parameter sequence against a
    cursor created with ``PandasCursor``; the call itself must raise.
    """
    # Context managers release the cursor and the connection even when the
    # assertion fails; explicit close() calls placed after the assertion
    # would be skipped in that case and leak both resources.
    # NOTE(review): relies on the cursor's context manager closing the
    # cursor on exit, as the other tests in this file already assume.
    with contextlib.closing(self.connect()) as conn:
        with conn.cursor(PandasCursor) as cursor:
            with self.assertRaises(NotSupportedError):
                cursor.executemany('SELECT * FROM one_row', [])
def test_open_close(self):
    """Open a connection and a ``PandasCursor``, then release both.

    No statement is executed; the test passes when entering and leaving
    both context managers raises nothing.
    """
    # One two-item ``with`` behaves exactly like the nested form: the
    # cursor context is exited before ``closing`` closes the connection.
    with contextlib.closing(self.connect()) as connection, \
            connection.cursor(PandasCursor):
        pass
def __init__(self, connection, s3_staging_dir, schema_name, work_group,
             poll_interval, encryption_option, kms_key, converter, formatter,
             retry_config, **kwargs):
    """Initialise the cursor by delegating to the parent constructor.

    Every named parameter is forwarded unchanged, by keyword, to the
    parent class ``__init__``; any additional keyword arguments are
    passed through as well.
    """
    # Named parameters can never also appear in ``kwargs``, so merging the
    # two into one mapping forwards exactly the same keyword set.
    forwarded = dict(
        connection=connection,
        s3_staging_dir=s3_staging_dir,
        schema_name=schema_name,
        work_group=work_group,
        poll_interval=poll_interval,
        encryption_option=encryption_option,
        kms_key=kms_key,
        converter=converter,
        formatter=formatter,
        retry_config=retry_config,
    )
    forwarded.update(kwargs)
    super(PandasCursor, self).__init__(**forwarded)