HW4-Self-Attention


Assignment Description

The goal of this assignment is to use the self-attention part of the Transformer to perform multiclass classification: predicting the speaker of a given utterance. The data is a subset selected from VoxCeleb1:

  • Training data: 69,438 preprocessed, labeled audio feature files
  • Test data: 6,000 preprocessed, unlabeled audio feature files
  • Labels: 600 labels in total, each representing one speaker

Format

Data directory

  • metadata.json
  • testdata.json
  • mapping.json
  • uttr-{random string}.pt

Contents of metadata.json

  • "n_mels": the dimension of the mel-spectrograms (number of mel bins)
  • "speakers": a dictionary
    • Key: a speaker id
    • Value: a list of that speaker's utterances, each with a "feature_path" and a "mel_len"
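
As a quick orientation, here is a minimal inspection sketch of these files. It assumes the data sits in ./Dataset (the path used in the config below), and the printed values are illustrative only:

import json
from pathlib import Path

metadata = json.load((Path("./Dataset") / "metadata.json").open())
print(metadata["n_mels"])            # e.g. 40 (number of mel bins)

speakers = metadata["speakers"]
print(len(speakers))                 # e.g. 600 speakers
first_id = next(iter(speakers))      # some speaker id (illustrative)
print(speakers[first_id][0])         # e.g. {"feature_path": "uttr-....pt", "mel_len": 435}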
# Import the required packages
import os
import json
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split
from pathlib import Path
import random
from torch.nn.utils.rnn import pad_sequence
import numpy as np

seed = 2022
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

Data Preprocessing

Dataset

class myDataset(Dataset):
    def __init__(self, data_dir, segment_len=128):
        self.data_dir = data_dir
        self.segment_len = segment_len
        
        # Load the dictionary that maps each speaker to a label (0, 1, 2, ..., 599)
        mapping_path = Path(data_dir) / "mapping.json"
        mapping = json.load(mapping_path.open())
        self.speaker2id = mapping["speaker2id"]
        
        # Load the training data from metadata.json
        metadata_path = Path(data_dir) / "metadata.json"
        metadata = json.load(open(metadata_path))["speakers"]
        
        # Total number of speakers
        self.speaker_num = len(metadata.keys())
        self.data = []
        for speaker in metadata.keys():
            for utterances in metadata[speaker]:
                self.data.append([utterances["feature_path"], int(self.speaker2id[speaker])])
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, index):
        feat_path, speaker = self.data[index]
        # Load the preprocessed mel-spectrogram
        mel = torch.load(os.path.join(self.data_dir, feat_path))
        
        # Randomly crop a segment of segment_len consecutive frames;
        # shorter utterances are kept whole
        if len(mel) > self.segment_len:
            start = random.randint(0, len(mel) - self.segment_len)
            mel = torch.FloatTensor(mel[start: start + self.segment_len])
        else:
            mel = torch.FloatTensor(mel)
        speaker = torch.LongTensor([speaker])
        return mel, speaker

    def get_speaker_number(self):
        return self.speaker_num
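
A quick sanity check on a single item, assuming the same ./Dataset directory used in the config below (the printed label is illustrative):

dataset = myDataset("./Dataset")
mel, speaker = dataset[0]
print(mel.shape)     # e.g. torch.Size([128, 40]): at most segment_len frames, 40 mel bins each
print(speaker)       # e.g. tensor([137]): the speaker label as a LongTensor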

DataLoader

  • Split the training data into a training set (90%) and a validation set (10%)
  • Create dataloaders to iterate over the data

Since we train the model batch by batch, the features within the same batch need to be padded to the same length; this is handled by the collate_batch() function defined below.

def collate_batch(batch):
    mel, speaker = zip(*batch)
    # Pad with -20, a very small log-mel value
    mel = pad_sequence(mel, batch_first=True, padding_value=-20)
    # mel: (batch size, length, 40)
    return mel, torch.FloatTensor(speaker).long()
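
To see what pad_sequence does here, a toy example with two fake "mels" of 3 and 5 frames: both are padded to the longer length, and the filler value -20 is a very small log-mel magnitude, so the padding behaves like near-silence:

a = torch.ones(3, 2)       # 3 frames, 2 mel bins
b = torch.ones(5, 2)       # 5 frames, 2 mel bins
padded = pad_sequence([a, b], batch_first=True, padding_value=-20)
print(padded.shape)        # torch.Size([2, 5, 2]): batch padded to the longest length
print(padded[0, 3:])       # the last two frames of `a` are all -20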

def get_dataloader(data_dir, batch_size):
    dataset = myDataset(data_dir)
    speaker_num = dataset.get_speaker_number()
    # Split the dataset 90/10 into training and validation sets
    trainlen = int(0.9 * len(dataset))
    lengths = [trainlen, len(dataset) - trainlen]
    trainset, validset = random_split(dataset, lengths)

    train_loader = DataLoader(
        trainset,
        batch_size=batch_size,
        shuffle=True,
        collate_fn=collate_batch,
    )
    valid_loader = DataLoader(
        validset,
        batch_size=batch_size,
        collate_fn=collate_batch,
    )
    return train_loader, valid_loader, speaker_num

Model

Here, the model is based on the Transformer's EncoderLayer; see the official TransformerEncoderLayer API for usage. You can also use the Transformer's complete encoder, which simply stacks several EncoderLayers; for its usage, see the official TransformerEncoder API.

If you are not very familiar with the Transformer, see the earlier notes on the Transformer.
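
For shape intuition, here is a minimal sketch of both options. Note that in this PyTorch version the encoder expects input shaped (length, batch size, d_model) rather than batch-first:

layer = nn.TransformerEncoderLayer(d_model=80, dim_feedforward=256, nhead=2)
encoder = nn.TransformerEncoder(layer, num_layers=2)   # the full encoder just stacks EncoderLayers

x = torch.randn(128, 32, 80)   # (length, batch size, d_model)
print(layer(x).shape)          # torch.Size([128, 32, 80]): one EncoderLayer
print(encoder(x).shape)        # torch.Size([128, 32, 80]): same shape, deeper model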

class Classifier(nn.Module):
    def __init__(self, d_model=80, n_spks=600, dropout=0.1):
        super().__init__()
        # Project the 40-dimensional input features to d_model
        self.prenet = nn.Linear(40, d_model)
        # TODO: try changing the Transformer to a Conformer.
        #   https://arxiv.org/abs/2005.08100
        self.encoder_layer = nn.TransformerEncoderLayer(
          d_model=d_model, dim_feedforward=256, nhead=2
        )
        # self.encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=2)

        # Project from d_model down to the number of speakers for classification
        self.pred_layer = nn.Sequential(
          # nn.Linear(d_model, d_model),
          # nn.ReLU(),
          nn.Linear(d_model, n_spks),
        )

    def forward(self, mels):
        out = self.prenet(mels)          # out: (batch size, length, d_model)
        out = out.permute(1, 0, 2)       # out: (length, batch size, d_model)
        out = self.encoder_layer(out)
        out = out.transpose(0, 1)        # out: (batch size, length, d_model)
        stats = out.mean(dim=1)         # mean pooling
        
        out = self.pred_layer(stats)    # out: (batch, n_spks)
        return out
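
A quick shape check of the classifier on a random batch:

model = Classifier()
dummy = torch.randn(4, 128, 40)   # (batch size, length, 40 mel bins)
print(model(dummy).shape)         # torch.Size([4, 600]): one score per speaker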

Learning-Rate Scheduler

  • For Transformer architectures, the learning-rate schedule is designed differently from the one typically used for CNNs
  • Prior experience shows that a warmup-style learning rate is very effective for training Transformer-like models

Here we use a cosine schedule with warmup. At step $t$, the multiplier applied to the base learning rate is:
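
$$
\lambda(t) =
\begin{cases}
\dfrac{t}{\max(1,\,W)}, & t < W \\[6pt]
\max\!\left(0,\ \dfrac{1}{2}\left(1 + \cos\!\left(2\pi c \cdot \dfrac{t - W}{\max(1,\,T - W)}\right)\right)\right), & t \ge W
\end{cases}
$$

where $W$ is num_warmup_steps, $T$ is num_training_steps, and $c$ is num_cycles (0.5 by default, i.e., half a cosine period, decaying from 1 to 0). This is exactly what the lr_lambda function in the code below computes.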

import math

import torch
from torch.optim import Optimizer
from torch.optim.lr_scheduler import LambdaLR

def get_cosine_schedule_with_warmup(
    optimizer: Optimizer,
    num_warmup_steps: int,
    num_training_steps: int,
    num_cycles: float = 0.5,
    last_epoch: int = -1,
):
    def lr_lambda(current_step):
        # warmup
        if current_step < num_warmup_steps:
            return float(current_step) / float(max(1, num_warmup_steps))
        # cosine decay
        progress = float(current_step - num_warmup_steps) / float(
            max(1, num_training_steps - num_warmup_steps)
        )
        return max(
            0.0, 0.5 * (1.0 + math.cos(math.pi * float(num_cycles) * 2.0 * progress))
        )
    return LambdaLR(optimizer, lr_lambda, last_epoch)
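
A quick numerical check with tiny, hypothetical step counts (2 warmup steps out of 10 total) shows the shape of the schedule; with a base lr of 1.0, the printed values equal the multiplier itself:

_opt = torch.optim.AdamW([nn.Parameter(torch.zeros(1))], lr=1.0)
_sched = get_cosine_schedule_with_warmup(_opt, num_warmup_steps=2, num_training_steps=10)
lrs = []
for _ in range(10):
    lrs.append(round(_opt.param_groups[0]["lr"], 3))
    _opt.step()
    _sched.step()
print(lrs)   # [0.0, 0.5, 1.0, 0.962, 0.854, 0.691, 0.5, 0.309, 0.146, 0.038]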

Hyperparameters

Tune these settings to improve model performance.

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
os.makedirs('models', exist_ok=True)

config = {
    "data_dir": "./Dataset",
    "save_path": "models/model.pth",
    "batch_size": 32,
    "lr": 1e-3,
    "valid_steps": 2000,
    "warmup_steps": 1000,
    "n_epochs": 70000,
}

Load the Data and Model

Prepare the dataloaders, model, loss criterion, optimizer, and scheduler.

train_loader, valid_loader, speaker_num = get_dataloader(config['data_dir'], config['batch_size'])

model = Classifier(n_spks=speaker_num).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=config['lr'])
# Note: scheduler.step() is called once per batch in the training loop, so the schedule
# spans config['n_epochs'] optimizer steps, not epochs; pass n_epochs * len(train_loader)
# as num_training_steps if you want the cosine to span the entire run
scheduler = get_cosine_schedule_with_warmup(optimizer, config['warmup_steps'], config['n_epochs'])

Training

import time

n_epochs = config['n_epochs']       # total number of epochs
best_acc = -1.0                     # best validation accuracy so far, used to save the best model

for epoch in range(n_epochs):
    start_time = time.time()
    train_acc, train_loss = 0.0, 0.0
    
    model.train()
    for i,data in enumerate(train_loader):
        optimizer.zero_grad()
        pred = model(data[0].to(device))
        loss = criterion(pred, data[1].to(device))
        loss.backward()
        optimizer.step()
        scheduler.step()
        
        train_acc += np.sum(np.argmax(pred.cpu().data.numpy(), axis=1) == data[1].numpy())
        train_loss += loss.detach().cpu().item()
    
    train_acc /= len(train_loader.dataset)
    train_loss /= len(train_loader)
    
    # Every valid_steps epochs, evaluate the model on the validation set
    if (epoch + 1) % config['valid_steps'] == 0:
        val_loss, val_acc = 0.0, 0.0

        model.eval()
        for i, data in enumerate(valid_loader):
            with torch.no_grad():
                pred = model(data[0].to(device))
                loss = criterion(pred, data[1].to(device))
                val_acc += np.sum(np.argmax(pred.cpu().data.numpy(), axis=1) == data[1].numpy())
                val_loss += loss.detach().cpu().item()
        val_acc /= len(valid_loader.dataset)
        val_loss /= len(valid_loader)

        # Save the model whenever the validation accuracy improves
        if val_acc > best_acc:
            best_acc = val_acc
            print(f'Saving model (epoch = {epoch+1:02d}, acc = {best_acc:.4f}, loss = {val_loss:.4f})')
            torch.save(model.state_dict(), config['save_path'])
    
    # Print the results for this epoch
    print('[%02d/%02d] %2.2f sec(s) Train Acc: %3.6f Loss: %3.6f' % \
          (epoch + 1, n_epochs, time.time() - start_time, train_acc, train_loss))

Testing the Model

First, we create a dataset and dataloader for the test data and load the best model we saved.

class InferenceDataset(Dataset):
    def __init__(self, data_dir):
        testdata_path = Path(data_dir) / "testdata.json"
        metadata = json.load(testdata_path.open())
        self.data_dir = data_dir
        self.data = metadata["utterances"]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        utterance = self.data[index]
        feat_path = utterance["feature_path"]
        mel = torch.load(os.path.join(self.data_dir, feat_path))

        return feat_path, mel


def inference_collate_batch(batch):
    feat_paths, mels = zip(*batch)

    return feat_paths, torch.stack(mels)

dataset = InferenceDataset(config['data_dir'])
dataloader = DataLoader(
    dataset,
    batch_size=1,          # full-length test utterances vary in length, so batch one at a time
    shuffle=False,
    collate_fn=inference_collate_batch,
)

model = Classifier(n_spks=speaker_num).to(device)
model.load_state_dict(torch.load(config['save_path']))
model.eval()
Classifier(
  (prenet): Linear(in_features=40, out_features=80, bias=True)
  (encoder_layer): TransformerEncoderLayer(
    (self_attn): MultiheadAttention(
      (out_proj): _LinearWithBias(in_features=80, out_features=80, bias=True)
    )
    (linear1): Linear(in_features=80, out_features=256, bias=True)
    (dropout): Dropout(p=0.1, inplace=False)
    (linear2): Linear(in_features=256, out_features=80, bias=True)
    (norm1): LayerNorm((80,), eps=1e-05, elementwise_affine=True)
    (norm2): LayerNorm((80,), eps=1e-05, elementwise_affine=True)
    (dropout1): Dropout(p=0.1, inplace=False)
    (dropout2): Dropout(p=0.1, inplace=False)
  )
  (pred_layer): Sequential(
    (0): Linear(in_features=80, out_features=600, bias=True)
  )
)

Save the predictions to a CSV file

import csv

mapping_path = Path(config['data_dir']) / "mapping.json"
mapping = json.load(mapping_path.open())

results = [["Id", "Category"]]
for feat_paths, mels in dataloader:
    with torch.no_grad():
        mels = mels.to(device)
        outs = model(mels)
        preds = outs.argmax(1).cpu().numpy()
        for feat_path, pred in zip(feat_paths, preds):
            results.append([feat_path, mapping["id2speaker"][str(pred)]])

with open('predict.csv', 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerows(results)
