本文介绍: 分类用的是一个简单线性层,其维度为(768, token.vocab_size),其中token.vocab_sized大小为21128,即预测21128个词的分类分数,再与真实标签进行损失计算利用训练 bert 模型最后一个隐层的[cls] token特征进行中文分类利用 bert 模型提取特征,对最后一个隐层的第15个token特征进行分类;对训练数据的第15个词进行 mask 掉,预测第15个词;1-2–基于训练模型实现下游任务。1-1–使用预训练模型推理。3–中文句子关系推断

目录

1–中文分类

1-1–使用预训练模型推理

1-2–基于预训练模型实现下游任务

2–中文填空

3–中文句子关系推断


1–中文分类

1-1–使用预训练模型推理

代码实例

import torch
from datasets import load_dataset
from transformers import BertTokenizer, BertModel

# 定义全局分词工具
tokenizer = BertTokenizer.from_pretrained('bert-base-chinese')

# 定义数据class Dataset(torch.utils.data.Dataset):
    def __init__(self, split):
        self.dataset = load_dataset(path = 'lansinuote/ChnSentiCorp', split = split) # 加载数据def __len__(self):
        return len(self.dataset)

    def __getitem__(self, i):
        text = self.dataset[i]['text']
        label = self.dataset[i]['label']
        return text, label

# 自定义数据处理(加载)方式
def my_collate_fn(data): # data类型dataset 的返回值相同,本例中dataset返回一个列表[text, label]
    # 根据dataset的返回结果取出对应textlabel
    sents = [i[0] for i in data]
    labels = [i[1] for i in data]

    # 使用全局分词工具进行编码
    data = tokenizer.batch_encode_plus(batch_text_or_text_pairs = sents,
                                        truncation = True,
                                        padding = 'max_length',
                                        max_length = 500,
                                        return_tensors = 'pt',
                                        return_length = True)
    input_ids = data['input_ids']
    attention_mask = data['attention_mask']
    token_type_ids = data['token_type_ids']
    labels = torch.LongTensor(labels)
    return input_ids, attention_mask, token_type_ids, labels

def main():
    dataset = Dataset('train') # 初始化训练集
    # print(len(dataset), dataset[0])
    
    # 定义dataloader
    loader = torch.utils.data.DataLoader(dataset = dataset,
                                        batch_size = 16,
                                        collate_fn = my_collate_fn,
                                        shuffle = True,
                                        drop_last = True)
    
    # 遍历dataloader加载数据
    for i, (input_ids, attention_mask, token_type_ids, labels) in enumerate(loader):
        break
    print(len(loader))
    print(input_ids.shape, attention_mask.shape, token_type_ids.shape, labels) # 打印一个样本
    
    # 加载预训练模型
    model = BertModel.from_pretrained('bert-base-chinese')
    for param in model.parameters(): # 不进行梯度计算反向传播
        param.requires_grad_(False)
    # 调用预训练模型推理一个样本    
    output = model(input_ids = input_ids, attention_mask = attention_mask, token_type_ids = token_type_ids)
    print(output.last_hidden_state.shape) # 打印最后一个隐层输出特征的维度

if __name__ == "__main__":
    main()
    print("All done!")

输出结果

# dataloader单个样本torch.Size([16, 500]) 
torch.Size([16, 500]) 
torch.Size([16, 500]) 
tensor([1, 0, 1, 0, 1, 0, 0, 1, 1, 1, 0, 1, 0, 1, 0, 1])
# 最后一个隐层的输出特征:
torch.Size([16, 500, 768])

1-2–基于预训练模型实现下游任务

        利用预训练 bert 模型最后一个隐层的[cls] token的特征进行中文分类

代码

import torch
from datasets import load_dataset
from transformers import BertTokenizer, BertModel, AdamW

# 定义全局分词工具
tokenizer = BertTokenizer.from_pretrained('bert-base-chinese')

# 定义数据class Dataset(torch.utils.data.Dataset):
    def __init__(self, split):
        self.dataset = load_dataset(path = 'lansinuote/ChnSentiCorp', split = split) # 加载数据集

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, i):
        text = self.dataset[i]['text']
        label = self.dataset[i]['label']
        return text, label

