#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Logic trigger: accepts audio data and fires the follow-up processing steps."""

import functools
import logging
import types
from typing import Any, Callable, Dict, List, Optional, Type

# Logging configuration.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('LogicTrager')

# Prefix that marks a method as the "after" hook of another method.
_AFTER_PREFIX = "__after__"


def _resolve_after_hook(instance: Any, method_name: str) -> Optional[Callable[[], Any]]:
    """Return the bound after-hook for *method_name* on *instance*, or None.

    A hook written as ``def __after__foo(self)`` inside a class body is stored
    under the name-mangled attribute ``_<ClassName>__after__foo``, so the plain
    name alone never matches it. The plain name is tried first (it matches
    hooks attached with ``setattr``), then the mangled name for every class in
    the instance's MRO, most-derived first.
    """
    hook_name = f"{_AFTER_PREFIX}{method_name}"
    candidates = [hook_name]
    for klass in type(instance).__mro__:
        # Mangling strips leading underscores from the class name and does
        # not apply at all when the name consists solely of underscores.
        owner = klass.__name__.lstrip("_")
        if owner:
            candidates.append(f"_{owner}{hook_name}")

    for candidate in candidates:
        hook = getattr(instance, candidate, None)
        if callable(hook):
            return hook
    return None


def _with_after_hook(func: Callable) -> Callable:
    """Wrap *func* so that its after-hook (if any) runs once it has returned."""
    hook_name = f"{_AFTER_PREFIX}{func.__name__}"

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        # Run the original method first; its result is always returned.
        result = func(self, *args, **kwargs)
        hook = _resolve_after_hook(self, func.__name__)
        if hook is not None:
            try:
                hook()
            except Exception as e:
                # Hooks are best-effort: a failing hook is logged and must
                # not discard the result of the method that triggered it.
                logger.error(f"调用{hook_name}时出错: {e}")
        return result

    return wrapper


class AutoAfterMeta(type):
    """Metaclass that auto-invokes ``__after__<method>`` hooks and makes classes singletons.

    Every plain function defined in the class body whose name does not start
    with ``__`` is wrapped so that, after it returns, the matching
    ``__after__<method>`` hook is called with no arguments when one exists.
    Instantiating a class that uses this metaclass returns one shared
    instance per class.
    """

    # One cached instance per class; shared by all classes using this metaclass.
    _instances: Dict[Type, Any] = {}

    def __new__(cls, name, bases, attrs):
        """Create the class, wrapping eligible methods with the after-hook caller."""
        for attr_name, attr_value in list(attrs.items()):
            # Dunder methods (__init__, __call__, ...) are never wrapped.
            if attr_name.startswith('__'):
                continue
            # Hook methods themselves (stored name-mangled) need no wrapper.
            if _AFTER_PREFIX in attr_name:
                continue
            # Only plain functions are wrapped: nested classes, staticmethod
            # objects and other callables would break if forced into a
            # ``self``-taking wrapper.
            if isinstance(attr_value, types.FunctionType):
                attrs[attr_name] = _with_after_hook(attr_value)

        return super().__new__(cls, name, bases, attrs)

    def __call__(cls, *args, **kwargs):
        """Return the single instance of *cls*, creating it on first use.

        Constructor arguments are only used by the call that creates the
        instance; later calls return the cached instance unchanged.
        """
        if cls not in cls._instances:
            # No instance yet: build and cache it.
            cls._instances[cls] = super().__call__(*args, **kwargs)
            logger.info(f"创建{cls.__name__}的新实例")
        else:
            logger.debug(f"返回{cls.__name__}的现有实例")
        return cls._instances[cls]


# Overall recognition pipeline (design notes):
#   1. Push raw binary audio data.
#   2. Continuously run VAD detection.
#   3. When a complete VAD segment is detected, push its audio into the
#      audio-chunk list and clear the corresponding binary data.
#   4. Run offline speech-to-text, timestamp prediction and speaker
#      identification on the audio chunk.
#   5. Merge the recognition results and push them to the result queue.
#   6. Pushing to the result queue invokes the callback.
#
# Transitions:
#   1 -> 2     __after__push_binary_data    binary data pushed from outside
#   2,3 -> 4   __after__push_audio_chunk    audio chunk pushed internally
#   4 -> 5     push_result_queue            push to the result queue
#   5 -> 6     __after__push_result_queue   invoke the callback
#
# NOTE: in this file the hooks for steps 4-6 are still empty placeholders.

