STT_Server/src/logic_trager.py

165 lines
5.5 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
逻辑触发器类 - 用于处理音频数据并触发相应的处理逻辑
"""
import functools
import inspect
from typing import Any, Dict, Type, Callable

from src.utils.logger import get_module_logger
# Module-level logger for this file, created at INFO level.
logger = get_module_logger(__name__, level="INFO")
class AutoAfterMeta(type):
    """Metaclass that adds post-call hooks to methods and makes classes singletons.

    At class-creation time every plain function in the class body whose name
    does not start with a double underscore is wrapped. After the wrapped
    method returns, the wrapper looks up an attribute named
    ``__after__<method name>`` on the instance and, if it is callable, invokes
    it with no arguments. Exceptions raised by the hook are logged and
    swallowed, so the method's own return value is always delivered.

    Calling a class built with this metaclass returns one shared instance per
    class; constructor arguments passed on later calls are ignored.

    NOTE(review): a hook written inside a class body as
    ``def __after__foo(self)`` is subject to Python's private-name mangling and
    is stored as ``_ClassName__after__foo``, so the plain ``__after__foo``
    lookup performed here does not find it. The lookup is deliberately left
    unchanged: code that currently invokes such hooks by hand would otherwise
    run them twice.
    """

    # One shared instance per class created with this metaclass.
    _instances: Dict[Type, Any] = {}

    def __new__(cls, name, bases, attrs):
        """Create the class, wrapping its eligible methods with hook dispatch.

        Args:
            name: Name of the class being created.
            bases: Base classes of the class being created.
            attrs: Namespace of the class body; eligible entries are replaced
                in place by their wrapped versions.

        Returns:
            The newly created class.
        """
        # Only values of existing keys are reassigned, so mutating ``attrs``
        # while iterating over it is safe (the dict never changes size).
        for attr_name, attr_value in attrs.items():
            # Dunder-prefixed names (``__init__``, ``__call__``, ...) are skipped.
            if attr_name.startswith('__'):
                continue
            # Wrap plain functions only. Nested classes, staticmethod objects
            # and other callable attributes must not be turned into instance
            # methods, which is what a bare ``callable()`` test would do.
            if not inspect.isfunction(attr_value):
                continue
            attrs[attr_name] = cls._wrap_with_after_hook(attr_value)
        return super().__new__(cls, name, bases, attrs)

    @staticmethod
    def _wrap_with_after_hook(func):
        """Return ``func`` wrapped so that its ``__after__`` hook runs afterwards."""
        after_func_name = f"__after__{func.__name__}"

        # ``wraps`` keeps the original name, docstring and signature metadata.
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            result = func(self, *args, **kwargs)
            after_func = getattr(self, after_func_name, None)
            if callable(after_func):
                try:
                    after_func()
                except Exception as e:
                    # Best effort: a failing hook must not hide the result.
                    logger.error(f"调用{after_func_name}时出错: {e}")
            return result

        return wrapper

    def __call__(cls, *args, **kwargs):
        """Return the single instance of ``cls``, creating it on first use.

        Args:
            *args: Positional constructor arguments; used only on first call.
            **kwargs: Keyword constructor arguments; used only on first call.

        Returns:
            The shared instance of ``cls``.
        """
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
            logger.info(f"创建{cls.__name__}的新实例")
        else:
            logger.debug(f"返回{cls.__name__}的现有实例")
        return cls._instances[cls]
"""
整体识别的处理逻辑:
1.压入二进制音频信息
2.不断检测VAD
3.当检测到完整VAD时,将VAD的音频信息压入音频块,并清除对应二进制信息
4.对音频块进行语音转文字offline,时间戳预测,说话人识别
5.将识别结果整合压入结果队列
6.结果队列被压入时调用回调函数
1->2 __after__push_binary_data 外部压入二进制信息
2,3->4 __after__push_audio_chunk 内部压入音频块
4->5 push_result_queue 压入结果队列
5->6 __after__push_result_queue 调用回调函数
"""
from src.functor import VAD
from src.models import AudioBinary_Config
from src.models import AudioBinary_Chunk
from typing import List
class LogicTrager(metaclass=AutoAfterMeta):
    """Logic trigger that feeds incoming audio to the VAD and collects results.

    Raw audio bytes are handed to the VAD component, ``push_audio_chunk`` is
    registered as the VAD callback, and results are appended to an internal
    queue by ``push_result_queue``.

    Because the class uses ``AutoAfterMeta``, it is a singleton: the first
    construction creates the instance and later constructions return it,
    ignoring their arguments.

    NOTE(review): the ``__after__*`` methods below are name-mangled by Python
    (stored as ``_LogicTrager__after__*``), so the plain-name lookup done by
    ``AutoAfterMeta`` does not find them. Only ``push_binary_data`` runs its
    hook, and it does so through an explicit call.
    """

    def __init__(self,
                 audio_chunk_max_size: int = 1024 * 1024 * 10,
                 audio_config: AudioBinary_Config = None,
                 result_callback: Callable = None,
                 models: Dict[str, Any] = None,
                 ):
        """Set up buffers, the result queue and the VAD component.

        Args:
            audio_chunk_max_size: Size limit stored on the instance; it is not
                read anywhere else in this class (presumably bytes - TODO
                confirm).
            audio_config: Audio parameters; a default ``AudioBinary_Config()``
                is created when omitted.
            result_callback: Callback stored for aggregated results; it is not
                invoked anywhere in this class.
            models: Mapping of model name to model object. The ``"vad"`` entry
                is passed to the VAD. ``None`` is treated as an empty mapping.
        """
        # The default is ``None``; without this guard ``models.get`` raised
        # AttributeError. An empty mapping behaves like a missing "vad" key.
        if models is None:
            models = {}

        # Audio chunks received through push_audio_chunk.
        self._audio_chunk: List[AudioBinary_Chunk] = []
        # Raw binary buffer and its size limit.
        self._audio_chunk_binary = b''
        self._audio_chunk_max_size = audio_chunk_max_size
        # Audio parameters.
        self._audio_config = audio_config if audio_config is not None else AudioBinary_Config()
        # Result queue.
        self._result_queue = []
        # Callback for aggregated results.
        self._aggregate_result_callback = result_callback
        # Components.
        self._vad = VAD(VAD_model=models.get("vad"), audio_config=self._audio_config)
        self._vad.set_callback(self.push_audio_chunk)
        logger.info("初始化LogicTrager")

    def push_binary_data(self, chunk: bytes) -> None:
        """Forward raw audio bytes to the VAD, then run the post-push hook.

        Args:
            chunk: Binary audio data to hand to the VAD.
        """
        self._vad.push_binary_data(chunk)
        # Called explicitly: the hook's name is mangled, so the metaclass
        # wrapper does not dispatch it automatically.
        self.__after__push_binary_data()

    def __after__push_binary_data(self) -> None:
        """Ask the VAD to process its result after new binary data arrived."""
        self._vad.process_vad_result()

    def push_audio_chunk(self, chunk: AudioBinary_Chunk) -> None:
        """Log and store an audio chunk; registered as the VAD callback.

        Args:
            chunk: Audio chunk to append to the internal chunk list.
        """
        logger.info("LogicTrager push_audio_chunk [{}ms:{}ms] (len={})".format(chunk.start_time, chunk.end_time, len(chunk.chunk)))
        self._audio_chunk.append(chunk)

    def __after__push_audio_chunk(self) -> None:
        """Post-processing after an audio chunk is stored (currently a no-op)."""

    def push_result_queue(self, result: Dict[str, Any]) -> None:
        """Append a recognition result to the result queue.

        Args:
            result: Result dictionary to enqueue.
        """
        self._result_queue.append(result)

    def __after__push_result_queue(self) -> None:
        """Post-processing after a result is enqueued; only logs a marker."""
        logger.info("FINISH Result=")

    def __call__(self):
        """Calling the instance does nothing."""