Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def register_shared_folder(shared_folder, records):
# type: (shared_folder.SharedFolder, dict) -> bytes
shared_folder_key = api.generate_aes_key()
sf = {
'shared_folder_uid': shared_folder.shared_folder_uid,
'key_type': 1,
'shared_folder_key': api.encrypt_aes(shared_folder_key, _USER_DATA_KEY),
'name': api.encrypt_aes(shared_folder.name.encode('utf-8'), shared_folder_key),
'is_account_folder': False,
'manage_records': False,
'manage_users': False,
'default_manage_records': True,
'default_manage_users': True,
'default_can_edit': True,
'default_can_share': True,
'full_sync': True,
'records': [{
'record_uid': x[0],
'record_key': api.encrypt_aes(x[1], shared_folder_key),
sfo['users'] = [{
'username': u['username'],
'manage_records': u['manage_records'],
'manage_users': u['manage_users']
} for u in sf.users]
if sf.teams:
sfo['teams'] = [{
'name': t['name'],
'manage_records': t['manage_records'],
'manage_users': t['manage_users']
} for t in sf.teams]
print(json.dumps(sfo, indent=2))
else:
sf.display()
elif api.is_team(params, uid):
team = api.get_team(params, uid)
if fmt == 'json':
to = {
'team_uid': team.team_uid,
'name': team.name,
'restrict_edit': team.restrict_edit,
'restrict_view': team.restrict_view,
'restrict_share': team.restrict_share
}
print(json.dumps(to, indent=2))
else:
team.display()
elif uid in params.folder_cache:
f = params.folder_cache[uid]
if fmt == 'json':
fo = {
folder_name = kwargs['folder'] if 'folder' in kwargs else None
if folder_name:
if folder_name in params.folder_cache:
folder = params.folder_cache[folder_name]
else:
src = try_resolve_path(params, folder_name)
if src is not None:
folder, name = src
if folder is None:
folder = params.folder_cache[params.current_folder] if params.current_folder else params.root_folder
if not force:
folder_uid = folder.uid or ''
if folder_uid in params.subfolder_record_cache:
for uid in params.subfolder_record_cache[folder_uid]:
r = api.get_record(params, uid)
if r.title == title:
logging.error('Record with title "%s" already exists', title)
return
record_key = os.urandom(32)
record_uid = api.generate_record_uid()
rq = {
'command': 'record_add',
'record_uid': record_uid,
'record_type': 'password',
'record_key': api.encrypt_aes(record_key, params.data_key),
'how_long_ago': 0
}
if folder.type in {BaseFolderNode.SharedFolderType, BaseFolderNode.SharedFolderFolderType}:
rq['folder_uid'] = folder.uid
rq['folder_type'] = 'shared_folder' if folder.type == BaseFolderNode.SharedFolderType else 'shared_folder_folder'
else:
team.display()
elif uid in params.folder_cache:
f = params.folder_cache[uid]
if fmt == 'json':
fo = {
'folder_uid': f.uid,
'type': f.type,
'name': f.name
}
print(json.dumps(fo, indent=2))
else:
f.display(params=params)
else:
api.get_record_shares(params, [uid])
r = api.get_record(params, uid)
if r:
params.queue_audit_event('open_record', record_uid=uid)
if fmt == 'json':
ro = {
'record_uid': r.record_uid,
'title': r.title
}
if r.login:
ro['login'] = r.login
if r.password:
ro['password'] = r.password
if r.login_url:
ro['login_url'] = r.login_url
if r.notes:
ro['notes'] = r.notes
if r.custom_fields:
'name': name,
'value': value
})
if custom:
for c in custom:
if c['value']:
record.set_field(c['name'], c['value'])
changed = True
else:
deleted = record.remove_field(c['name'])
if deleted:
changed = True
if changed:
params.sync_data = True
api.update_record(params, record)
'to_username': email,
'record_uid': record_uid,
'record_key': record_key,
'transfer': True
})
while transfers:
chunk = transfers[:90]
transfers = transfers[90:]
commands.append({
'command': 'record_share_update',
'pt': 'Commander',
'add_shares': chunk
})
api.execute_batch(params, commands)
params.sync_data = True
else:
logging.warning('User %s could be resolved', u)
if len(users) > 0:
for user_id in users:
is_add, user_email = users[user_id]
rq = {
'command': 'team_enterprise_user_add' if is_add else 'team_enterprise_user_remove',
'team_uid': team['team_uid'],
'enterprise_user_id': user_id
}
if is_add:
public_key = self.get_public_key(params, user_email)
team_key = self.get_team_key(params, team['team_uid'])
if public_key and team_key:
rq['user_type'] = 0
rq['team_key'] = api.encrypt_rsa(team_key, public_key)
rs = api.communicate(params, rq)
if rs['result'] == 'success':
api.query_enterprise(params)
logging.info('User %s %s team %s', user_email, 'added to' if is_add else 'removed from', team['name'])
if show_info:
team_uid = team['team_uid']
print('{0:>24s}: {1}'.format('Team UID', team_uid))
print('{0:>24s}: {1}'.format('Team Name', team['name']))
print('{0:>24s}: {1:<32s} {2}'.format('Node', self.get_node_path(params, team['node_id']), str(team['node_id'])))
print('{0:>24s}: {1}'.format('Restrict Edit?', 'Yes' if team['restrict_edit'] else 'No'))
print('{0:>24s}: {1}'.format('Restrict Share?', 'Yes' if team['restrict_sharing'] else 'No'))
print('{0:>24s}: {1}'.format('Restrict View?', 'Yes' if team['restrict_view'] else 'No'))
if 'team_users' in params.enterprise:
user_ids = [x['enterprise_user_id'] for x in params.enterprise['team_users'] if x['team_uid'] == team_uid]