kubernetesソース読解のkubelet(二)
18710 ワード
前にクbeletが起動したブログを書きましたが、興味があれば行ってみてください.多くの詳細は説明されていませんが、詳細について詳しく説明します.
kubeletの最も重要な職責はpod(複数の容器)のライフサイクル管理ですが、podの源はどこですか?作成中に無視されがちな関数を見てみましょう
makePodSourceConfigという関数です.podソースが定義されています.具体的には以下の通りです.
この関数ではPodManifestPath、ManifestURL、KubeClientの3つの方法でpodソースを取得することをサポートしていますが、実は前の2つはkubeletをstandaloneモードにし、k 8 sのapiと管理を離れ、単独で実行し、mainfestで作成したpodはローカルpodと呼ばれ、apiserverを通じて管理できません.最も重要な方法は、コンテナ情報をKubeClientで取得することです.ここでwatch apiserverは、apiのホスト上のコンテナの変化を監視します.
ここにlistwatchが作成され、ホスト上のすべてのネーミングスペースpodsの変化を監視します.変化の更新をupdatesのchannelに入れます.次に、構造体lwを作成する関数NewListWatchFromClientを詳しく見てみましょう.
リストFuncとwtachFuncが定義されています.次はnewSourceApiserverFromLWがupdatesに更新を入れる方法を見てみましょう.
sendメソッドでpods変化をupdates channelに送信します.send関数はどのようにトリガーされますか?続けて、まずreflectorを作成し、runを実行します.
上にreflectorの構造体が作成され、podの情報を格納するstoreがあります.上のsendメソッドはstoreの中のpushFuncです.このメソッドは興味深いです.storeデータが変化したときに呼び出され、監視の目的を達成します.
reflectorを作成すると、Runが実行され、コードを見続けます.
Run内周期の呼び出しListAndWatchメソッドは,具体的なメソッド実装を見て,
具体的にはwatchHandlerメソッドを見て、重要な部分を切り取ります.
イベントタイプを判断してstoreのメソッドをそれぞれ呼び出し、storeデータを更新するとsendメソッドがトリガーされ、updates channelに更新情報が格納されます.OK、プロセス全体が終わりました!
kubelet podsソース
kubeletの最も重要な職責はpod(複数の容器)のライフサイクル管理ですが、podの源はどこですか?作成中に無視されがちな関数を見てみましょう
func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool) (*Kubelet, error) {
...
if kubeDeps.PodConfig == nil {
var err error
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName)
if err != nil {
return nil, err
}
}
...
makePodSourceConfigという関数です.podソースが定義されています.具体的には以下の通りです.
func makePodSourceConfig(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, nodeName types.NodeName) (*config.PodConfig, error) {
manifestURLHeader := make(http.Header)
if kubeCfg.ManifestURLHeader != "" {
pieces := strings.Split(kubeCfg.ManifestURLHeader, ":")
if len(pieces) != 2 {
return nil, fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", kubeCfg.ManifestURLHeader)
}
manifestURLHeader.Set(pieces[0], pieces[1])
}
// source of all configuration
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
// define file config source
if kubeCfg.PodManifestPath != "" {
glog.Infof("Adding manifest file: %v", kubeCfg.PodManifestPath)
config.NewSourceFile(kubeCfg.PodManifestPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
}
// define url config source
if kubeCfg.ManifestURL != "" {
glog.Infof("Adding manifest url %q with HTTP header %v", kubeCfg.ManifestURL, manifestURLHeader)
config.NewSourceURL(kubeCfg.ManifestURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
}
if kubeDeps.KubeClient != nil {
glog.Infof("Watching apiserver")
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, cfg.Channel(kubetypes.ApiserverSource))
}
return cfg, nil
}
この関数ではPodManifestPath、ManifestURL、KubeClientの3つの方法でpodソースを取得することをサポートしていますが、実は前の2つはkubeletをstandaloneモードにし、k 8 sのapiと管理を離れ、単独で実行し、mainfestで作成したpodはローカルpodと呼ばれ、apiserverを通じて管理できません.最も重要な方法は、コンテナ情報をKubeClientで取得することです.ここでwatch apiserverは、apiのホスト上のコンテナの変化を監視します.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, updates chan<- interface{}) {
lw := cache.NewListWatchFromClient(c.Core().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector(api.PodHostField, string(nodeName)))
newSourceApiserverFromLW(lw, updates)
}
ここにlistwatchが作成され、ホスト上のすべてのネーミングスペースpodsの変化を監視します.変化の更新をupdatesのchannelに入れます.次に、構造体lwを作成する関数NewListWatchFromClientを詳しく見てみましょう.
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
FieldsSelectorParam(fieldSelector).
Do().
Get()
}
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
options.Watch = true
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
FieldsSelectorParam(fieldSelector).
Watch()
}
return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}
リストFuncとwtachFuncが定義されています.次はnewSourceApiserverFromLWがupdatesに更新を入れる方法を見てみましょう.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan.Pod
for _, o := range objs {
pods = append(pods, o.(*v1.Pod))
}
updates .PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
}
cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0).Run()
}
sendメソッドでpods変化をupdates channelに送信します.send関数はどのようにトリガーされますか?続けて、まずreflectorを作成し、runを実行します.
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
r := &Reflector{
name: name,
listerWatcher: lw,
store: store,
expectedType: reflect.TypeOf(expectedType),
period: time.Second,
resyncPeriod: resyncPeriod,
clock: &clock.RealClock{},
}
return r
}
上にreflectorの構造体が作成され、podの情報を格納するstoreがあります.上のsendメソッドはstoreの中のpushFuncです.このメソッドは興味深いです.storeデータが変化したときに呼び出され、監視の目的を達成します.
func NewUndeltaStore(pushFunc func([]interface{}), keyFunc KeyFunc) *UndeltaStore {
return &UndeltaStore{
Store: NewStore(keyFunc),
PushFunc: pushFunc,
}
}
reflectorを作成すると、Runが実行され、コードを見続けます.
func (r *Reflector) Run() {
glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
go wait.Until(func() {
if err := r.ListAndWatch(wait.NeverStop); err != nil {
utilruntime.HandleError(err)
}
}, r.period, wait.NeverStop)
}
Run内周期の呼び出しListAndWatchメソッドは,具体的なメソッド実装を見て,
func (r *Reflector) ListAndWatch(stopCh chan struct{}) error {
glog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)
var resourceVersion string
resyncCh, cleanup := r.resyncChan()
defer cleanup()
// Explicitly set "0" as resource version - it's fine for the List()
// to be served from cache and potentially be delayed relative to
// etcd contents. Reflector framework will catch up via Watch() eventually.
options := metav1.ListOptions{ResourceVersion: "0"}
list, err := r.listerWatcher.List(options)
if err != nil {
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)
}
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
}
resourceVersion = listMetaInterface.GetResourceVersion()
items, err := meta.ExtractList(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
}
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
}
r.setLastSyncResourceVersion(resourceVersion)
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {
for {
select {
case case return
case return
}
if r.ShouldResync == nil || r.ShouldResync() {
glog.V(4).Infof("%s: forcing resync", r.name)
if err := r.store.Resync(); err != nil {
resyncerrc return
}
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
for {
timemoutseconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
// We want to avoid situations of hanging watchers. Stop any wachers that do not
// receive any events within the timeout window.
TimeoutSeconds: &timemoutseconds,
}
w, err := r.listerWatcher.Watch(options)
if err != nil {
switch err {
case io.EOF:
// watch closed normally
case io.ErrUnexpectedEOF:
glog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)
default:
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))
}
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
// It doesn't make sense to re-list all objects because most likely we will be able to restart
// watch where we ended.
// If that's the case wait and resend watch request.
if urlError, ok := err.(*url.Error); ok {
if opError, ok := urlError.Err.(*net.OpError); ok {
if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {
time.Sleep(time.Second)
continue
}
}
}
return nil
}
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
glog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)
}
return nil
}
}
}
具体的にはwatchHandlerメソッドを見て、重要な部分を切り取ります.
switch event.Type {
case watch.Added:
err := r.store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
}
case watch.Modified:
err := r.store.Update(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
}
case watch.Deleted:
// TODO: Will any consumers need access to the "last known
// state", which is passed in event.Object? If so, may need
// to change this.
err := r.store.Delete(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
}
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
}
イベントタイプを判断してstoreのメソッドをそれぞれ呼び出し、storeデータを更新するとsendメソッドがトリガーされ、updates channelに更新情報が格納されます.OK、プロセス全体が終わりました!