Assignment Description
The goal of this assignment is to use the self-attention part of the Transformer for multi-class classification: predicting which speaker a given utterance belongs to. The data is a subset of Voxceleb1:
- Training data: 69438 preprocessed, labeled audio features
- Test data: 6000 preprocessed, unlabeled audio features
- Labels: 600 classes in total, each class corresponding to one speaker
Format
Data directory
- metadata.json
- testdata.json
- mapping.json
- uttr-{random string}.pt
Fields in metadata.json
- "n_mels": the dimensionality of the mel-spectrogram
- "speakers": a dictionary
  - Key: the speaker id
  - Value: a list of utterances, each with a "feature_path" and a "mel_len"
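To get a feel for the layout, here is a small inspection snippet (a sketch; it assumes the dataset has been extracted to ./Dataset, the same path used in the config below):

import json
from pathlib import Path

# Load metadata.json and look at one speaker's entries.
metadata = json.load((Path("./Dataset") / "metadata.json").open())
print(metadata["n_mels"])          # mel-spectrogram dimension, e.g. 40
speakers = metadata["speakers"]
some_id = next(iter(speakers))     # one speaker id (a key of the dictionary)
print(speakers[some_id][0])        # {"feature_path": "uttr-....pt", "mel_len": ...}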
# Import the required packages
import os
import json
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split
from pathlib import Path
import random
from torch.nn.utils.rnn import pad_sequence
import numpy as np
seed = 2022
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
Data Preprocessing
Dataset
class myDataset(Dataset):
    def __init__(self, data_dir, segment_len=128):
        self.data_dir = data_dir
        self.segment_len = segment_len
        # Load the mapping from speaker name to label id (0, 1, 2, ..., 599)
        mapping_path = Path(data_dir) / "mapping.json"
        mapping = json.load(mapping_path.open())
        self.speaker2id = mapping["speaker2id"]
        # Load the training data from metadata.json
        metadata_path = Path(data_dir) / "metadata.json"
        metadata = json.load(open(metadata_path))["speakers"]
        # Total number of speakers
        self.speaker_num = len(metadata.keys())
        self.data = []
        for speaker in metadata.keys():
            for utterances in metadata[speaker]:
                self.data.append([utterances["feature_path"], int(self.speaker2id[speaker])])

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        feat_path, speaker = self.data[index]
        # Load the preprocessed mel-spectrogram
        mel = torch.load(os.path.join(self.data_dir, feat_path))
        # Randomly crop a segment of length segment_len so long utterances have a fixed length
        if len(mel) > self.segment_len:
            start = random.randint(0, len(mel) - self.segment_len)
            mel = torch.FloatTensor(mel[start: start + self.segment_len])
        else:
            mel = torch.FloatTensor(mel)
        speaker = torch.LongTensor([speaker])
        return mel, speaker

    def get_speaker_number(self):
        return self.speaker_num
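A quick sanity check of the Dataset class (a sketch; the ./Dataset path is the same one used in the config below):

# Instantiate the dataset and inspect a single item.
dataset = myDataset("./Dataset")
mel, speaker = dataset[0]
print(len(dataset), dataset.get_speaker_number())  # number of utterances, 600
print(mel.shape, speaker)                          # a (<=128, 40) mel segment and its speaker label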
DataLoader
- Split the training data into a training set (90%) and a validation set (10%)
- Create dataloaders to iterate over the data
Since the model is trained batch by batch, the utterances within a batch must be padded to the same length; this is handled by the collate_batch() function defined below.
def collate_batch(batch):
    # Pad the mel-spectrograms in a batch to the same length
    mel, speaker = zip(*batch)
    # Pad with -20, a very small log-mel value that is effectively silence
    mel = pad_sequence(mel, batch_first=True, padding_value=-20)
    # mel: (batch size, length, 40)
    return mel, torch.FloatTensor(speaker).long()


def get_dataloader(data_dir, batch_size):
    dataset = myDataset(data_dir)
    speaker_num = dataset.get_speaker_number()
    # Split the dataset into training (90%) and validation (10%)
    trainlen = int(0.9 * len(dataset))
    lengths = [trainlen, len(dataset) - trainlen]
    trainset, validset = random_split(dataset, lengths)
    train_loader = DataLoader(
        trainset,
        batch_size=batch_size,
        shuffle=True,
        collate_fn=collate_batch,
    )
    valid_loader = DataLoader(
        validset,
        batch_size=batch_size,
        collate_fn=collate_batch,
    )
    return train_loader, valid_loader, speaker_num
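As a check of the padding described above, one can pull a single batch (a sketch; it reuses the data directory and batch size from the config below):

# Fetch one batch to verify the padded shapes.
train_loader, valid_loader, speaker_num = get_dataloader("./Dataset", batch_size=32)
mels, speakers = next(iter(train_loader))
print(mels.shape)    # (32, length, 40): every mel padded to the longest segment in the batch (at most 128)
print(speakers[:4])  # the first few integer speaker labels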
Model
The model here is built on a single Transformer encoder layer; see the official TransformerEncoderLayer API for usage details. You can also use the full Transformer encoder (TransformerEncoder), which simply stacks several encoder layers; see the official TransformerEncoder API.
If you are not familiar with the Transformer, see the earlier notes on the Transformer.
class Classifier(nn.Module):
    def __init__(self, d_model=80, n_spks=600, dropout=0.1):
        super().__init__()
        # Project the input feature dimension (40) to d_model
        self.prenet = nn.Linear(40, d_model)
        # TODO: try replacing the Transformer encoder with a Conformer.
        # https://arxiv.org/abs/2005.08100
        self.encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, dim_feedforward=256, nhead=2
        )
        # self.encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=2)
        # Project from d_model to the number of speakers for classification
        self.pred_layer = nn.Sequential(
            # nn.Linear(d_model, d_model),
            # nn.ReLU(),
            nn.Linear(d_model, n_spks),
        )

    def forward(self, mels):
        out = self.prenet(mels)       # out: (batch size, length, d_model)
        out = out.permute(1, 0, 2)    # out: (length, batch size, d_model)
        out = self.encoder_layer(out)
        out = out.transpose(0, 1)     # out: (batch size, length, d_model)
        stats = out.mean(dim=1)       # mean pooling over the time dimension
        out = self.pred_layer(stats)  # out: (batch, n_spks)
        return out
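As mentioned above, the single encoder layer can be swapped for a full nn.TransformerEncoder, which the commented-out line in __init__ already hints at. Below is a minimal sketch of that variant; the class name DeepClassifier is just for illustration, and num_layers=2 is an arbitrary choice, not a tuned value:

# Variant sketch: stack several encoder layers with nn.TransformerEncoder.
# Everything except the encoder attribute is identical to Classifier above.
class DeepClassifier(nn.Module):
    def __init__(self, d_model=80, n_spks=600, dropout=0.1, num_layers=2):
        super().__init__()
        self.prenet = nn.Linear(40, d_model)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, dim_feedforward=256, nhead=2, dropout=dropout
        )
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.pred_layer = nn.Linear(d_model, n_spks)

    def forward(self, mels):
        out = self.prenet(mels)        # (batch, length, d_model)
        out = out.permute(1, 0, 2)     # (length, batch, d_model)
        out = self.encoder(out)        # run all stacked encoder layers
        out = out.transpose(0, 1)      # (batch, length, d_model)
        stats = out.mean(dim=1)        # mean pooling over time
        return self.pred_layer(stats)  # (batch, n_spks)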
Learning Rate Scheduler
- For Transformer architectures, the learning-rate schedule is designed differently from the ones typically used for CNNs
- Prior work has shown that a warmup schedule is very effective for training Transformer-like models
Here we use a cosine schedule with warmup. With warmup steps $T_w$, total training steps $T$, and current step $t$, the multiplier applied to the base learning rate is

$$
\lambda(t) =
\begin{cases}
\dfrac{t}{T_w}, & t < T_w \\[6pt]
\max\!\left(0,\ \dfrac{1}{2}\Bigl(1 + \cos\Bigl(\pi \cdot \dfrac{t - T_w}{T - T_w}\Bigr)\Bigr)\right), & t \ge T_w
\end{cases}
$$

which corresponds to the default num_cycles = 0.5 in the implementation below.
import math
import torch
from torch.optim import Optimizer
from torch.optim.lr_scheduler import LambdaLR
def get_cosine_schedule_with_warmup(
    optimizer: Optimizer,
    num_warmup_steps: int,
    num_training_steps: int,
    num_cycles: float = 0.5,
    last_epoch: int = -1,
):
    def lr_lambda(current_step):
        # Linear warmup
        if current_step < num_warmup_steps:
            return float(current_step) / float(max(1, num_warmup_steps))
        # Cosine decay
        progress = float(current_step - num_warmup_steps) / float(
            max(1, num_training_steps - num_warmup_steps)
        )
        return max(
            0.0, 0.5 * (1.0 + math.cos(math.pi * float(num_cycles) * 2.0 * progress))
        )

    return LambdaLR(optimizer, lr_lambda, last_epoch)
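A tiny check of the schedule (a sketch with a placeholder optimizer and made-up step counts, not the training configuration used later):

# Print the first few learning rates produced by the schedule.
dummy_opt = torch.optim.AdamW([torch.nn.Parameter(torch.zeros(1))], lr=1e-3)
sched = get_cosine_schedule_with_warmup(dummy_opt, num_warmup_steps=5, num_training_steps=20)
for step in range(20):
    dummy_opt.step()
    sched.step()
    print(step, round(sched.get_last_lr()[0], 6))  # ramps up to 1e-3, then decays along the cosine curve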
Hyperparameters
These settings can be tuned to improve the model's performance.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
os.makedirs('models', exist_ok=True)
config = {
    "data_dir": "./Dataset",
    "save_path": "models/model.pth",
    "batch_size": 32,
    "lr": 1e-3,
    "valid_steps": 2000,    # run validation every valid_steps epochs (see the training loop below)
    "warmup_steps": 1000,   # learning-rate warmup steps for the scheduler
    "n_epochs": 70000,      # total number of epochs; also passed to the scheduler as num_training_steps
}
Load the Data and Model
Prepare the dataloaders, model, loss criterion, optimizer, and scheduler.
train_loader, valid_loader, speaker_num = get_dataloader(config['data_dir'], config['batch_size'])
model = Classifier(n_spks=speaker_num).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=config['lr'])
scheduler = get_cosine_schedule_with_warmup(optimizer, config['warmup_steps'], config['n_epochs'])
Training
import time

n_epochs = config['n_epochs']  # total number of epochs
best_acc = -1.0                # tracks the best validation accuracy seen so far

for epoch in range(n_epochs):
    start_time = time.time()
    train_acc, train_loss = 0.0, 0.0
    model.train()
    for i, data in enumerate(train_loader):
        optimizer.zero_grad()
        pred = model(data[0].to(device))
        loss = criterion(pred, data[1].to(device))
        loss.backward()
        optimizer.step()
        scheduler.step()
        train_acc += np.sum(np.argmax(pred.cpu().data.numpy(), axis=1) == data[1].numpy())
        train_loss += loss.detach().cpu().item()
    train_acc /= len(train_loader.dataset)
    train_loss /= len(train_loader)

    # Evaluate on the validation set every valid_steps epochs
    if (epoch + 1) % config['valid_steps'] == 0:
        val_loss, val_acc = 0.0, 0.0
        model.eval()
        for i, data in enumerate(valid_loader):
            with torch.no_grad():
                pred = model(data[0].to(device))
                loss = criterion(pred, data[1].to(device))
                val_acc += np.sum(np.argmax(pred.cpu().data.numpy(), axis=1) == data[1].numpy())
                val_loss += loss.detach().cpu().item()
        val_acc /= len(valid_loader.dataset)
        val_loss /= len(valid_loader)
        # Save the model whenever the validation accuracy improves
        if val_acc > best_acc:
            best_acc = val_acc
            print(f'Saving model (epoch = {epoch+1:02d}, acc = {best_acc:.4f}, loss = {val_loss:.4f})')
            torch.save(model.state_dict(), config['save_path'])

    # Print the training statistics for this epoch
    print('[%02d/%02d] %2.2f sec(s) Train Acc: %3.6f Loss: %3.6f' % \
          (epoch + 1, n_epochs, time.time() - start_time, train_acc, train_loss))
Model Testing
First, create a dataset and dataloader for the test data, and load the best model saved during training.
class InferenceDataset(Dataset):
    def __init__(self, data_dir):
        testdata_path = Path(data_dir) / "testdata.json"
        metadata = json.load(testdata_path.open())
        self.data_dir = data_dir
        self.data = metadata["utterances"]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        utterance = self.data[index]
        feat_path = utterance["feature_path"]
        mel = torch.load(os.path.join(self.data_dir, feat_path))
        return feat_path, mel


def inference_collate_batch(batch):
    feat_paths, mels = zip(*batch)
    return feat_paths, torch.stack(mels)


dataset = InferenceDataset(config['data_dir'])
# Test utterances have different lengths, so batch_size=1 avoids any padding at inference time
dataloader = DataLoader(
    dataset,
    batch_size=1,
    shuffle=False,
    collate_fn=inference_collate_batch,
)
model = Classifier(n_spks=speaker_num).to(device)
model.load_state_dict(torch.load(config['save_path']))
model.eval()
Classifier(
  (prenet): Linear(in_features=40, out_features=80, bias=True)
  (encoder_layer): TransformerEncoderLayer(
    (self_attn): MultiheadAttention(
      (out_proj): _LinearWithBias(in_features=80, out_features=80, bias=True)
    )
    (linear1): Linear(in_features=80, out_features=256, bias=True)
    (dropout): Dropout(p=0.1, inplace=False)
    (linear2): Linear(in_features=256, out_features=80, bias=True)
    (norm1): LayerNorm((80,), eps=1e-05, elementwise_affine=True)
    (norm2): LayerNorm((80,), eps=1e-05, elementwise_affine=True)
    (dropout1): Dropout(p=0.1, inplace=False)
    (dropout2): Dropout(p=0.1, inplace=False)
  )
  (pred_layer): Sequential(
    (0): Linear(in_features=80, out_features=600, bias=True)
  )
)
Save the Predictions to a CSV File
import csv

mapping_path = Path(config['data_dir']) / "mapping.json"
mapping = json.load(mapping_path.open())

results = [["Id", "Category"]]
for feat_paths, mels in dataloader:
    with torch.no_grad():
        mels = mels.to(device)
        outs = model(mels)
        preds = outs.argmax(1).cpu().numpy()
        for feat_path, pred in zip(feat_paths, preds):
            # Map the predicted label id back to the speaker name
            results.append([feat_path, mapping["id2speaker"][str(pred)]])

with open('predict.csv', 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerows(results)