Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@special_command(
".load",
".load path",
"Load an extension library.",
arg_type=PARSED_QUERY,
case_sensitive=True,
)
def load_extension(cur, arg, **_):
args = shlex.split(arg)
if len(args) != 1:
raise TypeError(".load accepts exactly one path")
path = args[0]
conn = cur.connection
conn.enable_load_extension(True)
conn.load_extension(path)
return [(None, None, None, "")]
@special_command("system", "system [command]", "Execute a system shell commmand.")
def execute_system_command(arg, **_):
"""Execute a system shell command."""
usage = "Syntax: system [command].\n"
if not arg:
return [(None, None, None, usage)]
try:
command = arg.strip()
if command.startswith("cd"):
ok, error_message = handle_cd_command(arg)
if not ok:
return [(None, None, None, error_message)]
return [(None, None, None, "")]
args = arg.split(" ")
@special_command(
".status",
"\\s",
"Show current settings.",
arg_type=RAW_QUERY,
aliases=("\\s",),
case_sensitive=True,
)
def status(cur, **_):
# Create output buffers.
footer = []
footer.append("--------------")
# Output the litecli client information.
implementation = platform.python_implementation()
version = platform.python_version()
client_info = []
@special_command(
".tables",
"\\dt",
"List tables.",
arg_type=PARSED_QUERY,
case_sensitive=True,
aliases=("\\dt",),
)
def list_tables(cur, arg=None, arg_type=PARSED_QUERY, verbose=False):
if arg:
args = ("{0}%".format(arg),)
query = """
SELECT name FROM sqlite_master
WHERE type IN ('table','view') AND name LIKE ? AND name NOT LIKE 'sqlite_%'
ORDER BY 1
"""
else:
@special_command("\\fd", "\\fd [name]", "Delete a favorite query.")
def delete_favorite_query(arg, **_):
"""Delete an existing favorite query.
"""
usage = "Syntax: \\fd name.\n\n" + favoritequeries.usage
if not arg:
return [(None, None, None, usage)]
status = favoritequeries.delete(arg)
return [(None, None, None, status)]
@special_command(
"watch",
"watch [seconds] [-c] query",
"Executes the query every [seconds] seconds (by default 5).",
)
def watch_query(arg, **kwargs):
usage = """Syntax: watch [seconds] [-c] query.
* seconds: The interval at the query will be repeated, in seconds.
By default 5.
* -c: Clears the screen between every iteration.
"""
if not arg:
yield (None, None, None, usage)
raise StopIteration
seconds = 5
clear_screen = False
statement = None
@special_command(
"pager",
"\\P [command]",
"Set PAGER. Print the query results via PAGER.",
arg_type=PARSED_QUERY,
aliases=("\\P",),
case_sensitive=True,
)
def set_pager(arg, **_):
if arg:
os.environ["PAGER"] = arg
msg = "PAGER set to %s." % arg
set_pager_enabled(True)
else:
if "PAGER" in os.environ:
msg = "PAGER set to %s." % os.environ["PAGER"]
else:
@special_command("notee", "notee", "Stop writing results to an output file.")
def no_tee(arg, **_):
close_tee()
return [(None, None, None, "")]
@special_command(
".once",
"\\o [-o] filename",
"Append next result to an output file (overwrite using -o).",
aliases=("\\o", "\\once"),
)
def set_once(arg, **_):
global once_file
once_file = parseargfile(arg)
return [(None, None, None, "")]
@special_command(
"tee",
"tee [-o] filename",
"Append all results to an output file (overwrite using -o).",
)
def set_tee(arg, **_):
global tee_file
try:
tee_file = open(**parseargfile(arg))
except (IOError, OSError) as e:
raise OSError("Cannot write to file '{}': {}".format(e.filename, e.strerror))
return [(None, None, None, "")]