【pytorch】RNN and TextCNN



2022-06-14    Author: Guofei

Category: 0x26_torch    Article No.: 272

Copyright: the author is Guofei. Feel free to repost as long as you credit the original link.
Original link: https://www.guofei.site/2022/06/14/torch_rnn.html


Models:

  • Plain RNN, for text classification (also usable for text generation)
  • LSTM, for text classification
  • Text-CNN, for text classification
  • Embedding methods
    • word2vec: train it yourself, or use a pretrained model (see the sketch after this list)
    • Public embedding resources: Tencent / Sogou
    • Embeddings trained from scratch: each letter maps to a one-hot vector, so a word maps to an m×n matrix (each letter is the basic unit). Seen in the RNN name-classification task.
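
For the "train word2vec yourself" option above, here is a minimal sketch using gensim (the library choice, the toy corpus, and all parameters are my own illustration, not from the original post):

from gensim.models import Word2Vec

# Toy corpus: a list of tokenized sentences (hypothetical data).
corpus = [['我', '爱', '北京'], ['天安门', '上', '太阳', '升']]

# sg=1 selects skip-gram; vector_size is the embedding dimension (gensim >= 4.0 API).
w2v = Word2Vec(sentences=corpus, vector_size=100, window=5, min_count=1, sg=1)
print(w2v.wv['北京'].shape)  # (100,)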

Related resources:

  • A collection of models: https://github.com/649453932/Chinese-Text-Classification-Pytorch
  • embedding: https://github.com/Embedding/Chinese-Word-Vectors

embedding
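
The training script below loads pretrained vectors with np.load(...)["embeddings"] from an .npz file. Here is a minimal sketch of how such a file could be built from a word2vec-style text file (the file name sgns.sogou.char and the "token v1 ... v300" line format are assumptions; adapt them to whatever vectors you download):

import pickle
import numpy as np

vocab = pickle.load(open('THUCNews/data/vocab.pkl', 'rb'))  # the vocab built by the script below
dim = 300
# Start from random vectors so tokens missing from the pretrained file still get a value.
embeddings = np.random.rand(len(vocab), dim).astype('float32')

with open('sgns.sogou.char', encoding='utf-8') as f:  # assumed pretrained vector file
    for line in f:
        parts = line.rstrip().split(' ')
        token, values = parts[0], parts[1:]
        if token in vocab and len(values) == dim:
            embeddings[vocab[token]] = np.asarray(values, dtype='float32')

np.savez_compressed('THUCNews/data/embedding_SougouNews.npz', embeddings=embeddings)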

TextRNN and TextCNN

Text-CNN

Reference code

Notes

Each line of train.txt / dev.txt / test.txt is one sample of the form "sentence<TAB>label_index", and class.txt lists one class name per line.

Install dependencies

pip install logging4
import os
import sys
import torch
import numpy as np
import pickle
from tqdm import tqdm
from torch.utils.data import TensorDataset, DataLoader
from torch import nn
import torch.nn.functional as F  # used by TextCNN.conv_and_pool below
import logging4


# %% models

class Config(object):
    def __init__(self, model_name, dataset, embedding):
        self.model_name = model_name  # 'TextRNN', 'TextCNN'
        self.train_path = dataset + '/data/train.txt'  # training set
        self.dev_path = dataset + '/data/dev.txt'  # dev set
        self.test_path = dataset + '/data/test.txt'  # test set
        self.class_list = [x.strip() for x in open(
            dataset + '/data/class.txt', encoding='utf-8').readlines()]  # class names
        self.save_path = dataset + '/saved_dict/' + self.model_name + '.ckpt'  # where the trained model is saved
        self.vocab_path = dataset + '/data/vocab.pkl'  # vocabulary
        self.embedding_pretrained = torch.tensor(
            np.load(dataset + '/data/' + embedding)["embeddings"].astype('float32')) \
            if embedding != 'random' else None  # pretrained word vectors
        self.dropout = 0.5
        self.require_improvement = 1000  # stop early if the dev loss has not improved for this many batches
        self.num_classes = len(self.class_list)  # number of classes
        self.n_vocab = 0  # vocab size, assigned at runtime
        self.num_epochs = 10  # number of epochs
        self.batch_size = 128  # mini-batch size
        self.pad_size = 32  # each sentence is padded or truncated to this length
        self.learning_rate = 1e-3  # learning rate
        self.embed = self.embedding_pretrained.size(1) \
            if self.embedding_pretrained is not None else 300  # embedding dimension; matches the pretrained vectors when they are used

        if self.model_name == 'TextRNN':
            self.hidden_size = 128  # LSTM hidden size
            self.num_layers = 2  # number of LSTM layers
        elif self.model_name == 'TextCNN':
            self.filter_sizes = (2, 3, 4)  # convolution kernel sizes
            self.num_filters = 256  # number of kernels (output channels)

        self.logger = None
        self.set_logger('log_' + model_name + '.txt')

        self.device = None
        self.set_device()

    def set_logger(self, logfile):
        logger = logging4.Logger(name=self.model_name)
        logger.add_channel(filename=logfile, level=logging4.INFO)
        logger.add_channel(filename=sys.stdout, level=logging4.INFO)
        self.logger = logger

    def set_device(self):
        self.logger.info(f'Number of available GPUs: {torch.cuda.device_count()}')
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  # device


class TextRNN(nn.Module):
    def __init__(self, config):
        super(TextRNN, self).__init__()
        if config.embedding_pretrained is not None:
            self.embedding = nn.Embedding.from_pretrained(config.embedding_pretrained, freeze=False)
        else:
            self.embedding = nn.Embedding(config.n_vocab, config.embed, padding_idx=config.n_vocab - 1)
        self.lstm = nn.LSTM(config.embed, config.hidden_size, config.num_layers,
                            bidirectional=True, batch_first=True, dropout=config.dropout)
        self.fc = nn.Linear(config.hidden_size * 2, config.num_classes)

    def forward(self, x):
        out = self.embedding(x)  # [batch_size, seq_len, embedding] = [128, 32, 300]
        out, _ = self.lstm(out)
        out = self.fc(out[:, -1, :])  # hidden state at the last time step of the sentence
        return out


class TextCNN(nn.Module):
    def __init__(self, config):
        super(TextCNN, self).__init__()
        if config.embedding_pretrained is not None:
            self.embedding = nn.Embedding.from_pretrained(config.embedding_pretrained, freeze=False)
        else:
            self.embedding = nn.Embedding(config.n_vocab, config.embed, padding_idx=config.n_vocab - 1)
        self.convs = nn.ModuleList(
            [nn.Conv2d(1, config.num_filters, (k, config.embed)) for k in config.filter_sizes])
        self.dropout = nn.Dropout(config.dropout)
        self.fc = nn.Linear(config.num_filters * len(config.filter_sizes), config.num_classes)

    def conv_and_pool(self, x, conv):
        x = F.relu(conv(x)).squeeze(3)  # [batch_size, num_filters, seq_len - k + 1]
        x = F.max_pool1d(x, x.size(2)).squeeze(2)  # max over time -> [batch_size, num_filters]
        return x

    def forward(self, x):
        out = self.embedding(x)  # [batch_size, seq_len, embed]
        out = out.unsqueeze(1)  # [batch_size, 1, seq_len, embed]: add a channel dim for Conv2d
        out = torch.cat([self.conv_and_pool(out, conv) for conv in self.convs], 1)  # [batch_size, num_filters * len(filter_sizes)]
        out = self.dropout(out)
        out = self.fc(out)
        return out


# %%Data

MAX_VOCAB_SIZE = 10000  # vocabulary size limit
UNK, PAD = '<UNK>', '<PAD>'  # unknown token, padding token


def get_vocab_from_file(file_path, tokenizer, max_size, min_freq):
    vocab_dic = {}
    with open(file_path, 'r', encoding='UTF-8') as f:
        for line in tqdm(f):
            lin = line.strip()
            if not lin:
                continue
            content = lin.split('\t')[0]
            for word in tokenizer(content):
                vocab_dic[word] = vocab_dic.get(word, 0) + 1
        vocab_list = sorted(
            [item for item in vocab_dic.items() if item[1] >= min_freq],
            key=lambda x: x[1], reverse=True)[:max_size]
        vocab_dic = {word_count[0]: idx for idx, word_count in enumerate(vocab_list)}
        vocab_dic.update({UNK: len(vocab_dic), PAD: len(vocab_dic) + 1})
    return vocab_dic


def sentence2idx(sentence, tokenizer, vocab, pad_size=32):
    words_line = []
    token = tokenizer(sentence)
    if pad_size:
        if len(token) < pad_size:
            token.extend([PAD] * (pad_size - len(token)))
        else:
            token = token[:pad_size]
    for word in token:
        words_line.append(vocab.get(word, vocab.get(UNK)))
    return words_line


def build_dataset(config, use_word=False):
    if use_word:
        tokenizer = lambda x: x.split(' ')  # word-level
    else:
        tokenizer = lambda x: [y for y in x]  # char-level

    # Use the existing vocab file if there is one; otherwise build it from the training set.
    # vocab content looks like: {" ": 0, "字": 1, "中": 2, ...}
    if os.path.exists(config.vocab_path):
        vocab = pickle.load(open(config.vocab_path, 'rb'))
    else:
        vocab = get_vocab_from_file(config.train_path, tokenizer=tokenizer, max_size=MAX_VOCAB_SIZE, min_freq=1)
        pickle.dump(vocab, open(config.vocab_path, 'wb'))
        config.logger.info(f'Build a new vocab to {config.vocab_path}')

    config.logger.info(f"Vocab size: {len(vocab)}")

    def load_dataset(path, config, pad_size=32):
        data_list, label_list = [], []  # sequence lengths are not needed since every sample is padded to pad_size
        with open(path, 'r', encoding='UTF-8') as f:
            for line in tqdm(f):
                lin = line.strip()
                if not lin:
                    continue
                sentence, label = lin.split('\t')

                words_line = sentence2idx(sentence, tokenizer, vocab, pad_size=pad_size)

                data_list.append(words_line)
                label_list.append(int(label))

        return TensorDataset(torch.tensor(data_list).type(torch.int64).to(config.device),
                             torch.tensor(label_list).type(torch.int64).to(config.device))

    train = load_dataset(config.train_path, config, config.pad_size)
    dev = load_dataset(config.dev_path, config, config.pad_size)
    test = load_dataset(config.test_path, config, config.pad_size)
    return vocab, train, dev, test


# %%train
import torch.nn as nn
import torch.nn.functional as F
from sklearn import metrics


# Weight initialization; Xavier by default
def init_network(model, method='xavier', exclude='embedding', seed=123):
    for name, w in model.named_parameters():
        if exclude not in name:
            if 'weight' in name:
                if method == 'xavier':
                    nn.init.xavier_normal_(w)
                elif method == 'kaiming':
                    nn.init.kaiming_normal_(w)
                else:
                    nn.init.normal_(w)
            elif 'bias' in name:
                nn.init.constant_(w, 0)
            else:
                pass


def train(config, model, train_iter, dev_iter, test_iter):
    model.train()
    optimizer = torch.optim.Adam(model.parameters(), lr=config.learning_rate)

    # Exponential learning-rate decay: after each epoch, lr = gamma * lr
    # scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9)
    total_batch = 0  # number of batches processed so far
    dev_best_loss = float('inf')
    last_improve = 0  # batch count of the last improvement in dev loss
    flag = False  # whether training has gone a long time without improvement

    for epoch in range(config.num_epochs):
        config.logger.info('Epoch [{}/{}]'.format(epoch + 1, config.num_epochs))
        # scheduler.step()  # learning-rate decay
        for data, labels in train_iter:
            outputs = model(data)
            model.zero_grad()
            loss = F.cross_entropy(outputs, labels)
            loss.backward()
            optimizer.step()
            if total_batch % 100 == 0:
                # every 100 batches, report metrics on the training and dev sets
                true = labels.data.cpu()
                predic = torch.max(outputs.data, 1)[1].cpu()
                train_acc = metrics.accuracy_score(true, predic)
                dev_acc, dev_loss = evaluate(config, model, dev_iter)
                if dev_loss < dev_best_loss:
                    dev_best_loss = dev_loss
                    torch.save(model.state_dict(), config.save_path)
                    improve = '*'
                    last_improve = total_batch
                else:
                    improve = ''
                msg = 'Iter: {0:>6}, Train Loss: {1:>5.2}, Train Acc: {2:>6.2%}, Val Loss: {3:>5.2}, Val Acc: {4:>6.2%} {5}'
                config.logger.info(msg.format(total_batch, loss.item(), train_acc, dev_loss, dev_acc, improve))

                model.train()
            total_batch += 1
            if total_batch - last_improve > config.require_improvement:
                # dev loss has not improved for require_improvement (1000) batches; stop training
                config.logger.info("No optimization for a long time, auto-stopping...")
                flag = True
                break
        if flag:
            break

    get_performance_on_test(config, model, test_iter)


def get_performance_on_test(config, model, test_iter):
    model.load_state_dict(torch.load(config.save_path))
    model.eval()
    test_acc, test_loss, test_report, test_confusion = evaluate(config, model, test_iter, test=True)
    msg = 'Test Loss: {0:>5.2},  Test Acc: {1:>6.2%}'
    config.logger.info(msg.format(test_loss, test_acc))
    config.logger.info("Precision, Recall and F1-Score...")
    config.logger.info(test_report)
    config.logger.info("Confusion Matrix...")
    config.logger.info(test_confusion)


def evaluate(config, model, data_iter, test=False):
    model.eval()
    loss_total = 0
    predict_all = np.array([], dtype=int)
    labels_all = np.array([], dtype=int)
    with torch.no_grad():
        for data, labels in data_iter:
            outputs = model(data)
            loss = F.cross_entropy(outputs, labels)
            loss_total += loss
            labels = labels.data.cpu().numpy()
            predic = torch.max(outputs.data, 1)[1].cpu().numpy()
            labels_all = np.append(labels_all, labels)
            predict_all = np.append(predict_all, predic)

    acc = metrics.accuracy_score(labels_all, predict_all)
    if test:
        report = metrics.classification_report(labels_all, predict_all, target_names=config.class_list, digits=4)
        confusion = metrics.confusion_matrix(labels_all, predict_all)
        return acc, loss_total / len(data_iter), report, confusion
    return acc, loss_total / len(data_iter)


# %%run


# model_name = 'TextRNN'  # 'TextRCNN'  # TextCNN, TextRNN, FastText, TextRCNN, TextRNN_Att, DPCNN, Transformer
dataset = 'THUCNews'  # dataset directory

embedding_type = 'embedding_SougouNews.npz'  # Sogou News: embedding_SougouNews.npz, Tencent: embedding_Tencent.npz, random init: 'random'

word = False  # True: word-level (tokens split on spaces); False: char-level (each character is a token)

# config = Config('TextRNN', dataset, embedding_type)
# model = TextRNN(config).to(config.device)

config = Config('TextCNN', dataset, embedding_type)
config.logger.info("Loading data...")

batch_size = 128

vocab, train_data, dev_data, test_data = build_dataset(config, word)
config.n_vocab = len(vocab)  # must be known before the model is built when embedding_type == 'random'

model = TextCNN(config).to(config.device)

train_iter = DataLoader(dataset=train_data, batch_size=batch_size, shuffle=True)
dev_iter = DataLoader(dataset=dev_data, batch_size=batch_size, shuffle=True)
test_iter = DataLoader(dataset=test_data, batch_size=batch_size, shuffle=True)

# %%train
config.logger.info(model)
init_network(model)
train(config, model, train_iter, dev_iter, test_iter)

# %% load the saved model
model.load_state_dict(torch.load(config.save_path))

sentence = '状元心经:考前一周重点是回顾和整理'
model.eval()

sentence_idx = torch.tensor(
    sentence2idx(sentence, list, vocab, pad_size=32),  # `list` acts as the char-level tokenizer here
    dtype=torch.int64, device=config.device
).reshape(1, -1)

outputs = model(sentence_idx)
predict = torch.argmax(outputs.data, 1).cpu()
config.logger.info(predict)
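
To turn the predicted index into a readable label, the class list already loaded in Config can be reused (a small addition of mine, not part of the original script):

# Map the predicted class index back to its name via config.class_list.
predicted_label = config.class_list[predict.item()]
config.logger.info(predicted_label)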

Next steps:

  • Apply it to name classification (a sketch of the letter-to-one-hot representation follows).
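
A minimal sketch of the letter-to-one-hot representation mentioned in the model list above (the alphabet choice and the helper name are my own; a real name classifier would feed these matrices to an RNN):

import string
import torch
import torch.nn.functional as F

alphabet = string.ascii_lowercase  # assumed alphabet: 26 lowercase letters
char2idx = {c: i for i, c in enumerate(alphabet)}

def name_to_onehot(name: str) -> torch.Tensor:
    """Map a name to a [len(name), len(alphabet)] one-hot matrix."""
    idx = torch.tensor([char2idx[c] for c in name.lower() if c in char2idx], dtype=torch.long)
    return F.one_hot(idx, num_classes=len(alphabet)).float()

print(name_to_onehot('Guofei').shape)  # torch.Size([6, 26])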
