kube-proxyソースコード解析


広告|kubernetesオフラインインストールパッケージ、わずか3ステップ
kube-proxyソースコード解析
ipvsはiptablesモードに比べて高い性能と安定性を備えており、本稿ではこのモードのソースコード解析を主とし、iptablesモードの原理を理解したい場合は、その実現を参考にすることができ、アーキテクチャに差はない.
kube-proxyの主な機能は、サービスとendpointのイベントを傍受し、エージェントポリシーをマシンにドロップすることです.最下位はdocker/libnetworkを呼び出し、libnetworkは最終的にnetlinkとnetnsを呼び出してipvsの作成などの動作を実現する
構成の初期化
コードエントリ:cmd/kube-proxy/app/server.go Run()関数
コマンドラインパラメータによるproxyServerの構成の初期化
proxyServer, err := NewProxyServer(o)
type ProxyServer struct {
    // k8s client
    Client                 clientset.Interface
    EventClient            v1core.EventsGetter

    // ipvs     
    IptInterface           utiliptables.Interface
    IpvsInterface          utilipvs.Interface
    IpsetInterface         utilipset.Interface

    //          
    Proxier                proxy.ProxyProvider

    //     ,ipvs iptables userspace kernelspace(windows)  
    ProxyMode              string
    //       
    ConfigSyncPeriod       time.Duration

    // service   endpoint      
    ServiceEventHandler    config.ServiceHandler
    EndpointsEventHandler  config.EndpointsHandler
}

Proxierは主要なエントリで、2つの関数を抽象化しています.
type ProxyProvider interface {
    // Sync immediately synchronizes the ProxyProvider's current state to iptables.
    Sync()
    //     
    SyncLoop()
}

ipvsのinterfaceこれは重要です.
type Interface interface {
    //       
    Flush() error
    //     virtual server
    AddVirtualServer(*VirtualServer) error

    UpdateVirtualServer(*VirtualServer) error
    DeleteVirtualServer(*VirtualServer) error
    GetVirtualServer(*VirtualServer) (*VirtualServer, error)
    GetVirtualServers() ([]*VirtualServer, error)

    //  virtual server  realserver,   VirtualServer    clusterip realServer  pod(      endpoint)
    AddRealServer(*VirtualServer, *RealServer) error
    GetRealServers(*VirtualServer) ([]*RealServer, error)
    DeleteRealServer(*VirtualServer, *RealServer) error
}

後でipvs_を詳しく見てみましょうlinuxはどのように上のインタフェースを実現しますか?
virtual serverとrealserver、最も重要なのはip:port、それからsessionAffinityなどのエージェントのモードです.
type VirtualServer struct {
    Address   net.IP
    Protocol  string
    Port      uint16
    Scheduler string
    Flags     ServiceFlags
    Timeout   uint32
}

type RealServer struct {
    Address net.IP
    Port    uint16
    Weight  int
}

apiserver clientの作成
client, eventClient, err := createClients(config.ClientConnection, master)

Proxierを作成ipvsモードのみに注目したproxier
else if proxyMode == proxyModeIPVS {
        glog.V(0).Info("Using ipvs Proxier.")
        proxierIPVS, err := ipvs.NewProxier(
            iptInterface,
            ipvsInterface,
            ipsetInterface,
            utilsysctl.New(),
            execer,
            config.IPVS.SyncPeriod.Duration,
            config.IPVS.MinSyncPeriod.Duration,
            config.IPTables.MasqueradeAll,
            int(*config.IPTables.MasqueradeBit),
            config.ClusterCIDR,
            hostname,
            getNodeIP(client, hostname),
            recorder,
            healthzServer,
            config.IPVS.Scheduler,
        )
...
        proxier = proxierIPVS
        serviceEventHandler = proxierIPVS
        endpointsEventHandler = proxierIPVS

このProxierには、次の方法があります.
   +OnEndpointsAdd(endpoints *api.Endpoints)
   +OnEndpointsDelete(endpoints *api.Endpoints)
   +OnEndpointsSynced()
   +OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints)
   +OnServiceAdd(service *api.Service)
   +OnServiceDelete(service *api.Service)
   +OnServiceSynced()
   +OnServiceUpdate(oldService, service *api.Service)
   +Sync()
   +SyncLoop()

だからipvsのこのProxierは私たちが必要とするほとんどのインタフェースを実現しました
まとめてみます.
     +-----------> endpointHandler
     |
     +-----------> serviceHandler
     |                ^
     |                | +-------------> sync      
     |                | |
ProxyServer---------> Proxier --------> service                
     |                  |                                                
     |                  +-------------> endpoint              
     |                                             |    
     +-----> ipvs interface ipvs handler     

ProxyServerの起動
  • clean upパラメータが付いているかどうかを確認し、持っている場合はすべてのルールをクリアして
  • を終了します.
  • OOM adjusterは実現していないようで、
  • を無視します
  • resouceContainerも実現せず、
  • を無視
  • metricsサーバを起動します.これは重要です.例えば、promethusのmetricsを含むこのパラメータを監視したいときに入力することができます.metrics-bind-addressパラメータ
  • informerを起動し、イベントの傍受を開始し、それぞれコヒーレント処理を開始する.

  • 1 2 3 4私たちはあまり注目する必要はありません.5をよく見てください.
    informerFactory := informers.NewSharedInformerFactory(s.Client, s.ConfigSyncPeriod)
    
    serviceConfig := config.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), s.ConfigSyncPeriod)
    //    service handler   
    serviceConfig.RegisterEventHandler(s.ServiceEventHandler)
    //        ServiceEventHandler   informer   
    go serviceConfig.Run(wait.NeverStop)
    
    endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), s.ConfigSyncPeriod)
    //   endpoint 
    endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler)
    go endpointsConfig.Run(wait.NeverStop)
    
    go informerFactory.Start(wait.NeverStop)

    serviceConfig.RunとendpointConfig.Runはコールバック関数に値を割り当てるだけなので、登録したhandlerはinformerに与えられ、informerはイベントを傍受するとコールバックします.
    for i := range c.eventHandlers {
        glog.V(3).Infof("Calling handler.OnServiceSynced()")
        c.eventHandlers[i].OnServiceSynced()
    }

    では質問ですが、登録されているこのhandlerは何ですか?前文の
            serviceEventHandler = proxierIPVS
            endpointsEventHandler = proxierIPVS

    だからこのproxierIPVS
    handlerのコールバック関数は、informerがこれらの関数をコールバックするので、自分で開発したときにこのinterfaceを登録すればいいです.
    type ServiceHandler interface {
        // OnServiceAdd is called whenever creation of new service object
        // is observed.
        OnServiceAdd(service *api.Service)
        // OnServiceUpdate is called whenever modification of an existing
        // service object is observed.
        OnServiceUpdate(oldService, service *api.Service)
        // OnServiceDelete is called whenever deletion of an existing service
        // object is observed.
        OnServiceDelete(service *api.Service)
        // OnServiceSynced is called once all the initial even handlers were
        // called and the state is fully propagated to local cache.
        OnServiceSynced()
    }

    リスニングの開始
    go informerFactory.Start(wait.NeverStop)

    ここで実行すると、サービスendpointの削除などの動作が傍受され、コールバックされ、上の図を振り返ると、最終的にはProxierによって実現されるので、後でProxierに注目すればいいです.
    s.Proxier.SyncLoop()

    それからSyncLoopを始めます.
    Proxier実装
    サービスを作成するときにOnServiceAddメソッドが呼び出されます.ここでは、前のステータスと現在のステータスの2つを記録し、syncRunnerに信号を送って処理させます.
    func (proxier *Proxier) OnServiceAdd(service *api.Service) {
        namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
        if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() {
            proxier.syncRunner.Run()
        }
    }

    サービス情報を記録すると、何もしていないことがわかります.サービスをmapに存在させ、map情報を直接削除しなければ何も処理しません.
    change, exists := scm.items[*namespacedName]
    if !exists {
        change = &serviceChange{}
        //   service  
        change.previous = serviceToServiceMap(previous)
        scm.items[*namespacedName] = change
    }
    //       service  
    change.current = serviceToServiceMap(current)
    
        ,    
    if reflect.DeepEqual(change.previous, change.current) {
        delete(scm.items, *namespacedName)
    }

    proxier.syncRunner.Run()の中から一つの信号が送られてきました
    select {
    case bfr.run 

    この信号は処理されています
    s.Proxier.SyncLoop()
    
    func (proxier *Proxier) SyncLoop() {
        // Update healthz timestamp at beginning in case Sync() never succeeds.
        if proxier.healthzServer != nil {
            proxier.healthzServer.UpdateTimestamp()
        }
        proxier.syncRunner.Loop(wait.NeverStop)
    }

    runnerでは信号実行が受信され、受信されていない信号は定期的に実行されます.
    func (bfr *BoundedFrequencyRunner) Loop(stop 

    このbfr runnerで私たちが最も考えなければならないのはコールバック関数で、tryRunでこのコールバックがスケジューリングされた条件を満たしているかどうかをチェックします.
    type BoundedFrequencyRunner struct {
        name        string        // the name of this instance
        minInterval time.Duration // the min time between runs, modulo bursts
        maxInterval time.Duration // the max time between runs
    
        run chan struct{} // try an async run
    
        mu      sync.Mutex  // guards runs of fn and all mutations
        fn      func()      // function to run,     
        lastRun time.Time   // time of last run
        timer   timer       // timer for deferred runs
        limiter rateLimiter // rate limiter for on-demand runs
    }
    
    //    proxier.syncProxyRules    
    proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)

    これは600行程度の揉み関数であり,主な論理を処理する場所でもある.
    syncProxyRules
  • markやcomment
  • などのiptablesルールを設定します.
  • マシンにNICがあると判断し、ipvsは上の
  • にアドレスをバインドする必要がある.
  • はipsetがあることを決定し、ipsetはiptablesの拡張であり、一連のアドレスにiptablesルール
  • を設定することができる.
    ...(臭いし長いし、重複コードが多くて見られないし、細部の問題は自分で見ましょう)
  • 最も注目しているのは、VirtualServerの
  • をどのように処理するかです.
    serv := &utilipvs.VirtualServer{
        Address:   net.ParseIP(ingress.IP),
        Port:      uint16(svcInfo.port),
        Protocol:  string(svcInfo.protocol),
        Scheduler: proxier.ipvsScheduler,
    }
    if err := proxier.syncService(svcNameString, serv, false); err == nil {
        if err := proxier.syncEndpoint(svcName, svcInfo.onlyNodeLocalEndpoints, serv); err != nil {
        }
    }

    実装を見て、存在しない場合は作成し、存在する場合は更新し、NICにserviceのcluster ipをバインドします.
    func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool) error {
        appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs)
        if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {
            if appliedVirtualServer == nil {
                if err := proxier.ipvs.AddVirtualServer(vs); err != nil {
                    return err
                }
            } else {
                if err := proxier.ipvs.UpdateVirtualServer(appliedVirtualServer); err != nil {
                    return err
                }
            }
        }
    
        // bind service address to dummy interface even if service not changed,
        // in case that service IP was removed by other processes
        if bindAddr {
            _, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice)
            if err != nil {
                return err
            }
        }
        return nil
    }

    サービスインプリメンテーションの作成
    ipvsのAddVirtualServerの実装を見ることができます.主にsocketを利用してカーネルプロセスと通信しています.pkg/util/ipvs/ipvs_linux.go里runner構造体はこれらの方法を実現し、ここでdocker/libnetwork/ipvsライブラリを使用した.
    // runner implements Interface.
    type runner struct {
        exec       utilexec.Interface
        ipvsHandle *ipvs.Handle
    }
    
    // New returns a new Interface which will call ipvs APIs.
    func New(exec utilexec.Interface) Interface {
        ihandle, err := ipvs.New("") // github.com/docker/libnetwork/ipvs
        if err != nil {
            glog.Errorf("IPVS interface can't be initialized, error: %v", err)
            return nil
        }
        return &runner{
            exec:       exec,
            ipvsHandle: ihandle,
        }
    }

    Newの時に特殊なsocketを作成しました.ここでは私たちの普通のsocketのプログラミングと違いはありません.肝心なのはsyscallです.AF_NETLINKというパラメータは、カーネルプロセスとの通信を表します.
    sock, err := nl.GetNetlinkSocketAt(n, netns.None(), syscall.NETLINK_GENERIC)
    
    func getNetlinkSocket(protocol int) (*NetlinkSocket, error) {
        fd, err := syscall.Socket(syscall.AF_NETLINK, syscall.SOCK_RAW|syscall.SOCK_CLOEXEC, protocol)
        if err != nil {
            return nil, err
        }
        s := &NetlinkSocket{
            fd: int32(fd),
        }
        s.lsa.Family = syscall.AF_NETLINK
        if err := syscall.Bind(fd, &s.lsa); err != nil {
            syscall.Close(fd)
            return nil, err
        }
    
        return s, nil
    }

    サービスを作成し、dockerサービス形式に変換し、直接呼び出します.
    // AddVirtualServer is part of Interface.
    func (runner *runner) AddVirtualServer(vs *VirtualServer) error {
        eSvc, err := toBackendService(vs)
        if err != nil {
            return err
        }
        return runner.ipvsHandle.NewService(eSvc)
    }

    そしてサービス情報をパッケージ化し、socketに書けばいいです.
    
    func (i *Handle) doCmdwithResponse(s *Service, d *Destination, cmd uint8) ([][]byte, error) {
        req := newIPVSRequest(cmd)
        req.Seq = atomic.AddUint32(&i.seq, 1)
    
        if s == nil {
            req.Flags |= syscall.NLM_F_DUMP                    //Flag to dump all messages
            req.AddData(nl.NewRtAttr(ipvsCmdAttrService, nil)) //Add a dummy attribute
        } else {
            req.AddData(fillService(s))
        } //  service     
    
        if d == nil {
            if cmd == ipvsCmdGetDest {
                req.Flags |= syscall.NLM_F_DUMP
            }
    
        } else {
            req.AddData(fillDestinaton(d))
        }
    
        //        service  
        res, err := execute(i.sock, req, 0)
        if err != nil {
            return [][]byte{}, err
        }
    
        return res, nil
    }

    コンストラクションリクエスト
    func newIPVSRequest(cmd uint8) *nl.NetlinkRequest {
        return newGenlRequest(ipvsFamily, cmd)
    }

    リクエストの構築時に渡されるのはipvsプロトコルクラスタです
    カーネルと通信するメッセージヘッダを構築します
    func NewNetlinkRequest(proto, flags int) *NetlinkRequest {
        return &NetlinkRequest{
            NlMsghdr: syscall.NlMsghdr{
                Len:   uint32(syscall.SizeofNlMsghdr),
                Type:  uint16(proto),
                Flags: syscall.NLM_F_REQUEST | uint16(flags),
                Seq:   atomic.AddUint32(&nextSeqNr, 1),
            },
        }
    }

    メッセージにDataを追加します.このDataは配列です.2つの方法が必要です.
    type NetlinkRequestData interface {
        Len() int  //   
        Serialize() []byte //    ,               ,service       
    }

    例えばheaderはこのようにシーケンス化されていて、一見呆然として、長い間考えてやっと理解しました:([unsafe.Sizeof(hdr)]byte)1つの*[]byteタイプ、長さは構造体の大きさ(unsafe.Pointer(hdr))構造体をbyteポインタタイプに変換して*を加えてその値を[:]byteに変換して返します
    func (hdr *genlMsgHdr) Serialize() []byte {
        return (*(*[unsafe.Sizeof(*hdr)]byte)(unsafe.Pointer(hdr)))[:]
    }

    サービス情報をカーネルに送信
    普通のsocketが受信データを送信する
    func execute(s *nl.NetlinkSocket, req *nl.NetlinkRequest, resType uint16) ([][]byte, error) {
        var (
            err error
        )
    
        if err := s.Send(req); err != nil {
            return nil, err
        }
    
        pid, err := s.GetPid()
        if err != nil {
            return nil, err
        }
    
        var res [][]byte
    
    done:
        for {
            msgs, err := s.Receive()
            if err != nil {
                return nil, err
            }
            for _, m := range msgs {
                if m.Header.Seq != req.Seq {
                    continue
                }
                if m.Header.Pid != pid {
                    return nil, fmt.Errorf("Wrong pid %d, expected %d", m.Header.Pid, pid)
                }
                if m.Header.Type == syscall.NLMSG_DONE {
                    break done
                }
                if m.Header.Type == syscall.NLMSG_ERROR {
                    error := int32(native.Uint32(m.Data[0:4]))
                    if error == 0 {
                        break done
                    }
                    return nil, syscall.Errno(-error)
                }
                if resType != 0 && m.Header.Type != resType {
                    continue
                }
                res = append(res, m.Data)
                if m.Header.Flags&syscall.NLM_F_MULTI == 0 {
                    break done
                }
            }
        }
        return res, nil
    }

    サービスデータパッケージ
    ここでは細かく、コア思想はカーネルが一定のフォーマットの標準データしか認識していないことであり、サービス情報をその標準に従ってカーネルにパッケージして送信すればよい.
    どのように梱包するかについては詳しく説明しません.
    func fillService(s *Service) nl.NetlinkRequestData {
        cmdAttr := nl.NewRtAttr(ipvsCmdAttrService, nil)
        nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrAddressFamily, nl.Uint16Attr(s.AddressFamily))
        if s.FWMark != 0 {
            nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrFWMark, nl.Uint32Attr(s.FWMark))
        } else {
            nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrProtocol, nl.Uint16Attr(s.Protocol))
            nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrAddress, rawIPData(s.Address))
    
            // Port needs to be in network byte order.
            portBuf := new(bytes.Buffer)
            binary.Write(portBuf, binary.BigEndian, s.Port)
            nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPort, portBuf.Bytes())
        }
    
        nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrSchedName, nl.ZeroTerminated(s.SchedName))
        if s.PEName != "" {
            nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPEName, nl.ZeroTerminated(s.PEName))
        }
        f := &ipvsFlags{
            flags: s.Flags,
            mask:  0xFFFFFFFF,
        }
        nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrFlags, f.Serialize())
        nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrTimeout, nl.Uint32Attr(s.Timeout))
        nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrNetmask, nl.Uint32Attr(s.Netmask))
        return cmdAttr
    }

    まとめ
    サービスは全体的にコードが簡単ですが、実装が少し迂回しているところがあり、直接的ではないと思います.全体的にapiserverイベントを傍受し、処理よりも定期的に同期ポリシーを実行する.