Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def fix_command(known_args):
"""Fixes previous command. Used when `thefuck` called without arguments."""
settings.init(known_args)
with logs.debug_time('Total'):
logs.debug(u'Run with settings: {}'.format(pformat(settings)))
raw_command = _get_raw_command(known_args)
try:
command = types.Command.from_raw_script(raw_command)
except EmptyCommand:
logs.debug('Empty command, nothing to do')
return
corrected_commands = get_corrected_commands(command)
selected_command = select_command(corrected_commands)
if selected_command:
selected_command.run(command)
else:
sys.exit(1)
def get_output(script, expanded):
"""Runs the script and obtains stdin/stderr.
:type script: str
:type expanded: str
:rtype: str | None
"""
env = dict(os.environ)
env.update(settings.env)
split_expand = shlex.split(expanded)
is_slow = split_expand[0] in settings.slow_commands if split_expand else False
with logs.debug_time(u'Call: {}; with env: {}; is slow: {}'.format(
script, env, is_slow)):
result = Popen(expanded, shell=True, stdin=PIPE,
stdout=PIPE, stderr=STDOUT, env=env)
if _wait_output(result, is_slow):
output = result.stdout.read().decode('utf-8')
logs.debug(u'Received output: {}'.format(output))
return output
else:
logs.debug(u'Execution timed out!')
return None
def from_path(cls, path):
"""Creates rule instance from path.
:type path: pathlib.Path
:rtype: Rule
"""
name = path.name[:-3]
with logs.debug_time(u'Importing rule: {};'.format(name)):
rule_module = load_source(name, str(path))
priority = getattr(rule_module, 'priority', DEFAULT_PRIORITY)
return cls(name, rule_module.match,
rule_module.get_new_command,
getattr(rule_module, 'enabled_by_default', True),
getattr(rule_module, 'side_effect', None),
settings.priority.get(name, priority),
getattr(rule_module, 'requires_output', True))
def is_match(self, command):
"""Returns `True` if rule matches the command.
:type command: Command
:rtype: bool
"""
if command.output is None and self.requires_output:
return False
try:
with logs.debug_time(u'Trying rule: {};'.format(self.name)):
if self.match(command):
return True
except Exception:
logs.rule_failed(self, sys.exc_info())
if six.PY2:
logs.warn('Experimental instant mode is Python 3+ only')
return None
if 'THEFUCK_OUTPUT_LOG' not in os.environ:
logs.warn("Output log isn't specified")
return None
if const.USER_COMMAND_MARK not in os.environ.get('PS1', ''):
logs.warn(
"PS1 doesn't contain user command mark, please ensure "
"that PS1 is not changed after The Fuck alias initialization")
return None
try:
with logs.debug_time(u'Read output from log'):
fd = os.open(os.environ['THEFUCK_OUTPUT_LOG'], os.O_RDONLY)
buffer = mmap.mmap(fd, const.LOG_SIZE_IN_BYTES, mmap.MAP_SHARED, mmap.PROT_READ)
_skip_old_lines(buffer)
lines = _get_output_lines(script, buffer)
output = '\n'.join(lines).strip()
logs.debug(u'Received output: {}'.format(output))
return output
except OSError:
logs.warn("Can't read output log")
return None
except ScriptNotInLog:
logs.warn("Script not found in output log")
return None
def get_output(script):
"""Gets command output from shell logger."""
with logs.debug_time(u'Read output from external shell logger'):
commands = _get_last_n(const.SHELL_LOGGER_LIMIT)
for command in commands:
if command['command'] == script:
lines = _get_output_lines(command['output'])
output = '\n'.join(lines).strip()
return output
else:
logs.warn("Output isn't available in shell logger")
return None