Ascend-SACT/DNSMOS
模型介绍文件和版本Pull Requests讨论分析
下载使用量0

前言

DNSMOS 是微软提出的针对噪声抑制算法的非侵入性语音质量评估指标。输入是 9 秒语音片段的频谱图,输出以单一维度或固定组合评分为主,基础版本依据 ITU - T P.808 标准输出整体 MOS 评分,衍生版本DNSMOS P.835输出语音质量(SIG)、背景噪声质量(BAK)和整体质量(OVRL)三个评分。

一、运行环境准备

版本配套

配套版本环境准备指导
CANN8.2.rc1
Python3.10.12
torch2.5.1+cpu
torch_npu2.5.1

硬件设备

设备型号NPU配置
Atlas 800I A2 910B1卡

二、克隆代码及权重

$ git clone https://github.com/huiqiguo/DNSMOS 图片描述

当前目录下会自动创建DNSMOS目录,DNSMOS内有如下文件: 图片描述

三、准备测试集

DNSMOS目录下创建dataset目录并准备测试集:

$ cd DNSMOS

$ mkdir dataset

$ cd dataset

准备wav语音文件,并放置在datase目录下,例如可以下命令下载VCC2018测试集:

$ wget https://datashare.ed.ac.uk/bitstream/handle/10283/3061/vcc2018_submitted_systems_converted_speech.tar.gz

$ tar zxvf vcc2018_submitted_systems_converted_speech.tar.gz

四、创建推理脚本

在DNSMOS目录下vi创建推理脚本(例如命名为infer.py)

import argparse
import concurrent.futures
import glob
import os
import random
import subprocess
from functools import lru_cache

import librosa
import numpy as np
import numpy.polynomial.polynomial as poly
import onnxruntime as ort
import pandas as pd
import soundfile as sf
from tqdm import tqdm

# 配置参数
SAMPLING_RATE = 16000
INPUT_LENGTH = 9.01  # 秒
FRAME_SIZE = 320
HOP_LENGTH = 160
N_MELS = 120
DEFAULT_BATCH_SIZE = 4  # 默认批量大小

