Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@register_event('user_scan_product')
class UserScanProductEvent(BaseEvent):
    """
    Event for opening a product home page.

    For details, see
    https://mp.weixin.qq.com/wiki?id=mp1455872179
    """
    event = 'user_scan_product'
    # Each field is constructed with a name string; presumably that is the
    # key read from the incoming message -- confirm against the field classes.
    standard = StringField('KeyStandard')
    key = StringField('KeyStr')
    country = StringField('Country')
    province = StringField('Province')
    city = StringField('City')
    sex = IntegerField('Sex')
    scene = IntegerField('Scene')
@register_event('user_scan_product_enter_session')
class UserScanProductEnterSessionEvent(BaseEvent):
    """
    Event for entering the official account.

    For details, see
    https://mp.weixin.qq.com/wiki?id=mp1455872179
    """
    event = 'user_scan_product_enter_session'
    # Each field is constructed with a name string; presumably that is the
    # key read from the incoming message -- confirm against the field classes.
    standard = StringField('KeyStandard')
    key = StringField('KeyStr')
扫码推事件且弹出“消息接收中”提示框的事件
详情请参阅
http://qydev.weixin.qq.com/wiki/index.php?title=接收事件#.E6.89.AB.E7.A0.81.E6.8E.A8.E4.BA.8B.E4.BB.B6.E4.B8.94.E5.BC.B9.E5.87.BA.E2.80.9C.E6.B6.88.E6.81.AF.E6.8E.A5.E6.94.B6.E4.B8.AD.E2.80.9D.E6.8F.90.E7.A4.BA.E6.A1.86.E7.9A.84.E4.BA.8B.E4.BB.B6.E6.8E.A8.E9.80.81
"""
agent = IntegerField('AgentID', 0)
event = 'scancode_waitmsg'
@register_event('pic_sysphoto')
class PicSysPhotoEvent(events.PicSysPhotoEvent):
    """
    Event for popping up the system camera to take and send a picture.

    For details, see
    http://qydev.weixin.qq.com/wiki/index.php?title=接收事件#.E5.BC.B9.E5.87.BA.E7.B3.BB.E7.BB.9F.E6.8B.8D.E7.85.A7.E5.8F.91.E5.9B.BE.E7.9A.84.E4.BA.8B.E4.BB.B6.E6.8E.A8.E9.80.81
    """
    # Adds an 'AgentID' integer field on top of ``events.PicSysPhotoEvent``;
    # the second argument (0) is presumably the default value -- confirm.
    agent = IntegerField('AgentID', 0)
    event = 'pic_sysphoto'
@register_event('pic_photo_or_album')
class PicPhotoOrAlbumEvent(events.PicPhotoOrAlbumEvent):
    """
    Event for popping up "take a photo or choose from album" to send a picture.

    For details, see
    http://qydev.weixin.qq.com/wiki/index.php?title=接收事件#.E5.BC.B9.E5.87.BA.E6.8B.8D.E7.85.A7.E6.88.96.E8.80.85.E7.9B.B8.E5.86.8C.E5.8F.91.E5.9B.BE.E7.9A.84.E4.BA.8B.E4.BB.B6.E6.8E.A8.E9.80.81
    """
    # Adds an 'AgentID' integer field on top of ``events.PicPhotoOrAlbumEvent``;
    # the second argument (0) is presumably the default value -- confirm.
    agent = IntegerField('AgentID', 0)
    event = 'pic_photo_or_album'
@register_event('pic_weixin')
class PicWeChatEvent(events.PicWeChatEvent):
详情请参阅
https://mp.weixin.qq.com/wiki?id=mp1451025274
"""
event = 'user_consume_card'
card_id = StringField('CardId')
code = StringField('UserCardCode')
consume_source = StringField('ConsumeSource')
location_id = StringField('LocationId')
staff = StringField('StaffOpenId')
@register_event('merchant_order')
class MerchantOrderEvent(BaseEvent):
    """Event class registered via ``register_event('merchant_order')``."""
    event = 'merchant_order'
    # Each field is constructed with a name string; presumably that is the
    # key read from the incoming message -- confirm against the field classes.
    order_id = StringField('OrderId')
    order_status = IntegerField('OrderStatus')
    product_id = StringField('ProductId')
    sku_info = StringField('SkuInfo')
@register_event('kf_create_session')
class KfCreateSessionEvent(BaseEvent):
    """Event class registered via ``register_event('kf_create_session')``."""
    event = 'kf_create_session'
    # String field constructed with the name 'KfAccount'.
    account = StringField('KfAccount')
@register_event('kf_close_session')
class KfCloseSessionEvent(BaseEvent):
    """Event class registered via ``register_event('kf_close_session')``."""
    event = 'kf_close_session'
    # String field constructed with the name 'KfAccount'.
    account = StringField('KfAccount')
