Models:
- Vanilla RNN: for text classification (also usable for text generation)
- LSTM: for text classification
- TextCNN: for text classification
- Embedding approaches:
  - word2vec, either trained from scratch or pretrained
  - publicly released embedding resources (Tencent / Sogou)
  - embeddings built from scratch: each letter is mapped to a one-hot vector, so each word becomes an m×n matrix (each letter is the basic unit); seen in RNN name-classification tasks (see the sketch after this list)
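A minimal sketch of that letter-level one-hot encoding, assuming a small ASCII alphabet (the alphabet and helper below are illustrative, not taken from the referenced code):

import string
import torch

all_letters = string.ascii_letters + " .,;'"   # assumed alphabet
n_letters = len(all_letters)

def name_to_tensor(name: str) -> torch.Tensor:
    """Map a name to a [len(name), n_letters] one-hot matrix, one row per letter."""
    tensor = torch.zeros(len(name), n_letters)
    for i, ch in enumerate(name):
        idx = all_letters.find(ch)
        if idx >= 0:                           # letters outside the alphabet are skipped
            tensor[i][idx] = 1.0
    return tensor

print(name_to_tensor('Anna').shape)            # torch.Size([4, 57])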
Related resources:
- Several model implementations: https://github.com/649453932/Chinese-Text-Classification-Pytorch
- Embeddings: https://github.com/Embedding/Chinese-Word-Vectors
Embedding, TextRNN, and TextCNN
Reference code: the repositories linked above.
Notes
- Data is fed into the model character by character.
- The pretrained vectors come from a word2vec model trained on Sogou News (Word+Character, 300d); download it here.
- That embedding file is large because it is (character + word)-level, but only the character-level entries are used here, so the word entries can be dropped, shrinking the file from about 1 GB to 5 MB (see the sketch after this list).
- Alternatively, skip the pretrained vectors entirely, initialize the embedding randomly and train it; the results are not much worse.
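A hedged sketch of that trimming step: keep only the vectors whose token appears in our character-level vocab and save them as the .npz file the Config below expects. The file names and paths are assumptions; the reference repo ships a similar utility.

import pickle
import numpy as np

emb_dim = 300
vocab = pickle.load(open('THUCNews/data/vocab.pkl', 'rb'))            # {char: index}; build it first (e.g. run build_dataset once)
embeddings = np.random.rand(len(vocab), emb_dim).astype('float32')    # random rows for chars not found in the file

with open('sgns.sogounews.bigram-char', 'r', encoding='utf-8') as f:  # assumed raw word2vec text file
    next(f)                                                           # skip the "count dim" header line
    for line in f:
        parts = line.rstrip().split(' ')
        token = parts[0]
        if token in vocab:                                            # keep only tokens present in our vocab
            embeddings[vocab[token]] = np.asarray(parts[1:1 + emb_dim], dtype='float32')

np.savez_compressed('THUCNews/data/embedding_SougouNews.npz', embeddings=embeddings)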
Install dependencies (torch, numpy, tqdm, and scikit-learn are assumed to be installed already):
pip install logging4
import os
import sys
import torch
import numpy as np
import pickle
from tqdm import tqdm
from torch.utils.data import TensorDataset, DataLoader
from torch import nn
import torch.nn.functional as F  # used by TextCNN below
import logging4
# %% models
class Config(object):
def __init__(self, model_name, dataset, embedding):
self.model_name = model_name # 'TextRNN', 'TextCNN'
        self.train_path = dataset + '/data/train.txt'                          # training set
        self.dev_path = dataset + '/data/dev.txt'                              # validation set
        self.test_path = dataset + '/data/test.txt'                            # test set
        self.class_list = [x.strip() for x in open(
            dataset + '/data/class.txt', encoding='utf-8').readlines()]        # class names
        self.save_path = dataset + '/saved_dict/' + self.model_name + '.ckpt'  # where the trained model is saved
        self.vocab_path = dataset + '/data/vocab.pkl'                          # vocabulary
        self.embedding_pretrained = torch.tensor(
            np.load(dataset + '/data/' + embedding)["embeddings"].astype('float32')) \
            if embedding != 'random' else None                                  # pretrained embeddings
        self.dropout = 0.5
        self.require_improvement = 1000   # stop training early if the dev loss has not improved for this many batches
        self.num_classes = len(self.class_list)   # number of classes
        self.n_vocab = 0                          # vocab size, assigned at runtime
        self.num_epochs = 10                      # number of epochs
        self.batch_size = 128                     # mini-batch size
        self.pad_size = 32                        # every sentence is padded/truncated to this length
        self.learning_rate = 1e-3                 # learning rate
        self.embed = self.embedding_pretrained.size(1) \
            if self.embedding_pretrained is not None else 300   # embedding dimension; taken from the pretrained vectors if provided
        if self.model_name == 'TextRNN':
            self.hidden_size = 128                # LSTM hidden size
            self.num_layers = 2                   # number of LSTM layers
        elif self.model_name == 'TextCNN':
            self.filter_sizes = (2, 3, 4)         # convolution kernel sizes
            self.num_filters = 256                # number of kernels (output channels) per size
self.logger = None
self.set_logger('log_' + model_name + '.txt')
self.device = None
self.set_device()
def set_logger(self, logfile):
logger = logging4.Logger(name=self.model_name)
logger.add_channel(filename=logfile, level=logging4.INFO)
logger.add_channel(filename=sys.stdout, level=logging4.INFO)
self.logger = logger
def set_device(self):
        self.logger.info(f'Number of available GPUs: {torch.cuda.device_count()}')
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  # device
class TextRNN(nn.Module):
def __init__(self, config):
super(TextRNN, self).__init__()
if config.embedding_pretrained is not None:
self.embedding = nn.Embedding.from_pretrained(config.embedding_pretrained, freeze=False)
else:
self.embedding = nn.Embedding(config.n_vocab, config.embed, padding_idx=config.n_vocab - 1)
self.lstm = nn.LSTM(config.embed, config.hidden_size, config.num_layers,
bidirectional=True, batch_first=True, dropout=config.dropout)
self.fc = nn.Linear(config.hidden_size * 2, config.num_classes)
def forward(self, x):
        out = self.embedding(x)       # [batch_size, seq_len, embedding_dim] = [128, 32, 300]
        out, _ = self.lstm(out)       # [batch_size, seq_len, hidden_size * 2] (bidirectional)
        out = self.fc(out[:, -1, :])  # classify on the hidden state at the last time step
return out
class TextCNN(nn.Module):
def __init__(self, config):
super(TextCNN, self).__init__()
if config.embedding_pretrained is not None:
self.embedding = nn.Embedding.from_pretrained(config.embedding_pretrained, freeze=False)
else:
self.embedding = nn.Embedding(config.n_vocab, config.embed, padding_idx=config.n_vocab - 1)
self.convs = nn.ModuleList(
[nn.Conv2d(1, config.num_filters, (k, config.embed)) for k in config.filter_sizes])
self.dropout = nn.Dropout(config.dropout)
self.fc = nn.Linear(config.num_filters * len(config.filter_sizes), config.num_classes)
    def conv_and_pool(self, x, conv):
        x = F.relu(conv(x)).squeeze(3)             # [batch, num_filters, seq_len - k + 1]
        x = F.max_pool1d(x, x.size(2)).squeeze(2)  # max-pool over time -> [batch, num_filters]
        return x
    def forward(self, x):
        out = self.embedding(x)                    # [batch, seq_len, embed]
        out = out.unsqueeze(1)                     # add a channel dimension for Conv2d: [batch, 1, seq_len, embed]
        out = torch.cat([self.conv_and_pool(out, conv) for conv in self.convs], 1)
        out = self.dropout(out)
        out = self.fc(out)
return out
# %%Data
MAX_VOCAB_SIZE = 10000       # cap on vocabulary size
UNK, PAD = '<UNK>', '<PAD>'  # unknown-token and padding symbols
def get_vocab_from_file(file_path, tokenizer, max_size, min_freq):
vocab_dic = {}
with open(file_path, 'r', encoding='UTF-8') as f:
for line in tqdm(f):
lin = line.strip()
if not lin:
continue
content = lin.split('\t')[0]
for word in tokenizer(content):
vocab_dic[word] = vocab_dic.get(word, 0) + 1
vocab_list = sorted([_ for _ in vocab_dic.items() if _[1] >= min_freq], key=lambda x: x[1], reverse=True)[
:max_size]
vocab_dic = {word_count[0]: idx for idx, word_count in enumerate(vocab_list)}
vocab_dic.update({UNK: len(vocab_dic), PAD: len(vocab_dic) + 1})
return vocab_dic
def sentence2idx(sentence, tokenizer, vocab, pad_size=32):
words_line = []
token = tokenizer(sentence)
if pad_size:
if len(token) < pad_size:
token.extend([PAD] * (pad_size - len(token)))
else:
token = token[:pad_size]
for word in token:
words_line.append(vocab.get(word, vocab.get(UNK)))
return words_line
def build_dataset(config, use_word=False):
    if use_word:
        tokenizer = lambda x: x.split(' ')    # word-level: tokens separated by spaces
    else:
        tokenizer = lambda x: [y for y in x]  # char-level: each character is a token
    # Load the cached vocab if it exists, otherwise build it from the training set.
    # vocab looks like: {" ": 0, "字": 1, "中": 2, ...}
if os.path.exists(config.vocab_path):
vocab = pickle.load(open(config.vocab_path, 'rb'))
else:
vocab = get_vocab_from_file(config.train_path, tokenizer=tokenizer, max_size=MAX_VOCAB_SIZE, min_freq=1)
pickle.dump(vocab, open(config.vocab_path, 'wb'))
config.logger.info(f'Build a new vocab to {config.vocab_path}')
config.logger.info(f"Vocab size: {len(vocab)}")
def load_dataset(path, config, pad_size=32):
data_list, label_list, seq_len_list = [], [], []
with open(path, 'r', encoding='UTF-8') as f:
for line in tqdm(f):
lin = line.strip()
if not lin:
continue
sentence, label = lin.split('\t')
words_line = sentence2idx(sentence, tokenizer, vocab, pad_size=pad_size)
data_list.append(words_line)
label_list.append(int(label))
return TensorDataset(torch.tensor(data_list).type(torch.int64).to(config.device)
, torch.tensor(label_list).type(torch.int64).to(config.device))
train = load_dataset(config.train_path, config, config.pad_size)
dev = load_dataset(config.dev_path, config, config.pad_size)
test = load_dataset(config.test_path, config, config.pad_size)
return vocab, train, dev, test
# %%train
import torch.nn as nn
import torch.nn.functional as F
from sklearn import metrics
# weight initialization, Xavier by default
def init_network(model, method='xavier', exclude='embedding', seed=123):
for name, w in model.named_parameters():
if exclude not in name:
if 'weight' in name:
if method == 'xavier':
nn.init.xavier_normal_(w)
elif method == 'kaiming':
nn.init.kaiming_normal_(w)
else:
nn.init.normal_(w)
elif 'bias' in name:
nn.init.constant_(w, 0)
else:
pass
def train(config, model, train_iter, dev_iter, test_iter):
model.train()
optimizer = torch.optim.Adam(model.parameters(), lr=config.learning_rate)
    # exponential learning-rate decay, applied once per epoch: lr = gamma * lr
# scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9)
    total_batch = 0               # number of batches processed so far
    dev_best_loss = float('inf')
    last_improve = 0              # batch index of the last improvement in dev loss
    flag = False                  # set when training has stalled for too long
for epoch in range(config.num_epochs):
config.logger.info('Epoch [{}/{}]'.format(epoch + 1, config.num_epochs))
        # scheduler.step()  # learning-rate decay
for data, labels in train_iter:
outputs = model(data)
model.zero_grad()
loss = F.cross_entropy(outputs, labels)
loss.backward()
optimizer.step()
if total_batch % 100 == 0:
                # every 100 batches, report performance on the current training batch and the dev set
true = labels.data.cpu()
predic = torch.max(outputs.data, 1)[1].cpu()
train_acc = metrics.accuracy_score(true, predic)
dev_acc, dev_loss = evaluate(config, model, dev_iter)
if dev_loss < dev_best_loss:
dev_best_loss = dev_loss
torch.save(model.state_dict(), config.save_path)
improve = '*'
last_improve = total_batch
else:
improve = ''
msg = 'Iter: {0:>6}, Train Loss: {1:>5.2}, Train Acc: {2:>6.2%}, Val Loss: {3:>5.2}, Val Acc: {4:>6.2%} {5}'
config.logger.info(msg.format(total_batch, loss.item(), train_acc, dev_loss, dev_acc, improve))
model.train()
total_batch += 1
if total_batch - last_improve > config.require_improvement:
                # dev loss has not improved for `require_improvement` batches; stop training
config.logger.info("No optimization for a long time, auto-stopping...")
flag = True
break
if flag:
break
    get_performance_on_test(config, model, test_iter)
def get_performance_on_test(config, model, test_iter):
model.load_state_dict(torch.load(config.save_path))
model.eval()
test_acc, test_loss, test_report, test_confusion = evaluate(config, model, test_iter, test=True)
msg = 'Test Loss: {0:>5.2}, Test Acc: {1:>6.2%}'
config.logger.info(msg.format(test_loss, test_acc))
config.logger.info("Precision, Recall and F1-Score...")
config.logger.info(test_report)
config.logger.info("Confusion Matrix...")
config.logger.info(test_confusion)
def evaluate(config, model, data_iter, test=False):
model.eval()
loss_total = 0
predict_all = np.array([], dtype=int)
labels_all = np.array([], dtype=int)
with torch.no_grad():
for data, labels in data_iter:
outputs = model(data)
loss = F.cross_entropy(outputs, labels)
            loss_total += loss.item()
labels = labels.data.cpu().numpy()
predic = torch.max(outputs.data, 1)[1].cpu().numpy()
labels_all = np.append(labels_all, labels)
predict_all = np.append(predict_all, predic)
acc = metrics.accuracy_score(labels_all, predict_all)
if test:
report = metrics.classification_report(labels_all, predict_all, target_names=config.class_list, digits=4)
confusion = metrics.confusion_matrix(labels_all, predict_all)
return acc, loss_total / len(data_iter), report, confusion
return acc, loss_total / len(data_iter)
# %%run
# model_name = 'TextRNN' # 'TextRCNN' # TextCNN, TextRNN, FastText, TextRCNN, TextRNN_Att, DPCNN, Transformer
dataset = 'THUCNews'                         # dataset directory
embedding_type = 'embedding_SougouNews.npz'  # Sogou News: embedding_SougouNews.npz, Tencent: embedding_Tencent.npz, random init: 'random'
word = False                                 # True: word-level (split on spaces); False: char-level (each character is a token)
# config = Config('TextRNN', dataset, embedding_type)
# model = TextRNN(config).to(config.device)
config = Config('TextCNN', dataset, embedding_type)
config.logger.info("Loading data...")
batch_size = 128
vocab, train_data, dev_data, test_data = build_dataset(config, word)
config.n_vocab = len(vocab)  # must be set before the model is built (the random-embedding case needs it)
train_iter = DataLoader(dataset=train_data, batch_size=batch_size, shuffle=True)
dev_iter = DataLoader(dataset=dev_data, batch_size=batch_size, shuffle=True)
test_iter = DataLoader(dataset=test_data, batch_size=batch_size, shuffle=False)
model = TextCNN(config).to(config.device)
# %%train
config.logger.info(model)
init_network(model)
train(config, model, train_iter, dev_iter, test_iter)
# %% Load the saved model and classify a new sentence
model.load_state_dict(torch.load(config.save_path))
sentence = '状元心经:考前一周重点是回顾和整理'
model.eval()
sentence_idx = torch.tensor(
    sentence2idx(sentence, list, vocab, pad_size=32),  # `list` acts as the char-level tokenizer here
    dtype=torch.int64, device=config.device
).reshape(1, -1)
outputs = model(sentence_idx)
predict = torch.argmax(outputs.data, 1).cpu()
config.logger.info(config.class_list[predict.item()])  # map the predicted index to its class name
Follow-up:
- Apply this to name classification (see the sketch below).
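As a starting point, the same char-level pipeline should work if the name data is written in the same tab-separated "text, label" format used by THUCNews. A hedged sketch (the paths, toy names, and label set are assumptions):

# Hedged sketch: write name data in the train.txt / class.txt layout expected by Config and
# build_dataset above, so the same TextRNN/TextCNN pipeline can be reused unchanged.
names = [('Nakamura', 'Japanese'), ('Schmidt', 'German'), ('Rossi', 'Italian')]  # toy examples
classes = sorted({lang for _, lang in names})

with open('Names/data/class.txt', 'w', encoding='utf-8') as f:
    f.write('\n'.join(classes))
with open('Names/data/train.txt', 'w', encoding='utf-8') as f:
    for name, lang in names:
        f.write(f'{name}\t{classes.index(lang)}\n')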