Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def execute(self, params, **kwargs):
folder_name = kwargs['folder'] if 'folder' in kwargs else None
if folder_name in params.folder_cache:
display.formatted_tree(params, params.folder_cache[folder_name])
else:
rs = try_resolve_path(params, folder_name)
if rs is not None:
folder, pattern = rs
if len(pattern) == 0:
display.formatted_tree(params, folder)
else:
logging.warning('Folder %s not found', folder_name)
class FolderMakeCommand(Command):
def get_parser(self):
return mkdir_parser
def execute(self, params, **kwargs):
base_folder = params.folder_cache[params.current_folder] if params.current_folder in params.folder_cache else params.root_folder
name = kwargs['folder'] if 'folder' in kwargs else None
if name:
rs = try_resolve_path(params, name)
if rs is not None:
base_folder, name = rs
if len(name) == 0:
logging.warning('Folder "%s" already exists', name)
return
shared_folder = kwargs['shared_folder'] if 'shared_folder' in kwargs else None
display.formatted_records(results)
# Search shared folders
results = api.search_shared_folders(params, pattern)
if results:
print('')
display.formatted_shared_folders(results, params=params, skip_details=True)
# Search teams
results = api.search_teams(params, pattern)
if results:
print('')
display.formatted_teams(results, params=params, skip_details=True)
class RecordListCommand(Command):
def get_parser(self):
return list_parser
def execute(self, params, **kwargs):
pattern = kwargs['pattern'] if 'pattern' in kwargs else None
results = api.search_records(params, pattern or '')
if results:
if len(results) < 5:
api.get_record_shares(params, [x.record_uid for x in results])
display.formatted_records(results)
class RecordListSfCommand(Command):
def execute(self, params, **kwargs):
pattern = kwargs['pattern'] if 'pattern' in kwargs else None
results = api.search_shared_folders(params, pattern or '')
permissions = rec['shares']['user_permissions']
ro['shared_with'] = [{
'username': su['username'],
'owner': su.get('owner') or False,
'editable': su.get('editable') or False,
'sharable': su.get('sharable') or False
} for su in permissions]
print(json.dumps(ro, indent=2))
elif fmt == 'password':
print(r.password)
else:
r.display(params=params)
class RecordDownloadAttachmentCommand(Command):
def get_parser(self):
return download_parser
def execute(self, params, **kwargs):
name = kwargs['record'] if 'record' in kwargs else None
if not name:
self.get_parser().print_help()
return
record_uid = None
if name in params.record_cache:
record_uid = name
else:
rs = try_resolve_path(params, name)
if rs is not None:
emails = [x['to_username'] for x in rs['add_statuses'] if x['status'] in ['pending_accept']]
if emails:
logging.info('Recipient must accept request to complete sharing. Invitation sent to %s. ', ', '.join(emails))
emails = [x['to_username'] for x in rs['add_statuses'] if x['status'] not in ['success', 'pending_accept']]
if emails:
logging.info('Failed to share record with: %s', ', '.join(emails))
if 'remove_statuses' in rs:
emails = [x['to_username'] for x in rs['remove_statuses'] if x['status'] == 'success']
if emails:
logging.info('Stopped sharing record with: %s', ', '.join(emails))
class ShareReportCommand(Command):
def get_parser(self):
return share_report_parser
def execute(self, params, **kwargs):
record_uids = []
user_filter = set()
record_filter = set()
if kwargs.get('record'):
records = kwargs.get('record') or []
for r in records:
if r in params.record_cache:
record_filter.add(r)
else:
r_uid = None
rs = try_resolve_path(params, r)
np = 'y'
if not force:
summary = pdr['would_delete']['deletion_summary']
for x in summary:
print(x)
np = user_choice('Do you like to proceed with deletion?', 'yn', default='n')
if np.lower() == 'y':
rq = {
'command': 'delete',
'pre_delete_token': pdr['pre_delete_token']
}
api.communicate(params, rq)
params.sync_data = True
class FolderMoveCommand(Command):
@staticmethod
def prepare_transition_keys(params, folder, keys, encryption_key):
for f_uid in folder.subfolders:
f = params.folder_cache[f_uid]
FolderMoveCommand.prepare_transition_keys(params, f, keys, encryption_key)
sf = params.subfolder_cache[folder.uid]
transition_key = api.encrypt_aes(sf['folder_key_unencrypted'], encryption_key)
keys.append({
'uid': folder.uid,
'key': transition_key
})
if folder.uid in params.subfolder_record_cache:
for r_uid in params.subfolder_record_cache[folder.uid]:
rec = params.record_cache[r_uid]
results = api.search_records(params, pattern or '')
if results:
if len(results) < 5:
api.get_record_shares(params, [x.record_uid for x in results])
display.formatted_records(results)
class RecordListSfCommand(Command):
def execute(self, params, **kwargs):
pattern = kwargs['pattern'] if 'pattern' in kwargs else None
results = api.search_shared_folders(params, pattern or '')
if results:
display.formatted_shared_folders(results)
class RecordListTeamCommand(Command):
def execute(self, params, **kwargs):
rq = {
'command': 'get_available_teams'
}
rs = api.communicate(params, rq)
result = []
for team in rs['teams']:
team = Team(team_uid=team['team_uid'], name=team['team_name'])
result.append(team)
display.formatted_teams(result, skip_details=True)
class RecordGetUidCommand(Command):
def get_parser(self):
upload_parser = argparse.ArgumentParser(prog='upload-attachment', description='Upload record attachments')
upload_parser.add_argument('--file', dest='file', action='append', required=True, help='file name to upload.')
upload_parser.add_argument('record', action='store', help='record path or UID')
upload_parser.error = raise_parse_exception
upload_parser.exit = suppress_exit
delete_attachment_parser = argparse.ArgumentParser(prog='delete-attachment', description='Delete attachment file')
delete_attachment_parser.add_argument('--name', dest='name', action='append', required=True, help='attachment file name or ID. Can be repeated.')
delete_attachment_parser.add_argument('record', action='store', help='record path or UID')
delete_attachment_parser.error = raise_parse_exception
delete_attachment_parser.exit = suppress_exit
class RecordAddCommand(Command):
def get_parser(self):
return add_parser
def execute(self, params, **kwargs):
title = kwargs['title'] if 'title' in kwargs else None
login = kwargs['login'] if 'login' in kwargs else None
password = kwargs['password'] if 'password' in kwargs else None
url = kwargs['url'] if 'url' in kwargs else None
custom_list = kwargs['custom'] if 'custom' in kwargs else None
notes = kwargs['notes'] if 'notes' in kwargs else None
generate = kwargs['generate'] if 'generate' in kwargs else None
if generate:
password = generator.generate(16)
force = kwargs['force'] if 'force' in kwargs else None
lb = self._fd.read(4)
rs_len = int.from_bytes(lb, byteorder='big')
return self._fd.read(rs_len)
raise Exception('SSH Agent Connect: Unsupported platform')
class ConnectEndpoint:
def __init__(self, name, description, record_uid, record_title, paths):
self.name = name
self.description = description
self.record_uid = record_uid
self.record_title = record_title
self.paths = paths
class ConnectCommand(Command):
LastRevision = 0 # int
Endpoints = [] # type: [ConnectEndpoint]
def get_parser(self):
return connect_parser
def execute(self, params, **kwargs):
if kwargs.get('syntax_help'):
logging.info(connect_command_description)
return
ConnectCommand.find_endpoints(params)
endpoint = kwargs.get('endpoint')
if endpoint:
endpoints = [x for x in ConnectCommand.Endpoints if x.name == endpoint]
add_command.execute(params, title='Keeper credentials for {0}'.format(email), login=email, password=password, force=True)
except Exception:
store = False
logging.error('Failed to create record in Keeper')
else:
store = False
if generate and not store:
logging.warning('Generated password: %s', password)
if params.enterprise:
api.query_enterprise(params)
else:
logging.error(rs['message'])
class ShareFolderCommand(Command):
def get_parser(self):
return share_folder_parser
def execute(self, params, **kwargs):
folder = None
name = kwargs.get('folder')
if name:
if name in params.folder_cache:
folder = params.folder_cache[name]
else:
rs = try_resolve_path(params, name)
if rs is not None:
folder, name = rs
if len(name or '') > 0:
folder = None
elif folder.type == BaseFolderNode.RootFolderType:
s = set()
s.update(att1.keys())
s.symmetric_difference_update(att2.keys())
if len(s) > 0:
for id in s:
yield 'Attachment', RecordHistoryCommand.to_attachment_str(att1.get(id)), RecordHistoryCommand.to_attachment_str(att2.get(id))
class TotpEndpoint:
def __init__(self, record_uid, record_title, paths):
self.record_uid = record_uid
self.record_title = record_title
self.paths = paths
class TotpCommand(Command):
LastRevision = 0 # int
Endpoints = [] # type: [TotpEndpoint]
def get_parser(self):
return totp_parser
def execute(self, params, **kwargs):
record_name = kwargs['record'] if 'record' in kwargs else None
record_uid = None
if record_name:
if record_name in params.record_cache:
record_uid = record_name
else:
rs = try_resolve_path(params, record_name)
if rs is not None:
folder, record_name = rs