ノイズの多い時系列データをスムージング(MAとSSA)して、予測モデルを構築(LSTM)する


Summary

  • 実ビジネスで使われる時系列データはノイズが非常に多い
  • その為、ローデータのまま予測モデルにフィットさせることは難しい
  • そこで、スムージングした後に予測モデルにフィットさせると予測精度が向上する
  • 一方で、スムージングし過ぎると、ビジネス上で役に立たない結果となってしまうので、予測精度とスムージングの度合いのバランスを見ながらモデルを構築していく必要がある
  • 本記事ではスムージングの方法(Moving AverageとSingular Spectrum Analysis)とシンプルな予測モデル(LSTM)を紹介する

分析ゴール

  • 2014~2016年データをインプットに2017年の為替レートを予測

対象データ

  • Yahoo finance から取得した為替レート(USD/JPY)
  • 2014~2017年末までのデータを取得
import fix_yahoo_finance as yf  
data = yf.download('JPY=X','2014-01-01','2018-01-01')

データ可視化

  • インプットデータ(青線):2014年〜2016年
  • 予測目的データ(オレンジ):2017年
Y = data.Close['2017']
X =  data.Close[:'2016']
plt.figure(figsize=(15, 3))
plt.plot(X, lw=.7)
plt.plot(Y, lw=.7)

標準化とスムージング

  • 0~1の値に標準化
  • Moving AverageとSSA(Singular Spectrum Analysis)でスムージング
def unit(x):
    return (x-np.min(x,0))/(np.max(x,0)-np.min(x,0))

def MA(x, wsize):
    return [x[i:i+wsize].mean() for i in range(len(x)-wsize)]

def SSA(x,wsize, fac):
    tra = np.array([x[j:j+wsize] for j in range(len(x)-wsize+1)])
    u,s,v = np.linalg.svd(tra)
    t = np.dot(np.dot(u[:,:fac], np.diag(s[:fac])), v[:fac])
    return np.concatenate([t[0], t[1:,-1]])

def SingVal(x,wsize):
    tra = np.array([x[j:j+wsize] for j in range(len(x)-wsize+1)])
    u,s,v = np.linalg.svd(tra)
    return s

Y = data.Close['2017'].values
X =  data.Close[:'2016'].values

Y = unit(Y)
X = unit(X)

X_MA = MA(X, wsize=5)
X_SSA = SSA(X, wsize=30, fac=3)

Y_MA = MA(Y, wsize=5)
Y_SSA = SSA(Y, wsize=30, fac=3)

スムージングしたデータを可視化

plt.figure(figsize=(15, 3))
plt.plot(X, label='Original', lw=.7)
plt.plot(X_MA, label='Moving average', lw=.7)
plt.plot(X_SSA, label='SSA', lw=.7)
plt.legend()

モデル構築

  • kerasによるLSTMの構築
  • 1ヶ月の周期性(土日はデータなし)を想定してwsize=20でデータ作成
  • n_hidden=100は適当(パラーメータ調整までしてないです)
from keras.models import Sequential
from keras.layers.core import Dense, Activation
from keras.layers.recurrent import LSTM
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping

def StackData(data, wsize):
    X, Y = [], []
    for i in range(len(data)-wsize):
        X.append(data[i:i + wsize])
        Y.append(data[i + wsize])
        X_ = np.array(X).reshape(len(X), wsize, 1)
        Y_ = np.array(Y).reshape(len(X), 1)
    return X_, Y_

x, y = StackData(X, wsize=20)

length_of_sequence = x.shape[1] 
in_out_neurons = 1
n_hidden = 100

model = Sequential()
model.add(LSTM(n_hidden, batch_input_shape=(None, length_of_sequence, in_out_neurons), return_sequences=False))
model.add(Dense(in_out_neurons))
model.add(Activation("linear"))
optimizer = Adam(lr=0.001)
model.compile(loss="mean_squared_error", optimizer=optimizer)

