Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_check_signature_should_fail(self):
    """check_signature must raise InvalidSignatureException for these inputs."""
    from wechatpy.exceptions import InvalidSignatureException

    # Positional order expected by check_signature.
    callback_args = (
        'test',  # token
        'f21891de399b4e33a1a93c9a7b8a8fffb5a443fe',  # signature
        '1410685589',  # timestamp
        'test',  # nonce
    )
    with self.assertRaises(InvalidSignatureException):
        check_signature(*callback_args)
def test_check_signature_should_fail(self):
    """crypto.check_signature must raise InvalidSignatureException here."""
    from wechatpy.exceptions import InvalidSignatureException

    crypto = WeChatCrypto(self.token, self.encoding_aes_key, self.corp_id)
    with self.assertRaises(InvalidSignatureException):
        crypto.check_signature(
            'dd6b9c95b495b3f7e2901bfbc76c664930ffdb96',  # signature
            '1411443780',  # timestamp
            '437374424',  # nonce
            # echo_str
            '4ByGGj+sVCYcvGeQYhaKIk1o0pQRNbRjxybjTGblXrBaXlTXeOo1+bXFXDQQb1o6co6Yh9Bv41n7hOchLF6p+Q==',  # NOQA
        )
def test_check_wxa_signature(self):
    """check_wxa_signature accepts two matching signatures and rejects a fake one."""
    from wechatpy.exceptions import InvalidSignatureException

    # Official WeChat example
    official_payload = '{"nickName":"Band","gender":1,"language":"zh_CN","city":"Guangzhou","province":"Guangdong","country":"CN","avatarUrl":"http://wx.qlogo.cn/mmopen/vi_32/1vZvI39NWFQ9XM4LtQpFrQJ1xlgZxx3w7bQxKARol6503Iuswjjn6nIGBiaycAjAtpujxyzYsrztuuICqIM5ibXQ/0"}'  # noqa
    official_key = 'HyVFkGl5F5OQWJZZaNzBBg=='
    check_wxa_signature(
        official_key,
        official_payload,
        '75e81ceda165f4ffa64f4068af58c64b8f54b88c',
    )

    # The same payload and key with a bogus signature must be rejected.
    with self.assertRaises(InvalidSignatureException):
        check_wxa_signature(official_key, official_payload, "fake_sign")

    # Example whose payload contains Chinese characters
    chinese_payload = '{"nickName":"Xavier-Lam林","gender":1,"language":"zh_CN","city":"Ningde","province":"Fujian","country":"China","avatarUrl":"https://wx.qlogo.cn/mmopen/vi_32/vTxUxcbjcZ8t9eU6YfXBwRU89KS9uRILEDro01MTYp7UKYsyTjLFMIVhB0AlBuEvLHbhmO3OpaHw5zwlSetuLg/132"}'  # noqa
    chinese_key = 'GtYYez5b/M5HhT4L7n31gQ=='
    check_wxa_signature(
        chinese_key,
        chinese_payload,
        '8fde625b7640734a13c071c05d792b5cef21cf89',
    )
def on_get(self, req, resp):
    """Answer a WeChat callback-verification GET request.

    Reads ``signature``, ``timestamp``, ``nonce`` and ``echostr`` from the
    query string and echoes ``echostr`` back only when ``check_signature``
    accepts the request. The response status is always 200; when the check
    fails or a parameter is missing, the body is left unset.

    :param req: incoming Falcon request
    :param resp: Falcon response to populate
    """
    # Let Falcon parse and percent-decode the query string. Splitting it by
    # hand raised IndexError on an empty query string or on a parameter
    # without '=', and truncated values that themselves contain '='.
    signature = req.get_param('signature')
    timestamp = req.get_param('timestamp')
    nonce = req.get_param('nonce')
    echostr = req.get_param('echostr')

    # A request lacking any of the parameters can never verify; treat it
    # like a bad signature instead of failing with KeyError.
    if None not in (signature, timestamp, nonce, echostr):
        try:
            # NOTE(review): the callback token is hard-coded here; consider
            # moving it to configuration.
            check_signature(
                token='lengxiao',
                signature=signature,
                timestamp=timestamp,
                nonce=nonce,
            )
        except InvalidSignatureException:
            # Deliberate: an unverified caller gets an empty 200 response
            # and never sees echostr.
            pass
        else:
            resp.body = echostr
    resp.status = falcon.HTTP_200
def check_signature(token, signature, timestamp, nonce):
    """Check WeChat callback signature, raises InvalidSignatureException
    if check failed.

    :param token: WeChat callback token
    :param signature: WeChat callback signature sent by WeChat server
    :param timestamp: WeChat callback timestamp sent by WeChat server
    :param nonce: WeChat callback nonce sent by WeChat server
    :raises InvalidSignatureException: if ``signature`` does not equal the
        signature computed from ``token``, ``timestamp`` and ``nonce``
    """
    import hmac

    signer = WeChatSigner()
    signer.add_data(token, timestamp, nonce)
    expected = signer.signature
    try:
        # Constant-time comparison: ``signature`` comes from the request, so
        # a short-circuiting ``!=`` would leak how many leading characters
        # of the expected value the caller got right.
        matches = hmac.compare_digest(expected, signature)
    except TypeError:
        # compare_digest rejects non-ASCII text and mixed or non-string
        # types; fall back to plain equality so such inputs are judged
        # exactly as they were before.
        matches = expected == signature
    if not matches:
        from wechatpy.exceptions import InvalidSignatureException
        raise InvalidSignatureException()
