深さ学習(Pytorch)の道に着手する---3回目のカードを打つ


引用する
今日は2つの深い学習実戦任務を共有します:画像分類とテキスト分類.ボリュームニューラルネットワークとループニューラルネットワークをそれぞれ用いて今回の実戦任務を遂行した.
画像の分類
ここで使用するデータセットはCIFAR-10であり、試合データはトレーニングセットとテストセットに分けられる.トレーニングセットには50000画像が含まれています.テストセットには300000ピクチャが含まれています.2つのデータセットの画像フォーマットはいずれもPNGであり、高さと幅は32ピクセルであり、3つのカラーチャネル(RGB)を有する.画像は10種類のカテゴリをカバーしています:飛行機、自動車、鳥類、猫、鹿、犬、カエル、馬、船、トラック.
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
import os
import time

#      
#     
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),  #     0,         32*32
    transforms.RandomHorizontalFlip(),  #         ,        
    transforms.ToTensor(),
    transforms.Normalize((0.4731, 0.4822, 0.4465), (0.2212, 0.1994, 0.2010)), #R,G,B              
])

transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4731, 0.4822, 0.4465), (0.2212, 0.1994, 0.2010)),
])
train_dir = '/cifar-10/train'
test_dir = '/cifar-10/test'

trainset = torchvision.datasets.ImageFolder(root=train_dir, transform=transform_train)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=256, shuffle=True)

testset = torchvision.datasets.ImageFolder(root=test_dir, transform=transform_test)
testloader = torch.utils.data.DataLoader(testset, batch_size=256, shuffle=False)

classes = ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'forg', 'horse', 'ship', 'truck']

