import tensorflow as tf
import tensorflow_datasets as tfds
import os
import re
import numpy as np
import matplotlib.pyplot as plt
print("슝=3")
슝=3
들어가며¶
우리가 떠올리는 인공지능은 무엇인가? 인간 언어를 이해하고 인간과 자연어로 대화할 수 있는 기계를 우리는 자연스럽게 떠올리게 된다. 하지만 우리가 주변에서 흔히 보는 챗봇들이 모두 대화형인 것은 아니다.
링크를 보면 여러 챗봇들이 있다. 하지만 대화형 챗봇이 아니면 가지는 한계는 명확하다. 바로 사용자가 어떤 말을 하더라도 알아듣고 적절히 대응할 수 없다는 점이다.
챗봇과 딥러닝¶
간략한 챗봇의 역사를 확인해 보면, 트랜스포머는 LSTM에 훨씬 뛰어난 처리 속도를 보이면서도 LSTM 등 RNN 모델이 가지는 장기 의존성에 robust한 특징 때문에 매우 긴 길이의 문장을 처리하는 데 유리하다는 좋은 특징을 보였고, 이후 자연어처리 분야의 혁신을 가져온 발판이 되어 주었다.
그래서 오늘은 트랜스포머 모델을 기반으로 한 인코더-디코더 구조를 바탕으로 챗봇을 제작해 보자.
학습 목표¶
- 트랜스포머의 인코더 디코더 구조 이해하기
- 내부 단어 토크나이저 사용하기
- 셀프 어텐션 이해하기
- 한국어에도 적용해보기
준비물: 디렉토리 생성¶
$ mkdir -p ~/aiffel/songys_chatbot
트랜스포머의 입력 이해하기¶
많은 자연어 처리 모델들은 텍스트 문장을 입력으로 받기 위해 단어를 임베딩 벡터로 변환하는 벡터화 과정을 거친다. 트랜스포머 또한 그 점에서는 다른 모델들과 다르지 않다. 하지만 트랜스포머 모델의 입력 데이터 처리에는 RNN 계열의 모델들과 다른 점이 하나 있다. 바로 임베팅 벡터에 어떤 값을 더해준 후 입력으로 사용한다는 점이다. 그 값은 positional Encoding에 해당하는 부분이다.
인코더 입력부분을 조금 확대해보자.
이렇게 하는 이유는 트랜스포머는 입력을 받을 때, 문장 내에 있는 단어들을 1개씩 순차적으로 받는 게 아니라, 문장이 있는 모든 단어를 한꺼번에 입력으로 받기 때문이다. 이 부분이 RNN과 결정적으로 다른 부분이다.
RNN은 단어들이 어순대로 모델에 입력되므로 모델에게 어순 정보를 알려줄 필요가 없다. 하지만 트랜스포머는 모든 단어를 한꺼번에 입력받기 때문에 어순을 알려줘야 한다.
positional Encoding의 값은 수식에 의해 정해진다. 트랜스포머는 사인, 코사인 함수의 값을 임베딩 벡터에 더해줌으로써 단어의 순서 정보를 더해준다.
위 함수를 이해하기 위해서는 임베딩 벡터와 positional Encoding의 덧셈은 사실 임베딩 벡터가 모여 만들어진 문장 벡터 행렬과 positional Encoding 행렬의 덧셈 연산을 통해 이뤄진다는 점을 이해해야 한다.
$d_model$은 임베딩 벡터 차원을 의미하고, pos는 입력 문장에서의 임베딩 벡터의 위치를 나타내며, i는 임베딩 벡터 내의 차원의 인덱스를 의미한다.
positional matrix를 직접 구현해 눈으로 확인해보자!
# 포지셔널 인코딩 레이어
class PositionalEncoding(tf.keras.layers.Layer):
def __init__(self, position, d_model):
super(PositionalEncoding, self).__init__()
self.pos_encoding = self.positional_encoding(position, d_model)
def get_angles(self, position, i, d_model):
angles = 1 / tf.pow(10000, (2 * (i // 2)) / tf.cast(d_model, tf.float32))
return position * angles
def positional_encoding(self, position, d_model):
# 각도 배열 생성
angle_rads = self.get_angles(
position=tf.range(position, dtype=tf.float32)[:, tf.newaxis],
i=tf.range(d_model, dtype=tf.float32)[tf.newaxis, :],
d_model=d_model)
# 배열의 짝수 인덱스에는 sin 함수 적용
sines = tf.math.sin(angle_rads[:, 0::2])
# 배열의 홀수 인덱스에는 cosine 함수 적용
cosines = tf.math.cos(angle_rads[:, 1::2])
# sin과 cosine이 교차되도록 재배열
pos_encoding = tf.stack([sines, cosines], axis=0)
pos_encoding = tf.transpose(pos_encoding,[1, 2, 0])
pos_encoding = tf.reshape(pos_encoding, [position, d_model])
pos_encoding = pos_encoding[tf.newaxis, ...]
return tf.cast(pos_encoding, tf.float32)
def call(self, inputs):
return inputs + self.pos_encoding[:, :tf.shape(inputs)[1], :]
print("슝=3")
슝=3
행 크기 50, 열 크기 512인 행렬을 그려보자. 이는 최대 문장 길이 50, 워드 임베팅 차원 512로 하는 모델의 입력 벡터 모양이 이와 같을 것이다.
sample_pos_encoding = PositionalEncoding(50, 512)
plt.pcolormesh(sample_pos_encoding.pos_encoding.numpy()[0], cmap='RdBu')
plt.xlabel('Depth')
plt.xlim((0, 512))
plt.ylabel('Position')
plt.colorbar()
plt.show()
실제 논문에서는 다음과 같이 포지셔널 인코딩을 표현했다.
어텐션¶
어텐션 메커니즘은 아래 그림과 같이 표현 가능하다.
어텐션 함수는 주어진 Query
에 대해 모든 key
와의 유사도를 구한다. 그리고 이 유사도를 key와 맵핑되어 있는 각각의 값
에 반영해준다. 그리고 유사도가 반영된 값
을 모두 더해 뭉쳐주면 최종 결과인 어텐션 값(Attention Value)가 된다.
트랜스포머에서 사용된 어텐션¶
총 3가지 어텐션을 사용한다.
- 인코더 셀프 어텐션: 인코더에서 이뤄짐
- 디코더 셀프 어텐션: 디코더에서 이뤄짐
- 인코더-디코더 어텐션: 디코더에서 이뤄짐
쿼리, 키, 벨류는 기본적으로 단어(정보를 함축한) 벡터이다.
단어 벡터란 초기 입력으로 사용된 임베딩 벡터가 아니라, 트랜스포머의 여러 연산을 거친 후의 단어 벡터이다.
세 어텐션이 하는 일
- 인코더 셀프 어텐션: 인코더 입력으로 들어간 문자 내 단어들의 유사도를 구함
- 디코더 셀프 어텐션: 단어를 1개씩 생성하는 디코더가 이미 생성된 앞 단어와의 유사도를 구함
- 인코더-디코더 어텐션: 디코더가 잘 예측하기 위해 인코더에 입력된 단어들과 유사도를 구한다.
이 중 2가지가 셀프 어텐션
이다. 셀프 어텐션은 어떤 의미를 지니는지, 왜 중요한지 알아보자.
셀프 어텐션¶
- 유사도를 구하는 대상이 다른 문장의 단어가 아니라 현재 문장 내의 단어들의 유사도를 구하는 것을 의미한다.
출처: https://ai.googleblog.com/2017/08/transformer-novel-neural-network.html
유사도는 어떻게 구할까?
Scaled Dot Product Attention¶
트랜스포머에서는 다음과 같이 어텐션 값을 구한다.
Q = Query, K = key, V = value
앞서 말한 정의를 다시 보자.
어텐션 함수는 주어진 Query
에 대해 모든 key
와의 유사도를 구한다. 그리고 이 유사도를 key와 맵핑되어 있는 각각의 값
에 반영해준다. 그리고 유사도가 반영된 값
을 모두 더해 뭉쳐주면 최종 결과인 어텐션 값(Attention Value)가 된다.
이 정의와 아래 세 가지 내용만 기억하면 수식을 그림으로 정리할 수 있다.
- Q, K, V는 단어 벡터를 행으로 하는 문장 행렬이다.
- 벡터의 내적(dot product) 은 벡터의 유사도를 의미한다.
- 특정 값을 분모로 사용하는 것은 값의 크기를 조절하는 스케일링(Scaling)을 위함이다.
Q, K의 전치 행렬을 곱하는 것은 다음과 같이 표현된다
이렇게 각 단어 벡터의 유사도가 기록된 유사도 행렬이 생성된다.
이 유사도 값을 스케일링 하기 위해 행렬 전체를 특정 값으로 나누고, 유사도를 0과 1사이의 값으로 Normalize하기 위해서 소프트맥스 함수를 사용한다. 여기서 문장 행렬 V를 곱하면 Attention Value를 얻는다.
이 식은 dot product를 통해 단어벡터 간 유사도를 구한 후에 특정 값을 분모로 나눠주는 방식으로 Q, K의 유사도를 구했다고 하여 Scaled dot product Attention이라고 한다.
Scaled dot product Attention 함수를 구현해 보자.
# 스케일드 닷 프로덕트 어텐션 함수
def scaled_dot_product_attention(query, key, value, mask):
# 어텐션 가중치는 Q와 K의 닷 프로덕트
matmul_qk = tf.matmul(query, key, transpose_b=True)
# 가중치를 정규화
depth = tf.cast(tf.shape(key)[-1], tf.float32)
logits = matmul_qk / tf.math.sqrt(depth)
# 패딩에 마스크 추가
if mask is not None:
logits += (mask * -1e9)
# softmax적용
attention_weights = tf.nn.softmax(logits, axis=-1)
# 최종 어텐션은 가중치와 V의 닷 프로덕트
output = tf.matmul(attention_weights, value)
return output
print("슝=3")
슝=3
머리가 여러 개인 어텐션¶
병렬로 어텐션 수행하기¶
num_heads
라는 변수는 기계가 병렬적으로 몇 개의 어텐션 연산을 수행할지를 결정하는 하이퍼파라미터이다.
앞서 d_model
은 임베딩 차원의 벡터라고 언급하였다. 결국 트랜스포머의 초기 입력인 문장 행렬의 크기는 문장의 길이를 행으로, d_model을 열의 크기로 가진다.
트랜스포머는 이렇게 문장 행렬을 num_heads
수만큼 쪼개 어텐션을 수행하고, 이렇게 얻은 num_heads
수만큼 어텐션 값 행렬을 하나로 concatenate한다.
위 그림은 num_heads가 8개인 경우고, 다시 concatenate하면서 열 크기가 d_model이 된다.
멀티-헤드 어텐션¶
[출처 : http://jalammar.github.io/illustrated-transformer/]
위 그림은 num_heads
값이 8일 때, 병렬로 수행되는 어텐션이 서로 다른 셀프 어텐션 결과를 얻을 수 있음을 보여준다. 이와 같이 어텐션을 병렬로 수행하는 것은 멀티 헤드 어텐션이라고 부른다.
구현하기¶
다음과 같다. 내부적으로는 Scaled dot product Attention함수를 호출한다.
# searched: assert
class MultiHeadAttention(tf.keras.layers.Layer):
def __init__(self, d_model, num_heads, name="multi_head_attention"):
super(MultiHeadAttention, self).__init__(name=name)
self.num_heads = num_heads
self.d_model = d_model
assert d_model % self.num_heads == 0
self.depth = d_model // self.num_heads
self.query_dense = tf.keras.layers.Dense(units=d_model)
self.key_dense = tf.keras.layers.Dense(units=d_model)
self.value_dense = tf.keras.layers.Dense(units=d_model)
self.dense = tf.keras.layers.Dense(units=d_model)
def split_heads(self, inputs, batch_size):
inputs = tf.reshape(
inputs, shape=(batch_size, -1, self.num_heads, self.depth))
return tf.transpose(inputs, perm=[0, 2, 1, 3])
def call(self, inputs):
query, key, value, mask = inputs['query'], inputs['key'], inputs[
'value'], inputs['mask']
batch_size = tf.shape(query)[0]
# Q, K, V에 각각 Dense를 적용합니다
query = self.query_dense(query)
key = self.key_dense(key)
value = self.value_dense(value)
# 병렬 연산을 위한 머리를 여러 개 만듭니다
query = self.split_heads(query, batch_size)
key = self.split_heads(key, batch_size)
value = self.split_heads(value, batch_size)
# 스케일드 닷 프로덕트 어텐션 함수
scaled_attention = scaled_dot_product_attention(query, key, value, mask)
scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
# 어텐션 연산 후에 각 결과를 다시 연결(concatenate)합니다
concat_attention = tf.reshape(scaled_attention,
(batch_size, -1, self.d_model))
# 최종 결과에도 Dense를 한 번 더 적용합니다
outputs = self.dense(concat_attention)
return outputs
print("슝=3")
슝=3
def create_padding_mask(x):
mask = tf.cast(tf.math.equal(x, 0), tf.float32)
# (batch_size, 1, 1, sequence length)
return mask[:, tf.newaxis, tf.newaxis, :]
print("슝=3")
# 이 함수에 정수 시퀀스를 입력으로 하면, 숫자가 0인 부분을 체크한 벡터를 리턴한다.
슝=3
print(create_padding_mask(tf.constant([[1, 2, 0, 3, 0], [0, 0, 0, 4, 5]])))
tf.Tensor( [[[[0. 0. 1. 0. 1.]]] [[[1. 1. 1. 0. 0.]]]], shape=(2, 1, 1, 5), dtype=float32)
어텐션 연산 시에 패딩 마스킹을 이용하면 불필요하게 숫자 0을 참고하지 않을 수 있다!
룩 어헤드 마스킹(Look-ahead masking, 다음 단어 가리기)¶
RNN과 트랜스포머는 문장을 입력받는 방법이 전혀 다르다.
RNN은 step이라는 개념이 존재해서 각 step마다 단어가 순서대로 입력으로 들어가는 구조인 반면, 트랜스포머의 경우에는 문장 행렬을 만들어 한 번에 행렬 형태로 입력으로 들어간다는 특징이 있다. 이 특징 때문에 추가적인 Masking이 필요하다.
우리가 원하는 것은 이전 단어들로부터 다음 단어를 예측하는 훈련을 제대로 하는 것이다. 따라서 이런 문제를 해결하기 위해 자신보다 다음에 나올 단어를 참고하지 않도록 가리는 기법이 룩 어헤드 마스킹 기법이다.
- 어텐션을 수행할 때, Query 단어 뒤에 나오는 Key 단어들에 대해서는 마스킹한다.
https://www.youtube.com/watch?v=xhY7m8QVKjo
빨간 부분이 마스킹을 표현하고 있다. 빨간 부분을 마스킹 함수로 구현하면 다음과 같다.
#searched: band_part
def create_look_ahead_mask(x):
seq_len = tf.shape(x)[1]
look_ahead_mask = 1 - tf.linalg.band_part(tf.ones((seq_len, seq_len)), -1, 0)
padding_mask = create_padding_mask(x)
return tf.maximum(look_ahead_mask, padding_mask)
print("슝=3")
슝=3
print(create_look_ahead_mask(tf.constant([[1, 2, 3, 4, 5]])))
tf.Tensor( [[[[0. 1. 1. 1. 1.] [0. 0. 1. 1. 1.] [0. 0. 0. 1. 1.] [0. 0. 0. 0. 1.] [0. 0. 0. 0. 0.]]]], shape=(1, 1, 5, 5), dtype=float32)
이 마스킹과 패딩 마스킹은 별개이므로, 이 마스킹을 수행할 때 숫자 0인 단어가 있다면 이 또한 패딩해야 한다. 그래서 create_look_ahead_mask() 함수는 내부적으로 앞서 구현한 패딩 마스크 함수도 호출하고 있다.
숫자 0이 포함된 경우도 테스트 해보자.
print(create_look_ahead_mask(tf.constant([[0, 5, 1, 5, 5]])))
tf.Tensor( [[[[1. 1. 1. 1. 1.] [1. 0. 1. 1. 1.] [1. 0. 0. 1. 1.] [1. 0. 0. 0. 1.] [1. 0. 0. 0. 0.]]]], shape=(1, 1, 5, 5), dtype=float32)
인코더¶
하나의 인코더 층은 크게 총 2개의 sublayer로 나뉜다. 인코더 층을 구현하는 함수는 다음과 같다.
# 인코더 하나의 레이어를 함수로 구현.
# 이 하나의 레이어 안에는 두 개의 서브 레이어가 존재합니다.
def encoder_layer(units, d_model, num_heads, dropout, name="encoder_layer"):
inputs = tf.keras.Input(shape=(None, d_model), name="inputs")
# 패딩 마스크 사용
padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")
# 첫 번째 서브 레이어 : 멀티 헤드 어텐션 수행 (셀프 어텐션)
attention = MultiHeadAttention(
d_model, num_heads, name="attention")({
'query': inputs,
'key': inputs,
'value': inputs,
'mask': padding_mask
})
# 어텐션의 결과는 Dropout과 Layer Normalization이라는 훈련을 돕는 테크닉을 수행
attention = tf.keras.layers.Dropout(rate=dropout)(attention)
attention = tf.keras.layers.LayerNormalization(
epsilon=1e-6)(inputs + attention)
# 두 번째 서브 레이어 : 2개의 완전연결층
outputs = tf.keras.layers.Dense(units=units, activation='relu')(attention)
outputs = tf.keras.layers.Dense(units=d_model)(outputs)
# 완전연결층의 결과는 Dropout과 LayerNormalization이라는 훈련을 돕는 테크닉을 수행
outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
outputs = tf.keras.layers.LayerNormalization(
epsilon=1e-6)(attention + outputs)
return tf.keras.Model(
inputs=[inputs, padding_mask], outputs=outputs, name=name)
print("슝=3")
슝=3
[출처 : http://jalammar.github.io/illustrated-transformer/]
인코더 층을 쌓아 인코더 만들기¶
이렇게 구현한 인코더 층을 임베팅 층과 포지셔널 인코딩을 연결하고, 사용자가 원하는 만큼 인코더 층을 쌓음으로써 트랜스포머의 인코더가 완성된다.
인코더/디코더 내부에서는 각 서브 층 이후 훈련을 돕는 Layer Normalization이라는 테크닉이 사용되었다.
트랜스포머는 하이퍼파라미터인 num_layer 개수의 인코더 층을 쌓는다.
def encoder(vocab_size,
num_layers,
units,
d_model,
num_heads,
dropout,
name="encoder"):
inputs = tf.keras.Input(shape=(None,), name="inputs")
# 패딩 마스크 사용
padding_mask = tf.keras.Input(shape=(1, 1, None), name="padding_mask")
# 임베딩 레이어
embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))
# 포지셔널 인코딩
embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)
outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)
# num_layers만큼 쌓아올린 인코더의 층.
for i in range(num_layers):
outputs = encoder_layer(
units=units,
d_model=d_model,
num_heads=num_heads,
dropout=dropout,
name="encoder_layer_{}".format(i),
)([outputs, padding_mask])
return tf.keras.Model(
inputs=[inputs, padding_mask], outputs=outputs, name=name)
print("슝=3")
슝=3
디코더¶
-3개의 서브 층으로 구성됨: 셀프 어텐션 - 인코더/디코더 어텐션 - 피드 포워드 신경망
- 인코더 디코더 어텐션은 셀프 어텐션과 달리 Query가 디코더의 벡터인 반면에 Key, Value가 인코더의 벡터라는 특징이 있다.
디코더 층¶
[출처 : http://jalammar.github.io/illustrated-transformer/]
[출처 : https://medium.com/@shreyasikalra25/predict-movie-reviews-with-bert-88d8b79f5718]
디코더를 구현한 함수는 다음과 같다.
# 디코더 하나의 레이어를 함수로 구현.
# 이 하나의 레이어 안에는 세 개의 서브 레이어가 존재합니다.
def decoder_layer(units, d_model, num_heads, dropout, name="decoder_layer"):
inputs = tf.keras.Input(shape=(None, d_model), name="inputs")
enc_outputs = tf.keras.Input(shape=(None, d_model), name="encoder_outputs")
look_ahead_mask = tf.keras.Input(
shape=(1, None, None), name="look_ahead_mask")
padding_mask = tf.keras.Input(shape=(1, 1, None), name='padding_mask')
# 첫 번째 서브 레이어 : 멀티 헤드 어텐션 수행 (셀프 어텐션)
attention1 = MultiHeadAttention(
d_model, num_heads, name="attention_1")(inputs={
'query': inputs,
'key': inputs,
'value': inputs,
'mask': look_ahead_mask
})
# 멀티 헤드 어텐션의 결과는 LayerNormalization이라는 훈련을 돕는 테크닉을 수행
attention1 = tf.keras.layers.LayerNormalization(
epsilon=1e-6)(attention1 + inputs)
# 두 번째 서브 레이어 : 마스크드 멀티 헤드 어텐션 수행 (인코더-디코더 어텐션)
attention2 = MultiHeadAttention(
d_model, num_heads, name="attention_2")(inputs={
'query': attention1,
'key': enc_outputs,
'value': enc_outputs,
'mask': padding_mask
})
# 마스크드 멀티 헤드 어텐션의 결과는
# Dropout과 LayerNormalization이라는 훈련을 돕는 테크닉을 수행
attention2 = tf.keras.layers.Dropout(rate=dropout)(attention2)
attention2 = tf.keras.layers.LayerNormalization(
epsilon=1e-6)(attention2 + attention1)
# 세 번째 서브 레이어 : 2개의 완전연결층
outputs = tf.keras.layers.Dense(units=units, activation='relu')(attention2)
outputs = tf.keras.layers.Dense(units=d_model)(outputs)
# 완전연결층의 결과는 Dropout과 LayerNormalization 수행
outputs = tf.keras.layers.Dropout(rate=dropout)(outputs)
outputs = tf.keras.layers.LayerNormalization(
epsilon=1e-6)(outputs + attention2)
return tf.keras.Model(
inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask],
outputs=outputs,
name=name)
print("슝=3")
슝=3
디코더 층 쌓아 디코더 만들기¶
이렇게 구현한 디코더 츠은 임베팅과 포지셔널 인코딩을 연결하고, 인코더와 동일하게 num_layer 개수의 디코더 층을 쌓아 디코더가 완성된다.
def decoder(vocab_size,
num_layers,
units,
d_model,
num_heads,
dropout,
name='decoder'):
inputs = tf.keras.Input(shape=(None,), name='inputs')
enc_outputs = tf.keras.Input(shape=(None, d_model), name='encoder_outputs')
look_ahead_mask = tf.keras.Input(
shape=(1, None, None), name='look_ahead_mask')
# 패딩 마스크
padding_mask = tf.keras.Input(shape=(1, 1, None), name='padding_mask')
# 임베딩 레이어
embeddings = tf.keras.layers.Embedding(vocab_size, d_model)(inputs)
embeddings *= tf.math.sqrt(tf.cast(d_model, tf.float32))
# 포지셔널 인코딩
embeddings = PositionalEncoding(vocab_size, d_model)(embeddings)
# Dropout이라는 훈련을 돕는 테크닉을 수행
outputs = tf.keras.layers.Dropout(rate=dropout)(embeddings)
for i in range(num_layers):
outputs = decoder_layer(
units=units,
d_model=d_model,
num_heads=num_heads,
dropout=dropout,
name='decoder_layer_{}'.format(i),
)(inputs=[outputs, enc_outputs, look_ahead_mask, padding_mask])
return tf.keras.Model(
inputs=[inputs, enc_outputs, look_ahead_mask, padding_mask],
outputs=outputs,
name=name)
print("슝=3")
슝=3
챗봇 데이터를 로드하고, 전처리한 후 인코더/디코더를 조합하여 모델을 만들자.
챗봇 병렬 데이터 받아오기¶
Cornell Movie-Dialogs Corpus라는 영화 및 TV 프로그램에서 사용되었던 대화의 쌍으로 구성된 데이터셋을 사용한다.
이번 스텝 목표
- 정해진 개수인 50,000개의 질문과 답변의 쌍을 추출한다.
- 문장에서 단어와 구두점 사이에 공백을 추가한다.
- 알파벳과 ! ? , . 이 4개의 구두점을 제외하고 다른 특수문자는 모두 제거한다.
path_to_zip = tf.keras.utils.get_file(
'cornell_movie_dialogs.zip',
origin='http://www.cs.cornell.edu/~cristian/data/cornell_movie_dialogs_corpus.zip',
extract=True)
path_to_dataset = os.path.join(
os.path.dirname(path_to_zip), "cornell movie-dialogs corpus")
path_to_movie_lines = os.path.join(path_to_dataset, 'movie_lines.txt')
path_to_movie_conversations = os.path.join(path_to_dataset,'movie_conversations.txt')
print("슝=3")
Downloading data from http://www.cs.cornell.edu/~cristian/data/cornell_movie_dialogs_corpus.zip 9920512/9916637 [==============================] - 1s 0us/step 9928704/9916637 [==============================] - 1s 0us/step 슝=3
# 사용할 샘플의 최대 개수
MAX_SAMPLES = 50000
print(MAX_SAMPLES)
50000
전처리는 정규 표현식을 사용해 구두점을 제거하여 단어를 토크나이징하는 일에 방해가 되지 않게 정제하자
# 전처리 함수
def preprocess_sentence(sentence):
sentence = sentence.lower().strip()
# 단어와 구두점(punctuation) 사이의 거리를 만듭니다.
# 예를 들어서 "I am a student." => "I am a student ."와 같이
# student와 온점 사이에 거리를 만듭니다.
sentence = re.sub(r"([?.!,])", r" \1 ", sentence)
sentence = re.sub(r'[" "]+', " ", sentence)
# (a-z, A-Z, ".", "?", "!", ",")를 제외한 모든 문자를 공백인 ' '로 대체합니다.
sentence = re.sub(r"[^a-zA-Z?.!,]+", " ", sentence)
sentence = sentence.strip()
return sentence
print("슝=3")
슝=3
# 데이터 로드 + 전처리 함수를 호출해 질문/답변의 쌍을 전처리함
# 질문과 답변의 쌍인 데이터셋을 구성하기 위한 데이터 로드 함수
def load_conversations():
id2line = {}
with open(path_to_movie_lines, errors='ignore') as file:
lines = file.readlines()
for line in lines:
parts = line.replace('\n', '').split(' +++$+++ ')
id2line[parts[0]] = parts[4]
inputs, outputs = [], []
with open(path_to_movie_conversations, 'r') as file:
lines = file.readlines()
for line in lines:
parts = line.replace('\n', '').split(' +++$+++ ')
conversation = [line[1:-1] for line in parts[3][1:-1].split(', ')]
for i in range(len(conversation) - 1):
# 전처리 함수를 질문에 해당되는 inputs와 답변에 해당되는 outputs에 적용.
inputs.append(preprocess_sentence(id2line[conversation[i]]))
outputs.append(preprocess_sentence(id2line[conversation[i + 1]]))
if len(inputs) >= MAX_SAMPLES:
return inputs, outputs
return inputs, outputs
print("슝=3")
슝=3
# 샘플 수 확인
# 데이터를 로드하고 전처리하여 질문을 questions, 답변을 answers에 저장합니다.
questions, answers = load_conversations()
print('전체 샘플 수 :', len(questions))
print('전체 샘플 수 :', len(answers))
전체 샘플 수 : 50000 전체 샘플 수 : 50000
# check
print('전처리 후의 22번째 질문 샘플: {}'.format(questions[21]))
print('전처리 후의 22번째 답변 샘플: {}'.format(answers[21]))
전처리 후의 22번째 질문 샘플: she s not a . . . 전처리 후의 22번째 답변 샘플: lesbian ? no . i found a picture of jared leto in one of her drawers , so i m pretty sure she s not harboring same sex tendencies .
병렬 데이터 전처리¶
이번 스텝 요약
- TensorFlow Datasets SubwordTextEncoder를 토크나이저로 사용한다. 단어보다 더 작은 단위인 Subword를 기준으로 토크나이징하고, 각 토큰을 고유한 정수로 인코딩한다.
- 각 문장을 토큰화하고 각 문장의 시작과 끝을 나타내는 START_TOKEN 및 END_TOKEN을 추가한다.
- 최대 길이 MAX_LENGTH인 40을 넘는 문장들은 필터링한다.
- MAX_LENGTH보다 길이가 짧은 문장들은 40에 맞도록 패딩 한다.
1. 단어장 만들기¶
각 단어에 고유한 정수 인덱스를 부여하기 위해 질문, 답변 데이터셋 모두 사용하여 만든다.
import tensorflow_datasets as tfds
print("살짝 오래 걸릴 수 있어요. 스트레칭 한 번 해볼까요? 👐")
# 질문과 답변 데이터셋에 대해서 Vocabulary 생성. (Tensorflow 2.3.0 이상) (클라우드는 2.4 입니다)
tokenizer = tfds.deprecated.text.SubwordTextEncoder.build_from_corpus(questions + answers, target_vocab_size=2**13)
print("슝=3 ")
살짝 오래 걸릴 수 있어요. 스트레칭 한 번 해볼까요? 👐 슝=3
import tensorflow_datasets as tfds
print("슝=3 ")
슝=3
시작, 종료 토큰에 대해서도 단어장에 추가해서 정수를 부여하자. 이미 생성된 단어와 겹치지 않게 단어장의 크기보다 1이 큰 수를 번호로 부여하자.
# 시작 토큰과 종료 토큰에 고유한 정수를 부여합니다.
START_TOKEN, END_TOKEN = [tokenizer.vocab_size], [tokenizer.vocab_size + 1]
print("슝=3")
슝=3
print('START_TOKEN의 번호 :' ,[tokenizer.vocab_size])
print('END_TOKEN의 번호 :' ,[tokenizer.vocab_size + 1])
START_TOKEN의 번호 : [8331] END_TOKEN의 번호 : [8332]
# 시작 토큰과 종료 토큰을 고려하여 +2를 하여 단어장의 크기를 산정합니다.
VOCAB_SIZE = tokenizer.vocab_size + 2
print(VOCAB_SIZE)
8333
2.각 단어를 고유한 정수로 인코딩(Integer encoding) & 패딩(padding)¶
위에서 tensorflow_datasets의 SubwordTextEncoder를 사용해서 tokenizer를 정의하고 Vocabulary를 만들었다면, tokenizer.encode()로 각 단어를 정수로 변환할 수 있고 또는 tokenizer.decode()를 통해 정수 시퀀스를 단어 시퀀스로 변환할 수 있다.
# 임의의 22번째 샘플에 대해서 정수 인코딩 작업을 수행.
# 각 토큰을 고유한 정수로 변환
print('정수 인코딩 후의 21번째 질문 샘플: {}'.format(tokenizer.encode(questions[21])))
print('정수 인코딩 후의 21번째 답변 샘플: {}'.format(tokenizer.encode(answers[21])))
정수 인코딩 후의 21번째 질문 샘플: [60, 8, 37, 8172, 49] 정수 인코딩 후의 21번째 답변 샘플: [7824, 1223, 19, 61, 2, 4, 336, 10, 1595, 14, 1104, 698, 3263, 263, 16, 71, 14, 107, 2133, 900, 3, 59, 4, 23, 355, 204, 60, 8, 37, 885, 2289, 8107, 344, 1001, 5179, 4214, 342, 1]
# 샘플의 최대 허용 길이 또는 패딩 후의 최종 길이
MAX_LENGTH = 40
print(MAX_LENGTH)
40
# 정수 인코딩, 최대 길이를 초과하는 샘플 제거, 패딩
def tokenize_and_filter(inputs, outputs):
tokenized_inputs, tokenized_outputs = [], []
for (sentence1, sentence2) in zip(inputs, outputs):
# 정수 인코딩 과정에서 시작 토큰과 종료 토큰을 추가
sentence1 = START_TOKEN + tokenizer.encode(sentence1) + END_TOKEN
sentence2 = START_TOKEN + tokenizer.encode(sentence2) + END_TOKEN
# 최대 길이 40 이하인 경우에만 데이터셋으로 허용
if len(sentence1) <= MAX_LENGTH and len(sentence2) <= MAX_LENGTH:
tokenized_inputs.append(sentence1)
tokenized_outputs.append(sentence2)
# 최대 길이 40으로 모든 데이터셋을 패딩
tokenized_inputs = tf.keras.preprocessing.sequence.pad_sequences(
tokenized_inputs, maxlen=MAX_LENGTH, padding='post')
tokenized_outputs = tf.keras.preprocessing.sequence.pad_sequences(
tokenized_outputs, maxlen=MAX_LENGTH, padding='post')
return tokenized_inputs, tokenized_outputs
print("슝=3")
슝=3
questions, answers = tokenize_and_filter(questions, answers)
print('단어장의 크기 :',(VOCAB_SIZE))
print('필터링 후의 질문 샘플 개수: {}'.format(len(questions)))
print('필터링 후의 답변 샘플 개수: {}'.format(len(answers)))
단어장의 크기 : 8333 필터링 후의 질문 샘플 개수: 44095 필터링 후의 답변 샘플 개수: 44095
3. Teacher Forcing 사용하기¶
tf.data.Dataset API는 훈련 프로세스의 속도가 빨라지도록 입력 파이프라인을 구축하는 API이다.
이를 적극 사용하기 위해서 질문과 답변의 쌍을 tf.data.Dataset의 입력으로 넣어주는 작업을 한다.
이때, 디코더의 입력과 실제값(레이블)을 정의해 주기 위해서는 교사 강요(Teacher Forcing) 이라는 언어 모델의 훈련 기법을 이해해야만 한다. 아래의 글을 통해 교사 강요에 대해 알아보자.
- 자기회귀 모델(auto-regressive model, AR): 이전 자신의 출력이 현재 자신의 상태를 결정하는 모델. RNN, 트랜스포머의 디코더 또한 자기회귀 모델이다.
질문/답변 쌍을 tf.data.Dataset API의 입력으로 사용하여 파이프라인을 구성한다. 이때 Teacher Force를 위해 answers[:, :-1]
를 디코더 입력값, answers[, 1:]
를 디코더의 레이블로 사용한다.
BATCH_SIZE = 64
BUFFER_SIZE = 20000
# 디코더는 이전의 target을 다음의 input으로 사용합니다.
# 이에 따라 outputs에서는 START_TOKEN을 제거하겠습니다.
dataset = tf.data.Dataset.from_tensor_slices((
{
'inputs': questions,
'dec_inputs': answers[:, :-1]
},
{
'outputs': answers[:, 1:]
},
))
dataset = dataset.cache()
dataset = dataset.shuffle(BUFFER_SIZE)
dataset = dataset.batch(BATCH_SIZE)
dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)
print("슝=3")
슝=3
모델 정의/학습¶
def transformer(vocab_size,
num_layers,
units,
d_model,
num_heads,
dropout,
name="transformer"):
inputs = tf.keras.Input(shape=(None,), name="inputs")
dec_inputs = tf.keras.Input(shape=(None,), name="dec_inputs")
# 인코더에서 패딩을 위한 마스크
enc_padding_mask = tf.keras.layers.Lambda(
create_padding_mask, output_shape=(1, 1, None),
name='enc_padding_mask')(inputs)
# 디코더에서 미래의 토큰을 마스크 하기 위해서 사용합니다.
# 내부적으로 패딩 마스크도 포함되어져 있습니다.
look_ahead_mask = tf.keras.layers.Lambda(
create_look_ahead_mask,
output_shape=(1, None, None),
name='look_ahead_mask')(dec_inputs)
# 두 번째 어텐션 블록에서 인코더의 벡터들을 마스킹
# 디코더에서 패딩을 위한 마스크
dec_padding_mask = tf.keras.layers.Lambda(
create_padding_mask, output_shape=(1, 1, None),
name='dec_padding_mask')(inputs)
# 인코더
enc_outputs = encoder(
vocab_size=vocab_size,
num_layers=num_layers,
units=units,
d_model=d_model,
num_heads=num_heads,
dropout=dropout,
)(inputs=[inputs, enc_padding_mask])
# 디코더
dec_outputs = decoder(
vocab_size=vocab_size,
num_layers=num_layers,
units=units,
d_model=d_model,
num_heads=num_heads,
dropout=dropout,
)(inputs=[dec_inputs, enc_outputs, look_ahead_mask, dec_padding_mask])
# 완전연결층
outputs = tf.keras.layers.Dense(units=vocab_size, name="outputs")(dec_outputs)
return tf.keras.Model(inputs=[inputs, dec_inputs], outputs=outputs, name=name)
print("슝=3")
슝=3
1. 모델 생성¶
tf.keras.backend.clear_session()
# 하이퍼파라미터
NUM_LAYERS = 2 # 인코더와 디코더의 층의 개수
D_MODEL = 256 # 인코더와 디코더 내부의 입, 출력의 고정 차원
NUM_HEADS = 8 # 멀티 헤드 어텐션에서의 헤드 수
UNITS = 512 # 피드 포워드 신경망의 은닉층의 크기
DROPOUT = 0.1 # 드롭아웃의 비율
model = transformer(
vocab_size=VOCAB_SIZE,
num_layers=NUM_LAYERS,
units=UNITS,
d_model=D_MODEL,
num_heads=NUM_HEADS,
dropout=DROPOUT)
model.summary()
Model: "transformer" __________________________________________________________________________________________________ Layer (type) Output Shape Param # Connected to ================================================================================================== inputs (InputLayer) [(None, None)] 0 __________________________________________________________________________________________________ dec_inputs (InputLayer) [(None, None)] 0 __________________________________________________________________________________________________ enc_padding_mask (Lambda) (None, 1, 1, None) 0 inputs[0][0] __________________________________________________________________________________________________ encoder (Functional) (None, None, 256) 3187456 inputs[0][0] enc_padding_mask[0][0] __________________________________________________________________________________________________ look_ahead_mask (Lambda) (None, 1, None, None 0 dec_inputs[0][0] __________________________________________________________________________________________________ dec_padding_mask (Lambda) (None, 1, 1, None) 0 inputs[0][0] __________________________________________________________________________________________________ decoder (Functional) (None, None, 256) 3714816 dec_inputs[0][0] encoder[0][0] look_ahead_mask[0][0] dec_padding_mask[0][0] __________________________________________________________________________________________________ outputs (Dense) (None, None, 8333) 2141581 decoder[0][0] ================================================================================================== Total params: 9,043,853 Trainable params: 9,043,853 Non-trainable params: 0 __________________________________________________________________________________________________
2. 손실함수¶
def loss_function(y_true, y_pred):
y_true = tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))
loss = tf.keras.losses.SparseCategoricalCrossentropy(
from_logits=True, reduction='none')(y_true, y_pred)
mask = tf.cast(tf.not_equal(y_true, 0), tf.float32)
loss = tf.multiply(loss, mask)
return tf.reduce_mean(loss)
print("슝=3")
슝=3
3. 커스텀된 학습률¶
Custom Learning rate Scheduling: 초기에 lr를 급격히 높였다가 이후 train step이 진행됨에 따라 서서히 낮춰 안정적으로 수렴하게 하는 학습 기법
아담 옴티마이저를 쓰자
학습 초기에는 lr이 step_num에 비례해서 증가하다가 이후로는 감소하는 것을 확인할 수 있다.
class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
def __init__(self, d_model, warmup_steps=4000):
super(CustomSchedule, self).__init__()
self.d_model = d_model
self.d_model = tf.cast(self.d_model, tf.float32)
self.warmup_steps = warmup_steps
def __call__(self, step):
arg1 = tf.math.rsqrt(step)
arg2 = step * (self.warmup_steps**-1.5)
return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)
print("슝=3")
슝=3
sample_learning_rate = CustomSchedule(d_model=128)
plt.plot(sample_learning_rate(tf.range(200000, dtype=tf.float32)))
plt.ylabel("Learning Rate")
plt.xlabel("Train Step")
Text(0.5, 0, 'Train Step')
4. 모델 컴파일¶
손실 함수와 커스텀 된 학습률을 사용하여 모델을 컴파일한다.
learning_rate = CustomSchedule(D_MODEL)
optimizer = tf.keras.optimizers.Adam(
learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9)
def accuracy(y_true, y_pred):
y_true = tf.reshape(y_true, shape=(-1, MAX_LENGTH - 1))
return tf.keras.metrics.sparse_categorical_accuracy(y_true, y_pred)
model.compile(optimizer=optimizer, loss=loss_function, metrics=[accuracy])
print("슝=3")
슝=3
5. 훈련¶
EPOCHS = 20
model.fit(dataset, epochs=EPOCHS, verbose=1)
ERROR! Session/line number was not unique in database. History logging moved to new session 3 Epoch 1/20 689/689 [==============================] - 37s 54ms/step - loss: 2.1137 - accuracy: 0.0425 Epoch 2/20 689/689 [==============================] - 37s 54ms/step - loss: 1.4957 - accuracy: 0.0794 Epoch 3/20 689/689 [==============================] - 37s 54ms/step - loss: 1.3942 - accuracy: 0.0860 Epoch 4/20 689/689 [==============================] - 37s 54ms/step - loss: 1.3356 - accuracy: 0.0901 Epoch 5/20 689/689 [==============================] - 37s 54ms/step - loss: 1.2852 - accuracy: 0.0944 Epoch 6/20 689/689 [==============================] - 37s 54ms/step - loss: 1.2384 - accuracy: 0.0978 Epoch 7/20 689/689 [==============================] - 37s 54ms/step - loss: 1.1845 - accuracy: 0.1019 Epoch 8/20 689/689 [==============================] - 37s 54ms/step - loss: 1.1236 - accuracy: 0.1073 Epoch 9/20 689/689 [==============================] - 37s 54ms/step - loss: 1.0658 - accuracy: 0.1129 Epoch 10/20 689/689 [==============================] - 37s 54ms/step - loss: 1.0134 - accuracy: 0.1186 Epoch 11/20 689/689 [==============================] - 37s 54ms/step - loss: 0.9645 - accuracy: 0.1243 Epoch 12/20 689/689 [==============================] - 37s 54ms/step - loss: 0.9196 - accuracy: 0.1300 Epoch 13/20 689/689 [==============================] - 37s 54ms/step - loss: 0.8795 - accuracy: 0.1357 Epoch 14/20 689/689 [==============================] - 37s 54ms/step - loss: 0.8410 - accuracy: 0.1413 Epoch 15/20 689/689 [==============================] - 37s 54ms/step - loss: 0.8068 - accuracy: 0.1466 Epoch 16/20 689/689 [==============================] - 37s 54ms/step - loss: 0.7754 - accuracy: 0.1516 Epoch 17/20 689/689 [==============================] - 37s 54ms/step - loss: 0.7477 - accuracy: 0.1557 Epoch 18/20 689/689 [==============================] - 37s 54ms/step - loss: 0.7208 - accuracy: 0.1604 Epoch 19/20 689/689 [==============================] - 37s 54ms/step - loss: 0.6970 - accuracy: 0.1646 Epoch 20/20 689/689 [==============================] - 37s 54ms/step - loss: 0.6756 - accuracy: 0.1683
<keras.callbacks.History at 0x7fef282a4e20>
챗봇 테스트¶
- 새로운 입력 문장에 대해서는 훈련 때와 동일한 전처리를 거친다.
- 입력 문장을 토크나이징하고, START_TOKEN과 END_TOKEN을 추가한다.
- 패딩 마스킹과 룩 어헤드 마스킹을 계산한다.
- 디코더는 입력 시퀀스로부터 다음 단어를 예측한다.
- 디코더는 예측된 다음 단어를 기존의 입력 시퀀스에 추가하여 새로운 입력으로 사용한다.
- END_TOKEN이 예측되거나 문장의 최대 길이에 도달하면 디코더는 동작을 멈춘다.
위 과정을 모두 담은 decoder_inference()
함수를 만든다
def decoder_inference(sentence):
sentence = preprocess_sentence(sentence)
# 입력된 문장을 정수 인코딩 후, 시작 토큰과 종료 토큰을 앞뒤로 추가.
# ex) Where have you been? → [[8331 86 30 5 1059 7 8332]]
sentence = tf.expand_dims(
START_TOKEN + tokenizer.encode(sentence) + END_TOKEN, axis=0)
# 디코더의 현재까지의 예측한 출력 시퀀스가 지속적으로 저장되는 변수.
# 처음에는 예측한 내용이 없음으로 시작 토큰만 별도 저장. ex) 8331
output_sequence = tf.expand_dims(START_TOKEN, 0)
# 디코더의 인퍼런스 단계
for i in range(MAX_LENGTH):
# 디코더는 최대 MAX_LENGTH의 길이만큼 다음 단어 예측을 반복합니다.
predictions = model(inputs=[sentence, output_sequence], training=False)
predictions = predictions[:, -1:, :]
# 현재 예측한 단어의 정수
predicted_id = tf.cast(tf.argmax(predictions, axis=-1), tf.int32)
# 만약 현재 예측한 단어가 종료 토큰이라면 for문을 종료
if tf.equal(predicted_id, END_TOKEN[0]):
break
# 예측한 단어들은 지속적으로 output_sequence에 추가됩니다.
# 이 output_sequence는 다시 디코더의 입력이 됩니다.
output_sequence = tf.concat([output_sequence, predicted_id], axis=-1)
return tf.squeeze(output_sequence, axis=0)
print("슝=3")
슝=3
임의의 입력 문장에 대해 decoder_inference()함수를 호출해 챗봇의 대답을 얻는 sentence_generation()함수를 만든다.
def sentence_generation(sentence):
# 입력 문장에 대해서 디코더를 동작 시켜 예측된 정수 시퀀스를 리턴받습니다.
prediction = decoder_inference(sentence)
# 정수 시퀀스를 다시 텍스트 시퀀스로 변환합니다.
predicted_sentence = tokenizer.decode(
[i for i in prediction if i < tokenizer.vocab_size])
print('입력 : {}'.format(sentence))
print('출력 : {}'.format(predicted_sentence))
return predicted_sentence
print("슝=3")
슝=3
sentence_generation('Where have you been?')
입력 : Where have you been? 출력 : i m not working on it .
'i m not working on it .'
sentence_generation("It's a trap")
입력 : It's a trap 출력 : we ll get it back .
'we ll get it back .'
'Computer Science > AI Exploration' 카테고리의 다른 글
[E-14] Recommendation (0) | 2022.02.24 |
---|---|
[E-13] Bert (0) | 2022.02.22 |
[E-11] Stock prediction (0) | 2022.02.22 |
[E-10] Generative Modeling (0) | 2022.02.22 |
[E-09] Pneumonia (0) | 2022.02.22 |