def __init__(self, errcode=-40001, errmsg='Invalid signature'):
    """Initialise the exception, defaulting to code -40001 / 'Invalid signature'.

    Both arguments are forwarded unchanged to the parent initialiser.

    :param errcode: error code passed to the parent class
    :param errmsg: error message passed to the parent class
    """
    parent_init = super(InvalidSignatureException, self).__init__
    parent_init(errcode, errmsg)
# POST
# NOTE: fragment of a request handler. Its enclosing `def` is outside this
# excerpt and the original indentation is not preserved here.
# Builds `msg` from the raw POST body, decrypting it first unless the
# request declares encrypt_type 'raw'.
if encrypt_type == 'raw':
# plaintext mode
msg = parse_message(request.httprequest.data)
else:
# encryption mode
msg = None
try:
msg = self.crypto.decrypt_message(
request.httprequest.data,
msg_signature,
timestamp,
nonce
)
except (InvalidSignatureException, InvalidAppIdException):
# Decryption rejected the signature or the app id: answer via abort(403).
return abort(403)
# The result of decrypt_message is then parsed with parse_message.
msg = parse_message(msg)
_logger.info('>>> %s %s'%(msg.type, msg))
ret = ''
# Only text, image and voice messages are handed to app_kf_handler, which
# is imported at this point rather than at module level.
if msg.type in ['text', 'image', 'voice']:
from .handlers.app_handler import app_kf_handler
ret = app_kf_handler(request, msg)
# `ret` is not read again in this excerpt; the reply is always the literal
# 'success'.
return 'success'
"""Verify that the signature of the rawData sent by the front end is correct.
For details see
https://developers.weixin.qq.com/miniprogram/dev/framework/open-ability/signature.html # noqa
:param session_key: session_key obtained in exchange for the code
:param raw_data: rawData received by the front end
:param client_signature: signature received by the front end
:raises: InvalidSignatureException
:return: data dict (NOTE(review): no return statement is visible in this excerpt - confirm)
"""
# NOTE: the `def` line of this function is outside this excerpt and the
# original indentation is not preserved here.
# The expected signature is the SHA-1 hex digest of raw_data concatenated
# with session_key, encoded as UTF-8.
str2sign = (raw_data + session_key).encode("utf-8")
signature = hashlib.sha1(str2sign).hexdigest()
if signature != client_signature:
# The exception class is imported only on the failure path.
from wechatpy.exceptions import InvalidSignatureException
raise InvalidSignatureException()
# NOTE: fragment of a request handler. Its enclosing `def` is outside this
# excerpt, the original indentation is not preserved, and the code continues
# past the last line shown.
# Read the callback parameters from the request; echostr defaults to ''.
msg_signature = request.params.get("msg_signature")
timestamp = request.params.get("timestamp")
nonce = request.params.get("nonce")
echo_str = request.params.get('echostr', '')
# GET: decrypt echostr (wrapped as {'Encrypt': ...}) and return the result.
if request.httprequest.method == 'GET':
try:
echo_str = self.crypto.decrypt_message(
{'Encrypt': echo_str},
msg_signature,
timestamp,
nonce
)
except InvalidSignatureException:
# NOTE(review): the result of abort(403) is not returned; presumably abort
# raises, otherwise control would reach `return echo_str` - confirm.
abort(403)
return echo_str
# POST
# Decrypt the raw request body, then parse it with parse_message.
msg = None
try:
msg = self.crypto.decrypt_message(
request.httprequest.data,
msg_signature,
timestamp,
nonce
)
except (InvalidSignatureException, InvalidCorpIdException):
# NOTE(review): as above, abort(403) is presumably expected to raise; if it
# returned, parse_message would be called with msg still None - confirm.
abort(403)
msg = parse_message(msg)
# `ss` is built here; any use of it is not visible in this excerpt.
ss = '>>> handle msg: %s %s %s'%(msg.type, msg.id, msg)
def get(self):
    """Handle the verification GET: echo ``echostr`` if the signature checks out.

    Each of ``echostr``, ``signature``, ``timestamp`` and ``nonce`` is read
    with ``get_argument`` and defaults to ''. A failed check is logged as a
    warning and nothing is written; a successful check is logged at info
    level and ``echostr`` is written to the response.
    """
    echostr, signature, timestamp, nonce = (
        self.get_argument(name, '')
        for name in ('echostr', 'signature', 'timestamp', 'nonce')
    )
    try:
        check_signature(options.token, signature, timestamp, nonce)
    except InvalidSignatureException:
        logging.warning("Signature check failed.")
        return
    logging.info("Signature check success.")
    self.write(echostr)