Efficient GANをkerasで実装


概要

業務で異常検知手法を調査していた時にEfficientGANという手法を見つけたのですが、著者のソースコードはライブラリのバージョンが記載なく実行に手こずったので、勉強も兼ねてkerasで実装してみました。
なお、実装したのはテーブルデータ用のネットワークのみで、推論時のロス計算の内”feature-matching loss”は未実装です。
※EfficientGANについてはこの記事で解説しませんが、分かりやすい解説記事が有りましたので、以下に記載しておきます。

ソースコード:https://github.com/asm94/EfficientGAN

↓参考にしたもの
元論文      :https://arxiv.org/abs/1802.06222
著者のソースコード:https://github.com/houssamzenati/Efficient-GAN-Anomaly-Detection
解説記事     :https://qiita.com/masataka46/items/49dba2790fa59c29126b

実行環境

・Windows10 64bit
・Python 3.8.3
・numpy 1.18.5
・tensorflow 2.3.1
・scikit-learn 0.22.2

実装

1.全体構成(クラス)

今回、EfficientGANのネットワークと学習および推論機能を、一つのクラスとして定義しました。
全体像は以下の通りです。個々の関数は後述します。

class EfficientGAN(object):
    def __init__(self, input_dim=0, latent_dim=32):
        self.input_dim = int(input_dim)
        self.latent_dim = int(latent_dim)

    #Train model
    def fit(self, X_train, epochs=50, batch_size=50, loss=tf.keras.losses.BinaryCrossentropy(),
            optimizer=tf.keras.optimizers.Adam(lr=1e-5, beta_1=0.5), test=tuple(), early_stop_num=50,
            verbose=1):        
        #後述

    #Test model
    def predict(self, X_test, weight=0.9, degree=1):        
        #後述  

    ##Encoder
    def get_encoder(self, initializer=tf.keras.initializers.GlorotUniform()):
        #後述

    ##Generator
    def get_generator(self, initializer=tf.keras.initializers.GlorotUniform()):
        #後述

    ##Discriminator
    def get_discriminator(self, initializer=tf.keras.initializers.GlorotUniform()):
        #後述

2.ネットワーク

論文を参考に以下の様に実装しました。
・”input_dim”は、論文では使用したデータの次元数である121になっていますが、可変設定出来るように変更しています。
・Discriminatorの出力層の活性化関数は、論文ではlinear(線形)ですが、著者のソースコードを見ると、ロス計算時にシグモイド関数で変換しているため、今回はネットワークの方に組み込みました。

##Encoder
def get_encoder(self, initializer=tf.keras.initializers.GlorotUniform()):
    inputs = Input(shape=(self.input_dim,), name='input')
    net = inputs
    net = Dense(64, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
                name='layer_1')(net)
    outputs = Dense(self.latent_dim, activation='linear', kernel_initializer=initializer,
                    name='output')(net)

    return Model(inputs=inputs, outputs=outputs, name='Encoder')

##Generator
def get_generator(self, initializer=tf.keras.initializers.GlorotUniform()):
    inputs = Input(shape=(self.latent_dim,), name='input')
    net = inputs
    net = Dense(64, activation='relu', kernel_initializer=initializer,
                name='layer_1')(net)
    net = Dense(128, activation='relu', kernel_initializer=initializer,
                name='layer_2')(net)
    outputs = Dense(self.input_dim, activation='linear', kernel_initializer=initializer,
                    name='output')(net)

    return Model(inputs=inputs, outputs=outputs, name='Generator')

##Discriminator
def get_discriminator(self, initializer=tf.keras.initializers.GlorotUniform()):
    #D(x)
    inputs1 = Input(shape=(self.input_dim,), name='real')
    net = inputs1
    net = Dense(128, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
                name='layer_1')(net)
    dx = Dropout(.2)(net)

    #D(z)
    inputs2 = Input(shape=(self.latent_dim,), name='noise')
    net = inputs2
    net = Dense(128, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
                name='layer_2')(net)
    dz = Dropout(.2)(net)

    #D(x) と D(z) を結合
    conet = Concatenate(axis=1)([dx,dz])

    #D(x,z)
    conet = Dense(128, activation=LeakyReLU(alpha=0.1), kernel_initializer=initializer,
                  name='layer_3')(conet)
    conet = Dropout(.2)(conet)
    outputs = Dense(1, activation='sigmoid', kernel_initializer=initializer,
                    name='output')(conet)

    return Model(inputs=[inputs1,inputs2], outputs=outputs, name='Discriminator')

3.モデル学習

論文を参考に以下の様に実装しました。
・著者のソースコードでは、ロス計算時の直前にシグモイド関数で変換していますが、項2に記載の通りシグモイド関数による変換はネットワークに組み込んだので、ここでは変換しません。
・Discriminator等の各部分モデルは、EfficientGANクラスの定義時では無く学習時に定義され、また、入力の次元数が未定義なら、このタイミングで学習データの次元数が入力の次元数に設定されます。

