Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _update_query_results(self, queryResult):
self._queryResult = queryResult
self._completion_query_info = queryResult.completion_query_info
self._completion_query_resource_consumption = queryResult.completion_query_resource_consumption
self._dataSetCompletion = queryResult.dataSetCompletion
self._json_response = queryResult.json_response
queryResultTable = queryResult.tables[self.fork_table_id]
self._dataframe = None
# schema
self.columns_name = queryResultTable.keys()
self.columns_type = queryResultTable.types()
self.columns_datafarme_type = queryResultTable.datafarme_types
self.field_names = _unduplicate_field_names(self.columns_name)
# table printing style to any of prettytable's defined styles (currently DEFAULT, MSWORD_FRIENDLY, PLAIN_COLUMNS, RANDOM)
prettytable_style = prettytable.__dict__[self.options.get("prettytable_style", "DEFAULT").upper()]
self.pretty = PrettyTable(self.field_names, style=prettytable_style) if len(self.field_names) > 0 else None
self.records_count = queryResultTable.recordscount()
self.is_partial_table = queryResultTable.ispartial()
self.visualization_properties = queryResultTable.visualization_properties
# table
auto_limit = 0 if not self.options.get("auto_limit") else self.options.get("auto_limit")
if queryResultTable.returns_rows():
if auto_limit > 0:
list.__init__(self, queryResultTable.fetchmany(size=auto_limit))
else:
list.__init__(self, queryResultTable.fetchall())
else:
list.__init__(self, [])
self._fork_table_resultSets[str(self.fork_table_id)] = self
def __init__(self, sqlaproxy, sql, config):
self.keys = sqlaproxy.keys()
self.sql = sql
self.config = config
self.limit = config.autolimit
style_name = config.style
self.style = prettytable.__dict__[style_name.upper()]
if sqlaproxy.returns_rows:
if self.limit:
list.__init__(self, sqlaproxy.fetchmany(size=self.limit))
else:
list.__init__(self, sqlaproxy.fetchall())
self.field_names = unduplicate_field_names(self.keys)
self.pretty = PrettyTable(self.field_names, style=self.style)
# self.pretty.set_style(self.style)
else:
list.__init__(self, [])
self.pretty = None
def __init__(self, queryResult, parametrized_query_dict, connection, fork_table_id, fork_table_resultSets, metadata, options):
# self.current_colors_palette = ['rgb(184, 247, 212)', 'rgb(111, 231, 219)', 'rgb(127, 166, 238)', 'rgb(131, 90, 241)']
self.parametrized_query_dict = parametrized_query_dict
self.fork_table_id = fork_table_id
self._fork_table_resultSets = fork_table_resultSets
self.options = options
self.conn = connection
# set by caller
self.metadata = metadata
self.feedback_info = []
# table printing style to any of prettytable's defined styles (currently DEFAULT, MSWORD_FRIENDLY, PLAIN_COLUMNS, RANDOM)
self.prettytable_style = prettytable.__dict__[self.options.get("prettytable_style", "DEFAULT").upper()]
self.display_info = True
self.suppress_result = False
self._update(queryResult)
def __init__(self, results, query, config):
self._results = results
self._labels = defaultdict(int)
self._types = defaultdict(int)
self._graph = None
self._dataframe = None
self.keys = results.columns
self.query = query
self.config = config
self.limit = config.auto_limit
style_name = config.style
self.style = prettytable.__dict__[style_name.upper()]
if results is not None:
if not config.rest:
_results = results.rows
else:
_results = results
_results = _results or []
if self.limit:
list.__init__(self, _results[:self.limit])
else:
list.__init__(self, _results)
if self.keys is not None:
self.field_names = unduplicate_field_names(self.keys)
else:
self.field_names = []
self.pretty = prettytable.PrettyTable(self.field_names)
if not config.auto_pandas: