Keras DenseNet構造操作を実現します。


DenseNet構造は16年にHuagg GaoやLiu Zhuangなどによって提出され、CRP 2017で最優秀論文に選ばれました。ネットワークのコア構造は、以下のようなDenseブロックであり、各Denseブロックには、複数のDense層、すなわち、下図のようなH 1〜H 4が存在する。各Dense層の間は互いに接続されています。すなわちH 1の入力はx 0で、出力はx 1で、H 2の入力は[x 0,x 1]で、出力はx 2で、順次類推します。最終的なDenseブロックの出力は[x 0,x 1,x 2,x 3,x 4]です。このような構造は個人的には生物学におけるニューロン接続のように感じられ、ネットワークにおける特徴情報の利用効率をより効果的に向上させることができるはずである。

DenseNetの他の構造は非常に一般的な畳み込み神経ネットワーク構造に似ています。論文で提供されたネットワーク構造図(下図)を参照してください。しかし、個人的な感覚では、DenseNetのこのような構造は、Denseブロック内のそれぞれのDense層を直接相互に接続してネットワークの構造を縮小する必要がないかもしれないなど、さらなる最適化の方法が存在するはずである。隣接しないDenseブロック間で簡単なサンプリング動作により接続することもでき、さらにネットワークの異なるスケールの特徴の利用効率を向上させることができる。

DenseNetの密集的な接続方式のため、同じ容量のネットワークを構築する際に必要なパラメータの数は、その前に提案されたような構造よりも遥かに小さい。さらに、個人的な感覚では、Denseブロックは、より多くのパラメータを持つ畳み込み層に対して効率的に代替されるものと見なされるべきである。そのため、U-Netなどのネットワーク構造を結合して、さらにネットワーク性能を最適化することができます。例えば、U-netのすべての畳み込み層をDenseNetの構造に全部換えるだけで、ネットワークサイズを著しく圧縮することができます。
以下はKerasに基づいてDenseNet-BC構造を実現します。まずDense層を定義し、論文の説明に基づいて以下のように構築する。

def DenseLayer(x, nb_filter, bn_size=4, alpha=0.0, drop_rate=0.2):
 
 # Bottleneck layers
 x = BatchNormalization(axis=3)(x)
 x = LeakyReLU(alpha=alpha)(x)
 x = Conv2D(bn_size*nb_filter, (1, 1), strides=(1,1), padding='same')(x)
 
 # Composite function
 x = BatchNormalization(axis=3)(x)
 x = LeakyReLU(alpha=alpha)(x)
 x = Conv2D(nb_filter, (3, 3), strides=(1,1), padding='same')(x)
 
 if drop_rate: x = Dropout(drop_rate)(x)
 
 return x
論文の原文では、1*1巻の核を積み上げた畳層をボットネル層として使用して計算効率を最適化することを提案しています。原文で使用されている活性化関数はすべてレluですが、個人的な習慣はleakyreluで構築されています。
その後、Dense層でDenseブロックを構築します。以下の通りです。

def DenseBlock(x, nb_layers, growth_rate, drop_rate=0.2):
 
 for ii in range(nb_layers):
  conv = DenseLayer(x, nb_filter=growth_rate, drop_rate=drop_rate)
  x = concatenate([x, conv], axis=3)
 return x
論文で述べたように、各Dense層の出力は入力と融合した後、次のDense層の入力として密集接続を実現する。
最後は各Denseブロック間の遷移層であり、以下の通りである。

def TransitionLayer(x, compression=0.5, alpha=0.0, is_max=0):
 
 nb_filter = int(x.shape.as_list()[-1]*compression)
 x = BatchNormalization(axis=3)(x)
 x = LeakyReLU(alpha=alpha)(x)
 x = Conv2D(nb_filter, (1, 1), strides=(1,1), padding='same')(x)
 if is_max != 0: x = MaxPooling2D(pool_size=(2, 2), strides=2)(x)
 else: x = AveragePooling2D(pool_size=(2, 2), strides=2)(x)
 
 return x
