深さ学習アルゴリズム実践6---論理回帰アルゴリズム応用


前編では,深さ学習アルゴリズムの実装を紹介し,MNIST手書きデジタル認識を例に,このアルゴリズムの有効性を検証した.
しかし,論理回帰アルゴリズムを学習する目的は,アルゴリズム自体を学習するのではなく,我々の実際の問題を解決することである.論理回帰アルゴリズムの実際の応用はまだ広く、例えば医学分野の疾病予測では、疾病関連要素をリストし、ある患者の具体的な状況に基づいて、論理回帰アルゴリズムを応用して、その患者が何らかの病気を患っているかどうかを判断することができる.もちろん,論理回帰アルゴリズムには限界があり,線形に分けられる分類問題を処理するのに適しているが,線形に分けられない分類問題に対しては,このアルゴリズムの価値が大幅に低下する.しかし,論理回帰アルゴリズムを,隠蔽層のないフィードフォワードネットワークと見なすことができ,隠蔽層を増やすことで,種々の線形不可分問題を処理することができる.Theanoのフレームワークを借りて、BPネットワーク、多層ボリュームネットワーク(LeNet)を後述するが、Theanoではこれらのモデルを実現することは非常に簡単であることがわかる.
本題に戻ると、論理回帰アルゴリズムで実際の問題を解決するには、load_を主に変更する必要があります.data関数は、私たちが規定したデータソースからデータを読み出すようにします.ここではまず、seg_というファイル名のトレーニングデータ読み込みツールクラスSegLoaderを設計します.loader.py、コードは次のとおりです.
from __future__ import print_function

__docformat__ = 'restructedtext en'

import six.moves.cPickle as pickle
import gzip
import os
import sys
import timeit

import numpy

import theano
import theano.tensor as T

class SegLoader(object):
    def load_data(self, dataset):
        samplesNumber = 6
        features = 2
        train_set = (numpy.ndarray(shape=(samplesNumber, features), dtype=numpy.float32), numpy.ndarray(shape=(samplesNumber), dtype=int))
        self.prepare_dataset(train_set)
        valid_set = (train_set[0].copy(), train_set[1].copy())
        test_set = (train_set[0].copy(), train_set[1].copy())
        test_set_x, test_set_y = self.shared_dataset(test_set)
        valid_set_x, valid_set_y = self.shared_dataset(valid_set)
        train_set_x, train_set_y = self.shared_dataset(train_set)
        rval = [(train_set_x, train_set_y), (valid_set_x, valid_set_y),
            (test_set_x, test_set_y)]
        return rval

    def shared_dataset(self, data_xy, borrow=True):
        data_x, data_y = data_xy
        shared_x = theano.shared(numpy.asarray(data_x,
                                               dtype=theano.config.floatX),
                                 borrow=borrow)
        shared_y = theano.shared(numpy.asarray(data_y,
                                               dtype=theano.config.floatX),
                                 borrow=borrow)
        return shared_x, T.cast(shared_y, 'int32')

    def prepare_dataset(self, dataset):
        dataset[0][0][0] = 1.0
        dataset[0][0][1] = 1.0
        dataset[1][0] = 1

        dataset[0][1][0] = 2.0
        dataset[0][1][1] = 2.0
        dataset[1][1] = 1

        dataset[0][2][0] = 3.0
        dataset[0][2][1] = 3.0
        dataset[1][2] = 1

        dataset[0][3][0] = 1.5
        dataset[0][3][1] = 2.0
        dataset[1][3] = 0

        dataset[0][4][0] = 2.5
        dataset[0][4][1] = 4.0
        dataset[1][4] = 0

        dataset[0][5][0] = 3.5
        dataset[0][5][1] = 7.0
        dataset[1][5] = 0
上のコードは非常に簡単で、メタグループtrainを生成します.set、2つの要素を含み、1つ目の要素はfloat 32の2次元配列であり、各行はサンプルを表し、1つ目の列はX座標を表し、2つ目の列はY座標を表し、train_setメタグループの2番目の要素は1次元整数配列であり、各要素は1つのサンプルの分類結果を表し、ここには2つの大きなクラスがあり、1はY=Xの直線を表し、0はこの直線を表さず、prepare_Datasetは6つのトレーニングサンプルを用意しています.この問題は非常に簡単なので,6つのサンプルは基本的に十分であるが,実際の問題ではかなりのサンプル量が必要であることは明らかである.
次に、この線形分割の実行エンジンLrSegEngineを定義し、ソースファイルはlr_である.seg_engine.py、コードは次のとおりです.
from __future__ import print_function

