kubelet原理解析四:probeManager


概要
Kubernetesでは,システムとアプリケーションの健康診断タスクはkubeletによって達成され,本論文では主にkubeletにおけるprobemanagerに関する実現原理を議論する.
k 8 sの様々なprobeの使い方がまだ分からない場合は、私が前に書いたK 8 Sにおける健康診断メカニズムを見て、実践の観点から紹介します.
statusManager
クbeletが初期化されるとstatusManagerとprobeManagerが作成されます.どちらもpod状態に関するロジックです.kubelet原理解析一:pod管理の記事では、statusManagerがステータス情報の維持を担当し、Pod状態をApi-Serverにタイムリーに更新することについて言及しています.
ただし、podステータスの変化を監視するのではなく、probeManagerなどの他のコンポーネントの呼び出しに対応するインタフェースを提供します.probeManagerはpod内のコンテナの健康状態をタイミングよく監視し、状態が変化したことを発見したらstatusManagerが提供する方法を呼び出してpodの状態を更新します.
klet.statusManager = status.NewManager(kubeClient, klet.podManager)
klet.probeManager = prober.NewManager(
        klet.statusManager,
        klet.livenessManager,
        klet.runner,
        containerRefManager,
        kubeDeps.Recorder)

statusManagerコード:pkg/kubelet/status/status_manager.go
type PodStatusProvider interface {
    GetPodStatus(uid types.UID) (api.PodStatus, bool)
}

type Manager interface {
    PodStatusProvider
    Start()
    SetPodStatus(pod *api.Pod, status api.PodStatus)
    SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
    TerminatePod(pod *api.Pod)
    RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
}

SetPodStatus:   pod         ,       ,        apiserver,    kubelet    pod           

SetContainerReadiness:         pod             ,       ,   pod      

TerminatePod:kubelet     pod    ,       ,  pod           terminated   

RemoveOrphanedStatuses:     pod,                  

Start()メソッドは、kubeletが実行されている間に呼び出され、goroutineが更新操作を実行するように起動します.
const syncPeriod = 10 * time.Second

