Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __standard_encryption_api() -> tuple:
    """Build one fresh set of inputs: keys, a signer, a ciphertext and its capsule.

    Three private keys (delegating, signing, receiving) are generated with
    ``PARAMS``. 32 random bytes are encrypted under the delegating public
    key, and the resulting capsule is given its delegating, receiving and
    verifying correctness keys.

    Returns:
        ``(delegating_privkey, signer, receiving_pubkey, ciphertext, capsule)``
    """
    # Key generation order is kept as delegating -> signing -> receiving.
    delegating_sk = keys.UmbralPrivateKey.gen_key(params=PARAMS)
    signing_sk = keys.UmbralPrivateKey.gen_key(params=PARAMS)
    receiving_sk = keys.UmbralPrivateKey.gen_key(params=PARAMS)

    delegating_pk = delegating_sk.get_pubkey()
    receiving_pk = receiving_sk.get_pubkey()
    key_signer = Signer(signing_sk)

    # Random 32-byte payload, encrypted for the delegating party.
    payload = os.urandom(32)
    encrypted, cap = pre.encrypt(delegating_pk, payload)

    cap.set_correctness_keys(
        delegating=delegating_pk,
        receiving=receiving_pk,
        verifying=signing_sk.get_pubkey(),
    )

    return delegating_sk, key_signer, receiving_pk, encrypted, cap
# Set to True to overwrite the existing test vector files with freshly
# generated random instances.
generate_again = False

# ------------------------------------------------------------------ #
# Setup
# ------------------------------------------------------------------ #
set_default_curve()
params = default_params()
curve = params.curve

# Umbral objects reused later on. The three private keys are generated in
# the order delegating, receiving, signing.
delegating_privkey, receiving_privkey, signing_privkey = (
    UmbralPrivateKey.gen_key(params=params) for _ in range(3)
)

# Public counterparts of the private keys above.
delegating_key = delegating_privkey.get_pubkey()
receiving_key = receiving_privkey.get_pubkey()
verifying_key = signing_privkey.get_pubkey()

signer = Signer(signing_privkey)

# Key fragments: N=10 in total, with a threshold of 6.
kfrags = pre.generate_kfrags(
    delegating_privkey=delegating_privkey,
    receiving_pubkey=receiving_key,
    threshold=6,
    N=10,
    signer=signer,
)

plain_data = b'peace at dawn'
def test_private_key_serialization_with_encryption(random_ec_curvebn1):
    """Round-trip a private key through password-protected bytes.

    The key is serialized with ``to_bytes`` and restored with ``from_bytes``
    using the same password and scrypt cost; the restored key's ``bn_key``
    must equal the value the key was built from.
    """
    original_bn = random_ec_curvebn1
    private_key = UmbralPrivateKey(original_bn, default_params())

    # Deliberately insecure cost, chosen only to keep the test fast.
    insecure_cost = 15
    protection = dict(password=b'test', _scrypt_cost=insecure_cost)

    serialized = private_key.to_bytes(**protection)
    restored = UmbralPrivateKey.from_bytes(serialized, **protection)

    assert original_bn == restored.bn_key
def __produce_kfrags_and_capsule(m: int, n: int) -> Tuple[List[KFrag], Capsule]:
    """Generate key fragments and a matching capsule from fresh random keys.

    Args:
        m: Passed to ``pre.generate_kfrags`` as its third positional argument
            (presumably the threshold -- confirm against the pre module).
        n: Passed to ``pre.generate_kfrags`` as its fourth positional argument
            (presumably the total number of fragments -- confirm).

    Returns:
        ``(kfrags, capsule)``: the generated key fragments and the capsule
        produced by encrypting 32 random bytes, with its correctness keys set.
    """
    delegating_privkey = keys.UmbralPrivateKey.gen_key(params=PARAMS)
    delegating_pubkey = delegating_privkey.get_pubkey()

    signing_privkey = keys.UmbralPrivateKey.gen_key(params=PARAMS)
    signer = Signer(signing_privkey)

    receiving_privkey = keys.UmbralPrivateKey.gen_key(params=PARAMS)
    receiving_pubkey = receiving_privkey.get_pubkey()

    # Only the capsule is needed by callers; the ciphertext is discarded.
    plain_data = os.urandom(32)
    _ciphertext, capsule = pre.encrypt(delegating_pubkey, plain_data)

    kfrags = pre.generate_kfrags(delegating_privkey, receiving_pubkey, m, n, signer)

    capsule.set_correctness_keys(delegating=delegating_pubkey,
                                 receiving=receiving_pubkey,
                                 verifying=signing_privkey.get_pubkey())

    # The declared return type promises both objects; without this return the
    # function would fall off the end and hand callers None.
    return kfrags, capsule
def test_prehashed_message(execution_number):
    """Sign and verify a digest with ``is_prehashed=True`` on both sides.

    The message is hashed here with ``DEFAULT_HASH_ALGORITHM``; the digest is
    then handed to the signer and to ``verify`` as an already-hashed message.
    """
    private_key = UmbralPrivateKey.gen_key()
    public_key = private_key.get_pubkey()
    sign = Signer(private_key=private_key)

    # Hash the message ourselves.
    digest_ctx = hashes.Hash(DEFAULT_HASH_ALGORITHM(), backend=backend)
    digest_ctx.update(b"peace at dawn")
    digest = digest_ctx.finalize()

    sig = sign(message=digest, is_prehashed=True)

    assert sig.verify(
        message=digest,
        verifying_key=public_key,
        is_prehashed=True,
    )