class NPUBoostScorer:
    def __init__(self, primary_model, p808_model, batch_size=DEFAULT_BATCH_SIZE):
        self.providers = ['CANNExecutionProvider']
        self.batch_size = batch_size
        self._verify_npu_environment()
        
        self.primary_sess = ort.InferenceSession(
            primary_model, 
            providers=self.providers,
            sess_options=self._get_optimized_session_options()
        )
        self.p808_sess = ort.InferenceSession(
            p808_model, 
            providers=self.providers,
            sess_options=self._get_optimized_session_options()
        )
        
        self._verify_model_outputs()
        print(f"[NPU加速] 初始化完成 | 批量大小: {self.batch_size} | 推理设备: CANNExecutionProvider")

    def _get_optimized_session_options(self):
        options = ort.SessionOptions()
        options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        options.intra_op_num_threads = 1
        options.inter_op_num_threads = 1
        options.enable_mem_pattern = True
        options.enable_cpu_mem_arena = False
        return options

    def _verify_npu_environment(self):
        try:
            if 'CANNExecutionProvider' not in ort.get_all_providers():
                raise RuntimeError("未找到CANNExecutionProvider,请安装onnxruntime-cann")
            
            result = subprocess.run(
                ["npu-smi info"],
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                timeout=5
            )
            if result.returncode != 0:
                raise RuntimeError(f"NPU设备异常: {result.stderr.strip()}")
            
            subprocess.run(
                ["npu-smi set -t performance -i 0"],
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
            print("[NPU加速] 已切换至性能模式")
            
        except Exception as e:
            print(f"[错误] NPU环境验证失败: {str(e)}")
            exit(1)

    def _verify_model_outputs(self):
        p808_output_shape = self.p808_sess.get_outputs()[0].shape
        if len(p808_output_shape) != 2:
            print(f"[警告] P808模型输出维度为{len(p808_output_shape)},预期为2维")
        else:
            print(f"[验证] P808模型输出形状: {p808_output_shape}(正常)")

    @lru_cache(maxsize=128)
    def _preprocess_audio(self, file_path):
        audio_data, input_sr = sf.read(file_path)
        original_duration = len(audio_data) / input_sr  # 计算原始音频时长(秒)
        
        if len(audio_data.shape) > 1:
            audio_data = audio_data.mean(axis=1).astype(np.float32)
        else:
            audio_data = audio_data.astype(np.float32)
        
        if input_sr != SAMPLING_RATE:
            audio_data = librosa.resample(
                audio_data,
                orig_sr=input_sr,
                target_sr=SAMPLING_RATE,
                res_type='fft'
            ).astype(np.float32)
        
        min_length = int(INPUT_LENGTH * SAMPLING_RATE)
        if len(audio_data) < min_length:
            repeat = (min_length // len(audio_data)) + 1
            audio_data = np.tile(audio_data, repeat)[:min_length]
        
        return audio_data, original_duration

    def _compute_melspec_batch(self, audio_batch):
        batch_mel = []
        for audio in audio_batch:
            min_len = HOP_LENGTH * 2 + FRAME_SIZE
            if len(audio) < min_len:
                audio = np.pad(audio, (0, min_len - len(audio)), mode='constant')
            
            mel_spec = librosa.feature.melspectrogram(
                y=audio,
                sr=SAMPLING_RATE,
                n_fft=FRAME_SIZE + 1,
                hop_length=HOP_LENGTH,
                n_mels=N_MELS
            )
            mel_spec = (librosa.power_to_db(mel_spec, ref=np.max) + 40) / 40
            batch_mel.append(mel_spec.T)
        
        return np.stack(batch_mel, axis=0).astype(np.float32)

    def _poly_correction_batch(self, raw_scores, is_personalized):
        sig_raw, bak_raw, ovr_raw = raw_scores[:, 0], raw_scores[:, 1], raw_scores[:, 2]
        
        if is_personalized:
            p_ovr = np.poly1d([-0.00533021, 0.005101, 1.18058466, -0.11236046])
            p_sig = np.poly1d([-0.01019296, 0.02751166, 1.19576786, -0.24348726])
            p_bak = np.poly1d([-0.04976499, 0.44276479, -0.1644611, 0.96883132])
        else:
            p_ovr = np.poly1d([-0.06766283, 1.11546468, 0.04602535])
            p_sig = np.poly1d([-0.08397278, 1.22083953, 0.0052439])
            p_bak = np.poly1d([-0.13166888, 1.60915514, -0.39604546])
        
        return np.column_stack([
            p_sig(sig_raw),
            p_bak(bak_raw),
            p_ovr(ovr_raw)
        ])

    def process_batch(self, file_batch, is_personalized):
        batch_results = []
        audio_batch = []
        durations = []  # 存储每个音频的原始时长(秒)
        
        # 批量预处理
        for file_path in file_batch:
            audio, duration = self._preprocess_audio(file_path)
            audio_batch.append(audio)
            durations.append(duration)  # 记录原始音频时长
        
        # 生成批量输入
        batch_primary_input = []
        batch_p808_input = []
        for audio in audio_batch:
            hop_samples = SAMPLING_RATE
            total_samples = len(audio)
            num_windows = max(1, int(np.floor(total_samples / SAMPLING_RATE) - INPUT_LENGTH) + 1)
            mid_idx = num_windows // 2
            start = mid_idx * hop_samples
            end = start + int(INPUT_LENGTH * SAMPLING_RATE)
            window = audio[start:end]
            
            batch_primary_input.append(window[np.newaxis, :])
            batch_p808_input.append(window[:-160])
        
        # 批量推理
        batch_p808_mel = self._compute_melspec_batch(batch_p808_input)
        primary_input = np.concatenate(batch_primary_input, axis=0)
        raw_scores = self.primary_sess.run(None, {'input_1': primary_input})[0]
        
        # 适配P808模型2维输出
        p808_output = self.p808_sess.run(None, {'input_1': batch_p808_mel})[0]
        p808_scores = p808_output[:, 0]
        
        corrected_scores = self._poly_correction_batch(raw_scores, is_personalized)
        
        # 整理结果(保留len_in_sec)
        for i, file_path in enumerate(file_batch):
            batch_results.append({
                'filename': os.path.basename(file_path),
                'len_in_sec': round(durations[i], 4),  # 保留音频原始时长(秒)
                'MOS_SIG': round(corrected_scores[i, 0], 3),
                'MOS_BAK': round(corrected_scores[i, 1], 3),
                'MOS_OVRL': round(corrected_scores[i, 2], 3),
                'P808_MOS': round(p808_scores[i], 3)
            })
            # 控制台显示完整信息
            print(f"[批量处理] {batch_results[-1]['filename']} | 时长: {batch_results[-1]['len_in_sec']}s | SIG: {batch_results[-1]['MOS_SIG']} | BAK: {batch_results[-1]['MOS_BAK']} | OVRL: {batch_results[-1]['MOS_OVRL']} | P808: {batch_results[-1]['P808_MOS']} | 完成")
        
        return batch_results

def find_audio_files(root_dir):
    return glob.glob(os.path.join(root_dir, "**", "*.wav"), recursive=True)

def main():
    parser = argparse.ArgumentParser(description="NPU加速版DNSMOS评分工具(保留len_in_sec)")
    parser.add_argument('-t', '--test_dir', required=True, help='音频文件目录')
    parser.add_argument('-o', '--output_csv', required=True, help='结果输出CSV路径')
    parser.add_argument('-p', '--personalized', action='store_true', help='使用个性化MOS模型')
    parser.add_argument('--model_root', default='.', help='模型文件根目录')
    parser.add_argument('--batch_size', type=int, default=DEFAULT_BATCH_SIZE, help='NPU批量大小(默认4)')
    args = parser.parse_args()

    # 模型路径配置
    p808_model_path = os.path.join(args.model_root, 'DNSMOS', 'model_v8.onnx')
    primary_model_path = os.path.join(
        args.model_root, 'pDNSMOS' if args.personalized else 'DNSMOS', 'sig_bak_ovr.onnx'
    )

    # 检查模型文件
    missing_models = [m for m in [primary_model_path, p808_model_path] if not os.path.exists(m)]
    if missing_models:
        print("错误:以下模型文件不存在:")
        for m in missing_models:
            print(f"  - {m}")
        return

    # 初始化NPU评分器
    scorer = NPUBoostScorer(primary_model_path, p808_model_path, args.batch_size)

    # 查找音频文件
    audio_files = find_audio_files(args.test_dir)
    if not audio_files:
        print(f"错误:在 {args.test_dir} 未找到WAV文件")
        return
    
    # 去重并限制最大数量
    audio_files = list(set(audio_files))
    if len(audio_files) > 250:
        audio_files = random.sample(audio_files, 250)
    
    # 按批量大小分组
    batches = [
        audio_files[i:i + args.batch_size] 
        for i in range(0, len(audio_files), args.batch_size)
    ]
    print(f"[NPU加速] 总文件数: {len(audio_files)} | 批次数: {len(batches)} | 批量大小: {args.batch_size}")

    # 并行处理批次
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        futures = [
            executor.submit(scorer.process_batch, batch, args.personalized)
            for batch in batches
        ]
        
        for future in tqdm(concurrent.futures.as_completed(futures), 
                          total=len(batches), desc="NPU处理进度"):
            results.extend(future.result())

    # 保存结果(包含len_in_sec)
    os.makedirs(os.path.dirname(args.output_csv), exist_ok=True)
    df = pd.DataFrame(results)
    # 强制指定列顺序(filename -> 时长 -> 评分)
    df = df[['filename', 'len_in_sec', 'MOS_SIG', 'MOS_BAK', 'MOS_OVRL', 'P808_MOS']]
    df.to_csv(args.output_csv, index=False, encoding='utf-8')

    # 恢复NPU节能模式
    subprocess.run(
        ["npu-smi set -t power-saving -i 0"],
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )

    # 统计信息
    success_count = len(results)
    print(f"\n[处理完成] 成功: {success_count}/{len(audio_files)} | 设备: CANNExecutionProvider")
    print(f"[结果保存] {args.output_csv}(包含: filename, len_in_sec, MOS_SIG, MOS_BAK, MOS_OVRL, P808_MOS)")

if __name__ == "__main__":
    main()

注:infer.py输入输出参数说明

A、 输入参数:

参数名短选项类型是否必选默认值功能说明
--test_dir-t字符串(目录路径)是-待评分的WAV音频文件所在目录,脚本会递归遍历该目录下的所有.wav文件
--output_csv-o字符串(文件路径)是-评分结果的输出CSV文件路径,脚本会自动创建父目录
--personalized-p布尔值(标志位)否False是否使用个性化​DNSMOS ​模型​: 1)True则加载model_root/pDNSMOS/sig_bak_ovr.onnx; 2)False则加载model_root/DNSMOS/sig_bak_ovr.onnx
--model_root-字符串(目录路径)否./模型文件的根目录,脚本会从该目录下的DNSMOS/pDNSMOS子目录读取ONNX模型文件
--batch_size-整数否4NPU批量推理的音频数量,需根据NPU显存/性能调整(建议为2的倍数)

B、 输出参数

l filename:音频文件的名称

l len_in_sec:音频的原始时长(秒)

l MOS_SIG:信号质量分(Signal Quality),评分范围 0-5,越高表示语音信号越清晰

l MOS_BAK:背景噪声分(Background Noise),评分范围 0-5,越高表示背景噪声越少 / 越不明显

l MOS_OVRL:整体质量分(Overall Quality),评分范围 0-5,综合信号和噪声的整体音频质量

l P808_MOS:P808 模型输出的 MOS 分,作为辅助质量评估指标

五、执行推理脚本

$ python infer.py -t ./dataset/vcc2018 -o ./csv/vcc2018.csv 图片描述 注:infer.py支持批量处理,dataset/vcc2018为前期下载的VCC2018测试集目录。

六、检查结果

打开DNSMOS/csv文件夹下的vcc2018.csv文件 图片描述