RNN
Some of the data we work with is sequence data.
Speech recognition and natural language processing are typical examples: a new word is understood in the context of the words that came before it, and that is exactly what makes it sequence data.
Previously we had an input X producing an output Y; here, time enters the picture.
In other words, the current state is fed into the next state.
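Conceptually the recurrence is just h_t = tanh(W_hh * h_{t-1} + W_xh * x_t). Here is a minimal NumPy sketch of that idea (the sizes and weight names are made up for illustration):
import numpy as np
hidden_size, input_dim, steps = 3, 4, 5
W_hh = np.random.randn(hidden_size, hidden_size)  # state -> state
W_xh = np.random.randn(input_dim, hidden_size)    # input -> state
xs = np.random.randn(steps, input_dim)            # a toy input sequence
h = np.zeros(hidden_size)                         # initial state
for x in xs:
    # the previous state flows into the computation of the new state
    h = np.tanh(h @ W_hh + x @ W_xh)
    print(h)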
In TensorFlow, an RNN cell's output is connected to the cell at the next step.
Implementing it looks like this:
cell = tf.contrib.rnn.BasicRNNCell(num_units=hidden_size)
num_units is the size of the output vector (the dimensionality of the hidden state).
outputs, _states = tf.nn.dynamic_rnn(cell, x_data, dtype=tf.float32)
We pass the cell we created along with the input data, and dynamic_rnn returns two values: the outputs at every step and the final state.
There are several reasons the API is split this way, but by separating cell creation from the part that runs it,
cell = tf.contrib.rnn.BasicLSTMCell(num_units=hidden_size)
we can try an LSTM simply by swapping in a different cell, as above.
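As a quick check, here is a toy sketch (the shapes and values are made up for illustration) showing that the output's last dimension equals num_units and that dynamic_rnn returns both the outputs and the final state:
import numpy as np
import tensorflow as tf
x_data = np.random.randn(1, 5, 4).astype(np.float32)  # batch 1, 5 steps, 4-dim inputs
cell = tf.contrib.rnn.BasicLSTMCell(num_units=2)      # each output vector has size 2
outputs, _states = tf.nn.dynamic_rnn(cell, x_data, dtype=tf.float32)
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    print(sess.run(outputs).shape)  # (1, 5, 2) = (batch, sequence_length, num_units)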
Now let's actually put it to work.
We want to train on "hihello".
The unique characters h, i, e, l, o give 5 classes, so the one-hot input dimension is 5, and hidden_size is also set to 5 so the cell can predict the one-hot directly.
The input sequence is the 6 characters (h, i, h, e, l, l), and the target is the 6 characters shifted by one (i, h, e, l, l, o).
import tensorflow as tf
import numpy as np
num_classes = 5
input_dim = 5 # one-hot size
hidden_size = 5 # output from the LSTM. 5 to directly predict one-hot
batch_size = 1 # one sentence
sequence_length = 6 # |ihello| == 6
learning_rate = 0.1
idx2char = ['h', 'i', 'e', 'l', 'o']
# Teach hello: hihell -> ihello
x_data = [[0, 1, 0, 2, 3, 3]] # hihell
x_one_hot = [[[1, 0, 0, 0, 0],   # h 0
              [0, 1, 0, 0, 0],   # i 1
              [1, 0, 0, 0, 0],   # h 0
              [0, 0, 1, 0, 0],   # e 2
              [0, 0, 0, 1, 0],   # l 3
              [0, 0, 0, 1, 0]]]  # l 3
y_data = [[1, 0, 2, 3, 3, 4]] # ihello
X = tf.placeholder(
    tf.float32, [None, sequence_length, input_dim])  # X one-hot
Y = tf.placeholder(tf.int32, [None, sequence_length]) # Y label
cell = tf.contrib.rnn.BasicLSTMCell(num_units=hidden_size, state_is_tuple=True)
initial_state = cell.zero_state(batch_size, tf.float32)
outputs, _states = tf.nn.dynamic_rnn(
    cell, X, initial_state=initial_state, dtype=tf.float32)
weights = tf.ones([batch_size, sequence_length])
sequence_loss = tf.contrib.seq2seq.sequence_loss(
    logits=outputs, targets=Y, weights=weights)
loss = tf.reduce_mean(sequence_loss)
train = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(loss)
prediction = tf.argmax(outputs, axis=2)
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    for i in range(2000):
        l, _ = sess.run([loss, train], feed_dict={X: x_one_hot, Y: y_data})
        result = sess.run(prediction, feed_dict={X: x_one_hot})
        print(i, "loss:", l, "prediction: ", result, "true Y: ", y_data)

        # print char using dic
        result_str = [idx2char[c] for c in np.squeeze(result)]
        print("\tPrediction str: ", ''.join(result_str))
If you run this, you will see that even around 50 iterations are enough to produce the right string.
Next, let's write the code so that it works for any input string.
import tensorflow as tf
import numpy as np
tf.set_random_seed(777) # reproducibility
sample = " 화이팅입니다"
idx2char = list(set(sample)) # index -> char
char2idx = {c: i for i, c in enumerate(idx2char)}  # char -> idx
# hyper parameters
dic_size = len(char2idx) # RNN input size (one hot size)
hidden_size = len(char2idx) # RNN output size
num_classes = len(char2idx) # final output size (RNN or softmax, etc.)
batch_size = 1 # one sample data, one batch
sequence_length = len(sample) - 1 # number of lstm rollings (unit #)
learning_rate = 0.1
sample_idx = [char2idx[c] for c in sample] # char to index
x_data = [sample_idx[:-1]] # X data sample (0 ~ n-1) hello: hell
y_data = [sample_idx[1:]] # Y label sample (1 ~ n) hello: ello
X = tf.placeholder(tf.int32, [None, sequence_length]) # X data
Y = tf.placeholder(tf.int32, [None, sequence_length]) # Y label
x_one_hot = tf.one_hot(X, num_classes) # one hot: 1 -> 0 1 0 0 0 0 0 0 0 0
cell = tf.contrib.rnn.BasicLSTMCell(
    num_units=hidden_size, state_is_tuple=True)
initial_state = cell.zero_state(batch_size, tf.float32)
outputs, _states = tf.nn.dynamic_rnn(
    cell, x_one_hot, initial_state=initial_state, dtype=tf.float32)
weights = tf.ones([batch_size, sequence_length])
sequence_loss = tf.contrib.seq2seq.sequence_loss(
    logits=outputs, targets=Y, weights=weights)
loss = tf.reduce_mean(sequence_loss)
train = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(loss)
prediction = tf.argmax(outputs, axis=2)
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    for i in range(50):
        l, _ = sess.run([loss, train], feed_dict={X: x_data, Y: y_data})
        result = sess.run(prediction, feed_dict={X: x_data})

        # print char using dic
        result_str = [idx2char[c] for c in np.squeeze(result)]
        print(i, "loss:", l, "Prediction:", ''.join(result_str))
This time, let's feed in a really long input. It probably won't work very well, though.
import tensorflow as tf
import numpy as np
from tensorflow.contrib import rnn
tf.set_random_seed(777) # reproducibility
sentence = ("if you want to build a ship, don't drum up people together to "
            "collect wood and don't assign them tasks and work, but rather "
            "teach them to long for the endless immensity of the sea.")
char_set = list(set(sentence))
char_dic = {w: i for i, w in enumerate(char_set)}
data_dim = len(char_set)
hidden_size = len(char_set)
num_classes = len(char_set)
sequence_length = 10 # Any arbitrary number
learning_rate = 0.1
dataX = []
dataY = []
for i in range(0, len(sentence) - sequence_length):
    x_str = sentence[i:i + sequence_length]
    y_str = sentence[i + 1: i + sequence_length + 1]
    print(i, x_str, '->', y_str)

    x = [char_dic[c] for c in x_str]  # x str to index
    y = [char_dic[c] for c in y_str]  # y str to index

    dataX.append(x)
    dataY.append(y)
batch_size = len(dataX)
X = tf.placeholder(tf.int32, [None, sequence_length])
Y = tf.placeholder(tf.int32, [None, sequence_length])
# One-hot encoding
X_one_hot = tf.one_hot(X, num_classes)
print(X_one_hot) # check out the shape
cell = tf.contrib.rnn.BasicLSTMCell(
    num_units=hidden_size, state_is_tuple=True)
# outputs: unfolding size x hidden size, state = hidden size
outputs, _states = tf.nn.dynamic_rnn(cell, X_one_hot, dtype=tf.float32)
# All weights are 1 (equal weights)
weights = tf.ones([batch_size, sequence_length])
sequence_loss = tf.contrib.seq2seq.sequence_loss(
    logits=outputs, targets=Y, weights=weights)
mean_loss = tf.reduce_mean(sequence_loss)
train_op = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(mean_loss)
sess = tf.Session()
sess.run(tf.global_variables_initializer())
for i in range(500):
    _, l, results = sess.run(
        [train_op, mean_loss, outputs], feed_dict={X: dataX, Y: dataY})
    for j, result in enumerate(results):
        index = np.argmax(result, axis=1)
        print(i, j, ''.join([char_set[t] for t in index]), l)

# Let's print the last char of each result to check it works
results = sess.run(outputs, feed_dict={X: dataX})
for j, result in enumerate(results):
    index = np.argmax(result, axis=1)
    if j == 0:  # print all for the first result to make a sentence
        print(''.join([char_set[t] for t in index]), end='')
    else:
        print(char_set[index[-1]], end='')
The reason is the logits: we fed the raw RNN outputs straight into sequence_loss as if they were proper logits.
We also need to go wide and deep, but so far we had only a single cell.
That is where the idea of a stacked RNN comes in.
It is exactly what it sounds like: stacking cells, which is done with the MultiRNNCell function.
The other piece is Softmax (FC) in Deep CNN, where FC means Fully Connected.
In other words, we attach a softmax (fully connected) layer at the end, on top of the RNN outputs (see the FC lines added in the code below).
import tensorflow as tf
import numpy as np
from tensorflow.contrib import rnn
tf.set_random_seed(777) # reproducibility
sentence = ("if you want to build a ship, don't drum up people together to "
            "collect wood and don't assign them tasks and work, but rather "
            "teach them to long for the endless immensity of the sea.")
char_set = list(set(sentence))
char_dic = {w: i for i, w in enumerate(char_set)}
data_dim = len(char_set)
hidden_size = len(char_set)
num_classes = len(char_set)
sequence_length = 10 # Any arbitrary number
learning_rate = 0.1
dataX = []
dataY = []
for i in range(0, len(sentence) - sequence_length):
    x_str = sentence[i:i + sequence_length]
    y_str = sentence[i + 1: i + sequence_length + 1]
    print(i, x_str, '->', y_str)

    x = [char_dic[c] for c in x_str]  # x str to index
    y = [char_dic[c] for c in y_str]  # y str to index

    dataX.append(x)
    dataY.append(y)
batch_size = len(dataX)
X = tf.placeholder(tf.int32, [None, sequence_length])
Y = tf.placeholder(tf.int32, [None, sequence_length])
# One-hot encoding
X_one_hot = tf.one_hot(X, num_classes)
print(X_one_hot) # check out the shape
# Make a lstm cell with hidden_size (each unit output vector size)
def lstm_cell():
    cell = rnn.BasicLSTMCell(hidden_size, state_is_tuple=True)
    return cell
multi_cells = rnn.MultiRNNCell([lstm_cell() for _ in range(2)], state_is_tuple=True)
# outputs: unfolding size x hidden size, state = hidden size
outputs, _states = tf.nn.dynamic_rnn(multi_cells, X_one_hot, dtype=tf.float32)
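# A hedged sketch of the softmax (FC) layer described above (the reshape pattern
# here is an assumption, not spelled out in the text): flatten the RNN outputs,
# run them through a fully connected layer, and reshape back so that
# sequence_loss receives logits of shape (batch, sequence_length, num_classes).
X_for_fc = tf.reshape(outputs, [-1, hidden_size])
outputs = tf.contrib.layers.fully_connected(X_for_fc, num_classes, activation_fn=None)
outputs = tf.reshape(outputs, [batch_size, sequence_length, num_classes])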
# All weights are 1 (equal weights)
weights = tf.ones([batch_size, sequence_length])
sequence_loss = tf.contrib.seq2seq.sequence_loss(
    logits=outputs, targets=Y, weights=weights)
mean_loss = tf.reduce_mean(sequence_loss)
train_op = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(mean_loss)
sess = tf.Session()
sess.run(tf.global_variables_initializer())
for i in range(500):
    _, l, results = sess.run(
        [train_op, mean_loss, outputs], feed_dict={X: dataX, Y: dataY})
    for j, result in enumerate(results):
        index = np.argmax(result, axis=1)
        print(i, j, ''.join([char_set[t] for t in index]), l)

# Let's print the last char of each result to check it works
results = sess.run(outputs, feed_dict={X: dataX})
for j, result in enumerate(results):
    index = np.argmax(result, axis=1)
    if j == 0:  # print all for the first result to make a sentence
        print(''.join([char_set[t] for t in index]), end='')
    else:
        print(char_set[index[-1]], end='')
One more thing: so far we have been using dynamic_rnn in TensorFlow.
When inputs of varying length come in, the chunks we cut never all have the same length, so the leftover positions get filled with some padding.
That padding also has weights attached, so it would get learned along with the real data, and dynamic_rnn can prevent that when we tell it each sequence's real length.
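Here is a minimal sketch of that (the lengths and shapes are made-up toy values): passing sequence_length to dynamic_rnn zeroes out the outputs past each sequence's real length, so the padded steps do not contribute.
import numpy as np
import tensorflow as tf
# two padded sequences of max length 4, each step a 3-dim vector (toy data)
x_data = np.zeros((2, 4, 3), dtype=np.float32)
seq_len = [4, 2]  # the second sequence only has 2 valid steps
cell = tf.contrib.rnn.BasicLSTMCell(num_units=5)
outputs, _states = tf.nn.dynamic_rnn(
    cell, x_data, sequence_length=seq_len, dtype=tf.float32)
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    print(sess.run(outputs)[1])  # steps 3 and 4 of the second sequence are all zeros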
Finally, we will use stock data to predict a time series: given some data for 7 days, predict what happens on day 8, a many-to-one prediction.
The full example is at https://github.com/hunkim/DeepLearningZeroToAll/blob/master/lab-12-5-rnn_stock_prediction.py, so have a look there.
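The many-to-one core looks roughly like this (a sketch only; the dimensions and hyperparameters below are illustrative, not taken from that file): feed the 7 time steps into the LSTM and attach a fully connected layer to the output of the last step only.
import tensorflow as tf
seq_length = 7  # 7 days of input data
data_dim = 5    # e.g. open, high, low, volume, close (illustrative)
X = tf.placeholder(tf.float32, [None, seq_length, data_dim])
Y = tf.placeholder(tf.float32, [None, 1])  # the value on day 8
cell = tf.contrib.rnn.BasicLSTMCell(num_units=10, state_is_tuple=True)
outputs, _states = tf.nn.dynamic_rnn(cell, X, dtype=tf.float32)
# many to one: use only the last time step's output
Y_pred = tf.contrib.layers.fully_connected(outputs[:, -1], 1, activation_fn=None)
loss = tf.reduce_sum(tf.square(Y_pred - Y))  # squared-error loss
train = tf.train.AdamOptimizer(0.01).minimize(loss)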