SMO in Python (for reference only)

Let's learn the support vector machine from ML together, through a Python implementation of the SMO algorithm, code included.
 
 
import numpy as np
import matplotlib.pyplot as plt

from sklearn.datasets import make_blobs,make_circles,make_moons
from sklearn.preprocessing import StandardScaler

class SMOStruct:
    """Container for the SMO model state, following John Platt's paper."""
    def __init__(self, X, y, C, kernel, alphas, b, errors, user_linear_optim):
        self.X = X              # training examples
        self.y = y              # class labels
        self.C = C              # regularization parameter: trades margin width against violations
        self.kernel = kernel    # kernel function: linear or Gaussian (RBF)
        self.alphas = alphas    # Lagrange multipliers, one per training example
        self.b = b              # scalar bias term (threshold)
        self.errors = errors    # error cache: E_i for each example, kept current as alphas change

        self.m, self.n = np.shape(self.X)    # size of the training set (m) and
                                             # number of features per example (n)

        self.user_linear_optim = user_linear_optim    # whether to use the linear-kernel shortcut
        self.w = np.zeros(self.n)     # weight vector w, maintained only for the linear kernel
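For orientation (standard SVM theory, summarized here; the original listing does not spell it out): SMO maximizes the dual of the soft-margin SVM, optimizing two multipliers at a time so the equality constraint stays satisfied:

\max_{\alpha}\ \sum_{i=1}^{m} \alpha_i \;-\; \frac{1}{2} \sum_{i=1}^{m} \sum_{j=1}^{m} y_i y_j \alpha_i \alpha_j K(x_i, x_j)
\quad \text{s.t.} \quad 0 \le \alpha_i \le C, \qquad \sum_{i=1}^{m} \alpha_i y_i = 0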


   

def linear_kernel(x,y,b=1):
    # linear kernel
    """ returns the linear combination of arrays 'x' and 'y' with
    the optional bias term 'b' (set to 1 by default). """
    result = x @ y.T + b
    return result # Note the @ operator for matrix multiplications


def gaussian_kernel(x, y, sigma=1):
    # Gaussian (RBF) kernel
    """Returns the gaussian similarity of arrays 'x' and 'y' with
    kernel width parameter 'sigma' (set to 1 by default)"""

    if np.ndim(x) == 1 and np.ndim(y) == 1:
        result = np.exp(-(np.linalg.norm(x-y,2))**2/(2*sigma**2))
    elif(np.ndim(x)>1 and np.ndim(y) == 1) or (np.ndim(x) == 1 and np.ndim(y)>1):
        result = np.exp(-(np.linalg.norm(x-y, 2, axis=1)**2)/(2*sigma**2))
    elif np.ndim(x) > 1 and np.ndim(y) > 1 :
        result = np.exp(-(np.linalg.norm(x[:, np.newaxis]- y[np.newaxis, :], 2, axis = 2) ** 2)/(2*sigma**2))
    return result
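To make the three broadcasting branches above concrete, here is a small shape check (my own illustration; the array values are arbitrary):

# shape check for gaussian_kernel (illustration only)
_a = np.random.rand(5, 2)
_b = np.random.rand(3, 2)
assert np.ndim(gaussian_kernel(_a[0], _b[0])) == 0  # point vs point -> scalar
assert gaussian_kernel(_a, _b[0]).shape == (5,)     # set vs point   -> vector of 5
assert gaussian_kernel(_a, _b).shape == (5, 3)      # set vs set     -> Gram matrix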


# evaluate the decision function for a single training example i
def decision_function_output(model, i):
    if model.user_linear_optim:
        #Equation (J1)
        #return float(np.dot(model.w.T, model.X[i])) - model.b
        return float(model.w.T @ model.X[i]) - model.b
    else:
        #Equation (J10)
        return np.sum([model.alphas[j] * model.y[j] * model.kernel(model.X[j], model.X[i]) for j in range(model.m)]) - model.b


# evaluate the decision function over a set of input feature vectors
def decision_function(alphas, target, kernel, X_train, x_test, b):
    """ Applies the SVM decision function to the input feature vectors in 'x_test'.
    """
    result = (alphas * target) @ kernel(X_train, x_test) - b   # elementwise '*' then matrix '@' product
    
    return result
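In symbols, these helpers evaluate the SVM decision function. For a point x, Platt's Equation (J10) is

f(x) \;=\; \sum_{j=1}^{m} \alpha_j\, y_j\, K(x_j, x) \;-\; b,

and with a linear kernel this collapses to f(x) = w^{\top} x - b with w = \sum_{j} \alpha_j y_j x_j (Equation (J1)), which is why SMOStruct maintains w explicitly when user_linear_optim is set.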

def plot_decision_boundary(model, ax, resolution=100, colors=('b', 'k', 'r'), levels=(-1, 0, 1)):
    """
    Plot the model's decision boundary on the given axes, together with the
    training points; support vectors (non-zero alphas) are drawn circled.
    """

    # Generate a coordinate grid of shape [resolution x resolution]
    # and evaluate the model over the entire space
    xrange = np.linspace(model.X[:,0].min(), model.X[:, 0].max(), resolution)
    yrange = np.linspace(model.X[:,1].min(), model.X[:, 1].max(), resolution)
    grid = [[decision_function(model.alphas,model.y, model.kernel, model.X,
                               np.array([xr,yr]), model.b) for xr in xrange] for yr in yrange]
   
    grid = np.array(grid).reshape(len(xrange), len(yrange))


    #Plot decision contours using grid and
    #make a scatter plot of training data
    ax.contour(xrange, yrange, grid, levels=levels, linewidths = (1,1,1),
               linestyles = ('--', '-', '--'), colors=colors)
    ax.scatter(model.X[:,0], model.X[:, 1],
               c=model.y, cmap = plt.cm.viridis, lw=0, alpha =0.25)

    #Plot support vectors (non-zero alphas)
    #as circled points (linewidth >0)
    mask = np.round(model.alphas, decimals = 2) !=0.0
    ax.scatter(model.X[mask,0], model.X[mask,1],
               c=model.y[mask], cmap=plt.cm.viridis, lw=1, edgecolors='k')

    return grid, ax

# Optimize the pair alpha2, alpha1 -- this routine does the heavy lifting:
# it returns 1 and the updated model if the pair made progress, otherwise 0
def take_step(i1, i2, model):
   
    # skip if the chosen alphas are the same
    if i1 == i2:
        return 0, model

    # alph1, alph2 hold the old values; a1, a2 computed below are the new values
    # pull out alph1/2, y1/2, E1/2 and s so the code matches the paper's notation
    alph1 = model.alphas[i1]
    alph2 = model.alphas[i2]
   
    y1 = model.y[i1]
    y2 = model.y[i2]

    E1 = get_error(model, i1)
    E2 = get_error(model, i2)
    s = y1 * y2

    # compute L & H, the bounds on the new possible alpha values
    if (y1 != y2):
        # y1, y2 have opposite signs: Equation (J13)
        L = max(0, alph2 - alph1)
        H = min(model.C, model.C + alph2 - alph1)
    else:
        # y1, y2 have the same sign: Equation (J14)
        L = max(0, alph1 + alph2 - model.C)
        H = min(model.C, alph1 + alph2)
    if (L==H):
        return 0, model

    # kernel values between examples 1 and 2, needed to compute eta
    k11 = model.kernel(model.X[i1], model.X[i1])
    k12 = model.kernel(model.X[i1], model.X[i2])
    k22 = model.kernel(model.X[i2], model.X[i2])
    # compute eta, the curvature of the objective along the constraint line, Equation (J15)
    eta = k11 + k22 - 2 * k12
    
    # in the usual case eta is positive, and the unconstrained optimum gives the new a2

    if eta > 0:
        # Equation (J16): compute the new alpha2
        a2 = alph2 + y2 * (E1 - E2) / eta
        # clip a2 to the bounds L & H, Equation (J17)
        if a2 < L:
            a2 = L
        elif a2 > H:
            a2 = H
    # if eta is non-positive (degenerate case), move the new a2 to the bound
    # with the greater objective function value
    else:
        # Equation (J19): evaluate the objective at both bounds
        f1 = y1*(E1 + model.b) - alph1*k11 - s*alph2*k12
        f2 = y2 * (E2 + model.b) - s* alph1 * k12 - alph2 * k22

        L1 = alph1 + s*(alph2 - L)
        H1 = alph1 + s*(alph2 - H)

        Lobj = L1 * f1 + L * f2 + 0.5 * (L1 ** 2) * k11 \
               + 0.5 * (L**2) * k22 + s * L * L1 * k12
               
        Hobj = H1 * f1 + H * f2 + 0.5 * (H1**2) * k11 \
               + 0.5 * (H**2) * k22 + s * H * H1 * k12
               
        if Lobj < Hobj - eps:
            a2 = L
        elif Lobj > Hobj + eps:
            a2 = H
        else:
            a2 = alph2

    # snap the new a2 to 0 or C if it lies within 1e-8 of that bound
    if a2 < 1e-8:
        a2 = 0.0
    elif a2 > (model.C - 1e-8):
        a2 = model.C

    # if the pair can't be optimized within epsilon (eps), skip it
    if (np.abs(a2 - alph2) < eps * (a2 + alph2 + eps)):
        return 0, model

    # compute the new a1 from the new a2, Equation (J18)
    a1 = alph1 + s * (alph2 - a2)

    # update the bias b: Equation (J20)
    b1 = E1 + y1*(a1 - alph1) * k11 + y2 * (a2 - alph2) * k12 + model.b
    #equation (J21)
    b2 = E2 + y1*(a1 - alph1) * k12 + y2 * (a2 - alph2) * k22 + model.b

    # Set the new threshold based on whether a1 or a2 is away from its bounds
    if 0 < a1 < model.C:
        b_new = b1
    elif 0 < a2 < model.C:
        b_new = b2
    # average the two thresholds if both alphas sit at a bound
    else:
        b_new = (b1 + b2) * 0.5

    #update model threshold
    model.b = b_new

    # when using the linear kernel, update the weight vector incrementally,
    # Equation (J22)
    if model.user_linear_optim:
        model.w = model.w + y1 * (a1 - alph1) * model.X[i1] + y2 * (a2 - alph2) * model.X[i2]
    # store the new a1, a2 in the model's alphas array
    model.alphas[i1] = a1
    model.alphas[i2] = a2

    # Update the error cache using the new alphas, Platt's Equation (12):
    # non-bound examples other than i1, i2 get an incremental update
    for i in range(model.m):
        if i in (i1, i2):
            continue
        if 0 < model.alphas[i] < model.C:
            model.errors[i] += y1*(a1 - alph1)*model.kernel(model.X[i1], model.X[i]) + \
                               y2*(a2 - alph2)*model.kernel(model.X[i2], model.X[i]) + model.b - b_new
    # errors of the two optimized alphas are reset to zero (exact when they are unbound)
    model.errors[i1] = 0
    model.errors[i2] = 0

    return 1, model
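For reference, the updates implemented in take_step are the standard SMO equations; my transcription of Platt's (J13)-(J21):

L = \max(0,\ \alpha_2 - \alpha_1),\quad H = \min(C,\ C + \alpha_2 - \alpha_1) \qquad (y_1 \ne y_2)
L = \max(0,\ \alpha_1 + \alpha_2 - C),\quad H = \min(C,\ \alpha_1 + \alpha_2) \qquad (y_1 = y_2)
\eta = K_{11} + K_{22} - 2K_{12}, \qquad \alpha_2^{new} = \alpha_2 + \frac{y_2 (E_1 - E_2)}{\eta}\ \ \text{clipped to } [L, H]
\alpha_1^{new} = \alpha_1 + y_1 y_2\,(\alpha_2 - \alpha_2^{new})
b_1 = E_1 + y_1 (\alpha_1^{new} - \alpha_1) K_{11} + y_2 (\alpha_2^{new} - \alpha_2) K_{12} + b
b_2 = E_2 + y_1 (\alpha_1^{new} - \alpha_1) K_{12} + y_2 (\alpha_2^{new} - \alpha_2) K_{22} + b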


def get_error(model, i1):
    # non-bound alphas have an up-to-date cached error; for the rest, recompute
    if 0 < model.alphas[i1] < model.C:
        return model.errors[i1]
    else:
        return decision_function_output(model, i1) - model.y[i1]


# given a candidate a2 (index i2), pick a1 (index i1) heuristically
# and try to optimize the pair
def examine_example(i2, model):
    y2 = model.y[i2]
    alph2 = model.alphas[i2]
    E2 = get_error(model, i2)
    r2 = E2 * y2

    # proceed only if this example violates the KKT conditions by more than tol
    if ((r2 < -tol and alph2 < model.C) or (r2 > tol and alph2 > 0)):
        if len(model.alphas[(model.alphas != 0) & (model.alphas != model.C)]) > 1:
            # second-choice heuristic: pick i1 to maximize the step size |E1 - E2|
            # if E2 > 0 the minimum error is farthest from it; if E2 <= 0, the maximum
            if model.errors[i2] > 0:
                i1 = np.argmin(model.errors)
            else:
                i1 = np.argmax(model.errors)

            step_result,model = take_step(i1,i2, model)
            if step_result:
                return 1, model

        # otherwise, loop over the non-zero, non-C alphas as candidate a1,
        # starting at a random position
        for i1 in np.roll(np.where((model.alphas != 0) & (model.alphas != model.C))[0],
                          np.random.choice(np.arange(model.m))):
            step_result, model = take_step(i1, i2, model)
            if step_result:
                return 1, model
        
        # still no progress with this a2: try every alpha as a1 (m-1 candidates),
        # again starting at a random position
        for i1 in np.roll(np.arange(model.m), np.random.choice(np.arange(model.m))):
            step_result, model = take_step(i1, i2, model)
           
            if step_result:
                return 1, model
    # reaching here means the KKT conditions already hold for this example
    # within tolerance, or no pair made progress; report no change
    return 0, model
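The test at the top of examine_example is the KKT-violation check. Since E_i = f(x_i) - y_i and y_i^2 = 1, we have r_i = y_i E_i = y_i f(x_i) - 1, and at the dual optimum

\alpha_i = 0 \Rightarrow y_i f(x_i) \ge 1, \qquad 0 < \alpha_i < C \Rightarrow y_i f(x_i) = 1, \qquad \alpha_i = C \Rightarrow y_i f(x_i) \le 1,

so r_2 < -tol with \alpha_2 < C, or r_2 > tol with \alpha_2 > 0, flags an example worth optimizing.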


def fit(model):
   
    numChanged = 0
    examineAll = 1

    # loop counters, to observe how many passes of each kind the solver makes
    loopnum = 0
    loopnum1 = 0
    loopnum2 = 0

    # The outer loop ends when numChanged == 0 and examineAll == 0: a full pass
    # over all examples (the if branch) alternates with passes over only the
    # non-bound alphas (the else branch) until no alpha changes, i.e. the KKT
    # conditions hold within tol everywhere; a hard cap of 2000 passes below
    # guards against non-convergence.
    # Main logic: these passes supply the candidate alpha2 (the "old a2");
    # examine_example then chooses alpha1 heuristically and optimizes the pair.
    while (numChanged > 0) or (examineAll):
        numChanged = 0
        if loopnum == 2000:
            break
        loopnum = loopnum + 1
        if examineAll:
            loopnum1 = loopnum1 + 1   # count full passes over the whole training set
            # take i = 0, 1, ..., m-1 as the candidate a2; examine_example picks a1,
            # so a full pass tries up to m*(m-1) pairs
            for i in range(model.alphas.shape[0]):
                examine_result, model = examine_example(i, model)
                numChanged += examine_result
        else:   # once a full pass has been done, sweep only the non-bound alphas
            loopnum2 = loopnum2 + 1
            # loop over examples where alphas are not already at their limits
            for i in np.where((model.alphas != 0) & (model.alphas != model.C))[0]:
                examine_result, model = examine_example(i, model)
                numChanged += examine_result
        if examineAll == 1:
            examineAll = 0
        elif numChanged == 0:
            examineAll = 1
    print("loopnum012",loopnum,":", loopnum1,":", loopnum2)   
    return model

# The demo below can be swapped out for a different dataset/model as desired.


# make_blobs: generate a linearly separable two-class toy dataset
X_train, y = make_blobs(n_samples=1000, centers=2, n_features=2, random_state=2)

# StandardScaler().fit_transform standardizes each feature to zero mean and unit
# variance, so no feature dominates the kernel computations merely by its scale
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train, y)
y[y == 0] = -1   # relabel the classes as -1/+1, as the SVM formulation expects

# set model parameters and initial values
C = 20.0
m = len(X_train_scaled)
initial_alphas = np.zeros(m)
initial_b = 0.0

#set tolerances
tol = 0.01 # error tolerance
eps = 0.01 # alpha tolerance

# Instantiate the model

model = SMOStruct(X_train_scaled, y, C, linear_kernel, initial_alphas, initial_b, np.zeros(m),user_linear_optim=True)
#print("model created ...")
#initialize error cache

initial_error = decision_function(model.alphas, model.y, model.kernel, model.X, model.X, model.b) - model.y
model.errors = initial_error
np.random.seed(0)

# stop here: the alternative non-linear demo below is wrapped in a triple-quoted
# string so it does not run by default; remove the quotes to try it.

# X_train, y = make_circles(n_samples=500, noise=0.2,
#                           factor=0.1,
#                           random_state=1)
'''
X_train,y = make_moons(n_samples = 500, noise=0.2,
                        random_state =1)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train, y)
y[y == 0] = -1

#print('X_train',':', X_train)
# print('y',':',y)

#Set model parameters and initial values
C = 1.0
m = len(X_train_scaled)
initial_alphas = np.zeros(m)
initial_b = 0.0

# Set tolerances
tol = 0.01 # error tolerance
eps = 0.01 # alpha tolerance

# instantiate model
model = SMOStruct(X_train_scaled, y, C, lambda x, y: gaussian_kernel(x,y,sigma=0.5),
                  initial_alphas, initial_b, np.zeros(m), user_linear_optim=False)

#initialize error cache
initial_error = decision_function(model.alphas, model.y, model.kernel,
                                   model.X, model.X, model.b) - model.y
# initial_error = np.zeros(m)
# #print('initial error',':', initial_error)
model.errors = initial_error 

'''
print("starting to fit...")
# train the model
output = fit(model)
# after training, plot the decision boundary, circling the support vectors
fig, ax = plt.subplots()
grid, ax = plot_decision_boundary(output, ax)
plt.show()
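As a closing usage note (my own sketch, not part of the original article): once fit returns, new points can be classified by the sign of the decision function; they must first be transformed with the same scaler that was fit on the training data. The example points here are arbitrary:

# classify a couple of (hypothetical) new points with the trained model
X_new = scaler.transform(np.array([[0.0, 0.5], [-1.0, -1.0]]))
scores = decision_function(output.alphas, output.y, output.kernel,
                           output.X, X_new, output.b)
print(np.sign(scores))   # -> array of +1/-1 class predictions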