# Fragment: the enclosing function's signature is not part of this listing, and
# the original indentation has been lost.
# Builds the metadata entry for one record and stores the record key wrapped
# according to key_type.
meta_data = {
'record_uid': record.record_uid,
# Ownership is set for key types 0 and 1; share and edit rights only for 1.
'owner': key_type in [0, 1],
'can_share': key_type == 1,
'can_edit': key_type == 1,
'record_key_type': key_type
}
# key_type 0: the entry is appended without any 'record_key' field.
if key_type == 0:
_RECORD_METADATA.append(meta_data)
# key_type 1: record key wrapped with AES under _USER_DATA_KEY.
# key_type 2: record key wrapped with RSA to _IMPORTED_PUBLIC_KEY.
# As far as this fragment shows, any other key_type appends nothing.
# NOTE(review): the cipher mode and padding behind api.encrypt_aes and
# api.encrypt_rsa are not visible here - confirm them before reusing this
# key-wrapping pattern elsewhere.
if key_type == 1:
meta_data['record_key'] = api.encrypt_aes(record_key, _USER_DATA_KEY)
_RECORD_METADATA.append(meta_data)
elif key_type == 2:
meta_data['record_key'] = api.encrypt_rsa(record_key, _IMPORTED_PUBLIC_KEY)
_RECORD_METADATA.append(meta_data)
# The record key is handed back to the caller unwrapped.
return record_key
# Registers a team entry in _TEAMS with a freshly generated AES team key.
# The listing is truncated after the last line shown and its indentation is
# lost, so the rest of the body and the return statement are not visible.
def register_team(team, key_type, sfs=None):
# type: (team.Team, int, dict) -> bytes
team_key = api.generate_aes_key()
t = {
'team_uid': team.team_uid,
'name': team.name,
'team_key_type': key_type,
# Only key_type == 1 selects AES under _USER_DATA_KEY; every other value,
# not just 2, falls through to RSA with _IMPORTED_PUBLIC_KEY.
'team_key': api.encrypt_aes(team_key, _USER_DATA_KEY) if key_type == 1 else api.encrypt_rsa(team_key, _IMPORTED_PUBLIC_KEY),
# The private key material in _DER_PRIVATE_KEY is stored encrypted under the team key.
'team_private_key': api.encrypt_aes(_DER_PRIVATE_KEY, team_key),
'restrict_edit': team.restrict_edit,
'restrict_share': team.restrict_share,
'restrict_view': team.restrict_view,
}
_TEAMS.append(t)
if sfs:
# sfs is iterated with .items(): x[0] is the shared folder uid and x[1] the
# value that gets AES-wrapped under the team key (presumably the shared
# folder key - confirm against the caller).
t['shared_folder_keys'] = [{
'shared_folder_uid': x[0],
'key_type': 1,
'shared_folder_key': api.encrypt_aes(x[1], team_key)
} for x in sfs.items()]
sf_uids = set()
for uid in sfs:
# Builds a response dict for an enterprise data request, including the wrapped
# tree key and, when asked for via rq['include'], the node list.
# The listing is truncated inside the 'nodes' literal and its indentation is lost.
def get_enterprise_data(params, rq):
# type: (KeeperParams, dict) -> dict
# The tree key is wrapped with AES under params.data_key when _USE_DATA_KEY is
# set, otherwise with RSA to _VAULT_ENV.public_key; key_type_id (1 or 2) tells
# the reader of the response which wrapping was used.
encrypted_tree_key = api.encrypt_aes(_TREE_KEY, params.data_key) if _USE_DATA_KEY else api.encrypt_rsa(_TREE_KEY, _VAULT_ENV.public_key)
tree_key_type = 1 if _USE_DATA_KEY else 2
rs = {
'result': 'success',
'result_code': '',
'message': '',
'enterprise_name': 'Enterprise 1',
'tree_key': encrypted_tree_key,
'key_type_id': tree_key_type
}
# A missing or empty 'include' value is treated as an empty set.
includes = set(rq.get('include') or [])
# Places the enterprise id in the bits above the low 32; its use is outside this fragment.
ent_id = _ENTERPRISE_ID << 32
if 'nodes' in includes:
rs['nodes'] = [
{
'node_id': _NODE1_ID,
# Node display data is JSON-encoded, then AES-encrypted under the tree key.
'encrypted_data': api.encrypt_aes(json.dumps({'displayname': 'Root node'}).encode('utf-8'), _TREE_KEY)
# Fragment from inside a method (it uses self); the original indentation is lost.
# Sends one role membership request per user; `users` maps an enterprise user
# id to an (is_add, user_email) pair, as unpacked below.
for user_id in users:
is_add, user_email = users[user_id]
rq = {
'command': 'role_user_add' if is_add else 'role_user_remove',
'role_id': role['role_id'],
'enterprise_user_id': user_id
}
# On add, keys are RSA-wrapped to the user's public key: the enterprise tree key
# when has_managed_nodes is set, and the role key when role_key is set.
# NOTE(review): the key returned by self.get_public_key() is used as-is; how its
# authenticity is established is not visible here - confirm, because whoever
# holds the matching private key can unwrap the tree key.
# NOTE(review): when no public key comes back, the wrapped key is omitted and
# nothing is logged; with nesting lost in this listing it cannot be told whether
# the request is still sent in that case.
if is_add:
if has_managed_nodes:
public_key = self.get_public_key(params, user_email)
if public_key:
rq['tree_key'] = api.encrypt_rsa(params.enterprise['unencrypted_tree_key'], public_key)
if role_key:
# The same user's public key is looked up a second time here.
public_key = self.get_public_key(params, user_email)
if public_key:
rq['role_admin_key'] = api.encrypt_rsa(role_key, public_key)
rs = api.communicate(params, rq)
# Only success is logged in the lines shown; a non-success result has no
# visible handling in this fragment.
if rs['result'] == 'success':
logging.info('User %s %s role %s', user_email, 'added to' if is_add else 'removed from', role['data'].get('displayname') or '')
# Reloads enterprise data after the changes.
api.query_enterprise(params)
# Prints the role's attributes when show_info is set.
if role:
if show_info:
role_id = role['role_id']
print('{0:>24s}: {1}'.format('Role ID', role_id))
print('{0:>24s}: {1}'.format('Role Name', role['data'].get('displayname')))
print('{0:>24s}: {1}'.format('Node', self.get_node_path(params, role['node_id'])))
print('{0:>24s}: {1}'.format('Cascade?', 'Yes' if role['visible_below'] else 'No'))
print('{0:>24s}: {1}'.format('New user?', 'Yes' if role['new_user_inherit'] else 'No'))
# Collects the ids of users assigned to this role.
if 'role_users' in params.enterprise:
user_ids = [x['enterprise_user_id'] for x in params.enterprise['role_users'] if x['role_id'] == role_id]
# Fragment from inside a method (it uses self); it starts mid-branch, is cut off
# inside the final dict literal, and the original indentation is lost.
# `teams` maps a team uid to an (is_add, team_name) pair.
teams[team_uid] = is_add, team_node['name']
else:
# NOTE(review): this else-branch message reads "could be resolved"; it
# presumably means the team could NOT be resolved - confirm and correct the text.
logging.warning('Team %s could be resolved', t)
if len(teams) > 0:
for team_uid in teams:
is_add, team_name = teams[team_uid]
rq = {
'command': 'team_enterprise_user_add' if is_add else 'team_enterprise_user_remove',
'enterprise_user_id': user['enterprise_user_id'],
'team_uid': team_uid
}
# On add, the team key is RSA-wrapped to the user's public key so the user
# can unwrap it with their own private key.
# NOTE(review): the public key from self.get_public_key() is used as-is; how
# its authenticity is established is not visible here - confirm.
# NOTE(review): if either key is missing, no 'team_key' is attached and nothing
# is logged; nesting is lost here, so whether the request is still sent without
# it cannot be told from this fragment.
if is_add:
team_key = self.get_team_key(params, team_uid)
public_key = self.get_public_key(params, user['username'])
if team_key and public_key:
rq['team_key'] = api.encrypt_rsa(team_key, public_key)
rq['user_type'] = 0
rs = api.communicate(params, rq)
if rs['result'] == 'success':
logging.info('Team %s %s %s', team_name, 'added to' if is_add else 'removed from', user['username'])
api.query_enterprise(params)
elif user_name or node_id:
# Works on a copy so the cached user data is not modified in place.
dt = user['data'].copy()
if user_name:
dt['displayname'] = user_name
rq = {
'command': 'enterprise_user_update',
'enterprise_user_id': user['enterprise_user_id'],
# Keeps the user's current node unless a new node_id was supplied.
'node_id': node_id if node_id is not None else user['node_id'],
# User data is JSON-encoded, then AES-encrypted under the enterprise tree key.
'encrypted_data': api.encrypt_aes(json.dumps(dt).encode('utf-8'), params.enterprise['unencrypted_tree_key']),
'enterprise_user_username': user['username']
# Fragment: starts mid-branch inside a function whose signature is not shown;
# the original indentation is lost.
# Adds users to a shared-folder request, attaching the folder key wrapped for
# each recipient.
# First branch: the key is AES-wrapped under params.data_key (presumably the
# case where the recipient is the current user - confirm against the missing condition).
request['add_users'].append({
'username': email,
'manage_users': perm.manage_users,
'manage_records': perm.manage_records,
'shared_folder_key': api.encrypt_aes(parent_key, params.data_key)
})
elif email in emails:
# Other recipients: the key is RSA-wrapped to the public key stored in `emails`.
public_key = emails[email]
# A falsy public key means the user is skipped with no message.
if public_key:
try:
# '==' is appended before URL-safe base64 decoding, presumably because the
# stored key has its padding stripped.
# NOTE(review): where `emails` gets its public keys, and how they are
# authenticated, is not visible here - confirm, since the folder key is
# released to whoever holds the matching private key.
rsa_key = RSA.importKey(base64.urlsafe_b64decode(public_key + '=='))
request['add_users'].append({
'username': email,
'manage_users': perm.manage_users,
'manage_records': perm.manage_records,
'shared_folder_key': api.encrypt_rsa(parent_key, rsa_key)
})
# NOTE(review): a bare `except: pass` swallows every exception (including
# KeyboardInterrupt and SystemExit), so a malformed key or a failed
# encryption drops the user from the share with no log or error. Consider
# catching only the decode/import errors and logging the skipped address.
except:
pass
shared_folder_add.append(request)
return shared_folder_add
# Fragment: starts mid-branch and the original indentation is lost.
# Works out, for one user entry `uo`, which list of the share request it
# belongs in: 'update_users', 'remove_users' or 'add_users'.
uo['manage_users'] = True
share_action = 'update_users'
else:
# If either flag (mr / mu) is set, only that permission is cleared and the
# user is updated; with neither set, the user is removed from the folder.
if mr or mu:
if mr:
uo['manage_records'] = False
if mu:
uo['manage_users'] = False
share_action = 'update_users'
else:
share_action = 'remove_users'
elif action == 'grant':
uo['manage_records'] = mr
uo['manage_users'] = mu
# The folder key is RSA-wrapped to the recipient's public key. '==' is appended
# before URL-safe base64 decoding, presumably because the stored key has its
# padding stripped.
# NOTE(review): no exception handling is visible around these two lines; an
# email missing from public_keys or a malformed key would raise here - confirm
# the caller handles that.
# NOTE(review): the origin and authenticity of the entries in public_keys are
# not visible in this fragment - confirm, since the unencrypted folder key is
# released to whoever holds the matching private key.
rsa_key = RSA.importKey(base64.urlsafe_b64decode(public_keys[email] + '=='))
uo['shared_folder_key'] = api.encrypt_rsa(sh_fol['shared_folder_key_unencrypted'], rsa_key)
share_action = 'add_users'
# Creates the list for the chosen action on first use, then queues the user entry.
if share_action:
if not share_action in request:
request[share_action] = []
request[share_action].append(uo)
if len(team_keys) > 0:
team_set = set()
if 'teams' in sh_fol:
for team in sh_fol['teams']:
team_set.add(team['team_uid'])
mr = kwargs.get('manage_records')
mu = kwargs.get('manage_users')
for team_uid in team_keys: