Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def perform_operation(self, indata, pub_key, cli_args):
    """Check ``indata`` against a detached signature stored in a file.

    The signature file path is taken from ``cli_args[1]`` and read in
    binary mode. On success ``'Verification OK'`` is written to standard
    error and the method returns ``None``.

    :param indata: the data whose signature is being checked
    :param pub_key: public key handed to ``rsa.verify``
    :param cli_args: positional arguments; index 1 names the signature file
    :raises SystemExit: with the message ``'Verification failed.'`` when
        ``rsa.verify`` raises ``rsa.VerificationError``
    """
    with open(cli_args[1], 'rb') as handle:
        detached_sig = handle.read()

    try:
        rsa.verify(indata, detached_sig, pub_key)
    except rsa.VerificationError:
        # Turn the library error into a process exit with a readable message.
        raise SystemExit('Verification failed.')
    else:
        print('Verification OK', file=sys.stderr)
continue
if not 'ae_hello' in header: continue
hello_message_dict[seq] = msg
if 'EOL' in ending:
hello_message_len = seq +1
if range(hello_message_len) == hello_message_dict.keys():
try:
for i in range(hello_message_len):
full_hello += hello_message_dict[i]
modulus = full_hello[:10] #this is the first 10 bytes of modulus of auditor's pubkey
sig = str(full_hello[10:]) #this is a sig for 'ae_hello||auditee nick'. The auditor is expected to have received auditee's pubkey via other channels
if modulus != my_modulus : continue
rsa.verify('ae_hello'+nick, sig, auditee_public_key)
#we get here if there was no exception
auditee_nick = nick
except:
print ('Verification of a hello message failed')
continue
if not b_is_auditee_registered:
return ('failure',)
signed_hello = rsa.sign('ao_hello'+my_nick, my_private_key, 'SHA-1')
#send twice because it was observed that the msg would not appear on the chan
for x in range(2):
shared.tlsn_send_single_msg('ao_hello',signed_hello,auditee_public_key,ctrprty_nick = auditee_nick)
time.sleep(2)
progress_queue.put(time.strftime('%H:%M:%S', time.localtime()) + \
': Auditee has been authorized. Awaiting data...')
def valid_token():
    """Fetch a signed token, check its signature, and report the outcome.

    Steps performed:

    1. GET ``http://zlclclc.cn/get_token`` and parse the JSON body once.
    2. Decode the ``signature`` field and verify it over
       ``token_message[2:-6]`` plus a newline, using the RSA public key
       read from ``pubkey.pem`` in the current directory.
    3. POST ``{"sign_valid": 0|1, "token": ...}`` as JSON to
       ``http://zlclclc.cn/exist_token``.

    :return: the extracted token text (characters 2..109 of the token
        message, a newline, then characters 112..114). It is returned
        whether or not the signature verified; the verification result
        is only reported to the server through ``sign_valid``.
    :raises ValueError, SyntaxError: if the ``signature`` field is not a
        plain Python literal.
    """
    # Function-scope import so the block carries its own new dependency.
    import ast

    payload = requests.get("http://zlclclc.cn/get_token").json()

    # SECURITY: this field used to be passed to eval(), which would execute
    # arbitrary code sent by the server (or by anyone tampering with the
    # plain-HTTP response). literal_eval accepts only literals.
    # Assumes the field holds a bytes literal such as "b'...'" -- TODO confirm.
    encoded_signature = ast.literal_eval(payload["signature"])
    # base64.decodestring was removed in Python 3.9; decodebytes is the
    # direct replacement.
    signature = base64.decodebytes(encoded_signature)

    crypto = payload["token_message"]
    message = crypto[2:-6] + '\n'

    with open('pubkey.pem', 'r') as f:
        pubkey = rsa.PublicKey.load_pkcs1(f.read().encode())

    # Only the guarded call remains: the previous extra unguarded
    # rsa.verify() raised on a bad signature before this point, so the
    # sign_valid = 0 branch could never run.
    try:
        rsa.verify(message.encode(), signature, pubkey)
    except rsa.VerificationError:
        sign_valid = 0
    else:
        sign_valid = 1

    token_message = crypto[2:110] + '\n' + crypto[112:115]
    parameter = {"sign_valid": sign_valid, "token": token_message}
    requests.post(url="http://zlclclc.cn/exist_token", data=json.dumps(parameter))
    return token_message
