之前写微信适配器的时候,看到了微信官方的一个要求:
请注意:
msgid
排重。从这里我们可以看到,微信官方有个规定,是在15s之内必须给出回复,否则将无法返回消息。那么我们将LangBot接入微信公众号的时候,必须得在15s之内给出微信请求。
所以在第一次写的时候,实在不知道怎么弄出这个15s的请求,自己弄了半天,依然不理解什么叫每隔五秒就会重新请求,于是开始求助大佬@RockChin,在他的帮助下,成功设置了一个字典,能够在每次消息来的时候,通过msg_id来判断是第几次请求。
代码如下:
async def _handle_message(self, event: OAEvent):
"""
处理消息事件。
"""
message_id = event.message_id
if message_id in self.msg_id_map.keys():
self.msg_id_map[message_id] += 1
return
self.msg_id_map[message_id] = 1
msg_type = event.type
if msg_type in self._message_handlers:
for handler in self._message_handlers[msg_type]:
await handler(event)
通过map来处理请求的次数,防止在收到消息的时候造成干扰,具体的逻辑我就先不放上来了。
总之,这种方法只能解决一般的情况,但是如果用户的请求都是超过15s的呢?这样就不是一种很好的办法,会给用户造成不舒服的体验。
所以,在了解了其他优秀开源项目之后,我门发现可以以这么一种方式来实现微信公众号适配器。因为请求必须在15s之内准备好,所以我们决定将流程安排如下:
触发条件 | 处理逻辑 |
---|---|
📩 用户发送消息 | 存入 用户消息队列 ,并立即回复:
“AI 正在思考中,请回复任意内容获取回答。” |
📤 发送到 LangBot | LangBot 处理后,存入 AI 回复队列 。 |
💬 用户再次发送消息 | 从 AI 回复队列 取出消息并发送给用户,同时清理空用户队列。 |
在这种回复模式中,我们可以有效避免三种情况:
代码实现:
if oa.msg_queue.get(from_user) and oa.msg_queue[from_user][0]["content"]:
queue_top = oa.msg_queue[from_user].pop(0)
queue_content = queue_top["content"]
if user_msg_queue.get(from_user) and user_msg_queue[from_user]:
user_msg_queue[from_user].pop(0)
response_xml = xml_template.format(
to_user=from_user,
from_user=to_user,
create_time=int(time.time()),
content=queue_content
)
return response_xml
else:
response_xml = xml_template.format(
to_user=from_user,
from_user=to_user,
create_time=int(time.time()),
content="AI正在思考中,请发送任意内容获取回答。"
)
if user_msg_queue.get(from_user) and user_msg_queue[from_user][0]["content"]:
return response_xml
else:
message_data = await self.get_message(xml_msg)
if message_data:
event = OAEvent.from_payload(message_data)
if event:
user_msg_queue.setdefault(from_user, []).append(
{
"content": event.message,
}
)
await self._handle_message(event)
return response_xml
— Mar 14, 2025
Made with ❤ and at Pingyao.