学習データでの再現

  • モデル構築に使用した学習データとの比較
predicted = model.predict(x)

plt.figure(figsize=(15, 3))
plt.plot(range(0, len(X)), X, label="original", lw=.7)
plt.plot(range(20,len(predicted)+20),predicted, label="predict", lw=.7)
plt.legend()
plt.show()

テストデータとの比較

  • モデルによる予測値と実際の値の比較

x_, y_ = StackData(Y, wsize=20)
plt.figure(figsize=(15, 3))
plt.plot(range(0, len(Y)), Y, label="original", lw=.7)
plt.plot(range(20,len(predicted)+20),predicted, label="predict", lw=.7)
plt.legend()
plt.show()

オリジナルデータ・スムージングデータでの予測モデルの比較

  • 上記の予測モデル構築をオリジナルデータ・スムージングデータ(Moving Average)・スムージングデータ(SSA)で実行
  • 予測値と実際の値を比較
  • 予測精度の指標としてRMSEを算出

オリジナルデータによる予測

スムージングデータ(Moving Average)による予測

スムージングデータ(SSA)による予測

def test_model(X, Y):
    x, y = StackData(X, wsize=30)
    x_, y_ = StackData(Y, wsize=30)

    length_of_sequence = x.shape[1] 
    in_out_neurons = 1
    n_hidden = 100

    model = Sequential()
    model.add(LSTM(n_hidden, batch_input_shape=(None, length_of_sequence, in_out_neurons), return_sequences=False))
    model.add(Dense(in_out_neurons))
    model.add(Activation("linear"))
    optimizer = Adam(lr=0.001)
    model.compile(loss="mean_squared_error", optimizer=optimizer)

    early_stopping = EarlyStopping(monitor='val_loss', mode='auto', patience=20)
    model.fit(x, y,
              batch_size=300,
              epochs=100,
              validation_split=0.1,
              callbacks=[early_stopping],
              verbose=0
              )
    predicted = model.predict(x_)
    return predicted

def rmse(a, b):
    return np.sqrt(np.mean((a-b)**2))
predicted = test_model(X, Y)
plt.figure(figsize=(15, 3))
plt.plot(range(0, len(Y)), Y, label="original", lw=.7)
plt.plot(range(30,len(predicted)+30),predicted, label="predict", lw=.7)
plt.legend()
plt.show()

print('Compare with Original: ', rmse(predicted, Y))

predicted = test_model(X_MA, Y_MA)
plt.figure(figsize=(15, 3))
plt.plot(range(0, len(Y)), Y, label="original", lw=.7)
plt.plot(range(25,len(predicted)+25),predicted, label="predict", lw=.7)
plt.plot(range(0, len(Y_MA)), Y_MA, label="Smooth", lw=.7)
plt.legend()
plt.show()

print('Compare with Original: ', rmse(predicted, Y))
print('Compare with Smoothed: ', rmse(predicted, Y_MA))

predicted = test_model(X_SSA, Y_SSA)
plt.figure(figsize=(15, 3))
plt.plot(range(0, len(Y)), Y, label="original", lw=.7)
plt.plot(range(25,len(predicted)+25),predicted, label="predict", lw=.7)
plt.plot(range(0, len(Y_SSA)), Y_SSA, label="Smooth", lw=.7)
plt.legend()
plt.show()

print('Compare with Original: ', rmse(predicted, Y))
print('Compare with Smoothed: ', rmse(predicted, Y_SSA))

最後に

  • スムージングしたデータに対して予測モデルを構築した方が精度の向上が期待できそう
  • ただし、様々なパラメータ(スムージング時のパラメータ、予測モデルのパラメータ)の組み合わせで結果が異なるので、更なる検証が必要と思われる

補足

  • 色々と既知の方法を組み合わせで分析を行ってみました
  • 間違いや改善点等あればお気軽にコメント頂けますと幸いです