본문 바로가기

Python/Pytorch

[pytorch] pytorch로 구현하는 Attention이 적용된 Seq2Seq

반응형

이번 포스팅에서는 pytorch로 Attention 기법이 적용된 Sequence to Sequence 모델을 직접 구현해 보자. 그리고 구현에 앞서 (필자가..) 주로 헷갈리는 시퀀스 데이터가 모델에 들어갈 때의 입/출력 형상이 어떤 과정으로 진행되는지도 살펴보자.

 

오픈소스 딥러닝 프레임워크 중 하나인 Pytorch


1. 시퀀스 데이터는 모델의 입력으로 어떤 형태(shape)로 들어갈까?

Tabular 데이터, 이미지 데이터와는 다르게 시퀀스 데이터는 시퀀스 길이(윈도우 길이)라는 게 존재한다. 그래서 시퀀스 데이터를 처음 다루는 사람에게는 시퀀스 데이터를 딥러닝 모델의 입력으로 넣어줄 때, 어떠한 형상으로 넣어주어야 하는지 또는 모델이 출력을 내뱉었을 때, 그 출력의 형상이 왜 이렇게 되는지에 대해서 헷갈릴 수 있다. 이번 목차에서는 그 헷갈리는 부분을 명확히 짚고 넘어가 보도록 하자.

 

먼저 시퀀스 데이터가 필요한데, 우리는 구텐베르크 프로젝트에것 제공하는 이솝 우화 모음집의 텍스트 데이터를 사용할 것이다. 해당 데이터를 다운로드하는 방법은 아래 내용의 셸 스크립트를 작성하고 실행시키면 된다.

 

FILE=$1
NAME=$2
URL=http://www.gutenberg.org/cache/epub/$FILE/pg$FILE.txt
TARGET_DIR=./data/$NAME/
mkdir -p $TARGET_DIR
TXT_FILE=./data/$NAME/data.txt
wget -N $URL -O $TXT_FILE

 

$ bash download.sh 11339 aesop

 

성공적으로 다운로드가 되었다면 아래 코드를 실행해서 데이터 로드 및 전처리 시켜주도록 하자.

 

import re
from torchtext.data.utils import get_tokenizer

filename = "./dataset/gutenberg/data/aesop/data.txt"

with open(filename, encoding='utf-8-sig') as f:
    text = f.read()

seq_length = 20
start_story = "| " * seq_length

text = text.lower()
text = start_story + text
text = text.replace("\n\n\n\n\n", start_story)
text = text.replace("\n", " ")
text = re.sub("  +", ". ", text).strip()
text = text.replace("..", ".")
text = re.sub('(["#$%&()*+,-./:;<=>?@[\]^_`{|}~])', r' \1 ', text)
text = re.sub('\s{2,}', ' ', text)

tokenizer = get_tokenizer(tokenizer='basic_english', language='en')
tokens = tokenizer(text)
vocab_size = len(set(tokens))
print(len(tokens)), print(vocab_size) # (57593, 4761)

 

위 로직에 대해 간단히 설명하면 특수 문자와 같은 불용어를 제거한 후, 단어 단위로 토큰화시킨 것이다. 실제로 위 코드 내의 tokens라는 변수 출력 내용의 일부를 캡처한 것이다.

 

텍스트 데이터를 토큰화한 토큰들 일부

 

이제 데이터가 준비되었으니 이제 텍스트 데이터를 시퀀스 데이터로 변환시켜보자. 먼저 시퀀스 길이는 20이라고 가정하겠다. 아래 그림은 텍스트 데이터를 시퀀스 데이터로 변환할 때의 과정을 나타낸 그림이다.

 

텍스트 데이터를 시퀀스 데이터로 변환

 

위 사진을 예시로 들어보자. 우리가 사용한 전체 텍스트의 토큰 개수는 57,593개이다. 그리고 이 텍스트를 구성하는 고유한 언어 개수 즉, 토큰 개수(Vocabulary size)는 4,761개이다. 여기서 토큰 개수를 단어 개수라고 하지 않은 이유는 텍스트를 토큰화하는 데는 크게 단어 기준으로 토큰화, 문자 기준으로 토큰화하는 방법으로 나뉘기 때문이다. 결국 공통적으로는 텍스트를 하나의 토큰씩으로 분할하는 것이기 때문에 '토큰 개수'라고 표현하였다.

 

