Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def act_as_bob(self, name):
dirname = "accounts/" + name + "/"
fname = dirname+"recipent.private.json"
with open(fname) as data_file:
data = json.load(data_file)
enc_privkey = UmbralPrivateKey.from_bytes(bytes.fromhex(data["enc"]))
sig_privkey = UmbralPrivateKey.from_bytes(bytes.fromhex(data["sig"]))
bob_enc_keypair = DecryptingKeypair(private_key=enc_privkey)
bob_sig_keypair = SigningKeypair(private_key=sig_privkey)
enc_power = DecryptingPower(keypair=bob_enc_keypair)
sig_power = SigningPower(keypair=bob_sig_keypair)
power_ups = [enc_power, sig_power]
bob = Bob(
is_me=True,
federated_only=True,
crypto_power_ups=power_ups,
start_learning_now=True,
abort_on_learning_error=True,
known_nodes=[self.ursula],
save_metadata=False,
network_middleware=RestMiddleware(),
)
return bob
def creat_nucid(alice, cid, enc_pubkey, sig_pubkey, label):
powers_and_material = { DecryptingPower: enc_pubkey, SigningPower: sig_pubkey }
doctor_strange = Bob.from_public_keys(powers_and_material=powers_and_material, federated_only=True)
policy_end_datetime = maya.now() + datetime.timedelta(days=5)
# m, n = 1, 2
m, n = 2, 3
print(doctor_strange, label, m, n, policy_end_datetime)
print(alice)
policy = alice.grant(bob=doctor_strange, label=label, m=m, n=n, expiration=policy_end_datetime)
policy_info = {
"policy_pubkey": base58.b58encode(policy.public_key.to_bytes()).decode("utf-8"),
"alice_sig_pubkey": base58.b58encode(bytes(alice.stamp)).decode("utf-8"),
"label": label.decode("utf-8"),
}
store_url = "%s_%s_%s_%s"%(cid,
base58.b58encode(policy.public_key.to_bytes()).decode("utf-8"),
base58.b58encode(bytes(alice.stamp)).decode("utf-8"),
label.decode("utf-8"))
def add_contents(self, alicia, my_label, contents):
"""
cid = client.add_contents(
policy_pubkey=policy_pub_key
)
"""
policy_pubkey = alicia.get_policy_pubkey_from_label(my_label)
data_source = Enrico(policy_encrypting_key=policy_pubkey)
data_source_public_key = bytes(data_source.stamp)
heart_rate = 80
now = time.time()
kits = list()
heart_rate = contents
now += 3
heart_rate_data = { 'heart_rate': heart_rate, 'timestamp': now, }
plaintext = msgpack.dumps(heart_rate_data, use_bin_type=True)
message_kit, _signature = data_source.encrypt_message(plaintext)
kit_bytes = message_kit.to_bytes()
kits.append(kit_bytes)
data = { 'data_source': data_source_public_key, 'kits': kits, }
# print("🚀 ADDING TO IPFS D-STORAGE NETWORK 🚀")
d = msgpack.dumps(data, use_bin_type=True)
### NETWORK ERROR OUT ON FALLBACK
def decrypt(self, bob, item_cid, pol, sig, lab):
policy_pubkey = UmbralPublicKey.from_bytes(bytes.fromhex(pol))
alices_sig_pubkey = UmbralPublicKey.from_bytes(bytes.fromhex(sig))
label = lab.encode()
dat = self.ipfs_gateway_api.cat(item_cid)
doctor = bob
doctor.join_policy(label, alices_sig_pubkey)
data = msgpack.loads(dat, raw=False)
message_kits = (UmbralMessageKit.from_bytes(k) for k in data['kits'])
data_source = Enrico.from_public_keys(
{SigningPower: data['data_source']},
policy_encrypting_key=policy_pubkey
)
message_kit = next(message_kits)
start = timer()
retrieved_plaintexts = doctor.retrieve(
label=label,
message_kit=message_kit,
data_source=data_source,
alice_verifying_key=alices_sig_pubkey
)
end = timer()
plaintext = msgpack.loads(retrieved_plaintexts[0], raw=False)
heart_rate = plaintext['heart_rate']
timestamp = maya.MayaDT(plaintext['timestamp'])
terminal_size = shutil.get_terminal_size().columns
def act_as_alice(self, name, password):
dirname = "accounts/" + name + "/"
congifloc = dirname + "alice.config"
alice_config = AliceConfiguration(
config_root=os.path.join(dirname),
is_me=True,
known_nodes={self.ursula},
start_learning_now=False,
federated_only=True,
learn_on_same_thread=True,
)
cfg = alice_config.from_configuration_file(congifloc)
cfg.keyring.unlock(password)
alice = cfg.produce()
# alice.start_learning_loop(now=True)
return alice
def create_new_user(self, name, password):
passphrase = password
direco = "accounts/"+ name
# alice_config = AliceConfiguration(
# config_root=os.path.join(direco),
# is_me=True, known_nodes={self.ursula}, start_learning_now=True,
# federated_only=True, learn_on_same_thread=True,
# )
# alice_config.initialize(password=passphrase)
# alice_config.keyring.unlock(password=passphrase)
# alice_config_file = alice_config.to_configuration_file()
alice_config = AliceConfiguration(
config_root=os.path.join(direco),
is_me=True,
known_nodes={self.ursula},
start_learning_now=False,
federated_only=True,
learn_on_same_thread=True,
)
alice_config.initialize(password=passphrase)
alice_config.keyring.unlock(password=passphrase)
alice = alice_config.produce()
alice_config_file = alice_config.to_configuration_file()
alice.start_learning_loop(now=True)
enc_privkey = UmbralPrivateKey.gen_key()
sig_privkey = UmbralPrivateKey.gen_key()
def decrypt(self, bob, item_cid, pol, sig, lab):
policy_pubkey = UmbralPublicKey.from_bytes(bytes.fromhex(pol))
alices_sig_pubkey = UmbralPublicKey.from_bytes(bytes.fromhex(sig))
label = lab.encode()
dat = self.ipfs_gateway_api.cat(item_cid)
doctor = bob
doctor.join_policy(label, alices_sig_pubkey)
data = msgpack.loads(dat, raw=False)
message_kits = (UmbralMessageKit.from_bytes(k) for k in data['kits'])
data_source = Enrico.from_public_keys(
{SigningPower: data['data_source']},
policy_encrypting_key=policy_pubkey
)
message_kit = next(message_kits)
start = timer()
retrieved_plaintexts = doctor.retrieve(
label=label,
message_kit=message_kit,
data_source=data_source,
alice_verifying_key=alices_sig_pubkey
)
end = timer()
plaintext = msgpack.loads(retrieved_plaintexts[0], raw=False)
heart_rate = plaintext['heart_rate']
timestamp = maya.MayaDT(plaintext['timestamp'])
def creat_nucid(alice, cid, enc_pubkey, sig_pubkey, label):
powers_and_material = { DecryptingPower: enc_pubkey, SigningPower: sig_pubkey }
doctor_strange = Bob.from_public_keys(powers_and_material=powers_and_material, federated_only=True)
policy_end_datetime = maya.now() + datetime.timedelta(days=5)
# m, n = 1, 2
m, n = 2, 3
print(doctor_strange, label, m, n, policy_end_datetime)
print(alice)
policy = alice.grant(bob=doctor_strange, label=label, m=m, n=n, expiration=policy_end_datetime)
policy_info = {
"policy_pubkey": base58.b58encode(policy.public_key.to_bytes()).decode("utf-8"),
"alice_sig_pubkey": base58.b58encode(bytes(alice.stamp)).decode("utf-8"),
"label": label.decode("utf-8"),
}
store_url = "%s_%s_%s_%s"%(cid,
base58.b58encode(policy.public_key.to_bytes()).decode("utf-8"),
base58.b58encode(bytes(alice.stamp)).decode("utf-8"),
def act_as_bob(self, name):
dirname = "accounts/" + name + "/"
fname = dirname+"recipent.private.json"
with open(fname) as data_file:
data = json.load(data_file)
enc_privkey = UmbralPrivateKey.from_bytes(bytes.fromhex(data["enc"]))
sig_privkey = UmbralPrivateKey.from_bytes(bytes.fromhex(data["sig"]))
bob_enc_keypair = DecryptingKeypair(private_key=enc_privkey)
bob_sig_keypair = SigningKeypair(private_key=sig_privkey)
enc_power = DecryptingPower(keypair=bob_enc_keypair)
sig_power = SigningPower(keypair=bob_sig_keypair)
power_ups = [enc_power, sig_power]
bob = Bob(
is_me=True,
federated_only=True,
crypto_power_ups=power_ups,
start_learning_now=True,
abort_on_learning_error=True,
known_nodes=[self.ursula],
save_metadata=False,
network_middleware=RestMiddleware(),
)
return bob
def act_as_bob(self, name):
dirname = "accounts/" + name + "/"
fname = dirname+"recipent.private.json"
with open(fname) as data_file:
data = json.load(data_file)
enc_privkey = UmbralPrivateKey.from_bytes(bytes.fromhex(data["enc"]))
sig_privkey = UmbralPrivateKey.from_bytes(bytes.fromhex(data["sig"]))
bob_enc_keypair = DecryptingKeypair(private_key=enc_privkey)
bob_sig_keypair = SigningKeypair(private_key=sig_privkey)
enc_power = DecryptingPower(keypair=bob_enc_keypair)
sig_power = SigningPower(keypair=bob_sig_keypair)
power_ups = [enc_power, sig_power]
bob = Bob(
is_me=True,
federated_only=True,
crypto_power_ups=power_ups,
start_learning_now=True,
abort_on_learning_error=True,
known_nodes=[self.ursula],
save_metadata=False,
network_middleware=RestMiddleware(),
)
return bob