__docformat__ = 'restructedtext en'

import six.moves.cPickle as pickle
import gzip
import os
import sys
import timeit

import numpy

import theano
import theano.tensor as T
from logistic_regression import LogisticRegression
from seg_loader import SegLoader

class LrSegEngine(object):
    def __init__(self):
        print("Logistic Regression MNIST Engine")
        self.learning_rate = 0.13
        self.n_epochs = 1000
        self.batch_size = 1 # 600
        self.dataset = 'mnist.pkl.gz'

    def train(self):
        print("Yantao:train the model")
        loader = SegLoader()
        datasets = loader.load_data(self.dataset)
        train_set_x, train_set_y = datasets[0]
        valid_set_x, valid_set_y = datasets[1]
        test_set_x, test_set_y = datasets[2]
        n_train_batches = train_set_x.get_value(borrow=True).shape[0] // self.batch_size
        n_valid_batches = valid_set_x.get_value(borrow=True).shape[0] // self.batch_size
        n_test_batches = test_set_x.get_value(borrow=True).shape[0] // self.batch_size
        index = T.lscalar()
        x = T.matrix('x')
        y = T.ivector('y')
        # in:x,y out: 1 in y=x otherwise 0
        classifier = LogisticRegression(input=x, n_in=2, n_out=2)
        cost = classifier.negative_log_likelihood(y)
        test_model = theano.function(
            inputs=[index],
            outputs=classifier.errors(y),
            givens={
                x: test_set_x[index * self.batch_size: (index + 1) * self.batch_size],
                y: test_set_y[index * self.batch_size: (index + 1) * self.batch_size]
            }
        )
        validate_model = theano.function(
            inputs=[index],
            outputs=classifier.errors(y),
            givens={
                x: valid_set_x[index * self.batch_size: (index + 1) * self.batch_size],
                y: valid_set_y[index * self.batch_size: (index + 1) * self.batch_size]
            }
        )
        g_W = T.grad(cost=cost, wrt=classifier.W)
        g_b = T.grad(cost=cost, wrt=classifier.b)
        updates = [(classifier.W, classifier.W - self.learning_rate * g_W),
               (classifier.b, classifier.b - self.learning_rate * g_b)]
        train_model = theano.function(
            inputs=[index],
            outputs=cost,
            updates=updates,
            givens={
                x: train_set_x[index * self.batch_size: (index + 1) * self.batch_size],
                y: train_set_y[index * self.batch_size: (index + 1) * self.batch_size]
            }
        )
        patience = 5000  
        patience_increase = 2  
        improvement_threshold = 0.995  
        validation_frequency = min(n_train_batches, patience // 2)
        best_validation_loss = numpy.inf
        test_score = 0.
        start_time = timeit.default_timer()
        done_looping = False
        epoch = 0
        while (epoch < self.n_epochs) and (not done_looping):
            epoch = epoch + 1
            for minibatch_index in range(n_train_batches):
                minibatch_avg_cost = train_model(minibatch_index)
                # iteration number
                iter = (epoch - 1) * n_train_batches + minibatch_index
                if (iter + 1) % validation_frequency == 0:
                    # compute zero-one loss on validation set
                    validation_losses = [validate_model(i)
                                     for i in range(n_valid_batches)]
                    this_validation_loss = numpy.mean(validation_losses)
                    print(
                        'epoch %i, minibatch %i/%i, validation error %f %%' %
                        (
                            epoch,
                            minibatch_index + 1,
                            n_train_batches,
                            this_validation_loss * 100.
                        )
                    )
                    if this_validation_loss < best_validation_loss:
                        #improve patience if loss improvement is good enough
                        if this_validation_loss < best_validation_loss * improvement_threshold:
                            patience = max(patience, iter * patience_increase)
                        best_validation_loss = this_validation_loss
                        # test it on the test set
                        test_losses = [test_model(i)
                                   for i in range(n_test_batches)]
                        test_score = numpy.mean(test_losses)
                        print(
                            (
                                '     epoch %i, minibatch %i/%i, test error of'
                                ' best model %f %%'
                            ) %
                            (
                                epoch,
                                minibatch_index + 1,
                                n_train_batches,
                                test_score * 100.
                            )
                        )
                        # save the best model
                        with open('best_model.pkl', 'wb') as f:
                            pickle.dump(classifier, f)
                if patience <= iter:
                    done_looping = True
                    break
        end_time = timeit.default_timer()
        print(
            (
                'Optimization complete with best validation score of %f %%,'
                'with test performance %f %%'
            )
            % (best_validation_loss * 100., test_score * 100.)
        )
        print('The code run for %d epochs, with %f epochs/sec' % (
            epoch, 1. * epoch / (end_time - start_time)))
        print(('The code for file ' +
               os.path.split(__file__)[1] +
               ' ran for %.1fs' % ((end_time - start_time))), file=sys.stderr)

    def run(self, data):
        print("run the model")
        classifier = pickle.load(open('best_model.pkl', 'rb'))
        predict_model = theano.function(
            inputs=[classifier.input],
            outputs=classifier.y_pred
        )
        rst = predict_model(data)
        print(rst)
ここでのtrain方法は、前編の博文処理MNIST手書きデジタル認識のコードとほぼ一致しており、以下の点に注意する必要がある.まず、6つのサンプルしかないため、サンプルロットのサイズを1に設定する(MNIST手書きデジタル認識では、6万個のトレーニングサンプルがあるため、ロットサイズは600).次に、論理回帰モデルを初期化するときに、次元n_を入力するinは、2に設定され、サンプルがx,y座標の2つの特徴しかないことを示し、出力次元も2であり、2つのカテゴリがあることを示し、1はy=x線であり、0はオンラインではないことを表す.
次に論理回帰モデルクラスLogistic Regressionを定義し、ソースファイルはlogistic_regression.py、コードは次のとおりです.
from __future__ import print_function

__docformat__ = 'restructedtext en'

import six.moves.cPickle as pickle
import gzip
import os
import sys
import timeit

import numpy

import theano
import theano.tensor as T

class LogisticRegression(object):  
    def __init__(self, input, n_in, n_out):  
        self.W = theano.shared(  
            value=numpy.zeros(  
                (n_in, n_out),  
                dtype=theano.config.floatX  
            ),  
            name='W',  
            borrow=True  
        )  
        self.b = theano.shared(  
            value=numpy.zeros(  
                (n_out,),  
                dtype=theano.config.floatX  
            ),  
            name='b',  
            borrow=True  
        )  
        self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)  
        self.y_pred = T.argmax(self.p_y_given_x, axis=1)  
        self.params = [self.W, self.b]  
        self.input = input  
        print("Yantao: ***********************************")
  
    def negative_log_likelihood(self, y):  
        return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y])  
  
    def errors(self, y):  
        if y.ndim != self.y_pred.ndim:  
            raise TypeError(  
                'y should have the same shape as self.y_pred',  
                ('y', y.type, 'y_pred', self.y_pred.type)  
            )  
        if y.dtype.startswith('int'):  
            return T.mean(T.neq(self.y_pred, y))  
        else:  
            raise NotImplementedError()  