ここでは,データ強化の考え方を用いた.ノイズの添加,反転などの訓練サンプルをランダムに変更することで,モデルのいくつかの属性への依存性を低減し,モデルの汎化能力を向上させることができる.例えば,画像を異なる方法で切り取り,興味のある物体を異なる位置に出現させ,モデルの物体出現位置への依存性を軽減することができる.輝度、色などの要因を調整して、モデルの色に対する感度を低下させることもできます.
class ResidualBlock(nn.Module):   #              torch.nn.Module      

    def __init__(self, inchannel, outchannel, stride=1):
        super(ResidualBlock, self).__init__()
        #torch.nn.Sequential   Sequential  ,                     。
        self.left = nn.Sequential(
            nn.Conv2d(inchannel, outchannel, kernel_size=3, stride=stride, padding=1, bias=False), 
            #         ,   nn   Conv2d()
            nn.BatchNorm2d(outchannel), #           
            nn.ReLU(inplace=True), #       ,                 
            nn.Conv2d(outchannel, outchannel, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(outchannel)
        )
        self.shortcut = nn.Sequential() 
        if stride != 1 or inchannel != outchannel:
            self.shortcut = nn.Sequential(
                nn.Conv2d(inchannel, outchannel, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(outchannel)
            )
        #         ,   Y = self.left(X)      X  

    def forward(self, x): #             ,   ReLU           。
        out = self.left(x)
        out += self.shortcut(x)
        out = F.relu(out)
        return out

class ResNet(nn.Module):
    def __init__(self, ResidualBlock, num_classes=10):
        super(ResNet, self).__init__()
        self.inchannel = 64
        self.conv1 = nn.Sequential( #  3 3x3      7x7    ,      
            nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(),
        ) 
        self.layer1 = self.make_layer(ResidualBlock, 64,  2, stride=1)
        self.layer2 = self.make_layer(ResidualBlock, 128, 2, stride=2)
        self.layer3 = self.make_layer(ResidualBlock, 256, 2, stride=2)
        self.layer4 = self.make_layer(ResidualBlock, 512, 2, stride=2)
        self.fc = nn.Linear(512, num_classes)

    def make_layer(self, block, channels, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)   #   ResidualBlock    make_layer     stride  
        # ,   num_blocks-1 ResidualBlock   1
        layers = []
        for stride in strides:
            layers.append(block(self.inchannel, channels, stride))
            self.inchannel = channels
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.conv1(x)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = F.avg_pool2d(out, 4)
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        return out
        
def ResNet18():
    return ResNet(ResidualBlock)

ResNet-18ネットワーク構造:ResNetフルネームResidual Network残差ネットワーク.Kaiming Heの『Deep Residual Learning for Image Recognition』がCVPRベスト論文を獲得した.彼が提案した深さの残差ネットワークは2015年に画像面の各試合を洗い流し、絶対的な優勢で複数の試合のチャンピオンを獲得したと言える.また,ネットワークの精度を保証する前提で,ネットワークの深さを152層に達し,その後さらに1000の深さに加えた.
#       GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

#      
EPOCH = 20   #       
pre_epoch = 0  #             
LR = 0.1        #   

#     -ResNet
net = ResNet18().to(device)

#            
criterion = nn.CrossEntropyLoss()  #        ,        
optimizer = optim.SGD(net.parameters(), lr=LR, momentum=0.9, weight_decay=5e-4) 
#     mini-batch momentum-SGD,   L2   (    )

#   
if __name__ == "__main__":
    print("Start Training, Resnet-18!")
    num_iters = 0
    for epoch in range(pre_epoch, EPOCH):
        print('
Epoch: %d'
% (epoch + 1)) net.train() sum_loss = 0.0 correct = 0.0 total = 0 for i, data in enumerate(trainloader, 0): # ( 、 ) , , # 0, enumerate( ) 。 num_iters += 1 inputs, labels = data inputs, labels = inputs.to(device), labels.to(device) optimizer.zero_grad() # # forward + backward outputs = net(inputs) loss = criterion(outputs, labels) loss.backward() optimizer.step() sum_loss += loss.item() * labels.size(0) _, predicted = torch.max(outputs, 1) # total += labels.size(0) correct += (predicted == labels).sum().item() # 20 batch loss if (i + 1) % 20 == 0: print('[epoch:%d, iter:%d] Loss: %.03f | Acc: %.3f%% ' % (epoch + 1, num_iters, sum_loss / (i + 1), 100. * correct / total)) print("Training Finished, TotalEPOCH=%d" % EPOCH)

テキストの分類
テキスト分類は自然言語処理の一般的なタスクであり、一定長のテキストシーケンスをテキストのカテゴリに変換します.今回のタスクは,テキストから著者の情緒,すなわちLSTMモデルを用いた二分類タスクを判断することである.スタンフォードのIMDbデータセット(Stanford’s Large Movie Review Data set)をテキスト感情分類のデータセットとして使用します.
import collections
import os
import random
import time
from tqdm import tqdm
import torch
from torch import nn
import torchtext.vocab as Vocab
import torch.utils.data as Data
import torch.nn.functional as F
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def read_imdb(folder='train', data_root="/aclImdb_v1/aclImdb"):
    data = []
    for label in ['pos', 'neg']:
        folder_name = os.path.join(data_root, folder, label)
        for file in tqdm(os.listdir(folder_name)):
            with open(os.path.join(folder_name, file), 'rb') as f:
                review = f.read().decode('utf-8').replace('
'
, '').lower() data.append([review, 1 if label == 'pos' else 0]) random.shuffle(data) return data DATA_ROOT = "/aclImdb_v1/" data_root = os.path.join(DATA_ROOT, "aclImdb") train_data, test_data = read_imdb('train', data_root), read_imdb('test', data_root) def get_tokenized_imdb(data): ''' @params: data: , [ ,0/1 ] @return: , ''' def tokenizer(text): return [tok.lower() for tok in text.split(' ')] return [tokenizer(review) for review, _ in data] def get_vocab_imdb(data): ''' @params: data: @return: ,Vocab (freqs, stoi, itos) ''' tokenized_data = get_tokenized_imdb(data) counter = collections.Counter([tk for st in tokenized_data for tk in st]) return Vocab.Vocab(counter, min_freq=5) vocab = get_vocab_imdb(train_data) def preprocess_imdb(data, vocab): ''' @params: data: , vocab: @return: features: , (n, max_l) labels: , (n,) 0/1 ''' max_l = 500 # 0, 500 def pad(x): return x[:max_l] if len(x) > max_l else x + [0] * (max_l - len(x)) tokenized_data = get_tokenized_imdb(data) features = torch.tensor([pad([vocab.stoi[word] for word in words]) for words in tokenized_data]) labels = torch.tensor([score for _, score in data]) return features, labels train_set = Data.TensorDataset(*preprocess_imdb(train_data, vocab)) test_set = Data.TensorDataset(*preprocess_imdb(test_data, vocab)) # # train_features, train_labels = preprocess_imdb(train_data, vocab) # test_features, test_labels = preprocess_imdb(test_data, vocab) # train_set = Data.TensorDataset(train_features, train_labels) # test_set = Data.TensorDataset(test_features, test_labels) # len(train_set) = features.shape[0] or labels.shape[0] # train_set[index] = (features[index], labels[index]) batch_size = 64 train_iter = Data.DataLoader(train_set, batch_size, shuffle=True) test_iter = Data.DataLoader(test_set, batch_size)

画像に比べて、テキストのデータの前処理はより複雑です.主に次のステップがあります.
  • テキスト
  • を読み込む
  • 分詞.各文を分詞し,すなわち1つの文を複数の語(token)に分割し,1つの語のシーケンスに変換する.
  • は辞書を確立し、各語を一意のインデックス(index)にマッピングする.モデル処理を容易にするためには、文字列を数値に変換する必要があります.そのため、まず辞書(vocabulary)を構築し、各語を一意のインデックス番号にマッピングする必要があります.
  • テキストを語のシーケンスからインデックスのシーケンスに変換し、モデルの入力を容易にする.辞書を使用すると、テキストの文を単語のシーケンスからインデックスのシーケンスに変換できます.

  • 次に、モデルの構築を開始します.
    class BiRNN(nn.Module):
        def __init__(self, vocab, embed_size, num_hiddens, num_layers):
            '''
            @params:
                vocab:           ,        
                embed_size:       
                num_hiddens:         
                num_layers:      
            '''
            super(BiRNN, self).__init__()
            self.embedding = nn.Embedding(len(vocab), embed_size)
            
            # encoder-decoder framework
            # bidirectional  True           
            self.encoder = nn.LSTM(input_size=embed_size, 
                                    hidden_size=num_hiddens, 
                                    num_layers=num_layers,
                                    bidirectional=True)
            self.decoder = nn.Linear(4*num_hiddens, 2) #                         
            
        def forward(self, inputs):
            '''
            @params:
                inputs:       ,    (batch_size, seq_len)      
            @return:
                outs:         ,    (batch_size, 2)    
            '''
            #   LSTM       (seq_len)     ,         
            embeddings = self.embedding(inputs.permute(1, 0)) # (seq_len, batch_size, d)
            # rnn.LSTM     、         ,    outputs, (h, c)
            outputs, _ = self.encoder(embeddings) # (seq_len, batch_size, 2*h)
            encoding = torch.cat((outputs[0], outputs[-1]), -1) # (batch_size, 4*h)
            outs = self.decoder(encoding) # (batch_size, 2)
            return outs
    
    embed_size, num_hiddens, num_layers = 100, 100, 2
    net = BiRNN(vocab, embed_size, num_hiddens, num_layers)
    

    ここでは,予備訓練に加えた語ベクトルである.各語を一定の長さのベクトルとして表し、コーパス上の事前訓練によって、これらのベクトルは異なる語間の類似と類比関係をよりよく表現することができ、一定の意味情報を導入し、モデルの学習能力を向上させる.
    cache_dir = "/home/kesci/input/GloVe6B5429"
    glove_vocab = Vocab.GloVe(name='6B', dim=100, cache=cache_dir)
    
    def load_pretrained_embedding(words, pretrained_vocab):
        '''
        @params:
            words:             ,  itos (index to string)        
            pretrained_vocab:       
        @return:
            embed:        
        '''
        embed = torch.zeros(len(words), pretrained_vocab.vectors[0].shape[0]) #     0
        oov_count = 0 # out of vocabulary
        for i, word in enumerate(words):
            try:
                idx = pretrained_vocab.stoi[word]
                embed[i, :] = pretrained_vocab.vectors[idx]
            except KeyError:
                oov_count += 1
        if oov_count > 0:
            print("There are %d oov words." % oov_count)
        return embed
    
    net.embedding.weight.data.copy_(load_pretrained_embedding(vocab.itos, glove_vocab))
    net.embedding.weight.requires_grad = False #          ,         
    

    トレーニングと検証モデル関数の定義
    def evaluate_accuracy(data_iter, net, device=None):
        if device is None and isinstance(net, torch.nn.Module):
            device = list(net.parameters())[0].device 
        acc_sum, n = 0.0, 0
        with torch.no_grad():
            for X, y in data_iter:
                if isinstance(net, torch.nn.Module):
                    net.eval()
                    acc_sum += (net(X.to(device)).argmax(dim=1) == y.to(device)).float().sum().cpu().item()
                    net.train()
                else:
                    if('is_training' in net.__code__.co_varnames):
                        acc_sum += (net(X, is_training=False).argmax(dim=1) == y).float().sum().item() 
                    else:
                        acc_sum += (net(X).argmax(dim=1) == y).float().sum().item() 
                n += y.shape[0]
        return acc_sum / n
    
    def train(train_iter, test_iter, net, loss, optimizer, device, num_epochs):
        net = net.to(device)
        print("training on ", device)
        batch_count = 0
        for epoch in range(num_epochs):
            train_l_sum, train_acc_sum, n, start = 0.0, 0.0, 0, time.time()
            for X, y in train_iter:
                X = X.to(device)
                y = y.to(device)
                y_hat = net(X)
                l = loss(y_hat, y) 
                optimizer.zero_grad()
                l.backward()
                optimizer.step()
                train_l_sum += l.cpu().item()
                train_acc_sum += (y_hat.argmax(dim=1) == y).sum().cpu().item()
                n += y.shape[0]
                batch_count += 1
            test_acc = evaluate_accuracy(test_iter, net)
            print('epoch %d, loss %.4f, train acc %.3f, test acc %.3f, time %.1f sec'
                  % (epoch + 1, train_l_sum / batch_count, train_acc_sum / n, test_acc, time.time() - start))
    

    訓練を始める
    lr, num_epochs = 0.01, 5
    optimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, net.parameters()), lr=lr)
    loss = nn.CrossEntropyLoss()
    
    train(train_iter, test_iter, net, loss, optimizer, device, num_epochs)
    

    予測モデル,Demo実装.
    def predict_sentiment(net, vocab, sentence):
        '''
        @params:
            net:       
            vocab:            ,                   ,      
            sentence:          ,          
        @return:      ,positive        ,negative        
        '''
        device = list(net.parameters())[0].device #          
        sentence = torch.tensor([vocab.stoi[word] for word in sentence], device=device)
        label = torch.argmax(net(sentence.view((1, -1))), dim=1)
        return 'positive' if label.item() == 1 else 'negative'
    
    predict_sentiment(net, vocab, ['this', 'movie', 'is', 'so', 'great'])