가장 먼저 해줄 것은 모델에 집어넣을 $ 데이터를 만들어주는 것이다. 이때, $는 시퀀스 데이터이고, $는 $ 시퀀스 데이터를 넣었을 때 다음에 나올 텍스트 토큰 1개를 의미한다(문제에 따라 $Y$ 길이가 달라질 수 는 있음). 위 예시에서는 시퀀스 길이가 20 임을 가정했다. 위처럼 $X, Y$ 데이터를 만드는 예시 코드는 아래와 같다.(직전 목차에서 사용된 전처리 함수에 이어지는 코드이다)

 

import torch.nn as nn
import torch
from torch.nn import functional as F
from torch.utils.data import DataLoader, Dataset
from sklearn.preprocessing import LabelEncoder
import numpy as np

def generate_sequences(tokens: list[str], seq_length, step):
    text2int_encoder = LabelEncoder()
    tokens_list = text2int_encoder.fit_transform(tokens)
    vocab_size = len(set(tokens))

    x = []
    y = []

    for i in range(0, len(tokens_list) - seq_length, step):
        x.append(tokens_list[i: i + seq_length])
        y.append(tokens_list[i + seq_length])

    x = torch.Tensor(x).to(torch.int64)
    y = torch.Tensor(y).to(torch.int64)
    y = F.one_hot(y, num_classes=vocab_size)
        
    return x, y, vocab_size, text2int_encoder

 

 

위처럼 만들어진 시퀀스 데이터 $X, Y$를 이제 모델에 넣으면 된다. 보통 모델에 넣을 때는 임베딩 레이어를 거친 후 RNN 계열의 모델에 집어넣는 것이 일반적이다. 그러므로 아래 그림은 임베딩 레이어를 사용해서 RNN 모델에 넣어준다는 가정 하에 만들어본 그림이다.

 

시퀀스 데이터가 임베딩 레이어가 포함된 모델에 들어갈 때의 형상

 

 

임베딩 계층을 거칠 때는 2가지 파라미터가 존재하는데, 하나는 몇 개의 임베딩 벡터로 변환할지를 결정하는 embedding size가 있고, 학습 데이터인 전체 텍스트를 구성하는 고유한 토큰 개수와 동일한 number of embedding 이 있다. 참고로 해당 파라미터의 이름은 실제 pytorch의 임베딩 계층 클래스의 embedding_dim과 num_embeddings 파라미터에 대응된다.

 

위 그림에서 임베딩 계층을 거치고 나기 전/후의 $의 형상을 잘 참고해 보도록 하자. 마지막으로는 RNN 모델에 입력시키는 것이다. 이 부분은 위 그림으로 직관적으로 이해할 수 있기 때문에 따로 설명은 생략하겠다.

2. Attention이 적용된 Seq2Seq 모델 구현하기

1번 목차에서 텍스트 데이터가 어떠한 형태로 모델에 들어가는지, 그리고 출력되는지 구조를 알 수 있었다. 그러면 이를 기반으로 해서 pytorch로 Attention이 적용된 Seq2Seq 모델을 구현해보도록 하자. 구현 시 사용할 텍스트 데이터 및 전처리 방식은 pytorch tutorial 문서를 참조하였다.

 

먼저 Attention이 적용된 Seq2Seq 모델로 입력이 들어갈 경우, 어떻게 변환되는지 살펴보자. 참고로 아래 그림에 적용된 Attention 기법은 이전에 배운 [밑바닥부터 시작하는 딥러닝 2권 포스팅] 내용을 기반으로 하였다.

 

Attention이 적용된 Seq2Seq 모델로 입력/출력 텐서 형상

 

위 그림에서 주목할 점은 Encoder 부분에 들어가는 입력의 시퀀스는 모두 한 번에 들어간다. 하지만 Decoder는 예측하는 것이 목적이기 때문에 학습뿐만 아니라 inference를 할 때에도 하나씩 예측하도록 해야 한다. 그래서 위 그림을 보면 Encoder 그림은 시퀀스 길이 $T$만큼 한 번에 들어가는 구조로 설명했지만, Decoder 그림에서는 시퀀스 길이가 $T=1$로 하여 설명했다.

 

