上傳檔案到「/」

This commit is contained in:
Zhenguan.Lin 2025-06-05 19:47:32 +08:00
commit 545aea1e2c

258
wechat_kefu.py Normal file
View File

@ -0,0 +1,258 @@
from flask import Flask, request, make_response
import hashlib
import time
import xml.etree.ElementTree as ET
import requests
import threading
import xml.sax.saxutils as saxutils
import json
app = Flask(__name__)
# ======================================
# 全局配置区
# ======================================
# 与微信公众号后台“服务器配置”中填写的 Token 保持一致,用于验证微信服务器请求的合法性
TOKEN = ""
# Dify AI 服务配置:基础 URL 与 API Key
DIFY_API_BASE = "" # ⚠️ 替换为你的本地地址
DIFY_API_KEY = "" # ⚠️ 清空或使用本地密钥
# 会话映射字典:存储 <user_openid, Dify 返回的 conversation_id> 映射
conv_map = {}
# ======================================
# 1. 获取微信 access_token
# ======================================
def get_access_token():
"""
说明
向微信接口获取 access_token token 用于调用客服消息接口发送消息
建议在生产环境中实现缓存机制过期后再重新拉取
返回
access_token 字符串
异常抛出
requests 异常或微信接口返回 errcode != 0
"""
APPID = "" # 在微信公众号后台获取APPID
APPSECRET = "" # 在微信公众号后台获取APPSECRET
# 拼接 URLgrant_type 固定为 client_credential
url = f"https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={APPID}&secret={APPSECRET}"
resp = requests.get(url, timeout=5)
data = resp.json()
# 简单校验:若返回包含 errcode 且不等于 0则说明调用失败
if data.get("errcode"):
raise Exception(f"获取 access_token 失败errcode={data['errcode']}errmsg={data.get('errmsg')}")
return data.get("access_token")
# ======================================
# 2. 发送微信客服消息
# ======================================
def send_wechat_customer_message(openid, content):
"""
说明
通过微信客服消息接口主动向用户推送文本消息仅已认证公众号且
用户在 48 小时内与公众号有过交互时才能发送客服消息
参数
openid (str)用户的唯一标识
content (str)要发送的纯文本内容
返回
None
异常捕获
1. get_access_token() 失败
2. HTTP 请求失败
3. 微信接口返回 errcode != 0
"""
try:
# 1. 获取 access_token
access_token = get_access_token()
url = f"https://api.weixin.qq.com/cgi-bin/message/custom/send?access_token={access_token}"
# 2. 构造消息体:发送文本
payload = {
"touser": openid,
"msgtype": "text",
"text": {
"content": content
}
}
# 3. 发起 POST 请求
response = requests.post(url, json=payload, timeout=5)
print("【客服消息】发送状态码:", response.status_code)
print("【客服消息】发送结果:", response.text)
# 4. 检查微信接口返回
resp_json = response.json()
if resp_json.get("errcode") != 0:
raise Exception(f"微信客服消息接口返回错误errcode={resp_json['errcode']}errmsg={resp_json.get('errmsg')}")
except Exception as e:
# 打印错误日志,生产环境可做更完善的监控/告警
print("【客服消息】发送失败:", str(e))
# ======================================
# 3. 异步调用 Dify 并发送客服消息
# ======================================
def async_reply(user_openid, official_id, user_msg, conv_map):
"""
说明
在后台线程中调用 Dify AI 服务获取智能回复后通过客服消息接口主动推送给用户
参数
user_openid (str)用户的唯一标识
official_id (str)公众号的 ID一般不作为 Dify 调用参数
user_msg (str)用户发送的原始文本消息
conv_map (dict)保存 <user_openid, conversation_id> 用于多轮对话
"""
# 1. 构造 Dify 请求 payload
payload = {
"inputs": {}, # 此处保留,可扩展放置其他上下文数据
"query": user_msg, # 用户输入文本
"response_mode": "blocking",
"user": user_openid # 把用户 ID 传给 Dify便于多用户区分
}
# 如果 conv_map 中已经存在此用户的 conversation_id则带上以维持上下文
if user_openid in conv_map:
payload["conversation_id"] = conv_map[user_openid]
# 2. 构造 HTTP 请求头
headers = {"Content-Type": "application/json"}
# 仅在配置了 API Key 时添加认证头
if DIFY_API_KEY:
headers["Authorization"] = f"Bearer {DIFY_API_KEY}"
try:
# 3. 发起 Dify AI 服务调用
resp = requests.post(
f"{DIFY_API_BASE}/chat-messages",
headers=headers,
json=payload,
timeout=10
)
print("【异步调用】Dify 响应状态码:", resp.status_code)
print("【异步调用】Dify 原始响应:", resp.text)
# 4. 检查 HTTP 返回码
if resp.status_code != 200:
raise Exception(f"Dify 返回非200状态码: {resp.status_code}")
data = resp.json()
# 5. 提取 answer 字段(可能含有 Unicode 编码),尝试自动解码
reply = data.get("answer", "抱歉,我无法回答这个问题。")
try:
# 若 reply 中有类似 "\u89c6\u89c9" 需要被解码为中文,则此方法可行
reply = json.loads(f'"{reply}"')
except json.JSONDecodeError:
# 若不是 Unicode 编码,就保留原样
pass
# 6. 如果 Dify 返回了 conversation_id更新 conv_map
if data.get("conversation_id"):
conv_map[user_openid] = data["conversation_id"]
# 7. 通过客服消息接口主动推送给用户
send_wechat_customer_message(user_openid, reply)
except Exception as e:
# 一旦出现任何异常,就发一条"调用失败"消息给用户
error_msg = f"调用AI接口失败请稍后再试。错误信息{str(e)}"
print("【异步调用】错误:", error_msg)
send_wechat_customer_message(user_openid, error_msg)
# ======================================
# 4. 微信消息接入与路由
# ======================================
@app.route('/wx', methods=['GET', 'POST'])
def wechat():
# --------------------------------
# 4.1 GET 请求:微信服务器接入验证
# --------------------------------
if request.method == 'GET':
data = request.args
signature = data.get('signature')
timestamp = data.get('timestamp')
nonce = data.get('nonce')
echostr = data.get('echostr')
# 1. 对 timestamp、nonce、TOKEN 字典序排序,拼接后做 SHA1
tmp_list = sorted([timestamp, nonce, TOKEN])
tmp_str = ''.join(tmp_list)
hash_sha1 = hashlib.sha1()
hash_sha1.update(tmp_str.encode('utf-8'))
digest = hash_sha1.hexdigest()
# 2. 若与微信给的 signature 一致,即接入验证通过
if digest == signature:
print("✅ 微信服务器验证成功")
return echostr
else:
print("❌ 签名验证失败")
return '验证失败', 403
# --------------------------------
# 4.2 POST 请求:接收并处理用户消息
# --------------------------------
elif request.method == 'POST':
# 1. 解析 XML微信服务器 POST 过来的报文内容)
try:
xml_data = ET.fromstring(request.data or b'')
except ET.ParseError:
print("❌ XML 解析失败")
return 'success'
# 2. 获取消息类型
msg_type = xml_data.findtext('MsgType')
# 3. 如果是文本消息,则异步调用 AI 服务并立即被动回复
if msg_type == 'text':
user_openid = xml_data.findtext('FromUserName')
official_id = xml_data.findtext('ToUserName')
user_msg = xml_data.findtext('Content', default='')
print(f"📩 用户 [{user_openid}] 发送了消息: {user_msg}")
# 4. 启动后台线程,不阻塞主线程
thread = threading.Thread(
target=async_reply,
args=(user_openid, official_id, user_msg, conv_map)
)
thread.start()
# 5. 构造被动回复给用户,告知“正在查询”
reply_content = "正在为您查询答案,请稍等..."
# 注意:使用 CDATA 包裹,避免出现 <、& 等特殊字符引起 XML 解析错误
reply_xml = f"""
<xml>
<ToUserName><![CDATA[{user_openid}]]></ToUserName>
<FromUserName><![CDATA[{official_id}]]></FromUserName>
<CreateTime>{int(time.time())}</CreateTime>
<MsgType><![CDATA[text]]></MsgType>
<Content><![CDATA[{reply_content}]]></Content>
</xml>
""".strip()
response = make_response(reply_xml)
response.headers['Content-Type'] = 'application/xml'
return response
# 6. 针对非文本类型的消息,直接返回 success不做任何处理
else:
print(f"📎 收到非文本消息类型: {msg_type}")
return 'success'
# ======================================
# 5. 启动 Flask Server
# ======================================
if __name__ == "__main__":
# host="0.0.0.0" 表示对外监听所有网卡port=80 为 HTTP 默认端口
# debug=True 仅用于开发环境,生产环境请关掉
app.run(host="0.0.0.0", port=8010, debug=True)