Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
for name, field in self._fields.items():
value = getattr(self, name, field.default)
node_xml = field.to_xml(value)
nodes.append(node_xml)
data = '\n'.join(nodes)
return tpl.format(data=data)
def __str__(self):
    """Return the rendered reply as a string.

    The output of ``self.render()`` is passed through ``to_binary`` when
    ``six.PY2`` is true and through ``to_text`` otherwise.
    """
    # Choose the converter first, then render once and convert the result.
    convert = to_binary if six.PY2 else to_text
    return convert(self.render())
@register_reply('empty')
class EmptyReply(BaseReply):
    """
    Reply with an empty string.

    The WeChat server does not process such a reply in any way and will
    not retry the request.
    """

    def __init__(self):
        """Create the reply without setting anything.

        The base class initializer is not called here and no arguments
        are accepted.
        """

    def render(self):
        """Return the empty string as the rendered reply."""
        return ''
@register_reply('text')
class TextReply(BaseReply):
"""
文本回复
device_type = StringField('DeviceType')
device_id = StringField('DeviceID')
session_id = StringField('SessionID')
content = Base64EncodeField('Content')
@register_reply('device_status')
class DeviceStatusReply(BaseReply):
    """Reply registered under the ``device_status`` type.

    Declares two string fields (``DeviceType``, ``DeviceID``) and one
    integer field (``DeviceStatus``).
    """

    type = 'device_status'

    # Identification of the device, declared as string fields.
    device_type = StringField('DeviceType')
    device_id = StringField('DeviceID')

    # Status value, declared as an integer field.
    status = IntegerField('DeviceStatus')
@register_reply('hardware')
class HardwareReply(BaseReply):
    """Reply registered under the ``hardware`` type.

    Declares an integer ``FuncFlag`` field and a ``HardWare`` field.
    """

    type = 'hardware'

    # The second argument (0) is presumably the field's default value;
    # confirm against IntegerField.
    func_flag = IntegerField('FuncFlag', 0)

    # NOTE: the name passed here is spelled 'HardWare' (capital W).
    hardware = HardwareField('HardWare')
def create_reply(reply, message=None, render=False):
"""
Create a reply quickly
"""
r = None
if not reply:
r = EmptyReply()
elif isinstance(reply, BaseReply):
r = reply
if message:
r.source = message.target
articles.append(article)
self.articles = articles
@register_reply('transfer_customer_service')
class TransferCustomerServiceReply(BaseReply):
    """
    Forward the message to the multi-customer-service system.

    For details see
    http://mp.weixin.qq.com/wiki/5/ae230189c9bd07a6b221f48619aeef35.html
    """

    # No extra fields are declared here; only the reply type is set.
    type = 'transfer_customer_service'
@register_reply('device_text')
class DeviceTextReply(BaseReply):
    """Reply registered under the ``device_text`` type.

    Declares string fields ``DeviceType``, ``DeviceID`` and ``SessionID``
    plus a ``Content`` field declared with ``Base64EncodeField``.
    """

    type = 'device_text'

    # Device and session identification, declared as string fields.
    device_type = StringField('DeviceType')
    device_id = StringField('DeviceID')
    session_id = StringField('SessionID')

    # Payload; presumably base64-encoded when rendered (confirm against
    # Base64EncodeField).
    content = Base64EncodeField('Content')
@register_reply('device_event')
class DeviceEventReply(BaseReply):
    """Reply registered under the ``device_event`` type.

    Declares string fields ``Event``, ``DeviceType``, ``DeviceID`` and
    ``SessionID`` plus a ``Content`` field declared with
    ``Base64EncodeField``.
    """

    type = 'device_event'

    # Event name, declared as a string field.
    event = StringField('Event')

    # Device and session identification, declared as string fields.
    device_type = StringField('DeviceType')
    device_id = StringField('DeviceID')
    session_id = StringField('SessionID')

    # Payload; presumably base64-encoded when rendered (confirm against
    # Base64EncodeField).
    content = Base64EncodeField('Content')
try:
msg = crypto.decrypt_message(
request.data,
signature,
timestamp,
nonce,
)
except (InvalidSignatureException, InvalidCorpIdException):
return abort(403)
else:
request.wechat_msg = parse_message(msg)
res = method(*args, **kwargs)
xml = ''
if isinstance(res, BaseReply):
xml = res.render()
crypto = WeChatCrypto(
current_app.config['WECHAT_TOKEN'],
current_app.config['WECHAT_AES_KEY'],
current_app.config['WECHAT_APPID']
)
xml = crypto.encrypt_message(xml, nonce, timestamp)
return xml
session_id = StringField('SessionID')
content = Base64EncodeField('Content')
@register_reply('device_event')
class DeviceEventReply(BaseReply):
    """Reply registered under the ``device_event`` type.

    Declares string fields ``Event``, ``DeviceType``, ``DeviceID`` and
    ``SessionID`` plus a ``Content`` field declared with
    ``Base64EncodeField``.
    """

    type = 'device_event'

    # Event name, declared as a string field.
    event = StringField('Event')

    # Device and session identification, declared as string fields.
    device_type = StringField('DeviceType')
    device_id = StringField('DeviceID')
    session_id = StringField('SessionID')

    # Payload; presumably base64-encoded when rendered (confirm against
    # Base64EncodeField).
    content = Base64EncodeField('Content')
@register_reply('device_status')
class DeviceStatusReply(BaseReply):
    """Reply registered under the ``device_status`` type.

    Declares two string fields (``DeviceType``, ``DeviceID``) and one
    integer field (``DeviceStatus``).
    """

    type = 'device_status'

    # Identification of the device, declared as string fields.
    device_type = StringField('DeviceType')
    device_id = StringField('DeviceID')

    # Status value, declared as an integer field.
    status = IntegerField('DeviceStatus')
@register_reply('hardware')
class HardwareReply(BaseReply):
    """Reply registered under the ``hardware`` type.

    Declares an integer ``FuncFlag`` field and a ``HardWare`` field.
    """

    type = 'hardware'

    # The second argument (0) is presumably the field's default value;
    # confirm against IntegerField.
    func_flag = IntegerField('FuncFlag', 0)

    # NOTE: the name passed here is spelled 'HardWare' (capital W).
    hardware = HardwareField('HardWare')
def create_reply(reply, message=None, render=False):
"""
Create a reply quickly
http://mp.weixin.qq.com/wiki/9/2c15b20a16019ae613d413e30cac8ea1.html
"""
type = 'image'
image = ImageField('Image')
@property
def media_id(self):
return self.image
@media_id.setter
def media_id(self, value):
self.image = value
@register_reply('voice')
class VoiceReply(BaseReply):
    """
    Voice reply.

    For details see
    http://mp.weixin.qq.com/wiki/9/2c15b20a16019ae613d413e30cac8ea1.html
    """

    type = 'voice'

    # The voice payload, declared under the name 'Voice'.
    voice = VoiceField('Voice')

    @property
    def media_id(self):
        """Alias for the ``voice`` attribute (read side)."""
        return self.voice

    @media_id.setter
    def media_id(self, value):
        """Alias for the ``voice`` attribute (write side)."""
        self.voice = value
http://mp.weixin.qq.com/wiki/9/2c15b20a16019ae613d413e30cac8ea1.html
"""
type = 'voice'
voice = VoiceField('Voice')
@property
def media_id(self):
return self.voice
@media_id.setter
def media_id(self, value):
self.voice = value
@register_reply('video')
class VideoReply(BaseReply):
    """
    Video reply.

    For details see
    http://mp.weixin.qq.com/wiki/9/2c15b20a16019ae613d413e30cac8ea1.html
    """

    type = 'video'

    # NOTE(review): the ``{}`` literal is a single dict object created when
    # the class body runs. If VideoField hands it out as a shared default,
    # the in-place write in the ``media_id`` setter would affect every
    # reply relying on that default; confirm against VideoField.
    video = VideoField('Video', {})

    @property
    def media_id(self):
        """Return the ``'media_id'`` entry of ``self.video`` (``None`` if absent)."""
        return self.video.get('media_id')

    @media_id.setter
    def media_id(self, value):
        """Store ``value`` under ``'media_id'`` in the mapping held by ``self.video``."""
        # Writes into the existing mapping in place; nothing is reassigned.
        self.video['media_id'] = value
