Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
m = bytearray(self.ct.decrypt(decrypter, *decargs))
"""
The value "m" in the above formulas is derived from the session key
as follows. First, the session key is prefixed with a one-octet
algorithm identifier that specifies the symmetric encryption
algorithm used to encrypt the following Symmetrically Encrypted Data
Packet. Then a two-octet checksum is appended, which is equal to the
sum of the preceding session key octets, not including the algorithm
identifier, modulo 65536. This value is then encoded as described in
PKCS#1 block encoding EME-PKCS1-v1_5 in Section 7.2.1 of [RFC3447] to
form the "m" value used in the formulas above. See Section 13.1 of
this document for notes on OpenPGP's use of PKCS#1.
"""
symalg = SymmetricKeyAlgorithm(m[0])
del m[0]
symkey = m[:symalg.key_size // 8]
del m[:symalg.key_size // 8]
checksum = self.bytes_to_int(m[:2])
del m[:2]
if not sum(symkey) % 65536 == checksum: # pragma: no cover
raise PGPDecryptionError("{:s} decryption failed".format(self.pkalg.name))
return (symalg, symkey)
def gen_key(username):
    """Generate a new OpenPGP key pair with one user ID for ``username``.

    A 4096-bit ``RSAEncryptOrSign`` key is created, and a user ID whose
    e-mail address is ``<username>@<_provider>`` is attached to it with
    fixed usage, hash, cipher, and compression preferences.

    :param username: name used both as the UID name and as the local part
        of the e-mail address.
    :returns: the newly created key with the user ID added.
    """
    print("generating OpenPGP key pair")
    keypair = pgpy.PGPKey.new(PubKeyAlgorithm.RSAEncryptOrSign, 4096)

    address = '%s@%s' % (username, _provider)
    identity = pgpy.PGPUID.new(username, email=address)

    # Preferences recorded alongside the user ID when it is added to the key.
    preferences = dict(
        usage={KeyFlags.EncryptCommunications},
        hashes=[HashAlgorithm.SHA512],
        ciphers=[SymmetricKeyAlgorithm.AES256],
        compression=[CompressionAlgorithm.Uncompressed],
    )
    keypair.add_uid(identity, **preferences)
    return keypair
def test_encrypt_message(self, pub, cipher):
    """Encrypt a fixed message to ``pub`` using ``cipher`` and keep the result.

    The test is skipped for DSA keys and marked as an expected failure for
    the Plaintext, Twofish256, and IDEA ciphers. Otherwise the encrypted
    message is stored in ``self.msgs`` under ``(pub.fingerprint, cipher)``.
    """
    skipped_key_algorithms = {PubKeyAlgorithm.DSA}
    if pub.key_algorithm in skipped_key_algorithms:
        pytest.skip('Asymmetric encryption only implemented for RSA/ECDH currently')

    unsupported_ciphers = {
        SymmetricKeyAlgorithm.Plaintext,
        SymmetricKeyAlgorithm.Twofish256,
        SymmetricKeyAlgorithm.IDEA,
    }
    if cipher in unsupported_ciphers:
        pytest.xfail('Symmetric cipher {} not supported for encryption'.format(cipher))

    # Build the plaintext message that will be encrypted to the public key.
    plaintext = PGPMessage.new("This message will have been encrypted")

    # Warnings raised while encrypting are ignored for this test.
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        encrypted = pub.encrypt(plaintext, cipher=cipher)

    self.msgs[(pub.fingerprint, cipher)] = encrypted
def test_protect_protected_key(self, rsa_enc, recwarn):
    """Check that calling ``protect`` on ``rsa_enc`` emits the expected warning.

    The recorded ``UserWarning`` must carry the "already protected" message
    and must be attributed to this test file.
    """
    expected_message = (
        "This key is already protected with a passphrase - "
        "please unlock it before attempting to specify a new passphrase"
    )

    rsa_enc.protect('QwertyUiop', SymmetricKeyAlgorithm.CAST5, HashAlgorithm.SHA1)

    recorded = recwarn.pop(UserWarning)
    assert str(recorded.message) == expected_message
    assert recorded.filename == __file__
def test_encrypt_passphrase_2(self):
    """Encrypt one message under two passphrases that share a session key.

    The message is encrypted twice with the same explicitly generated
    AES256 session key, then decrypted once per passphrase; each
    decryption must yield the original text.
    """
    passphrases = ("QwertyUiop", "AsdfGhjkl")
    plaintext = "This message is to be encrypted"

    original = PGPMessage.new(plaintext)
    assert not original.is_encrypted

    # Both encryption passes reuse one session key.
    session_key = SymmetricKeyAlgorithm.AES256.gen_key()
    first_phrase, second_phrase = passphrases
    encrypted_once = original.encrypt(first_phrase, sessionkey=session_key)
    encrypted = encrypted_once.encrypt(second_phrase, sessionkey=session_key)

    assert isinstance(encrypted, PGPMessage)
    assert encrypted.is_encrypted
    assert encrypted.type == 'encrypted'

    # decrypt with PGPy only, since GnuPG can't do multiple passphrases
    for phrase in passphrases:
        decrypted = encrypted.decrypt(phrase)

        assert isinstance(decrypted, PGPMessage)
        assert decrypted.type == original.type
        assert decrypted.is_compressed
        assert decrypted.message == plaintext
def test_add_altuid(self, pkspec):
    """Add a non-primary user ID to a generated key and verify its self-signature.

    Skips when ``pkspec`` has no entry in ``self.keys``. Otherwise a second
    UID is added with explicit preferences, and the resulting
    self-signature is checked against the values that were passed in.
    """
    if pkspec not in self.keys:
        pytest.skip('Keyspec {} not in keys; must not have generated'.format(pkspec))

    key = self.keys[pkspec]
    uid = PGPUID.new('T. Keyerson', 'Secondary UID', 'testkey@localhost.local')
    expiration = datetime.utcnow() + timedelta(days=2)

    # Expected preference values; fresh lists are built from these for both
    # the add_uid call and the assertions below.
    cipher_prefs = (SymmetricKeyAlgorithm.AES256, SymmetricKeyAlgorithm.Camellia256)
    hash_prefs = (HashAlgorithm.SHA384,)
    compression_prefs = (CompressionAlgorithm.ZLIB,)
    keyserver_url = 'about:none'

    # add all of the sbpackets that only work on self-certifications
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        selfsig_options = dict(
            usage=[KeyFlags.Certify, KeyFlags.Sign],
            ciphers=list(cipher_prefs),
            hashes=list(hash_prefs),
            compression=list(compression_prefs),
            key_expiration=expiration,
            keyserver_flags={KeyServerPreferences.NoModify},
            keyserver=keyserver_url,
            primary=False,
        )
        key.add_uid(uid, **selfsig_options)

    selfsig = uid.selfsig

    assert selfsig.type == SignatureType.Positive_Cert
    assert selfsig.cipherprefs == list(cipher_prefs)
    assert selfsig.hashprefs == list(hash_prefs)
    assert selfsig.compprefs == list(compression_prefs)
    assert selfsig.features == {Features.ModificationDetection}
    # The signature's expiration is compared as an offset from key creation.
    assert selfsig.key_expiration == expiration - key.created
    assert selfsig.keyserver == keyserver_url
@encalg.register(SymmetricKeyAlgorithm)
def encalg_int(self, val):
    """Store ``val`` as the encryption algorithm, coerced to ``SymmetricKeyAlgorithm``.

    Registered on ``encalg`` for ``SymmetricKeyAlgorithm`` values.

    :param val: value passed through the ``SymmetricKeyAlgorithm`` constructor.
    """
    algorithm = SymmetricKeyAlgorithm(val)
    self._encalg = algorithm
:type passphrase: ``str``, ``unicode``, ``bytes``
:optional param sessionkey: Provide a session key to use when encrypting something. Default is ``None``.
If ``None``, a session key of the appropriate length will be generated randomly.
.. warning::
Care should be taken when making use of this option! Session keys *absolutely need*
to be unpredictable! Use the ``gen_key()`` method on the desired
:py:obj:`~constants.SymmetricKeyAlgorithm` to generate the session key!
:type sessionkey: ``bytes``, ``str``
:raises: :py:exc:`~errors.PGPEncryptionError`
:returns: A new :py:obj:`PGPMessage` containing the encrypted contents of this message.
"""
cipher_algo = prefs.pop('cipher', SymmetricKeyAlgorithm.AES256)
hash_algo = prefs.pop('hash', HashAlgorithm.SHA256)
# set up a new SKESessionKeyV4
skesk = SKESessionKeyV4()
skesk.s2k.usage = 255
skesk.s2k.specifier = 3
skesk.s2k.halg = hash_algo
skesk.s2k.encalg = cipher_algo
skesk.s2k.count = skesk.s2k.halg.tuned_count
if sessionkey is None:
sessionkey = cipher_algo.gen_key()
skesk.encrypt_sk(passphrase, sessionkey)
del passphrase
msg = PGPMessage() | skesk
def key_size(self):
    """Return the key size, in bits, associated with this symmetric algorithm.

    :raises NotImplementedError: if this algorithm has no entry in the
        size table; the message is ``repr(self)``.
    :returns: 128, 192, or 256.
    """
    alg = SymmetricKeyAlgorithm

    # Algorithms grouped by the key size they share.
    grouped = (
        (128, (alg.IDEA, alg.CAST5, alg.Blowfish, alg.AES128, alg.Camellia128)),
        (192, (alg.TripleDES, alg.AES192, alg.Camellia192)),
        (256, (alg.AES256, alg.Twofish256, alg.Camellia256)),
    )
    bits_by_algorithm = {
        member: bits
        for bits, members in grouped
        for member in members
    }

    bits = bits_by_algorithm.get(self)
    if bits is None:
        raise NotImplementedError(repr(self))
    return bits
def decrypt_sk(self, passphrase):
    """Recover the symmetric algorithm and session key using ``passphrase``.

    A key is derived from the passphrase via ``self.s2k``. With no stored
    ciphertext, that derived key is itself the session key and is returned
    with ``self.symalg``. Otherwise the ciphertext is decrypted with the
    derived key; its first octet identifies the algorithm and the remaining
    octets are the session key.

    :param passphrase: passphrase handed to ``self.s2k.derive_key``.
    :returns: a ``(algorithm, session_key)`` tuple.
    """
    derived_key = self.s2k.derive_key(passphrase)
    # Drop the local reference to the passphrase as soon as it has been used.
    del passphrase

    if len(self.ct) != 0:
        # An encrypted session key is present: decrypt it with the derived key.
        plaintext = bytearray(_decrypt(bytes(self.ct), derived_key, self.symalg))
        del derived_key

        # Leading octet is the algorithm identifier; the rest is the key.
        algorithm = SymmetricKeyAlgorithm(plaintext[0])
        return algorithm, bytes(plaintext[1:])

    # No ciphertext: the derived key is the session key being used.
    return self.symalg, derived_key