#!/usr/bin/env python # -*- coding: utf-8 -*- """ 逻辑触发器类 - 用于处理音频数据并触发相应的处理逻辑 """ import logging from typing import Any, Dict, Type # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger('LogicTrager') class AutoAfterMeta(type): """ 自动调用__after__函数的元类 实现单例模式 """ _instances: Dict[Type, Any] = {} # 存储单例实例 def __new__(cls, name, bases, attrs): # 遍历所有属性 for attr_name, attr_value in attrs.items(): # 如果是函数且不是以_开头 if callable(attr_value) and not attr_name.startswith('__'): # 获取原函数 original_func = attr_value # 创建包装函数 def make_wrapper(func): def wrapper(self, *args, **kwargs): # 执行原函数 result = func(self, *args, **kwargs) # 构建_after_函数名 after_func_name = f"__after__{func.__name__}" # 检查是否存在对应的_after_函数 if hasattr(self, after_func_name): after_func = getattr(self, after_func_name) if callable(after_func): try: # 调用_after_函数 after_func() except Exception as e: logger.error(f"调用{after_func_name}时出错: {e}") return result return wrapper # 替换原函数 attrs[attr_name] = make_wrapper(original_func) # 创建类 new_class = super().__new__(cls, name, bases, attrs) return new_class def __call__(cls, *args, **kwargs): """ 重写__call__方法实现单例模式 当类被调用时(即创建实例时)执行 """ if cls not in cls._instances: # 如果实例不存在,创建新实例 cls._instances[cls] = super().__call__(*args, **kwargs) logger.info(f"创建{cls.__name__}的新实例") else: logger.debug(f"返回{cls.__name__}的现有实例") return cls._instances[cls] """ 整体识别的处理逻辑: 1.压入二进制音频信息 2.不断检测VAD 3.当检测到完整VAD时,将VAD的音频信息压入音频块,并清除对应二进制信息 4.对音频块进行语音转文字offline,时间戳预测,说话人识别 5.将识别结果整合压入结果队列 6.结果队列被压入时调用回调函数 1->2 __after__push_binary_data 外部压入二进制信息 2,3->4 __after__push_audio_chunk 内部压入音频块 4->5 push_result_queue 压入结果队列 5->6 __after__push_result_queue 调用回调函数 """ class LogicTrager(metaclass=AutoAfterMeta): """逻辑触发器类""" def __init__(self, audio_chunk_max_size: int = 1024 * 1024 * 10, sample_rate: int = 16000, channels: int = 1, on_result_callback: Callable = None, ): """初始化""" # 存储音频块 self._audio_chunk = [] # 存储二进制数据 self._audio_chunk_binary = b'' self._audio_chunk_max_size = audio_chunk_max_size # 音频参数 self._sample_rate = sample_rate self._channels = channels # 结果队列 self._result_queue = [] # 回调函数 self._on_result_callback = on_result_callback logger.info("初始化LogicTrager") def push_binary_data(self, chunk: bytes) -> None: """ 添加音频块 参数: chunk: 音频数据块 """ if self._audio_chunk is None: logger.error("AudioChunk未初始化") return self._audio_chunk_binary += chunk logger.debug(f"添加音频块,大小: {len(chunk)}字节") def __after__push_binary_data(self) -> None: """ 添加音频块后处理 VAD检测,将检测到的VAD压入音频块 """ # VAD检测 pass # 压入音频块 push_audio_chunk def push_audio_chunk(self, chunk: bytes) -> None: """ 压入音频块 """ self._audio_chunk.append(chunk) def __after__push_audio_chunk(self) -> None: """ 压入音频块后处理 """ pass def push_result_queue(self, result: Dict[str, Any]) -> None: """ 压入结果队列 """ self._result_queue.append(result) def __after__push_result_queue(self) -> None: """ 压入结果队列后处理 """ pass def __call__(self): """调用函数""" pass