# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _process_meta_data(self, response):
result_set = response.get('ResultSet', None)
if not result_set:
raise DataError('KeyError `ResultSet`')
meta_data = result_set.get('ResultSetMetadata', None)
if not meta_data:
raise DataError('KeyError `ResultSetMetadata`')
column_info = meta_data.get('ColumnInfo', None)
if column_info is None:
raise DataError('KeyError `ColumnInfo`')
self._meta_data = tuple(column_info)
query_execution_context = query_execution.get('QueryExecutionContext', {})
self._database = query_execution_context.get('Database', None)
self._query_id = query_execution.get('QueryExecutionId', None)
if not self._query_id:
raise DataError('KeyError `QueryExecutionId`')
self._query = query_execution.get('Query', None)
if not self._query:
raise DataError('KeyError `Query`')
self._statement_type = query_execution.get('StatementType', None)
status = query_execution.get('Status', None)
if not status:
raise DataError('KeyError `Status`')
self._state = status.get('State', None)
self._state_change_reason = status.get('StateChangeReason', None)
self._completion_date_time = status.get('CompletionDateTime', None)
self._submission_date_time = status.get('SubmissionDateTime', None)
statistics = query_execution.get('Statistics', {})
self._data_scanned_in_bytes = statistics.get('DataScannedInBytes', None)
self._execution_time_in_millis = statistics.get('EngineExecutionTimeInMillis', None)
result_conf = query_execution.get('ResultConfiguration', {})
self._output_location = result_conf.get('OutputLocation', None)
encryption_conf = result_conf.get('EncryptionConfiguration', {})
self._encryption_option = encryption_conf.get('EncryptionOption', None)
self._kms_key = encryption_conf.get('KmsKey', None)
def _process_rows(self, response):
result_set = response.get('ResultSet', None)
if not result_set:
raise DataError('KeyError `ResultSet`')
rows = result_set.get('Rows', None)
if rows is None:
raise DataError('KeyError `Rows`')
processed_rows = []
if len(rows) > 0:
offset = 1 if not self._next_token and self._is_first_row_column_labels(rows) else 0
processed_rows = [
tuple([self._converter.convert(meta.get('Type', None),
row.get('VarCharValue', None))
for meta, row in zip(self._meta_data, rows[i].get('Data', []))])
for i in xrange(offset, len(rows))
]
self._rows.extend(processed_rows)
self._next_token = response.get('NextToken', None)
def _process_rows(self, response):
result_set = response.get('ResultSet', None)
if not result_set:
raise DataError('KeyError `ResultSet`')
rows = result_set.get('Rows', None)
if rows is None:
raise DataError('KeyError `Rows`')
processed_rows = []
if len(rows) > 0:
offset = 1 if not self._next_token and self._is_first_row_column_labels(rows) else 0
processed_rows = [
tuple([self._converter.convert(meta.get('Type', None),
row.get('VarCharValue', None))
for meta, row in zip(self._meta_data, rows[i].get('Data', []))])
for i in xrange(offset, len(rows))
]
self._rows.extend(processed_rows)
self._next_token = response.get('NextToken', None)
def _process_meta_data(self, response):
result_set = response.get('ResultSet', None)
if not result_set:
raise DataError('KeyError `ResultSet`')
meta_data = result_set.get('ResultSetMetadata', None)
if not meta_data:
raise DataError('KeyError `ResultSetMetadata`')
column_info = meta_data.get('ColumnInfo', None)
if column_info is None:
raise DataError('KeyError `ColumnInfo`')
self._meta_data = tuple(column_info)
def _parse_output_location(cls, output_location):
match = cls._pattern_output_location.search(output_location)
if match:
return match.group('bucket'), match.group('key')
else:
raise DataError('Unknown `output_location` format.')