func (m *manager) Start() {
    ......
    glog.Info("Starting to sync pod status with apiserver")
    syncTicker := time.Tick(syncPeriod)
    // syncPod and syncBatch share the same go routine to avoid sync races.
    go wait.Forever(func() {
        select {
        case syncRequest := 

このgoroutineは2つのchannelからデータを絶えず傍受して処理することができます:syncTickerはタイマで、つまりapiserverが自分のキャッシュした最新pod状態と一致することをタイミングよく保証します;podStatusChannelはすべてのpod状態更新が送信される場所であり、呼び出し元はこのchannelを直接操作するのではなく、上述した状態を修正する様々な方法を呼び出すことで、これらの方法の内部でこのchannelにデータを書き込む.
m.syncPodはパラメータのpodとその状態情報に基づいてapiserverのデータを更新し、podが削除されたことを発見した場合も内部データ構造から削除する.
probeManager
probeManagerはpod中の容器の健康状態を検査し、現在3種類のprobeがある.
  • liveness:Kubernetesにあなたのアプリケーションが健康かどうかを知らせます.もしあなたのアプリケーションが健康でない場合、KubernetesはPodを削除し、新しい置換を開始します(RestartPolicyに関連).LivenessプローブはKubernetesがいつ容器を再起動することによって自己治癒を実現するかを教えることができる.
  • readiness:readinessはlivenessと原理は同じですが、ReadinessプローブはKubernetesがいつコンテナをService負荷等化に加え、対外的にサービスを提供できるかを教えています.
  • startupProbe:1.16でサポートされている新しい特性を開始し、遅い起動容器の状態を検出します.具体的にはstartup-probes
  • を参照してください.
    すべてのpod中の容器に健康診断のプローブがあるわけではないが,ない場合は容器を検出せず,デフォルトでは容器が正常であると考えられる.新しいpodを作成するたびにkubeletはprobeManagerを呼び出します.pkg/kubelet/prober/prober_に対応するAddPod(pod)メソッドmanager.goファイル:
    func (m *manager) AddPod(pod *v1.Pod) {
        m.workerLock.Lock()
        defer m.workerLock.Unlock()
    
        key := probeKey{podUID: pod.UID}
        for _, c := range pod.Spec.Containers {
            key.containerName = c.Name
    
            if c.ReadinessProbe != nil {
                key.probeType = readiness
                if _, ok := m.workers[key]; ok {
                    klog.Errorf("Readiness probe already exists! %v - %v",
                        format.Pod(pod), c.Name)
                    return
                }
                w := newWorker(m, readiness, pod, c)
                m.workers[key] = w
                go w.run()
            }
    
            if c.LivenessProbe != nil {
                key.probeType = liveness
                if _, ok := m.workers[key]; ok {
                    klog.Errorf("Liveness probe already exists! %v - %v",
                        format.Pod(pod), c.Name)
                    return
                }
                w := newWorker(m, liveness, pod, c)
                m.workers[key] = w
                go w.run()
            }
        }
    }

    この方法では、kubeletはpodのすべてのcontainerを巡回し、probeを構成するとworkerを作成し、今回のプローブを非同期で処理します.
    // Creates and starts a new probe worker.
    func newWorker(
        m *manager,
        probeType probeType,
        pod *v1.Pod,
        container v1.Container) *worker {
    
        w := &worker{
            stopCh:       make(chan struct{}, 1), // Buffer so stop() can be non-blocking.
            pod:          pod,
            container:    container,
            probeType:    probeType,
            probeManager: m,
        }
    
        switch probeType {
        case readiness:
            w.spec = container.ReadinessProbe
            w.resultsManager = m.readinessManager
            w.initialValue = results.Failure
        case liveness:
            w.spec = container.LivenessProbe
            w.resultsManager = m.livenessManager
            w.initialValue = results.Success
        }
    
        w.proberResultsMetricLabels = prometheus.Labels{
            "probe_type":     w.probeType.String(),
            "container_name": w.container.Name,
            "pod_name":       w.pod.Name,
            "namespace":      w.pod.Namespace,
            "pod_uid":        string(w.pod.UID),
        }
    
        return w
    }
    

    workerがrunを開始するとdoProbeメソッドが呼び出されます
    func (w *worker) doProbe() (keepGoing bool) {
        defer func() { recover() }() 
        defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true })
    
        // pod      ,        ,      ,       
        status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
        if !ok {
            glog.V(3).Infof("No status for pod: %v", format.Pod(w.pod))
            return true
        }
    
        // pod     (         ),    ,    worker
        if status.Phase == api.PodFailed || status.Phase == api.PodSucceeded {
            glog.V(3).Infof("Pod %v %v, exiting probe worker",
                format.Pod(w.pod), status.Phase)
            return false
        }
    
        //       ,       ,    ,     ,       
        c, ok := api.GetContainerStatus(status.ContainerStatuses, w.container.Name)
        if !ok || len(c.ContainerID) == 0 {
            glog.V(3).Infof("Probe target container not found: %v - %v",
                format.Pod(w.pod), w.container.Name)
            return true 
        }
    
        // pod      ,         
        if w.containerID.String() != c.ContainerID {
            if !w.containerID.IsEmpty() {
                w.resultsManager.Remove(w.containerID)
            }
            w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
            w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
            w.onHold = false
        }
    
        if w.onHold {
            return true
        }
    
        if c.State.Running == nil {
            glog.V(3).Infof("Non-running container probed: %v - %v",
                format.Pod(w.pod), w.container.Name)
            if !w.containerID.IsEmpty() {
                w.resultsManager.Set(w.containerID, results.Failure, w.pod)
            }
            //       ,       ,   worker
            return c.State.Terminated == nil ||
                w.pod.Spec.RestartPolicy != api.RestartPolicyNever
        }
    
        //         ,               InitialDelaySeconds
        if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
            return true
        }
    
        //    prober          
        result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID)
        if err != nil {
            return true
        }
    
        if w.lastResult == result {
            w.resultRun++
        } else {
            w.lastResult = result
            w.resultRun = 1
        }
    
        //       ,             ,     
        if (result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
            (result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {
            return true
        }
    
        //          
        w.resultsManager.Set(w.containerID, result, w.pod)
    
        if w.probeType == liveness && result == results.Failure {
            //    liveness     ,           ,            ,    
            w.onHold = true
        }
    
        return true
    }

    liveness検出結果はresultsManagerに格納され、結果はキャッシュに保存され、m.updatesパイプに送信されます.パイプ消費者はkubeletの主なサイクルsyncLoopIterationである.
    case update := 

    liveness検出が通過しなければpodは再起動し,kubeletのsyncループで処理すればよい.しかしreadness検出に失敗してpodを再起動できないため、readnessの論理は次のとおりです.
    func (m *manager) updateReadiness() {
        update := 

    proberManagerが起動すると、goroutineを実行してreadinessManagerパイプのデータを読み込み、データに基づいてstatusManagerを呼び出してapiserverのpodの状態情報を更新します.
    サービスロジックを担当するコンポーネントがこの状態を取得すると,endpointsのコンテンツを更新する必要があるか,すなわちサービスの要求がこのpodに送信されるか否かを異なる値に基づいて決定できる.
    Probeメソッド
    上はprobemanagerの主な論理で、私たちは次に本当に探査任務を実行するprobe方法を見ます.
    // probe probes the container.
    func (pb *prober) probe(probeType probeType, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (results.Result, error) {
        var probeSpec *v1.Probe
        switch probeType {
        case readiness:
            probeSpec = container.ReadinessProbe
        case liveness:
            probeSpec = container.LivenessProbe
        default:
            return results.Failure, fmt.Errorf("Unknown probe type: %q", probeType)
        }
        ...
        result, output, err := pb.runProbeWithRetries(probeType, probeSpec, pod, status, container, containerID, maxProbeRetries)
        ...

    probe主メソッド呼び出しpb.runProbeWithRetriesメソッドは、containerid、タイプ、再試行回数などを入力します.
    execメソッド
    runtimeServiceを呼び出すExecSyncメソッドはコンテナ実行コマンドに入り,結果を回収し,終了コードが0であればプローブに成功したと考える.
    command := kubecontainer.ExpandContainerCommandOnlyStatic(p.Exec.Command, container.Env)
            return pb.exec.Probe(pb.newExecInContainer(container, containerID, command, timeout))
        
    ....
        
    func (pb *prober) newExecInContainer(container v1.Container, containerID kubecontainer.ContainerID, cmd []string, timeout time.Duration) exec.Cmd {
        return execInContainer{func() ([]byte, error) {
            return pb.runner.RunInContainer(containerID, cmd, timeout)
        }}
    }
    
    ...
    
    func (m *kubeGenericRuntimeManager) RunInContainer(id kubecontainer.ContainerID, cmd []string, timeout time.Duration) ([]byte, error) {
        stdout, stderr, err := m.runtimeService.ExecSync(id.ID, cmd, timeout)
        return append(stdout, stderr...), err
    }
    
    func (pr execProber) Probe(e exec.Cmd) (probe.Result, string, error) {
        data, err := e.CombinedOutput()
        klog.V(4).Infof("Exec probe response: %q", string(data))
        if err != nil {
            exit, ok := err.(exec.ExitError)
            if ok {
                if exit.ExitStatus() == 0 {
                    return probe.Success, string(data), nil
                }
                return probe.Failure, string(data), nil
            }
            return probe.Unknown, "", err
        }
        return probe.Success, string(data), nil
    }
    

    HTTPメソッド
    標準的なhttpプローブテンプレートは,400>code>=200であれば成功すると考えられる.httpはサポートされていません
    func DoHTTPProbe(url *url.URL, headers http.Header, client GetHTTPInterface) (probe.Result, string, error) {
        req, err := http.NewRequest("GET", url.String(), nil)
        if err != nil {
            // Convert errors into failures to catch timeouts.
            return probe.Failure, err.Error(), nil
        }
        if _, ok := headers["User-Agent"]; !ok {
            if headers == nil {
                headers = http.Header{}
            }
            // explicitly set User-Agent so it's not set to default Go value
            v := version.Get()
            headers.Set("User-Agent", fmt.Sprintf("kube-probe/%s.%s", v.Major, v.Minor))
        }
        req.Header = headers
        if headers.Get("Host") != "" {
            req.Host = headers.Get("Host")
        }
        res, err := client.Do(req)
        if err != nil {
            // Convert errors into failures to catch timeouts.
            return probe.Failure, err.Error(), nil
        }
        defer res.Body.Close()
        b, err := ioutil.ReadAll(res.Body)
        if err != nil {
            return probe.Failure, "", err
        }
        body := string(b)
        if res.StatusCode >= http.StatusOK && res.StatusCode < http.StatusBadRequest {
            klog.V(4).Infof("Probe succeeded for %s, Response: %v", url.String(), *res)
            return probe.Success, body, nil
        }
        klog.V(4).Infof("Probe failed for %s with request headers %v, response body: %v", url.String(), headers, body)
        return probe.Failure, fmt.Sprintf("HTTP probe failed with statuscode: %d", res.StatusCode), nil
    }

    TCPメソッド
    gRPCまたはFTPサービスは、通常、TCPプローブを使用して、指定されたポートでTCP接続を確立しようとします.
    ソケット接続が成功した場合は、成功を返します.
    func DoTCPProbe(addr string, timeout time.Duration) (probe.Result, string, error) {
        conn, err := net.DialTimeout("tcp", addr, timeout)
        if err != nil {
            // Convert errors to failures to handle timeouts.
            return probe.Failure, err.Error(), nil
        }
        err = conn.Close()
        if err != nil {
            klog.Errorf("Unexpected error closing TCP probe socket: %v (%#v)", err, err)
        }
        return probe.Success, "", nil
    }
    

    リファレンス
  • https://cizixs.com/2017/06/12...
  • https://kubernetes.io/docs/ta...