【kubernetes/k 8 sソース分析】kubelet criソース分析

9051 ワード

CRIの基本原理
初期のkubernetesはdockerをデフォルトのruntimeとして使用し、その後rktを追加し、新しい実行を追加するたびにk 8 sはインタフェースを変更します.container runtimeが加わると,異なる容器の実現と機能の違いが大きい.標準はCRI(Container RunTime Interface)です.k 8 sが底層の容器運転に関心を持たなくなった場合、kubeletはCRI serverだけを感知し、CRI serverはCRI基準に従って実現するだけである.
 
CRIインタフェースはk 8 sに定義.io/cri-api/pkg/apis/services.go具体的に定義されているインタフェースを見てみましょう
 
RuntimeServiceインタフェース
RuntimeVersionerは、runtimeの名前、バージョン、APIバージョン、インタフェース関数Versionを返します.
  • Version(apiVersion string) (*runtimeapi.VersionResponse, error)

  • ContainerManager、containerの操作インタフェースに対して、簡単明瞭です
  • CreateContainer(podSandboxID string, config *runtimeapi.ContainerConfig, sandboxConfig *runtimeapi.PodSandboxConfig) (string, error)
  • StartContainer(containerID string) error
  • StopContainer(containerID string, timeout int64) error
  • RemoveContainer(containerID string) error
  • ListContainers(filter *runtimeapi.ContainerFilter) ([]*runtimeapi.Container, error)
  • ContainerStatus(containerID string) (*runtimeapi.ContainerStatus, error)
  • UpdateContainerResources(containerID string, resources *runtimeapi.LinuxContainerResources) error
  • ExecSync(containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error)
  • Exec(*runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error)
  • Attach(req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error)
  • ReopenContainerLog(ContainerID string) error

  • PodSandboxManagerは、sandbox(Pauseコンテナまたは仮想マシン)の操作を担当し、スレッドが安全で、以下の方法が含まれています.
  • RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error)
  • StopPodSandbox(podSandboxID string) error
  • RemovePodSandbox(podSandboxID string) error
  • PodSandboxStatus(podSandboxID string) (*runtimeapi.PodSandboxStatus, error)
  • ListPodSandbox(filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error)
  • PortForward(*runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error)

  • 状態照会インタフェース等
    // RuntimeService interface should be implemented by a container runtime.
    // The methods should be thread-safe.
    type RuntimeService interface {
       RuntimeVersioner
       ContainerManager
       PodSandboxManager
       ContainerStatsManager
    
       // UpdateRuntimeConfig updates runtime configuration if specified
       UpdateRuntimeConfig(runtimeConfig *runtimeapi.RuntimeConfig) error
       // Status returns the status of the runtime.
       Status() (*runtimeapi.RuntimeStatus, error)
    }

     
    ImageManagerServiceインタフェース
    クエリー、ダウンロード、削除など、ミラーのインタフェースを定義します.
    // ImageManagerService interface should be implemented by a container image
    // manager.
    // The methods should be thread-safe.
    type ImageManagerService interface {
    	// ListImages lists the existing images.
    	ListImages(filter *runtimeapi.ImageFilter) ([]*runtimeapi.Image, error)
    	// ImageStatus returns the status of the image.
    	ImageStatus(image *runtimeapi.ImageSpec) (*runtimeapi.Image, error)
    	// PullImage pulls an image with the authentication config.
    	PullImage(image *runtimeapi.ImageSpec, auth *runtimeapi.AuthConfig, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error)
    	// RemoveImage removes the image.
    	RemoveImage(image *runtimeapi.ImageSpec) error
    	// ImageFsInfo returns information of the filesystem that is used to store images.
    	ImageFsInfo() ([]*runtimeapi.FilesystemUsage, error)
    }

     
    RunKubelet
            -->  createAndInitKubelet
                     -->  NewMainKubelet
    -->getRuntimeAndImageServices(第1章説明)
                             -->  NewKubeGenericRuntimeManager
     
    1.getRuntimeAndImageServices関数
    NewRemoteRuntimeServiceはremote runtimeサービスをインスタンス化し、上記のRuntimeServiceインタフェースを実現
    NewRemoteImageServiceはremoteミラーサービスをインスタンス化し、上記のImageManagerServiceインタフェースを実現
    func getRuntimeAndImageServices(remoteRuntimeEndpoint string, remoteImageEndpoint string, runtimeRequestTimeout metav1.Duration) (internalapi.RuntimeService, internalapi.ImageManagerService, error) {
    	rs, err := remote.NewRemoteRuntimeService(remoteRuntimeEndpoint, runtimeRequestTimeout.Duration)
    	if err != nil {
    		return nil, nil, err
    	}
    	is, err := remote.NewRemoteImageService(remoteImageEndpoint, runtimeRequestTimeout.Duration)
    	if err != nil {
    		return nil, nil, err
    	}
    	return rs, is, err
    }

    1.1 NewRemoteRuntimeService関数
    実装はremote endpointとGRPC接続を確立し、RemoteRuntimeServiceを実例化し、RuntimeServiceインタフェースを実現し、パス
    pkg/kubelet/remote/remote_runtime.goでは、インタフェースの実装は、GRPCクライアントを呼び出してGRPC要求を送信することである
    // NewRemoteRuntimeService creates a new internalapi.RuntimeService.
    func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration) (internalapi.RuntimeService, error) {
    	klog.V(3).Infof("Connecting to runtime service %s", endpoint)
    	addr, dailer, err := util.GetAddressAndDialer(endpoint)
    	if err != nil {
    		return nil, err
    	}
    	ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
    	defer cancel()
    
    	conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithDialer(dailer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
    	if err != nil {
    		klog.Errorf("Connect remote runtime %s failed: %v", addr, err)
    		return nil, err
    	}
    
    	return &RemoteRuntimeService{
    		timeout:       connectionTimeout,
    		runtimeClient: runtimeapi.NewRuntimeServiceClient(conn),
    		logReduction:  logreduction.NewLogReduction(identicalErrorDelay),
    	}, nil
    }
    

    1.1.1 RunPodSandbox sandboxの作成
    remote serverにGRPC RunPodSandboxRequestリクエストを送信
    // RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
    // the sandbox is in ready state.
    func (r *RemoteRuntimeService) RunPodSandbox(config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
    	// Use 2 times longer timeout for sandbox operation (4 mins by default)
    	// TODO: Make the pod sandbox timeout configurable.
    	ctx, cancel := getContextWithTimeout(r.timeout * 2)
    	defer cancel()
    
    	resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
    		Config:         config,
    		RuntimeHandler: runtimeHandler,
    	})
    	if err != nil {
    		klog.Errorf("RunPodSandbox from runtime service failed: %v", err)
    		return "", err
    	}
    
    	if resp.PodSandboxId == "" {
    		errorMessage := fmt.Sprintf("PodSandboxId is not set for sandbox %q", config.GetMetadata())
    		klog.Errorf("RunPodSandbox failed: %s", errorMessage)
    		return "", errors.New(errorMessage)
    	}
    
    	return resp.PodSandboxId, nil
    }

    1.2 NewRemoteImageService関数
    実装方式はremoteとGRPC接続を確立し、RemoteImageServiceをインスタンス化し、上述したImageManagerServiceインタフェースを実現する
    ,実装経路pkg/kubelet/remote/remote_image.goでは、インタフェースの実装は、GRPCクライアントを呼び出してGRPC要求を送信することである
    // NewRemoteImageService creates a new internalapi.ImageManagerService.
    func NewRemoteImageService(endpoint string, connectionTimeout time.Duration) (internalapi.ImageManagerService, error) {
    	klog.V(3).Infof("Connecting to image service %s", endpoint)
    	addr, dailer, err := util.GetAddressAndDialer(endpoint)
    	if err != nil {
    		return nil, err
    	}
    
    	ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
    	defer cancel()
    
    	conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(), grpc.WithDialer(dailer), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
    	if err != nil {
    		klog.Errorf("Connect remote image service %s failed: %v", addr, err)
    		return nil, err
    	}
    
    	return &RemoteImageService{
    		timeout:     connectionTimeout,
    		imageClient: runtimeapi.NewImageServiceClient(conn),
    	}, nil
    }

     
    2.NewKubeGenericRuntimeManager関数
    kubeGenericRuntimeManagerをインスタンス化し、KubeGenericRuntimeインタフェースを実現し、pkg/kubelet/kuberuntime/kuberuntime_に定義するmanager.goでは、インタフェースには、コンテナの実行時、コマンドインタフェースが含まれます.
    // NewKubeGenericRuntimeManager creates a new kubeGenericRuntimeManager
    func NewKubeGenericRuntimeManager(
    	recorder record.EventRecorder,
    	livenessManager proberesults.Manager,
    	seccompProfileRoot string,
    	containerRefManager *kubecontainer.RefManager,
    	machineInfo *cadvisorapi.MachineInfo,
    	podStateProvider podStateProvider,
    	osInterface kubecontainer.OSInterface,
    	runtimeHelper kubecontainer.RuntimeHelper,
    	httpClient types.HttpGetter,
    	imageBackOff *flowcontrol.Backoff,
    	serializeImagePulls bool,
    	imagePullQPS float32,
    	imagePullBurst int,
    	cpuCFSQuota bool,
    	cpuCFSQuotaPeriod metav1.Duration,
    	runtimeService internalapi.RuntimeService,
    	imageService internalapi.ImageManagerService,
    	internalLifecycle cm.InternalContainerLifecycle,
    	legacyLogProvider LegacyLogProvider,
    	runtimeClassManager *runtimeclass.Manager,
    ) (KubeGenericRuntime, error)

    付与runtime
    klet.containerRuntime = runtime
    klet.streamingRuntime = runtime
    klet.runner = runtime