본문 바로가기

Machine Learning Models/Transformer

Transformer 구현

반응형

지난 포스트

[Machine Learning/Architecture] - Transformer


이번 포스트에서는 Transformer Pytorch 구현에 대해 알아보도록 하겠습니다. 먼저, 이번 포스트에서 다룰 코드는 고현웅님의 Transformer github 레파지토리에서 발췌한 것임을 미리 밝힙니다. (Transformer 의 각 구성 요소별로 코드 정리가 잘 되어있습니다.)

Scaled dot product attention

Figure 1

Figure 1을 구현하는 블락입니다. 이때, multi-head attention 을 위해 $Q, K, V$가 head 개수만큼 분리되어 [batch size, head, length (입력 길이), d (차원)] 의 4차원 텐서입니다. 코드에서 "@" 은 pytorch 의 "matmul" 함수와 기능이 같습니다. "q @ k_t" 부분이 attention score 를 계산하는 부분으로 q는 [batch size, head, length, d], k_t는 [batch size, head, d, length] 의 4차원 텐서로 matmul 결과 [batch size, head, length, length] 4차원 텐서가 출력됩니다. 또한, decoder 단의 auto-regressive 성질을 위해 mask 를 forward 함수의 아규먼트로 전달하면 score 텐서에 "masked_fill" 함수를 이용하여 주어진 mask 가 0인 부분을 $-\infty$로 채웁니다. 이후 softmax 함수를 전달하면 마지막 차원에 대해 확률이 계산되고 여기에 value 와의 weighted sum 을 수행하여 [batch size, head, length, d] 의 최종 attension score 행렬을 리턴합니다.

class ScaleDotProductAttention(nn.Module):
    """
    compute scale dot product attention

    Query : given sentence that we focused on (decoder)
    Key : every sentence to check relationship with Qeury(encoder)
    Value : every sentence same with Key (encoder)
    """

    def __init__(self):
        super(ScaleDotProductAttention, self).__init__()
        self.softmax = nn.Softmax()

    def forward(self, q, k, v, mask=None, e=1e-12):
        # input is 4 dimension tensor
        # [batch_size, head, length, d_tensor]
        batch_size, head, length, d_tensor = k.size()

        # 1. dot product Query with Key^T to compute similarity
        k_t = k.view(batch_size, head, d_tensor, length)  # transpose
        score = (q @ k_t) / math.sqrt(d_tensor)  # scaled dot product

        # 2. apply masking (opt)
        if mask is not None:
            score = score.masked_fill(mask == 0, -e)

        # 3. pass them softmax to make [0, 1] range
        score = self.softmax(score)

        # 4. multiply with Value
        v = score @ v

        return v, score

 

Multi-head attention

Figure 2

Figure 2를 구현하는 블락입니다. 먼저 "split" 함수를 통해 $W^Q, W^K, W^V$를 head 개수별로 쪼개주어 4차원 텐서를 만들고 scaled dot-product attention 을 수행하고 "concat" 함수를 통해 이어붙인 후 $W^O$와 (코드에서는 self.w_concat 변수) 선형 변환을 수행합니다.

class MultiHeadAttention(nn.Module):

    def __init__(self, d_model, n_head):
        super(MultiHeadAttention, self).__init__()
        self.n_head = n_head
        self.attention = ScaleDotProductAttention()
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_concat = nn.Linear(d_model, d_model)

    def forward(self, q, k, v, mask=None):
        # 1. dot product with weight matrices
        q, k, v = self.w_q(q), self.w_k(k), self.w_v(v)

        # 2. split tensor by number of heads
        q, k, v = self.split(q), self.split(k), self.split(v)

        # 3. do scale dot product to compute similarity
        out, attention = self.attention(q, k, v, mask=mask)
        
        # 4. concat and pass to linear layer
        out = self.concat(out)
        out = self.w_concat(out)

        # 5. visualize attention map
        # TODO : we should implement visualization

        return out

    def split(self, tensor):
        """
        split tensor by number of head

        :param tensor: [batch_size, length, d_model]
        :return: [batch_size, head, length, d_tensor]
        """
        batch_size, length, d_model = tensor.size()

        d_tensor = d_model // self.n_head
        tensor = tensor.view(batch_size, self.n_head, length, d_tensor)
        # it is similar with group convolution (split by number of heads)

        return tensor

    def concat(self, tensor):
        """
        inverse function of self.split(tensor : torch.Tensor)

        :param tensor: [batch_size, head, length, d_tensor]
        :return: [batch_size, length, d_model]
        """
        batch_size, head, length, d_tensor = tensor.size()
        d_model = head * d_tensor

        tensor = tensor.view(batch_size, length, d_model)
        return tensor

 