本論文では、均一池化層を用いてサンプリングを行うことを提案したが、エッジ特徴抽出において最大池化層効果がより良いはずであり、ここでは関連インターフェースを追加した。
上記の構造を論文で提出した構造に従って綴り合わせます。ここで選択したパラメータは論文で述べたL=100、k=12で、ネットワーク接続は以下の通りです。

growth_rate = 12
inpt = Input(shape=(32,32,3))
 
x = Conv2D(growth_rate*2, (3, 3), strides=1, padding='same')(inpt)
x = BatchNormalization(axis=3)(x)
x = LeakyReLU(alpha=0.1)(x)
x = DenseBlock(x, 12, growth_rate, drop_rate=0.2)
x = TransitionLayer(x)
x = DenseBlock(x, 12, growth_rate, drop_rate=0.2)
x = TransitionLayer(x)
x = DenseBlock(x, 12, growth_rate, drop_rate=0.2)
x = BatchNormalization(axis=3)(x)
x = GlobalAveragePooling2D()(x)
x = Dense(10, activation='softmax')(x)
 
model = Model(inpt, x)
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
model.summary()
私達はすでにネットワークの架設を完成しましたが、ネットワーク自体のパラメータ数も0.5 Mしかありません。このように実現されたネットワークはDenseブロックの中にありますので、毎回のconcatは新しいメモリ空間を開拓しなければなりません。実際に必要なメモリ空間が非常に大きいです。作者は17年の時に、専門的に関連する技術報告書を書きました。https://arxiv.org/abs/1707.06990はメモリの節約方法を説明していますが、単純にkersasで実現するのは面倒くさいです。次のブログはpytouchの枠組みで実現します。
最後にネットワークの完全コードを放出します。

import numpy as np
import keras
from keras.models import Model, save_model, load_model
from keras.layers import Input, Dense, Dropout, BatchNormalization, LeakyReLU, concatenate
from keras.layers import Conv2D, MaxPooling2D, AveragePooling2D, GlobalAveragePooling2D
 
## data
import pickle
 
data_batch_1 = pickle.load(open("cifar-10-batches-py/data_batch_1", 'rb'), encoding='bytes')
data_batch_2 = pickle.load(open("cifar-10-batches-py/data_batch_2", 'rb'), encoding='bytes')
data_batch_3 = pickle.load(open("cifar-10-batches-py/data_batch_3", 'rb'), encoding='bytes')
data_batch_4 = pickle.load(open("cifar-10-batches-py/data_batch_4", 'rb'), encoding='bytes')
data_batch_5 = pickle.load(open("cifar-10-batches-py/data_batch_5", 'rb'), encoding='bytes')
 
train_X_1 = data_batch_1[b'data']
train_X_1 = train_X_1.reshape(10000, 3, 32, 32).transpose(0, 2, 3, 1).astype("float")
train_Y_1 = data_batch_1[b'labels']
 
train_X_2 = data_batch_2[b'data']
train_X_2 = train_X_2.reshape(10000, 3, 32, 32).transpose(0, 2, 3, 1).astype("float")
train_Y_2 = data_batch_2[b'labels']
 
train_X_3 = data_batch_3[b'data']
train_X_3 = train_X_3.reshape(10000, 3, 32, 32).transpose(0, 2, 3, 1).astype("float")
train_Y_3 = data_batch_3[b'labels']
 
train_X_4 = data_batch_4[b'data']
train_X_4 = train_X_4.reshape(10000, 3, 32, 32).transpose(0, 2, 3, 1).astype("float")
train_Y_4 = data_batch_4[b'labels']
 
train_X_5 = data_batch_5[b'data']
train_X_5 = train_X_5.reshape(10000, 3, 32, 32).transpose(0, 2, 3, 1).astype("float")
train_Y_5 = data_batch_5[b'labels']
 
train_X = np.row_stack((train_X_1, train_X_2))
train_X = np.row_stack((train_X, train_X_3))
train_X = np.row_stack((train_X, train_X_4))
train_X = np.row_stack((train_X, train_X_5))
 
