import contextlib
from pyathena import connect

def execute_other_thread():
    # SCHEMA is assumed to be defined elsewhere in the original module.
    with contextlib.closing(connect(schema_name=SCHEMA)) as conn:
        with conn.cursor() as cursor:
            cursor.execute('SELECT * FROM one_row')
            return cursor.fetchall()
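Because each call opens and closes its own connection, the function above can safely be handed to a thread pool. A minimal usage sketch; the executor wiring is an assumption, not part of the original snippet:

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=2) as executor:
    # Each submitted call builds its own PyAthena connection, so nothing is
    # shared between threads.
    future = executor.submit(execute_other_thread)
    rows = future.result()  # list of tuples from cursor.fetchall()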
def connect(self, work_group=None):
    from pyathena import connect

    return connect(schema_name=SCHEMA, work_group=work_group)
def run_query(self, query, user):
    cursor = pyathena.connect(
        s3_staging_dir=self.configuration["s3_staging_dir"],
        schema_name=self.configuration.get("schema", "default"),
        encryption_option=self.configuration.get("encryption_option", None),
        kms_key=self.configuration.get("kms_key", None),
        work_group=self.configuration.get("work_group", "primary"),
        formatter=SimpleFormatter(),
        **self._get_iam_credentials(user=user)
    ).cursor()

    try:
        cursor.execute(query)
        column_tuples = [
            (i[0], _TYPE_MAPPINGS.get(i[1], None)) for i in cursor.description
        ]
        columns = self.fetch_columns(column_tuples)
        rows = [
def __init__(self, aws_config=None):
    super().__init__(aws_config=aws_config)
    from pyathena import connect

    self.cursor = connect(
        aws_access_key_id=self.aws_config.get('aws_access_key_id'),
        aws_secret_access_key=self.aws_config.get('aws_secret_access_key'),
        s3_staging_dir=self.aws_config.get('s3_staging_dir'),
        region_name=self.aws_config.get('region_name')
    ).cursor()
    :param workflow: Workflow to store the new data
    :param conn: AthenaConnection object with the connection parameters.
    :param run_params: Dictionary with additional connection parameters
    :param log_item: Log object to reflect the status of the execution
    :return: Nothing.
    """
    staging_dir = 's3://{0}'.format(conn.aws_bucket_name)
    if conn.aws_file_path:
        staging_dir = staging_dir + '/' + conn.aws_file_path

    # connect() returns a DBAPI connection, which pandas can query directly.
    cursor = connect(
        aws_access_key_id=conn.aws_access_key,
        aws_secret_access_key=run_params['aws_secret_access_key'],
        aws_session_token=run_params['aws_session_token'],
        s3_staging_dir=staging_dir,
        region_name=conn.aws_region_name)

    data_frame = pd.read_sql(
        'SELECT * FROM {0}'.format(run_params['table_name']),
        cursor)

    # Strip white space from all string columns and try to convert to
    # datetime, just in case.
    data_frame = _process_object_column(data_frame)
    verify_data_frame(data_frame)
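Recent PyAthena releases also ship a pandas-aware cursor, which avoids going through pd.read_sql. A minimal sketch, assuming PyAthena 2.x and placeholder bucket/region values, with credentials resolved from the environment:

from pyathena import connect
from pyathena.pandas.cursor import PandasCursor

# Placeholder staging bucket and region; credentials come from the environment.
cursor = connect(
    s3_staging_dir='s3://your-athena-results-bucket/path/',
    region_name='us-east-1',
    cursor_class=PandasCursor,
).cursor()
df = cursor.execute('SELECT * FROM one_row').as_pandas()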
def connect(self, database=None):
    conn = pyathena.connect(
        aws_access_key_id=self.aws_access_key_id,
        aws_secret_access_key=self.aws_secret_access_key,
        region_name=self.region_name,
        s3_staging_dir=self.s3_staging_dir,
        schema_name=database or self.database,
        role_arn=self.role_arn,
        poll_interval=0.2  # 200 ms
    )
    self.database = database or self.database
    if hasattr(self, 'conn'):
        self.conn.close()
    self.conn = conn
def _open_connection(self):
    return pyathena.connect(
        aws_access_key_id=self.config.access_key,
        aws_secret_access_key=self.config.secret_key,
        s3_staging_dir=self.config.staging_dir,
        region_name=self.config.region,
    )
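All of the snippets above reduce to the same basic pattern: connect with an S3 staging directory and region, take a cursor, execute, and fetch. A minimal standalone sketch with placeholder values; AWS credentials are assumed to come from the environment:

from pyathena import connect

# Placeholder bucket and region; credentials are resolved by boto3 from the
# environment or instance profile.
cursor = connect(
    s3_staging_dir='s3://your-athena-results-bucket/path/',
    region_name='us-east-1',
).cursor()
cursor.execute('SELECT 1')
print(cursor.fetchall())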