# 自定义数据处理(加载)方式
def my_collate_fn(data): # data 的类型与 dataset 的返回值相同,本例中dataset返回一个列表[text, label]
    # 根据dataset的返回结果取出对应的text和label
    sents = [i[0] for i in data]
    labels = [i[1] for i in data]

    # 使用全局的分词工具进行编码
    data = tokenizer.batch_encode_plus(batch_text_or_text_pairs = sents,
                                        truncation = True,
                                        padding = 'max_length',
                                        max_length = 500,
                                        return_tensors = 'pt',
                                        return_length = True)
    input_ids = data['input_ids']
    attention_mask = data['attention_mask']
    token_type_ids = data['token_type_ids']
    labels = torch.LongTensor(labels)
    return input_ids, attention_mask, token_type_ids, labels

# 定义下游任务模型
class Model(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.pretrained_model = BertModel.from_pretrained('bert-base-chinese') # 加载预训练模型
        self.fc = torch.nn.Linear(768, 2)
        
        # 固定预训练模型
        for param in self.pretrained_model.parameters():
            param.requires_grad = False

    def forward(self, input_ids, attention_mask, token_type_ids):
        with torch.no_grad():
            output = self.pretrained_model(input_ids=input_ids,
                       attention_mask=attention_mask,
                       token_type_ids=token_type_ids)

        output = self.fc(output.last_hidden_state[:, 0]) # 利用最后一个隐层的[cls]token特征进行分类

        output = output.softmax(dim=1)

        return output

# 定义测试函数
def test(model, dataset):
    model.eval()
    correct = 0
    total = 0
    # 定义加载测试集的dataloader
    loader_test = torch.utils.data.DataLoader(dataset = dataset,
                                              batch_size = 32,
                                              collate_fn = my_collate_fn,
                                              shuffle = True,
                                              drop_last = True)
    for idx, (input_ids, attention_mask, token_type_ids, labels) in enumerate(loader_test):
        if idx == 5: # 测试5个batch
            break
        print(idx)
        with torch.no_grad():
            input_ids = input_ids.cuda()
            attention_mask = attention_mask.cuda()
            token_type_ids = token_type_ids.cuda()
            labels = labels.cuda()
            output = model(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)

        output = output.argmax(dim=1)
        correct += (output == labels).sum().item()
        total += len(labels)
    print("Acc: ", correct / total) # 打印5个batch的总体准确率

def main():
    dataset = Dataset('train') # 初始化训练集
    # print(len(dataset), dataset[0])
    
    # 定义dataloader
    loader = torch.utils.data.DataLoader(dataset = dataset,
                                        batch_size = 16,
                                        num_workers = 8,
                                        collate_fn = my_collate_fn,
                                        shuffle = True,
                                        drop_last = True)
    # 初始化模型
    model = Model()
    model = model.cuda() # 使用GPU

    # 初始化优化器和损失函数
    optimizer = AdamW(model.parameters(), lr=5e-4)
    criterion = torch.nn.CrossEntropyLoss().cuda()
    
    # 训练模型
    model.train()
    for idx, (input_ids, attention_mask, token_type_ids, labels) in enumerate(loader): # 遍历加载数据
        input_ids = input_ids.cuda()
        attention_mask = attention_mask.cuda()
        token_type_ids = token_type_ids.cuda()
        labels = labels.cuda()
        output = model(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
        loss = criterion(output, labels)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if idx % 5 == 0: # 每5个batch打印当前准确率损失
            output = output.argmax(dim=1)
            accuracy = (output == labels).sum().item() / len(labels)
            print(idx, loss.item(), accuracy)
        if idx == 300: # 使用300个batch进行训练
            break
        
    # 测试模型
    test(model, Dataset('validation'))

if __name__ == "__main__":
    main()

部分输出结果

...
260 0.5995925664901733 0.75
265 0.3791050910949707 1.0
270 0.42692136764526367 0.9375
275 0.4765201210975647 0.875
280 0.4071955382823944 0.9375
285 0.4194560945034027 0.875
290 0.449373722076416 0.9375
295 0.38813596963882446 1.0
300 0.5164415240287781 0.875
Acc:  0.89375

2–中文填空

        对训练数据的第15个词进行 mask 掉,预测第15个词;

        利用 bert 模型提取特征,对最后一个隐层的第15个token特征进行分类

        分类用的是一个简单的线性层,其维度为(768, token.vocab_size),其中token.vocab_sized大小为21128,即预测21128个词的分类分数,再与真实标签进行损失计算

代码

import torch
from datasets import load_dataset, load_from_disk
from transformers import BertTokenizer, BertModel, AdamW

# 定义全局分词工具
token = BertTokenizer.from_pretrained('bert-base-chinese')

# 定义数据class Dataset(torch.utils.data.Dataset):
    def __init__(self, split):
        dataset = load_dataset(path = 'lansinuote/ChnSentiCorp', split = split)
        # dataset = load_from_disk('./data/ChnSentiCorp')
        # dataset = dataset[split]

        def f(data):
            return len(data['text']) > 30
        self.dataset = dataset.filter(f) # 筛选数据集

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, i):
        text = self.dataset[i]['text']

        return text
        
def collate_fn(data):
    # batch编码
    data = token.batch_encode_plus(batch_text_or_text_pairs = data,
                                   truncation = True,
                                   padding = 'max_length',
                                   max_length = 30, # padding到30个词
                                   return_tensors = 'pt', # 返回pytorch格式
                                   return_length = True)

    input_ids = data['input_ids']
    attention_mask = data['attention_mask']
    token_type_ids = data['token_type_ids']

    # 把第15个词固定替换为mask
    labels = input_ids[:, 15].reshape(-1).clone() # 记录真实标签
    input_ids[:, 15] = token.get_vocab()[token.mask_token]

    return input_ids, attention_mask, token_type_ids, labels

# 定义下游任务模型
class Model(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.decoder = torch.nn.Linear(768, token.vocab_size, bias=False) # token.vocab_size为21128,预测21128个词的分类分数
        self.bias = torch.nn.Parameter(torch.zeros(token.vocab_size))
        self.decoder.bias = self.bias
        self.pretrained = BertModel.from_pretrained('bert-base-chinese')
        
        # 固定预训练模型
        for param in self.pretrained.parameters():
            param.requires_grad = False

    def forward(self, input_ids, attention_mask, token_type_ids):
        # 使用bert模型提取特征
        with torch.no_grad():
            output = self.pretrained(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)

        output = self.decoder(output.last_hidden_state[:, 15])
        return output

# 测试
def test(model):
    model.eval()
    correct = 0
    total = 0

    loader_test = torch.utils.data.DataLoader(dataset = Dataset('test'), 
                                              batch_size = 32, 
                                              collate_fn = collate_fn, 
                                              shuffle = True, 
                                              drop_last = True)

    for idx, (input_ids, attention_mask, token_type_ids, labels) in enumerate(loader_test):
        input_ids = input_ids.cuda()
        attention_mask = attention_mask.cuda()
        token_type_ids = token_type_ids.cuda()
        labels = labels.cuda()
        
        if idx == 15: # 测试15个batch
            break
        with torch.no_grad():
            output = model(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)

        output = output.argmax(dim=1)
        correct += (output == labels).sum().item()
        total += len(labels)
        print(token.decode(input_ids[0])) # 打印测试数据
        print("真实标签: ", token.decode(labels[0]), "预测标签: ", token.decode(output[0]))

    print("Acc: ", correct / total)


def main():
    # 初始化训练集
    dataset = Dataset('train')
    # 定义dataloader
    loader = torch.utils.data.DataLoader(dataset = dataset,
                                            batch_size = 16,
                                            collate_fn = collate_fn,
                                            shuffle = True,
                                            drop_last = True)
    # 初始化模型
    model = Model().cuda()
    
    # 训练
    optimizer = AdamW(model.parameters(), lr=5e-4)
    criterion = torch.nn.CrossEntropyLoss().cuda()
    model.train()
    
    for epoch in range(5):
        for idx, (input_ids, attention_mask, token_type_ids, labels) in enumerate(loader):
            input_ids = input_ids.cuda()
            attention_mask = attention_mask.cuda()
            token_type_ids = token_type_ids.cuda()
            labels = labels.cuda()
            output = model(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)
            loss = criterion(output, labels)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

            if idx % 50 == 0:
                output = output.argmax(dim=1)
                accuracy = (output == labels).sum().item() / len(labels)
                print(epoch, idx, loss.item(), accuracy)
    # 测试模型            
    test(model)

if __name__ == "__main__":
    main()

部分输出结果:

4 200 0.7910566329956055 0.75
4 250 0.9690109491348267 0.8125
4 300 0.4056988060474396 0.9375
4 350 0.31916332244873047 1.0
4 400 0.8943865895271301 0.6875
4 450 0.4540601968765259 0.9375
4 500 0.7437821626663208 0.75
4 550 0.3669029474258423 0.9375
[CLS] 之 前 看 到 很 多 评 论 , 说 很 好 兴 冲 [MASK] 买 了 , 看 了 看 感 觉 非 常 失 望 炒 [SEP]
真实标签:  冲 预测标签:  冲
[CLS] 刚 看 了 一 章 就 丢 边 上 了 , 光 盘 也 [MASK] 知 道 放 哪 里 了 , 很 垃 圾 , 很 多 [SEP]
真实标签:  不 预测标签:  不
[CLS] 酒 店 生 意 清 淡 , 大 堂 里 都 没 几 个 [MASK] 人 。 房 间 倒 是 不 小 , 但 觉 得 被 [SEP]
真实标签:  客 预测标签:  客
[CLS] 选 购 的 时 候 , 还 是 有 货 的 。 最 后 [MASK] 通 知 我 说 , 没 货 了 。 当 当 的 服 [SEP]
真实标签:  却 预测标签:  ,
[CLS] 简 单 , 大 方 , 在 同 类 尺 寸 的 款 型 [MASK] 笔 记 本 中 不 显 厚 重 , 轻 薄 感 ! [SEP]
真实标签:  的 预测标签:  的
[CLS] 当 时 是 同 事 极 力 推 荐 这 本 书 。 我 [MASK] 到 网 上 的 介 绍 和 那 么 多 的 [UNK] 名 [SEP]
真实标签:  看 预测标签:  看
[CLS] 酒 店 位 置 离 火 车 站 很 近 , 走 路 10 [MASK] 钟 不 到 就 能 到 。 。 。 服 务 不 错 [SEP]
真实标签:  分 预测标签:  分
[CLS] 买 之 前 也 没 见 过 这 本 书, 听 他 们 [MASK] 的 天 花 乱 坠, 翻 了 几 页 就 够 了 [SEP]
真实标签:  说 预测标签:  写
[CLS] 看 了 百 家 讲 坛 , 来 了 兴 趣 。 因 为 [MASK] 作 忙 , 只 看 了 一 点 点 , 因 此 买 [SEP]
真实标签:  工 预测标签:  工
[CLS] 第 一 次 买 的 拉 拉 升 职 记 , 三 天 到 [MASK] ( 我 住 北 京 三 环 边 上 ) , 还 比 [SEP]
真实标签:  货 预测标签:  手
[CLS] 房 间 隔 音 效 果 极 差 , 深 夜 如 果 隔 [MASK] 客 人 大 声 喧 哗 的 话 [UNK] [UNK] 服 务 员 [SEP]
真实标签:  壁 预测标签:  音
[CLS] 读 过 她 的 《 茶 人 三 部 曲 》 , 一 口 [MASK] 读 完 的 。 一 直 在 搜 寻 她 的 文 字 [SEP]
真实标签:  气 预测标签:  气
[CLS] 条 件 、 服 务 、 设 施 都 很 好 , 房 间 [MASK] 干 净 、 很 舒 适 , 尤 其 是 前 台 服 [SEP]
真实标签:  很 预测标签:  很
[CLS] 大 俗 即 大 雅 ! 这 是 看 郑 振 铎 先 生 [MASK] 部 书 后 最 由 衷 的 感 想 , 看 过 中 [SEP]
真实标签:  三 预测标签:  这
[CLS] 觉 得 相 当 没 意 思 的 一 本 书 。 不 伦 [MASK] 类 的 。 看 的 时 候 很 纠 结 , 看 完 [SEP]
真实标签:  不 预测标签:  不
Acc:  0.6979166666666666

3–中文句子关系推断

代码

import torch
import random
from datasets import load_dataset, load_from_disk
from transformers import BertTokenizer, BertModel, AdamW

# 定义全局分词工具
token = BertTokenizer.from_pretrained('bert-base-chinese')

# 定义数据class Dataset(torch.utils.data.Dataset):
    def __init__(self, split):
        # dataset = load_dataset(path='lansinuote/ChnSentiCorp', split=split)
        dataset = load_from_disk('./data/ChnSentiCorp')
        dataset = dataset[split]

        def f(data):
            return len(data['text']) > 40

        self.dataset = dataset.filter(f)

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, i):
        text = self.dataset[i]['text']

        # 切分一句话为前半句和后半句
        sentence1 = text[:20]
        sentence2 = text[20:40]
        label = 0 # label为0表示为同一句

        # 有一半的概率把后半句替换为一句无关的话
        if random.randint(0, 1) == 0:
            j = random.randint(0, len(self.dataset) - 1)
            sentence2 = self.dataset[j]['text'][20:40]
            label = 1

        return sentence1, sentence2, label

def collate_fn(data):
    sents = [i[:2] for i in data]
    labels = [i[2] for i in data]

    # 编码
    data = token.batch_encode_plus(batch_text_or_text_pairs = sents,
                                   truncation = True,
                                   padding = 'max_length',
                                   max_length = 45,
                                   return_tensors = 'pt',
                                   return_length = True,
                                   add_special_tokens = True)

    input_ids = data['input_ids']
    attention_mask = data['attention_mask']
    token_type_ids = data['token_type_ids']
    labels = torch.LongTensor(labels)

    return input_ids, attention_mask, token_type_ids, labels

# 定义下游任务模型
class Model(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.fc = torch.nn.Linear(768, 2) # 二分类
        self.pretrained = BertModel.from_pretrained('bert-base-chinese')
        
        # 固定预训练模型
        for param in self.pretrained.parameters():
            param.requires_grad = False

    def forward(self, input_ids, attention_mask, token_type_ids):
        with torch.no_grad():
            output = self.pretrained(input_ids = input_ids, attention_mask = attention_mask, token_type_ids = token_type_ids)

        output = self.fc(output.last_hidden_state[:, 0])
        output = output.softmax(dim=1)
        return output
 
def main():
    model = Model().cuda()
    optimizer = AdamW(model.parameters(), lr=5e-4)
    criterion = torch.nn.CrossEntropyLoss().cuda() 
    
    # dataloader
    loader = torch.utils.data.DataLoader(dataset = Dataset('train'),
                                        batch_size = 8,
                                        collate_fn = collate_fn,
                                        shuffle = True,
                                        drop_last = True)  
    # 训练
    model.train()
    
    for idx, (input_ids, attention_mask, token_type_ids, labels) in enumerate(loader):
        input_ids = input_ids.cuda()
        attention_mask = attention_mask.cuda()
        token_type_ids = token_type_ids.cuda()
        labels = labels.cuda()
        output = model(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)

        loss = criterion(output, labels)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if idx % 5 == 0: # 每5个batch打印
            output = output.argmax(dim=1)
            accuracy = (output == labels).sum().item() / len(labels)
            print(idx, loss.item(), accuracy)

        if idx == 300: # 训练300个batch
            break
    
    # 测试
    test(model)

# 定义测试函数
def test(model):
    model.eval()
    correct = 0
    total = 0
    loader_test = torch.utils.data.DataLoader(dataset = Dataset('test'),
                                              batch_size = 32,
                                              collate_fn = collate_fn,
                                              shuffle = True,
                                              drop_last = True)

    for idx, (input_ids, attention_mask, token_type_ids, labels) in enumerate(loader_test):
        input_ids = input_ids.cuda()
        attention_mask = attention_mask.cuda()
        token_type_ids = token_type_ids.cuda()
        labels = labels.cuda()
        if idx == 5: # 测试5个batch
            break
        with torch.no_grad():
            output = model(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids)

        pred = output.argmax(dim=1)
        correct += (pred == labels).sum().item()
        total += len(labels)

    print('acc:', correct / total)
    
if __name__ == "__main__":
    main()

部分运行结果:

240 0.39283961057662964 0.875
245 0.7069525122642517 0.5
250 0.41953372955322266 0.875
255 0.5032698512077332 0.75
260 0.6422066688537598 0.75
265 0.5467717051506042 0.75
270 0.4452913701534271 0.875
275 0.5998544096946716 0.625
280 0.4301206171512604 0.875
285 0.5177156329154968 0.75
290 0.3987200856208801 0.875
295 0.33609679341316223 1.0
300 0.3723036050796509 0.875
acc: 0.925

原文地址:https://blog.csdn.net/weixin_43863869/article/details/134657636

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_27548.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注