Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import json
from data_vault import VaultEnvironment
from keepercommander import api
from keepercommander.params import KeeperParams
# Module-level keys and identifiers referenced by the rest of this file.
_TREE_KEY = api.generate_aes_key()
_ENTERPRISE_ID = 123
_VAULT_ENV = VaultEnvironment()
_USE_DATA_KEY = True
_TEAM_KEY = api.generate_aes_key()

# Two teams: freshly generated UIDs paired with fixed display names.
_TEAM1_UID, _TEAM2_UID = api.generate_record_uid(), api.generate_record_uid()
_TEAM1_NAME, _TEAM2_NAME = 'Team 1', 'Team 2'

# Node, user and role IDs place the enterprise ID in the bits above bit 31
# and a small serial number in the low bits. The low 32 bits of the shifted
# value are zero, so OR-ing the serial in equals adding it.
_NODE1_ID = (_ENTERPRISE_ID << 32) | 101
_NODE2_ID = (_ENTERPRISE_ID << 32) | 102
_USER1_ID = (_ENTERPRISE_ID << 32) | 201
_USER2_ID = (_ENTERPRISE_ID << 32) | 202
_USER2_EMAIL = 'user2@keepercommander.com'
_ROLE1_ID = (_ENTERPRISE_ID << 32) | 301
_ROLE1_NAME = 'Role 1'

