Kubeletソースコード分析(二):DockerClient

11672 ワード

ソースバージョン
kubernetes version: v1.3.0
DockerClient初期化
DockerClientはKubeletConfigのメンバーの一人です.KubeletConfig構造の紹介:
type KubeletConfig struct {
    Address                        net.IP
    AllowPrivileged                bool
...
    DockerClient                   dockertools.DockerInterface
    RuntimeCgroups                 string
    DockerExecHandler              dockertools.ExecHandler
...
}

一方、クbeletConfigの初期化はUnsecuredKubeletConfig()インタフェースで行われ、DockerEndpoint文字列のメンバーがいる最初に構築されたクbeletServer構成構造に依存する必要があります.
type KubeletServer struct {
    componentconfig.KubeletConfiguration

    AuthPath      util.StringFlag // Deprecated -- use KubeConfig instead
    KubeConfig    util.StringFlag
    APIServerList []string

    RunOnce bool

    // Insert a probability of random errors during calls to the master.
    ChaosChance float64
    // Crash immediately, rather than eating panics.
    ReallyCrashForTesting bool
    SystemReserved        config.ConfigurationMap
    KubeReserved          config.ConfigurationMap
}

type KubeletConfiguration struct {
    // config is the path to the config file or directory of files
    Config string `json:"config"`
...
    DockerEndpoint string `json:"dockerEndpoint"`
...    

実際にこのパラメータが指定されていない場合は、端点がデフォルトで使用されます」unix:///var/run/docker.sock「DockerEndpointとして.NewEnvClient()インタフェースを表示できます.
クbeletConfigの初期化インタフェースUnsecuredKubeletConfig()に戻ります.
func UnsecuredKubeletConfig(s *options.KubeletServer) (*KubeletConfig, error) {
    hostNetworkSources, err := kubetypes.GetValidatedSources(strings.Split(s.HostNetworkSources, ","))
    if err != nil {
        return nil, err
    }
...
    return &KubeletConfig{
        Address:                      net.ParseIP(s.Address),
        AllowPrivileged:              s.AllowPrivileged,
        ...
        DockerClient:                 dockertools.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration), // TODO(random-liu): Set RuntimeRequestTimeout for rkt.
...
    }

次にdockertoolsの表示を続行します.ConnectToDockerOrDie(s.DockerEndpoint, s.RuntimeRequestTimeout.Duration).
func ConnectToDockerOrDie(dockerEndpoint string, requestTimeout time.Duration) DockerInterface {
    if dockerEndpoint == "fake://" {
        return NewFakeDockerClient()
    }
    client, err := getDockerClient(dockerEndpoint)
    if err != nil {
        glog.Fatalf("Couldn't connect to docker: %v", err)
    }
    glog.Infof("Start docker client with request timeout=%v", requestTimeout)
    return newKubeDockerClient(client, requestTimeout)
}

クbelet起動時に「docker-endpoint」パラメータが入力されなかった場合、s.DockerEndpointは空であることを以前に理解しました.s.RuntimeRequestTimeout.Duration値は、NewKubeletServer()関数の初期化を2 minで表示できます.getDockerClient()インタフェースは簡単です:getDockerClient-->dockerapi.NewEnvClient() --> NewClient().NewClient()インタフェースは次のとおりです.
func NewClient(host string, version string, client *http.Client, httpHeaders map[string]string) (*Client, error) {
    proto, addr, basePath, err := ParseHost(host)
    if err != nil {
        return nil, err
    }

    transport, err := transport.NewTransportWithHTTP(proto, addr, client)
    if err != nil {
        return nil, err
    }

    return &Client{
        proto:             proto,
        addr:              addr,
        basePath:          basePath,
        transport:         transport,
        version:           version,
        customHTTPHeaders: httpHeaders,
    }, nil
}

「docker-endpoint」パラメータが入力されていない場合、デフォルト値は「unix:///var/run/docker.sock「.つまりhostパラメータがこの値です.ParseHost()はhostに基づいて解析し、transport-->Clientを作成します.Client構造は次のとおりです.
type Client struct {
    // proto holds the client protocol i.e. unix.
    proto string
    // addr holds the client address.
    addr string
    // basePath holds the path to prepend to the requests.
    basePath string
    // transport is the interface to send request with, it implements transport.Client.
    transport transport.Client
    // version of the server to talk to.
    version string
    // custom http headers configured by users.
    customHTTPHeaders map[string]string
}

Clientの作成に成功すると、最終的に説明したConnectToDockerOrDie()インタフェースがnewKubeDockerClient()を呼び出してpkg/kubelet/dockertools/kube_を生成します.docker_client.goのkubeDockerClient構造:
type kubeDockerClient struct {
    // timeout is the timeout of short running docker operations.
    timeout time.Duration
    client  *dockerapi.Client
}

初期化はここで終わります.最初に戻って、DockerClient定義:dockertoolsを紹介します.DockerInterfaceは次のとおりです.
type DockerInterface interface {
    ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error)
    InspectContainer(id string) (*dockertypes.ContainerJSON, error)
    CreateContainer(dockertypes.ContainerCreateConfig) (*dockertypes.ContainerCreateResponse, error)
    StartContainer(id string) error
    StopContainer(id string, timeout int) error
    RemoveContainer(id string, opts dockertypes.ContainerRemoveOptions) error
    InspectImage(image string) (*dockertypes.ImageInspect, error)
    ListImages(opts dockertypes.ImageListOptions) ([]dockertypes.Image, error)
    PullImage(image string, auth dockertypes.AuthConfig, opts dockertypes.ImagePullOptions) error
    RemoveImage(image string, opts dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDelete, error)
    ImageHistory(id string) ([]dockertypes.ImageHistory, error)
    Logs(string, dockertypes.ContainerLogsOptions, StreamOptions) error
    Version() (*dockertypes.Version, error)
    Info() (*dockertypes.Info, error)
    CreateExec(string, dockertypes.ExecConfig) (*dockertypes.ContainerExecCreateResponse, error)
    StartExec(string, dockertypes.ExecStartCheck, StreamOptions) error
    InspectExec(id string) (*dockertypes.ContainerExecInspect, error)
    AttachToContainer(string, dockertypes.ContainerAttachOptions, StreamOptions) error
}

