Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
or sql.startswith("\\q")
or sql.startswith("help")
or sql.startswith("exit")
or sql.startswith("quit")
):
_logger.debug(
"Not connected to database. Will not run statement: %s.", sql
)
raise OperationalError("Not connected to database.")
# yield ('Not connected to database', None, None, None)
# return
cur = self.conn.cursor() if self.conn else None
try: # Special command
_logger.debug("Trying a dbspecial command. sql: %r", sql)
for result in special.execute(cur, sql):
yield result
except special.CommandNotFound: # Regular SQL
_logger.debug("Regular sql statement. sql: %r", sql)
cur.execute(sql)
yield self.get_result(cur)
message will be written to the tee file, if enabled. The
message will be written to the output file, if enabled.
"""
if output:
size = self.prompt_app.output.get_size()
margin = self.get_output_margin(status)
fits = True
buf = []
output_via_pager = self.explicit_pager and special.is_pager_enabled()
for i, line in enumerate(output, 1):
self.log_output(line)
special.write_tee(line)
special.write_once(line)
if fits or output_via_pager:
# buffering
buf.append(line)
if len(line) > size.columns or i > (size.rows - margin):
fits = False
if not self.explicit_pager and special.is_pager_enabled():
# doesn't fit, use pager
output_via_pager = True
if not output_via_pager:
# doesn't fit, flush buffer
for line in buf:
click.secho(line)
buf = []
else:
t = time() - start
try:
if result_count > 0:
self.echo("")
try:
self.output(formatted, status)
except KeyboardInterrupt:
pass
self.echo("Time: %0.03fs" % t)
except KeyboardInterrupt:
pass
start = time()
result_count += 1
mutating = mutating or is_mutating(status)
special.unset_once_if_written()
except EOFError as e:
raise e
except KeyboardInterrupt:
# get last connection id
connection_id_to_kill = sqlexecute.connection_id
logger.debug("connection id to kill: %r", connection_id_to_kill)
# Restart connection to the database
sqlexecute.connect()
try:
for title, cur, headers, status in sqlexecute.run(
"kill %s" % connection_id_to_kill
):
status_str = str(status).lower()
if status_str.find("ok") > -1:
logger.debug(
"cancelled query, connection id: %r, sql: %r",
if is_select(status) and cur and cur.rowcount > threshold:
self.echo(
"The result set has more than {} rows.".format(threshold),
fg="red",
)
if not confirm("Do you want to continue?"):
self.echo("Aborted!", err=True, fg="red")
break
if self.auto_vertical_output:
max_width = self.prompt_app.output.get_size().columns
else:
max_width = None
formatted = self.format_output(
title, cur, headers, special.is_expanded_output(), max_width
)
t = time() - start
try:
if result_count > 0:
self.echo("")
try:
self.output(formatted, status)
except KeyboardInterrupt:
pass
self.echo("Time: %0.03fs" % t)
except KeyboardInterrupt:
pass
start = time()
result_count += 1
sqlexecute=None,
prompt=None,
logfile=None,
auto_vertical_output=False,
warn=None,
liteclirc=None,
):
self.sqlexecute = sqlexecute
self.logfile = logfile
# Load config.
c = self.config = get_config(liteclirc)
self.multi_line = c["main"].as_bool("multi_line")
self.key_bindings = c["main"]["key_bindings"]
special.set_favorite_queries(self.config)
self.formatter = TabularOutputFormatter(format_name=c["main"]["table_format"])
self.formatter.litecli = self
self.syntax_style = c["main"]["syntax_style"]
self.less_chatty = c["main"].as_bool("less_chatty")
self.cli_style = c["colors"]
self.output_style = style_factory_output(self.syntax_style, self.cli_style)
self.wider_completion_menu = c["main"].as_bool("wider_completion_menu")
c_dest_warning = c["main"].as_bool("destructive_warning")
self.destructive_warning = c_dest_warning if warn is None else warn
self.login_path_as_host = c["main"].as_bool("login_path_as_host")
# read from cli argument or user config file
self.auto_vertical_output = auto_vertical_output or c["main"].as_bool(
"auto_vertical_output"
)