Kubernetes学習ノートのCalico CNI Pluginソースコード解析(二)

18386 ワード

Overview
calicoプラグインコードウェアハウスはprojectcalico/cni-pluginにあり、sandbox containerのためにrouteと仮想ネットワークカードvirtual interface、veth pairなどのネットワークリソースを作成し、関連データをcalico datastoreデータベースに書き込む2つのバイナリファイルをコンパイルします.calico-ipamは、現在のpodに対して現在のノードのpodセグメント内からipアドレスを割り当てます.もちろん、現在のノードにはpodセグメントがありません.クラスタセグメントcluster cidrからノードのpod cidrを先に割り当て、関連データをcalico datastoreデータベースに書き込みます.ここでcluster cidrはユーザー自身が定義し、calico datastoreに事前に書き込まれています.またcluster cidrから区切られたblock sizeもカスタマイズ可能(新しいバージョンのcalico/nodeコンテナはカスタマイズをサポートでき、古いバージョンのcalicoはサポートされていない)であり、公式サイトドキュメントchange-block-sizeを参照することができる.
次にcalicoバイナリプラグインが具体的にどのように動作しているかを重点的に見て、calico-ipamバイナリプラグインがipアドレスをどのように割り当てているかを見てみましょう.
calico pluginソースコード解析
calicoプラグインはcni標準インタフェースに従い、ADDおよびDELコマンドを実現しています.ここでは、ADDコマンドの場合にどのように実現されるかに重点を置いています.calicoはまずADDコマンドとDELコマンドを登録します.コードはL614-L677です.

func Main(version string) {
    // ...
    err := flagSet.Parse(os.Args[1:])
    // ...
    //    `ADD`   `DEL`   
    skel.PluginMain(cmdAdd, nil, cmdDel,
        cniSpecVersion.PluginSupports("0.1.0", "0.2.0", "0.3.0", "0.3.1"),
        "Calico CNI plugin "+version)
}
ADD命令では、主に3つの論理を行いました.
  • クエリーcalico datastoreにWorkloadEndpointオブジェクトと現在のpod名前が一致しているか、一致していないかを問い合わせると、host network namespace内のpodのNIC名前とpod ipアドレス、container network namespaceのNIC名前などの情報が主に保存される新しいWorkloadEndpointオブジェクトが作成されます.オブジェクトの例は次のとおりです.
  • veth pairを作成し、そのうちの1つのNICをホスト側ネットワークネーミングスペースに、もう1つはコンテナ側ネットワークネーミングスペースに配置します.container network namespace内にeth 0などのネットワークカードを作成し、calico-ipamを呼び出して得られたIPアドレスをeth 0ネットワークカードに付与する.host network namespace内にNICを作成します.NIC名は"cali" + sha1(namespace.pod)[:11]で、MACアドレス「ee:ee:ee:ee:ee:ee」を設定します.
  • は、コンテナ端子およびシンクホスト端子にルーティングを作成する.容器端では、デフォルトゲートウェイが169.254.1.1に設定され、このゲートウェイアドレスコードが書かれている.ホスト側では、10.217.120.85 dev calid0bda9976d5 scope linkのようなルーティングが追加され、10.217.120.85はpod ipアドレスであり、calid0bda9976d5はホスト側のpodのネットワークカード、すなわちveth pairがホスト側のvirtual ethernetインターネットインターネットインターネットインターネット仮想ネットワークデバイスである.

  • WorkloadEndpointオブジェクトの例は、k 8 s podオブジェクトがcalicoのworkloadendpointオブジェクトに対応し、calicoctl get wep -o wideですべてのworkloadendpointを表示できます.calico datastoreをkubernetesに設定することを覚えておいてください.~/.zshrcで環境変数を簡単に設定できます.
    # calico
    export CALICO_DATASTORE_TYPE=kubernetes
    export  CALICO_KUBECONFIG=~/.kube/config
    
    apiVersion: projectcalico.org/v3
    kind: WorkloadEndpoint
    metadata:
      creationTimestamp: "2021-01-09T08:38:56Z"
      generateName: nginx-demo-1-7f67f8bdd8-
      labels:
        app: nginx-demo-1
        pod-template-hash: 7f67f8bdd8
        projectcalico.org/namespace: default
        projectcalico.org/orchestrator: k8s
        projectcalico.org/serviceaccount: default
      name: minikube-k8s-nginx--demo--1--7f67f8bdd8--d5wsc-eth0
      namespace: default
      resourceVersion: "557760"
      uid: 85d1d33f-f55f-4f28-a89d-0a55394311db
    spec:
      endpoint: eth0
      interfaceName: calife8e5922caa
      ipNetworks:
      - 10.217.120.84/32
      node: minikube
      orchestrator: k8s
      pod: nginx-demo-1-7f67f8bdd8-d5wsc
      profiles:
      - kns.default
      - ksa.default.default
    

    以上の3つの主要な論理に基づいて、cmdAdd関数コードを見てみましょう.
    
    func cmdAdd(args *skel.CmdArgs) (err error) {
        // ...
        //  args.StdinData       ,          
        // `--cni-conf-dir`         , cni    ,      
        // types.NetConf            cni        
        conf := types.NetConf{}
        if err := json.Unmarshal(args.StdinData, &conf); err != nil {
            return fmt.Errorf("failed to load netconf: %v", err)
        }
        //       cni    , calico              
        // "log_level": "debug", "log_file_path": "/var/log/calico/cni/cni.log",
        utils.ConfigureLogging(conf)
        
        // ...
        
        //    cni     MTU, Max Transmit Unit      ,       
        if mtu, err := utils.MTUFromFile("/var/lib/calico/mtu"); err != nil {
            return fmt.Errorf("failed to read MTU file: %s", err)
        } else if conf.MTU == 0 && mtu != 0 {
            conf.MTU = mtu
        }
    
        //     WEPIdentifiers  ,   
        nodename := utils.DetermineNodename(conf)
        wepIDs, err := utils.GetIdentifiers(args, nodename)
        calicoClient, err := utils.CreateClient(conf)
        
        //   datastore    ready ,   `calicoctl get clusterinformation default -o yaml`   
        ci, err := calicoClient.ClusterInformation().Get(ctx, "default", options.GetOptions{})
        if !*ci.Spec.DatastoreReady {
            return
        }
    
        // list    wepPrefix workloadEndpoint,  pod    workloadEndpoint,          workloadEndpoint,     workloadEndpoint
        //        pod network   ,  calico       workloadEndpoint
        wepPrefix, err := wepIDs.CalculateWorkloadEndpointName(true)
        endpoints, err := calicoClient.WorkloadEndpoints().List(ctx, options.ListOptions{Name: wepPrefix, Namespace: wepIDs.Namespace, Prefix: true})
        if err != nil {
            return
        }
    
        //      pod,    calico datastore         workloadendpoint  
        var endpoint *api.WorkloadEndpoint
    
        //           pod,           workloadEndpoint  ,  endpoints   nil 
        if len(endpoints.Items) > 0 {
            // ...
        }
    
        //   endpoint nil,   WEPIdentifiers     ,  args.IfName kubelet      ,         ,   eth0
        //   WEPName    :{node_name}-k8s-{strings.replace(pod_name, "-", "--")}-{wepIDs.Endpoint},    
        // minikube-k8s-nginx--demo--1--7f67f8bdd8--d5wsc-eth0 WorkloadEndpoint  
        if endpoint == nil {
            wepIDs.Endpoint = args.IfName
            wepIDs.WEPName, err = wepIDs.CalculateWorkloadEndpointName(false)
        }
    
        // Orchestrator k8s
        if wepIDs.Orchestrator == api.OrchestratorKubernetes {
            // k8s.CmdAddK8s             
            if result, err = k8s.CmdAddK8s(ctx, args, conf, *wepIDs, calicoClient, endpoint); err != nil {
                return
            }
        } else {
            // ...
        }
    
        //          policy.type  k8s,        
        if conf.Policy.PolicyType == "" {
            // ...
        }
    
        // Print result to stdout, in the format defined by the requested cniVersion.
        err = cnitypes.PrintResult(result, conf.CNIVersion)
        return
    }
    

    以上のcmdAdd()関数の基本構造はcni規格の関数構造に合致し、最後にstdoutに結果を印刷します.k8s.CmdAddK8s()関数の主な論理を見てみましょう.
    //       :
    // 1.  calico store   WorkloadEndpoint  , pod  
    // 2.   veth pair,      ,   IP/MAC  ;       ,  MAC  
    // 3.     ,           ;       pod ip/mac   
    func CmdAddK8s(ctx context.Context, args *skel.CmdArgs, conf types.NetConf, epIDs utils.WEPIdentifiers, calicoClient calicoclient.Interface, endpoint *api.WorkloadEndpoint) (*current.Result, error) {
        // ...
        //                  data plane,   linuxDataplane  
        d, err := dataplane.GetDataplane(conf, logger)
        //   k8s client
        client, err := NewK8sClient(conf, logger)
        
        //          ipam.type=calico-ipam
        if conf.IPAM.Type == "host-local" {
            // ...
        }
    
        // ...
        
        //       pod namespace annotation: cni.projectcalico.org/ipv4pools
        //       ,      
        if conf.Policy.PolicyType == "k8s" {
            annotNS, err := getK8sNSInfo(client, epIDs.Namespace)
            labels, annot, ports, profiles, generateName, err = getK8sPodInfo(client, epIDs.Pod, epIDs.Namespace)
            // ...
            if conf.IPAM.Type == "calico-ipam" {
                var v4pools, v6pools string
                // Sets  the Namespace annotation for IP pools as default
                v4pools = annotNS["cni.projectcalico.org/ipv4pools"]
                v6pools = annotNS["cni.projectcalico.org/ipv6pools"]
                // Gets the POD annotation for IP Pools and overwrites Namespace annotation if it exists
                v4poolpod := annot["cni.projectcalico.org/ipv4pools"]
                if len(v4poolpod) != 0 {
                    v4pools = v4poolpod
                }
                // ...
            }
        }
    
        ipAddrsNoIpam := annot["cni.projectcalico.org/ipAddrsNoIpam"]
        ipAddrs := annot["cni.projectcalico.org/ipAddrs"]
        
        switch {
        //        :  calico-ipam      IP  
        case ipAddrs == "" && ipAddrsNoIpam == "":
            //    pod    annotation "cni.projectcalico.org/ipAddrsNoIpam" "cni.projectcalico.org/ipAddrs" 
            //     calico-ipam    pod ip 
            //   calico-ipam      pod ip ,        
            result, err = utils.AddIPAM(conf, args, logger)
            // ...
        case ipAddrs != "" && ipAddrsNoIpam != "":
            // Can't have both ipAddrs and ipAddrsNoIpam annotations at the same time.
            e := fmt.Errorf("can't have both annotations: 'ipAddrs' and 'ipAddrsNoIpam' in use at the same time")
            logger.Error(e)
            return nil, e
        case ipAddrsNoIpam != "":
            // ...
        case ipAddrs != "":
            // ...
        }
        
        //     WorkloadEndpoint  ,      
        endpoint.Name = epIDs.WEPName
        endpoint.Namespace = epIDs.Namespace
        endpoint.Labels = labels
        endpoint.GenerateName = generateName
        endpoint.Spec.Endpoint = epIDs.Endpoint
        endpoint.Spec.Node = epIDs.Node
        endpoint.Spec.Orchestrator = epIDs.Orchestrator
        endpoint.Spec.Pod = epIDs.Pod
        endpoint.Spec.Ports = ports
        endpoint.Spec.IPNetworks = []string{}
        if conf.Policy.PolicyType == "k8s" {
            endpoint.Spec.Profiles = profiles
        } else {
            endpoint.Spec.Profiles = []string{conf.Name}
        }
    
        // calico-ipam   ip   ,  endpoint.Spec.IPNetworks 
        if err = utils.PopulateEndpointNets(endpoint, result); err != nil {
            // ...
        }
    
        //   desiredVethName      :`"cali" + sha1(namespace.pod)[:11]` ,            
        desiredVethName := k8sconversion.NewConverter().VethNameForWorkload(epIDs.Namespace, epIDs.Pod)
        
        // DoNetworking()     ,      veth pair   
        //      linuxDataplane   DoNetworking()  
        hostVethName, contVethMac, err := d.DoNetworking(
            ctx, calicoClient, args, result, desiredVethName, routes, endpoint, annot)
        
        // ...
        mac, err := net.ParseMAC(contVethMac)
        endpoint.Spec.MAC = mac.String()
        endpoint.Spec.InterfaceName = hostVethName
        endpoint.Spec.ContainerID = epIDs.ContainerID
    
        // ...
    
        //      WorkloadEndpoint  ,     ,        pod  , calico datastore       workloadendpoint  
        if _, err := utils.CreateOrUpdate(ctx, calicoClient, endpoint); err != nil {
            // ...
        }
    
        // Add the interface created above to the CNI result.
        result.Interfaces = append(result.Interfaces, &current.Interface{
            Name: endpoint.Spec.InterfaceName},
        )
    
        return result, nil
    }
    

    上記のコードは最後にworkloadendpointオブジェクトを作成し、DoNetworking()関数が重要です.この関数ではルーティングとveth pairが作成されます.次に、linuxDataplaneオブジェクトのDoNetworking()関数を見て、veth pairとroutesをどのように作成しますか.ここでは主にgithub.com/vishvananda/netlink golangパケットを呼び出してネットワークカードやルーティングなどの操作を削除したり変更したりすることができ、ip link add/delete/set xxxなどの命令を実行することに等しい.このgolangパケットも使いやすいパケットであり、k 8 sプロジェクトのような多くの主要プロジェクトで使用されており、linuxネットワークに関する知識を学ぶ際にこのパケットを利用して関連demoを書くことができ、効率も高い.ここでcalicoがnetlinkというパッケージを使用してroutesとveth pairを作成する方法を見てみましょう.
    
    func (d *linuxDataplane) DoNetworking(
        ctx context.Context,
        calicoClient calicoclient.Interface,
        args *skel.CmdArgs,
        result *current.Result,
        desiredVethName string,
        routes []*net.IPNet,
        endpoint *api.WorkloadEndpoint,
        annotations map[string]string,
    ) (hostVethName, contVethMAC string, err error) {
        //   desiredVethName      :`"cali" + sha1(namespace.pod)[:11]` ,            
        hostVethName = desiredVethName
        //           eth0
        contVethName := args.IfName
    
        err = ns.WithNetNSPath(args.Netns, func(hostNS ns.NetNS) error {
            veth := &netlink.Veth{
                LinkAttrs: netlink.LinkAttrs{
                    Name: contVethName,
                    MTU:  d.mtu,
                },
                PeerName: hostVethName,
            }
            //   veth peer,       eth0,        "cali" + sha1(namespace.pod)[:11]
            //    ip link add xxx type veth peer name xxx   
            if err := netlink.LinkAdd(veth); err != nil {
            }
            hostVeth, err := netlink.LinkByName(hostVethName)
            if mac, err := net.ParseMAC("EE:EE:EE:EE:EE:EE"); err != nil {
            } else {
                //          mac  ,  ee:ee:ee:ee:ee:ee
                if err = netlink.LinkSetHardwareAddr(hostVeth, mac); err != nil {
                    d.logger.Warnf("failed to Set MAC of %q: %v. Using kernel generated MAC.", hostVethName, err)
                }
            }
    
            // ...
            hasIPv4 = true
    
            // ip link set up           
            if err = netlink.LinkSetUp(hostVeth); err != nil {
            }
            // ip link set up          
            contVeth, err := netlink.LinkByName(contVethName)
            if err = netlink.LinkSetUp(contVeth); err != nil {
            }
            // Fetch the MAC from the container Veth. This is needed by Calico.
            contVethMAC = contVeth.Attrs().HardwareAddr.String()
            if hasIPv4 {
                //              , :
                // default via 169.254.1.1 dev eth0
                // 169.254.1.1 dev eth0 scope link
                gw := net.IPv4(169, 254, 1, 1)
                gwNet := &net.IPNet{IP: gw, Mask: net.CIDRMask(32, 32)}
                err := netlink.RouteAdd(
                    &netlink.Route{
                        LinkIndex: contVeth.Attrs().Index,
                        Scope:     netlink.SCOPE_LINK,
                        Dst:       gwNet,
                    },
                )
            }
    
            //   calico-ipam      pod ip             
            for _, addr := range result.IPs {
                if err = netlink.AddrAdd(contVeth, &netlink.Addr{IPNet: &addr.Address}); err != nil {
                    return fmt.Errorf("failed to add IP addr to %q: %v", contVeth, err)
                }
            }
            // ...
            //        network namespace
            if err = netlink.LinkSetNsFd(hostVeth, int(hostNS.Fd())); err != nil {
                return fmt.Errorf("failed to move veth to host netns: %v", err)
            }
    
            return nil
        })
    
        //   veth pair       sysctls  ,           arp_proxy
        err = d.configureSysctls(hostVethName, hasIPv4, hasIPv6)
    
        // ip link set up        veth pair   
        hostVeth, err := netlink.LinkByName(hostVethName)
        if err = netlink.LinkSetUp(hostVeth); err != nil {
            return "", "", fmt.Errorf("failed to set %q up: %v", hostVethName, err)
        }
    
        //           
        err = SetupRoutes(hostVeth, result)
    
        return hostVethName, contVethMAC, err
    }
    
    func SetupRoutes(hostVeth netlink.Link, result *current.Result) error {
        //            ,       pod ip 10.217.120.85,     calid0bda9976d5  ,   :
        // 10.217.120.85 dev calid0bda9976d5 scope link
        for _, ipAddr := range result.IPs {
            route := netlink.Route{
                LinkIndex: hostVeth.Attrs().Index,
                Scope:     netlink.SCOPE_LINK,
                Dst:       &ipAddr.Address,
            }
            err := netlink.RouteAdd(&route)
            // ...
        }
        return nil
    }
    
    //            ,           。
    
    // configureSysctls configures necessary sysctls required for the host side of the veth pair for IPv4 and/or IPv6.
    func (d *linuxDataplane) configureSysctls(hostVethName string, hasIPv4, hasIPv6 bool) error {
      var err error
      if hasIPv4 {
        // Normally, the kernel has a delay before responding to proxy ARP but we know
        // that's not needed in a Calico network so we disable it.
        if err = writeProcSys(fmt.Sprintf("/proc/sys/net/ipv4/neigh/%s/proxy_delay", hostVethName), "0"); err != nil {
            return fmt.Errorf("failed to set net.ipv4.neigh.%s.proxy_delay=0: %s", hostVethName, err)
        }
        
        // Enable proxy ARP, this makes the host respond to all ARP requests with its own
        // MAC. We install explicit routes into the containers network
        // namespace and we use a link-local address for the gateway.  Turing on proxy ARP
        // means that we don't need to assign the link local address explicitly to each
        // host side of the veth, which is one fewer thing to maintain and one fewer
        // thing we may clash over.
        if err = writeProcSys(fmt.Sprintf("/proc/sys/net/ipv4/conf/%s/proxy_arp", hostVethName), "1"); err != nil {
            return fmt.Errorf("failed to set net.ipv4.conf.%s.proxy_arp=1: %s", hostVethName, err)
        }
        
        // Enable IP forwarding of packets coming _from_ this interface.  For packets to
        // be forwarded in both directions we need this flag to be set on the fabric-facing
        // interface too (or for the global default to be set).
        if err = writeProcSys(fmt.Sprintf("/proc/sys/net/ipv4/conf/%s/forwarding", hostVethName), "1"); err != nil {
            return fmt.Errorf("failed to set net.ipv4.conf.%s.forwarding=1: %s", hostVethName, err)
        }
      }
    
      if hasIPv6 {
         // ...    
      }
      
      return nil
    }

    まとめ
    これにより、calicoバイナリプラグインはsandbox containerのネットワークリソース、すなわちveth pairを作成し、ホスト側とコンテナ側のネットワークカードに対応するMACアドレスを設定し、コンテナセグメントにIPアドレスを構成するとともに、コンテナ側にルーティングデフォルトゲートウェイを構成し、ホスト側にルーティングを構成し、ターゲットアドレスをsandbox container ipのシンクホスト側veth pairネットワークカードにするとともに、シンクホスト側ネットワークカードにarp proxyとpacket forwarding機能を構成し、最後に、これらのネットワークデータに基づいてworkloadendpointオブジェクトを生成してcalico datastoreに格納します.
    しかし、calico-ipamがどのようにIPアドレスを割り当てているのか、その後、記録を学ぶ暇がある重要な論理が欠けています.