Changes
This commit is contained in:
parent
cf9a422870
commit
10ed726b8f
590
wechat_bot_userfilter.py
Normal file
590
wechat_bot_userfilter.py
Normal file
@ -0,0 +1,590 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
完整的微信客服系统,支持:
|
||||
1. SQLite 存储用户信息和对话历史。
|
||||
2. 白名单机制,根据 openid 判断是否允许调用 AI。
|
||||
3. 将 Dify 返回的 conversation_id 持久化到数据库(users.preferences 字段),
|
||||
并在程序启动时加载,实现跨重启的多轮会话上下文记忆。
|
||||
4. 管理接口,用于添加/移除白名单用户。
|
||||
5. 用户可通过发送指定指令加入白名单(需验证密码)。
|
||||
6. 异步回复机制:收到用户消息后立即返回空串给微信,后台线程调用 AI 再通过客服接口发送回复。
|
||||
|
||||
请务必替换下列配置:
|
||||
WX_TOKEN、WX_APPID、WX_APPSECRET、DIFY_API_BASE、DIFY_API_KEY
|
||||
WHITELIST_PASSWORD - 用户加入白名单所需的密码
|
||||
"""
|
||||
|
||||
from flask import Flask, request, jsonify, make_response
|
||||
import sqlite3 # SQLite 驱动
|
||||
import requests # 发送 HTTP 请求
|
||||
import hashlib # 用于微信签名验证
|
||||
import time # 获取时间戳
|
||||
import threading # 异步线程
|
||||
import json # 处理 JSON
|
||||
import xml.etree.ElementTree as ET # 解析微信 XML 消息
|
||||
import re # 正则表达式用于解析指令
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
# ======================================
|
||||
# 全局配置区 —— 请务必替换成你自己的配置
|
||||
# ======================================
|
||||
WX_TOKEN = 'your_wechat_token' # 微信公众号后台“服务器配置”中的 Token
|
||||
WX_APPID = 'your_appid' # 微信公众号的 AppID
|
||||
WX_APPSECRET = 'your_appsecret' # 微信公众号的 AppSecret
|
||||
|
||||
DIFY_API_BASE = 'https://api.dify.ai/v1' # Dify API 基础 URL,例如 https://api.dify.ai/v1
|
||||
DIFY_API_KEY = 'your_dify_api_key' # Dify 平台分配给你的 API Key
|
||||
WHITELIST_PASSWORD = "your_secure_password" # 用户加入白名单所需的密码
|
||||
|
||||
DATABASE = 'wechat.db' # SQLite 数据库文件名
|
||||
|
||||
# conv_map 用于保存 <openid, conversation_id>,实现多轮对话上下文管理
|
||||
conv_map = {}
|
||||
|
||||
|
||||
# ======================================
|
||||
# 1. 数据库相关函数:初始化与增删改查
|
||||
# ======================================
|
||||
|
||||
def init_db():
|
||||
"""
|
||||
说明:
|
||||
初始化 SQLite 数据库,创建必要的表:users(用户表)和 messages(消息表)。
|
||||
如果表已存在则不会重复创建。
|
||||
表结构:
|
||||
users:
|
||||
openid TEXT PRIMARY KEY -- 用户唯一标识(微信 openid)
|
||||
whitelisted INTEGER DEFAULT 0 -- 0 表示未授权用户,1 表示白名单用户
|
||||
preferences TEXT -- 存放用户个性化设置,可为 JSON 字符串,包含 conversation_id 等
|
||||
messages:
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT -- 消息自增 ID
|
||||
openid TEXT -- 关联用户 openid
|
||||
role TEXT -- 'user'(用户消息)或 'assistant'(AI 回复)
|
||||
content TEXT -- 消息内容
|
||||
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP -- 消息记录时间
|
||||
"""
|
||||
conn = sqlite3.connect(DATABASE)
|
||||
cursor = conn.cursor()
|
||||
|
||||
# 创建 users 表
|
||||
cursor.execute('''
|
||||
CREATE TABLE IF NOT EXISTS users (
|
||||
openid TEXT PRIMARY KEY,
|
||||
whitelisted INTEGER DEFAULT 0,
|
||||
preferences TEXT
|
||||
)
|
||||
''')
|
||||
|
||||
# 创建 messages 表
|
||||
cursor.execute('''
|
||||
CREATE TABLE IF NOT EXISTS messages (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
openid TEXT,
|
||||
role TEXT,
|
||||
content TEXT,
|
||||
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
|
||||
)
|
||||
''')
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def load_conversations_from_db():
|
||||
"""
|
||||
说明:
|
||||
服务启动后调用此函数,从 users 表的 preferences 字段中加载所有保存的 conversation_id
|
||||
并填充到内存 conv_map 中,实现跨重启的会话上下文恢复。
|
||||
"""
|
||||
conn = sqlite3.connect(DATABASE)
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("SELECT openid, preferences FROM users WHERE preferences IS NOT NULL")
|
||||
for openid, prefs_json in cursor.fetchall():
|
||||
try:
|
||||
prefs = json.loads(prefs_json)
|
||||
if 'conversation_id' in prefs:
|
||||
conv_map[openid] = prefs['conversation_id']
|
||||
except Exception:
|
||||
# 如果解析失败,跳过该条记录
|
||||
pass
|
||||
conn.close()
|
||||
|
||||
|
||||
def get_user(openid):
|
||||
"""
|
||||
根据 openid 从 users 表查询用户记录。
|
||||
返回:若存在返回元组 (openid, whitelisted, preferences),否则返回 None。
|
||||
"""
|
||||
conn = sqlite3.connect(DATABASE)
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("SELECT openid, whitelisted, preferences FROM users WHERE openid=?", (openid,))
|
||||
user = cursor.fetchone()
|
||||
conn.close()
|
||||
return user
|
||||
|
||||
|
||||
def add_user(openid, whitelisted=0, preferences=None):
|
||||
"""
|
||||
如果用户不存在,则插入新记录;若已存在,则忽略插入。
|
||||
参数:
|
||||
openid (str) 用户 openid
|
||||
whitelisted (int) 0 或 1,默认为 0
|
||||
preferences (dict/None) 用户偏好设置,会自动转换成 JSON 字符串
|
||||
"""
|
||||
conn = sqlite3.connect(DATABASE)
|
||||
cursor = conn.cursor()
|
||||
prefs_json = json.dumps(preferences) if preferences else None
|
||||
cursor.execute(
|
||||
"INSERT OR IGNORE INTO users (openid, whitelisted, preferences) VALUES (?, ?, ?)",
|
||||
(openid, whitelisted, prefs_json)
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def set_user_whitelist(openid, status):
|
||||
"""
|
||||
设置用户是否在白名单中。如果用户不存在则先插入,再设置状态。
|
||||
参数:
|
||||
openid (str) 用户 openid
|
||||
status (bool) True 表示加入白名单,False 表示移出白名单
|
||||
"""
|
||||
conn = sqlite3.connect(DATABASE)
|
||||
cursor = conn.cursor()
|
||||
|
||||
# 检查用户是否已存在
|
||||
cursor.execute("SELECT openid FROM users WHERE openid=?", (openid,))
|
||||
exists = cursor.fetchone()
|
||||
if exists:
|
||||
# 更新已存在用户的 whitelisted 字段
|
||||
cursor.execute("UPDATE users SET whitelisted=? WHERE openid=?", (1 if status else 0, openid))
|
||||
else:
|
||||
# 用户不存在时先插入,然后设置 whitelisted
|
||||
cursor.execute("INSERT INTO users (openid, whitelisted) VALUES (?, ?)", (openid, 1 if status else 0))
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
return True
|
||||
|
||||
|
||||
def update_user_preferences(openid, new_prefs):
|
||||
"""
|
||||
更新用户的 preferences 字段,将 new_prefs(dict)转换为 JSON 存储。
|
||||
参数:
|
||||
openid (str) 用户 openid
|
||||
new_prefs (dict) 需要存储的偏好设置字典(例如包含 conversation_id 键)
|
||||
"""
|
||||
prefs_json = json.dumps(new_prefs, ensure_ascii=False)
|
||||
conn = sqlite3.connect(DATABASE)
|
||||
cursor = conn.cursor()
|
||||
# 如果用户不存在,则先插入一条空记录,再更新 preferences
|
||||
cursor.execute("INSERT OR IGNORE INTO users (openid, preferences) VALUES (?, NULL)", (openid,))
|
||||
cursor.execute("UPDATE users SET preferences=? WHERE openid=?", (prefs_json, openid))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def log_message(openid, role, content):
|
||||
"""
|
||||
在 messages 表中插入一条消息记录,保存用户对话历史。
|
||||
参数:
|
||||
openid (str) 用户 openid
|
||||
role (str) 'user' 或 'assistant'
|
||||
content (str) 消息内容
|
||||
"""
|
||||
conn = sqlite3.connect(DATABASE)
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(
|
||||
"INSERT INTO messages (openid, role, content) VALUES (?, ?, ?)",
|
||||
(openid, role, content)
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
# 初始化数据库并加载会话上下文
|
||||
init_db()
|
||||
load_conversations_from_db()
|
||||
|
||||
|
||||
# ======================================
|
||||
# 2. 微信签名与接口调用相关函数
|
||||
# ======================================
|
||||
|
||||
def verify_signature(token, signature, timestamp, nonce):
|
||||
"""
|
||||
验证来自微信服务器的签名是否合法(用于 GET 请求接入验证)。
|
||||
参数:
|
||||
token (str) 公众号后台配置的 Token
|
||||
signature (str) 微信服务器传来的签名
|
||||
timestamp (str) 时间戳
|
||||
nonce (str) 随机数
|
||||
返回:
|
||||
True: 验证通过;False: 验证失败
|
||||
"""
|
||||
tmp_list = [token, timestamp, nonce]
|
||||
tmp_list.sort()
|
||||
tmp_str = ''.join(tmp_list)
|
||||
sha1 = hashlib.sha1(tmp_str.encode('utf-8')).hexdigest()
|
||||
return sha1 == signature
|
||||
|
||||
|
||||
# 全局缓存微信 access_token,减少重复向微信服务器拉取
|
||||
wechat_token_cache = {
|
||||
'access_token': None,
|
||||
'expires_at': 0 # token 到期时间戳
|
||||
}
|
||||
|
||||
|
||||
def get_access_token():
|
||||
"""
|
||||
获取微信公众号全局接口调用凭证 access_token,并缓存起来。
|
||||
如果缓存中的 token 未过期则直接返回,否则重新向微信服务器拉取。
|
||||
返回:access_token (str);若获取失败则返回 None。
|
||||
"""
|
||||
# 如果缓存中有且未过期就直接返回
|
||||
if wechat_token_cache['access_token'] and wechat_token_cache['expires_at'] > time.time():
|
||||
return wechat_token_cache['access_token']
|
||||
|
||||
# 向微信接口拉取新的 access_token
|
||||
url = f"https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={WX_APPID}&secret={WX_APPSECRET}"
|
||||
try:
|
||||
response = requests.get(url, timeout=5)
|
||||
res_json = response.json()
|
||||
token = res_json.get("access_token")
|
||||
expires_in = res_json.get("expires_in", 0)
|
||||
if token:
|
||||
# 提前 60 秒失效,避免临界情况
|
||||
wechat_token_cache['access_token'] = token
|
||||
wechat_token_cache['expires_at'] = time.time() + int(expires_in) - 60
|
||||
return token
|
||||
else:
|
||||
print("Failed to get access_token:", res_json)
|
||||
return None
|
||||
except Exception as e:
|
||||
print("Exception in get_access_token:", str(e))
|
||||
return None
|
||||
|
||||
|
||||
def send_text_message_to_user(openid, text):
|
||||
"""
|
||||
通过微信客服消息接口向指定用户发送文本消息。
|
||||
参数:
|
||||
openid (str) 用户 openid
|
||||
text (str) 要发送的文本内容
|
||||
返回:
|
||||
True: 发送成功;False: 发送失败
|
||||
"""
|
||||
token = get_access_token()
|
||||
if not token:
|
||||
print("Cannot send message: access_token is None")
|
||||
return False
|
||||
|
||||
url = f"https://api.weixin.qq.com/cgi-bin/message/custom/send?access_token={token}"
|
||||
payload = {
|
||||
"touser": openid,
|
||||
"msgtype": "text",
|
||||
"text": {"content": text}
|
||||
}
|
||||
try:
|
||||
response = requests.post(url, json=payload, timeout=5)
|
||||
res_json = response.json()
|
||||
if res_json.get("errcode") == 0:
|
||||
return True
|
||||
else:
|
||||
print("send_text_message_to_user failed:", res_json)
|
||||
return False
|
||||
except Exception as e:
|
||||
print("Exception in send_text_message_to_user:", str(e))
|
||||
return False
|
||||
|
||||
|
||||
# ======================================
|
||||
# 3. AI 回复获取函数:get_ai_reply(含持久化 conversation_id)
|
||||
# ======================================
|
||||
|
||||
def get_ai_reply(user_openid, user_message):
|
||||
"""
|
||||
调用 Dify 平台的 Chat API,获取 AI 回复并维护 conversation_id,实现跨进程/跨重启的上下文关联。
|
||||
参数:
|
||||
user_openid (str) 用户 openid,用作 user 字段传给 Dify,以实现多用户区分
|
||||
user_message (str) 用户发送的消息文本
|
||||
返回:
|
||||
reply (str) AI 对用户消息的回复文本;如果出错,则返回提示文本
|
||||
"""
|
||||
# 1. 构造 Dify 请求 payload
|
||||
payload = {
|
||||
"inputs": {}, # 保留字段,可放入其他上下文信息
|
||||
"query": user_message, # 用户输入文本
|
||||
"response_mode": "blocking", # 阻塞式返回
|
||||
"user": user_openid # 将 openid 传给 Dify,用于区分多用户
|
||||
}
|
||||
|
||||
# 2. 如果 conv_map 中已有 conversation_id,则带上以实现上下文关联
|
||||
if user_openid in conv_map:
|
||||
payload["conversation_id"] = conv_map[user_openid]
|
||||
|
||||
headers = {"Content-Type": "application/json"}
|
||||
# 如果配置了 DIFY_API_KEY,则在请求头中添加 Authorization
|
||||
if DIFY_API_KEY:
|
||||
headers["Authorization"] = f"Bearer {DIFY_API_KEY}"
|
||||
|
||||
try:
|
||||
# 3. 调用 Dify Chat 接口
|
||||
resp = requests.post(
|
||||
f"{DIFY_API_BASE}/chat-messages",
|
||||
headers=headers,
|
||||
json=payload,
|
||||
timeout=10
|
||||
)
|
||||
# 打印调试信息
|
||||
print("Dify API status code:", resp.status_code)
|
||||
print("Dify API response:", resp.text)
|
||||
|
||||
# 4. 非200状态码视为失败
|
||||
if resp.status_code != 200:
|
||||
return "AI 服务暂时不可用,请稍后再试。"
|
||||
|
||||
res_json = resp.json()
|
||||
# 5. 提取 answer 字段。如果没有就给出默认错误提示
|
||||
reply = res_json.get("answer", "抱歉,我暂时无法回答您的问题。")
|
||||
|
||||
# 6. Dify 返回的 answer 可能带有 Unicode 转义(如 "\\u4f60\\u597d"),
|
||||
# 下面尝试用 json.loads 解码成真实的中文
|
||||
try:
|
||||
reply = json.loads(f'"{reply}"')
|
||||
except Exception:
|
||||
pass # 如果解析失败,就保留原样
|
||||
|
||||
# 7. 如果 Dify 返回了新的 conversation_id,则更新 conv_map,并持久化到 users.preferences
|
||||
new_cid = res_json.get("conversation_id")
|
||||
if new_cid:
|
||||
# 更新内存字典
|
||||
conv_map[user_openid] = new_cid
|
||||
|
||||
# 持久化到数据库:先读取旧的 preferences,再合并更新 conversation_id
|
||||
conn = sqlite3.connect(DATABASE)
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("SELECT preferences FROM users WHERE openid=?", (user_openid,))
|
||||
row = cursor.fetchone()
|
||||
try:
|
||||
old_prefs = json.loads(row[0]) if row and row[0] else {}
|
||||
except Exception:
|
||||
old_prefs = {}
|
||||
old_prefs['conversation_id'] = new_cid
|
||||
# 更新用户的 preferences 字段
|
||||
cursor.execute("UPDATE users SET preferences=? WHERE openid=?",
|
||||
(json.dumps(old_prefs, ensure_ascii=False), user_openid))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
return reply
|
||||
|
||||
except Exception as e:
|
||||
print("Exception in get_ai_reply:", str(e))
|
||||
return f"AI 接口调用失败,请稍后再试。错误信息:{str(e)}"
|
||||
|
||||
|
||||
# ======================================
|
||||
# 4. 后台线程:处理用户消息并发送 AI 回复
|
||||
# ======================================
|
||||
|
||||
def process_and_reply(user_openid, user_message):
|
||||
"""
|
||||
在线程中调用 get_ai_reply 获取 AI 回复,然后:
|
||||
1. 将用户消息及 AI 回复记录到数据库(messages 表)。
|
||||
2. 通过微信客服消息接口发送回复给用户。
|
||||
参数:
|
||||
user_openid (str) 用户 openid
|
||||
user_message (str) 用户发送的消息内容
|
||||
"""
|
||||
# 1. 获取 AI 回复
|
||||
reply = get_ai_reply(user_openid, user_message)
|
||||
# 2. 将 AI 回复写入数据库(role='assistant')
|
||||
log_message(user_openid, "assistant", reply)
|
||||
# 3. 通过客服接口发送给用户
|
||||
send_text_message_to_user(user_openid, reply)
|
||||
|
||||
|
||||
# ======================================
|
||||
# 5. 微信消息接入与路由
|
||||
# ======================================
|
||||
|
||||
def handle_whitelist_command(openid, content):
|
||||
"""
|
||||
处理用户加入白名单的指令
|
||||
支持的指令格式:
|
||||
"加入白名单"
|
||||
"加入白名单 密码"
|
||||
"""
|
||||
# 检查是否包含密码
|
||||
password_match = re.search(r'加入白名单\s+(\S+)', content)
|
||||
|
||||
if password_match:
|
||||
# 指令格式:加入白名单 + 密码
|
||||
password = password_match.group(1)
|
||||
if password == WHITELIST_PASSWORD:
|
||||
# 密码正确,加入白名单
|
||||
set_user_whitelist(openid, True)
|
||||
return "✅ 您已成功加入白名单!现在可以使用AI服务了。"
|
||||
else:
|
||||
# 密码错误
|
||||
return "❌ 密码不正确,请确认后重试。"
|
||||
else:
|
||||
# 指令格式:加入白名单(无密码)
|
||||
if WHITELIST_PASSWORD == "":
|
||||
# 未设置密码,直接加入
|
||||
set_user_whitelist(openid, True)
|
||||
return "✅ 您已成功加入白名单!现在可以使用AI服务了。"
|
||||
else:
|
||||
# 需要密码但未提供
|
||||
return "⚠️ 请提供密码:加入白名单 [密码]"
|
||||
|
||||
|
||||
@app.route('/wx', methods=['GET', 'POST'])
|
||||
def wechat():
|
||||
if request.method == 'GET':
|
||||
# --------------------------------
|
||||
# 微信服务器接入验证
|
||||
# --------------------------------
|
||||
args = request.args
|
||||
signature = args.get("signature", "")
|
||||
timestamp = args.get("timestamp", "")
|
||||
nonce = args.get("nonce", "")
|
||||
echostr = args.get("echostr", "")
|
||||
|
||||
# 验证签名,若通过返回 echostr,否则返回 400 错误
|
||||
if verify_signature(WX_TOKEN, signature, timestamp, nonce):
|
||||
return echostr
|
||||
else:
|
||||
return "Invalid signature", 400
|
||||
|
||||
else:
|
||||
# --------------------------------
|
||||
# 处理微信服务器 POST 过来的消息
|
||||
# --------------------------------
|
||||
xml_data = request.data # 获取原始 XML
|
||||
try:
|
||||
xml_root = ET.fromstring(xml_data)
|
||||
except ET.ParseError:
|
||||
# 解析失败则直接返回空串,微信服务器不会重试
|
||||
return ""
|
||||
|
||||
# 提取常用字段
|
||||
to_user = xml_root.findtext('ToUserName') # 公众号原始 ID
|
||||
from_user = xml_root.findtext('FromUserName') # 用户 openid
|
||||
msg_type = xml_root.findtext('MsgType') # 消息类型
|
||||
|
||||
# 仅处理文本消息 (text),其它消息类型直接返回空
|
||||
if msg_type != 'text':
|
||||
return ""
|
||||
|
||||
content = xml_root.findtext('Content', default='').strip()
|
||||
user_openid = from_user
|
||||
|
||||
# 确保该用户已存在于数据库,若不存在则插入新用户(默认不在白名单,preferences 为空)
|
||||
if not get_user(user_openid):
|
||||
add_user(user_openid)
|
||||
|
||||
# 将用户发送的消息记录到 messages 表(role='user')
|
||||
log_message(user_openid, "user", content)
|
||||
|
||||
# 检查用户是否在白名单
|
||||
user_record = get_user(user_openid)
|
||||
whitelisted = (user_record[1] == 1) # user_record = (openid, whitelisted, preferences)
|
||||
|
||||
# 处理加入白名单指令(无论用户是否在白名单,都可以发送此指令)
|
||||
if content.startswith("加入白名单"):
|
||||
reply_text = handle_whitelist_command(user_openid, content)
|
||||
# 将回复也写入数据库(role='assistant')
|
||||
log_message(user_openid, "assistant", reply_text)
|
||||
# 构造标准的被动回复 XML
|
||||
reply_xml = f"""
|
||||
<xml>
|
||||
<ToUserName><![CDATA[{user_openid}]]></ToUserName>
|
||||
<FromUserName><![CDATA[{to_user}]]></FromUserName>
|
||||
<CreateTime>{int(time.time())}</CreateTime>
|
||||
<MsgType><![CDATA[text]]></MsgType>
|
||||
<Content><![CDATA[{reply_text}]]></Content>
|
||||
</xml>
|
||||
""".strip()
|
||||
response = make_response(reply_xml)
|
||||
response.headers['Content-Type'] = 'application/xml'
|
||||
return response
|
||||
|
||||
if not whitelisted:
|
||||
# 如果不在白名单中,不调用 AI,直接同步回复提示
|
||||
reply_text = "⚠️ 您不在白名单中,无法使用AI服务。\n\n请发送【加入白名单】指令申请加入(如需密码请提供)。"
|
||||
# 将回复也写入数据库(role='assistant')
|
||||
log_message(user_openid, "assistant", reply_text)
|
||||
# 构造标准的被动回复 XML
|
||||
reply_xml = f"""
|
||||
<xml>
|
||||
<ToUserName><![CDATA[{user_openid}]]></ToUserName>
|
||||
<FromUserName><![CDATA[{to_user}]]></FromUserName>
|
||||
<CreateTime>{int(time.time())}</CreateTime>
|
||||
<MsgType><![CDATA[text]]></MsgType>
|
||||
<Content><![CDATA[{reply_text}]]></Content>
|
||||
</xml>
|
||||
""".strip()
|
||||
response = make_response(reply_xml)
|
||||
response.headers['Content-Type'] = 'application/xml'
|
||||
return response
|
||||
|
||||
else:
|
||||
# 白名单用户:异步调用 AI,并通过客服消息接口发送回复
|
||||
# 使用守护线程,避免阻塞主线程
|
||||
thread = threading.Thread(target=process_and_reply, args=(user_openid, content))
|
||||
thread.daemon = True
|
||||
thread.start()
|
||||
# 主线程即时返回空串给微信服务器,表示后续通过客服接口回复
|
||||
return ""
|
||||
|
||||
|
||||
# ======================================
|
||||
# 6. 管理接口:添加/移除白名单用户
|
||||
# ======================================
|
||||
|
||||
@app.route('/admin/whitelist/add', methods=['GET', 'POST'])
|
||||
def add_whitelist():
|
||||
"""
|
||||
管理接口:将指定 openid 的用户加入白名单。
|
||||
请求方式:
|
||||
GET: /admin/whitelist/add?openid=XXX
|
||||
POST: /admin/whitelist/add (form-data 或 JSON 中包含 {"openid": "XXX"})
|
||||
返回:JSON 格式 {"success": True/False, "message": "提示内容"}
|
||||
"""
|
||||
# 从 URL 查询参数或 POST 表单中获取 openid
|
||||
openid = request.args.get("openid") or request.form.get("openid")
|
||||
if not openid:
|
||||
return jsonify({"success": False, "message": "参数 openid 不能为空"}), 400
|
||||
|
||||
set_user_whitelist(openid, True)
|
||||
return jsonify({"success": True, "message": f"User {openid} 已加入白名单"})
|
||||
|
||||
|
||||
@app.route('/admin/whitelist/remove', methods=['GET', 'POST'])
|
||||
def remove_whitelist():
|
||||
"""
|
||||
管理接口:将指定 openid 的用户从白名单中移除。
|
||||
请求方式:
|
||||
GET: /admin/whitelist/remove?openid=XXX
|
||||
POST: /admin/whitelist/remove (form-data 或 JSON 中包含 {"openid": "XXX"})
|
||||
返回:JSON 格式 {"success": True/False, "message": "提示内容"}
|
||||
"""
|
||||
openid = request.args.get("openid") or request.form.get("openid")
|
||||
if not openid:
|
||||
return jsonify({"success": False, "message": "参数 openid 不能为空"}), 400
|
||||
|
||||
set_user_whitelist(openid, False)
|
||||
return jsonify({"success": True, "message": f"User {openid} 已移出白名单"})
|
||||
|
||||
|
||||
# ======================================
|
||||
# 7. 应用启动
|
||||
# ======================================
|
||||
|
||||
if __name__ == '__main__':
|
||||
# debug=True 仅用于开发调试,生产环境请 False 并使用 WSGI 部署
|
||||
app.run(host="0.0.0.0", port=8010, debug=True)
|
301
wechat_bot_userfilter技术文档.md
Normal file
301
wechat_bot_userfilter技术文档.md
Normal file
@ -0,0 +1,301 @@
|
||||
## 概述
|
||||
本系统是一个基于微信公众号的智能客服解决方案,集成了Dify AI平台,提供自然语言处理能力。系统采用白名单机制管理用户权限,支持多轮对话上下文记忆,并实现异步消息处理机制以优化用户体验。
|
||||
|
||||
## 核心功能
|
||||
|
||||
1. **用户管理与权限控制**
|
||||
- 基于openid的白名单机制
|
||||
- 用户自助加入白名单功能(可选密码保护)
|
||||
- 管理员手动管理白名单
|
||||
|
||||
2. **对话管理**
|
||||
- 完整的对话历史记录
|
||||
- 跨重启的多轮会话上下文记忆
|
||||
- 基于Dify AI的智能回复
|
||||
|
||||
3. **系统架构**
|
||||
- Flask Web框架
|
||||
- SQLite数据库存储
|
||||
- 多线程异步处理
|
||||
- 微信API集成
|
||||
|
||||
## 配置说明
|
||||
|
||||
### 必需配置项(全局配置区)
|
||||
|
||||
```python
|
||||
# 微信相关配置
|
||||
WX_TOKEN = 'your_wechat_token' # 微信公众号后台"服务器配置"中的Token
|
||||
WX_APPID = 'your_appid' # 微信公众号的AppID
|
||||
WX_APPSECRET = 'your_appsecret' # 微信公众号的AppSecret
|
||||
|
||||
# Dify AI平台配置
|
||||
DIFY_API_BASE = 'https://api.dify.ai/v1' # Dify API基础URL
|
||||
DIFY_API_KEY = 'your_dify_api_key' # Dify平台分配的API Key
|
||||
|
||||
# 白名单安全配置
|
||||
WHITELIST_PASSWORD = "your_secure_password" # 用户加入白名单所需的密码
|
||||
|
||||
# 数据库配置
|
||||
DATABASE = 'wechat.db' # SQLite数据库文件名
|
||||
```
|
||||
|
||||
### 配置注意事项
|
||||
1. 所有配置项**必须**在部署前替换为实际值
|
||||
2. `WHITELIST_PASSWORD` 应设置为强密码(建议12位以上,包含大小写字母、数字和特殊字符)
|
||||
3. 生产环境应使用HTTPS协议保证通信安全
|
||||
4. 建议定期轮换API密钥和密码
|
||||
|
||||
## 数据库设计
|
||||
|
||||
### users表结构
|
||||
| 字段名 | 类型 | 说明 |
|
||||
|-------------|---------|-------------------------------|
|
||||
| openid | TEXT | 用户唯一标识(主键) |
|
||||
| whitelisted | INTEGER | 白名单状态(0=否,1=是) |
|
||||
| preferences | TEXT | JSON格式的用户偏好设置 |
|
||||
|
||||
### messages表结构
|
||||
| 字段名 | 类型 | 说明 |
|
||||
|------------|---------|-------------------------------|
|
||||
| id | INTEGER | 自增主键 |
|
||||
| openid | TEXT | 关联用户openid |
|
||||
| role | TEXT | 消息角色(user/assistant) |
|
||||
| content | TEXT | 消息内容 |
|
||||
| timestamp | DATETIME| 消息时间(默认当前时间) |
|
||||
|
||||
## 白名单操作指南
|
||||
|
||||
### 用户自助加入白名单
|
||||
|
||||
#### 1. 无密码加入(当WHITELIST_PASSWORD为空时)
|
||||
- 用户发送消息:`加入白名单`
|
||||
- 系统回复:`✅ 您已成功加入白名单!现在可以使用AI服务了。`
|
||||
|
||||
#### 2. 密码验证加入
|
||||
- 用户发送消息:`加入白名单 密码内容`
|
||||
- 示例:`加入白名单 MySecurePass123!`
|
||||
- 密码正确时回复:`✅ 您已成功加入白名单!现在可以使用AI服务了。`
|
||||
- 密码错误时回复:`❌ 密码不正确,请确认后重试。`
|
||||
|
||||
### 管理员管理白名单
|
||||
|
||||
#### 1. 添加用户到白名单
|
||||
**请求方式**:
|
||||
- GET: `/admin/whitelist/add?openid=用户OPENID`
|
||||
- POST: `/admin/whitelist/add` (表单或JSON包含openid参数)
|
||||
|
||||
**示例**:
|
||||
```bash
|
||||
# GET请求
|
||||
curl "http://yourserver:8010/admin/whitelist/add?openid=o123456789"
|
||||
|
||||
# POST请求
|
||||
curl -X POST -d "openid=o123456789" http://yourserver:8010/admin/whitelist/add
|
||||
```
|
||||
|
||||
**成功响应**:
|
||||
```json
|
||||
{
|
||||
"success": true,
|
||||
"message": "User o123456789 已加入白名单"
|
||||
}
|
||||
```
|
||||
|
||||
#### 2. 从白名单移除用户
|
||||
**请求方式**:
|
||||
- GET: `/admin/whitelist/remove?openid=用户OPENID`
|
||||
- POST: `/admin/whitelist/remove` (表单或JSON包含openid参数)
|
||||
|
||||
**示例**:
|
||||
```bash
|
||||
# GET请求
|
||||
curl "http://yourserver:8010/admin/whitelist/remove?openid=o123456789"
|
||||
|
||||
# POST请求
|
||||
curl -X POST -d "openid=o123456789" http://yourserver:8010/admin/whitelist/remove
|
||||
```
|
||||
|
||||
**成功响应**:
|
||||
```json
|
||||
{
|
||||
"success": true,
|
||||
"message": "User o123456789 已移出白名单"
|
||||
}
|
||||
```
|
||||
|
||||
### 获取用户openid的方法
|
||||
1. 让用户向公众号发送任意消息
|
||||
2. 查看数据库users表:
|
||||
```sql
|
||||
SELECT openid FROM users;
|
||||
```
|
||||
3. 查看日志文件中的用户消息记录
|
||||
4. 在微信开发者工具中调试获取
|
||||
|
||||
## 系统工作流程
|
||||
|
||||
### 新用户首次交互
|
||||
1. 用户发送消息到公众号
|
||||
2. 系统检查用户不在白名单
|
||||
3. 同步回复提示信息:
|
||||
```
|
||||
⚠️ 您不在白名单中,无法使用AI服务。
|
||||
|
||||
请发送【加入白名单】指令申请加入(如需密码请提供)。
|
||||
```
|
||||
4. 用户发送加入指令
|
||||
5. 系统验证并加入白名单
|
||||
6. 用户后续消息正常获得AI回复
|
||||
|
||||
### 白名单用户消息处理
|
||||
1. 用户发送消息
|
||||
2. 系统确认白名单状态
|
||||
3. 主线程立即返回空响应
|
||||
4. 后台线程处理:
|
||||
- 调用Dify API获取AI回复
|
||||
- 保存对话记录到数据库
|
||||
- 通过微信客服接口发送回复
|
||||
5. 用户收到AI回复
|
||||
|
||||
## 部署与运行
|
||||
|
||||
### 运行要求
|
||||
- Python 3.7+
|
||||
- 依赖库:`flask`, `requests`, `sqlite3`
|
||||
|
||||
### 安装依赖
|
||||
```bash
|
||||
pip install flask requests
|
||||
```
|
||||
|
||||
### 启动服务
|
||||
```bash
|
||||
python app.py
|
||||
```
|
||||
|
||||
### 生产环境部署建议
|
||||
1. 使用WSGI服务器(如Gunicorn):
|
||||
```bash
|
||||
gunicorn -w 4 -b 0.0.0.0:8010 app:app
|
||||
```
|
||||
2. 配置Nginx反向代理
|
||||
3. 设置HTTPS加密
|
||||
4. 使用进程管理工具(如systemd)确保服务持续运行
|
||||
5. 定期备份数据库文件(wechat.db)
|
||||
|
||||
|
||||
|
||||
### 常见问题及解决方法
|
||||
|
||||
1. **用户无法加入白名单**
|
||||
- 检查`WHITELIST_PASSWORD`配置是否正确
|
||||
- 验证数据库写入权限
|
||||
- 查看应用日志中的错误信息
|
||||
|
||||
2. **AI服务无响应**
|
||||
- 检查Dify API密钥和URL配置
|
||||
- 验证网络连接(能否访问Dify API)
|
||||
- 查看Dify平台状态
|
||||
|
||||
3. **微信消息无法接收**
|
||||
- 验证`WX_TOKEN`、`WX_APPID`、`WX_APPSECRET`配置
|
||||
- 检查服务器网络配置(80/443端口开放)
|
||||
- 确认微信公众号服务器配置正确
|
||||
|
||||
4. **管理接口无响应**
|
||||
- 检查防火墙设置(8010端口开放)
|
||||
- 验证服务是否正常运行
|
||||
- 确认openid格式正确
|
||||
|
||||
### 日志分析
|
||||
系统关键操作都会在控制台输出日志,包括:
|
||||
- 微信签名验证结果
|
||||
- 数据库操作状态
|
||||
- Dify API调用详情
|
||||
- 白名单变更记录
|
||||
|
||||
## 系统优化建议
|
||||
|
||||
1. **数据库优化**
|
||||
- 定期归档历史消息
|
||||
- 添加索引优化查询性能
|
||||
- 实现数据库连接池
|
||||
|
||||
2. **安全性增强**
|
||||
- 实现IP白名单限制
|
||||
- 添加请求频率限制
|
||||
- 敏感操作审计日志
|
||||
|
||||
3. **功能扩展**
|
||||
- 添加多客服支持
|
||||
- 实现对话记录导出
|
||||
- 集成更多AI平台
|
||||
- 添加用户反馈机制
|
||||
|
||||
## 注意事项
|
||||
|
||||
1. **微信API限制**
|
||||
- 客服消息接口有频率限制(最多5条/秒)
|
||||
- 48小时内需回复用户消息
|
||||
- access_token有效期7200秒,需缓存复用
|
||||
|
||||
2. **数据隐私**
|
||||
- 用户对话记录包含敏感信息,需加密存储
|
||||
- 遵守GDPR和当地数据保护法规
|
||||
- 定期清理不必要的数据
|
||||
|
||||
3. **性能考虑**
|
||||
- 单实例适合中小规模使用
|
||||
- 大规模部署需改用MySQL/PostgreSQL
|
||||
- 考虑添加Redis缓存提升性能
|
||||
|
||||
4. **备份策略**
|
||||
- 每日自动备份数据库
|
||||
- 保留最近7天的备份
|
||||
- 定期验证备份可恢复性
|
||||
|
||||
## 附录
|
||||
|
||||
### 微信消息XML格式示例
|
||||
```xml
|
||||
<xml>
|
||||
<ToUserName><![CDATA[公众号ID]]></ToUserName>
|
||||
<FromUserName><![CDATA[用户openid]]></FromUserName>
|
||||
<CreateTime>1678901234</CreateTime>
|
||||
<MsgType><![CDATA[text]]></MsgType>
|
||||
<Content><![CDATA[测试消息]]></Content>
|
||||
<MsgId>1234567890123456</MsgId>
|
||||
</xml>
|
||||
```
|
||||
|
||||
### Dify API响应示例
|
||||
```json
|
||||
{
|
||||
"conversation_id": "12345678-90ab-cdef-ghij-klmnopqrstuv",
|
||||
"answer": "你好!有什么我可以帮助你的吗?",
|
||||
"metadata": {
|
||||
"usage": {
|
||||
"prompt_tokens": 15,
|
||||
"completion_tokens": 10,
|
||||
"total_tokens": 25
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 数据库维护命令
|
||||
```sql
|
||||
-- 查看白名单用户
|
||||
SELECT openid, whitelisted FROM users WHERE whitelisted=1;
|
||||
|
||||
-- 查询用户对话历史
|
||||
SELECT role, content, timestamp FROM messages
|
||||
WHERE openid='用户openid'
|
||||
ORDER BY timestamp DESC
|
||||
LIMIT 10;
|
||||
|
||||
-- 优化数据库性能
|
||||
VACUUM;
|
||||
ANALYZE;
|
||||
```
|
Loading…
x
Reference in New Issue
Block a user