def register_team(team, key_type, sfs=None):
    # type: (team.Team, int, dict) -> bytes
    """Build a team entry holding wrapped key material and append it to ``_TEAMS``.

    A new AES team key is generated for every call. It is stored only in
    wrapped form: AES-encrypted under ``_USER_DATA_KEY`` when ``key_type`` is 1,
    RSA-encrypted under ``_IMPORTED_PUBLIC_KEY`` for any other value.
    ``_DER_PRIVATE_KEY`` is stored AES-encrypted under the team key.

    When ``sfs`` is truthy it is iterated with ``.items()``: each key is used
    as a shared folder UID and each value is encrypted under the team key.
    """
    # NOTE(review): indentation was lost in this listing; the nesting below is
    # reconstructed from the syntax. sf_uids is placed under `if sfs:` because
    # iterating the default sfs=None would raise. Confirm against the file.
    team_key = api.generate_aes_key()
    t = {
        'team_uid': team.team_uid,
        'name': team.name,
        'team_key_type': key_type,
        # The wrapping algorithm follows key_type: 1 selects AES, any other value RSA.
        'team_key': api.encrypt_aes(team_key, _USER_DATA_KEY) if key_type == 1 else api.encrypt_rsa(team_key, _IMPORTED_PUBLIC_KEY),
        # The team's private key is wrapped under the plaintext team key itself.
        'team_private_key': api.encrypt_aes(_DER_PRIVATE_KEY, team_key),
        'restrict_edit': team.restrict_edit,
        'restrict_share': team.restrict_share,
        'restrict_view': team.restrict_view,
    }
    _TEAMS.append(t)
    if sfs:
        # x is a (shared_folder_uid, key) pair; the key is re-wrapped under the team key.
        t['shared_folder_keys'] = [{
            'shared_folder_uid': x[0],
            'key_type': 1,
            'shared_folder_key': api.encrypt_aes(x[1], team_key)
        } for x in sfs.items()]
        sf_uids = set()
        for uid in sfs:
            sf_uids.add(uid)
    # NOTE(review): the listing ends here. The type comment promises a bytes
    # return value, but no return statement or use of sf_uids is visible.
# NOTE(review): excerpt of a folder-creation routine that starts mid-function.
# Indentation was lost in extraction, so the nesting of the statements below
# cannot be read off this listing.
request['can_share'] = True
else:
request['folder_type'] = 'user_folder'
folder_uid = api.generate_record_uid()
request['folder_uid'] = folder_uid
# New 32-byte folder key taken from os.urandom, the operating system's CSPRNG.
folder_key = os.urandom(32)
# The folder key is wrapped under params.data_key by default.
encryption_key = params.data_key
if request['folder_type'] == 'shared_folder_folder':
# For a 'shared_folder_folder' the wrapping key is replaced by the plaintext
# key of the enclosing shared folder, looked up in the local cache.
sf_uid = base_folder.shared_folder_uid if base_folder.type == BaseFolderNode.SharedFolderFolderType else base_folder.uid
sf = params.shared_folder_cache[sf_uid]
encryption_key = sf['shared_folder_key_unencrypted']
request['shared_folder_uid'] = sf_uid
# Only the wrapped form of the folder key is placed in the request.
# NOTE(review): cipher mode and IV handling live inside api.encrypt_aes and are
# not visible here. Confirm it provides authenticated encryption.
request['key'] = api.encrypt_aes(folder_key, encryption_key)
if base_folder.type not in {BaseFolderNode.RootFolderType, BaseFolderNode.SharedFolderType}:
request['parent_uid'] = base_folder.uid
name = name or ''
# Prompt until a non-blank folder name has been entered.
while len(name.strip()) == 0:
name = input("... Folder Name: ")
name = name.strip()
# is_slash flips on every '/', so a doubled '//' cancels out; a warning is
# logged when some other character is reached while the flag is still set.
# NOTE(review): range(0, len(name)-2) never inspects the last two characters
# of name. Confirm that a lone '/' near the end is meant to pass this check.
is_slash = False
for x in range(0, len(name)-2):
if name[x] == '/':
is_slash = not is_slash
else:
if is_slash:
logging.warning('Character "/" is reserved. Use "//" inside folder name')
# NOTE(review): excerpt of a shared-folder permission routine that starts and
# ends mid-block. Indentation was lost, so branch nesting is not recoverable
# from this listing.
share_action = 'update_records'
else:
# When ce or cs is set, the matching permission on ro is switched off and the
# entry is queued as an update; with neither set it is queued for removal.
if ce or cs:
if ce:
ro['can_edit'] = False
if cs:
ro['can_share'] = False
share_action = 'update_records'
else:
share_action = 'remove_records'
else:
if action == 'grant':
ro['can_edit'] = ce
ro['can_share'] = cs
rec = params.record_cache[record_uid]
# The record's plaintext key from the cache is encrypted under the shared
# folder's plaintext key before it is attached to the request entry.
# NOTE(review): both *_unencrypted values are raw keys held in the cache;
# presumably they should never be logged or serialised. Verify the callers.
ro['record_key'] = api.encrypt_aes(rec['record_key_unencrypted'], sh_fol['shared_folder_key_unencrypted'])
share_action = 'add_records'
# Group the entry under its action name, creating the list on first use.
if share_action:
if not share_action in request:
request[share_action] = []
request[share_action].append(ro)
response = api.communicate(params, request)
# Set the sync_data flag on params once the request has been sent.
params.sync_data = True
# Report the per-team outcome for each of the three team result lists.
for node in ['add_teams', 'update_teams', 'remove_teams']:
if node in response:
for t in response[node]:
team = api.get_team(params, t['team_uid'])
if t['status'] == 'success':
# Successful changes are emitted at WARNING level and include the team name.
logging.warning('Team share \'%s\' %s', team.name, 'added' if node == 'add_teams' else 'updated' if node == 'update_teams' else 'removed')
else:
def prepare_transition_keys(params, folder, keys, encryption_key):
    """Append re-encrypted keys for ``folder`` and its whole subtree to ``keys``.

    Every subfolder listed in ``folder.subfolders`` is processed first, through
    ``FolderMoveCommand.prepare_transition_keys``. The folder's own key and the
    key of each record cached under the folder are then encrypted with
    ``encryption_key`` and appended as ``{'uid': ..., 'key': ...}`` entries.

    The plaintext keys are read from ``params.subfolder_cache`` and
    ``params.record_cache``; only the output of ``api.encrypt_aes`` is appended
    to ``keys``. Nothing is returned; ``keys`` is extended in place.
    """
    # Descend first, so entries for nested folders precede the one for `folder`.
    for child_uid in folder.subfolders:
        child = params.folder_cache[child_uid]
        FolderMoveCommand.prepare_transition_keys(params, child, keys, encryption_key)

    folder_entry = params.subfolder_cache[folder.uid]
    wrapped_folder_key = api.encrypt_aes(folder_entry['folder_key_unencrypted'], encryption_key)
    keys.append({'uid': folder.uid, 'key': wrapped_folder_key})

    # A folder with no cached records contributes nothing further.
    if folder.uid not in params.subfolder_record_cache:
        return
    for record_uid in params.subfolder_record_cache[folder.uid]:
        record_entry = params.record_cache[record_uid]
        wrapped_record_key = api.encrypt_aes(record_entry['record_key_unencrypted'], encryption_key)
        keys.append({'uid': record_uid, 'key': wrapped_record_key})