#Train model
def fit(self, X_train, epochs=50, batch_size=50, loss=tf.keras.losses.BinaryCrossentropy(),
        optimizer=tf.keras.optimizers.Adam(lr=1e-5, beta_1=0.5), test=tuple(), early_stop_num=50,
        verbose=1):

    #学習データをnumpy型に変換
    X_train = np.array(X_train)

    #"input_dim"が1以上で無いなら(未定義想定)、学習データの次元数を設定
    if not self.input_dim >= 1: self.input_dim = X_train.shape[1]

    #Discriminatorモデル定義
    self.dis = self.get_discriminator()
    self.dis.compile(loss=loss, optimizer=optimizer)        

    #Encoder学習用のモデル定義(Encoder → Discriminator)
    self.enc = self.get_encoder()
    x = Input(shape=(self.input_dim,))
    z_gen = self.enc(x)
    valid = self.dis([x, z_gen])
    enc_dis = Model(inputs=x, outputs=valid, name='enc_to_dis')
    enc_dis.compile(loss=loss, optimizer=optimizer) 

    #Generator学習用のモデル定義(Generator → Discriminator)
    self.gen = self.get_generator()
    z = Input(shape=(self.latent_dim,))
    x_gen = self.gen(z)
    valid = self.dis([x_gen, z])
    gen_dis = Model(inputs=z, outputs=valid, name='gen_to_dis')
    gen_dis.compile(loss=loss, optimizer=optimizer)          

    #Training
    min_val_loss = float('inf')
    stop_count = 0
    for i in range(epochs):    
        #Discriminatorを学習機能をオンに
        self.dis.trainable = True

        #学習データから、"batch_size"の半数をランダムに取得
        idx = np.random.randint(0, X_train.shape[0], batch_size//2)
        real_data = X_train[idx]

        #"batch_size"の半数だけノイズを生成し、各生成ノイズからデータ生成
        noise = np.random.normal(0, 1, (len(idx), self.latent_dim))
        gen_data = self.gen.predict(noise)

        #取得した各学習データから、ノイズを生成
        enc_noise = self.enc.predict(real_data)

        #Discriminator学習
        d_enc_loss = self.dis.train_on_batch([real_data, enc_noise], np.ones((len(real_data), 1)))
        d_gen_loss = self.dis.train_on_batch([gen_data, noise], np.zeros((len(gen_data), 1)))
        d_loss = d_enc_loss + d_gen_loss

        #Discriminatorの学習機能をオフに
        self.dis.trainable = False

        #Encoder学習
        e_loss = enc_dis.train_on_batch(real_data, np.zeros((len(real_data), 1)))

        #Generator学習
        g_loss = gen_dis.train_on_batch(noise, np.ones((len(noise), 1)))

        #評価用データの設定有れば、当該データのloss計算とearly stop検討
        if len(test)>0:
            #評価用データ取得
            X_test = test[0]
            y_true = test[1]

            #評価用データの推論
            proba = self.predict(X_test)
            proba = minmax_scale(proba)

            #loss計算
            val_loss = tf.keras.losses.binary_crossentropy(y_true, proba).numpy()

            #評価用データのlossが今までより減衰していれば、最小lossを更新し、early stopカウントをリセット
            if min_val_loss > val_loss:                                        
                min_val_loss = val_loss #Update "min_val_loss" to "val_loss"
                stop_count = 0 #Change "stop_count" to 0
            #If "stop_count" is equal or more than "early_stop_num", training is end
            #指定回数の間で、評価用データのlossが減衰しなければ、学習ストップ
            elif stop_count >= early_stop_num:
                break
            else:
                stop_count += 1               

        #学習状況の表示
        if verbose==1 and i%100==0:
            if len(test)==0: print(f'epoch{i}-> d_loss:{d_loss}  e_loss:{e_loss}  g_loss:{g_loss}')
            else: print(f'epoch{i}-> d_loss:{d_loss}  e_loss:{e_loss}  g_loss:{g_loss}  val_loss:{val_loss}')

4.モデル推論(異常検知)

論文を参考に以下の様に実装しました。
異常スコアは論文の通り、下記式にて算出しています(高いほど異常)。

A(x)=αL_G(x)+(1-α)L_D(x)・・・Anomaly Score
L_G(x)=||x-G(E(x))||_1・・・Generator Loss
L_D(x)=σ(D(x,E(x)),1)・・・Discriminator Loss

ちなみに、著者のソースコードではDiscriminatorLossが以下の様になっており、論文の内容とどちらを取ればよいか迷いましたが、今回は論文の通り上記式で実装しました。

L_D(x)=σ(D(G(E(x)),E(x)),1)
#Test model
def predict(self, X_test, weight=0.9, degree=1):

    #評価用データをnumpy型に変換
    X_test = np.array(X_test)

    #評価用データからノイズ生成
    z_gen = self.enc.predict(X_test)

    #評価用データから生成したノイズで、再度データを生成
    reconstructs = self.gen.predict(z_gen)

    #元のデータと再度生成したデータの差を各説明変数毎に計算しそれらを合算       
    #学習データと同様のデータであれば、上手く学習出来たEncoderとGeneratorで、入力データを再生成できるはず。
    delta = X_test - reconstructs
    gen_score = tf.norm(delta, ord=degree, axis=1).numpy()

    #DiscriminatorでEncoderの入出力を推論
    l_encoder = self.dis.predict([X_test, z_gen])

    #上記推論結果と、全て1の配列とのクロスエントロピーを算出       
    #学習データと同様のデータであれば、Encoderの入出力をDiscriminatorで推論した結果は1となるはず
    dis_score = tf.keras.losses.binary_crossentropy(np.ones((len(X_test), 1)), l_encoder).numpy()

    #Return anomality calculated "gen_score" and "dis_score"
    return weight*gen_score + (1-weight)*dis_score

 

以上、ご閲覧頂きありがとうございました。
何か気になる点などあれば、ご指摘頂けると幸いです。