Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
arg = arg.strip()
if not arg:
# Oops, we parsed all the arguments without finding a statement
yield (None, None, None, usage)
raise StopIteration
(current_arg, _, arg) = arg.partition(" ")
try:
seconds = float(current_arg)
continue
except ValueError:
pass
if current_arg == "-c":
clear_screen = True
continue
statement = "{0!s} {1!s}".format(current_arg, arg)
destructive_prompt = confirm_destructive_query(statement)
if destructive_prompt is False:
click.secho("Wise choice!")
raise StopIteration
elif destructive_prompt is True:
click.secho("Your call!")
cur = kwargs["cur"]
sql_list = [
(sql.rstrip(";"), "> {0!s}".format(sql)) for sql in sqlparse.split(statement)
]
old_pager_enabled = is_pager_enabled()
while True:
if clear_screen:
click.clear()
try:
# Somewhere in the code the pager is activated after every yield,
# so we disable it in every iteration.
exit(1)
if sys.stdin.isatty():
litecli.run_cli()
else:
stdin = click.get_text_stream("stdin")
stdin_text = stdin.read()
try:
sys.stdin = open("/dev/tty")
except (FileNotFoundError, OSError):
litecli.logger.warning("Unable to open TTY as stdin.")
if (
litecli.destructive_warning
and confirm_destructive_query(stdin_text) is False
):
exit(0)
try:
new_line = True
if csv:
litecli.formatter.format_name = "csv"
elif not table:
litecli.formatter.format_name = "tsv"
litecli.run_query(stdin_text, new_line=new_line)
exit(0)
except Exception as e:
click.secho(str(e), err=True, fg="red")
exit(1)
special.set_expanded_output(False)
try:
text = self.handle_editor_command(text)
except RuntimeError as e:
logger.error("sql: %r, error: %r", text, e)
logger.error("traceback: %r", traceback.format_exc())
self.echo(str(e), err=True, fg="red")
return
if not text.strip():
return
if self.destructive_warning:
destroy = confirm_destructive_query(text)
if destroy is None:
pass # Query was not destructive. Nothing to do here.
elif destroy is True:
self.echo("Your call!")
else:
self.echo("Wise choice!")
return
# Keep track of whether or not the query is mutating. In case
# of a multi-statement query, the overall query is considered
# mutating if any one of the component statements is mutating
mutating = False
try:
logger.debug("sql: %r", text)
def execute_from_file(self, arg, **_):
    """Read SQL text from the file named by *arg* and execute it.

    ``arg`` is a path to a UTF-8 encoded file; a leading ``~`` is expanded
    to the user's home directory. Extra keyword arguments are accepted and
    ignored.

    Returns a one-element list holding a ``(None, None, None, message)``
    tuple when no filename was given, when the file cannot be opened or
    decoded, or when ``confirm_destructive_query`` returns ``False`` while
    ``self.destructive_warning`` is set. Otherwise returns whatever
    ``self.sqlexecute.run(query)`` returns.
    """
    if not arg:
        message = "Missing required argument, filename."
        return [(None, None, None, message)]

    try:
        with open(os.path.expanduser(arg), encoding="utf-8") as f:
            query = f.read()
    except (OSError, UnicodeDecodeError) as e:
        # UnicodeDecodeError is a ValueError, not an OSError, so it must be
        # listed explicitly: a file that is not valid UTF-8 should produce
        # an error row like any other unreadable file, not an uncaught
        # exception.
        return [(None, None, None, str(e))]

    # Only an explicit False aborts; any other return value from
    # confirm_destructive_query lets execution proceed.
    if self.destructive_warning and confirm_destructive_query(query) is False:
        message = "Wise choice. Command execution stopped."
        return [(None, None, None, message)]

    return self.sqlexecute.run(query)