Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_prpcrypto():
key = "ReUrr0NKeHkppBQq"
assert len(key) == 16
crypto = PrpCrypto(key)
text = generate_token(32)
app_id = generate_token(32)
assert crypto.decrypt(crypto.encrypt(text, app_id),
app_id) == to_binary(text)
def test_to_binary():
assert to_binary(6) == six.binary_type(6)
assert to_binary(b"aa") == b"aa"
assert to_binary("cc") == b"cc"
if six.PY2:
assert to_binary(u"喵") == "喵"
assert to_binary("喵") == "喵"
def test_to_binary():
assert to_binary(6) == six.binary_type(6)
assert to_binary(b"aa") == b"aa"
assert to_binary("cc") == b"cc"
if six.PY2:
assert to_binary(u"喵") == "喵"
assert to_binary("喵") == "喵"
def test_to_binary():
assert to_binary(6) == six.binary_type(6)
assert to_binary(b"aa") == b"aa"
assert to_binary("cc") == b"cc"
if six.PY2:
assert to_binary(u"喵") == "喵"
assert to_binary("喵") == "喵"
def remove_session(session):
try:
del session[to_binary("fromUser")]
except:
pass
def test_article():
article = Article(
title="tt", description=to_binary("附近的萨卡里发生"), img="http", url="uuu"
)
assert article.render().strip() == to_text(
"""
def get_reply(self, message):
"""
Return the raw xml reply for the given message.
"""
session_storage = self.config["SESSION_STORAGE"]
id = None
session = None
if session_storage and hasattr(message, "source"):
id = to_binary(message.source)
session = session_storage[id]
handlers = self.get_handlers(message.type)
try:
for handler, args_count in handlers:
args = [message, session][:args_count]
reply = handler(*args)
if session_storage and id:
session_storage[id] = session
if reply:
return reply
except:
self.logger.warning("Catch an exception", exc_info=True)
def encode(text):
# 计算需要填充的位数
amount_to_pad = _BLOCK_SIZE - (len(text) % _BLOCK_SIZE)
if not amount_to_pad:
amount_to_pad = _BLOCK_SIZE
# 获得补位所用的字符
pad = chr(amount_to_pad)
return text + to_binary(pad * amount_to_pad)
def encrypt(self, text, app_id):
"""
对明文进行加密
:param text: 需要加密的明文
:param app_id: 微信公众平台的 AppID
:return: 加密后的字符串
"""
text = b"".join(
[
to_binary(self.get_random_string()),
struct.pack(b"I", socket.htonl(len(to_binary(text)))),
to_binary(text),
to_binary(app_id)
]
)
text = pkcs7.encode(text)
encryptor = self.cipher.encryptor()
ciphertext = to_binary(encryptor.update(text) + encryptor.finalize())
return base64.b64encode(ciphertext)
def decrypt(self, text, app_id):
"""
对密文进行解密
:param text: 需要解密的密文
:param app_id: 微信公众平台的 AppID
:return: 解密后的字符串
"""
text = to_binary(text)
decryptor = self.cipher.decryptor()
plain_text = decryptor.update(base64.b64decode(text)
) + decryptor.finalize()
padding = byte2int(plain_text, -1)
content = plain_text[16:-padding]
xml_len = socket.ntohl(struct.unpack("I", content[:4])[0])
xml_content = content[4:xml_len + 4]
from_appid = content[xml_len + 4:]
if to_text(from_appid) != app_id:
raise AppIdValidationError(text, app_id)
return xml_content