이제 튜토리얼에서 소개하는 [불어 → 영어] 번역 문제를 해결하는 모델을 만들기 위해 불어와 영어 시퀀스 데이터셋을 아래처럼 만들어주자.

 

from __future__ import unicode_literals, print_function, division
from io import open
from typing import List
import unicodedata
import string
import re
import random

import torch

class Lang:
    def __init__(self, name):
        self.name = name
        self.word2index = {}
        self.word2count = {}
        self.index2word = {0: "SOS", 1: "EOS"}  # label <-> word mapping
        self.n_words = 2  # word에 대한 labeling 하기 위함

    def addSentence(self, sentence):
        for word in sentence.split(" "):
            self.addWord(word)

    def addWord(self, word):
        # word가 처음 등장한다면
        if word not in self.word2index:
            self.word2index[word] = self.n_words
            self.word2count[word] = 1  # 단어 등장 횟수 기록
            self.index2word[self.n_words] = word
            self.n_words += 1
        else:
            self.word2count[word] += 1

# unicode -> ascii 변환
def unicodeToAscii(s):
    return ''.join(
        c for c in unicodedata.normalize('NFD', s)
        if unicodedata.category(c) != 'Mn'
    )

# 소문자, 다듬기, 그리고 문자가 아닌 문자 제거
def normalizeString(s):
    s = unicodeToAscii(s.lower().strip())
    s = re.sub(r"([.!?])", r" \1", s)
    s = re.sub(r"[^a-zA-Z.!?]+", r" ", s)
    return s


def readLangs(lang1, lang2, reverse=False):
    print("Reading lines...")

    # 파일을 읽고 줄로 분리
    lines = open('dataset/data/%s-%s.txt' % (lang1, lang2), encoding='utf-8').\
        read().strip().split('\n')

    # 모든 줄을 쌍으로 분리하고 정규화
    pairs = [[normalizeString(s) for s in l.split('\t')] for l in lines]

    # 쌍을 뒤집고, Lang 인스턴스 생성
    if reverse:
        pairs = [list(reversed(p)) for p in pairs]
        input_lang = Lang(lang2)
        output_lang = Lang(lang1)
    else:
        input_lang = Lang(lang1)
        output_lang = Lang(lang2)

    return input_lang, output_lang, pairs


def filterPair(p):
    return len(p[0].split(' ')) < MAX_LENGTH and \
        len(p[1].split(' ')) < MAX_LENGTH and \
        p[1].startswith(eng_prefixes)


def filterPairs(pairs):
    return [pair for pair in pairs if filterPair(pair)]


def prepareData(lang1, lang2, reverse=False):
    input_lang, output_lang, pairs = readLangs(lang1, lang2, reverse)
    print("Read %s sentence pairs" % len(pairs))
    pairs = filterPairs(pairs)
    print("Trimmed to %s sentence pairs" % len(pairs))
    print("Counting words...")
    for pair in pairs:
        input_lang.addSentence(pair[0])
        output_lang.addSentence(pair[1])
    print("Counted words:")
    print(input_lang.name, input_lang.n_words)
    print(output_lang.name, output_lang.n_words)
    return input_lang, output_lang, pairs

def indexesFromSentence(lang, sentence):
    # 특정 단어의 index 번호를 리스트로 반환
    return [lang.word2index[word] for word in sentence.split(' ')]

def tensorFromSentence(lang, sentence, is_input=False):
    indexes: List[int] = indexesFromSentence(lang, sentence)
    indexes.append(EOS_TOKEN) # 문장의 끝이라고 표시하기 위한 토큰 끝에 추가
    tensor = torch.Tensor(indexes).to(torch.long)
    if is_input:
        return F.pad(tensor, pad=(0, MAX_LENGTH-tensor.size()[0])).view(1, -1)
    else:
        return tensor.view(1, -1, 1)