def test_gen_key():
    """Check the exact types produced by ``gen_key`` and ``get_pubkey``.

    ``gen_key`` is called without arguments here.
    """
    private_key = UmbralPrivateKey.gen_key()
    assert type(private_key) == UmbralPrivateKey

    public_key = private_key.get_pubkey()
    assert type(public_key) == UmbralPublicKey
def __produce_kfrags(M, N) -> list:
    """Return the result of ``pre.split_rekey`` for freshly generated keys.

    Args:
        M: Forwarded unchanged to ``pre.split_rekey``.
        N: Forwarded unchanged to ``pre.split_rekey``.
    """
    # Key generation order is kept as delegating -> signing -> receiving.
    delegating_sk = keys.UmbralPrivateKey.gen_key(params=PARAMS)
    signing_sk = keys.UmbralPrivateKey.gen_key(params=PARAMS)
    receiving_sk = keys.UmbralPrivateKey.gen_key(params=PARAMS)

    return pre.split_rekey(
        delegating_sk,
        Signer(signing_sk),
        receiving_sk.get_pubkey(),
        M,
        N,
    )
def test_sign_and_verify(execution_number):
    """Sign a message, verify it, and round-trip the signature through bytes.

    Verification must succeed for the signed message with the signer's
    public key, and fail for a different message or a different public key.
    """
    private_key = UmbralPrivateKey.gen_key()
    public_key = private_key.get_pubkey()
    sign = Signer(private_key=private_key)

    msg = b"peace at dawn"
    sig = sign(message=msg)

    # Verification: right message and key, then wrong message, then wrong key.
    assert sig.verify(msg, public_key)
    assert not sig.verify(b"another message", public_key)

    unrelated_public_key = UmbralPrivateKey.gen_key().pubkey
    assert not sig.verify(msg, unrelated_public_key)

    # Serialization: length check, then restore and compare.
    raw = bytes(sig)
    assert len(raw) == Signature.expected_bytes_length()
    assert Signature.from_bytes(raw) == sig
def act_as_bob(self, name):
    """Load the stored private keys of account *name* and build a ``Bob``.

    Reads ``accounts/<name>/recipent.private.json``, decodes its hex-encoded
    ``"enc"`` and ``"sig"`` entries into private keys, wraps them in
    decrypting/signing powers and constructs a ``Bob`` that knows
    ``self.ursula``.

    NOTE(review): in the portion visible here the constructed ``Bob`` is only
    bound to a local name; it is neither returned nor stored. Confirm whether
    more code follows.
    """
    key_file = "accounts/" + name + "/" + "recipent.private.json"
    with open(key_file) as handle:
        stored = json.load(handle)

    decrypting_sk = UmbralPrivateKey.from_bytes(bytes.fromhex(stored["enc"]))
    signing_sk = UmbralPrivateKey.from_bytes(bytes.fromhex(stored["sig"]))

    decrypting_power = DecryptingPower(
        keypair=DecryptingKeypair(private_key=decrypting_sk)
    )
    signing_power = SigningPower(
        keypair=SigningKeypair(private_key=signing_sk)
    )

    bob = Bob(
        is_me=True,
        federated_only=True,
        crypto_power_ups=[decrypting_power, signing_power],
        start_learning_now=True,
        abort_on_learning_error=True,
        known_nodes=[self.ursula],
        save_metadata=False,
        network_middleware=RestMiddleware(),
    )
def decrypt(ciphertext: bytes, capsule: Capsule, decrypting_key: UmbralPrivateKey,
            check_proof: bool = True) -> bytes:
    """
    Open the capsule to recover the encapsulated key, then use that key to
    decrypt the ciphertext (with the capsule bytes as authenticated data).

    Raises:
        ValueError: ciphertext is not bytes or is shorter than DEM_NONCE_SIZE.
        Capsule.NotValid: capsule is not a Capsule or fails ``verify()``.
        TypeError: decrypting_key is not an UmbralPrivateKey.
        UmbralDecryptionError: the DEM raised ``InvalidTag`` while decrypting.

    NOTE(review): the portion of this function visible here ends without
    returning ``cleartext``; confirm that a ``return cleartext`` follows.
    """
    # Argument validation, checked in order: ciphertext, capsule, key.
    if not (isinstance(ciphertext, bytes) and len(ciphertext) >= DEM_NONCE_SIZE):
        raise ValueError("Input ciphertext must be a bytes object of length >= {}".format(DEM_NONCE_SIZE))
    if not (isinstance(capsule, Capsule) and capsule.verify()):
        raise Capsule.NotValid
    if not isinstance(decrypting_key, UmbralPrivateKey):
        raise TypeError("The decrypting key is not an UmbralPrivateKey")

    if not capsule._attached_cfrags:
        # No cfrags attached: treated as the original capsule being opened
        # by Alice.
        symmetric_key = _decapsulate_original(decrypting_key, capsule)
    else:
        # Cfrags attached: treated as a re-encrypted capsule being opened
        # by Bob.
        symmetric_key = _open_capsule(capsule, decrypting_key, check_proof=check_proof)

    cipher = UmbralDEM(symmetric_key)
    try:
        cleartext = cipher.decrypt(ciphertext, authenticated_data=bytes(capsule))
    except InvalidTag as e:
        raise UmbralDecryptionError() from e