【Torch】word2vec



December 24, 2022    Author: Guofei

Category: 0x24_NLP    Article No.: 311

Copyright notice: the author of this article is Guo Fei. Feel free to repost, just credit the original link. My email:
Original link: https://www.guofei.site/2022/12/24/word2vec.html


Word Embedding

One-Hot Encoder

Before word2vec appeared, a word was usually encoded as a unit vector.
For example, "北京" (Beijing) might map to index 3987 and "中国" (China) to index 5178; the vector has a 1 at that position and 0 everywhere else.
A document is then represented by the sum of the vectors of all the words it contains.
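
A minimal sketch of this one-hot scheme (the toy vocabulary, indices, and document below are made up for illustration):

import torch

vocab = {'UNK': 0, '北京': 1, '中国': 2, '首都': 3}  # toy vocabulary; the indices are arbitrary
vocab_size = len(vocab)


def one_hot(word):
    v = torch.zeros(vocab_size)
    v[vocab.get(word, 0)] = 1.0  # unknown words fall back to index 0 ('UNK')
    return v


# a document is represented by the sum of the one-hot vectors of its words
doc = ['北京', '中国', '首都', '北京']
doc_vec = torch.stack([one_hot(w) for w in doc]).sum(dim=0)
print(doc_vec)  # tensor([0., 2., 1., 1.])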

Drawbacks

  1. The encoding is arbitrary: the vectors reveal nothing about how two words are related.
  2. Training on such sparse representations is inefficient.

Vector Space Models

Vector space models map words to continuous-valued vectors.
They rely mainly on the Distributional Hypothesis: words that appear in the same contexts are mapped to nearby positions in the vector space.

There are two families of concrete methods:

  1. Count-based models, which record how often neighbouring words occur together and then compress the counts into a small, dense matrix (e.g. Latent Semantic Analysis); a minimal sketch follows this list.
  2. Predictive models, which predict the word at a given position from its neighbouring words (e.g. Neural Probabilistic Language Models).
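
A toy count-based sketch, assuming a whitespace-tokenized corpus and a window of one word on each side (the corpus, the window size, and the choice of a word-word co-occurrence matrix are illustrative assumptions, not a full LSA implementation):

import torch

corpus = ["the king rules the land", "the queen rules the land"]  # toy corpus
tokens = [s.split() for s in corpus]
vocab = sorted({w for sent in tokens for w in sent})
idx = {w: i for i, w in enumerate(vocab)}

# count how often each pair of words occurs within a +/-1 word window
counts = torch.zeros(len(vocab), len(vocab))
for sent in tokens:
    for i, w in enumerate(sent):
        for j in (i - 1, i + 1):
            if 0 <= j < len(sent):
                counts[idx[w], idx[sent[j]]] += 1

# compress the sparse count matrix into small, dense vectors with a truncated SVD
U, S, Vh = torch.linalg.svd(counts)
dense_vectors = U[:, :2] * S[:2]  # keep the 2 largest singular directions
print(dict(zip(vocab, dense_vectors.tolist())))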

Converting between one-hot vectors and dense vectors: suppose the embedding matrix is $E_{300\times 10k}$, i.e. the vocabulary contains 10k words. With one-hot encoding, a word is a vector $O_{10k\times 1}$, and $E$ compresses it down to 300 dimensions: the dense vector of the word is the matrix product $EO$.
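
In code, multiplying $E$ by a one-hot vector simply selects the corresponding column of $E$, which is what an embedding lookup does directly; a small sketch with the dimensions shrunk to 5 words and 3 features (all sizes here are illustrative):

import torch

vocab_size, embed_dim = 5, 3            # tiny sizes for illustration
E = torch.randn(embed_dim, vocab_size)  # plays the role of E_{300 x 10k}

word_idx = 2
O = torch.zeros(vocab_size, 1)
O[word_idx] = 1.0                       # one-hot column vector for the word

dense = E @ O                           # the matrix product E·O ...
assert torch.allclose(dense.squeeze(1), E[:, word_idx])  # ... is just column word_idx of E

# in practice nothing is multiplied: nn.Embedding stores E transposed (vocab_size x embed_dim)
# and does the row lookup directly
emb = torch.nn.Embedding(vocab_size, embed_dim)
emb.weight.data = E.t()
assert torch.allclose(emb(torch.tensor([word_idx])).squeeze(0), dense.squeeze(1))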

Hierarchical softmax

Negative sampling

There are three points here, currently sketched on scratch paper; I will write them up into a proper electronic version later.

GloVe word vectors

CBOW vs skip-gram

CBOW predicts the center word from its surrounding context; skip-gram predicts the surrounding context from the center word.
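
A rough illustration of how training pairs are built under the two setups (the toy token sequence and window size are made up):

tokens = ['我', '爱', '北京', '天安门']
C = 1  # context window: one word on each side

cbow_pairs, skipgram_pairs = [], []
for i, center in enumerate(tokens):
    context = [tokens[j] for j in range(i - C, i + C + 1) if j != i and 0 <= j < len(tokens)]
    cbow_pairs.append((context, center))               # CBOW: context -> center word
    skipgram_pairs += [(center, c) for c in context]   # skip-gram: center word -> each context word

print(cbow_pairs)      # e.g. (['我', '北京'], '爱')
print(skipgram_pairs)  # e.g. ('爱', '我'), ('爱', '北京'), ...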

Word2Vec

Neural Probabilistic Language Models are usually trained with maximum-likelihood estimation, but the computation is enormous.
In word2vec's CBOW, the full probability is never computed; instead, a binary classifier is trained to distinguish real words from fabricated noise words.
This technique of training against fabricated noise is called Negative Sampling.

In practice, we use the Noise-Contrastive Estimation (NCE) loss.
TensorFlow provides tf.nn.nce_loss() for this.
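
For reference, the per-example skip-gram negative-sampling objective from the original word2vec paper (to be maximized) can be written as below, where $v_{w_I}$ is the embedding of the center word, $w_O$ a true context word, $k$ the number of noise words, and $P_n(w)$ the noise distribution (in the code later in this post, the unigram distribution raised to the power $3/4$). Note that the paper uses separate input and output embeddings, while the PyTorch implementation below shares a single embedding table:

$$
\log \sigma\left(v_{w_O}^{\top} v_{w_I}\right) + \sum_{i=1}^{k} \mathbb{E}_{w_i \sim P_n(w)} \left[ \log \sigma\left(-v_{w_i}^{\top} v_{w_I}\right) \right]
$$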

Theoretical assumptions

  • Distributional hypothesis
    • Words that occur in the same contexts have similar meanings
  • Drawback
    • The synonym problem: in the example sentences below, "特朗普" and "川普" both refer to Trump, while in the last sentence "川普" instead means Sichuan-accented Mandarin

美国总统 特朗普 决定在墨西哥边境修建隔离墙 (US President Trump decided to build a wall on the Mexican border)
听说美国那个 川普 总统要在墨西哥边上修个墙 (I heard that American president "Trump" wants to build a wall next to Mexico)
同学你的 川普 挺带感啊 (Hey, your Sichuan-accented Mandarin sounds pretty cool)

Code implementation

The results of this PyTorch version are much worse than the TensorFlow version; see my other blog post for the TensorFlow version.

# -*- coding: utf-8 -*-

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import collections
import numpy as np
import jieba

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

training_file = '人民的名义.txt'  # corpus: the novel "In the Name of the People"


def build_dataset(words, n_words):
    word_freq = [['UNK', -1]]
    word_freq.extend(collections.Counter(words).most_common(n_words - 1))
    dictionary = dict()
    for word, _ in word_freq:
        dictionary[word] = len(dictionary)
    data = list()
    unk_count = 0
    for word in words:
        if word in dictionary:
            index = dictionary[word]
        else:
            index = 0  # dictionary['UNK']
            unk_count += 1
        data.append(index)
    word_freq[0][1] = unk_count
    reversed_dictionary = dict(zip(dictionary.values(), dictionary.keys()))
    # data: the corpus encoded as a sequence of word indices
    # dictionary: word -> idx
    # reversed_dictionary: idx -> word
    return data, word_freq, dictionary, reversed_dictionary


with open(training_file, 'r') as f:
    training_data = ''.join(f.readlines())
print("总字数", len(training_data))

training_ci = np.array([i for i in list(jieba.cut(training_data)) if len(i.strip()) > 0])
print("总词数", len(training_ci))

# %%
training_label, count, dictionary, words = build_dataset(training_ci, 10000)

# compute word frequencies
word_count = np.array([freq for _, freq in count], dtype=np.float32)
word_freq = word_count / np.sum(word_count)  # frequency of every word
word_freq = word_freq ** (3. / 4.)  # smooth the frequencies with the 3/4 power, as in the word2vec paper
words_size = len(dictionary)
print("Vocabulary size:", words_size)

# %%
C = 3  # context window: number of words on each side of the center word
num_sampled = 64  # number of negative samples
BATCH_SIZE = 12
EMBEDDING_SIZE = 128


class SkipGramDataset(Dataset):
    def __init__(self, training_label, word_to_idx, idx_to_word, word_freqs):
        super(SkipGramDataset, self).__init__()
        self.text_encoded = torch.Tensor(training_label).long()
        self.word_to_idx = word_to_idx
        self.idx_to_word = idx_to_word
        self.word_freqs = torch.Tensor(word_freqs)

    def __len__(self):
        return len(self.text_encoded)

    def __getitem__(self, idx):
        idx = min(max(idx, C), len(self.text_encoded) - 2 - C)  # clamp idx so the context window stays in range
        center_word = self.text_encoded[idx]
        pos_indices = list(range(idx - C, idx)) + list(range(idx + 1, idx + 1 + C))
        pos_words = self.text_encoded[pos_indices]
        # sample negative words from the (smoothed) unigram distribution, without replacement
        neg_words = torch.multinomial(self.word_freqs, num_sampled + 2 * C, False)
        # drop any sampled words that are actually positive context words
        neg_words = torch.Tensor(np.setdiff1d(neg_words.numpy(), pos_words.numpy())[:num_sampled]).long()
        return center_word, pos_words, neg_words


print('Building the dataset...')
train_dataset = SkipGramDataset(training_label, dictionary, words, word_freq)
dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, drop_last=True, shuffle=True)


class Model(nn.Module):
    def __init__(self, vocab_size, embed_size):
        super(Model, self).__init__()
        self.vocab_size = vocab_size
        self.embed_size = embed_size

        initrange = 0.5 / self.embed_size
        self.in_embed = nn.Embedding(self.vocab_size, self.embed_size, sparse=False)
        self.in_embed.weight.data.uniform_(-initrange, initrange)

    def forward(self, input_labels, pos_labels, neg_labels):
        input_embedding = self.in_embed(input_labels)  # [batch, embed]

        pos_embedding = self.in_embed(pos_labels)  # [batch, 2C, embed]
        neg_embedding = self.in_embed(neg_labels)  # [batch, num_sampled, embed]

        # dot products of the center word with the positive / negative words
        log_pos = torch.bmm(pos_embedding, input_embedding.unsqueeze(2)).squeeze()
        log_neg = torch.bmm(neg_embedding, -input_embedding.unsqueeze(2)).squeeze()

        # negative-sampling objective: log sigmoid(u_pos·v) + sum log sigmoid(-u_neg·v), negated as a loss
        log_pos = F.logsigmoid(log_pos).sum(1)
        log_neg = F.logsigmoid(log_neg).sum(1)
        loss = log_pos + log_neg
        return -loss


model = Model(words_size, EMBEDDING_SIZE).to(device)
model.train()

valid_size = 16
valid_window = words_size / 2  # range from which the validation words are drawn
valid_examples = np.random.choice(int(valid_window), valid_size, replace=False)  # pick 16 distinct indices from [0, words_size/2)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
NUM_EPOCHS = 20
for e in range(NUM_EPOCHS):
    for ei, (input_labels, pos_labels, neg_labels) in enumerate(dataloader):
        input_labels = input_labels.to(device)
        pos_labels = pos_labels.to(device)
        neg_labels = neg_labels.to(device)

        optimizer.zero_grad()
        loss = model(input_labels, pos_labels, neg_labels).mean()
        loss.backward()
        optimizer.step()

        if ei % 3000 == 0:
            print("epoch: {}, iter: {}, loss: {}".format(e, ei, loss.item()))
    if e % 1 == 0:
        norm = torch.sum(model.in_embed.weight.data.pow(2), -1).sqrt().unsqueeze(1)
        normalized_embeddings = model.in_embed.weight.data / norm
        valid_embeddings = normalized_embeddings[valid_examples]

        similarity = torch.mm(valid_embeddings, normalized_embeddings.T)
        for i in range(valid_size):
            valid_word = words[valid_examples[i]]
            top_k = 8  # show the 8 nearest words
            nearest = (-similarity[i, :]).argsort()[1:top_k + 1]  # skip position 0, which is the word itself
            log_str = 'Nearest to %s:' % valid_word
            for k in range(top_k):
                close_word = words[nearest[k].cpu().item()]
                log_str = '%s,%s' % (log_str, close_word)
            print(log_str)

Using a pretrained word2vec model

More than 100 sets of pretrained Chinese word vectors: https://github.com/Embedding/Chinese-Word-Vectors

import torch
from tqdm import tqdm


class Word2VecRes:
    def __init__(self, model_name):
        f = open(model_name, 'r')
        summary = f.readline().strip().split(' ')
        print(f'Loading word feature vectors, len(words)={summary[0]}, len(vec)={summary[1]}')
        words, feature_vec = [], []

        pbar = tqdm(total=int(summary[0]))
        for line in f:
            lin = line.strip().split(' ')
            words.append(lin[0])
            feature_vec.append([float(item) for item in lin[1:]])
            pbar.update(1)

        word2idx = dict([[j, i] for i, j in enumerate(words)])
        feature_vec = torch.tensor(feature_vec)
        self.words = words
        self.word2idx = word2idx
        self.feature_vec = feature_vec
        f.close()

    def get_vec(self, words):
        valid_idx = [self.word2idx[word] for word in words]
        return self.feature_vec[valid_idx, :]

    def get_sim_from_vector(self, embeddings, top_k):
        similarity = torch.mm(embeddings, self.feature_vec.T)

        close_words = []
        for idx in range(similarity.shape[0]):
            nearest = (-similarity[idx, :]).argsort()[:top_k]
            close_word = [self.words[idx] for idx in nearest.tolist()]
            close_words.append(close_word)
        return close_words

    def get_similar(self, words, top_k=8):
        close_words = self.get_sim_from_vector(self.get_vec(words), top_k=top_k)
        return list(zip(words, close_words))


word2vec_res = Word2VecRes('sgns.target.word-word.dynwin5.thr10.neg5.dim300.iter5')

word2vec_res.get_vec(['第一'])

valid_words = ['第一', '已经', '批准', '东北', '迅速', '再次']

word2vec_res.get_similar(valid_words)

Results:

第一 close to: ['第二', '第', '名列第一', '百零四', '百零一', '百一十', '百零二', '章']
已经 close to: ['早已', '都已', '已', '早就', '已然', '业已', '已是', '快要']
批准 close to: ['核准', '批复', '备案', '国务院', '审批', '批准后', '核发', '行政主管']
东北 close to: ['西南', '西北', '东南', '东南部', '华北', '北部', '向东南', '西北部']
迅速 close to: ['快速', '较快', '飞速', '急速', '极快', '迅猛', '迅即', '最快']
再次 close to: ['再度', '再一', '重新', '一再', '初次', '数次', '次', '屡次']

Getting the answers to analogy questions: for a question of the form a - b = c - (?), compute vec(c) - (vec(a) - vec(b)) and return the nearest word that does not already appear in the question:

def task_answer(question):
    # split 'a-b=c-({})' into [a, b, c, '({})']
    question = [i.strip() for i in question.replace('=', '-').split('-')]
    # analogy by vector arithmetic: vec(c) - (vec(a) - vec(b))
    tmp = word2vec_res.get_vec([question[2]]) - (
            word2vec_res.get_vec([question[0]]) - word2vec_res.get_vec([question[1]]))

    answers = word2vec_res.get_sim_from_vector(tmp, 4)
    for answer in answers[0]:
        if answer not in question:  # skip words that already appear in the question
            return answer


questions = [
    '薄伽丘-十日谈=司马迁-({})',
    '薄伽丘-十日谈=屈原-({})',
    '云南-昆明=西藏-({})',
    '斯德哥尔摩-瑞典=里斯本-({})',
    '男-女=国王-({})',
    '中国-人民币-日本-({})'

]
for question in questions:
    answer = task_answer(question)
    print(question.format(answer))

Results:

薄伽丘-十日谈=司马迁-(史记)
薄伽丘-十日谈=屈原-(楚辞)
云南-昆明=西藏-(拉萨)
斯德哥尔摩-瑞典=里斯本-(葡萄牙)
男-女=国王-(王后)
中国-人民币-日本-(日元)

