Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def execute(self, params, **kwargs):
emails = kwargs.get('email') or []
if not emails:
logging.error('\'email\' parameter is missing')
return
action = kwargs.get('action') or 'grant'
if action == 'cancel':
answer = user_choice(bcolors.FAIL + bcolors.BOLD + '\nALERT!\n' + bcolors.ENDC +
'This action cannot be undone.\n\n' +
'Do you want to cancel all shares with user(s): ' + ', '.join(emails) + ' ?', 'yn', 'n')
if answer.lower() in {'y', 'yes'}:
for email in emails:
rq = {
'command': 'cancel_share',
'to_email': email
}
try:
rs = api.communicate(params, rq)
except KeeperApiError as kae:
if kae.result_code == 'share_not_found':
logging.info('{0}: No shared records are found.'.format(email))
else:
logging.warning('{0}: {1}.'.format(email, kae.message))
except Exception as e:
break
node_id = None
if kwargs.get('node'):
for node in params.enterprise['nodes']:
if kwargs['node'] in {str(node['node_id']), node['data'].get('displayname')}:
node_id = node['node_id']
break
elif not node.get('parent_id') and kwargs['node'] == params.enterprise['enterprise_name']:
node_id = node['node_id']
break
if kwargs.get('delete'):
if team is not None:
answer = 'y' if kwargs.get('force') else \
user_choice('Delete Team\n\nAre you sure you want to delete {0}'.format(team['name']), 'yn', 'n')
if answer.lower() == 'y':
rq = {
'command': 'team_delete',
'team_uid': team['team_uid']
}
rs = api.communicate(params, rq)
if rs['result'] == 'success':
logging.info('Team %s deleted', team['name'])
api.query_enterprise(params)
else:
logging.warning('Team not found')
return
if kwargs.get('add'):
if team is None:
if node_id is None:
elif user_folder:
if base_folder.type in {BaseFolderNode.SharedFolderType, BaseFolderNode.SharedFolderFolderType}:
request['folder_type'] = 'shared_folder_folder'
else:
request['folder_type'] = 'user_folder'
if request.get('folder_type') is None:
if base_folder.type in {BaseFolderNode.SharedFolderType, BaseFolderNode.SharedFolderFolderType}:
request['folder_type'] = 'shared_folder_folder'
if request.get('folder_type') is None:
inp = user_choice('Do you want to create a shared folder?', 'yn', default='n')
if inp.lower() == 'y':
request['folder_type'] = 'shared_folder'
pq = 'Default user permissions: (A)ll | Manage (U)sers / (R)ecords; Can (E)dit / (S)hare records?'
inp = user_choice(pq, 'aures', multi_choice=True)
request['manage_users'] = False
request['manage_records'] = False
request['can_edit'] = False
request['can_share'] = False
if len(inp) > 0:
s1 = set([x.lower() for x in inp])
if 'a' in s1:
request['manage_users'] = True
request['manage_records'] = True
request['can_edit'] = True
request['can_share'] = True
else:
if 'u' in s1:
request['manage_users'] = True
if 'r' in s1:
request['manage_records'] = True
def execute(self, params, **kwargs):
uc = user_choice('Are you sure you want to delete all Keeper records on the server?', 'yn', default='n')
if uc.lower() == 'y':
api.sync_down(params)
if len(params.record_cache) == 0:
logging.warning('No records to delete')
return
request = {
'command': 'record_update',
'delete_records': [key for key in params.record_cache.keys()]
}
logging.info('removing %s records from Keeper', len(params.record_cache))
response_json = api.communicate(params, request)
success = [info for info in response_json['delete_records'] if info['status'] == 'success']
if len(success) > 0:
logging.info("%s records deleted successfully", len(success))
failures = [info for info in response_json['delete_records'] if info['status'] != 'success']
if folder.type == BaseFolderNode.SharedFolderType:
if folder.uid in params.shared_folder_cache:
sf = params.shared_folder_cache[folder.uid]
rq = {
'command': 'shared_folder_update',
'operation': 'delete',
'shared_folder_uid': sf['shared_folder_uid']
}
if 'shared_folder_key' not in sf:
if 'teams' in sf:
for team in sf['teams']:
rq['from_team_uid'] = team['team_uid']
break
np = 'y' if force else user_choice('Do you want to proceed with deletion?', 'yn', default='n')
if np.lower() == 'y':
api.communicate(params, rq)
params.sync_data = True
else:
del_obj = {
'delete_resolution': 'unlink',
'object_uid': folder.uid,
'object_type': 'user_folder' if folder.type == BaseFolderNode.UserFolderType else 'shared_folder_folder'
}
if parent is None:
del_obj['from_type'] = 'user_folder'
else:
del_obj['from_uid'] = parent.uid
del_obj['from_type'] = parent.type
if parent.type == BaseFolderNode.SharedFolderType:
del_obj['from_type'] = 'shared_folder_folder'
def execute(self, params, **kwargs):
if params.enforcements:
if 'enterprise_invited' in params.enforcements:
print('You\'ve been invited to join {0}.'.format(params.enforcements['enterprise_invited']))
action = user_choice('A(ccept)/D(ecline)/I(gnore)?: ', 'adi')
action = action.lower()
if action == 'a':
action = 'accept'
elif action == 'd':
action = 'decline'
if action in ['accept', 'decline']:
e_rq = {
'command': '{0}_enterprise_invite'.format(action)
}
if action == 'accept':
verification_code = input('Please enter the verification code sent via email: ')
if verification_code:
e_rq['verification_code'] = verification_code
else:
e_rq = None
if e_rq:
def execute(self, params, **kwargs):
api.sync_down(params)
accepted = False
if len(params.pending_share_requests) > 0:
for user in params.pending_share_requests:
accepted = False
print('Note: You have pending share request from ' + user)
answer = user_choice('Do you want to accept these request?', 'yn', 'n')
rq = {
'command': 'accept_share' if answer == 'y' else 'cancel_share',
'from_email': user
}
try:
rs = api.communicate(params, rq)
if rs['result'] == 'success':
accepted = accepted or answer == 'y'
except Exception as e:
logging.debug('Accept share exception: %s', e)
params.pending_share_requests.clear()
if accepted:
params.sync_data = True
else:
logging.error('Shared folders cannot be nested')
return
elif user_folder:
if base_folder.type in {BaseFolderNode.SharedFolderType, BaseFolderNode.SharedFolderFolderType}:
request['folder_type'] = 'shared_folder_folder'
else:
request['folder_type'] = 'user_folder'
if request.get('folder_type') is None:
if base_folder.type in {BaseFolderNode.SharedFolderType, BaseFolderNode.SharedFolderFolderType}:
request['folder_type'] = 'shared_folder_folder'
if request.get('folder_type') is None:
inp = user_choice('Do you want to create a shared folder?', 'yn', default='n')
if inp.lower() == 'y':
request['folder_type'] = 'shared_folder'
pq = 'Default user permissions: (A)ll | Manage (U)sers / (R)ecords; Can (E)dit / (S)hare records?'
inp = user_choice(pq, 'aures', multi_choice=True)
request['manage_users'] = False
request['manage_records'] = False
request['can_edit'] = False
request['can_share'] = False
if len(inp) > 0:
s1 = set([x.lower() for x in inp])
if 'a' in s1:
request['manage_users'] = True
request['manage_records'] = True
request['can_edit'] = True
request['can_share'] = True
else:
if len(table) > 0:
headers = ['#', 'Shared Folder UID', 'Shared Folder Name', 'Record UID', 'Record Title']
if change_edit:
headers.append('Can Edit')
if change_share:
headers.append('Can Share')
logging.info('')
title = (bcolors.OKGREEN + ' {0}' + bcolors.ENDC + ' Shared Folder Record Share permission(s)') \
.format('GRANT' if should_have else 'REVOKE')
dump_report_data(table, headers, title=title)
logging.info('')
logging.info('')
if not kwargs.get('dry_run') and (len(shared_folder_update) > 0 or len(direct_shares_update) > 0):
print('\n\n' + bcolors.WARNING + bcolors.BOLD + 'ALERT!!!' + bcolors.ENDC)
answer = user_choice("Do you want to proceed with these permission changes?", 'yn', 'n') \
if not kwargs.get('force') else 'Y'
if answer.lower() == 'y':
table = []
while len(direct_shares_update) > 0:
batch = direct_shares_update[:80]
direct_shares_update = direct_shares_update[80:]
rq = {
'command': 'record_share_update',
'pt': 'Commander',
'update_shares': batch
}
rs = api.communicate(params, rq)
if 'update_statuses' in rs:
for i, status in enumerate(rs['update_statuses']):
code = status['status']
elif target == 'json':
log_export = AuditLogJsonExport()
elif target == 'azure-la':
log_export = AuditLogAzureLogAnalyticsExport()
else:
print('Audit log export: unsupported target')
return
record = None
record_name = kwargs.get('record') or log_export.default_record_title()
for r_uid in params.record_cache:
rec = api.get_record(params, r_uid)
if record_name in [rec.record_uid, rec.title]:
record = rec
if record is None:
answer = user_choice('Do you want to create a Keeper record to store audit log settings?', 'yn', 'n')
if answer.lower() == 'y':
record_title = input('Choose the title for audit log record [Default: {0}]: '.format(record_name)) or log_export.default_record_title()
cmd = RecordAddCommand()
record_uid = cmd.execute(params, **{
'title': record_title,
'force': True
})
if record_uid:
api.sync_down(params)
record = api.get_record(params, record_uid)
if record is None:
return
props = {}
props['enterprise_name'] = params.enterprise['enterprise_name']
log_export.store_record = False