Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Context-manager entry (excerpt; indentation was lost): prepares config
# files, obtains a tmux session and a new window, and starts bash in the
# window's pane using the generated rcfile.
# NOTE(review): the second ``try:`` below has no visible ``except``/``finally``,
# so the method presumably continues beyond what is shown here.
def __enter__(self):
# Materialise the generated tmux and bash configuration via ``self.tempfile``;
# ``bash_config.name`` is used further down as the ``--rcfile`` path.
self.tmux_config = self.tempfile(self.tmux_config_contents())
self.bash_config = self.tempfile(self.bash_config_contents())
self.server = tmuxp.Server()
if self.use_existing_session:
try:
session_dict = self.server.attached_sessions() # until tmuxp bug is fixed
if session_dict is None:
# Nothing attached: jump to the fallback in the handler below.
raise tmuxp.exc.TmuxpException
# Wrap the first attached session.
# NOTE(review): an empty (non-None) result would raise IndexError here, which
# the handler below does not catch — confirm whether that can happen.
self.session = tmuxp.Session(self.server, **session_dict[0])
except tmuxp.exc.TmuxpException:
# Fallback: create a dedicated session for the test run.
self.session = self.server.new_session(session_name='rlundotesting')
else:
# ``kill_session=True`` presumably replaces an existing session of the same
# name — confirm against the tmuxp documentation.
self.session = self.server.new_session(session_name='rlundotesting', kill_session=True)
# Create the window without switching the client to it.
self.window = self.session.new_window(attach=False)
try:
# Restart the first pane as bash with the generated rcfile; tmux's
# ``respawn-pane -k`` kills whatever the pane is currently running first.
output = self.window.panes[0].cmd('respawn-pane', '-k', 'bash --rcfile %s --noprofile' % (self.bash_config.name, ))
# Any stderr output is treated as failure; the pane list is included to aid debugging.
if output.stderr:
raise ValueError(repr(output.stderr) + " " + repr(self.window.panes))
# NOTE(review): the lines below repeat the respawn step in a slightly different
# form; they may come from a separate excerpt of the same file — confirm.
# Unpacking into a 1-tuple raises ValueError unless the window has exactly one pane.
(pane, ) = self.window.panes
output = pane.cmd('respawn-pane', '-k', 'bash --rcfile %s --noprofile' % (self.bash_config.name, ))
if output.stderr:
raise ValueError(repr(output.stderr) + " " + repr(self.window.panes))
# Open a second pane with ``split-window -h``, also running bash with the same rcfile.
pane.cmd('split-window', '-h', 'bash --rcfile %s --noprofile' % (self.bash_config.name, ))
# Excerpt (enclosing definition not visible; indentation was lost): asks tmux
# for its panes and turns each output line into a dict keyed by format name.
# Format keys requested from tmux: session/window identifiers plus whatever
# ``formats.PANE_FORMATS`` contains.
pformats = [
'session_name', 'session_id',
'window_index', 'window_id',
'window_name'
] + formats.PANE_FORMATS
# Each key becomes a ``#{key}`` placeholder followed by a tab separator.
tmux_formats = ['#{%s}\t' % f for f in pformats]
# ``-a`` makes tmux list panes beyond the current window (see tmux's
# ``list-panes``); ``-F`` supplies the tab-separated output format built above.
proc = self.cmd(
'list-panes',
'-a',
'-F%s' % ''.join(tmux_formats), # output
)
# Any stderr output is treated as failure.
if proc.stderr:
raise exc.TmuxpException(proc.stderr)
# Presumably a list of output lines, one per pane: it is iterated below and
# each item is split on tabs.
panes = proc.stdout
# NOTE(review): this rebuilds the same list as the first ``pformats``
# assignment above and looks redundant.
pformats = [
'session_name', 'session_id',
'window_index', 'window_id', 'window_name'
] + formats.PANE_FORMATS
# combine format keys with values returned from ``tmux list-panes``
# (``zip`` stops at the shorter input, so the empty field produced by the
# trailing tab is ignored)
panes = [dict(zip(
pformats, window.split('\t'))) for window in panes]
# drop keys whose value is empty, so each dict keeps only populated fields
panes = [
dict((k, v) for k, v in window.items() if v) for window in panes
]
# Excerpt (the opening ``try:`` and the code that creates ``proc`` are not
# visible; indentation was lost): waits for a script process and converts
# failures into project exceptions.
# ``proc`` is presumably a ``subprocess.Popen``-like object — confirm.
# Block until the process exits so ``returncode`` is populated.
proc.wait()
# A non-zero exit status is reported as BeforeLoadScriptError together with
# the script's absolute path and its stderr text.
if proc.returncode:
stderr = proc.stderr.read()
proc.stderr.close()
# ``console_to_str`` presumably decodes the raw output to text — confirm;
# the result is split into lines so empty ones can be dropped.
stderr = console_to_str(stderr).split('\n')
stderr = '\n'.join(list(filter(None, stderr))) # filter empty
raise exc.BeforeLoadScriptError(
proc.returncode, os.path.abspath(script_file), stderr
)
# Reached only when the return code is falsy (normally 0).
return proc.returncode
except OSError as e:
# errno 2 is ENOENT ("No such file or directory") on POSIX systems: report
# the script as missing.
if e.errno == 2:
raise exc.BeforeLoadScriptNotExists(e, os.path.abspath(script_file))
else:
# Any other OS-level failure is re-raised unchanged.
raise e
def command_freeze(session_name, socket_name, socket_path):
    """Snapshot a session into a config.

    If SESSION_NAME is provided, snapshot that session. Otherwise, use the
    current session."""
    # Parameters:
    #   session_name -- name of the session to snapshot; when falsy, the first
    #                   session returned by ``Server.list_sessions()`` is used.
    #   socket_name, socket_path -- forwarded unchanged to ``Server(...)``.
    # Returns None. When no session can be found, the error message is printed
    # and the function returns early instead of raising.
    # NOTE(review): the docstring says "current session" while the code picks
    # the first listed session — confirm the two coincide.
    t = Server(socket_name=socket_name, socket_path=socket_path)

    try:
        if session_name:
            session = t.find_where({'session_name': session_name})
        else:
            # Guard the empty case: indexing ``[0]`` on an empty list raises
            # IndexError, which the handler below would not catch, so the
            # user would get a traceback instead of "Session not found.".
            sessions = t.list_sessions()
            session = sessions[0] if sessions else None

        if not session:
            raise exc.TmuxpException('Session not found.')
    except exc.TmuxpException as e:
        print(e)
        return

    # Freeze the live session into a config dict, then load its inlined form
    # into kaptan so it can be exported in the chosen format.
    sconf = freeze(session)
    configparser = kaptan.Kaptan()
    newconfig = config.inline(sconf)
    configparser.import_config(newconfig)

    # Ask which serialisation to produce; only 'yaml' and 'json' are accepted.
    config_format = click.prompt(
        'Convert to', value_proc=_validate_choices(['yaml', 'json']), default='yaml'
    )

    if config_format == 'yaml':
        newconfig = configparser.export(
            'yaml', indent=2, default_flow_style=False, safe=True
        )
    # NOTE(review): only the 'yaml' branch is visible in this excerpt; the
    # 'json' case and whatever consumes ``newconfig`` presumably follow — verify.
# Excerpt (the enclosing method and the initialisation of ``active_windows``
# are not visible; indentation was lost): collects the windows flagged as
# active and returns the single match.
for window in self._windows:
if 'window_active' in window:
# window_active arrives as a string, so it is compared against '1'
# rather than an integer
if window.get('window_active') == '1':
active_windows.append(Window(session=self, **window))
else:
continue
# Exactly one active window is the only accepted outcome.
if len(active_windows) == int(1):
return active_windows[0]
else:
# NOTE(review): this branch also runs when no active window was found
# (length 0), although the message says "multiple".
raise exc.TmuxpException(
'multiple active windows found. %s' % active_windows)
# NOTE(review): if this check sits at the same level as the if/else above, it
# is unreachable because both branches exit — confirm against the properly
# indented source.
if len(self._windows) == int(0):
raise exc.TmuxpException('No Windows')