259 lines
9.9 KiB
Python
259 lines
9.9 KiB
Python
from flask import Flask, request, make_response
|
||
import hashlib
|
||
import time
|
||
import xml.etree.ElementTree as ET
|
||
import requests
|
||
import threading
|
||
import xml.sax.saxutils as saxutils
|
||
import json
|
||
|
||
app = Flask(__name__)
|
||
|
||
# ======================================
|
||
# 全局配置区
|
||
# ======================================
|
||
|
||
# 与微信公众号后台“服务器配置”中填写的 Token 保持一致,用于验证微信服务器请求的合法性
|
||
TOKEN = ""
|
||
|
||
# Dify AI 服务配置:基础 URL 与 API Key
|
||
DIFY_API_BASE = "" # ⚠️ 替换为你的本地地址
|
||
DIFY_API_KEY = "" # ⚠️ 清空或使用本地密钥
|
||
|
||
# 会话映射字典:存储 <user_openid, Dify 返回的 conversation_id> 映射
|
||
conv_map = {}
|
||
|
||
# ======================================
|
||
# 1. 获取微信 access_token
|
||
# ======================================
|
||
def get_access_token():
|
||
"""
|
||
说明:
|
||
向微信接口获取 access_token,此 token 用于调用“客服消息接口”发送消息。
|
||
建议在生产环境中实现缓存机制,过期后再重新拉取。
|
||
返回:
|
||
access_token 字符串
|
||
异常抛出:
|
||
requests 异常或微信接口返回 errcode != 0 时
|
||
"""
|
||
APPID = "" # 在微信公众号后台获取APPID
|
||
APPSECRET = "" # 在微信公众号后台获取APPSECRET
|
||
|
||
# 拼接 URL,grant_type 固定为 client_credential
|
||
url = f"https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={APPID}&secret={APPSECRET}"
|
||
resp = requests.get(url, timeout=5)
|
||
data = resp.json()
|
||
|
||
# 简单校验:若返回包含 errcode 且不等于 0,则说明调用失败
|
||
if data.get("errcode"):
|
||
raise Exception(f"获取 access_token 失败,errcode={data['errcode']},errmsg={data.get('errmsg')}")
|
||
|
||
return data.get("access_token")
|
||
|
||
# ======================================
|
||
# 2. 发送微信客服消息
|
||
# ======================================
|
||
def send_wechat_customer_message(openid, content):
|
||
"""
|
||
说明:
|
||
通过微信“客服消息接口”主动向用户推送文本消息。仅已认证公众号且
|
||
用户在 48 小时内与公众号有过交互时,才能发送客服消息。
|
||
|
||
参数:
|
||
openid (str):用户的唯一标识
|
||
content (str):要发送的纯文本内容
|
||
|
||
返回:
|
||
None
|
||
异常捕获:
|
||
1. get_access_token() 失败
|
||
2. HTTP 请求失败
|
||
3. 微信接口返回 errcode != 0
|
||
"""
|
||
try:
|
||
# 1. 获取 access_token
|
||
access_token = get_access_token()
|
||
url = f"https://api.weixin.qq.com/cgi-bin/message/custom/send?access_token={access_token}"
|
||
|
||
# 2. 构造消息体:发送文本
|
||
payload = {
|
||
"touser": openid,
|
||
"msgtype": "text",
|
||
"text": {
|
||
"content": content
|
||
}
|
||
}
|
||
|
||
# 3. 发起 POST 请求
|
||
response = requests.post(url, json=payload, timeout=5)
|
||
print("【客服消息】发送状态码:", response.status_code)
|
||
print("【客服消息】发送结果:", response.text)
|
||
|
||
# 4. 检查微信接口返回
|
||
resp_json = response.json()
|
||
if resp_json.get("errcode") != 0:
|
||
raise Exception(f"微信客服消息接口返回错误,errcode={resp_json['errcode']},errmsg={resp_json.get('errmsg')}")
|
||
|
||
except Exception as e:
|
||
# 打印错误日志,生产环境可做更完善的监控/告警
|
||
print("【客服消息】发送失败:", str(e))
|
||
|
||
# ======================================
|
||
# 3. 异步调用 Dify 并发送客服消息
|
||
# ======================================
|
||
def async_reply(user_openid, official_id, user_msg, conv_map):
|
||
"""
|
||
说明:
|
||
在后台线程中调用 Dify AI 服务,获取智能回复后,通过客服消息接口主动推送给用户。
|
||
|
||
参数:
|
||
user_openid (str):用户的唯一标识
|
||
official_id (str):公众号的 ID(一般不作为 Dify 调用参数)
|
||
user_msg (str):用户发送的原始文本消息
|
||
conv_map (dict):保存 <user_openid, conversation_id> 用于多轮对话
|
||
"""
|
||
# 1. 构造 Dify 请求 payload
|
||
payload = {
|
||
"inputs": {}, # 此处保留,可扩展放置其他上下文数据
|
||
"query": user_msg, # 用户输入文本
|
||
"response_mode": "blocking",
|
||
"user": user_openid # 把用户 ID 传给 Dify,便于多用户区分
|
||
}
|
||
# 如果 conv_map 中已经存在此用户的 conversation_id,则带上以维持上下文
|
||
if user_openid in conv_map:
|
||
payload["conversation_id"] = conv_map[user_openid]
|
||
|
||
# 2. 构造 HTTP 请求头
|
||
headers = {"Content-Type": "application/json"}
|
||
# 仅在配置了 API Key 时添加认证头
|
||
if DIFY_API_KEY:
|
||
headers["Authorization"] = f"Bearer {DIFY_API_KEY}"
|
||
|
||
try:
|
||
# 3. 发起 Dify AI 服务调用
|
||
resp = requests.post(
|
||
f"{DIFY_API_BASE}/chat-messages",
|
||
headers=headers,
|
||
json=payload,
|
||
timeout=10
|
||
)
|
||
print("【异步调用】Dify 响应状态码:", resp.status_code)
|
||
print("【异步调用】Dify 原始响应:", resp.text)
|
||
|
||
# 4. 检查 HTTP 返回码
|
||
if resp.status_code != 200:
|
||
raise Exception(f"Dify 返回非200状态码: {resp.status_code}")
|
||
|
||
data = resp.json()
|
||
|
||
# 5. 提取 answer 字段(可能含有 Unicode 编码),尝试自动解码
|
||
reply = data.get("answer", "抱歉,我无法回答这个问题。")
|
||
try:
|
||
# 若 reply 中有类似 "\u89c6\u89c9" 需要被解码为中文,则此方法可行
|
||
reply = json.loads(f'"{reply}"')
|
||
except json.JSONDecodeError:
|
||
# 若不是 Unicode 编码,就保留原样
|
||
pass
|
||
|
||
# 6. 如果 Dify 返回了 conversation_id,更新 conv_map
|
||
if data.get("conversation_id"):
|
||
conv_map[user_openid] = data["conversation_id"]
|
||
|
||
# 7. 通过客服消息接口主动推送给用户
|
||
send_wechat_customer_message(user_openid, reply)
|
||
|
||
except Exception as e:
|
||
# 一旦出现任何异常,就发一条"调用失败"消息给用户
|
||
error_msg = f"调用AI接口失败,请稍后再试。错误信息:{str(e)}"
|
||
print("【异步调用】错误:", error_msg)
|
||
send_wechat_customer_message(user_openid, error_msg)
|
||
|
||
# ======================================
|
||
# 4. 微信消息接入与路由
|
||
# ======================================
|
||
@app.route('/wx', methods=['GET', 'POST'])
|
||
def wechat():
|
||
# --------------------------------
|
||
# 4.1 GET 请求:微信服务器接入验证
|
||
# --------------------------------
|
||
if request.method == 'GET':
|
||
data = request.args
|
||
signature = data.get('signature')
|
||
timestamp = data.get('timestamp')
|
||
nonce = data.get('nonce')
|
||
echostr = data.get('echostr')
|
||
|
||
# 1. 对 timestamp、nonce、TOKEN 字典序排序,拼接后做 SHA1
|
||
tmp_list = sorted([timestamp, nonce, TOKEN])
|
||
tmp_str = ''.join(tmp_list)
|
||
hash_sha1 = hashlib.sha1()
|
||
hash_sha1.update(tmp_str.encode('utf-8'))
|
||
digest = hash_sha1.hexdigest()
|
||
|
||
# 2. 若与微信给的 signature 一致,即接入验证通过
|
||
if digest == signature:
|
||
print("✅ 微信服务器验证成功")
|
||
return echostr
|
||
else:
|
||
print("❌ 签名验证失败")
|
||
return '验证失败', 403
|
||
|
||
# --------------------------------
|
||
# 4.2 POST 请求:接收并处理用户消息
|
||
# --------------------------------
|
||
elif request.method == 'POST':
|
||
# 1. 解析 XML(微信服务器 POST 过来的报文内容)
|
||
try:
|
||
xml_data = ET.fromstring(request.data or b'')
|
||
except ET.ParseError:
|
||
print("❌ XML 解析失败")
|
||
return 'success'
|
||
|
||
# 2. 获取消息类型
|
||
msg_type = xml_data.findtext('MsgType')
|
||
|
||
# 3. 如果是文本消息,则异步调用 AI 服务并立即被动回复
|
||
if msg_type == 'text':
|
||
user_openid = xml_data.findtext('FromUserName')
|
||
official_id = xml_data.findtext('ToUserName')
|
||
user_msg = xml_data.findtext('Content', default='')
|
||
|
||
print(f"📩 用户 [{user_openid}] 发送了消息: {user_msg}")
|
||
|
||
# 4. 启动后台线程,不阻塞主线程
|
||
thread = threading.Thread(
|
||
target=async_reply,
|
||
args=(user_openid, official_id, user_msg, conv_map)
|
||
)
|
||
thread.start()
|
||
|
||
# 5. 构造被动回复给用户,告知“正在查询”
|
||
reply_content = "正在为您查询答案,请稍等..."
|
||
# 注意:使用 CDATA 包裹,避免出现 <、& 等特殊字符引起 XML 解析错误
|
||
reply_xml = f"""
|
||
<xml>
|
||
<ToUserName><![CDATA[{user_openid}]]></ToUserName>
|
||
<FromUserName><![CDATA[{official_id}]]></FromUserName>
|
||
<CreateTime>{int(time.time())}</CreateTime>
|
||
<MsgType><![CDATA[text]]></MsgType>
|
||
<Content><![CDATA[{reply_content}]]></Content>
|
||
</xml>
|
||
""".strip()
|
||
|
||
response = make_response(reply_xml)
|
||
response.headers['Content-Type'] = 'application/xml'
|
||
return response
|
||
|
||
# 6. 针对非文本类型的消息,直接返回 success,不做任何处理
|
||
else:
|
||
print(f"📎 收到非文本消息类型: {msg_type}")
|
||
return 'success'
|
||
|
||
# ======================================
|
||
# 5. 启动 Flask Server
|
||
# ======================================
|
||
if __name__ == "__main__":
|
||
# host="0.0.0.0" 表示对外监听所有网卡;port=80 为 HTTP 默认端口
|
||
# debug=True 仅用于开发环境,生产环境请关掉
|
||
app.run(host="0.0.0.0", port=8010, debug=True)
|