Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def shell_logger(output):
    """Run the user's shell while mirroring its output into a log file.

    The file at `output` is created (or truncated), pre-filled with
    ``const.LOG_SIZE_IN_BYTES`` zero bytes and memory-mapped; the mapping
    is handed to the reader callback used while ``$SHELL`` is running.
    Behaves like the unix ``script`` command with the ``-f`` flag.

    This function never returns normally: it exits the process with
    status 1 when ``$SHELL`` is unset or empty, otherwise with the value
    returned by ``_spawn``.
    """
    shell_path = os.environ.get('SHELL')
    if not shell_path:
        logs.warn("Shell logger doesn't support your platform.")
        sys.exit(1)

    log_size = const.LOG_SIZE_IN_BYTES
    open_flags = os.O_CREAT | os.O_TRUNC | os.O_RDWR
    log_fd = os.open(output, open_flags)

    # The file has to be at least `log_size` bytes long before it can be
    # mapped with that length, so fill it with zero bytes first.
    os.write(log_fd, b'\x00' * log_size)
    log_buffer = mmap.mmap(log_fd, log_size,
                           flags=mmap.MAP_SHARED, prot=mmap.PROT_WRITE)

    reader = partial(_read, log_buffer)
    sys.exit(_spawn(shell_path, reader))
parser.print_help()
elif known_args.version:
logs.version(get_installation_info().version,
sys.version.split()[0], shell.info())
# It's important to check if an alias is being requested before checking if
# `TF_HISTORY` is in `os.environ`, otherwise it might mess with subshells.
# Check https://github.com/nvbn/thefuck/issues/921 for reference
elif known_args.alias:
print_alias(known_args)
elif known_args.command or 'TF_HISTORY' in os.environ:
fix_command(known_args)
elif known_args.shell_logger:
try:
from .shell_logger import shell_logger # noqa: E402
except ImportError:
logs.warn('Shell logger supports only Linux and macOS')
else:
shell_logger(known_args.shell_logger)
else:
parser.print_usage()
def get_output(script):
    """Gets command output from shell logger.

    Walks the entries returned by ``_get_last_n(const.SHELL_LOGGER_LIMIT)``
    and returns the output of the first entry whose ``'command'`` value
    equals `script`, joined with newlines and stripped of surrounding
    whitespace.

    When no entry matches (including when there are no entries at all),
    a warning is logged and ``None`` is returned.

    :type script: str
    :rtype: str | None
    """
    with logs.debug_time(u'Read output from external shell logger'):
        for command in _get_last_n(const.SHELL_LOGGER_LIMIT):
            if command['command'] != script:
                # Keep looking: a non-matching entry must not end the
                # search, otherwise only the first entry is ever checked.
                continue
            lines = _get_output_lines(command['output'])
            return '\n'.join(lines).strip()

        # Reached only after every entry was inspected without a match.
        logs.warn("Output isn't available in shell logger")
        return None
def get_output(script):
"""Reads script output from log.
:type script: str
:rtype: str | None
"""
if six.PY2:
logs.warn('Experimental instant mode is Python 3+ only')
return None
if 'THEFUCK_OUTPUT_LOG' not in os.environ:
logs.warn("Output log isn't specified")
return None
if const.USER_COMMAND_MARK not in os.environ.get('PS1', ''):
logs.warn(
"PS1 doesn't contain user command mark, please ensure "
"that PS1 is not changed after The Fuck alias initialization")
return None
try:
with logs.debug_time(u'Read output from log'):
fd = os.open(os.environ['THEFUCK_OUTPUT_LOG'], os.O_RDONLY)
buffer = mmap.mmap(fd, const.LOG_SIZE_IN_BYTES, mmap.MAP_SHARED, mmap.PROT_READ)
def get_output(script):
"""Reads script output from log.
:type script: str
:rtype: str | None
"""
if six.PY2:
logs.warn('Experimental instant mode is Python 3+ only')
return None
if 'THEFUCK_OUTPUT_LOG' not in os.environ:
logs.warn("Output log isn't specified")
return None
if const.USER_COMMAND_MARK not in os.environ.get('PS1', ''):
logs.warn(
"PS1 doesn't contain user command mark, please ensure "
"that PS1 is not changed after The Fuck alias initialization")
return None
try:
with logs.debug_time(u'Read output from log'):
fd = os.open(os.environ['THEFUCK_OUTPUT_LOG'], os.O_RDONLY)
buffer = mmap.mmap(fd, const.LOG_SIZE_IN_BYTES, mmap.MAP_SHARED, mmap.PROT_READ)
_skip_old_lines(buffer)
lines = _get_output_lines(script, buffer)
output = '\n'.join(lines).strip()
logs.debug(u'Received output: {}'.format(output))
def _get_alias(known_args):
    """Return the shell alias definition for the parsed arguments.

    The regular alias from ``shell.app_alias`` is always built. The
    instant-mode alias from ``shell.instant_mode_alias`` is returned
    instead only when experimental instant mode was requested, the
    interpreter is not Python 2 and ``which('script')`` finds the
    ``script`` app; otherwise a warning explains why the regular alias
    is used.
    """
    if six.PY2:
        warn("The Fuck will drop Python 2 support soon, more details "
             "https://github.com/nvbn/thefuck/issues/685")

    regular_alias = shell.app_alias(known_args.alias)

    if not known_args.enable_experimental_instant_mode:
        return regular_alias

    # Instant mode was requested: fall back to the regular alias unless
    # every prerequisite is satisfied.
    if six.PY2:
        warn("Instant mode requires Python 3")
        return regular_alias

    if not which('script'):
        warn("Instant mode requires `script` app")
        return regular_alias

    return shell.instant_mode_alias(known_args.alias)
def _get_alias(known_args):
    """Build the alias text to print for `known_args`.

    Starts from ``shell.app_alias(known_args.alias)`` and swaps in
    ``shell.instant_mode_alias(known_args.alias)`` when experimental
    instant mode is enabled and usable (not Python 2, and the ``script``
    app is found by ``which``). Each unmet prerequisite produces a
    warning and leaves the default alias in place.
    """
    if six.PY2:
        warn("The Fuck will drop Python 2 support soon, more details "
             "https://github.com/nvbn/thefuck/issues/685")

    result = shell.app_alias(known_args.alias)
    wants_instant_mode = known_args.enable_experimental_instant_mode

    if wants_instant_mode and six.PY2:
        warn("Instant mode requires Python 3")
    elif wants_instant_mode and not which('script'):
        warn("Instant mode requires `script` app")
    elif wants_instant_mode:
        # All prerequisites hold, so the instant-mode alias wins.
        result = shell.instant_mode_alias(known_args.alias)

    return result
def _setup_db(self):
    """Open the shelve-backed cache and store it on ``self._db``.

    The cache lives at ``<cache dir>/thefuck``, where the cache dir comes
    from ``self._get_cache_dir()``. If opening fails with one of
    ``shelve_open_error`` or an ``ImportError``, the cache file is
    removed and opened again from scratch. The shelf's ``close`` method
    is registered to run at interpreter exit.
    """
    cache_path = (Path(self._get_cache_dir()) / 'thefuck').as_posix()

    try:
        db = shelve.open(cache_path)
    except shelve_open_error + (ImportError,):
        # Caused when switching between Python versions
        warn("Removing possibly out-dated cache")
        os.remove(cache_path)
        db = shelve.open(cache_path)

    self._db = db
    atexit.register(self._db.close)