Layer normalization

각 sublayer 에서 residual connection 을 한 이후에 layer normalization 을 수행합니다. ($LayerNorm(x+SubLayer(x))$) Layer Normalization 은 batch normalization 과 유사하나 batch normalization 이 미니배치 축에 대해 정규화를 수행한다면 layer normalization 은 채널 축에 대해 정규화를 수행합니다. Figure 3과 같이 $\gamma, \beta$를 통한 scaling 은 같습니다. 특히, 채널 단위로 정규화를 수행하다보니 batch normalization 과 같이 테스트시에 필요한 moving mean / std 가 필요하지 않습니다.

class LayerNorm(nn.Module):
    def __init__(self, d_model, eps=1e-12):
        super(LayerNorm, self).__init__()
        self.gamma = nn.Parameter(torch.ones(d_model))
        self.beta = nn.Parameter(torch.zeros(d_model))
        self.eps = eps

    def forward(self, x):
        mean = x.mean(-1, keepdim=True)
        std = x.std(-1, keepdim=True)
        # '-1' means last dimension. 

        out = (x - mean) / (std + self.eps)
        out = self.gamma * out + self.beta
        return out

Figure 3

 

Position-wise feed-forward networks

이 부분은 단순한 feed forward networks 로 입력, 출력 모두 [batch size, length, 512] 의 3차원 텐서입니다. 코드에서 hidden 아규먼트는 논문에서의 $d_{ff}=2048$ 입니다.

class PositionwiseFeedForward(nn.Module):

    def __init__(self, d_model, hidden, drop_prob=0.1):
        super(PositionwiseFeedForward, self).__init__()
        self.linear1 = nn.Linear(d_model, hidden)
        self.linear2 = nn.Linear(hidden, d_model)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=drop_prob)

    def forward(self, x):
        x = self.linear1(x)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.linear2(x)
        return x

 

Positional encoding

시퀀스의 위치 정보를 알려주는 positional encoding 부분입니다. 먼저 [max_len, d] 차원의 positional encoding 을 담을 텐서를 생성하고 학습시키지 않으니 텐서의 requires_grad 를 False 로 설정합니다. 이후에는 논문처럼 각 위치, 차원 별로 sinusoidal 함수를 적용합니다.

class PositionalEncoding(nn.Module):
    """
    compute sinusoid encoding.
    """
    def __init__(self, d_model, max_len, device):
        """
        constructor of sinusoid encoding class

        :param d_model: dimension of model
        :param max_len: max sequence length
        :param device: hardware device setting
        """
        super(PositionalEncoding, self).__init__()

        # same size with input matrix (for adding with input matrix)
        self.encoding = torch.zeros(max_len, d_model, device=device)
        self.encoding.requires_grad = False  # we don't need to compute gradient

        pos = torch.arange(0, max_len, device=device)
        pos = pos.float().unsqueeze(dim=1)
        # 1D => 2D unsqueeze to represent word's position

        _2i = torch.arange(0, d_model, step=2, device=device).float()
        # 'i' means index of d_model (e.g. embedding size = 50, 'i' = [0,50])
        # "step=2" means 'i' multiplied with two (same with 2 * i)

        self.encoding[:, 0::2] = torch.sin(pos / (10000 ** (_2i / d_model)))
        self.encoding[:, 1::2] = torch.cos(pos / (10000 ** (_2i / d_model)))
        # compute positional encoding to consider positional information of words

    def forward(self, x):
        # self.encoding
        # [max_len = 512, d_model = 512]

        batch_size, seq_len = x.size()
        # [batch_size = 128, seq_len = 30]

        return self.encoding[:seq_len, :]
        # [seq_len = 30, d_model = 512]
        # it will add with tok_emb : [128, 30, 512]   

 

Encoder

먼저 하나의 encoder layer 를 구현합니다. Encoder 는 1) multi-head attention, 2) feed-forward neural networks sublayer 로 구성되어 있고 각 sublayer 는 residual connection 이후 layer normalization 이 적용됩니다.

class EncoderLayer(nn.Module):

    def __init__(self, d_model, ffn_hidden, n_head, drop_prob):
        super(EncoderLayer, self).__init__()
        self.attention = MultiHeadAttention(d_model=d_model, n_head=n_head)
        self.norm1 = LayerNorm(d_model=d_model)
        self.dropout1 = nn.Dropout(p=drop_prob)

        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm2 = LayerNorm(d_model=d_model)
        self.dropout2 = nn.Dropout(p=drop_prob)

    def forward(self, x, src_mask):
        # 1. compute self attention
        _x = x
        x = self.attention(q=x, k=x, v=x, mask=src_mask)
        
        # 2. add and norm
        x = self.norm1(x + _x)
        x = self.dropout1(x)
        
        # 3. positionwise feed forward network
        _x = x
        x = self.ffn(x)
      
        # 4. add and norm
        x = self.norm2(x + _x)
        x = self.dropout2(x)
        return x

