from flask import Flask, request, make_response import hashlib import time import xml.etree.ElementTree as ET import requests import threading import xml.sax.saxutils as saxutils import json app = Flask(__name__) # ====================================== # 全局配置区 # ====================================== # 与微信公众号后台“服务器配置”中填写的 Token 保持一致,用于验证微信服务器请求的合法性 TOKEN = "" # Dify AI 服务配置:基础 URL 与 API Key DIFY_API_BASE = "" # ⚠️ 替换为你的本地地址 DIFY_API_KEY = "" # ⚠️ 清空或使用本地密钥 # 会话映射字典:存储 映射 conv_map = {} # ====================================== # 1. 获取微信 access_token # ====================================== def get_access_token(): """ 说明: 向微信接口获取 access_token,此 token 用于调用“客服消息接口”发送消息。 建议在生产环境中实现缓存机制,过期后再重新拉取。 返回: access_token 字符串 异常抛出: requests 异常或微信接口返回 errcode != 0 时 """ APPID = "" # 在微信公众号后台获取APPID APPSECRET = "" # 在微信公众号后台获取APPSECRET # 拼接 URL,grant_type 固定为 client_credential url = f"https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={APPID}&secret={APPSECRET}" resp = requests.get(url, timeout=5) data = resp.json() # 简单校验:若返回包含 errcode 且不等于 0,则说明调用失败 if data.get("errcode"): raise Exception(f"获取 access_token 失败,errcode={data['errcode']},errmsg={data.get('errmsg')}") return data.get("access_token") # ====================================== # 2. 发送微信客服消息 # ====================================== def send_wechat_customer_message(openid, content): """ 说明: 通过微信“客服消息接口”主动向用户推送文本消息。仅已认证公众号且 用户在 48 小时内与公众号有过交互时,才能发送客服消息。 参数: openid (str):用户的唯一标识 content (str):要发送的纯文本内容 返回: None 异常捕获: 1. get_access_token() 失败 2. HTTP 请求失败 3. 微信接口返回 errcode != 0 """ try: # 1. 获取 access_token access_token = get_access_token() url = f"https://api.weixin.qq.com/cgi-bin/message/custom/send?access_token={access_token}" # 2. 构造消息体:发送文本 payload = { "touser": openid, "msgtype": "text", "text": { "content": content } } # 3. 发起 POST 请求 response = requests.post(url, json=payload, timeout=5) print("【客服消息】发送状态码:", response.status_code) print("【客服消息】发送结果:", response.text) # 4. 检查微信接口返回 resp_json = response.json() if resp_json.get("errcode") != 0: raise Exception(f"微信客服消息接口返回错误,errcode={resp_json['errcode']},errmsg={resp_json.get('errmsg')}") except Exception as e: # 打印错误日志,生产环境可做更完善的监控/告警 print("【客服消息】发送失败:", str(e)) # ====================================== # 3. 异步调用 Dify 并发送客服消息 # ====================================== def async_reply(user_openid, official_id, user_msg, conv_map): """ 说明: 在后台线程中调用 Dify AI 服务,获取智能回复后,通过客服消息接口主动推送给用户。 参数: user_openid (str):用户的唯一标识 official_id (str):公众号的 ID(一般不作为 Dify 调用参数) user_msg (str):用户发送的原始文本消息 conv_map (dict):保存 用于多轮对话 """ # 1. 构造 Dify 请求 payload payload = { "inputs": {}, # 此处保留,可扩展放置其他上下文数据 "query": user_msg, # 用户输入文本 "response_mode": "blocking", "user": user_openid # 把用户 ID 传给 Dify,便于多用户区分 } # 如果 conv_map 中已经存在此用户的 conversation_id,则带上以维持上下文 if user_openid in conv_map: payload["conversation_id"] = conv_map[user_openid] # 2. 构造 HTTP 请求头 headers = {"Content-Type": "application/json"} # 仅在配置了 API Key 时添加认证头 if DIFY_API_KEY: headers["Authorization"] = f"Bearer {DIFY_API_KEY}" try: # 3. 发起 Dify AI 服务调用 resp = requests.post( f"{DIFY_API_BASE}/chat-messages", headers=headers, json=payload, timeout=10 ) print("【异步调用】Dify 响应状态码:", resp.status_code) print("【异步调用】Dify 原始响应:", resp.text) # 4. 检查 HTTP 返回码 if resp.status_code != 200: raise Exception(f"Dify 返回非200状态码: {resp.status_code}") data = resp.json() # 5. 提取 answer 字段(可能含有 Unicode 编码),尝试自动解码 reply = data.get("answer", "抱歉,我无法回答这个问题。") try: # 若 reply 中有类似 "\u89c6\u89c9" 需要被解码为中文,则此方法可行 reply = json.loads(f'"{reply}"') except json.JSONDecodeError: # 若不是 Unicode 编码,就保留原样 pass # 6. 如果 Dify 返回了 conversation_id,更新 conv_map if data.get("conversation_id"): conv_map[user_openid] = data["conversation_id"] # 7. 通过客服消息接口主动推送给用户 send_wechat_customer_message(user_openid, reply) except Exception as e: # 一旦出现任何异常,就发一条"调用失败"消息给用户 error_msg = f"调用AI接口失败,请稍后再试。错误信息:{str(e)}" print("【异步调用】错误:", error_msg) send_wechat_customer_message(user_openid, error_msg) # ====================================== # 4. 微信消息接入与路由 # ====================================== @app.route('/wx', methods=['GET', 'POST']) def wechat(): # -------------------------------- # 4.1 GET 请求:微信服务器接入验证 # -------------------------------- if request.method == 'GET': data = request.args signature = data.get('signature') timestamp = data.get('timestamp') nonce = data.get('nonce') echostr = data.get('echostr') # 1. 对 timestamp、nonce、TOKEN 字典序排序,拼接后做 SHA1 tmp_list = sorted([timestamp, nonce, TOKEN]) tmp_str = ''.join(tmp_list) hash_sha1 = hashlib.sha1() hash_sha1.update(tmp_str.encode('utf-8')) digest = hash_sha1.hexdigest() # 2. 若与微信给的 signature 一致,即接入验证通过 if digest == signature: print("✅ 微信服务器验证成功") return echostr else: print("❌ 签名验证失败") return '验证失败', 403 # -------------------------------- # 4.2 POST 请求:接收并处理用户消息 # -------------------------------- elif request.method == 'POST': # 1. 解析 XML(微信服务器 POST 过来的报文内容) try: xml_data = ET.fromstring(request.data or b'') except ET.ParseError: print("❌ XML 解析失败") return 'success' # 2. 获取消息类型 msg_type = xml_data.findtext('MsgType') # 3. 如果是文本消息,则异步调用 AI 服务并立即被动回复 if msg_type == 'text': user_openid = xml_data.findtext('FromUserName') official_id = xml_data.findtext('ToUserName') user_msg = xml_data.findtext('Content', default='') print(f"📩 用户 [{user_openid}] 发送了消息: {user_msg}") # 4. 启动后台线程,不阻塞主线程 thread = threading.Thread( target=async_reply, args=(user_openid, official_id, user_msg, conv_map) ) thread.start() # 5. 构造被动回复给用户,告知“正在查询” reply_content = "正在为您查询答案,请稍等..." # 注意:使用 CDATA 包裹,避免出现 <、& 等特殊字符引起 XML 解析错误 reply_xml = f""" {int(time.time())} """.strip() response = make_response(reply_xml) response.headers['Content-Type'] = 'application/xml' return response # 6. 针对非文本类型的消息,直接返回 success,不做任何处理 else: print(f"📎 收到非文本消息类型: {msg_type}") return 'success' # ====================================== # 5. 启动 Flask Server # ====================================== if __name__ == "__main__": # host="0.0.0.0" 表示对外监听所有网卡;port=80 为 HTTP 默认端口 # debug=True 仅用于开发环境,生产环境请关掉 app.run(host="0.0.0.0", port=8010, debug=True)