Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_capsule_as_dict_key(alices_keys):
    """Check that a capsule keeps working as a dict key across its lifecycle.

    The capsule is used as a dictionary key right after encryption, again
    after two cfrags have been attached and the ciphertext decrypted, and
    finally after the stored value has been overwritten. The dictionary
    must hold exactly one entry at the end.
    """
    # The fixture unpacks into two values and also exposes `.priv` / `.pub`;
    # both access paths are exercised below, as in the original test.
    priv_key_alice, pub_key_alice = alices_keys
    message = b'attack at dawn'
    ciphertext, capsule = pre.encrypt(pub_key_alice, message)

    # We can use the capsule as a key, and successfully look it up again.
    first_note = "Thing that Bob wants to try per-Capsule"
    lookup = {capsule: first_note}
    assert lookup[capsule] == first_note

    kfrags, _vkeys = pre.split_rekey(alices_keys.priv, alices_keys.pub, 1, 2)
    # Re-encrypt with each of the first two kfrags and attach the results.
    for index in (0, 1):
        capsule.attach_cfrag(pre.reencrypt(kfrags[index], capsule))

    # Even after the capsule has been used for decryption, it is the same key.
    cleartext = pre.decrypt(capsule, alices_keys.priv, ciphertext, alices_keys.pub)
    assert lookup[capsule] == first_note
    assert cleartext == message

    # Overwriting the value must update the existing entry, not add a new one.
    second_note = "Bob has changed his mind."
    lookup[capsule] = second_note
    assert lookup[capsule] == second_note
    assert len(lookup) == 1
def firehose() -> None:
    """Repeatedly re-encrypt one capsule with a single kfrag and report progress.

    Produces kfrags via ``__produce_kfrags(M=6, N=10)``, then calls
    ``pre.reencrypt`` ``int(REENCRYPTIONS)`` times with the first kfrag,
    printing a progress line every 20th iteration. The number of completed
    calls is finally compared against ``REENCRYPTIONS``.

    Raises:
        AssertionError: if the completed count differs from ``REENCRYPTIONS``.
    """
    print("Making kfrags...")
    kfrags, capsule = __produce_kfrags(M=6, N=10)
    # Only the first kfrag is exercised; the remainder is discarded.
    one_kfrag, *_unused_kfrags = kfrags

    print('Re-encrypting...')
    completed = 0
    for iteration in range(int(REENCRYPTIONS)):
        pre.reencrypt(one_kfrag, capsule)  # <<< REENCRYPTION HAPPENS HERE

        # Reached only when the call above did not raise.
        completed += 1
        if not iteration % 20:
            print('Performed {} Re-encryptions...'.format(iteration))

    failure_message = "A Reencryption failed. {} of {} succeeded".format(completed, REENCRYPTIONS)
    assert completed == REENCRYPTIONS, failure_message
    print("Successfully performed {} reencryptions".format(completed))
def test_cheating_ursula_sends_garbage(kfrags, prepared_capsule):
    """Check that a cfrag with randomised points is rejected on attachment.

    Every kfrag is re-encrypted against the prepared capsule with a piece of
    example metadata. Two points of the first cfrag are then overwritten with
    random ones; the cfrag must fail its correctness check, and attaching it
    must raise ``pre.UmbralCorrectnessError`` naming exactly that cfrag.
    """
    # Example of potential metadata to describe each re-encryption request.
    metadata_template = "This is an example of metadata for re-encryption request #{}"
    cfrags = [
        pre.reencrypt(
            kfrag,
            prepared_capsule,
            metadata=metadata_template.format(i).encode(),
        )
        for i, kfrag in enumerate(kfrags)
    ]

    # Let's put random garbage in one of the cfrags.
    tampered = cfrags[0]
    tampered.point_e1 = Point.gen_rand()
    tampered.point_v1 = Point.gen_rand()

    # Of course, this CFrag is not valid ...
    assert not tampered.verify_correctness(prepared_capsule)

    # ... and trying to attach it raises an error.
    with pytest.raises(pre.UmbralCorrectnessError) as exception_info:
        prepared_capsule.attach_cfrag(tampered)

    # The error must report the tampered cfrag, and only that one.
    offenders = exception_info.value.offending_cfrags
    assert tampered in offenders
    assert len(offenders) == 1
