yy-WeChatPublicNumber/wechat_kefu.py

259 lines
9.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, request, make_response
import hashlib
import time
import xml.etree.ElementTree as ET
import requests
import threading
import xml.sax.saxutils as saxutils
import json
app = Flask(__name__)
# ======================================
# 全局配置区
# ======================================
# 与微信公众号后台“服务器配置”中填写的 Token 保持一致,用于验证微信服务器请求的合法性
TOKEN = ""
# Dify AI 服务配置:基础 URL 与 API Key
DIFY_API_BASE = "" # ⚠️ 替换为你的本地地址
DIFY_API_KEY = "" # ⚠️ 清空或使用本地密钥
# 会话映射字典:存储 <user_openid, Dify 返回的 conversation_id> 映射
conv_map = {}
# ======================================
# 1. 获取微信 access_token
# ======================================
def get_access_token():
"""
说明:
向微信接口获取 access_token此 token 用于调用“客服消息接口”发送消息。
建议在生产环境中实现缓存机制,过期后再重新拉取。
返回:
access_token 字符串
异常抛出:
requests 异常或微信接口返回 errcode != 0 时
"""
APPID = "" # 在微信公众号后台获取APPID
APPSECRET = "" # 在微信公众号后台获取APPSECRET
# 拼接 URLgrant_type 固定为 client_credential
url = f"https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={APPID}&secret={APPSECRET}"
resp = requests.get(url, timeout=5)
data = resp.json()
# 简单校验:若返回包含 errcode 且不等于 0则说明调用失败
if data.get("errcode"):
raise Exception(f"获取 access_token 失败errcode={data['errcode']}errmsg={data.get('errmsg')}")
return data.get("access_token")
# ======================================
# 2. 发送微信客服消息
# ======================================
def send_wechat_customer_message(openid, content):
"""
说明:
通过微信“客服消息接口”主动向用户推送文本消息。仅已认证公众号且
用户在 48 小时内与公众号有过交互时,才能发送客服消息。
参数:
openid (str):用户的唯一标识
content (str):要发送的纯文本内容
返回:
None
异常捕获:
1. get_access_token() 失败
2. HTTP 请求失败
3. 微信接口返回 errcode != 0
"""
try:
# 1. 获取 access_token
access_token = get_access_token()
url = f"https://api.weixin.qq.com/cgi-bin/message/custom/send?access_token={access_token}"
# 2. 构造消息体:发送文本
payload = {
"touser": openid,
"msgtype": "text",
"text": {
"content": content
}
}
# 3. 发起 POST 请求
response = requests.post(url, json=payload, timeout=5)
print("【客服消息】发送状态码:", response.status_code)
print("【客服消息】发送结果:", response.text)
# 4. 检查微信接口返回
resp_json = response.json()
if resp_json.get("errcode") != 0:
raise Exception(f"微信客服消息接口返回错误errcode={resp_json['errcode']}errmsg={resp_json.get('errmsg')}")
except Exception as e:
# 打印错误日志,生产环境可做更完善的监控/告警
print("【客服消息】发送失败:", str(e))
# ======================================
# 3. 异步调用 Dify 并发送客服消息
# ======================================
def async_reply(user_openid, official_id, user_msg, conv_map):
"""
说明:
在后台线程中调用 Dify AI 服务,获取智能回复后,通过客服消息接口主动推送给用户。
参数:
user_openid (str):用户的唯一标识
official_id (str):公众号的 ID一般不作为 Dify 调用参数)
user_msg (str):用户发送的原始文本消息
conv_map (dict):保存 <user_openid, conversation_id> 用于多轮对话
"""
# 1. 构造 Dify 请求 payload
payload = {
"inputs": {}, # 此处保留,可扩展放置其他上下文数据
"query": user_msg, # 用户输入文本
"response_mode": "blocking",
"user": user_openid # 把用户 ID 传给 Dify便于多用户区分
}
# 如果 conv_map 中已经存在此用户的 conversation_id则带上以维持上下文
if user_openid in conv_map:
payload["conversation_id"] = conv_map[user_openid]
# 2. 构造 HTTP 请求头
headers = {"Content-Type": "application/json"}
# 仅在配置了 API Key 时添加认证头
if DIFY_API_KEY:
headers["Authorization"] = f"Bearer {DIFY_API_KEY}"
try:
# 3. 发起 Dify AI 服务调用
resp = requests.post(
f"{DIFY_API_BASE}/chat-messages",
headers=headers,
json=payload,
timeout=10
)
print("【异步调用】Dify 响应状态码:", resp.status_code)
print("【异步调用】Dify 原始响应:", resp.text)
# 4. 检查 HTTP 返回码
if resp.status_code != 200:
raise Exception(f"Dify 返回非200状态码: {resp.status_code}")
data = resp.json()
# 5. 提取 answer 字段(可能含有 Unicode 编码),尝试自动解码
reply = data.get("answer", "抱歉,我无法回答这个问题。")
try:
# 若 reply 中有类似 "\u89c6\u89c9" 需要被解码为中文,则此方法可行
reply = json.loads(f'"{reply}"')
except json.JSONDecodeError:
# 若不是 Unicode 编码,就保留原样
pass
# 6. 如果 Dify 返回了 conversation_id更新 conv_map
if data.get("conversation_id"):
conv_map[user_openid] = data["conversation_id"]
# 7. 通过客服消息接口主动推送给用户
send_wechat_customer_message(user_openid, reply)
except Exception as e:
# 一旦出现任何异常,就发一条"调用失败"消息给用户
error_msg = f"调用AI接口失败请稍后再试。错误信息{str(e)}"
print("【异步调用】错误:", error_msg)
send_wechat_customer_message(user_openid, error_msg)
# ======================================
# 4. 微信消息接入与路由
# ======================================
@app.route('/wx', methods=['GET', 'POST'])
def wechat():
# --------------------------------
# 4.1 GET 请求:微信服务器接入验证
# --------------------------------
if request.method == 'GET':
data = request.args
signature = data.get('signature')
timestamp = data.get('timestamp')
nonce = data.get('nonce')
echostr = data.get('echostr')
# 1. 对 timestamp、nonce、TOKEN 字典序排序,拼接后做 SHA1
tmp_list = sorted([timestamp, nonce, TOKEN])
tmp_str = ''.join(tmp_list)
hash_sha1 = hashlib.sha1()
hash_sha1.update(tmp_str.encode('utf-8'))
digest = hash_sha1.hexdigest()
# 2. 若与微信给的 signature 一致,即接入验证通过
if digest == signature:
print("✅ 微信服务器验证成功")
return echostr
else:
print("❌ 签名验证失败")
return '验证失败', 403
# --------------------------------
# 4.2 POST 请求:接收并处理用户消息
# --------------------------------
elif request.method == 'POST':
# 1. 解析 XML微信服务器 POST 过来的报文内容)
try:
xml_data = ET.fromstring(request.data or b'')
except ET.ParseError:
print("❌ XML 解析失败")
return 'success'
# 2. 获取消息类型
msg_type = xml_data.findtext('MsgType')
# 3. 如果是文本消息,则异步调用 AI 服务并立即被动回复
if msg_type == 'text':
user_openid = xml_data.findtext('FromUserName')
official_id = xml_data.findtext('ToUserName')
user_msg = xml_data.findtext('Content', default='')
print(f"📩 用户 [{user_openid}] 发送了消息: {user_msg}")
# 4. 启动后台线程,不阻塞主线程
thread = threading.Thread(
target=async_reply,
args=(user_openid, official_id, user_msg, conv_map)
)
thread.start()
# 5. 构造被动回复给用户,告知“正在查询”
reply_content = "正在为您查询答案,请稍等..."
# 注意:使用 CDATA 包裹,避免出现 <、& 等特殊字符引起 XML 解析错误
reply_xml = f"""
<xml>
<ToUserName><![CDATA[{user_openid}]]></ToUserName>
<FromUserName><![CDATA[{official_id}]]></FromUserName>
<CreateTime>{int(time.time())}</CreateTime>
<MsgType><![CDATA[text]]></MsgType>
<Content><![CDATA[{reply_content}]]></Content>
</xml>
""".strip()
response = make_response(reply_xml)
response.headers['Content-Type'] = 'application/xml'
return response
# 6. 针对非文本类型的消息,直接返回 success不做任何处理
else:
print(f"📎 收到非文本消息类型: {msg_type}")
return 'success'
# ======================================
# 5. 启动 Flask Server
# ======================================
if __name__ == "__main__":
# host="0.0.0.0" 表示对外监听所有网卡port=80 为 HTTP 默认端口
# debug=True 仅用于开发环境,生产环境请关掉
app.run(host="0.0.0.0", port=8010, debug=True)