I am currently studying LSTMs. I found code from a book that predicts a mixed sine & cosine curve. However, I am stuck on the inference function.
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
import os
LOG_DIR = os.path.join(os.path.dirname(__file__), "log")
if os.path.exists(LOG_DIR) is False:
    os.mkdir(LOG_DIR)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
def inference(x, n_batch, maxlen=None, n_hidden=None, n_out=None):
    def weight_variable(shape):
        initial = tf.truncated_normal(shape, stddev=0.01)
        return tf.Variable(initial)

    def bias_variable(shape):
        initial = tf.zeros(shape, dtype=tf.float32)
        return tf.Variable(initial)

    cell = tf.contrib.rnn.BasicLSTMCell(n_hidden)
    initial_state = cell.zero_state(n_batch, tf.float32)

    state = initial_state
    outputs = []
    with tf.variable_scope('LSTM'):
        for t in range(maxlen):
            if t > 0:
                tf.get_variable_scope().reuse_variables()
            (cell_output, state) = cell(x[:, t, :], state)
            outputs.append(cell_output)

    output = outputs[-1]

    V = weight_variable([n_hidden, n_out])
    c = bias_variable([n_out])
    y = tf.matmul(output, V) + c

    return y
def loss(y, t):
    mse = tf.reduce_mean(tf.square(y - t))
    return mse

def training(loss):
    optimizer = tf.train.AdamOptimizer(learning_rate=0.001, beta1=0.9, beta2=0.999)
    train_step = optimizer.minimize(loss)
    return train_step
class EarlyStopping():
    def __init__(self, patience=0, verbose=0):
        self._step = 0
        self._loss = float('inf')
        self.patience = patience
        self.verbose = verbose

    def validate(self, loss):
        if self._loss < loss:
            self._step += 1
            if self._step > self.patience:
                if self.verbose:
                    print('early stopping')
                return True
        else:
            self._step = 0
            self._loss = loss

        return False
def sin(x, T):
    return np.sin(12.0 * np.pi * x / T)

def cos(x, T):
    return np.cos(17.0 * np.pi * x / T) / 3

def toy_problem(x, T, ampl=0.05):
    x = np.arange(0, 2 * T + 1)
    noise = ampl * np.random.uniform(low=-1.0, high=1.0, size=len(x))
    return sin(x, T) + cos(x, T) + noise
if __name__ == '__main__':
    T = 500
    x = np.arange(0, 2 * T + 1)
    length_of_sequence = 2 * T
    maxlen = 300

    f = toy_problem(x, T)

    data = []
    target = []

    for i in range(0, length_of_sequence - maxlen + 1):
        data.append(f[i:i + maxlen])
        target.append(f[i + maxlen])

    X = np.array(data).reshape(len(data), maxlen, 1)
    Y = np.array(target).reshape(len(data), 1)

    """
    divide training data and validation data
    """
    N_train = int(len(data) * 0.9)
    N_validation = len(data) - N_train

    X_train, X_validation, Y_train, Y_validation = \
        train_test_split(X, Y, test_size=N_validation)

    """
    Model Configuration
    """
    n_in = len(X[0][0])
    n_hidden = 30
    n_out = len(Y[0])

    x = tf.placeholder(tf.float32, shape=[None, maxlen, n_in])
    t = tf.placeholder(tf.float32, shape=[None, n_out])
    n_batch = tf.placeholder(tf.int32, shape=[])

    y = inference(x, n_batch, maxlen=maxlen, n_hidden=n_hidden, n_out=n_out)
    loss = loss(y, t)
    train_step = training(loss)

    early_stopping = EarlyStopping(patience=10, verbose=1)
    history = {
        'val_loss': []
    }

    """
    Model Learning
    """
    epochs = 500
    batch_size = 10

    init = tf.global_variables_initializer()
    sess = tf.Session()
    tf.summary.FileWriter(LOG_DIR, sess.graph)
    summary_op = tf.summary.merge_all()
    summary_writer = tf.summary.FileWriter("./log/RNN2/", sess.graph_def)
    sess.run(init)

    n_batches = N_train // batch_size

    for epoch in range(epochs):
        X_, Y_ = shuffle(X_train, Y_train)

        for i in range(n_batches):
            start = i * batch_size
            end = start + batch_size

            sess.run(train_step, feed_dict={
                x: X_[start:end],
                t: Y_[start:end],
                n_batch: batch_size
            })

        val_loss = loss.eval(session=sess, feed_dict={
            x: X_validation,
            t: Y_validation,
            n_batch: N_validation
        })

        history['val_loss'].append(val_loss)
        print('epoch:', epoch, ' validation loss:', val_loss)

        if early_stopping.validate(val_loss):
            break

    '''
    Prediction with output
    '''
    truncate = maxlen
    Z = X[:1]
print "Z", Z
    original = [f[i] for i in range(maxlen)]
    predicted = [None for i in range(maxlen)]

    z_ = Z[-1:]

    for i in range(length_of_sequence - maxlen + 1):
        y_ = y.eval(session=sess, feed_dict={
            x: z_,
            n_batch: 1
        })
        sequence_ = np.concatenate((z_.reshape(maxlen, n_in)[1:], y_), axis=0).reshape(1, maxlen, n_in)
        z_ = sequence_
        predicted.append(y_.reshape(-1))

    '''
    Visualization
    '''
    plt.rc('font', family='serif')
    plt.figure()
    plt.ylim([-2.0, 2.0])
    plt.plot(toy_problem(x, T), linestyle='dotted', color='#aaaaaa')
    plt.plot(original, color='black')
    plt.plot(predicted, color='red')
    plt.show()
In the inference function, we have:
cell = tf.contrib.rnn.BasicLSTMCell(n_hidden)
initial_state = cell.zero_state(n_batch, tf.float32)

state = initial_state
outputs = []
with tf.variable_scope('LSTM'):
    for t in range(maxlen):
        if t > 0:
            tf.get_variable_scope().reuse_variables()
        (cell_output, state) = cell(x[:, t, :], state)
        outputs.append(cell_output)

output = outputs[-1]
But I have no idea what cell = tf.contrib.rnn.BasicLSTMCell(n_hidden) and (cell_output, state) = cell(x[:, t, :], state) are doing.
I have tried to understand it, but if someone has a hint, I would appreciate it.
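To show where I am: my rough guess is that cell = tf.contrib.rnn.BasicLSTMCell(n_hidden) only constructs the cell object (its weight variables are created on the first call), and that each cell(x[:, t, :], state) call then applies one LSTM step to the input at time t, returning the hidden output and the updated (cell state, hidden state) pair. The NumPy sketch below is only my own illustration of what I think one such step computes; the lstm_step helper and its weight layout are made up by me and are not TensorFlow's actual implementation, so please correct me if this picture is wrong.

import numpy as np

def lstm_step(x_t, h_prev, c_prev, W, b):
    # Hypothetical single LSTM step on a batch, for illustration only.
    # x_t: (batch, n_in), h_prev / c_prev: (batch, n_hidden)
    # W: (n_in + n_hidden, 4 * n_hidden), b: (4 * n_hidden,)
    def sigmoid(a):
        return 1.0 / (1.0 + np.exp(-a))

    z = np.concatenate([x_t, h_prev], axis=1).dot(W) + b
    i, g, f, o = np.split(z, 4, axis=1)                    # input, candidate, forget, output
    c_t = sigmoid(f) * c_prev + sigmoid(i) * np.tanh(g)    # updated cell state
    h_t = sigmoid(o) * np.tanh(c_t)                        # new hidden state = cell output
    return h_t, (c_t, h_t)                                 # mirrors the (cell_output, state) pair

# Tiny usage example: batch of 2, n_in = 1, n_hidden = 3.
rng = np.random.RandomState(0)
x_t = rng.randn(2, 1)
h0 = np.zeros((2, 3))
c0 = np.zeros((2, 3))
W = rng.randn(1 + 3, 4 * 3) * 0.01
b = np.zeros(4 * 3)
out, (c1, h1) = lstm_step(x_t, h0, c0, W, b)
print(out.shape)    # (2, 3), i.e. (batch, n_hidden)

If that reading of the loop is right, then outputs[-1] would simply be the hidden output after the last time step, which is then fed to the final linear layer. Is that correct?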