if isinstance(v, FieldDescriptor):
attrs[k] = copy.deepcopy(v.field)
cls = super(MessageMetaClass, cls).__new__(cls, name, bases, attrs)
cls._fields = {}
for name, field in cls.__dict__.items():
if isinstance(field, BaseField):
field.add_to_class(cls, name)
return cls
class BaseMessage(six.with_metaclass(MessageMetaClass)):
    """Base class for all messages and events.

    An instance wraps the raw message object handed to the constructor and
    keeps it on ``self._data``. The class-level fields are declared with the
    names ``MsgId``, ``FromUserName``, ``ToUserName`` and ``CreateTime``.
    """
    type = 'unknown'
    id = IntegerField('MsgId', 0)
    source = StringField('FromUserName')
    target = StringField('ToUserName')
    # Both attributes are built from the same 'CreateTime' name: one as a
    # DateTimeField, the other as an IntegerField.
    create_time = DateTimeField('CreateTime')
    time = IntegerField('CreateTime')

    def __init__(self, message):
        """Store ``message`` as the backing data of this instance."""
        self._data = message

    def __repr__(self):
        """Return ``ClassName(<repr of the wrapped data>)``.

        On Python 2 the text is passed through ``to_binary`` before being
        returned; on Python 3 the text is returned as-is.
        """
        _repr = "{klass}({msg})".format(
            klass=self.__class__.__name__,
            msg=repr(self._data)
        )
        if six.PY2:
            return to_binary(_repr)
        # The original returned an undefined name here, which raised
        # NameError whenever repr() was taken on Python 3.
        return _repr
@register_event('poi_check_notify')
class PoiCheckNotifyEvent(BaseEvent):
    """Event class registered via ``register_event('poi_check_notify')``."""
    event = 'poi_check_notify'
    # Each field is constructed with a name string; presumably that is the
    # key read from the incoming message -- confirm against the field classes.
    poi_id = StringField('PoiId')
    uniq_id = StringField('UniqId')
    result = StringField('Result')
    message = StringField('Msg')
@register_event('wificonnected')
class WiFiConnectedEvent(BaseEvent):
    """Event class registered via ``register_event('wificonnected')``.

    The ``event`` attribute carries the same string as the registration key,
    so code comparing ``msg.event`` against ``'wificonnected'`` matches.
    """
    # Was misspelled 'wificconnected' (doubled "c"), which disagreed with
    # the key this class is registered under.
    event = 'wificonnected'
    # Each field is constructed with a name string; presumably that is the
    # key read from the incoming message -- confirm against the field classes.
    connect_time = IntegerField('ConnectTime')
    expire_time = IntegerField('ExpireTime')
    vendor_id = StringField('VendorId')
    shop_id = StringField('PlaceId')
    bssid = StringField('DeviceNo')