最終初期化は構造体kubeDockerClientを返したので、DockerInterfaceインタフェースの実装は、kubeDockerClient構造体が存在するファイルpkg/kubelet/dockertools/kube_に戻ることができます.docker_client.goインタフェース実装を表示します.
DockeClientインタフェース分析
ソースディレクトリ:pkg/kubelet/dockertools/kube_docker_client.go実装のインタフェースは,kubeDockerClient構造体がすべてのDockerInterfaceインタフェースを実装していることがわかる.これらのインタフェースは、dockerの操作インタフェースをカプセル化し、次のインタフェースを分析します.
func (d *kubeDockerClient) ListContainers(options dockertypes.ContainerListOptions) ([]dockertypes.Container, error) {
    ctx, cancel := d.getTimeoutContext()
    defer cancel()
    containers, err := d.client.ContainerList(ctx, options)
    if ctxErr := contextError(ctx); ctxErr != nil {
        return nil, ctxErr
    }
    if err != nil {
        return nil, err
    }
    return containers, nil
}

このListContainers()インタフェースの鍵はd.clientを呼び出すことである.ContainerList(ctx, options).したがって、キーオブジェクトはクライアントであり、初期化時に説明したClient構造体に戻ります.Client構造が存在するファイル:vendor/github.com/docker/engine-api/client/client.goClient package構造:docker APIを操作するインタフェースはこれらのファイルにカプセル化されており、時間があれば深く理解することができます.ここでは一つ一つ紹介しません.d.clientに戻ります.ContainerList(ctx,options)は、以下のように実現される.
func (cli *Client) ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) {
    query := url.Values{}

    if options.All {
        query.Set("all", "1")
    }

    if options.Limit != -1 {
        query.Set("limit", strconv.Itoa(options.Limit))
    }

    if options.Since != "" {
        query.Set("since", options.Since)
    }

    if options.Before != "" {
        query.Set("before", options.Before)
    }

    if options.Size {
        query.Set("size", "1")
    }

    if options.Filter.Len() > 0 {
        filterJSON, err := filters.ToParamWithVersion(cli.version, options.Filter)

        if err != nil {
            return nil, err
        }

        query.Set("filters", filterJSON)
    }

    resp, err := cli.get(ctx, "/containers/json", query, nil)
    if err != nil {
        return nil, err
    }

    var containers []types.Container
    err = json.NewDecoder(resp.body).Decode(&containers)
    ensureReaderClosed(resp)
    return containers, err
}

