Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_crypto_sign_open(self):
    """Check that a genuine signed message opens and a modified one is rejected."""
    pk, sk = pysodium.crypto_sign_keypair()
    signed = pysodium.crypto_sign(b'howdy', sk)

    # Overwrite the single byte that sits right after the first
    # crypto_sign_BYTES bytes (presumably the first byte of the message
    # that follows the signature prefix).
    offset = pysodium.crypto_sign_BYTES
    tampered = b''.join((signed[:offset], b'0', signed[offset + 1:]))

    # The untouched blob must verify without raising.
    pysodium.crypto_sign_open(signed, pk)

    # The modified blob must be refused.
    with self.assertRaises(ValueError):
        pysodium.crypto_sign_open(tampered, pk)
# needs id, challenge, sig(id)
b, c = challenge()
message = [CHANGE,id,c]
# changes stored secret
# returns output from ./response | fail
elif sys.argv[1] == 'delete':
if len(sys.argv) != 4: usage()
# needs id, sig(id)
message = [DELETE,id]
elif sys.argv[1] == 'list':
print(b'\n'.join(getusers(datadir, hostid)).decode())
sys.exit(0)
else:
usage()
message=pysodium.crypto_sign(b''.join(message),sk)
loop = asyncio.get_event_loop()
coro = loop.create_connection(lambda: SphinxClientProtocol(message, loop, b), addr, port)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()
def doSphinx(self, message, host, b, cb):
    """Sign *message* and hand it to a new ``SphinxClientProtocol`` connection.

    Side effects:
        Sets ``self.hostid`` to a 32-byte ``crypto_generichash`` of *host*,
        with ``self.getsalt()`` passed as the second argument.

    Args:
        message: Payload to sign with ``self.getkey()`` before sending.
        host: Value hashed into ``self.hostid``.
        b: Passed through unchanged to ``SphinxClientProtocol``.
        cb: Passed through unchanged to ``SphinxClientProtocol``.

    Any exception raised while connecting or while the loop runs propagates
    to the caller unchanged.
    """
    self.hostid = pysodium.crypto_generichash(host, self.getsalt(), 32)
    signed = pysodium.crypto_sign(message, self.getkey())

    loop = asyncio.get_event_loop()
    # NOTE(review): `address` and `port` are not defined in this method;
    # they are presumably module-level settings -- confirm.
    coro = loop.create_connection(
        lambda: SphinxClientProtocol(signed, loop, b, self, cb),
        address,
        port,
    )

    # The former `try: ... except: raise` wrapper re-raised everything
    # untouched, so it has been dropped; error propagation is unchanged.
    loop.run_until_complete(coro)
    # Blocks until something stops the loop (the loop is handed to the
    # protocol above, presumably for that purpose).
    loop.run_forever()
def ask_sync(self, pk, info):
    """Respond to someone wanting to sync (only public method).

    *info* is opened with ``crypto_sign_open`` under *pk* and deserialized
    into a mapping (presumably creator -> known height). The reply is the
    signed serialization of ``(self.head, subset)``, where ``subset`` maps
    every event id yielded by ``bfs`` to its entry in ``self.hg``.
    """
    # TODO: only send a diff? maybe with the help of self.height
    # TODO: thread safe? (allow to run while mainloop is running)
    remote_heights = loads(crypto_sign_open(info, pk))

    def parents_to_visit(u):
        # Follow a parent only when its creator is absent from the remote
        # mapping, or when the local height of that parent exceeds the
        # value the remote side reported for that creator.
        return (
            parent for parent in self.hg[u].p
            if self.hg[parent].c not in remote_heights
            or self.height[parent] > remote_heights[self.hg[parent].c]
        )

    subset = {}
    for event_id in bfs((self.head,), parents_to_visit):
        subset[event_id] = self.hg[event_id]

    reply = dumps((self.head, subset))
    return crypto_sign(reply, self.sk)
def savepublickeys(self):
    """Write this identity's signed public-key bundle to its key file.

    The payload is ``self.mp + self.sp + self.cp``, followed by the ISO
    formatted ``self.created`` and ``self.valid`` dates (each left-aligned
    and padded to 32 columns), followed by ``self.name``. It is signed with
    ``self.ms`` via ``nacl.crypto_sign`` and written to the path returned by
    ``get_pk_filename(self.basedir, self.name)``.
    """
    dates = '{:<32}{:<32}'.format(self.created.isoformat(), self.valid.isoformat())
    # Build the signed blob before touching the file, so a failure here
    # does not leave an existing key file truncated to zero bytes.
    signed = nacl.crypto_sign(self.mp + self.sp + self.cp + dates + self.name, self.ms)
    # Binary mode: the signed blob is presumably raw bytes, and a text-mode
    # handle may translate newline bytes (or reject bytes altogether).
    with open(get_pk_filename(self.basedir, self.name), 'wb') as fd:
        fd.write(signed)
def sync(self, pk, payload):
"""Update hg and return new event ids in topological order."""
info = crypto_sign(dumps({c: self.height[h]
for c, h in self.can_see[self.head].items()}), self.sk)
msg = crypto_sign_open(self.network[pk](self.pk, info), pk)
remote_head, remote_hg = loads(msg)
new = tuple(toposort(remote_hg.keys() - self.hg.keys(),
lambda u: remote_hg[u].p))
for h in new:
ev = remote_hg[h]
if self.is_valid_event(h, ev):
self.add_event(h, ev)
if self.is_valid_event(remote_head, remote_hg[remote_head]):
h, ev = self.new_event(payload, (self.head, remote_head))
# this really shouldn't fail, let's check it to be sure
def savepublickeys(self):
    """Write this identity's signed public-key bundle to its key file.

    The payload is ``self.mp + self.sp + self.cp``, followed by the ISO
    formatted ``self.created`` and ``self.valid`` dates (each left-aligned
    and padded to 32 columns), followed by ``self.name``. It is signed with
    ``self.ms`` via ``nacl.crypto_sign`` and written to the path returned by
    ``get_pk_filename(self.basedir, self.name)``.
    """
    dates = '{:<32}{:<32}'.format(self.created.isoformat(), self.valid.isoformat())
    # Build the signed blob before touching the file, so a failure here
    # does not leave an existing key file truncated to zero bytes.
    signed = nacl.crypto_sign(self.mp + self.sp + self.cp + dates + self.name, self.ms)
    # Binary mode: the signed blob is presumably raw bytes, and a text-mode
    # handle may translate newline bytes (or reject bytes altogether).
    with open(get_pk_filename(self.basedir, self.name), 'wb') as fd:
        fd.write(signed)
def proof_create(chall_id, chall_sk):
    """Return a base64-encoded, doubly signed proof for *chall_id*.

    *chall_id* is UTF-8 encoded and signed with ``TeamSecrets['sign_sk']``;
    that signed blob is then signed again with *chall_sk*, and the result
    is passed through ``b64encode``.

    Args:
        chall_id: Text identifier; must provide ``.encode``.
        chall_sk: Signing key for the outer signature; asserted to be bytes.
    """
    id_bytes = chall_id.encode('utf-8')
    assert isinstance(id_bytes, bytes)
    assert isinstance(chall_sk, bytes)

    # Inner layer: signed with the team key.
    inner = pysodium.crypto_sign(id_bytes, TeamSecrets['sign_sk'])
    # Outer layer: the inner blob signed with the challenge key.
    outer = pysodium.crypto_sign(inner, chall_sk)
    return b64encode(outer)