# ============================================================================
# 微信认证事件推送
# ============================================================================
@register_event('qualification_verify_success')
class QualificationVerifySuccessEvent(BaseEvent):
"""
资质认证成功事件
详情请参阅
https://mp.weixin.qq.com/wiki?id=mp1455785130
@register_event('masssendjobfinish')
class MassSendJobFinishEvent(BaseEvent):
    """
    Event for a finished mass-send message job.

    For details, see
    https://mp.weixin.qq.com/wiki?id=mp1481187827_i0l21
    """
    # Note the capitalization: this field is constructed with 'MsgID'.
    id = IntegerField('MsgID', 0)
    event = 'masssendjobfinish'
    status = StringField('Status')
    # The count fields pass 0 as second argument (presumably the default
    # value -- confirm against IntegerField).
    total_count = IntegerField('TotalCount', 0)
    filter_count = IntegerField('FilterCount', 0)
    sent_count = IntegerField('SentCount', 0)
    error_count = IntegerField('ErrorCount', 0)
@register_event('templatesendjobfinish')
class TemplateSendJobFinishEvent(BaseEvent):
    """
    Event for a finished template message job.

    For details, see
    https://mp.weixin.qq.com/wiki?id=mp1433751277
    """
    # Note the capitalization: this field is constructed with 'MsgID', and
    # no second argument is passed.
    id = IntegerField('MsgID')
    event = 'templatesendjobfinish'
    status = StringField('Status')
class BaseScanCodeEvent(BaseEvent):
# Registry mapping a reply type key to the class registered for it.
REPLY_TYPES = {}


def register_reply(reply_type):
    """Build a class decorator that records the class in ``REPLY_TYPES``.

    :param reply_type: key under which the decorated class is stored
    :return: a decorator that stores its argument in ``REPLY_TYPES`` under
        ``reply_type`` and hands the same object back unchanged
    """
    def decorator(reply_cls):
        # A later registration with the same key replaces the earlier one.
        REPLY_TYPES[reply_type] = reply_cls
        return reply_cls

    return decorator
class BaseReply(six.with_metaclass(MessageMetaClass)):
"""Base class for all replies"""
source = StringField('FromUserName')
target = StringField('ToUserName')
time = IntegerField('CreateTime', time.time())
type = 'unknown'
def __init__(self, **kwargs):
self._data = {}
message = kwargs.pop('message', None)
if message and isinstance(message, BaseMessage):
if 'source' not in kwargs:
kwargs['source'] = message.target
if 'target' not in kwargs:
kwargs['target'] = message.source
if hasattr(message, 'agent') and 'agent' not in kwargs:
kwargs['agent'] = message.agent
if 'time' not in kwargs:
kwargs['time'] = time.time()
for name, value in kwargs.items():
field = self._fields.get(name)
class DeviceUnbindEvent(BaseEvent):
    """Event class whose ``event`` value is ``'device_unbind'``."""
    event = 'device_unbind'
    # Each field is constructed with a name string; presumably that is the
    # key read from the incoming message -- confirm against the field classes.
    device_type = StringField('DeviceType')
    device_id = StringField('DeviceID')
    session_id = StringField('SessionID')
    # Declared as a Base64DecodeField rather than a plain StringField.
    content = Base64DecodeField('Content')
    open_id = StringField('OpenID')
@register_event('device_subscribe_status')
class DeviceSubscribeStatusEvent(BaseEvent):
    """Event class registered via ``register_event('device_subscribe_status')``."""
    event = 'device_subscribe_status'
    # Each field is constructed with a name string; presumably that is the
    # key read from the incoming message -- confirm against the field classes.
    device_type = StringField('DeviceType')
    device_id = StringField('DeviceID')
    open_id = StringField('OpenID')
    op_type = IntegerField('OpType')
@register_event('device_unsubscribe_status')
class DeviceUnsubscribeStatusEvent(BaseEvent):
    """Event class registered via ``register_event('device_unsubscribe_status')``."""
    event = 'device_unsubscribe_status'
    # Each field is constructed with a name string; presumably that is the
    # key read from the incoming message -- confirm against the field classes.
    device_type = StringField('DeviceType')
    device_id = StringField('DeviceID')
    open_id = StringField('OpenID')
    op_type = IntegerField('OpType')
@register_event('shakearoundusershake')
class ShakearoundUserShakeEvent(BaseEvent):
    """Event class registered via ``register_event('shakearoundusershake')``."""
    # NOTE(review): the ``event`` value uses underscores while the
    # registration key does not -- confirm this difference is intended.
    event = 'shakearound_user_shake'
    # NOTE(review): each field receives a ``{}`` literal as second argument;
    # if BaseField keeps that object as a default it would be shared between
    # instances -- confirm against BaseField.
    _chosen_beacon = BaseField('ChosenBeacon', {})
    _around_beacons = BaseField('AroundBeacons', {})
@register_event('poi_check_notify')
class PoiCheckNotifyEvent(BaseEvent):
    """Event class registered via ``register_event('poi_check_notify')``."""
    event = 'poi_check_notify'
    # Each field is constructed with a name string; presumably that is the
    # key read from the incoming message -- confirm against the field classes.
    poi_id = StringField('PoiId')
    uniq_id = StringField('UniqId')
    result = StringField('Result')
    message = StringField('Msg')
@register_event('wificonnected')
class WiFiConnectedEvent(BaseEvent):
    """Event class registered via ``register_event('wificonnected')``.

    The ``event`` attribute carries the same string as the registration key,
    so code comparing ``msg.event`` against ``'wificonnected'`` matches.
    """
    # Was misspelled 'wificconnected' (doubled "c"), which disagreed with
    # the key this class is registered under.
    event = 'wificonnected'
    # Each field is constructed with a name string; presumably that is the
    # key read from the incoming message -- confirm against the field classes.
    connect_time = IntegerField('ConnectTime')
    expire_time = IntegerField('ExpireTime')
    vendor_id = StringField('VendorId')
    shop_id = StringField('PlaceId')
    bssid = StringField('DeviceNo')
# ============================================================================
# 微信认证事件推送
# ============================================================================
@register_event('qualification_verify_success')
class QualificationVerifySuccessEvent(BaseEvent):
    """
    Event for a successful qualification verification.

    For details, see
    https://mp.weixin.qq.com/wiki?id=mp1455785130
    """
    # NOTE(review): no ``event`` attribute is set here, unlike the other
    # registered event classes in this file -- confirm whether it is inherited.
    # String field constructed with the name 'EventKey'.
    url = StringField('EventKey')
@register_event('masssendjobfinish')
class MassSendJobFinishEvent(BaseEvent):
    """
    Event for a finished mass-send message job.

    For details, see
    https://mp.weixin.qq.com/wiki?id=mp1481187827_i0l21
    """
    # Note the capitalization: this field is constructed with 'MsgID'.
    id = IntegerField('MsgID', 0)
    event = 'masssendjobfinish'
    status = StringField('Status')
    # The count fields pass 0 as second argument (presumably the default
    # value -- confirm against IntegerField).
    total_count = IntegerField('TotalCount', 0)
    filter_count = IntegerField('FilterCount', 0)
    sent_count = IntegerField('SentCount', 0)
    error_count = IntegerField('ErrorCount', 0)
@register_event('templatesendjobfinish')
class TemplateSendJobFinishEvent(BaseEvent):
    """
    Event for a finished template message job.

    For details, see
    https://mp.weixin.qq.com/wiki?id=mp1433751277
    """
    # Note the capitalization: this field is constructed with 'MsgID', and
    # no second argument is passed.
    id = IntegerField('MsgID')
    event = 'templatesendjobfinish'
    status = StringField('Status')