🌘

Wangcham's Blog

微信公众号的适配器实现方法

之前写微信适配器的时候,看到了微信官方的一个要求:

当普通微信用户向公众账号发消息时,微信服务器将 POST 消息的 XML 数据包到开发者填写的 URL 上。

请注意:

  • 关于重试的消息排重,推荐使用 msgid 排重。
  • 微信服务器在 五秒内收不到响应会断掉连接,并重新发起请求,总共重试三次
  • 假如服务器无法保证在 5 秒内处理并回复,可以直接回复空串,微信服务器不会对此作任何处理,并且不会发起重试。详情请见 “发送消息 - 被动回复消息”
  • 如果开发者需要对用户消息在 5 秒内立即做出回应,可以使用 “发送消息 - 被动回复消息” 接口向用户被动回复消息。
  • 公众平台官网的开发者中心 处设置消息加密后,用户发来的消息和开发者回复的消息都会被加密(但通过 客服接口 API 发送的消息不受影响)。

从这里我们可以看到,微信官方有个规定,是在15s之内必须给出回复,否则将无法返回消息。那么我们将LangBot接入微信公众号的时候,必须得在15s之内给出微信请求。

所以在第一次写的时候,实在不知道怎么弄出这个15s的请求,自己弄了半天,依然不理解什么叫每隔五秒就会重新请求,于是开始求助大佬@RockChin,在他的帮助下,成功设置了一个字典,能够在每次消息来的时候,通过msg_id来判断是第几次请求。
代码如下:

async def _handle_message(self, event: OAEvent):
        """
        处理消息事件。
        """
        message_id = event.message_id
        if message_id in self.msg_id_map.keys():
            self.msg_id_map[message_id] += 1
            return

        self.msg_id_map[message_id] = 1
        msg_type = event.type
        if msg_type in self._message_handlers:
            for handler in self._message_handlers[msg_type]:
                await handler(event)

通过map来处理请求的次数,防止在收到消息的时候造成干扰,具体的逻辑我就先不放上来了。
总之,这种方法只能解决一般的情况,但是如果用户的请求都是超过15s的呢?这样就不是一种很好的办法,会给用户造成不舒服的体验。

所以,在了解了其他优秀开源项目之后,我门发现可以以这么一种方式来实现微信公众号适配器。因为请求必须在15s之内准备好,所以我们决定将流程安排如下:

触发条件 处理逻辑
📩 用户发送消息 存入 用户消息队列,并立即回复:
“AI 正在思考中,请回复任意内容获取回答。”
📤 发送到 LangBot LangBot 处理后,存入 AI 回复队列
💬 用户再次发送消息 AI 回复队列 取出消息并发送给用户,同时清理空用户队列。

在这种回复模式中,我们可以有效避免三种情况:

  • 避免15s之外的回复无法发送
  • 避免用户消息队列堵塞
  • 防止用户发的任意字符串消息被当成真正的ai回答

代码实现:

if oa.msg_queue.get(from_user) and oa.msg_queue[from_user][0]["content"]:
    queue_top = oa.msg_queue[from_user].pop(0)
    queue_content = queue_top["content"]

    if user_msg_queue.get(from_user) and user_msg_queue[from_user]:
        user_msg_queue[from_user].pop(0)

    response_xml = xml_template.format(
        to_user=from_user,
        from_user=to_user,
        create_time=int(time.time()),
        content=queue_content
    )
    return response_xml
else:
    response_xml = xml_template.format(
        to_user=from_user,
        from_user=to_user,
        create_time=int(time.time()),
        content="AI正在思考中,请发送任意内容获取回答。"
    )

    if user_msg_queue.get(from_user) and user_msg_queue[from_user][0]["content"]:
        return response_xml
    else:
        message_data = await self.get_message(xml_msg)

        if message_data:
            event = OAEvent.from_payload(message_data)
            if event:
                user_msg_queue.setdefault(from_user, []).append(
                    {
                        "content": event.message,
                    }
                )
                await self._handle_message(event)

        return response_xml

— Mar 14, 2025

Search