From 545aea1e2c43d03459494912614116625461615a Mon Sep 17 00:00:00 2001 From: "Zhenguan.Lin" <2490670279@qq.com> Date: Thu, 5 Jun 2025 19:47:32 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8A=E5=82=B3=E6=AA=94=E6=A1=88=E5=88=B0?= =?UTF-8?q?=E3=80=8C/=E3=80=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- wechat_kefu.py | 258 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 258 insertions(+) create mode 100644 wechat_kefu.py diff --git a/wechat_kefu.py b/wechat_kefu.py new file mode 100644 index 0000000..6317d52 --- /dev/null +++ b/wechat_kefu.py @@ -0,0 +1,258 @@ +from flask import Flask, request, make_response +import hashlib +import time +import xml.etree.ElementTree as ET +import requests +import threading +import xml.sax.saxutils as saxutils +import json + +app = Flask(__name__) + +# ====================================== +# 全局配置区 +# ====================================== + +# 与微信公众号后台“服务器配置”中填写的 Token 保持一致,用于验证微信服务器请求的合法性 +TOKEN = "" + +# Dify AI 服务配置:基础 URL 与 API Key +DIFY_API_BASE = "" # ⚠️ 替换为你的本地地址 +DIFY_API_KEY = "" # ⚠️ 清空或使用本地密钥 + +# 会话映射字典:存储 映射 +conv_map = {} + +# ====================================== +# 1. 获取微信 access_token +# ====================================== +def get_access_token(): + """ + 说明: + 向微信接口获取 access_token,此 token 用于调用“客服消息接口”发送消息。 + 建议在生产环境中实现缓存机制,过期后再重新拉取。 + 返回: + access_token 字符串 + 异常抛出: + requests 异常或微信接口返回 errcode != 0 时 + """ + APPID = "" # 在微信公众号后台获取APPID + APPSECRET = "" # 在微信公众号后台获取APPSECRET + + # 拼接 URL,grant_type 固定为 client_credential + url = f"https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={APPID}&secret={APPSECRET}" + resp = requests.get(url, timeout=5) + data = resp.json() + + # 简单校验:若返回包含 errcode 且不等于 0,则说明调用失败 + if data.get("errcode"): + raise Exception(f"获取 access_token 失败,errcode={data['errcode']},errmsg={data.get('errmsg')}") + + return data.get("access_token") + +# ====================================== +# 2. 发送微信客服消息 +# ====================================== +def send_wechat_customer_message(openid, content): + """ + 说明: + 通过微信“客服消息接口”主动向用户推送文本消息。仅已认证公众号且 + 用户在 48 小时内与公众号有过交互时,才能发送客服消息。 + + 参数: + openid (str):用户的唯一标识 + content (str):要发送的纯文本内容 + + 返回: + None + 异常捕获: + 1. get_access_token() 失败 + 2. HTTP 请求失败 + 3. 微信接口返回 errcode != 0 + """ + try: + # 1. 获取 access_token + access_token = get_access_token() + url = f"https://api.weixin.qq.com/cgi-bin/message/custom/send?access_token={access_token}" + + # 2. 构造消息体:发送文本 + payload = { + "touser": openid, + "msgtype": "text", + "text": { + "content": content + } + } + + # 3. 发起 POST 请求 + response = requests.post(url, json=payload, timeout=5) + print("【客服消息】发送状态码:", response.status_code) + print("【客服消息】发送结果:", response.text) + + # 4. 检查微信接口返回 + resp_json = response.json() + if resp_json.get("errcode") != 0: + raise Exception(f"微信客服消息接口返回错误,errcode={resp_json['errcode']},errmsg={resp_json.get('errmsg')}") + + except Exception as e: + # 打印错误日志,生产环境可做更完善的监控/告警 + print("【客服消息】发送失败:", str(e)) + +# ====================================== +# 3. 异步调用 Dify 并发送客服消息 +# ====================================== +def async_reply(user_openid, official_id, user_msg, conv_map): + """ + 说明: + 在后台线程中调用 Dify AI 服务,获取智能回复后,通过客服消息接口主动推送给用户。 + + 参数: + user_openid (str):用户的唯一标识 + official_id (str):公众号的 ID(一般不作为 Dify 调用参数) + user_msg (str):用户发送的原始文本消息 + conv_map (dict):保存 用于多轮对话 + """ + # 1. 构造 Dify 请求 payload + payload = { + "inputs": {}, # 此处保留,可扩展放置其他上下文数据 + "query": user_msg, # 用户输入文本 + "response_mode": "blocking", + "user": user_openid # 把用户 ID 传给 Dify,便于多用户区分 + } + # 如果 conv_map 中已经存在此用户的 conversation_id,则带上以维持上下文 + if user_openid in conv_map: + payload["conversation_id"] = conv_map[user_openid] + + # 2. 构造 HTTP 请求头 + headers = {"Content-Type": "application/json"} + # 仅在配置了 API Key 时添加认证头 + if DIFY_API_KEY: + headers["Authorization"] = f"Bearer {DIFY_API_KEY}" + + try: + # 3. 发起 Dify AI 服务调用 + resp = requests.post( + f"{DIFY_API_BASE}/chat-messages", + headers=headers, + json=payload, + timeout=10 + ) + print("【异步调用】Dify 响应状态码:", resp.status_code) + print("【异步调用】Dify 原始响应:", resp.text) + + # 4. 检查 HTTP 返回码 + if resp.status_code != 200: + raise Exception(f"Dify 返回非200状态码: {resp.status_code}") + + data = resp.json() + + # 5. 提取 answer 字段(可能含有 Unicode 编码),尝试自动解码 + reply = data.get("answer", "抱歉,我无法回答这个问题。") + try: + # 若 reply 中有类似 "\u89c6\u89c9" 需要被解码为中文,则此方法可行 + reply = json.loads(f'"{reply}"') + except json.JSONDecodeError: + # 若不是 Unicode 编码,就保留原样 + pass + + # 6. 如果 Dify 返回了 conversation_id,更新 conv_map + if data.get("conversation_id"): + conv_map[user_openid] = data["conversation_id"] + + # 7. 通过客服消息接口主动推送给用户 + send_wechat_customer_message(user_openid, reply) + + except Exception as e: + # 一旦出现任何异常,就发一条"调用失败"消息给用户 + error_msg = f"调用AI接口失败,请稍后再试。错误信息:{str(e)}" + print("【异步调用】错误:", error_msg) + send_wechat_customer_message(user_openid, error_msg) + +# ====================================== +# 4. 微信消息接入与路由 +# ====================================== +@app.route('/wx', methods=['GET', 'POST']) +def wechat(): + # -------------------------------- + # 4.1 GET 请求:微信服务器接入验证 + # -------------------------------- + if request.method == 'GET': + data = request.args + signature = data.get('signature') + timestamp = data.get('timestamp') + nonce = data.get('nonce') + echostr = data.get('echostr') + + # 1. 对 timestamp、nonce、TOKEN 字典序排序,拼接后做 SHA1 + tmp_list = sorted([timestamp, nonce, TOKEN]) + tmp_str = ''.join(tmp_list) + hash_sha1 = hashlib.sha1() + hash_sha1.update(tmp_str.encode('utf-8')) + digest = hash_sha1.hexdigest() + + # 2. 若与微信给的 signature 一致,即接入验证通过 + if digest == signature: + print("✅ 微信服务器验证成功") + return echostr + else: + print("❌ 签名验证失败") + return '验证失败', 403 + + # -------------------------------- + # 4.2 POST 请求:接收并处理用户消息 + # -------------------------------- + elif request.method == 'POST': + # 1. 解析 XML(微信服务器 POST 过来的报文内容) + try: + xml_data = ET.fromstring(request.data or b'') + except ET.ParseError: + print("❌ XML 解析失败") + return 'success' + + # 2. 获取消息类型 + msg_type = xml_data.findtext('MsgType') + + # 3. 如果是文本消息,则异步调用 AI 服务并立即被动回复 + if msg_type == 'text': + user_openid = xml_data.findtext('FromUserName') + official_id = xml_data.findtext('ToUserName') + user_msg = xml_data.findtext('Content', default='') + + print(f"📩 用户 [{user_openid}] 发送了消息: {user_msg}") + + # 4. 启动后台线程,不阻塞主线程 + thread = threading.Thread( + target=async_reply, + args=(user_openid, official_id, user_msg, conv_map) + ) + thread.start() + + # 5. 构造被动回复给用户,告知“正在查询” + reply_content = "正在为您查询答案,请稍等..." + # 注意:使用 CDATA 包裹,避免出现 <、& 等特殊字符引起 XML 解析错误 + reply_xml = f""" + + + + {int(time.time())} + + + +""".strip() + + response = make_response(reply_xml) + response.headers['Content-Type'] = 'application/xml' + return response + + # 6. 针对非文本类型的消息,直接返回 success,不做任何处理 + else: + print(f"📎 收到非文本消息类型: {msg_type}") + return 'success' + +# ====================================== +# 5. 启动 Flask Server +# ====================================== +if __name__ == "__main__": + # host="0.0.0.0" 表示对外监听所有网卡;port=80 为 HTTP 默认端口 + # debug=True 仅用于开发环境,生产环境请关掉 + app.run(host="0.0.0.0", port=8010, debug=True)