# Project imports stay below the metaclass to preserve the original import order.
from src.functor import VAD
from src.models import AudioBinary_Config
from src.models import AudioBinary_Chunk


class LogicTrager(metaclass=AutoAfterMeta):
    """Logic trigger: forwards binary audio to the VAD and collects the resulting chunks.

    The class is a singleton (see ``AutoAfterMeta``); its public methods have
    their ``__after__<method>`` hooks invoked automatically by the metaclass.
    """

    def __init__(self,
                 audio_chunk_max_size: int = 1024 * 1024 * 10,
                 audio_config: Optional[AudioBinary_Config] = None,
                 result_callback: Optional[Callable] = None,
                 models: Optional[Dict[str, Any]] = None,
                 ):
        """Initialise buffers, configuration and the VAD component.

        Args:
            audio_chunk_max_size: Size limit stored on the instance; it is not
                read by the code visible in this file.
            audio_config: Audio parameters; a default ``AudioBinary_Config()``
                is created when omitted.
            result_callback: Callback stored for aggregated results; it is not
                invoked by the code visible in this file.
            models: Mapping of model name to model object. Only the ``"vad"``
                entry is read here. When omitted, the VAD receives ``None`` as
                its model instead of this constructor raising AttributeError.
        """
        # A missing mapping is treated as empty so ``.get`` below is safe.
        models = {} if models is None else models

        # Audio chunks received from the VAD callback.
        self._audio_chunk: List[AudioBinary_Chunk] = []
        # Raw binary buffer.
        self._audio_chunk_binary = b''
        self._audio_chunk_max_size = audio_chunk_max_size
        # Audio parameters.
        self._audio_config = audio_config if audio_config is not None else AudioBinary_Config()
        # Result queue.
        self._result_queue = []
        # Callback for aggregated results.
        self._aggregate_result_callback = result_callback

        # Components: the VAD reports chunks through push_audio_chunk
        # (presumably once per detected segment; confirm against VAD).
        self._vad = VAD(VAD_model=models.get("vad"), audio_config=self._audio_config)
        self._vad.set_callback(self.push_audio_chunk)

        logger.info("初始化LogicTrager")

    def push_binary_data(self, chunk: bytes) -> None:
        """Push a block of binary audio data into the VAD module.

        The ``__after__push_binary_data`` hook is invoked by ``AutoAfterMeta``
        once this method returns, so it is not called explicitly here (doing
        both would process the VAD result twice). Errors raised by the hook
        are logged by the metaclass rather than propagated.

        Args:
            chunk: Binary audio data block.
        """
        self._vad.push_binary_data(chunk)

    def __after__push_binary_data(self) -> None:
        """Hook run after binary data is pushed: process the VAD result."""
        self._vad.process_vad_result()

    def push_audio_chunk(self, chunk: AudioBinary_Chunk) -> None:
        """Store an audio chunk and print its time range and length.

        Args:
            chunk: Audio chunk; its ``start_time``, ``end_time`` and ``chunk``
                attributes are read for the diagnostic print.
        """
        print("LogicTrager push_audio_chunk [{}ms:{}ms] (len={})".format(chunk.start_time, chunk.end_time, len(chunk.chunk)))
        self._audio_chunk.append(chunk)

    def __after__push_audio_chunk(self) -> None:
        """Hook run after an audio chunk is pushed (currently a no-op)."""
        pass

    def push_result_queue(self, result: Dict[str, Any]) -> None:
        """Append a recognition result to the result queue.

        Args:
            result: Result dictionary to enqueue.
        """
        self._result_queue.append(result)

    def __after__push_result_queue(self) -> None:
        """Hook run after a result is queued (currently a no-op)."""
        pass

    def __call__(self):
        """Call entry point (currently a no-op)."""
        pass