def _check_challenge(signature):
    """Validate a signed challenge response and revoke the challenge.

    ``signature.data`` is Base64-decoded and checked with ``rsa.verify``
    against the UTF-8 encoding of the outer-scope ``challenge`` using the
    outer-scope ``pub_key``. On success a confirmation is printed and
    ``_revoke_challenge()`` is called.

    :param signature: object whose ``data`` attribute holds the
        Base64-encoded signature
    :raises Exception: ``'Access Denied.'`` when verification fails, or
        when a ``TypeError``/``AttributeError`` occurs inside the guarded
        section (reported as the challenge being already unset)
    """
    decoded_sig = base64.b64decode(signature.data)
    try:
        # Encoding happens inside the guard on purpose: a challenge that
        # cannot be encoded is reported by the second handler below.
        expected = challenge.encode('utf-8')
        rsa.verify(expected, decoded_sig, pub_key)
        print('[*] Challenge successfully verified.')
        _revoke_challenge()
    except rsa.pkcs1.VerificationError:
        print('[!] Received wrong signature for challenge.')
        raise Exception('Access Denied.')
    except (TypeError, AttributeError):
        print('[!] Challenge already unset.')
        raise Exception('Access Denied.')
def verify(data, sign, pemKeyfile):
    """Report whether ``sign`` is a valid signature over ``data``.

    :param data: text that was signed; it is encoded with ``str.encode()``
        before verification
    :param sign: Base64-encoded signature
    :param pemKeyfile: passed to ``_load_public_key`` to obtain the key
    :return: ``True`` when ``rsa.verify`` accepts the signature, ``False``
        when it raises ``rsa.pkcs1.VerificationError``
    """
    raw_signature = base64.b64decode(sign)
    public_key = _load_public_key(pemKeyfile)
    try:
        rsa.verify(data.encode(), raw_signature, public_key)
    except rsa.pkcs1.VerificationError:
        return False
    return True
def check_sign(message, sign):
    """
    Verify a signature using the configured public key.

    The key is loaded from ``alipay_config.RSA_PUBLIC`` on every call.
    A failed check is not caught here, so it surfaces as whatever
    ``rsa.verify`` raises.

    :param message: signed payload, passed unchanged to ``rsa.verify``
    :param sign: Base64-encoded signature
    :return: the value returned by ``rsa.verify``
    """
    raw_signature = base64.b64decode(sign)
    return rsa.verify(
        message,
        raw_signature,
        rsa.PublicKey.load_pkcs1_openssl_pem(alipay_config.RSA_PUBLIC),
    )
def verify_with_rsa(public_key, message, sign):
    """Verify a Base64-encoded signature over ``message``.

    :param public_key: key text; it is first passed through
        ``fill_public_key_marker`` and the result is loaded as an
        OpenSSL-style PEM public key
    :param message: signed payload, passed unchanged to ``rsa.verify``
    :param sign: Base64-encoded signature
    :return: the value returned by ``rsa.verify``; a failed check is not
        caught here
    """
    pem_text = fill_public_key_marker(public_key)
    raw_signature = base64.b64decode(sign)
    key = rsa.PublicKey.load_pkcs1_openssl_pem(pem_text)
    return rsa.verify(message, raw_signature, key)
def valid_token():
    """Fetch a signed token, check its signature, and report the outcome.

    Steps performed:

    1. GET ``http://zlclclc.cn/get_token`` and parse the JSON body once.
    2. Decode the ``signature`` field and verify it over
       ``token_message[2:-6]`` plus a newline, using the RSA public key
       read from ``pubkey.pem`` in the current directory.
    3. POST ``{"sign_valid": 0|1, "token": ...}`` as JSON to
       ``http://zlclclc.cn/exist_token``.

    :return: the extracted token text (characters 2..109 of the token
        message, a newline, then characters 112..114). It is returned
        whether or not the signature verified; the verification result
        is only reported to the server through ``sign_valid``.
    :raises ValueError, SyntaxError: if the ``signature`` field is not a
        plain Python literal.
    """
    # Function-scope import so the block carries its own new dependency.
    import ast

    payload = requests.get("http://zlclclc.cn/get_token").json()

    # SECURITY: this field used to be passed to eval(), which would execute
    # arbitrary code sent by the server (or by anyone tampering with the
    # plain-HTTP response). literal_eval accepts only literals.
    # Assumes the field holds a bytes literal such as "b'...'" -- TODO confirm.
    encoded_signature = ast.literal_eval(payload["signature"])
    # base64.decodestring was removed in Python 3.9; decodebytes is the
    # direct replacement.
    signature = base64.decodebytes(encoded_signature)

    crypto = payload["token_message"]
    message = crypto[2:-6] + '\n'

    with open('pubkey.pem', 'r') as f:
        pubkey = rsa.PublicKey.load_pkcs1(f.read().encode())

    # Only the guarded call remains: the previous extra unguarded
    # rsa.verify() raised on a bad signature before this point, so the
    # sign_valid = 0 branch could never run.
    try:
        rsa.verify(message.encode(), signature, pubkey)
    except rsa.VerificationError:
        sign_valid = 0
    else:
        sign_valid = 1

    token_message = crypto[2:110] + '\n' + crypto[112:115]
    parameter = {"sign_valid": sign_valid, "token": token_message}
    requests.post(url="http://zlclclc.cn/exist_token", data=json.dumps(parameter))
    return token_message