# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _as_pandas(self):
    """Download this query's result CSV from S3 and return it as a DataFrame.

    Returns:
        pandas.DataFrame: the parsed result set, or an empty DataFrame when
        the result object has zero length.

    Raises:
        ProgrammingError: if the cursor has no output location set.
        OperationalError: if the S3 download fails (chained from the
            underlying client error).
    """
    import pandas as pd
    if not self.output_location:
        raise ProgrammingError('OutputLocation is none or empty.')
    bucket, key = self._parse_output_location(self.output_location)
    try:
        response = retry_api_call(self._client.get_object,
                                  config=self._retry_config,
                                  logger=_logger,
                                  Bucket=bucket,
                                  Key=key)
    except Exception as e:
        _logger.exception('Failed to download csv.')
        raise_from(OperationalError(*e.args), e)
    else:
        length = response['ContentLength']
        if length:
            df = pd.read_csv(io.BytesIO(response['Body'].read()),
                             dtype=self.dtypes,
                             converters=self.converters,
                             parse_dates=self.parse_dates,
                             infer_datetime_format=True)
            # Post-process parsed date columns (e.g. drop time components).
            df = self._trunc_date(df)
        else:
            # Zero-length object: the query produced no rows, so return an
            # empty frame instead of falling through and returning None.
            df = pd.DataFrame()
        # BUGFIX: the original block never returned the DataFrame.
        return df
def __fetch(self, next_token=None):
    """Retrieve one page of results for the current query execution.

    Args:
        next_token: pagination token from a previous page, or None for
            the first page.

    Returns:
        The raw ``GetQueryResults`` API response.

    Raises:
        ProgrammingError: if there is no query id or the query did not
            finish successfully.
        OperationalError: if the API call fails (chained from the
            underlying client error).
    """
    # Results can only be fetched for a query that ran to completion.
    if not self._query_execution.query_id:
        raise ProgrammingError('QueryExecutionId is none or empty.')
    if self._query_execution.state != AthenaQueryExecution.STATE_SUCCEEDED:
        raise ProgrammingError('QueryExecutionState is not SUCCEEDED.')
    request = {
        'QueryExecutionId': self._query_execution.query_id,
        'MaxResults': self._arraysize,
    }
    if next_token:
        request['NextToken'] = next_token
    try:
        return retry_api_call(self._connection.client.get_query_results,
                              config=self._retry_config,
                              logger=_logger,
                              **request)
    except Exception as e:
        _logger.exception('Failed to fetch result set.')
        raise_from(OperationalError(*e.args), e)
query_id = None
try:
next_token = None
while cache_size > 0:
n = min(cache_size, 50) # 50 is max allowed by AWS API
cache_size -= n
request = self._build_list_query_executions_request(n, work_group, next_token)
response = retry_api_call(self.connection._client.list_query_executions,
config=self._retry_config,
logger=_logger,
**request)
query_ids = response.get('QueryExecutionIds', None)
if not query_ids:
break # no queries left to check
next_token = response.get('NextToken', None)
query_executions = retry_api_call(
self.connection._client.batch_get_query_execution,
config=self._retry_config,
logger=_logger,
QueryExecutionIds=query_ids
).get('QueryExecutions', [])
for execution in query_executions:
if (
execution['Query'] == query and
execution['Status']['State'] == AthenaQueryExecution.STATE_SUCCEEDED and
execution['StatementType'] == AthenaQueryExecution.STATEMENT_TYPE_DML
):
query_id = execution['QueryExecutionId']
break
if query_id or next_token is None:
break
except Exception:
def _get_query_execution(self, query_id):
    """Fetch execution metadata for *query_id*.

    Args:
        query_id: the Athena QueryExecutionId to look up.

    Returns:
        AthenaQueryExecution: wrapper around the raw API response.

    Raises:
        OperationalError: if the API call fails (chained from the
            underlying client error).
    """
    try:
        # Single keyword argument, so pass it directly rather than
        # building an intermediate request dict.
        response = retry_api_call(self._connection.client.get_query_execution,
                                  config=self._retry_config,
                                  logger=_logger,
                                  QueryExecutionId=query_id)
    except Exception as e:
        _logger.exception('Failed to get query execution.')
        raise_from(OperationalError(*e.args), e)
    else:
        return AthenaQueryExecution(response)
def _execute(self, operation, parameters=None, work_group=None, s3_staging_dir=None,
             cache_size=0):
    """Format *operation* with *parameters* and start (or reuse) an execution.

    When ``cache_size`` > 0, a previous successful execution of the same
    query may be reused instead of starting a new one.

    Returns:
        The QueryExecutionId of the cached or newly started execution
        (may be None if the API response lacks one).

    Raises:
        DatabaseError: if starting the query fails (chained from the
            underlying client error).
    """
    query = self._formatter.format(operation, parameters)
    _logger.debug(query)
    request = self._build_start_query_execution_request(query, work_group, s3_staging_dir)
    # Guard clause: a cache hit short-circuits the start call entirely.
    cached_id = self._find_previous_query_id(query, work_group, cache_size)
    if cached_id is not None:
        return cached_id
    try:
        response = retry_api_call(self._connection.client.start_query_execution,
                                  config=self._retry_config,
                                  logger=_logger,
                                  **request)
        query_id = response.get('QueryExecutionId', None)
    except Exception as e:
        _logger.exception('Failed to execute query.')
        raise_from(DatabaseError(*e.args), e)
    return query_id
def _find_previous_query_id(self, query, work_group, cache_size):
query_id = None
try:
next_token = None
while cache_size > 0:
n = min(cache_size, 50) # 50 is max allowed by AWS API
cache_size -= n
request = self._build_list_query_executions_request(n, work_group, next_token)
response = retry_api_call(self.connection._client.list_query_executions,
config=self._retry_config,
logger=_logger,
**request)
query_ids = response.get('QueryExecutionIds', None)
if not query_ids:
break # no queries left to check
next_token = response.get('NextToken', None)
query_executions = retry_api_call(
self.connection._client.batch_get_query_execution,
config=self._retry_config,
logger=_logger,
QueryExecutionIds=query_ids
).get('QueryExecutions', [])
for execution in query_executions:
if (
execution['Query'] == query and