Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_kfrags():
vector_file = os.path.join('vectors', 'vectors_kfrags.json')
try:
with open(vector_file) as f:
vector_suite = json.load(f)
except OSError:
raise
verifying_key = UmbralPublicKey.from_bytes(bytes.fromhex(vector_suite['verifying_key']))
delegating_key = UmbralPublicKey.from_bytes(bytes.fromhex(vector_suite['delegating_key']))
receiving_key = UmbralPublicKey.from_bytes(bytes.fromhex(vector_suite['receiving_key']))
for json_kfrag in vector_suite['vectors']:
kfrag = KFrag.from_bytes(bytes.fromhex(json_kfrag['kfrag']))
assert kfrag.verify(signing_pubkey=verifying_key,
delegating_pubkey=delegating_key,
receiving_pubkey=receiving_key), \
'Invalid KFrag {}'.format(kfrag.to_bytes().hex())
def test_public_key_serialization(random_ec_curvebn1):
priv_key = random_ec_curvebn1
params = default_params()
pub_key = priv_key * params.g
umbral_key = UmbralPublicKey(pub_key, params)
encoded_key = umbral_key.to_bytes()
decoded_key = UmbralPublicKey.from_bytes(encoded_key)
assert pub_key == decoded_key.point_key
def test_kfrags():
vector_file = os.path.join('vectors', 'vectors_kfrags.json')
try:
with open(vector_file) as f:
vector_suite = json.load(f)
except OSError:
raise
verifying_key = UmbralPublicKey.from_bytes(bytes.fromhex(vector_suite['verifying_key']))
delegating_key = UmbralPublicKey.from_bytes(bytes.fromhex(vector_suite['delegating_key']))
receiving_key = UmbralPublicKey.from_bytes(bytes.fromhex(vector_suite['receiving_key']))
for json_kfrag in vector_suite['vectors']:
kfrag = KFrag.from_bytes(bytes.fromhex(json_kfrag['kfrag']))
assert kfrag.verify(signing_pubkey=verifying_key,
delegating_pubkey=delegating_key,
receiving_pubkey=receiving_key), \
'Invalid KFrag {}'.format(kfrag.to_bytes().hex())
def get_users_public_keys(name, serialized=False):
dirname = "accounts/" + name + "/"
fname = dirname+"recipent.public.json"
with open(fname) as data_file:
data = json.load(data_file)
enc_pubkey = UmbralPublicKey.from_bytes(bytes.fromhex(data["enc"]))
sig_pubkey = UmbralPublicKey.from_bytes(bytes.fromhex(data["sig"]))
print(enc_pubkey, sig_pubkey)
if serialized:
return (base58.b58encode(bytes.fromhex(data["enc"])).decode("utf-8"),
base58.b58encode(bytes.fromhex(data["sig"])).decode("utf-8"))
return (enc_pubkey, sig_pubkey)
def grant_others_access(self, username, password, cid, label, recp_enc_b58_key, recp_sig_b58_key):
alice = self.act_as_alice(username, password)
enc = UmbralPublicKey.from_bytes(base58.b58decode(recp_enc_b58_key))
sig = UmbralPublicKey.from_bytes(base58.b58decode(recp_sig_b58_key))
nucid = creat_nucid(alice, cid, enc, sig, label.encode("utf-8"))
return nucid
def grant_others_access(self, username, password, cid, label, recp_enc_b58_key, recp_sig_b58_key):
alice = self.act_as_alice(username, password)
enc = UmbralPublicKey.from_bytes(base58.b58decode(recp_enc_b58_key))
sig = UmbralPublicKey.from_bytes(base58.b58decode(recp_sig_b58_key))
nucid = creat_nucid(alice, cid, enc, sig, label.encode("utf-8"))
return nucid
def get_users_public_keys(name, serialized=False):
dirname = "accounts/" + name + "/"
fname = dirname+"recipent.public.json"
with open(fname) as data_file:
data = json.load(data_file)
enc_pubkey = UmbralPublicKey.from_bytes(bytes.fromhex(data["enc"]))
sig_pubkey = UmbralPublicKey.from_bytes(bytes.fromhex(data["sig"]))
print(enc_pubkey, sig_pubkey)
if serialized:
return (base58.b58encode(bytes.fromhex(data["enc"])).decode("utf-8"),
base58.b58encode(bytes.fromhex(data["sig"])).decode("utf-8"))
return (enc_pubkey, sig_pubkey)
def decrypt(self, bob, item_cid, pol, sig, lab):
policy_pubkey = UmbralPublicKey.from_bytes(bytes.fromhex(pol))
alices_sig_pubkey = UmbralPublicKey.from_bytes(bytes.fromhex(sig))
label = lab.encode()
dat = self.ipfs_gateway_api.cat(item_cid)
doctor = bob
doctor.join_policy(label, alices_sig_pubkey)
data = msgpack.loads(dat, raw=False)
message_kits = (UmbralMessageKit.from_bytes(k) for k in data['kits'])
data_source = Enrico.from_public_keys(
{SigningPower: data['data_source']},
policy_encrypting_key=policy_pubkey
)
message_kit = next(message_kits)
start = timer()
retrieved_plaintexts = doctor.retrieve(
label=label,
message_kit=message_kit,