Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def do_storetargetedblockenc(self, arg):
    """<data> [store_key] store base58 encoded targeted block.

    The first space-separated token of ``arg`` is base58-decoded and sent
    with ``send_store_targeted_data``. The data key handed to the callback,
    the returned storing nodes and the elapsed time are written to the
    shell.

    The second token is an optional store-key flag. It is only honoured
    when exactly two tokens are given. An empty token, ``0``, ``false`` or
    ``no`` (any case) turns the flag off; any other value turns it on. The
    flag defaults to False.
    """
    args = arg.split(' ')
    data = base58.decode(args[0])

    # bool() of any non-empty string is True, so "0" or "false" used to
    # switch the flag ON. Compare against explicit "off" spellings instead.
    store_key = (
        len(args) == 2
        and args[1].lower() not in ("", "0", "false", "no")
    )

    def key_callback(data_key):
        self.writeln("data_key=[{}].".format(mbase32.encode(data_key)))

    start = datetime.today()

    storing_nodes = yield from \
        self.peer.engine.tasks.send_store_targeted_data(
            data, store_key=store_key, key_callback=key_callback)

    diff = datetime.today() - start

    self.writeln("storing_nodes=[{}].".format(storing_nodes))
    self.writeln("send_store_targeted_data(..) took: {}.".format(diff))
</data>
def do_storeukeyenc(self, arg):
    """<KEY> <DATA> <VERSION> <STOREKEY> [PATH] store base58 encoded DATA
    with base58 encoded private KEY.

    ``arg`` is split on single spaces into, in order: KEY (base58 encoded
    RSA private key data), DATA (base58 encoded payload), VERSION (an
    integer), STOREKEY (positional placeholder, currently ignored) and an
    optional PATH. The decoded data is stored via ``multipart.store_data``;
    the data key handed to the callback and the elapsed time are written
    to the shell.

    Raises IndexError if KEY, DATA or VERSION is missing, and ValueError
    if VERSION is not an integer.
    """
    args = arg.split(' ')

    key = rsakey.RsaKey(privdata=base58.decode(args[0]))
    data = base58.decode(args[1])
    version = int(args[2])
    # args[3] (STOREKEY) keeps its positional slot so PATH stays at index 4,
    # but it is not forwarded to store_data. The old `bool(args[3])` was
    # always True for a non-empty token and was never read.
    # NOTE(review): confirm whether store_data should receive this flag.
    path = args[4] if len(args) > 4 else None

    def key_callback(data_key):
        self.writeln("data_key=[{}].".format(mbase32.encode(data_key)))

    start = datetime.today()

    yield from multipart.store_data(
        self.peer.engine, data, privatekey=key, path=path,
        version=version, key_callback=key_callback)

    diff = datetime.today() - start

    self.writeln("multipart.store_data(..) took: {}.".format(diff))
def do_storeukeyenc(self, arg):
    """<KEY> <DATA> <VERSION> <STOREKEY> [PATH] store base58 encoded DATA
    with base58 encoded private KEY.

    ``arg`` is split on single spaces into, in order: KEY (base58 encoded
    RSA private key data), DATA (base58 encoded payload), VERSION (an
    integer), STOREKEY (positional placeholder, currently ignored) and an
    optional PATH. The decoded data is stored via ``multipart.store_data``;
    the data key handed to the callback and the elapsed time are written
    to the shell.

    Raises IndexError if KEY, DATA or VERSION is missing, and ValueError
    if VERSION is not an integer.
    """
    args = arg.split(' ')

    key = rsakey.RsaKey(privdata=base58.decode(args[0]))
    data = base58.decode(args[1])
    version = int(args[2])
    # args[3] (STOREKEY) keeps its positional slot so PATH stays at index 4,
    # but it is not forwarded to store_data. The old `bool(args[3])` was
    # always True for a non-empty token and was never read.
    # NOTE(review): confirm whether store_data should receive this flag.
    path = args[4] if len(args) > 4 else None

    def key_callback(data_key):
        self.writeln("data_key=[{}].".format(mbase32.encode(data_key)))

    start = datetime.today()

    yield from multipart.store_data(
        self.peer.engine, data, privatekey=key, path=path,
        version=version, key_callback=key_callback)

    diff = datetime.today() - start

    self.writeln("multipart.store_data(..) took: {}.".format(diff))
