Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_debug(capsys, settings, debug, stderr):
    """Check what ``logs.debug`` emits for the given ``debug`` setting.

    Sets ``settings.debug`` to ``debug``, logs one message and expects
    nothing on stdout and exactly ``stderr`` on the error stream.

    """
    settings.debug = debug
    logs.debug('test')

    captured = capsys.readouterr()
    expected = ('', stderr)
    assert captured == expected
:type expanded: str
:rtype: str | None
"""
env = dict(os.environ)
env.update(settings.env)
split_expand = shlex.split(expanded)
is_slow = split_expand[0] in settings.slow_commands if split_expand else False
with logs.debug_time(u'Call: {}; with env: {}; is slow: {}'.format(
script, env, is_slow)):
result = Popen(expanded, shell=True, stdin=PIPE,
stdout=PIPE, stderr=STDOUT, env=env)
if _wait_output(result, is_slow):
output = result.stdout.read().decode('utf-8')
logs.debug(u'Received output: {}'.format(output))
return output
else:
logs.debug(u'Execution timed out!')
return None
def fix_command(known_args):
    """Fixes previous command. Used when `thefuck` called without arguments.

    Initialises settings from ``known_args``, builds a command from the raw
    script and runs whatever ``select_command`` picks from the corrected
    commands. Returns early when the command is empty and exits with
    status 1 when nothing is selected.

    """
    settings.init(known_args)
    # NOTE(review): the source was flattened; the whole flow is assumed to
    # sit inside the 'Total' timing context -- confirm against the file.
    with logs.debug_time('Total'):
        settings_dump = pformat(settings)
        logs.debug(u'Run with settings: {}'.format(settings_dump))
        raw = _get_raw_command(known_args)

        try:
            command = types.Command.from_raw_script(raw)
        except EmptyCommand:
            logs.debug('Empty command, nothing to do')
            return

        chosen = select_command(get_corrected_commands(command))
        if not chosen:
            # Nothing was selected: signal failure to the caller's shell.
            sys.exit(1)
        else:
            chosen.run(command)
"""
env = dict(os.environ)
env.update(settings.env)
split_expand = shlex.split(expanded)
is_slow = split_expand[0] in settings.slow_commands if split_expand else False
with logs.debug_time(u'Call: {}; with env: {}; is slow: {}'.format(
script, env, is_slow)):
result = Popen(expanded, shell=True, stdin=PIPE,
stdout=PIPE, stderr=STDOUT, env=env)
if _wait_output(result, is_slow):
output = result.stdout.read().decode('utf-8')
logs.debug(u'Received output: {}'.format(output))
return output
else:
logs.debug(u'Execution timed out!')
return None
def _kill_process(proc):
    """Kill ``proc``; if access is denied, only log a debug message.

    In the denied case the process is left alone and will be killed when
    thefuck terminates.

    :type proc: Process

    """
    try:
        proc.kill()
    except AccessDenied:
        template = u'Rerun: process PID {} ({}) could not be terminated'
        message = template.format(proc.pid, proc.exe())
        logs.debug(message)
def script_parts(self):
    """Return the script split into parts, computing it once per object.

    The result is stored on ``self._script_parts``. If splitting raises,
    the failure is logged at debug level and an empty list is stored
    instead.

    """
    # Already computed on an earlier call: reuse the stored value.
    if hasattr(self, '_script_parts'):
        return self._script_parts

    try:
        self._script_parts = shell.split_command(self.script)
    except Exception:
        failure = sys.exc_info()
        logs.debug(u"Can't split command script {} because:\n {}".format(
            self, failure))
        self._script_parts = []
    return self._script_parts
def run(self, old_cmd):
    """Runs command from rule for passed command.

    Calls the side effect when one is set, puts the script into shell
    history when ``settings.alter_history`` is enabled, then prints the
    script returned by ``_get_script``.

    :type old_cmd: Command

    """
    if self.side_effect:
        self.side_effect(old_cmd, self.script)

    if settings.alter_history:
        shell.put_to_history(self.script)

    # This depends on correct setting of PYTHONIOENCODING by the alias:
    io_encoding = os.environ.get('PYTHONIOENCODING', '!!not-set!!')
    logs.debug(u'PYTHONIOENCODING: {}'.format(io_encoding))

    script_to_emit = self._get_script()
    print(script_to_emit)
return None
if const.USER_COMMAND_MARK not in os.environ.get('PS1', ''):
logs.warn(
"PS1 doesn't contain user command mark, please ensure "
"that PS1 is not changed after The Fuck alias initialization")
return None
try:
with logs.debug_time(u'Read output from log'):
fd = os.open(os.environ['THEFUCK_OUTPUT_LOG'], os.O_RDONLY)
buffer = mmap.mmap(fd, const.LOG_SIZE_IN_BYTES, mmap.MAP_SHARED, mmap.PROT_READ)
_skip_old_lines(buffer)
lines = _get_output_lines(script, buffer)
output = '\n'.join(lines).strip()
logs.debug(u'Received output: {}'.format(output))
return output
except OSError:
logs.warn("Can't read output log")
return None
except ScriptNotInLog:
logs.warn("Script not found in output log")
return None