이를 기반으로 전체 encoder 를 다음과 같이 구성합니다. 먼저 맨 처음 입력에 대해 임베딩을 수행하여 각 위치 별로 512 차원의 벡터를 생성하고 지정한 encoder layer 수만큼 encoder 를 구성합니다.

class Encoder(nn.Module):

    def __init__(self, enc_voc_size, max_len, d_model, ffn_hidden, n_head, n_layers, drop_prob, device):
        super().__init__()
        self.emb = TransformerEmbedding(d_model=d_model,
                                        max_len=max_len,
                                        vocab_size=enc_voc_size,
                                        drop_prob=drop_prob,
                                        device=device)

        self.layers = nn.ModuleList([EncoderLayer(d_model=d_model,
                                                  ffn_hidden=ffn_hidden,
                                                  n_head=n_head,
                                                  drop_prob=drop_prob)
                                     for _ in range(n_layers)])

    def forward(self, x, src_mask):
        x = self.emb(x)

        for layer in self.layers:
            x = layer(x, src_mask)

        return x

 

Decoder

Decoder 는 encoder 와 거의 유사하나 encoder decoder multi-head attention sublayer 가 추가되고 auto-regressive 를 위한 마스크가 존재합니다. 코드의 forward 부분에서 encoder 의 출력을 "enc" 아규먼트로 받고 마스크 또한 아규먼트로 받습니다.

class DecoderLayer(nn.Module):

    def __init__(self, d_model, ffn_hidden, n_head, drop_prob):
        super(DecoderLayer, self).__init__()
        self.self_attention = MultiHeadAttention(d_model=d_model, n_head=n_head)
        self.norm1 = LayerNorm(d_model=d_model)
        self.dropout1 = nn.Dropout(p=drop_prob)

        self.enc_dec_attention = MultiHeadAttention(d_model=d_model, n_head=n_head)
        self.norm2 = LayerNorm(d_model=d_model)
        self.dropout2 = nn.Dropout(p=drop_prob)

        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm3 = LayerNorm(d_model=d_model)
        self.dropout3 = nn.Dropout(p=drop_prob)

    def forward(self, dec, enc, trg_mask, src_mask):    
        # 1. compute self attention
        _x = dec
        x = self.self_attention(q=dec, k=dec, v=dec, mask=trg_mask)
        
        # 2. add and norm
        x = self.norm1(x + _x)
        x = self.dropout1(x)

        if enc is not None:
            # 3. compute encoder - decoder attention
            _x = x
            x = self.enc_dec_attention(q=x, k=enc, v=enc, mask=src_mask)
            
            # 4. add and norm
            x = self.norm2(x + _x)
            x = self.dropout2(x)

        # 5. positionwise feed forward network
        _x = x
        x = self.ffn(x)
        
        # 6. add and norm
        x = self.norm3(x + _x)
        x = self.dropout3(x)
        return x

이후에는 encoder 와 마찬가지로 정의한 layer 수만큼 decoder layer 를 쌓고 마지막에는 vocab_size 만큼의 선형 변환을 수행합니다.

class Decoder(nn.Module):
    def __init__(self, dec_voc_size, max_len, d_model, ffn_hidden, n_head, n_layers, drop_prob, device):
        super().__init__()
        self.emb = TransformerEmbedding(d_model=d_model,
                                        drop_prob=drop_prob,
                                        max_len=max_len,
                                        vocab_size=dec_voc_size,
                                        device=device)

        self.layers = nn.ModuleList([DecoderLayer(d_model=d_model,
                                                  ffn_hidden=ffn_hidden,
                                                  n_head=n_head,
                                                  drop_prob=drop_prob)
                                     for _ in range(n_layers)])

        self.linear = nn.Linear(d_model, dec_voc_size)

    def forward(self, trg, src, trg_mask, src_mask):
        trg = self.emb(trg)

        for layer in self.layers:
            trg = layer(trg, src, trg_mask, src_mask)

        # pass to LM head
        output = self.linear(trg)
        return output

 

참조

반응형

'Machine Learning Models > Transformer' 카테고리의 다른 글

Vision Transformer (3) - Attention Map  (3) 2021.06.17
Vision Transformer (2)  (0) 2021.06.16
Vision Transformer (1)  (3) 2021.06.16
Transformer Positional Encoding  (6) 2021.06.16
Transformer  (0) 2021.05.27