train_Y = np.row_stack((train_Y_1, train_Y_2))
train_Y = np.row_stack((train_Y, train_Y_3))
train_Y = np.row_stack((train_Y, train_Y_4))
train_Y = np.row_stack((train_Y, train_Y_5))
train_Y = train_Y.reshape(50000, 1).transpose(0, 1).astype("int32")
train_Y = keras.utils.to_categorical(train_Y)
 
test_batch = pickle.load(open("cifar-10-batches-py/test_batch", 'rb'), encoding='bytes')
test_X = test_batch[b'data']
test_X = test_X.reshape(10000, 3, 32, 32).transpose(0, 2, 3, 1).astype("float")
test_Y = test_batch[b'labels']
test_Y = keras.utils.to_categorical(test_Y)
 
train_X /= 255
test_X /= 255
 
# model
 
def DenseLayer(x, nb_filter, bn_size=4, alpha=0.0, drop_rate=0.2):
 
 # Bottleneck layers
 x = BatchNormalization(axis=3)(x)
 x = LeakyReLU(alpha=alpha)(x)
 x = Conv2D(bn_size*nb_filter, (1, 1), strides=(1,1), padding='same')(x)
 
 # Composite function
 x = BatchNormalization(axis=3)(x)
 x = LeakyReLU(alpha=alpha)(x)
 x = Conv2D(nb_filter, (3, 3), strides=(1,1), padding='same')(x)
 
 if drop_rate: x = Dropout(drop_rate)(x)
 
 return x
 
def DenseBlock(x, nb_layers, growth_rate, drop_rate=0.2):
 
 for ii in range(nb_layers):
  conv = DenseLayer(x, nb_filter=growth_rate, drop_rate=drop_rate)
  x = concatenate([x, conv], axis=3)
  
 return x
 
def TransitionLayer(x, compression=0.5, alpha=0.0, is_max=0):
 
 nb_filter = int(x.shape.as_list()[-1]*compression)
 x = BatchNormalization(axis=3)(x)
 x = LeakyReLU(alpha=alpha)(x)
 x = Conv2D(nb_filter, (1, 1), strides=(1,1), padding='same')(x)
 if is_max != 0: x = MaxPooling2D(pool_size=(2, 2), strides=2)(x)
 else: x = AveragePooling2D(pool_size=(2, 2), strides=2)(x)
 
 return x
 
growth_rate = 12
 
inpt = Input(shape=(32,32,3))
 
x = Conv2D(growth_rate*2, (3, 3), strides=1, padding='same')(inpt)
x = BatchNormalization(axis=3)(x)
x = LeakyReLU(alpha=0.1)(x)
x = DenseBlock(x, 12, growth_rate, drop_rate=0.2)
x = TransitionLayer(x)
x = DenseBlock(x, 12, growth_rate, drop_rate=0.2)
x = TransitionLayer(x)
x = DenseBlock(x, 12, growth_rate, drop_rate=0.2)
x = BatchNormalization(axis=3)(x)
x = GlobalAveragePooling2D()(x)
x = Dense(10, activation='softmax')(x)
 
model = Model(inpt, x)
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
 
model.summary()
 
for ii in range(10):
 print("Epoch:", ii+1)
 model.fit(train_X, train_Y, batch_size=100, epochs=1, verbose=1)
 score = model.evaluate(test_X, test_Y, verbose=1)
 print('Test loss =', score[0])
 print('Test accuracy =', score[1])
 
save_model(model, 'DenseNet.h5')
model = load_model('DenseNet.h5')
 
pred_Y = model.predict(test_X)
score = model.evaluate(test_X, test_Y, verbose=0)
print('Test loss =', score[0])
print('Test accuracy =', score[1])
以上のこのKerasはDenseNetの構造操作を実現しました。つまり、小編集は皆さんに全部の内容を共有しました。参考にしてほしいです。皆さんもよろしくお願いします。