</data>
def do_storeblockenc(self, arg):
    """<data> [store_key] store base58 encoded block.

    The first space-separated token of ``arg`` is base58-decoded and sent
    with ``send_store_data``. The data key handed to the callback, the
    returned storing nodes and the elapsed time are written to the shell.

    The second token is an optional store-key flag. It is only honoured
    when exactly two tokens are given. An empty token, ``0``, ``false`` or
    ``no`` (any case) turns the flag off; any other value turns it on. The
    flag defaults to False.
    """
    args = arg.split(' ')
    data = base58.decode(args[0])

    # bool() of any non-empty string is True, so "0" or "false" used to
    # switch the flag ON. Compare against explicit "off" spellings instead.
    store_key = (
        len(args) == 2
        and args[1].lower() not in ("", "0", "false", "no")
    )

    def key_callback(data_key):
        self.writeln("data_key=[{}].".format(mbase32.encode(data_key)))

    start = datetime.today()

    storing_nodes = yield from self.peer.engine.tasks.send_store_data(
        data, store_key=store_key, key_callback=key_callback)

    diff = datetime.today() - start

    self.writeln("storing_nodes=[{}].".format(storing_nodes))
    self.writeln("send_store_data(..) took: {}.".format(diff))
</data>
return
formelement = form["fileToUpload"]
filename = formelement.filename
data = formelement.file.read()
if log.isEnabledFor(logging.INFO):
log.info("filename=[{}].".format(filename))
privatekey = form["privateKey"].value
if privatekey and privatekey != "${PRIVATE_KEY}":
if log.isEnabledFor(logging.INFO):
log.info("privatekey=[{}].".format(privatekey))
privatekey = base58.decode(privatekey)
privatekey = rsakey.RsaKey(privdata=privatekey)
path = form["path"].value.encode()
version = form["version"].value
if not version:
version = 0
else:
version = int(version)
mime_type = form["mime_type"].value
else:
privatekey = None
if log.isEnabledFor(logging.DEBUG):
log.debug("data=[{}].".format(data))
def _send_dmail(self, from_asymkey, recipient, dmail_bytes, signature):
assert type(recipient) is DmailSite
# Read in recipient DmailSite.
root = recipient.root
sse = sshtype.parseMpint(base58.decode(root["sse"]))[1]
target_enc = root["target"]
difficulty = root["difficulty"]
# Calculate a shared secret.
dh = dhgroup14.DhGroup14()
dh.generate_x()
dh.generate_e()
dh.f = sse
k = dh.calculate_k()
target_key = mbase32.decode(target_enc)
key = self._generate_encryption_key(target_key, k)
# Encrypt the Dmail bytes.
addr, sig_bits = mutil.decode_key(args.scan_dmail)
def key_callback(key):
print("dmail key: [{}].".format(mbase32.encode(key)))
yield from de.scan_dmail_address(\
addr, sig_bits, key_callback=key_callback)
if args.fetch_dmail:
log.info("Fetching dmail for key=[{}].".format(args.fetch_dmail))
key = mbase32.decode(args.fetch_dmail)
if args.x:
l, x_int = sshtype.parseMpint(base58.decode(args.x))
else:
x_int = None
dmail_target = args.dmail_target
dm, valid_sig =\
yield from de.fetch_dmail(key, x_int, None, dmail_target)
if not dm:
raise Exception("No dmail found.")
if not x_int:
print("Encrypted dmail data=[\n{}].".format(mutil.hex_dump(dm)))
else:
print("Subject: {}\n".format(dm.subject))
def send_dmail_text(self, subject, message_text):
if message_text.startswith("from: "):
p0 = message_text.find('\n')
m_from_asymkey = rsakey.RsaKey(\
privdata=base58.decode(message_text[6:p0]))
p0 += 1
else:
p0 = 0
m_from_asymkey = None
m_dest_ids = []
while message_text.startswith("to: ", p0):
p1 = message_text.find('\n')
m_dest_enc = message_text[p0+4:p1]
m_dest_id, sig_bits = mutil.decode_key(m_dest_enc)
m_dest_ids.append((m_dest_enc, m_dest_id, sig_bits))
p0 = p1 + 1
date = mutil.utc_datetime()
if message_text[p0] == '\n':
def do_storedataenc(self, arg):
    """<data> store base58 encoded data.

    The whole of ``arg`` is base58-decoded and handed to
    ``multipart.store_data``. The data key passed to the callback and the
    elapsed time of the store call are written to the shell.
    """
    def report_key(data_key):
        # Show the key in mbase32 form as soon as store_data reports it.
        encoded = mbase32.encode(data_key)
        self.writeln("data_key=[{}].".format(encoded))

    payload = base58.decode(arg)

    began = datetime.today()
    yield from multipart.store_data(
        self.peer.engine, payload, key_callback=report_key)
    elapsed = datetime.today() - began

    self.writeln("multipart.store_data(..) took: {}.".format(elapsed))
</data>