前述のパラメータの初期化は、GETリクエストを構築しcliを呼び出すことである.get()はhttpRequestです.
func (cli *Client) get(ctx context.Context, path string, query url.Values, headers map[string][]string) (*serverResponse, error) {
    return cli.sendRequest(ctx, "GET", path, query, nil, headers)
}

func (cli *Client) sendRequest(ctx context.Context, method, path string, query url.Values, obj interface{}, headers map[string][]string) (*serverResponse, error) {
    var body io.Reader

    if obj != nil {
        var err error
        body, err = encodeData(obj)
        if err != nil {
            return nil, err
        }
        if headers == nil {
            headers = make(map[string][]string)
        }
        headers["Content-Type"] = []string{"application/json"}
    }

    return cli.sendClientRequest(ctx, method, path, query, body, headers)
}

func (cli *Client) sendClientRequest(ctx context.Context, method, path string, query url.Values, body io.Reader, headers map[string][]string) (*serverResponse, error) {
    serverResp := &serverResponse{
        body:       nil,
        statusCode: -1,
    }

...
    req, err := cli.newRequest(method, path, query, body, headers)
    if cli.proto == "unix" || cli.proto == "npipe" {
        // For local communications, it doesn't matter what the host is. We just
        // need a valid and meaningful host name. (See #189)
        req.Host = "docker"
    }
    req.URL.Host = cli.addr
    req.URL.Scheme = cli.transport.Scheme()

    if expectedPayload && req.Header.Get("Content-Type") == "" {
        req.Header.Set("Content-Type", "text/plain")
    }

    resp, err := cancellable.Do(ctx, cli.transport, req)
    if resp != nil {
        serverResp.statusCode = resp.StatusCode
    }

...

    if serverResp.statusCode < 200 || serverResp.statusCode >= 400 {
        body, err := ioutil.ReadAll(resp.Body)
        if err != nil {
            return serverResp, err
        }
        if len(body) == 0 {
            return serverResp, fmt.Errorf("Error: request returned %s for API route and version %s, check if the server supports the requested API version", http.StatusText(serverResp.statusCode), req.URL)
        }
        return serverResp, fmt.Errorf("Error response from daemon: %s", bytes.TrimSpace(body))
    }

    serverResp.body = resp.Body
    serverResp.header = resp.Header
    return serverResp, nil
}

func Do(ctx context.Context, client transport.Sender, req *http.Request) (*http.Response, error) {
...
    result := make(chan responseAndError, 1)

    go func() {
        resp, err := client.Do(req)
        testHookDoReturned()
        result 

httpRequestの呼び出しプロセス全体がリストされ、最終的にclientが呼び出されます.Do()は、クライアントオブジェクトが以前の初期化プロセスに戻る必要があり、実際にはvemdor/githubを呼び出す.com/docker/engine-api/client/client.goのClient.Transportオブジェクトを初期化するときにapiTransportオブジェクトに設定します.
type apiTransport struct {
    *http.Client
    *tlsInfo
    transport *http.Transport
}

だからクライアントDo()は実際にhttpを呼び出す.Client.Do().OK、これで分析が終わり、具体的な各インタフェースが実現するか、ソースコードを表示するのに時間がかかりますが、大同小異です.
ソースコードを学ぶ過程で、上で紹介したcancellableのような多くの古典的な実現を見ることができます.Do()インタフェース実装,golangが非常に推奨する「コンシステント+channel」方式は,select case方式でコンシステント処理の結果をループ待ち,確かに便利である.