# NOTE(review): excerpt of a record-push routine. It ends mid-block and its
# indentation was lost, so loop nesting is not recoverable from this listing.
commands = []
record_keys = {}
for email in emails:
if emails[email]:
record_keys[email] = {}
if template_records:
for r in template_records:
# Work on a deep copy so the substitution call below cannot alter the template.
record = copy.deepcopy(r)
EnterprisePushCommand.substitute_record_params(params, email, record)
# Every generated record gets its own UID and its own AES key.
record_uid = api.generate_record_uid()
record_key = api.generate_aes_key()
record_add_command = {
'command': 'record_add',
'record_uid': record_uid,
'record_type': 'password',
# The record key is stored wrapped under params.data_key.
'record_key': api.encrypt_aes(record_key, params.data_key),
'folder_type': 'user_folder',
'how_long_ago': 0
}
# Plaintext record body, including the login and password values.
# NOTE(review): the step that encrypts `data` is outside this excerpt.
# Confirm it happens before the command is sent or logged.
data = {
'title': record.get('title') or '',
'secret1': record.get('login') or '',
'secret2': record.get('password') or '',
'link': record.get('login_url') or '',
'notes': record.get('notes') or ''
}
if 'custom_fields' in record:
data['custom'] = [{
'name': x[0],
'value': x[1]
} for x in record['custom_fields'].items()]
# NOTE(review): excerpt of a record-import routine that starts mid-branch.
# Indentation was lost, so the nesting cannot be read off this listing.
# One custom field value is captured as `totp`; on the other branch the field
# is appended to custom_fields.
totp = rec.custom_fields[cf]
else:
custom_fields.append({
'name': cf,
'value': rec.custom_fields[cf]
})
# Plaintext record body; it includes the login and password values.
data = {
'title': rec.title or '',
'secret1': rec.login or '',
'secret2': rec.password or '',
'link': rec.login_url or '',
'notes': rec.notes or '',
'custom': custom_fields
}
# The body is serialised to JSON, UTF-8 encoded and encrypted under the
# record key before it is attached to the request.
req['data'] = api.encrypt_aes(json.dumps(data).encode('utf-8'), record_key)
if totp:
# The TOTP value travels in a separate 'extra' structure, which is
# encrypted under the same record key.
extra = {
'fields': [
{
'id': api.generate_record_uid(),
'field_type': 'totp',
'field_title': 'Two-Factor Code',
'type': 0,
'data': totp
}]
}
req['extra'] = api.encrypt_aes(json.dumps(extra).encode('utf-8'), record_key)
record_adds.append(req)
rec.uid = record_uid
# NOTE(review): excerpt of a folder-path import loop. It starts and ends
# mid-block and its indentation was lost, so nesting is not recoverable here.
# is_last marks the final component of the path held in comps.
is_last = False
if i == len(comps) - 1:
is_last = True
# A digest missing from folder_hash means this folder has not been created yet.
if digest not in folder_hash:
folder_uid = api.generate_record_uid()
request = {
'command': 'folder_add',
'folder_uid': folder_uid
}
# Only the final component becomes a shared folder; the others are user folders.
folder_type = 'shared_folder' if is_last else 'user_folder'
request['folder_type'] = folder_type
encryption_key = params.data_key
# New 32-byte folder key from os.urandom, sent wrapped under params.data_key.
folder_key = os.urandom(32)
request['key'] = api.encrypt_aes(folder_key, encryption_key)
if parent_uid:
request['parent_uid'] = parent_uid
if folder_type == 'shared_folder':
# The name is encrypted under the folder key for shared folders.
request['name'] = api.encrypt_aes(comp.encode('utf-8'), folder_key)
# The JSON body carrying the name is encrypted under the folder key as well.
data = {'name': comp}
request['data'] = api.encrypt_aes(json.dumps(data).encode('utf-8'), folder_key)
shared_folder_add.append(request)
parent_uid = folder_uid
parent_type = folder_type
parent_key = folder_key
# Stores a 3-tuple; the conditional applies to its last element only, so the
# plaintext folder key is retained for shared folders and None otherwise.
folder_hash[digest] = folder_uid, folder_type, folder_key if folder_type == 'shared_folder' else None
else:
parent_uid, parent_type, parent_key = folder_hash[digest]
if is_last:
# NOTE(review): this excerpt opens inside a dict literal and repeats the
# listing directly above it. Indentation was lost, so nesting is not
# recoverable from this listing.
'command': 'folder_add',
'folder_uid': folder_uid
}
# Only the final path component becomes a shared folder.
folder_type = 'shared_folder' if is_last else 'user_folder'
request['folder_type'] = folder_type
encryption_key = params.data_key
# New 32-byte folder key from os.urandom, sent wrapped under params.data_key.
folder_key = os.urandom(32)
request['key'] = api.encrypt_aes(folder_key, encryption_key)
if parent_uid:
request['parent_uid'] = parent_uid
if folder_type == 'shared_folder':
request['name'] = api.encrypt_aes(comp.encode('utf-8'), folder_key)
data = {'name': comp}
request['data'] = api.encrypt_aes(json.dumps(data).encode('utf-8'), folder_key)
shared_folder_add.append(request)
parent_uid = folder_uid
parent_type = folder_type
parent_key = folder_key
# The plaintext folder key is kept in folder_hash for shared folders only.
folder_hash[digest] = folder_uid, folder_type, folder_key if folder_type == 'shared_folder' else None
else:
parent_uid, parent_type, parent_key = folder_hash[digest]
# An existing folder is reused only when its type fits its position: the
# final component must be a shared folder and every earlier one a user
# folder. Any other combination sets skip_folder and leaves the loop.
if is_last:
skip_folder = parent_type != 'shared_folder'
else:
skip_folder = parent_type != 'user_folder'
if skip_folder:
break
if not skip_folder and parent_type == 'shared_folder':
# NOTE(review): excerpt of a team-management command that starts mid-function
# and ends inside an open dict literal. Indentation was lost. How rsa_key,
# private_key and team_key are created, and the RSA key size, are not visible.
pub_key = rsa_key.publickey()
# The public key is DER-encoded as a sequence of modulus n and exponent e.
public_key = DerSequence([pub_key.n,
pub_key.e
]).encode()
rq = {
'command': 'team_add',
'team_uid': team_uid,
'team_name': t_arg,
# Each restriction is True only when its option equals 'on'; a missing or
# empty option gives False.
'restrict_edit': kwargs.get('restrict_edit') == 'on' if kwargs.get('restrict_edit') else False,
'restrict_share': kwargs.get('restrict_share') == 'on' if kwargs.get('restrict_share') else False,
'restrict_view': kwargs.get('restrict_view') == 'on' if kwargs.get('restrict_view') else False,
# URL-safe base64 with the '=' padding stripped.
'public_key': base64.urlsafe_b64encode(public_key).rstrip(b'=').decode(),
# The private key is wrapped under the team key, and the team key under
# params.data_key; neither is placed in the request unwrapped.
'private_key': api.encrypt_aes(private_key, team_key),
'node_id': node_id,
'team_key': api.encrypt_aes(team_key, params.data_key),
'manage_only': True
}
rs = api.communicate(params, rq)
if rs['result'] == 'success':
logging.info('Team %s created', t_arg)
# Refresh enterprise data and record the new team UID in the environment variables.
api.query_enterprise(params)
params.environment_variables[LAST_TEAM_UID] = team_uid
else:
logging.warning('Team %s already exists', t_arg)
return
if team:
show_info = True
team_name = kwargs.get('name')
if team_name or node_id or kwargs.get('restrict_edit') or kwargs.get('restrict_share') or kwargs.get('restrict_view'):
rq = {