上のコードは前編のブログとほとんど変わりませんが、1つのファイルに単独で保存されているだけです.
次はモデルトレーニングlr_train.py、コードは次のとおりです.
from __future__ import print_function

__docformat__ = 'restructedtext en'

import six.moves.cPickle as pickle
import gzip
import os
import sys
import timeit

import numpy

import theano
import theano.tensor as T

from logistic_regression import LogisticRegression
from seg_loader import SegLoader
from lr_seg_engine import LrSegEngine

if __name__ == '__main__':
    engine = LrSegEngine()
    engine.train()
上のコードは論理回帰分割のエンジンクラスのtrainメソッドを簡単に呼び出し、モデルのトレーニングを完了し、最適な結果をbest_に保存します.model.pklファイルにあります.
モデルの訓練が終わったら、モデルを分類することができます.lr_run.pyのコードは次のとおりです.
from seg_loader import SegLoader
from lr_seg_engine import LrSegEngine

if __name__ == '__main__':
    print("test program v1.0")
    engine = LrSegEngine()
    data = [[2.0, 2.0]]
    print(data)
    engine.run(data)
上のコードは、まず2次元配列を初期化し、サンプル要素が1つしかなく、座標が(2.0,2.0)であり、論理回帰分割エンジンのrunメソッドを呼び出し、分類結果を与え、このプログラムを実行すると、以下に示すような結果が得られます.
test program v1.0
Logistic Regression MNIST Engine
[[2.0, 2.0]]
run the model
[1]