def test_challenge_response_serialization():
    """Check that a ChallengeResponse survives a to_bytes/from_bytes round trip.

    A fresh key pair is generated, a capsule is encapsulated and re-encrypted
    with the first kfrag, and the resulting challenge response is serialised.
    The byte length and a set of fields of the deserialised object are then
    compared against the original.
    """
    priv_key = pre.gen_priv()
    pub_key = pre.priv2pub(priv_key)

    _unused_key, capsule = pre._encapsulate(pub_key)
    kfrags, _unused_vkeys = pre.split_rekey(priv_key, pub_key, 1, 2)
    first_kfrag = kfrags[0]

    cfrag = pre.reencrypt(first_kfrag, capsule)
    capsule.attach_cfrag(cfrag)

    original = pre.challenge(first_kfrag, capsule, cfrag)
    serialized = original.to_bytes()

    # A ChallengeResponse can be represented as the 228 total bytes of
    # four Points (33 each) and three BigNums (32 each).
    assert len(serialized) == (33 * 4) + (32 * 3) == 228

    restored = pre.ChallengeResponse.from_bytes(serialized)

    # NOTE(review): the size comment above mentions three BigNums, but only
    # `bn_kfrag_sig1` is compared here -- confirm whether the remaining two
    # should be checked as well.
    compared_fields = (
        "point_eph_e2",
        "point_eph_v2",
        "point_kfrag_commitment",
        "point_kfrag_pok",
        "bn_kfrag_sig1",
    )
    for field in compared_fields:
        assert getattr(restored, field) == getattr(original, field)
def test_cfrag_with_missing_proof_cannot_be_attached(kfrags, prepared_capsule):
    """Check that a cfrag whose proof was removed is refused by the capsule.

    All kfrags are re-encrypted against the prepared capsule. The first
    cfrag has its ``proof`` set to ``None`` and attaching it must raise
    ``CapsuleFrag.NoProofProvided``; the remaining cfrags must attach
    without error.
    """
    cfrags = [pre.reencrypt(kfrag, prepared_capsule) for kfrag in kfrags]

    # If the proof is lost (e.g., it is chopped off a serialized CFrag or
    # similar), then the CFrag cannot be attached.
    proofless = cfrags[0]
    proofless.proof = None
    with pytest.raises(CapsuleFrag.NoProofProvided):
        prepared_capsule.attach_cfrag(proofless)

    # The remaining CFrags are untouched, so they attach correctly.
    for intact in cfrags[1:]:
        prepared_capsule.attach_cfrag(intact)
def test_m_of_n(N, M, alices_keys, bobs_keys):
    """Check that Bob recovers Alice's symmetric key from M re-encryptions.

    A symmetric key is encapsulated for Alice, her key is split with
    ``pre.split_rekey(..., M, N)`` (presumably threshold M out of N shares),
    and every kfrag is verified. The first M kfrags are re-encrypted,
    attached and challenge-checked; the key decapsulated with Bob's private
    key must then equal the originally encapsulated one.
    """
    priv_key_alice, pub_key_alice = alices_keys
    priv_key_bob, pub_key_bob = bobs_keys

    # The underlying points are used repeatedly below.
    alice_point = pub_key_alice.point_key
    bob_point = pub_key_bob.point_key

    sym_key, capsule = pre._encapsulate(alice_point)
    kfrags, vkeys = pre.split_rekey(priv_key_alice, pub_key_bob, M, N)

    # Every kfrag must verify against both public points and the vkeys.
    for kfrag in kfrags:
        assert kfrag.verify(alice_point, bob_point)
        assert kfrag.is_consistent(vkeys)

    # Re-encrypt with the first M kfrags, challenging each result.
    for kfrag in kfrags[:M]:
        cfrag = pre.reencrypt(kfrag, capsule)
        capsule.attach_cfrag(cfrag)
        response = pre.challenge(kfrag, capsule, cfrag)
        assert pre.check_challenge(capsule, cfrag, response, alice_point, bob_point)

    # TODO: Is it possible to check here if >= m cFrags have been attached?
    #       (e.g. `assert capsule.is_openable_by_bob()`, then
    #       `capsule.open(pub_bob, priv_bob, pub_alice)`)

    capsule._reconstruct_shamirs_secret()
    sym_key_from_capsule = pre.decapsulate_reencrypted(
        bob_point,
        priv_key_bob.bn_key,
        alice_point,
        capsule,
    )
    assert sym_key == sym_key_from_capsule