def create_reply(reply, message=None, render=False):
"""
Create a reply quickly
"""
r = None
if not reply:
r = EmptyReply()
elif isinstance(reply, BaseReply):
r = reply
if message:
r.source = message.target
r.target = message.source
elif isinstance(reply, six.string_types):
r = TextReply(
message=message,
content=reply
)
elif isinstance(reply, (tuple, list)):
if len(reply) > 10:
raise AttributeError("Can't add more than 10 articles"
" in an ArticlesReply")
r = ArticlesReply(
message=message,
articles=reply
def render(self):
    """Return the empty string as the rendered output."""
    return ''
@register_reply('text')
class TextReply(BaseReply):
    """
    Text reply.

    For details see
    http://mp.weixin.qq.com/wiki/9/2c15b20a16019ae613d413e30cac8ea1.html
    """

    type = 'text'

    # The text of the reply, declared as a string field named 'Content'.
    content = StringField('Content')
@register_reply('image')
class ImageReply(BaseReply):
    """
    Image reply.

    For details see
    http://mp.weixin.qq.com/wiki/9/2c15b20a16019ae613d413e30cac8ea1.html
    """

    type = 'image'

    # The image payload, declared under the name 'Image'.
    image = ImageField('Image')

    @property
    def media_id(self):
        """Alias for the ``image`` attribute (read side)."""
        return self.image

    @media_id.setter
    def media_id(self, value):
        """Alias for the ``image`` attribute (write side)."""
        self.image = value