Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Stop with an error status when the signer's public key is not the expected one
if signer.pubkey != pubkey:
    print("Bad credentials!")
    sys.exit(1)

# Write the private key to a file (PubSec v1 format) and report where it went
signer.save_pubsec_file(PRIVATE_KEY_FILE_PATH)
saved_notice = "Private key for public key %s saved in %s" % (
    signer.pubkey,
    PRIVATE_KEY_FILE_PATH,
)
print(saved_notice)

# Read the key back from the same file and show the public key it yields;
# an I/O failure is printed and turns into exit status 1
try:
    reloaded_signer = SigningKey.from_pubsec_file(PRIVATE_KEY_FILE_PATH)
    loaded_notice = "Public key %s loaded from file %s" % (
        reloaded_signer.pubkey,
        PRIVATE_KEY_FILE_PATH,
    )
    print(loaded_notice)
except IOError as io_error:
    print(io_error)
    sys.exit(1)

sys.exit(0)
# CONFIG #######################################

SIGNED_MESSAGE_FILENAME = "/tmp/duniter_signed_message.bin"

################################################

if __name__ == "__main__":
    # Read both credentials without echoing them to the terminal
    salt = getpass.getpass("Enter your passphrase (salt): ")
    password = getpass.getpass("Enter your password: ")

    # Build the signing key from the credentials and show its public key
    key = SigningKey.from_credentials(salt, password)
    print("Public key for your credentials: %s" % key.pubkey)

    message = input("Enter your message: ")

    # The signed string is the message itself plus the signature
    message_bytes = message.encode("utf-8")
    signed_message = key.sign(message_bytes)  # type: bytes

    # A verifier is created from the verify key, then checks the signed message
    veri = libnacl.sign.Verifier(key.hex_vk())
    verified = veri.verify(signed_message)

    # save signed message in a file (that step is not part of this excerpt)
def generate_revocation(self, connection, secret_key, password):
    """
    Build a signed revocation document for the identity of a connection.

    When the identities processor returns no identity for the connection, a
    new one is generated, signed with a key built from the connection's own
    salt and password, and stored before the revocation itself is signed.

    :param sakia.data.entities.Connection connection: The connection of the identity
    :param str secret_key: The account SigningKey secret key
    :param str password: The account SigningKey password
    :return: A tuple (signed raw revocation, identity)
    """
    revocation = Revocation(10, connection.currency, connection.pubkey, "")

    identity = self._identities_processor.get_identity(
        connection.currency, connection.pubkey, connection.uid
    )
    if not identity:
        # No known identity: create one, sign its document and persist it
        identity = self.generate_identity(connection)
        unsigned_identity = identity.document()
        identity_key = SigningKey(
            connection.salt, connection.password, connection.scrypt_params
        )
        unsigned_identity.sign([identity_key])
        identity.signature = unsigned_identity.signatures[0]
        self._identities_processor.insert_or_update_identity(identity)

    # The revocation is signed against the identity document
    identity_document = identity.document()
    revocation_key = SigningKey(secret_key, password, connection.scrypt_params)
    revocation.sign(identity_document, [revocation_key])

    signed_revocation = revocation.signed_raw(identity_document)
    return signed_revocation, identity
def set_scrypt_infos(self, salt, password):
    """
    Store the salt, derive the public key and append a new wallet.

    The public key is taken from a SigningKey built with the stored salt and
    the given password. A wallet is then created through
    ``Wallet.create(0, ...)`` with the name "Wallet" and appended to
    ``self.wallets``.

    :param str salt: The salt stored on the account and used to build the key
    :param str password: The password used to build the key and the wallet
    """
    self.salt = salt

    signing_key = SigningKey(self.salt, password)
    self.pubkey = signing_key.pubkey

    new_wallet = Wallet.create(
        0, self.salt, password, "Wallet", self._identities_registry
    )
    self.wallets.append(new_wallet)
