Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"https://stuvel.eu/python-rsa-doc/usage.html#working-with-big-files "
"for more information.",
DeprecationWarning, stacklevel=2)
if not isinstance(pub_key, key.PublicKey):
raise TypeError('Public key required, but got %r' % pub_key)
key_bytes = common.bit_size(pub_key.n) // 8
blocksize = key_bytes - 11 # keep space for PKCS#1 padding
# Write the version number to the VARBLOCK file
outfile.write(byte(varblock.VARBLOCK_VERSION))
# Encrypt and write each block
for block in varblock.yield_fixedblocks(infile, blocksize):
crypto = pkcs1.encrypt(block, pub_key)
varblock.write_varint(outfile, len(crypto))
outfile.write(crypto)
def test_find_signature_hash(self):
    """Sign a message, then check find_signature_hash names the hash used."""
    hash_name = 'SHA-256'
    payload = b'je moeder'

    # Sign with the private key, then recover the hash name with the public key.
    signed = pkcs1.sign(payload, self.priv, hash_name)
    detected = pkcs1.find_signature_hash(signed, self.pub)

    self.assertEqual(hash_name, detected)
def sign(self, message):
    """Sign a message with this object's key.

    Args:
        message: bytes, Message to be signed.

    Returns:
        string, The signature of the message for the given key.
    """
    # Pass the message through _to_bytes (UTF-8) before handing it to rsa.
    encoded = _to_bytes(message, encoding='utf-8')
    signature = rsa.pkcs1.sign(encoded, self._key, 'SHA-256')
    return signature
def _check_challenge(signature):
    """Check a signed challenge response and revoke the challenge on success.

    ``signature.data`` is base64-decoded and verified with ``rsa.verify``
    against the UTF-8 encoding of ``challenge`` using ``pub_key``; both names
    come from outside this function.

    Raises:
        Exception: 'Access Denied.' if verification fails, or if a TypeError
            or AttributeError is raised while verifying or revoking.
    """
    raw_signature = base64.b64decode(signature.data)
    try:
        # Encoding happens inside the try so an AttributeError from an
        # unusable ``challenge`` is reported as "already unset".
        expected = challenge.encode('utf-8')
        rsa.verify(expected, raw_signature, pub_key)
        print('[*] Challenge successfully verified.')
        _revoke_challenge()
    except rsa.pkcs1.VerificationError:
        print('[!] Received wrong signature for challenge.')
        raise Exception('Access Denied.')
    except (TypeError, AttributeError):
        print('[!] Challenge already unset.')
        raise Exception('Access Denied.')
def verify(self, msg, sig):
    """Return True if ``sig`` verifies for ``msg`` with this key, else False.

    False is returned when ``is_public()`` is false or when verification
    raises ``VerificationError``.
    """
    if self.is_public():
        try:
            pyrsa.verify(msg, sig, self._prepared_key)
        except pyrsa.pkcs1.VerificationError:
            return False
        else:
            return True
    # Verification is only attempted when is_public() is true.
    return False
asymmetric_decrypt_verify() function.
"""
plaintext, signature = None, None
try:
blocks = []
for i in range(0, len(ciphertext), DECRYPTION_BLOCK_SIZE):
blocks.append(decrypt(ciphertext[i:i + DECRYPTION_BLOCK_SIZE],
decrypt_key))
plaintext = b''.join(blocks)
signature, plaintext = plaintext[:DECRYPTION_BLOCK_SIZE], \
plaintext[DECRYPTION_BLOCK_SIZE:]
except TypeError:
log("Warning: TypeError has occurred during a decrypt/verify. \n\t"
"CIPHERTEXT({0}): {1}".format(len(ciphertext),
ciphertext))
except pkcs1.DecryptionError:
log("Warning: RSA Decryption Failed. \n\tCIPHERTEXT({0}): {1}"
.format(len(ciphertext), ciphertext))
if plaintext and signature:
return plaintext, signature
else:
return None, None
def verify(data, publickey, sign):
    """Verify ``sign`` over ``data`` with a DER-encoded PKCS#1 public key.

    Returns the result of ``rsa.pkcs1.verify`` on success, or False when it
    raises ``VerificationError``.

    NOTE(review): the success value comes straight from python-rsa and may be
    ``True`` or the hash name depending on the library version - confirm
    before comparing it with ``is True``.
    """
    import rsa
    from rsa import pkcs1

    public_key = rsa.PublicKey.load_pkcs1(publickey, format="DER")
    try:
        return pkcs1.verify(data, sign, public_key)
    except pkcs1.VerificationError:
        return False
now = int(time.time())
payload = {
'aud': self._token_uri,
'scope': self._scopes,
'iat': now,
'exp': now + _ServiceAccountCredentials.MAX_TOKEN_LIFETIME_SECS,
'iss': self._service_account_email
}
payload.update(self._kwargs)
assertion_input = (_urlsafe_b64encode(header) + b'.' +
_urlsafe_b64encode(payload))
# Sign the assertion.
rsa_bytes = rsa.pkcs1.sign(assertion_input, self._private_key, 'SHA-256')
signature = base64.urlsafe_b64encode(rsa_bytes).rstrip(b'=')
return assertion_input + b'.' + signature
for signing_key, x509 in list(json.loads(resp.content).items()):
der = rsa.pem.load_pem(x509, 'CERTIFICATE')
asn1_cert, _ = decoder.decode(der, asn1Spec=Certificate())
key_bitstring = (
asn1_cert['tbsCertificate']
['subjectPublicKeyInfo']
['subjectPublicKey'])
key_bytearray = BitStringToByteString(key_bitstring)
pub = rsa.PublicKey.load_pkcs1(key_bytearray, 'DER')
try:
if rsa.pkcs1.verify(msg, sig, pub):
self.__x509 = x509
self.__signing_key = signing_key
return
except rsa.pkcs1.VerificationError:
pass
raise apiproxy_errors.ApplicationError(
app_identity_service_pb.AppIdentityServiceError.UNKNOWN_ERROR,
'Unable to find matching X509 cert for private key: %s' % url)
# python-rsa lib hashes all messages it signs. ADB does it already, we just
# need to slap a signature on top of already hashed message. Introduce "fake"
# hashing algo for this.
class _Accum(object):
def __init__(self):
self._buf = b''
def update(self, msg):
self._buf += msg
def digest(self):
return self._buf
# Register _Accum in pkcs1's hash tables under the name 'SHA-1-PREHASHED',
# reusing the ASN.1 entry already registered for 'SHA-1'.
pkcs1.HASH_METHODS['SHA-1-PREHASHED'] = _Accum
pkcs1.HASH_ASN1['SHA-1-PREHASHED'] = pkcs1.HASH_ASN1['SHA-1']
def _load_rsa_private_key(pem):
"""PEM encoded PKCS#8 private key -> rsa.PrivateKey."""
# ADB uses private RSA keys in pkcs#8 format. 'rsa' library doesn't support
# them natively. Do some ASN unwrapping to extract naked RSA key
# (in der-encoded form). See https://www.ietf.org/rfc/rfc2313.txt.
# Also http://superuser.com/a/606266.
try:
der = rsa.pem.load_pem(pem, 'PRIVATE KEY')
keyinfo, _ = decoder.decode(der)
if keyinfo[1][0] != univ.ObjectIdentifier(
'1.2.840.113549.1.1.1'): # pragma: no cover
raise ValueError('Not a DER-encoded OpenSSL private RSA key')
private_key_der = keyinfo[2].asOctets()
except IndexError: # pragma: no cover