def tensorsFromPair(pair):
    global input_lang, output_lang
    input_tensor = tensorFromSentence(input_lang, pair[0], is_input=True) # french
    target_tensor = tensorFromSentence(output_lang, pair[1], is_input=True) # english
    return input_tensor, target_tensor

def verbose_shape(*tensors):
    for t in tensors:
        print(t.shape, end=' ')

def verbose_grad_fn(*tensors):
    for t in tensors:
        print(t._grad_fn)


# data preprocessing
SOS_TOKEN = 0
EOS_TOKEN = 1
MAX_LENGTH = 10

eng_prefixes = (
    "i am ", "i m ",
    "he is", "he s ",
    "she is", "she s ",
    "you are", "you re ",
    "we are", "we re ",
    "they are", "they re "
)

input_lang, output_lang, pairs = prepareData('eng', 'fra', True)

tensors = (tensorsFromPair(p) for p in pairs)

fren, engs = [], []
for t in tensors:
    fren.append(t[0])
    engs.append(t[1])

x_french = torch.cat(fren, dim=0)
y_english = torch.cat(engs, dim=0)

 

그리고 파이토치의 DataLoader를 사용하기 위해 pytorch의 Dataset 클래스를 상속받자.

 

from torch.utils.data import Dataset, DataLoader

class FrenchEnglishDataset(Dataset):
    def __init__(self, x_french: torch.Tensor, y_english: List[torch.Tensor]):
        self.x_fren = x_french
        self.y_eng = y_english

    def __len__(self):
        return len(self.x_fren)

    def __getitem__(self, idx):
        x_fren: torch.Tensor = self.x_fren[idx]
        y_eng: torch.Tensor = self.y_eng[idx]
        return x_fren, y_eng
        
dataset = FrenchEnglishDataset(x_french, y_english)
dataloader = DataLoader(dataset, batch_size=128)

 

그리고 이제 모델을 빌드하는 코드는 아래와 같다.

 

import torch
import torch.nn as nn
import torch.nn.functional as F

class Encoder(nn.Module):
    def __init__(self, vocab_size, embedding_size, hidden_size):
        super(Encoder, self).__init__()

        self.vocab_size = vocab_size
        self.embedding_size = embedding_size
        self.hidden_size = hidden_size

        self.embed_layer = nn.Embedding(num_embeddings=self.vocab_size, embedding_dim=self.embedding_size)
        self.lstm = nn.LSTM(input_size=self.embedding_size, hidden_size=self.hidden_size, num_layers=1, batch_first=True)

    def forward(self, x):
        x = self.embed_layer(x)
        hs, (h, c) = self.lstm(x)
        return hs, h, c


class Decoder(nn.Module):
    def __init__(self, vocab_size, embedding_size, hidden_size):
        super(Decoder, self).__init__()

        self.vocab_size = vocab_size
        self.embedding_size = embedding_size
        self.hidden_size = hidden_size

        self.embed_layer = nn.Embedding(num_embeddings=self.vocab_size, embedding_dim=self.embedding_size)
        self.lstm = nn.LSTM(input_size=self.embedding_size, hidden_size=self.hidden_size, num_layers=1, batch_first=True)
        self.affine = nn.Linear(in_features=self.hidden_size*2, out_features=1)

    def forward(self, enc_hs, enc_h, enc_c, y):
        """ 여기서는 1개의 input sequence가 들어왔다고 가정하고 작성되어야 함
        """
        x = self.embed_layer(y)
        dec_hs, (dec_h, dec_c) = self.lstm(x, (enc_h, enc_c))

        #==========
        # Attention
        #==========
        # 1.가중치 계산
        s = torch.bmm(enc_hs, dec_hs.transpose(1, 2))
        w = F.softmax(s, dim=1)
        # 2.선택 작업
        c = torch.sum(enc_hs * w, dim=1, keepdim=True)
        verbose_shape(enc_hs, w, c)

        ch = torch.cat((c, dec_hs), dim=-1)
        z = self.affine(ch).view(-1,1)
        return z, w
    

