Это 4-й день моего участия в ноябрьском испытании обновлений.Подробности о событии:Вызов последнего обновления 2021 г.
TextRNN
TextRNN просто введите Word Embedding в двунаправленный LSTM, а затем введите вывод последнего бита в полностью подключенный слой, а затем выполните на нем классификацию softmax Модель выглядит следующим образом:
Код:
class RNN(nn.Module):
def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim,
n_layers=2, bidirectional=True, dropout=0.2, pad_idx=0):
super().__init__()
self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
self.rnn = nn.LSTM(embedding_dim, hidden_dim, num_layers=n_layers,batch_first=True,
bidirectional=bidirectional)
self.fc = nn.Linear(hidden_dim * 2, output_dim)
# 这里hidden_dim乘以2是因为是双向,需要拼接两个方向,跟n_layers的层数无关。
self.dropout = nn.Dropout(dropout)
def forward(self, text):
# text.shape=[seq_len, batch_size]
embedded = self.dropout(self.embedding(text))
# output: [batch,seq,2*hidden if bidirection else hidden]
# hidden/cell: [bidirec * n_layers, batch, hidden]
output, (hidden, cell) = self.rnn(embedded)
# concat the final forward (hidden[-2,:,:]) and backward (hidden[-1,:,:]) hidden layers
hidden = self.dropout(torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1))
# hidden = [batch size, hid dim * num directions],
return self.fc(hidden.squeeze(0)) # 在接一个全连接层,最终输出[batch size, output_dim]
TextRNN_ATT
Добавить механизм внимания на основе TextRNN, код:
class RNN_ATTs(nn.Module):
def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim,
n_layers=2, bidirectional=True, dropout=0.2, pad_idx=0, hidden_size2=64):
super().__init__()
self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)
self.lstm = nn.LSTM(embedding_dim, hidden_dim, n_layers,
bidirectional=bidirectional, batch_first=True, dropout=dropout)
self.tanh1 = nn.Tanh()
# self.u = nn.Parameter(torch.Tensor(config.hidden_size * 2, config.hidden_size * 2))
self.w = nn.Parameter(torch.zeros(hidden_dim * 2))
self.tanh2 = nn.Tanh()
self.fc1 = nn.Linear(hidden_dim * 2, hidden_size2)
self.fc = nn.Linear(hidden_size2, output_dim)
def forward(self, x):
emb = self.embedding(x) # [batch_size, seq_len, embeding]=[128, 32, 300]
H, _ = self.lstm(emb) # [batch_size, seq_len, hidden_size * num_direction]=[128, 32, 256]
M = self.tanh1(H) # [128, 32, 256]
# M = torch.tanh(torch.matmul(H, self.u))
alpha = F.softmax(torch.matmul(M, self.w), dim=1).unsqueeze(-1) # [128, 32, 1]
out = H * alpha # [128, 32, 256]
out = torch.sum(out, 1) # [128, 256]
out = F.relu(out)
out = self.fc1(out)
out = self.fc(out) # [128, 64]
return out
набор данных
Набор данных использует набор данных cnews, который содержит три файла, а именно cnews.train.txt, cnews.val.txt, cnews, test.txt. Категория: спорт, развлечения, дом, недвижимость, образование, мода, текущие события, игры, технологии, финансы, всего 10 категорий. Адрес сетевого диска:
Ссылка на сайт:Disk.Baidu.com/Yes/1A стал O для…Код извлечения: rtnv
Построить векторы слов
Первый шаг — прочитать ожидания и провести сегментацию слов.
Идеи:
1. Создайте объект сегментации слов по умолчанию seg.
2. Откройте файл и прочитайте статью построчно.
3. Удалите пробелы в конце и отделите метку от статьи.
4. Поместите сегментированную статью в src_data, а метку в labels.
5. Вернуть результат.
Я аннотировал код следующим образом:
def read_corpus(file_path):
"""读取语料
:param file_path:
:param type:
:return:
"""
src_data = []
labels = []
seg = pkuseg.pkuseg() #使用默认分词方式。
with codecs.open(file_path,'r',encoding='utf-8') as fout:
for line in tqdm(fout.readlines(),desc='reading corpus'):
if line is not None:
# line.strip()的意思是去掉每句话句首句尾的空格
# .split(‘\t’)的意思是根据'\t'把label和文章内容分开,label和内容是通过‘\t’隔开的。
# \t表示空四个字符,也称缩进,相当于按一下Tab键
pair = line.strip().split('\t')
if len(pair) != 2:
print(pair)
continue
src_data.append(seg.cut(pair[1]))# 对文章内容分词。
labels.append(pair[0])
return (src_data, labels) #返回文章内容的分词结果和labels
После этого шага получаются статьи после меток и сегментации слов. Следующий код:
src_sents, labels = read_corpus('cnews/cnews.train.txt')
Сопоставление меток:
labels = {label: idx for idx, label in enumerate(labels)}
Получить словарь idx, соответствующий меткам, значение idx — это значение последней вставленной метки.
Второй шаг — построить вектор слов
Этот шаг в основном использует метод from_corpus vocab.py.
Идеи:
1. Создайте объект vocab_entry.
2. Подсчитайте частоту слов в статье после сегментации слов и создайте словарь, состоящий из слов и частот слов.
3. Берём Top size - 2 элемента из словаря.
4. Получить слово элемента.
5. Выполните метод добавления, чтобы поместить слово в vocab_entry, сгенерировать слово и идентификатор, а идентификатор — это векторное значение, соответствующее слову.
код показывает, как показано ниже:
@staticmethod
def from_corpus(corpus, size, min_feq=3):
"""从给定语料中创建VocabEntry"""
vocab_entry = VocabEntry()
# chain函数来自于itertools库,itertools库提供了非常有用的基于迭代对象的函数,而chain函数则是可以串联多个迭代对象来形成一个更大的迭代对象
# *的作用:返回单个迭代器。
# word_freq是个字典,key=词,value=词频
word_freq = Counter(chain(*corpus)) # Counter 是实现的 dict 的一个子类,可以用来方便地计数,统计词频
valid_words = word_freq.most_common(size - 2) # most_common()函数用来实现Top n 功能,在这里选出Top size-2个词
valid_words = [word for word, value in valid_words if value >= min_feq] # 把符合要求的词找出来放到list里面。
print('number of word types: {}, number of word types w/ frequency >= {}: {}'
.format(len(word_freq), min_feq, len(valid_words)))
for word in valid_words: # 将词放进VocabEntry里面。
vocab_entry.add(word)
return vocab_entry
После завершения создания сохраните вектор слов в файл json.
vocab = Vocab.build(src_sents, labels, 50000, 3)
print('generated vocabulary, source %d words' % (len(vocab.vocab)))
vocab.save('./vocab.json')
тренироваться
Для обучения используйте Train_RNN.py, сначала посмотрите параметры основного метода.
параметр
parse = argparse.ArgumentParser()
parse.add_argument("--train_data_dir", default='./cnews/cnews.train.txt', type=str, required=False)
parse.add_argument("--dev_data_dir", default='./cnews/cnews.val.txt', type=str, required=False)
parse.add_argument("--test_data_dir", default='./cnews/cnews.test.txt', type=str, required=False)
parse.add_argument("--output_file", default='deep_model.log', type=str, required=False)
parse.add_argument("--batch_size", default=4, type=int)
parse.add_argument("--do_train", default=True, action="store_true", help="Whether to run training.")
parse.add_argument("--do_test", default=True, action="store_true", help="Whether to run training.")
parse.add_argument("--learnning_rate", default=5e-4, type=float)
parse.add_argument("--num_epoch", default=50, type=int)
parse.add_argument("--max_vocab_size", default=50000, type=int)
parse.add_argument("--min_freq", default=2, type=int)
parse.add_argument("--hidden_size", default=256, type=int)
parse.add_argument("--embed_size", default=300, type=int)
parse.add_argument("--dropout_rate", default=0.2, type=float)
parse.add_argument("--warmup_steps", default=0, type=int, help="Linear warmup over warmup_steps.")
parse.add_argument("--GRAD_CLIP", default=1, type=float)
parse.add_argument("--vocab_path", default='vocab.json', type=str)
Описание параметра:
train_data_dir: путь к тренировочному набору.
dev_data_dir: путь набора проверки
test_data_dir: путь набора тестов
output_file: путь к выходному журналу
batch_size: Размер партии.
do_train: тренироваться ли, по умолчанию True,
do_test: тестировать ли, по умолчанию True
Learning_rate: скорость обучения
num_epoch: количество эпох
max_vocab_size: количество векторов слов
min_freq: частота слов, фильтровать слова ниже этого значения
hidden_size: количество скрытых слоев
embed_size: длина встраивания.
dropout_rate: значение отсева.
Warmup_steps: Установите значение разогрева.
vocab_path: путь, по которому сохраняется вектор слов
Построить векторы слов
vocab = build_vocab(args)
label_map = vocab.labels
print(label_map)
Метод build_vocab:
def build_vocab(args):
if not os.path.exists(args.vocab_path):
src_sents, labels = read_corpus(args.train_data_dir)
labels = {label: idx for idx, label in enumerate(labels)}
vocab = Vocab.build(src_sents, labels, args.max_vocab_size, args.min_freq)
vocab.save(args.vocab_path)
else:
vocab = Vocab.load(args.vocab_path)
return vocab
Создать модель
Создайте модель CNN, поместите модель в GPU, вызовите метод обучения и обучите.
rnn_model = RNN_ATTs(len(vocab.vocab), args.embed_size, args.hidden_size,
len(label_map), n_layers=1, bidirectional=True, dropout=args.dropout_rate)
rnn_model.to(device)
train(args, rnn_model, train_data, dev_data, vocab, dtype='RNN')
Некоторые аннотации сделаны по методу поезда, а именно:
def train(args, model, train_data, dev_data, vocab, dtype='CNN'):
LOG_FILE = args.output_file
#记录训练log
with open(LOG_FILE, "a") as fout:
fout.write('\n')
fout.write('==========' * 6)
fout.write('start trainning: {}'.format(dtype))
fout.write('\n')
time_start = time.time()
if not os.path.exists(os.path.join('./runs', dtype)):
os.makedirs(os.path.join('./runs', dtype))
tb_writer = SummaryWriter(os.path.join('./runs', dtype))
# 计算总的迭代次数
t_total = args.num_epoch * (math.ceil(len(train_data) / args.batch_size))
#optimizer = bnb.optim.Adam8bit(model.parameters(), lr=0.001, betas=(0.9, 0.995)) # add bnb optimizer
optimizer = AdamW(model.parameters(), lr=args.learnning_rate, eps=1e-8)#设置优化器
scheduler = get_linear_schedule_with_warmup(optimizer=optimizer, num_warmup_steps=args.warmup_steps,
num_training_steps=t_total) #设置预热。
criterion = nn.CrossEntropyLoss()# 设置loss为交叉熵
global_step = 0
total_loss = 0.
logg_loss = 0.
val_acces = []
train_epoch = trange(args.num_epoch, desc='train_epoch')
for epoch in train_epoch:#训练epoch
model.train()
for src_sents, labels in batch_iter(train_data, args.batch_size, shuffle=True):
src_sents = vocab.vocab.to_input_tensor(src_sents, args.device)
global_step += 1
optimizer.zero_grad()
logits = model(src_sents)
y_labels = torch.tensor(labels, device=args.device)
example_losses = criterion(logits, y_labels)
example_losses.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), args.GRAD_CLIP)
optimizer.step()
scheduler.step()
total_loss += example_losses.item()
if global_step % 100 == 0:
loss_scalar = (total_loss - logg_loss) / 100
logg_loss = total_loss
with open(LOG_FILE, "a") as fout:
fout.write("epoch: {}, iter: {}, loss: {},learn_rate: {}\n".format(epoch, global_step, loss_scalar,
scheduler.get_lr()[0]))
print("epoch: {}, iter: {}, loss: {}, learning_rate: {}".format(epoch, global_step, loss_scalar,
scheduler.get_lr()[0]))
tb_writer.add_scalar("lr", scheduler.get_lr()[0], global_step)
tb_writer.add_scalar("loss", loss_scalar, global_step)
print("Epoch", epoch, "Training loss", total_loss / global_step)
eval_loss, eval_result = evaluate(args, criterion, model, dev_data, vocab) # 评估模型
with open(LOG_FILE, "a") as fout:
fout.write("EVALUATE: epoch: {}, loss: {},eval_result: {}\n".format(epoch, eval_loss, eval_result))
eval_acc = eval_result['acc']
if len(val_acces) == 0 or eval_acc > max(val_acces):
# 如果比之前的acc要da,就保存模型
print("best model on epoch: {}, eval_acc: {}".format(epoch, eval_acc))
torch.save(model.state_dict(), "classifa-best-{}.th".format(dtype))
val_acces.append(eval_acc)
time_end = time.time()
print("run model of {},taking total {} m".format(dtype, (time_end - time_start) / 60))
with open(LOG_FILE, "a") as fout:
fout.write("run model of {},taking total {} m\n".format(dtype, (time_end - time_start) / 60))
Сосредоточьтесь на аннотировании метода batch_iter следующим образом:
def batch_iter(data, batch_size, shuffle=False):
"""
batch数据
:param data: list of tuple
:param batch_size:
:param shuffle:
:return:
"""
batch_num = math.ceil(len(data) / batch_size)# 计算迭代的次数
index_array = list(range(len(data))) #按照data的长度,映射list
if shuffle:#是否打乱顺序
random.shuffle(index_array)
for i in range(batch_num):
indices = index_array[i*batch_size:(i+1)*batch_size]# 选出batchsize个index
examples = [data[idx] for idx in indices]# 通过index找到对应的data
examples = sorted(examples,key=lambda x: len(x[1]),reverse=True)#按照label排序
src_sents = [e[0] for e in examples] #把data中的文章放到src_sents
labels = [label_map[e[1]] for e in examples] #将标题映射label_map对应的value
yield src_sents, labels
Следующий важный метод — это vocab.vocab.to_input_tensor, основная идея:
1. Преобразовать данные в значение, соответствующее слову, с помощью метода self.words2indices.
2. Найдите самые длинные данные в пакете и добавьте 0 к остальным данным, чтобы получить одинаковую длину.
3. Поместите результат, полученный на втором шаге, в torch.tensor
код показывает, как показано ниже:
def to_input_tensor(self, sents: List[List[str]], device: torch.device):
"""
将原始句子list转为tensor,同时将句子PAD成max_len
:param sents: list of list<str>
:param device:
:return:
"""
sents = self.words2indices(sents)
sents = pad_sents(sents, self.word2id['<PAD>'])
sents_var = torch.tensor(sents, device=device)
return sents_var
Начать обучение:
проверять
Измените do_train на False и do_test на True, чтобы открыть модель проверки, и TextRNN может получить оценку 0,96.
parse.add_argument("--do_train", default=False, action="store_true", help="Whether to run training.")