Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_simple_api(N, M, curve=default_curve()):
"""Manually injects umbralparameters for multi-curve testing."""
params = UmbralParameters(curve=curve)
priv_key_alice = keys.UmbralPrivateKey.gen_key(params=params)
pub_key_alice = priv_key_alice.get_pubkey()
priv_key_bob = keys.UmbralPrivateKey.gen_key(params=params)
pub_key_bob = priv_key_bob.get_pubkey()
plain_data = b'attack at dawn'
ciphertext, capsule = pre.encrypt(pub_key_alice, plain_data)
cleartext = pre.decrypt(capsule, priv_key_alice, ciphertext)
assert cleartext == plain_data
rekeys, _unused_vkeys = pre.split_rekey(priv_key_alice, pub_key_bob, M, N, params=params)
import os
import sys
sys.path.append(os.path.abspath(os.getcwd()))
from umbral import keys, pre
from umbral.config import default_curve
from umbral.params import UmbralParameters
from umbral.signing import Signer
CURVE = default_curve()
PARAMS = UmbralParameters(curve=CURVE)
REENCRYPTIONS = 1000
def __produce_kfrags(M, N) -> list:
delegating_privkey = keys.UmbralPrivateKey.gen_key(params=PARAMS)
signing_privkey = keys.UmbralPrivateKey.gen_key(params=PARAMS)
signer = Signer(signing_privkey)
receiving_privkey = keys.UmbralPrivateKey.gen_key(params=PARAMS)
receiving_pubkey = receiving_privkey.get_pubkey()
kfrags = pre.split_rekey(delegating_privkey, signer, receiving_pubkey, M, N)
return kfrags
def test_simple_api(N, M, curve=default_curve()):
"""
This test models the main interactions between NuCypher actors (i.e., Alice,
Bob, Data Source, and Ursulas) and artifacts (i.e., public and private keys,
ciphertexts, capsules, KFrags, CFrags, etc).
The test covers all the main stages of data sharing with NuCypher:
key generation, delegation, encryption, decryption by
Alice, re-encryption by Ursula, and decryption by Bob.
Manually injects umbralparameters for multi-curve testing."""
# Generation of global parameters
params = UmbralParameters(curve=curve)
# Key Generation (Alice)
delegating_privkey = UmbralPrivateKey.gen_key(params=params)
from hypothesis.strategies import binary, booleans, integers, tuples
from umbral.config import default_curve
from umbral.curvebn import CurveBN
from umbral.cfrags import CorrectnessProof
from umbral.kfrags import KFrag
from umbral.keys import UmbralPrivateKey, UmbralPublicKey
from umbral.params import UmbralParameters
from umbral.point import Point
from umbral.random_oracles import unsafe_hash_to_point
from umbral.pre import Capsule
# test parameters
max_examples = 1000
# crypto constants
curve = default_curve()
params = UmbralParameters(curve)
bn_size = curve.group_order_size_in_bytes
# generators
bns = integers(min_value=1, max_value=backend._bn_to_int(curve.order)).map(
lambda x: CurveBN.from_int(x))
points = binary(min_size=1).map(
lambda x: unsafe_hash_to_point(x, label=b'hypothesis', params=params))
signatures = tuples(integers(min_value=1, max_value=backend._bn_to_int(curve.order)),
integers(min_value=1, max_value=backend._bn_to_int(curve.order))).map(
lambda tup: tup[0].to_bytes(bn_size, 'big') + tup[1].to_bytes(bn_size, 'big'))
# # utility
def assert_kfrag_eq(k0, k1):
def expected_bytes_length(cls, curve: Optional[Curve] = None) -> int:
"""
Returns the size (in bytes) of a Capsule given the curve.
If no curve is provided, it will use the default curve.
"""
curve = curve if curve is not None else default_curve()
bn_size = CurveBN.expected_bytes_length(curve)
point_size = Point.expected_bytes_length(curve)
return (bn_size * 1) + (point_size * 2)
def expected_bytes_length(cls, curve: Optional[Curve] = None):
"""
Returns the size (in bytes) of a CorrectnessProof without the metadata.
If no curve is given, it will use the default curve.
"""
curve = curve if curve is not None else default_curve()
bn_size = CurveBN.expected_bytes_length(curve=curve)
point_size = Point.expected_bytes_length(curve=curve)
return (bn_size * 3) + (point_size * 4)
def from_bytes(cls, data: bytes, curve: Optional[Curve] = None) -> 'Point':
"""
Returns a Point object from the given byte data on the curve provided.
"""
curve = curve if curve is not None else default_curve()
point = openssl._get_new_EC_POINT(curve)
with backend._tmp_bn_ctx() as bn_ctx:
res = backend._lib.EC_POINT_oct2point(
curve.ec_group, point, data, len(data), bn_ctx);
backend.openssl_assert(res == 1)
return cls(point, curve)
def expected_bytes_length(cls, curve: Optional[Curve] = None) -> int:
curve = curve if curve is not None else default_curve()
return 2 * curve.group_order_size_in_bytes
def from_affine(cls, coords: Tuple[int, int], curve: Optional[Curve] = None) -> 'Point':
"""
Returns a Point object from the given affine coordinates in a tuple in
the format of (x, y) and a given curve.
"""
curve = curve if curve is not None else default_curve()
affine_x, affine_y = coords
if type(affine_x) == int:
affine_x = openssl._int_to_bn(affine_x, curve=None)
if type(affine_y) == int:
affine_y = openssl._int_to_bn(affine_y, curve=None)
ec_point = openssl._get_EC_POINT_via_affine(affine_x, affine_y, curve)
return cls(ec_point, curve)