Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# cmd2 command handler that displays stored credentials.
# NOTE(review): only the beginning of this handler is visible here; the
# per-row output logic that follows the host-change check is cut off.
@cmd2.with_category(CMD_CAT_SSHAME)
def do_creds(self, arg):
'Display credentials'
if arg.list:
# Select credential columns plus the key source and the host dn, joined
# through the 'host' and 'key' relationships, ordered by host address.
q = self.db.query(Credential.username, Credential.valid, Credential.host_address, Credential.host_port, Credential.key_fingerprint,
Key.source, Credential.updated, Host.dn).join('host').join('key').order_by(Credential.host_address)
if not arg.verbose:
# Default listing: only credentials whose `valid` column equals True.
q = q.filter(Credential.valid == True)
else:
# Verbose listing: every credential whose `valid` column is set.
# NOTE(review): `== True` / `!= None` appear to be ORM column expressions
# that build the SQL filter — confirm before rewriting them with `is`.
q = q.filter(Credential.valid != None)
s = ""
# lc is incremented once per row; hc is presumably a host counter
# (its updates are not visible in this snippet).
lc = hc = 0
prev_host = None
for r in q.all():
lc += 1
# r[2] / r[3] are host_address / host_port, per the column order above.
host = f"{r[2]}:{r[3]}"
# Rows are ordered by host address, so a changed "address:port" marks
# the first row of a new host group.
if prev_host != host:
@cmd2.with_category(CMD_CAT_CONNECTING)
def do_which(self, _):
    """Which command"""
    # The command argument is ignored; the handler only prints a fixed marker.
    marker = 'Which'
    self.poutput(marker)
@cmd2.with_category("Command Management")
def do_enable_commands(self, _):
    """Enable the Application Management commands"""
    # The command argument is unused: re-enable the whole category, then
    # confirm to the user.
    category = self.CMD_CAT_APP_MGMT
    self.enable_category(category)
    confirmation = "The Application Management commands have been enabled"
    self.poutput(confirmation)
@cmd2.with_category(__CMD_CAT_OBJ)
def do_user(self, stmt):
    '''Create, list and delete users.
    A user is a username used to authenticate on an endpoint. Once a user
    is added to the workspace, it can be used with "set" and "connect".
    '''
    # A selected subcommand exposes its handler as `stmt.func`.
    handler = getattr(stmt, 'func', None)
    if handler is None:
        # No subcommand was selected: fall back to listing users.
        self.__user_list(stmt)
        return
    handler(self, stmt)
@cmd2.with_category(CLANVAS_CATEGORY)
@cmd2.with_argparser(la_parser)
@argparser_course_required_wrapper
def do_la(self, course, opts):
    # Delegate to list_assignments for the given course, forwarding the parsed
    # command-line flags as keyword options, and return its result unchanged.
    cached_lister = self.list_assignments_cached
    listing_flags = dict(
        long=opts.long,
        submissions=opts.submissions,
        upcoming=opts.upcoming,
    )
    return list_assignments(course, cached_lister, **listing_flags)
@cmd2.with_category(CMD_CAT_SSHAME)
def do_session(self, arg):
    'Set session name'
    # With a session name: (re)initialise the database for that session.
    # Without one: report the session currently in use.
    if arg.session:
        self.init_db(arg.session)
    else:
        self.poutput(f"Current session: {self.session_name}")

    # NOTE(review): the source this was recovered from had lost its
    # indentation; the statistics block is placed at function level so that
    # the verbose flag reports statistics in both cases above — confirm.
    if not arg.verbose:
        return

    h = self.db.query(Host).count_star()
    # Number of distinct (address, port) pairs with at least one valid credential.
    ho = self.db.query(Credential.host_address, Credential.host_port).distinct(
    ).filter(Credential.valid == True).count()
    # An empty session has no hosts; guard the percentage so the command
    # prints 0% instead of raising ZeroDivisionError.
    open_pct = ho * 100 // h if h else 0
    self.poutput(f"Hosts : {h}")
    self.poutput(f"Keys : {self.db.query(Key).count_star()}")
    self.poutput(f"Creds tested: {self.db.query(Credential).filter(Credential.valid != None).count_star()}")
    self.poutput(f"Creds valid : {self.db.query(Credential).filter(Credential.valid == True).count_star()}")
    self.poutput(f"Hosts open : {ho} ({open_pct}%)")
# cmd2 command handler registered under CAT_LINK_MANAGER, with arguments
# parsed by list_parser.
# NOTE(review): only the beginning of this handler is visible here; the rest
# of print_data and the code that uses it are cut off.
@with_category(CAT_LINK_MANAGER)
@with_argparser(list_parser)
def do_list(self, opts):
"""Manager link options of the emulation"""
# Nested helper: collects [label, value] rows describing a single link.
def print_data(link):
data = []
# Emit an empty line before the link details.
self.poutput(msg="")
cid = ["ID", "{id}".format(id=link.getId())]
data.append(cid)
name = ["Name", "{name}".format(name=link.getName())]
data.append(name)
# NOTE(review): `type` shadows the builtin inside this helper.
type = ["Type", "{type}".format(type=link.getType().describe())]
data.append(type)
encap = ["Encapsulation", "{encap}".format(encap=link.getEncap().describe())]
data.append(encap)
source = ["Source", "{src}".format(src=link.getSource().getName())]
@cmd2.with_category(CLANVAS_CATEGORY)
@cmd2.with_argparser(whoami_parser)
def do_whoami(self, opts):
    # Print the current user's profile: "name (login_id)" by default, or one
    # "field: value" line per selected profile field when verbose is set.
    profile = self.canvas.get_current_user().get_profile()

    if opts.verbose:
        verbose_fields = ['name', 'short_name', 'login_id', 'primary_email', 'id', 'time_zone']
        detail_lines = [f"{key}: {profile[key]!s}" for key in verbose_fields]
        get_outputter().poutput('\n'.join(detail_lines))
        return

    # str.join keeps the original's requirement that both values are strings.
    summary = ''.join([profile['name'], ' (', profile['login_id'], ')'])
    get_outputter().poutput(summary)
@cmd2.with_category(__CMD_CAT_OBJ)
def do_endpoint(self, stmt):
    '''Create, list, search and delete endpoints.
    An endpoint is a couple of an IP and a port on which a SSH service
    should be running. Once added, an endpoint must be reached using "probe"
    and then connected using "connect".
    '''
    # A selected subcommand exposes its handler as `stmt.func`.
    handler = getattr(stmt, 'func', None)
    if handler is None:
        # No subcommand was selected: fall back to listing endpoints.
        self.__endpoint_list(stmt)
        return
    handler(self, stmt)
@cmd2.with_category(__CMD_CAT_CON)
def do_connect(self, stmt):
'''Try to authenticate on an Enpoint using a User and Creds'''
connection = getattr(stmt, 'connection', None)
verbose = getattr(stmt, 'verbose', False)
force = getattr(stmt, 'force', False)
probe_auto = getattr(stmt, 'probe', False)
targets = self.workspace.enum_connect(connection, force=force, unprobed=probe_auto)
nb_targets = len(targets)
if nb_targets > 1:
if not yes_no("This will attempt up to "+str(nb_targets)+" connections. Proceed ?", False, list_val=targets):
return
try:
nb_working = self.workspace.connect(targets, verbose, probe_auto)
except NoPathError: