kubernetesソース読解のkubelet(二)

18710 ワード

前にクbeletが起動したブログを書きましたが、興味があれば行ってみてください.多くの詳細は説明されていませんが、詳細について詳しく説明します.

kubelet podsソース


kubeletの最も重要な職責はpod(複数の容器)のライフサイクル管理ですが、podの源はどこですか?作成中に無視されがちな関数を見てみましょう
func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool) (*Kubelet, error) {
...
    if kubeDeps.PodConfig == nil {
        var err error
        kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName)
        if err != nil {
            return nil, err
        }
    }
...

makePodSourceConfigという関数です.podソースが定義されています.具体的には以下の通りです.
func makePodSourceConfig(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, nodeName types.NodeName) (*config.PodConfig, error) {
    manifestURLHeader := make(http.Header)
    if kubeCfg.ManifestURLHeader != "" {
        pieces := strings.Split(kubeCfg.ManifestURLHeader, ":")
        if len(pieces) != 2 {
            return nil, fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", kubeCfg.ManifestURLHeader)
        }
        manifestURLHeader.Set(pieces[0], pieces[1])
    }

    // source of all configuration
    cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)

    // define file config source
    if kubeCfg.PodManifestPath != "" {
        glog.Infof("Adding manifest file: %v", kubeCfg.PodManifestPath)
        config.NewSourceFile(kubeCfg.PodManifestPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
    }

    // define url config source
    if kubeCfg.ManifestURL != "" {
        glog.Infof("Adding manifest url %q with HTTP header %v", kubeCfg.ManifestURL, manifestURLHeader)
        config.NewSourceURL(kubeCfg.ManifestURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
    }
    if kubeDeps.KubeClient != nil {
        glog.Infof("Watching apiserver")
        config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, cfg.Channel(kubetypes.ApiserverSource))
    }
    return cfg, nil
}

この関数ではPodManifestPath、ManifestURL、KubeClientの3つの方法でpodソースを取得することをサポートしていますが、実は前の2つはkubeletをstandaloneモードにし、k 8 sのapiと管理を離れ、単独で実行し、mainfestで作成したpodはローカルpodと呼ばれ、apiserverを通じて管理できません.最も重要な方法は、コンテナ情報をKubeClientで取得することです.ここでwatch apiserverは、apiのホスト上のコンテナの変化を監視します.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
    lw := cache.NewListWatchFromClient(c.Core().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
    newSourceApiserverFromLW(lw, updates)
}

ここにlistwatchが作成され、ホスト上のすべてのネーミングスペースpodsの変化を監視します.変化の更新をupdatesのchannelに入れます.次に、構造体lwを作成する関数NewListWatchFromClientを詳しく見てみましょう.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
    listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
        return c.Get().
            Namespace(namespace).
            Resource(resource).
            VersionedParams(&options, metav1.ParameterCodec).
            FieldsSelectorParam(fieldSelector).
            Do().
            Get()
    }
    watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
        options.Watch = true
        return c.Get().
            Namespace(namespace).
            Resource(resource).
            VersionedParams(&options, metav1.ParameterCodec).
            FieldsSelectorParam(fieldSelector).
            Watch()
    }
    return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}

リストFuncとwtachFuncが定義されています.次はnewSourceApiserverFromLWがupdatesに更新を入れる方法を見てみましょう.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan.Pod
        for _, o := range objs {
            pods = append(pods, o.(*v1.Pod))
        }
        updates .PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
    }
    cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0).Run()
}

sendメソッドでpods変化をupdates channelに送信します.send関数はどのようにトリガーされますか?続けて、まずreflectorを作成し、runを実行します.
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
    r := &Reflector{
        name:          name,
        listerWatcher: lw,
        store:         store,
        expectedType:  reflect.TypeOf(expectedType),
        period:        time.Second,
        resyncPeriod:  resyncPeriod,
        clock:         &clock.RealClock{},
    }
    return r
}

上にreflectorの構造体が作成され、podの情報を格納するstoreがあります.上のsendメソッドはstoreの中のpushFuncです.このメソッドは興味深いです.storeデータが変化したときに呼び出され、監視の目的を達成します.
func NewUndeltaStore(pushFunc func([]interface{}), keyFunc KeyFunc) *UndeltaStore {
    return &UndeltaStore{
        Store:    NewStore(keyFunc),
        PushFunc: pushFunc,
    }
}

reflectorを作成すると、Runが実行され、コードを見続けます.
func (r *Reflector) Run() {
    glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
    go wait.Until(func() {
        if err := r.ListAndWatch(wait.NeverStop); err != nil {
            utilruntime.HandleError(err)
        }
    }, r.period, wait.NeverStop)
}

Run内周期の呼び出しListAndWatchメソッドは,具体的なメソッド実装を見て,
func (r *Reflector) ListAndWatch(stopCh chan struct{}) error {
    glog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
    var resourceVersion string
    resyncCh, cleanup := r.resyncChan()
    defer cleanup()

    // Explicitly set "0" as resource version - it's fine for the List()
    // to be served from cache and potentially be delayed relative to
    // etcd contents. Reflector framework will catch up via Watch() eventually.
    options := metav1.ListOptions{ResourceVersion: "0"}
    list, err := r.listerWatcher.List(options)
    if err != nil {
        return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
    }
    listMetaInterface, err := meta.ListAccessor(list)
    if err != nil {
        return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
    }
    resourceVersion = listMetaInterface.GetResourceVersion()
    items, err := meta.ExtractList(list)
    if err != nil {
        return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
    }
    if err := r.syncWith(items, resourceVersion); err != nil {
        return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
    }
    r.setLastSyncResourceVersion(resourceVersion)

    resyncerrc := make(chan error, 1)
    cancelCh := make(chan struct{})
    defer close(cancelCh)
    go func() {
        for {
            select {
            case case return
            case return
            }
            if r.ShouldResync == nil || r.ShouldResync() {
                glog.V(4).Infof("%s: forcing resync", r.name)
                if err := r.store.Resync(); err != nil {
                    resyncerrc return
                }
            }
            cleanup()
            resyncCh, cleanup = r.resyncChan()
        }
    }()

    for {
        timemoutseconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
        options = metav1.ListOptions{
            ResourceVersion: resourceVersion,
            // We want to avoid situations of hanging watchers. Stop any wachers that do not
            // receive any events within the timeout window.
            TimeoutSeconds: &timemoutseconds,
        }

        w, err := r.listerWatcher.Watch(options)
        if err != nil {
            switch err {
            case io.EOF:
                // watch closed normally
            case io.ErrUnexpectedEOF:
                glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
            default:
                utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
            }
            // If this is "connection refused" error, it means that most likely apiserver is not responsive.
            // It doesn't make sense to re-list all objects because most likely we will be able to restart
            // watch where we ended.
            // If that's the case wait and resend watch request.
            if urlError, ok := err.(*url.Error); ok {
                if opError, ok := urlError.Err.(*net.OpError); ok {
                    if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
                        time.Sleep(time.Second)
                        continue
                    }
                }
            }
            return nil
        }

        if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
            if err != errorStopRequested {
                glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
            }
            return nil
        }
    }
}

具体的にはwatchHandlerメソッドを見て、重要な部分を切り取ります.
switch event.Type {
            case watch.Added:
                err := r.store.Add(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Modified:
                err := r.store.Update(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
                }
            case watch.Deleted:
                // TODO: Will any consumers need access to the "last known
                // state", which is passed in event.Object? If so, may need
                // to change this.
                err := r.store.Delete(event.Object)
                if err != nil {
                    utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
                }
            default:
                utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
            }

イベントタイプを判断してstoreのメソッドをそれぞれ呼び出し、storeデータを更新するとsendメソッドがトリガーされ、updates channelに更新情報が格納されます.OK、プロセス全体が終わりました!