PyTorchに基づく畳み込みニューラルネットワーク(CNN)によるMNIST分類モデルの実現


最近初めてKaggleに挑戦し、PyTorchで4つの異なる畳み込みニューラルネットワーク(Inception v1に似た構造を含む)を手書きで実装し、テスト精度は約99.2%だった.
# -*- coding: utf-8 -*-
"""Digit Recognizer.ipynb

Automatically generated by Colaboratory.

Original file is located at
    https://colab.research.google.com/drive/1sbq5hjhjO3I5jQQAN_5-mo7Hx4pSKcjH
"""

import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import csv

# Read train.csv into a list of rows, skipping the header line.
# Each remaining row is: label, then 784 pixel values.
dataset = []
with open('train.csv', 'r') as f:
    for line_no, row in enumerate(csv.reader(f)):
        if line_no > 0:
            dataset.append(row)

# Convert to float32, shuffle in place, then carve off a small
# validation split (VALID_RATE of the rows) from the tail.
dataset = np.asarray(dataset, dtype=np.float32)
print(dataset.shape)
np.random.shuffle(dataset)

VALID_RATE = 0.01
seg = int((1 - VALID_RATE) * len(dataset))

raw_train_data = dataset[:seg]
raw_valid_data = dataset[seg:]
print(raw_train_data.shape, raw_valid_data.shape)

# Build (N, 1, 28, 28) image tensors normalized from [0, 255] to [-1, 1],
# and 1-D long label tensors.
# FIX: derive sizes from the actual splits instead of hard-coding 42000,
# so the script still works if train.csv has a different number of rows.
# The labels are built 1-D directly, replacing the old (N, 1) + squeeze.
n_train = len(raw_train_data)
train_data_x = torch.empty(n_train, 1, 28, 28, dtype=torch.float32)
train_data_y = torch.empty(n_train, dtype=torch.long)

for i in range(n_train):
    train_data_x[i] = (torch.from_numpy(raw_train_data[i][1:].reshape(1, 28, 28)) / 255.0 - 0.5) / 0.5
    train_data_y[i] = int(raw_train_data[i][0])  # first column is the label

n_valid = len(raw_valid_data)
valid_data_x = torch.empty(n_valid, 1, 28, 28, dtype=torch.float32)
valid_data_y = torch.empty(n_valid, dtype=torch.long)

for i in range(n_valid):
    valid_data_x[i] = (torch.from_numpy(raw_valid_data[i][1:].reshape(1, 28, 28)) / 255.0 - 0.5) / 0.5
    valid_data_y[i] = int(raw_valid_data[i][0])
#print(valid_data_y[0].dtype)

def get_num_correct(a, b):
    """Count rows where the argmax of score matrix `a` equals the label in `b`."""
    predicted = a.argmax(dim=1)
    return predicted.eq(b).sum().item()


class Model(nn.Module):
    """Small Inception-v1-style CNN for 28x28 single-channel digit images.

    Two inception-like blocks: each runs three parallel branches
    (3x3 conv, 5x5 conv, and 3x3 max-pool followed by a 3x3 conv),
    concatenates them on the channel axis, batch-normalizes, and mixes
    channels with a 1x1 conv. The head pools to 4x4 and classifies
    into 10 logits.
    """

    def __init__(self):
        super(Model, self).__init__()
        # Stem: 28x28 -> conv(k5, no pad) -> 24x24 -> maxpool(2) -> 12x12.
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=6, kernel_size=5, padding=0)

        # Inception block 1 branches (all preserve 12x12 spatially).
        self.conv2 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5, padding=2)
        self.conv4 = nn.Conv2d(in_channels=6, out_channels=16, kernel_size=3, padding=1)

        self.bn1 = nn.BatchNorm2d(48)  # 3 branches x 16 channels

        self.conv5 = nn.Conv2d(in_channels=48, out_channels=48, kernel_size=1, padding=0)

        # Inception block 2 branches.
        self.conv6 = nn.Conv2d(in_channels=48, out_channels=64, kernel_size=3, padding=1)
        self.conv7 = nn.Conv2d(in_channels=48, out_channels=64, kernel_size=5, padding=2)
        self.conv8 = nn.Conv2d(in_channels=48, out_channels=64, kernel_size=3, padding=1)

        self.bn2 = nn.BatchNorm2d(192)  # 3 branches x 64 channels

        self.conv9 = nn.Conv2d(in_channels=192, out_channels=128, kernel_size=1, padding=0)

        # Head: 12x12 -> maxpool(k3, s3) -> 4x4 -> flatten -> fc -> 10 logits.
        self.fc = nn.Linear(in_features=128 * 4 * 4, out_features=128)
        self.out = nn.Linear(in_features=128, out_features=10)
        self.drop = nn.Dropout(p=0.5)

    def forward(self, t):
        """Map a (N, 1, 28, 28) batch to (N, 10) class logits."""
        t = F.relu(self.conv1(t))
        t = F.max_pool2d(t, kernel_size=2, stride=2, padding=0)

        # Inception block 1.
        t1 = F.relu(self.conv2(t))
        t2 = F.relu(self.conv3(t))
        t3 = F.max_pool2d(t, kernel_size=3, stride=1, padding=1)
        # BUGFIX: conv4 previously ran on `t`, silently discarding the
        # pooled `t3` and making the max-pool branch dead code.
        t3 = F.relu(self.conv4(t3))

        t = self.bn1(torch.cat([t1, t2, t3], dim=1))
        t = F.relu(self.conv5(t))

        # Inception block 2.
        t1 = F.relu(self.conv6(t))
        t2 = F.relu(self.conv7(t))
        t3 = F.max_pool2d(t, kernel_size=3, stride=1, padding=1)
        # BUGFIX: conv8 previously ran on `t` instead of the pooled `t3`.
        t3 = F.relu(self.conv8(t3))

        t = self.bn2(torch.cat([t1, t2, t3], dim=1))
        t = F.relu(self.conv9(t))

        t = F.max_pool2d(t, kernel_size=3, stride=3, padding=0)
        t = t.reshape(-1, 128 * 4 * 4)

        t = F.relu(self.fc(t))
        t = self.drop(t)
        # BUGFIX: dropout was also applied AFTER the output layer, randomly
        # zeroing logits during training. Logits are returned directly now.
        return self.out(t)