def save_model(encoder: nn.Module, decoder: nn.Module, epoch: int, loss, attn_weights: torch.Tensor, x_fren: torch.Tensor, y_eng: torch.Tensor):
    ckpt = {"epoch": epoch,
            "loss": loss.data.item(),
            "encoder_state_dict": encoder.state_dict(),
            "decoder_state_dict": decoder.state_dict(),
            "attn_weights": attn_weights,
            "x_fren": x_fren,
            "y_eng": y_eng
           }
    
    torch.save(ckpt, f"seq2seq_ckpt/epoch_{epoch}.pt")
    print(f"ㄴ Epoch {epoch} checkpoint is successfully saved!")

 

Decoder 부분에서 입력 시퀀스와 아웃풋 시퀀스 간의 관계 즉, Attention 값을 살펴보도록 하기 위해서 Decoder의 forward(순전파) 시 리턴하는 값을 Attention matrix도 반환하도록 했다. 이제 학습하는 코드는 아래와 같다.

 

# params
french_vocab_size = input_lang.n_words
english_vocab_size = output_lang.n_words
embedding_size = 128
hidden_size = 256
learning_rate = 0.001
epochs = 2

# model
encoder = Encoder(french_vocab_size, embedding_size, hidden_size)
decoder = Decoder(english_vocab_size, embedding_size, hidden_size)

# Loss
criterion = nn.CrossEntropyLoss()

# optimizer
encoder_optimizer = Adam(encoder.parameters(), lr=learning_rate)
decoder_optimizer = Adam(decoder.parameters(), lr=learning_rate)

# train
n_batch = 1
for epoch in range(epochs):
    losses = []
    for x_fren, y_eng in dataloader:
        # init gradients
        encoder_optimizer.zero_grad()
        decoder_optimizer.zero_grad()
        
        # Encoder-forward
        enc_hs, enc_h, enc_c = encoder(x_fren)
    
        # Decoder-forward (1-step씩)
        B, T = y_eng.size()
        decoder_input = torch.zeros(B, 1, dtype=torch.long)
        decoder_outputs = []
        attn_weights = []
        for t in range(T):
            decoder_output, attn_w = decoder(enc_hs, enc_h, enc_c, decoder_input)
            decoder_outputs.append(decoder_output)
            attn_weights.append(attn_w)
            decoder_input = y_eng[:,t].view(-1,1) # Teacher-forcing
            
        decoder_outputs = torch.cat(decoder_outputs, dim=1).to(torch.float32)
        attn_weights = torch.cat(attn_weights, dim=-1)
    
        # loss
        decoder_labels = y_eng.to(torch.float32)
        loss = criterion(decoder_outputs, decoder_labels)
        loss.backward()
        losses.append(loss.item())
    
        # update params
        encoder_optimizer.step()
        decoder_optimizer.step()
    
        n_batch += 1
    print(f"Epoch({epoch+1})| N({n_batch}) -> Loss: {sum(losses)/len(losses): .3f}")
    if (epoch+1) % 5000 == 0:
        save_model(encoder, decoder, epoch+1, loss, attn_weights, x_fren, y_eng)

3. 모델이 학습한 Attention matrix 살펴보기

이제 모델이 학습한 후, 중간 중간 체크포인트로 남겨놓은 Attention Matrix를 살펴보자. 이것을 보면 입력 시퀀스와 출력 시퀀스 간에 어떤 단어들끼리 큰 연관을 갖고 있는지 살펴볼 수 있다.

 

Attention matrix 시각화 예시

 

입/출력 시퀀스 문장은 다음과 같다. 참고로 출력은 모델의 예측 값이 아닌 정답이다. 즉, 주어진 데이터셋이다.

  • 입력 : il entreprend des experiences dans son laboratoire.
  • 출력 : he is carrying out experiments in his laboratory.

이럴 때, Attention matrix를 보자. 색깔이 밝을수록 서로 관계가 크다는 뜻이다. 아직 모델 학습 초반기에 추출한 Attention matrix라서 관계를 아직 잘 학습하지 못한 부분도 있긴 하지만, 불어의 laboratoire과 영어의 laboratory, 불어의 laboratoire 과 영어의 experiments 단어가 서로 큰 관계를 보임을 알 수 있다.

반응형

'Python > Pytorch' 카테고리의 다른 글

[Pytorch] 텐서를 복사하는 방법 : clone, detach  (1) 2023.11.04