Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_private_to_public(self):
with cli_args('-i', self.priv_fname, '-o', 'test_private_to_public.pem'):
with captured_output():
rsa.util.private_to_public()
# Check that the key is indeed valid.
with open('test_private_to_public.pem', 'rb') as pemfile:
key = rsa.PublicKey.load_pkcs1(pemfile.read())
self.assertEqual(self.priv_key.n, key.n)
self.assertEqual(self.priv_key.e, key.e)
def test_sign_blob(self):
private_key_id, signature = self.credentials.sign_blob('Google')
self.assertEqual(self.private_key_id, private_key_id)
pub_key = rsa.PublicKey.load_pkcs1_openssl_pem(
datafile('publickey_openssl.pem'))
self.assertTrue(rsa.pkcs1.verify(b'Google', signature, pub_key))
with self.assertRaises(rsa.pkcs1.VerificationError):
rsa.pkcs1.verify(b'Orest', signature, pub_key)
with self.assertRaises(rsa.pkcs1.VerificationError):
rsa.pkcs1.verify(b'Google', b'bad signature', pub_key)
def login(self):
"""Login to LINE server."""
if self.provider == CurveThrift.Provider.LINE: # LINE
j = self._get_json(self.LINE_SESSION_LINE_URL)
else: # NAVER
j = self._get_json(self.LINE_SESSION_NAVER_URL)
session_key = j['session_key']
message = (chr(len(session_key)) + session_key +
chr(len(self.id)) + self.id +
chr(len(self.password)) + self.password).encode('utf-8')
keyname, n, e = j['rsa_key'].split(",")
pub_key = rsa.PublicKey(int(n,16), int(e,16))
crypto = rsa.encrypt(message, pub_key).encode('hex')
self.transport = THttpClient.THttpClient(self.LINE_HTTP_URL)
self.transport.setCustomHeaders(self._headers)
self.protocol = TCompactProtocol.TCompactProtocol(self.transport)
self._client = CurveThrift.Client(self.protocol)
try:
with open(self.CERT_FILE,'r') as f:
self.certificate = f.read()
f.close()
except:
self.certificate = ""
msg = self._client.loginWithIdentityCredentialForCertificate(
async def _get_rsa(self):
async with self.session.post(SteamUrls.Store.value + '/login/getrsakey/',
data={'username': self.username}) as resp:
resp = await resp.json()
try:
mod = int(resp['publickey_mod'], 16)
exp = int(resp['publickey_exp'], 16)
timestamp = resp['timestamp']
except KeyError:
if self._repeats >= 10:
raise ValueError("Unable to obtain rsa keys")
else:
self._repeats += 1
return await self._get_rsa()
else:
return {'rsa_key': rsa.PublicKey(mod, exp), 'rsa_timestamp': timestamp}
def privatekeyToPublickey(privatekey):
import rsa
from rsa import pkcs1
if "BEGIN RSA PRIVATE KEY" not in privatekey:
privatekey = "-----BEGIN RSA PRIVATE KEY-----\n%s\n-----END RSA PRIVATE KEY-----" % privatekey
priv = rsa.PrivateKey.load_pkcs1(privatekey)
pub = rsa.PublicKey(priv.n, priv.e)
return pub.save_pkcs1("DER")
def get_pubkey_tuple(self):
pub_key = rsa.PublicKey(self.key.n, self.key.e)
logger.debug('pub key: %s' % (pub_key))
logger.debug('pub key: e=%s, n=%s' % (pub_key.e, pub_key.n))
return pub_key.e, pub_key.n
def get_password(self, password, servertime, nonce, pubkey):
rsaPublickey = int(pubkey, 16)
key = rsa.PublicKey(rsaPublickey, 65537) # 创建公钥
message = str(servertime) + '\t' + str(nonce) + '\n' + str(password) # 拼接明文js加密文件中得到
message = message.encode("utf-8")
passwd = rsa.encrypt(message, key) # 加密
passwd = binascii.b2a_hex(passwd) # 将加密信息转换为16进制。
return passwd
def __crypt(self, mail, passwd, RSA):
message = (chr(len(RSA.sessionKey)) + RSA.sessionKey +
chr(len(mail)) + mail +
chr(len(passwd)) + passwd).encode('utf-8')
pub_key = rsa.PublicKey(int(RSA.nvalue, 16), int(RSA.evalue, 16))
crypto = rsa.encrypt(message, pub_key).encode('hex')
return crypto