# Delegation: produce the kfrags from the delegating private key, the
# receiving public key, M, N and the signer.
kfrags = pre.generate_kfrags(delegating_privkey, receiving_pubkey, M, N, signer)

# Capsule preparation (necessary before re-encryption and activation)
capsule.set_correctness_keys(
    delegating=delegating_pubkey,
    receiving=receiving_pubkey,
    verifying=signing_pubkey,
)

# Bob requests re-encryption to some set of M ursulas
cfrags = []
for kfrag in kfrags[:M]:
    # Ursula checks that the received kfrag is valid
    assert kfrag.verify(signing_pubkey, delegating_pubkey, receiving_pubkey, params)

    # Re-encryption by an Ursula; Bob collects the result
    cfrags.append(pre.reencrypt(kfrag, capsule))

# Capsule activation (by Bob)
for cfrag in cfrags:
    capsule.attach_cfrag(cfrag)

# Decryption by Bob
reenc_cleartext = pre.decrypt(ciphertext, capsule, receiving_privkey)
assert reenc_cleartext == plain_data
# Key generation for both parties, using the shared `params`.
priv_key_alice = keys.UmbralPrivateKey.gen_key(params=params)
pub_key_alice = priv_key_alice.get_pubkey()

priv_key_bob = keys.UmbralPrivateKey.gen_key(params=params)
pub_key_bob = priv_key_bob.get_pubkey()

# Alice encrypts, then decrypts with her own private key.
plain_data = b'attack at dawn'
ciphertext, capsule = pre.encrypt(pub_key_alice, plain_data)

cleartext = pre.decrypt(capsule, priv_key_alice, ciphertext)
assert cleartext == plain_data

# Alice's key is split for Bob; every resulting rekey is re-encrypted
# and the cfrag attached to the capsule.
rekeys, _unused_vkeys = pre.split_rekey(
    priv_key_alice, pub_key_bob, M, N, params=params
)
for rekey in rekeys:
    c_frag = pre.reencrypt(rekey, capsule, params=params)
    capsule.attach_cfrag(c_frag)

# Bob decrypts the same ciphertext, supplying Alice's public key.
reenc_cleartext = pre.decrypt(capsule, priv_key_bob, ciphertext, pub_key_alice)
assert reenc_cleartext == plain_data
import random
# Draw 10 of the kfrags produced above (10 is the threshold M).
kfrags = random.sample(kfrags, 10)

# Bob collects the resulting `cfrags` from several Ursulas.
# Bob must gather at least `threshold` `cfrags` in order to activate the capsule.
bob_capsule.set_correctness_keys(
    delegating=alices_public_key,
    receiving=bobs_public_key,
    verifying=alices_verifying_key,
)

cfrags = []  # Bob's cfrag collection
for kfrag in kfrags:
    # Bob collects a cfrag
    cfrags.append(pre.reencrypt(kfrag=kfrag, capsule=bob_capsule))

assert len(cfrags) == 10

#10
# Bob attaches cfrags to the capsule
# ----------------------------------
# Bob attaches at least `threshold` `cfrags` to the capsule;
# then it can become *activated*.
for cfrag in cfrags:
    bob_capsule.attach_cfrag(cfrag)
#11
# Bob activates and opens the capsule
# ------------------------------------