"""
Get an Identity document
:param current_block: Current block data
:param uid: Unique IDentifier
:param salt: Passphrase of the account
:param password: Password of the account
:rtype: Identity
"""
# get current block BlockStamp
timestamp = BlockUID(current_block["number"], current_block["hash"])
# create keys from credentials
key = SigningKey.from_credentials(salt, password)
# create identity document
identity = Identity(
version=10,
currency=current_block["currency"],
pubkey=key.pubkey,
uid=uid,
ts=timestamp,
signature=None,
)
# sign document
identity.sign([key])
return identity
Main code
"""
# dummy credentials
salt = password = "test"
# You can connect with member credentials in case there is not much slots available on the endpoint
#
# # Prompt hidden user entry
# import getpass
# salt = getpass.getpass("Enter your passphrase (salt): ")
#
# # Prompt hidden user entry
# password = getpass.getpass("Enter your password: ")
# Init signing_key instance
signing_key = SigningKey.from_credentials(salt, password)
# Create Client from endpoint string in Duniter format
try:
ws2p_endpoint = await generate_ws2p_endpoint(BMAS_ENDPOINT)
except ValueError as e:
print(e)
return
client = Client(ws2p_endpoint)
try:
# Create a Web Socket connection
ws = await client.connect_ws()
print("Successfully connected to the web socket endpoint")
# HANDSHAKE #######################################################
def check_password(self, password):
    """
    Tell whether a password yields this account's public key.

    Note: a new temporary SigningKey is built from the stored salt and the
    given password on every call.

    :param str password: The key password
    :return: True if the generated pubkey is the same as the account's
    """
    candidate_pubkey = SigningKey(self.salt, password).pubkey
    return candidate_pubkey == self.pubkey
self._logger.debug("Certdata")
blockUID = self._blockchain_processor.current_buid(connection.currency)
if not identity.signature:
lookup_data = await self._bma_connector.get(connection.currency, bma.wot.lookup,
req_args={'search': identity.pubkey})
for uid_data in next(data["uids"] for data in lookup_data["results"] if data["pubkey"] == identity.pubkey):
if uid_data["uid"] == identity.uid and block_uid(uid_data["meta"]["timestamp"]) == identity.blockstamp:
identity.signature = uid_data["self"]
break
else:
return False, "Could not find certified identity signature"
certification = Certification(10, connection.currency,
connection.pubkey, identity.pubkey, blockUID, None)
key = SigningKey(secret_key, password, connection.scrypt_params)
certification.sign(identity.document(), [key])
signed_cert = certification.signed_raw(identity.document())
self._logger.debug("Certification : {0}".format(signed_cert))
timestamp = self._blockchain_processor.time(connection.currency)
responses = await self._bma_connector.broadcast(connection.currency, bma.wot.certify, req_args={'cert': signed_cert})
result = await parse_bma_responses(responses)
if result[0]:
self._identities_processor.insert_or_update_identity(identity)
self._certifications_processor.create_or_update_certification(connection.currency, certification,
timestamp, None)
return result
Usage:
python decrypt_message.py ENCRYPTED_MESSAGE_FILEPATH
"""
)
# The path of the encrypted message file is the first command line argument
encrypted_message_path = sys.argv[1]

# Read both credentials without echoing them to the terminal
salt = getpass.getpass("Enter your passphrase (salt): ")
password = getpass.getpass("Enter your password: ")

# Build the key object from the credentials
signing_key_instance = SigningKey.from_credentials(salt, password)

# Load the raw bytes of the encrypted message
with open(encrypted_message_path, "rb") as file_handler:
    encrypted_message = file_handler.read()

# Decrypt, then decode as UTF-8; on ValueError the error text is shown instead
try:
    decrypted_bytes = signing_key_instance.decrypt_seal(encrypted_message)
    message = decrypted_bytes.decode("utf-8")
    print("Decrypted message:")
except ValueError as decrypt_error:
    message = str(decrypt_error)

print(message)
async def revoke(self, currency, identity, salt, password):
    """
    Sign a revocation of the given identity and broadcast it.

    The original docstring describes this as revoking the self-identity on
    the server, not in the blockchain.

    :param str currency: The currency of the identity
    :param sakia.data.entities.IdentityDoc identity: The certified identity
    :param str salt: The account SigningKey salt
    :param str password: The account SigningKey password
    :return: The result of parse_bma_responses for the broadcast responses
    """
    revocation_doc = Revocation(10, currency, None)
    identity_document = identity.document()

    # Sign the revocation against the identity document
    signing_key = SigningKey(salt, password)
    revocation_doc.sign(identity_document, [signing_key])

    self._logger.debug(
        "Self-Revokation Document : \n{0}".format(revocation_doc.raw(identity_document))
    )
    signature = revocation_doc.signatures[0]
    self._logger.debug("Signature : \n{0}".format(signature))

    # Payload posted to the revoke endpoint
    payload = {
        'pubkey': identity.pubkey,
        'self_': identity_document.signed_raw(),
        'sig': signature,
    }
    self._logger.debug("Posted data : {0}".format(payload))

    replies = await self._bma_connector.broadcast(currency, bma.wot.Revoke, {}, payload)
    return await parse_bma_responses(replies)