_LAST_ID = 1000
# Build the record titled 'Record 2': give it a freshly generated UID, then
# fill in login, password, URL, one custom field ('field2') and notes.
r2 = record.Record()
r2.record_uid = api.generate_record_uid()
r2.title = 'Record 2'
r2.login = 'user2@keepersecurity.com'
r2.password = 'password2'
r2.login_url = 'https://keepersecurity.com/2'
r2.set_field('field2', 'value2')
r2.notes = 'note2'
r2.revision = 2
# register_record is defined outside this excerpt; the meaning of its second
# argument (2) is not visible here -- TODO confirm against its definition.
# Its return value is kept as the key associated with r2.
r2_key = register_record(r2, 2)
# Attach r1 and r2 to the folder identified by None.
# NOTE(review): None presumably means the root / no folder -- verify.
# r1 is not created in the visible lines; it must come from earlier code.
register_records_to_folder(None, [r1.record_uid, r2.record_uid])
# Build the record titled 'Record 3' with a freshly generated UID. Only the
# title, login, password, URL and revision are set here (no custom field or
# notes).
r3 = record.Record()
r3.record_uid = api.generate_record_uid()
r3.title = 'Record 3'
r3.login = 'user3@keepersecurity.com'
r3.password = 'password3'
r3.login_url = 'https://keepersecurity.com/3'
r3.revision = 3
# register_record (defined outside this excerpt) is called with the record
# only; its return value is kept as the key associated with r3.
r3_key = register_record(r3)
sf1 = shared_folder.SharedFolder()
sf1.shared_folder_uid = api.generate_record_uid()
sf1.default_manage_records = False
sf1.default_manage_users = False
sf1.default_can_edit = False
sf1.default_can_share = False
sf1.name = 'Shared Folder 1'
sf1_key = register_shared_folder(sf1, {
r3.record_uid: r3_key
# Create 'Shared Folder 1' with a freshly generated UID and all four default
# permission flags switched off.
sf1 = shared_folder.SharedFolder()
sf1.shared_folder_uid = api.generate_record_uid()
sf1.default_manage_records = False
sf1.default_manage_users = False
sf1.default_can_edit = False
sf1.default_can_share = False
sf1.name = 'Shared Folder 1'
# Register the folder together with a {record UID: record key} mapping that
# contains r3. register_shared_folder is defined outside this excerpt; its
# return value is kept as sf1_key.
sf1_key = register_shared_folder(sf1, {
r3.record_uid: r3_key
})
# Place r3 inside the shared folder.
register_records_to_folder(sf1.shared_folder_uid, [r3.record_uid])
# Add an entry carrying the shared folder's UID to
# _USER_FOLDER_SHARED_FOLDER (a collection defined outside this excerpt).
_USER_FOLDER_SHARED_FOLDER.append({'shared_folder_uid': sf1.shared_folder_uid})
# Create 'Team 1' with a freshly generated UID; edit and share restrictions
# are enabled, the view restriction is not.
t1 = team.Team()
t1.team_uid = api.generate_record_uid()
t1.name = 'Team 1'
t1.restrict_edit = True
t1.restrict_share = True
t1.restrict_view = False
# Register the team with a {shared folder UID: shared folder key} mapping
# for sf1. register_team is defined outside this excerpt; the meaning of
# its second argument (1) is not visible here -- TODO confirm.
register_team(t1, 1, {
sf1.shared_folder_uid: sf1_key
})
folder_key = api.generate_aes_key()
_USER_FOLDERS.append({
'folder_uid': api.generate_record_uid(),
'key_type': 1,
'user_folder_key': api.encrypt_aes(folder_key, _USER_DATA_KEY),
'revision': 200,
'type': 'user_folder',
# Add an entry carrying sf1's UID to _USER_FOLDER_SHARED_FOLDER (a
# collection defined outside this excerpt).
_USER_FOLDER_SHARED_FOLDER.append({'shared_folder_uid': sf1.shared_folder_uid})
# Create 'Team 1' with a freshly generated UID; edit and share restrictions
# are enabled, the view restriction is not.
t1 = team.Team()
t1.team_uid = api.generate_record_uid()
t1.name = 'Team 1'
t1.restrict_edit = True
t1.restrict_share = True
t1.restrict_view = False
# Register the team with a {shared folder UID: shared folder key} mapping
# for sf1. The meaning of the second argument (1) is not visible in this
# excerpt -- TODO confirm against register_team's definition.
register_team(t1, 1, {
sf1.shared_folder_uid: sf1_key
})
# Generate an AES key for a user folder and append the folder's entry to
# _USER_FOLDERS (defined outside this excerpt). The folder key is stored
# encrypted with _USER_DATA_KEY, and the JSON payload {'name': 'User Folder 1'}
# is stored encrypted with the folder key itself.
# NOTE(review): the semantics of 'key_type': 1 are not shown here -- confirm.
folder_key = api.generate_aes_key()
_USER_FOLDERS.append({
'folder_uid': api.generate_record_uid(),
'key_type': 1,
'user_folder_key': api.encrypt_aes(folder_key, _USER_DATA_KEY),
'revision': 200,
'type': 'user_folder',
'data': api.encrypt_aes(json.dumps({'name': 'User Folder 1'}).encode('utf-8'), folder_key)
})
# This excerpt starts part-way through the setup of r2: only its revision is
# assigned here before it is registered.
r2.revision = 2
# register_record is defined outside this excerpt; the meaning of its second
# argument (2) is not visible here -- TODO confirm.
r2_key = register_record(r2, 2)
# Attach r1 and r2 to the folder identified by None.
# NOTE(review): None presumably means the root / no folder -- verify.
register_records_to_folder(None, [r1.record_uid, r2.record_uid])
# Build the record titled 'Record 3' with a freshly generated UID and
# register it; the returned value is kept as r3_key.
r3 = record.Record()
r3.record_uid = api.generate_record_uid()
r3.title = 'Record 3'
r3.login = 'user3@keepersecurity.com'
r3.password = 'password3'
r3.login_url = 'https://keepersecurity.com/3'
r3.revision = 3
r3_key = register_record(r3)
# Create 'Shared Folder 1' with all four default permission flags off.
sf1 = shared_folder.SharedFolder()
sf1.shared_folder_uid = api.generate_record_uid()
sf1.default_manage_records = False
sf1.default_manage_users = False
sf1.default_can_edit = False
sf1.default_can_share = False
sf1.name = 'Shared Folder 1'
# Register the folder with a {record UID: record key} mapping containing r3;
# the returned value is kept as sf1_key.
sf1_key = register_shared_folder(sf1, {
r3.record_uid: r3_key
})
# Place r3 inside the shared folder, then add an entry carrying the folder's
# UID to _USER_FOLDER_SHARED_FOLDER (defined outside this excerpt).
register_records_to_folder(sf1.shared_folder_uid, [r3.record_uid])
_USER_FOLDER_SHARED_FOLDER.append({'shared_folder_uid': sf1.shared_folder_uid})
# Create 'Team 1' with edit and share restrictions enabled. The excerpt ends
# here, before the team is registered.
t1 = team.Team()
t1.team_uid = api.generate_record_uid()
t1.name = 'Team 1'
t1.restrict_edit = True
t1.restrict_share = True
request['manage_records'] = True
request['can_edit'] = True
request['can_share'] = True
else:
if 'u' in s1:
request['manage_users'] = True
if 'r' in s1:
request['manage_records'] = True
if 'e' in s1:
request['can_edit'] = True
if 's' in s1:
request['can_share'] = True
else:
request['folder_type'] = 'user_folder'
folder_uid = api.generate_record_uid()
request['folder_uid'] = folder_uid
folder_key = os.urandom(32)
encryption_key = params.data_key
if request['folder_type'] == 'shared_folder_folder':
sf_uid = base_folder.shared_folder_uid if base_folder.type == BaseFolderNode.SharedFolderFolderType else base_folder.uid
sf = params.shared_folder_cache[sf_uid]
encryption_key = sf['shared_folder_key_unencrypted']
request['shared_folder_uid'] = sf_uid
request['key'] = api.encrypt_aes(folder_key, encryption_key)
if base_folder.type not in {BaseFolderNode.RootFolderType, BaseFolderNode.SharedFolderType}:
request['parent_uid'] = base_folder.uid
name = name or ''
while len(name.strip()) == 0:
parent_uid = ''
parent_type = ''
parent_key = None
comps = list(path_components(fol.path))
for i in range(len(comps)):
comp = comps[i]
h = hashlib.md5()
hs = '{0}|{1}'.format(comp.lower(), parent_uid)
h.update(hs.encode())
digest = h.hexdigest()
is_last = False
if i == len(comps) - 1:
is_last = True
if digest not in folder_hash:
folder_uid = api.generate_record_uid()
request = {
'command': 'folder_add',
'folder_uid': folder_uid
}
folder_type = 'shared_folder' if is_last else 'user_folder'
request['folder_type'] = folder_type
encryption_key = params.data_key
folder_key = os.urandom(32)
request['key'] = api.encrypt_aes(folder_key, encryption_key)
if parent_uid:
request['parent_uid'] = parent_uid
if folder_type == 'shared_folder':
request['name'] = api.encrypt_aes(comp.encode('utf-8'), folder_key)
data = {'name': comp}
path = fol.domain if is_domain else fol.path
if not path:
continue
comps = list(path_components(path))
for i in range(len(comps)):
comp = comps[i]
h = hashlib.md5()
hs = '{0}|{1}'.format(comp.lower(), parent_uid)
h.update(hs.encode())
digest = h.hexdigest()
if digest not in folder_hash:
is_shared = False
if i == len(comps) - 1:
is_shared = is_domain
folder_uid = api.generate_record_uid()
request = {
'command': 'folder_add',
'folder_uid': folder_uid
}
if parent_type in {BaseFolderNode.UserFolderType, BaseFolderNode.RootFolderType}:
folder_type = 'shared_folder' if is_shared else 'user_folder'
else:
folder_type = 'shared_folder_folder'
request['folder_type'] = folder_type
encryption_key = params.data_key
if request['folder_type'] == 'shared_folder_folder' and parent_shared_folder_uid and parent_shared_folder_key:
encryption_key = parent_shared_folder_key
request['shared_folder_uid'] = parent_shared_folder_uid
folder_key = os.urandom(32)
h = hashlib.md5()
hs = '{0}|{1}|{2}'.format(rec.title or '', rec.login or '', rec.password or '')
h.update(hs.encode())
record_hash[h.hexdigest()] = r_uid
record_adds = []
for rec in records:
h = hashlib.md5()
hs = '{0}|{1}|{2}'.format(rec.title or '', rec.login or '', rec.password or '')
h.update(hs.encode())
rec_hash = h.hexdigest()
record_uid = record_hash.get(rec_hash)
if record_uid is None:
record_key = os.urandom(32)
record_uid = api.generate_record_uid()
req = {
'command': 'record_add',
'record_uid': record_uid,
'record_type': 'password',
'record_key': api.encrypt_aes(record_key, params.data_key),
'how_long_ago': 0,
'folder_type': 'user_folder'
}
folder_uid = None
if rec.folders:
if len(rec.folders) > 0:
folder_uid = rec.folders[0].uid
if folder_uid:
if folder_uid in params.folder_cache:
folder = params.folder_cache[folder_uid]
if folder.type in {BaseFolderNode.SharedFolderType, BaseFolderNode.SharedFolderFolderType}:
rs = api.communicate(params, rq)
if rs['result'] == 'success':
logging.info('Team %s deleted', team['name'])
api.query_enterprise(params)
else:
logging.warning('Team not found')
return
if kwargs.get('add'):
if team is None:
if node_id is None:
for node in params.enterprise['nodes']:
if not node.get('parent_id'):
node_id = node['node_id']
break
team_uid = api.generate_record_uid()
team_key = api.generate_aes_key()
rsa_key = RSA.generate(2048)
private_key = DerSequence([0,
rsa_key.n,
rsa_key.e,
rsa_key.d,
rsa_key.p,
rsa_key.q,
rsa_key.d % (rsa_key.p-1),
rsa_key.d % (rsa_key.q-1),
Integer(rsa_key.q).inverse(rsa_key.p)
]).encode()
pub_key = rsa_key.publickey()
public_key = DerSequence([pub_key.n,
pub_key.e
]).encode()