# Instantiate the network, choose the mini-batch size, and set up
# plain SGD with momentum.
net = Model()
bsize = 20
opt = optim.SGD(net.parameters(), lr=0.008, momentum=0.8)

def check_valid():
    """Evaluate the net on the validation split; print and return accuracy.

    Side effect: switches the (global) net to eval mode.
    """
    net.eval()

    n_valid = 42000 - seg
    total_loss = 0
    total_correct = 0
    with torch.no_grad():
        for start in range(0, n_valid, bsize):
            xb = valid_data_x[start:start + bsize]
            yb = valid_data_y[start:start + bsize]

            out = net.forward(xb)

            total_loss += F.cross_entropy(out, yb).item()
            total_correct += get_num_correct(out, yb)

    accuracy = total_correct / n_valid
    print("Valid Accuracy:", accuracy, "loss:", total_loss)
    return accuracy


def train():
    """Train for 20 epochs, tracking train/valid accuracy, then plot curves."""
    idx = list()
    acc_train = list()
    acc_valid = list()

    print("Train:")

    for epoch in range(20):
        # BUGFIX: check_valid() switches the net to eval mode at the end of
        # each epoch, so train mode (dropout active, batch-norm updating)
        # must be re-enabled every epoch, not just once before the loop.
        net.train()

        total_loss = 0
        total_correct = 0
        for i in range(0, seg, bsize):
            images, labels = train_data_x[i:i + bsize], train_data_y[i:i + bsize]

            preds = net(images)
            loss = F.cross_entropy(preds, labels)

            opt.zero_grad()
            loss.backward()
            opt.step()

            total_loss += loss.item()
            total_correct += get_num_correct(preds, labels)

        # FIX: the original print's string literal was broken across lines;
        # the intended separator is an embedded newline.
        print("Epoch", epoch + 1, "\nTrain Accuracy:", total_correct / seg, "Loss:", total_loss)
        idx.append(epoch + 1)
        acc_train.append(total_correct / seg)
        acc_valid.append(check_valid())

    plt.plot(idx, acc_train, ls="-", lw=2, label="train", c='red')
    plt.plot(idx, acc_valid, ls="-", lw=2, label="valid", c='orange')
    plt.legend()
    plt.show()


train()

# Load the Kaggle test set (no label column), skipping the header row.
testset = list()
with open('test.csv', 'r') as f:
    reader = csv.reader(f)
    t = 0
    for row in reader:
        if t > 0:
            testset.append(row)
        t = t + 1

testset = np.array(testset, dtype=np.float32)

# FIX: derive the test-set size from the data instead of hard-coding 28000.
n_test = len(testset)
test_data_x = torch.empty(n_test, 1, 28, 28, dtype=torch.float32)
test_data_y_hat = torch.empty(n_test, 1, dtype=int)

for i in range(n_test):
    # Same [-1, 1] normalization as the training data.
    test_data_x[i] = (torch.from_numpy(testset[i].reshape(1, 28, 28)) / 255.0 - 0.5) / 0.5


def test():
    """Predict a label for every test image (eval mode, no gradients)."""
    net.eval()
    with torch.no_grad():
        for i in range(n_test):
            x = test_data_x[i]
            yp = net(x.unsqueeze(dim=0))
            test_data_y_hat[i] = yp.argmax(dim=1)


test()
test_data_y_hat = test_data_y_hat.squeeze()

# Write the Kaggle submission file: 1-based ImageId plus predicted Label.
with open('out.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(["ImageId", "Label"])
    for i in range(n_test):
        writer.writerow([i + 1, test_data_y_hat[i].item()])