kube-proxyソースコード解析
広告|kubernetesオフラインインストールパッケージ、わずか3ステップ
kube-proxyソースコード解析
ipvsはiptablesモードに比べて高い性能と安定性を備えており、本稿ではこのモードのソースコード解析を主とし、iptablesモードの原理を理解したい場合は、その実現を参考にすることができ、アーキテクチャに差はない.
kube-proxyの主な機能は、サービスとendpointのイベントを傍受し、エージェントポリシーをマシンにドロップすることです.最下位はdocker/libnetworkを呼び出し、libnetworkは最終的にnetlinkとnetnsを呼び出してipvsの作成などの動作を実現する
構成の初期化
コードエントリ:
コマンドラインパラメータによるproxyServerの構成の初期化
Proxierは主要なエントリで、2つの関数を抽象化しています.
ipvsのinterfaceこれは重要です.
後でipvs_を詳しく見てみましょうlinuxはどのように上のインタフェースを実現しますか?
virtual serverとrealserver、最も重要なのはip:port、それからsessionAffinityなどのエージェントのモードです.
apiserver clientの作成
Proxierを作成ipvsモードのみに注目したproxier
このProxierには、次の方法があります.
だからipvsのこのProxierは私たちが必要とするほとんどのインタフェースを実現しました
まとめてみます.
ProxyServerの起動 clean upパラメータが付いているかどうかを確認し、持っている場合はすべてのルールをクリアして を終了します. OOM adjusterは実現していないようで、 を無視します resouceContainerも実現せず、 を無視 metricsサーバを起動します.これは重要です.例えば、promethusのmetricsを含むこのパラメータを監視したいときに入力することができます.metrics-bind-addressパラメータ informerを起動し、イベントの傍受を開始し、それぞれコヒーレント処理を開始する.
1 2 3 4私たちはあまり注目する必要はありません.5をよく見てください.
serviceConfig.RunとendpointConfig.Runはコールバック関数に値を割り当てるだけなので、登録したhandlerはinformerに与えられ、informerはイベントを傍受するとコールバックします.
では質問ですが、登録されているこのhandlerは何ですか?前文の
だからこのproxierIPVS
handlerのコールバック関数は、informerがこれらの関数をコールバックするので、自分で開発したときにこのinterfaceを登録すればいいです.
リスニングの開始
ここで実行すると、サービスendpointの削除などの動作が傍受され、コールバックされ、上の図を振り返ると、最終的にはProxierによって実現されるので、後でProxierに注目すればいいです.
それからSyncLoopを始めます.
Proxier実装
サービスを作成するときにOnServiceAddメソッドが呼び出されます.ここでは、前のステータスと現在のステータスの2つを記録し、syncRunnerに信号を送って処理させます.
サービス情報を記録すると、何もしていないことがわかります.サービスをmapに存在させ、map情報を直接削除しなければ何も処理しません.
proxier.syncRunner.Run()の中から一つの信号が送られてきました
この信号は処理されています
runnerでは信号実行が受信され、受信されていない信号は定期的に実行されます.
このbfr runnerで私たちが最も考えなければならないのはコールバック関数で、tryRunでこのコールバックがスケジューリングされた条件を満たしているかどうかをチェックします.
これは600行程度の揉み関数であり,主な論理を処理する場所でもある.
syncProxyRules markやcomment などのiptablesルールを設定します.マシンにNICがあると判断し、ipvsは上の にアドレスをバインドする必要がある.はipsetがあることを決定し、ipsetはiptablesの拡張であり、一連のアドレスにiptablesルール を設定することができる.
...(臭いし長いし、重複コードが多くて見られないし、細部の問題は自分で見ましょう)最も注目しているのは、VirtualServerの をどのように処理するかです.
実装を見て、存在しない場合は作成し、存在する場合は更新し、NICにserviceのcluster ipをバインドします.
サービスインプリメンテーションの作成
ipvsのAddVirtualServerの実装を見ることができます.主にsocketを利用してカーネルプロセスと通信しています.
Newの時に特殊なsocketを作成しました.ここでは私たちの普通のsocketのプログラミングと違いはありません.肝心なのはsyscallです.AF_NETLINKというパラメータは、カーネルプロセスとの通信を表します.
サービスを作成し、dockerサービス形式に変換し、直接呼び出します.
そしてサービス情報をパッケージ化し、socketに書けばいいです.
コンストラクションリクエスト
リクエストの構築時に渡されるのはipvsプロトコルクラスタです
カーネルと通信するメッセージヘッダを構築します
メッセージにDataを追加します.このDataは配列です.2つの方法が必要です.
例えばheaderはこのようにシーケンス化されていて、一見呆然として、長い間考えてやっと理解しました:([unsafe.Sizeof(hdr)]byte)1つの*[]byteタイプ、長さは構造体の大きさ(unsafe.Pointer(hdr))構造体をbyteポインタタイプに変換して*を加えてその値を[:]byteに変換して返します
サービス情報をカーネルに送信
普通のsocketが受信データを送信する
サービスデータパッケージ
ここでは細かく、コア思想はカーネルが一定のフォーマットの標準データしか認識していないことであり、サービス情報をその標準に従ってカーネルにパッケージして送信すればよい.
どのように梱包するかについては詳しく説明しません.
まとめ
サービスは全体的にコードが簡単ですが、実装が少し迂回しているところがあり、直接的ではないと思います.全体的にapiserverイベントを傍受し、処理よりも定期的に同期ポリシーを実行する.
kube-proxyソースコード解析
ipvsはiptablesモードに比べて高い性能と安定性を備えており、本稿ではこのモードのソースコード解析を主とし、iptablesモードの原理を理解したい場合は、その実現を参考にすることができ、アーキテクチャに差はない.
kube-proxyの主な機能は、サービスとendpointのイベントを傍受し、エージェントポリシーをマシンにドロップすることです.最下位はdocker/libnetworkを呼び出し、libnetworkは最終的にnetlinkとnetnsを呼び出してipvsの作成などの動作を実現する
構成の初期化
コードエントリ:
cmd/kube-proxy/app/server.go
Run()関数コマンドラインパラメータによるproxyServerの構成の初期化
proxyServer, err := NewProxyServer(o)
type ProxyServer struct {
// k8s client
Client clientset.Interface
EventClient v1core.EventsGetter
// ipvs
IptInterface utiliptables.Interface
IpvsInterface utilipvs.Interface
IpsetInterface utilipset.Interface
//
Proxier proxy.ProxyProvider
// ,ipvs iptables userspace kernelspace(windows)
ProxyMode string
//
ConfigSyncPeriod time.Duration
// service endpoint
ServiceEventHandler config.ServiceHandler
EndpointsEventHandler config.EndpointsHandler
}
Proxierは主要なエントリで、2つの関数を抽象化しています.
type ProxyProvider interface {
// Sync immediately synchronizes the ProxyProvider's current state to iptables.
Sync()
//
SyncLoop()
}
ipvsのinterfaceこれは重要です.
type Interface interface {
//
Flush() error
// virtual server
AddVirtualServer(*VirtualServer) error
UpdateVirtualServer(*VirtualServer) error
DeleteVirtualServer(*VirtualServer) error
GetVirtualServer(*VirtualServer) (*VirtualServer, error)
GetVirtualServers() ([]*VirtualServer, error)
// virtual server realserver, VirtualServer clusterip realServer pod( endpoint)
AddRealServer(*VirtualServer, *RealServer) error
GetRealServers(*VirtualServer) ([]*RealServer, error)
DeleteRealServer(*VirtualServer, *RealServer) error
}
後でipvs_を詳しく見てみましょうlinuxはどのように上のインタフェースを実現しますか?
virtual serverとrealserver、最も重要なのはip:port、それからsessionAffinityなどのエージェントのモードです.
type VirtualServer struct {
Address net.IP
Protocol string
Port uint16
Scheduler string
Flags ServiceFlags
Timeout uint32
}
type RealServer struct {
Address net.IP
Port uint16
Weight int
}
apiserver clientの作成
client, eventClient, err := createClients(config.ClientConnection, master)
Proxierを作成ipvsモードのみに注目したproxier
else if proxyMode == proxyModeIPVS {
glog.V(0).Info("Using ipvs Proxier.")
proxierIPVS, err := ipvs.NewProxier(
iptInterface,
ipvsInterface,
ipsetInterface,
utilsysctl.New(),
execer,
config.IPVS.SyncPeriod.Duration,
config.IPVS.MinSyncPeriod.Duration,
config.IPTables.MasqueradeAll,
int(*config.IPTables.MasqueradeBit),
config.ClusterCIDR,
hostname,
getNodeIP(client, hostname),
recorder,
healthzServer,
config.IPVS.Scheduler,
)
...
proxier = proxierIPVS
serviceEventHandler = proxierIPVS
endpointsEventHandler = proxierIPVS
このProxierには、次の方法があります.
+OnEndpointsAdd(endpoints *api.Endpoints)
+OnEndpointsDelete(endpoints *api.Endpoints)
+OnEndpointsSynced()
+OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints)
+OnServiceAdd(service *api.Service)
+OnServiceDelete(service *api.Service)
+OnServiceSynced()
+OnServiceUpdate(oldService, service *api.Service)
+Sync()
+SyncLoop()
だからipvsのこのProxierは私たちが必要とするほとんどのインタフェースを実現しました
まとめてみます.
+-----------> endpointHandler
|
+-----------> serviceHandler
| ^
| | +-------------> sync
| | |
ProxyServer---------> Proxier --------> service
| |
| +-------------> endpoint
| |
+-----> ipvs interface ipvs handler
ProxyServerの起動
1 2 3 4私たちはあまり注目する必要はありません.5をよく見てください.
informerFactory := informers.NewSharedInformerFactory(s.Client, s.ConfigSyncPeriod)
serviceConfig := config.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), s.ConfigSyncPeriod)
// service handler
serviceConfig.RegisterEventHandler(s.ServiceEventHandler)
// ServiceEventHandler informer
go serviceConfig.Run(wait.NeverStop)
endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), s.ConfigSyncPeriod)
// endpoint
endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler)
go endpointsConfig.Run(wait.NeverStop)
go informerFactory.Start(wait.NeverStop)
serviceConfig.RunとendpointConfig.Runはコールバック関数に値を割り当てるだけなので、登録したhandlerはinformerに与えられ、informerはイベントを傍受するとコールバックします.
for i := range c.eventHandlers {
glog.V(3).Infof("Calling handler.OnServiceSynced()")
c.eventHandlers[i].OnServiceSynced()
}
では質問ですが、登録されているこのhandlerは何ですか?前文の
serviceEventHandler = proxierIPVS
endpointsEventHandler = proxierIPVS
だからこのproxierIPVS
handlerのコールバック関数は、informerがこれらの関数をコールバックするので、自分で開発したときにこのinterfaceを登録すればいいです.
type ServiceHandler interface {
// OnServiceAdd is called whenever creation of new service object
// is observed.
OnServiceAdd(service *api.Service)
// OnServiceUpdate is called whenever modification of an existing
// service object is observed.
OnServiceUpdate(oldService, service *api.Service)
// OnServiceDelete is called whenever deletion of an existing service
// object is observed.
OnServiceDelete(service *api.Service)
// OnServiceSynced is called once all the initial even handlers were
// called and the state is fully propagated to local cache.
OnServiceSynced()
}
リスニングの開始
go informerFactory.Start(wait.NeverStop)
ここで実行すると、サービスendpointの削除などの動作が傍受され、コールバックされ、上の図を振り返ると、最終的にはProxierによって実現されるので、後でProxierに注目すればいいです.
s.Proxier.SyncLoop()
それからSyncLoopを始めます.
Proxier実装
サービスを作成するときにOnServiceAddメソッドが呼び出されます.ここでは、前のステータスと現在のステータスの2つを記録し、syncRunnerに信号を送って処理させます.
func (proxier *Proxier) OnServiceAdd(service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
}
サービス情報を記録すると、何もしていないことがわかります.サービスをmapに存在させ、map情報を直接削除しなければ何も処理しません.
change, exists := scm.items[*namespacedName]
if !exists {
change = &serviceChange{}
// service
change.previous = serviceToServiceMap(previous)
scm.items[*namespacedName] = change
}
// service
change.current = serviceToServiceMap(current)
,
if reflect.DeepEqual(change.previous, change.current) {
delete(scm.items, *namespacedName)
}
proxier.syncRunner.Run()の中から一つの信号が送られてきました
select {
case bfr.run
この信号は処理されています
s.Proxier.SyncLoop()
func (proxier *Proxier) SyncLoop() {
// Update healthz timestamp at beginning in case Sync() never succeeds.
if proxier.healthzServer != nil {
proxier.healthzServer.UpdateTimestamp()
}
proxier.syncRunner.Loop(wait.NeverStop)
}
runnerでは信号実行が受信され、受信されていない信号は定期的に実行されます.
func (bfr *BoundedFrequencyRunner) Loop(stop
このbfr runnerで私たちが最も考えなければならないのはコールバック関数で、tryRunでこのコールバックがスケジューリングされた条件を満たしているかどうかをチェックします.
type BoundedFrequencyRunner struct {
name string // the name of this instance
minInterval time.Duration // the min time between runs, modulo bursts
maxInterval time.Duration // the max time between runs
run chan struct{} // try an async run
mu sync.Mutex // guards runs of fn and all mutations
fn func() // function to run,
lastRun time.Time // time of last run
timer timer // timer for deferred runs
limiter rateLimiter // rate limiter for on-demand runs
}
// proxier.syncProxyRules
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
これは600行程度の揉み関数であり,主な論理を処理する場所でもある.
syncProxyRules
...(臭いし長いし、重複コードが多くて見られないし、細部の問題は自分で見ましょう)
serv := &utilipvs.VirtualServer{
Address: net.ParseIP(ingress.IP),
Port: uint16(svcInfo.port),
Protocol: string(svcInfo.protocol),
Scheduler: proxier.ipvsScheduler,
}
if err := proxier.syncService(svcNameString, serv, false); err == nil {
if err := proxier.syncEndpoint(svcName, svcInfo.onlyNodeLocalEndpoints, serv); err != nil {
}
}
実装を見て、存在しない場合は作成し、存在する場合は更新し、NICにserviceのcluster ipをバインドします.
func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool) error {
appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs)
if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {
if appliedVirtualServer == nil {
if err := proxier.ipvs.AddVirtualServer(vs); err != nil {
return err
}
} else {
if err := proxier.ipvs.UpdateVirtualServer(appliedVirtualServer); err != nil {
return err
}
}
}
// bind service address to dummy interface even if service not changed,
// in case that service IP was removed by other processes
if bindAddr {
_, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice)
if err != nil {
return err
}
}
return nil
}
サービスインプリメンテーションの作成
ipvsのAddVirtualServerの実装を見ることができます.主にsocketを利用してカーネルプロセスと通信しています.
pkg/util/ipvs/ipvs_linux.go
里runner構造体はこれらの方法を実現し、ここでdocker/libnetwork/ipvsライブラリを使用した.// runner implements Interface.
type runner struct {
exec utilexec.Interface
ipvsHandle *ipvs.Handle
}
// New returns a new Interface which will call ipvs APIs.
func New(exec utilexec.Interface) Interface {
ihandle, err := ipvs.New("") // github.com/docker/libnetwork/ipvs
if err != nil {
glog.Errorf("IPVS interface can't be initialized, error: %v", err)
return nil
}
return &runner{
exec: exec,
ipvsHandle: ihandle,
}
}
Newの時に特殊なsocketを作成しました.ここでは私たちの普通のsocketのプログラミングと違いはありません.肝心なのはsyscallです.AF_NETLINKというパラメータは、カーネルプロセスとの通信を表します.
sock, err := nl.GetNetlinkSocketAt(n, netns.None(), syscall.NETLINK_GENERIC)
func getNetlinkSocket(protocol int) (*NetlinkSocket, error) {
fd, err := syscall.Socket(syscall.AF_NETLINK, syscall.SOCK_RAW|syscall.SOCK_CLOEXEC, protocol)
if err != nil {
return nil, err
}
s := &NetlinkSocket{
fd: int32(fd),
}
s.lsa.Family = syscall.AF_NETLINK
if err := syscall.Bind(fd, &s.lsa); err != nil {
syscall.Close(fd)
return nil, err
}
return s, nil
}
サービスを作成し、dockerサービス形式に変換し、直接呼び出します.
// AddVirtualServer is part of Interface.
func (runner *runner) AddVirtualServer(vs *VirtualServer) error {
eSvc, err := toBackendService(vs)
if err != nil {
return err
}
return runner.ipvsHandle.NewService(eSvc)
}
そしてサービス情報をパッケージ化し、socketに書けばいいです.
func (i *Handle) doCmdwithResponse(s *Service, d *Destination, cmd uint8) ([][]byte, error) {
req := newIPVSRequest(cmd)
req.Seq = atomic.AddUint32(&i.seq, 1)
if s == nil {
req.Flags |= syscall.NLM_F_DUMP //Flag to dump all messages
req.AddData(nl.NewRtAttr(ipvsCmdAttrService, nil)) //Add a dummy attribute
} else {
req.AddData(fillService(s))
} // service
if d == nil {
if cmd == ipvsCmdGetDest {
req.Flags |= syscall.NLM_F_DUMP
}
} else {
req.AddData(fillDestinaton(d))
}
// service
res, err := execute(i.sock, req, 0)
if err != nil {
return [][]byte{}, err
}
return res, nil
}
コンストラクションリクエスト
func newIPVSRequest(cmd uint8) *nl.NetlinkRequest {
return newGenlRequest(ipvsFamily, cmd)
}
リクエストの構築時に渡されるのはipvsプロトコルクラスタです
カーネルと通信するメッセージヘッダを構築します
func NewNetlinkRequest(proto, flags int) *NetlinkRequest {
return &NetlinkRequest{
NlMsghdr: syscall.NlMsghdr{
Len: uint32(syscall.SizeofNlMsghdr),
Type: uint16(proto),
Flags: syscall.NLM_F_REQUEST | uint16(flags),
Seq: atomic.AddUint32(&nextSeqNr, 1),
},
}
}
メッセージにDataを追加します.このDataは配列です.2つの方法が必要です.
type NetlinkRequestData interface {
Len() int //
Serialize() []byte // , ,service
}
例えばheaderはこのようにシーケンス化されていて、一見呆然として、長い間考えてやっと理解しました:([unsafe.Sizeof(hdr)]byte)1つの*[]byteタイプ、長さは構造体の大きさ(unsafe.Pointer(hdr))構造体をbyteポインタタイプに変換して*を加えてその値を[:]byteに変換して返します
func (hdr *genlMsgHdr) Serialize() []byte {
return (*(*[unsafe.Sizeof(*hdr)]byte)(unsafe.Pointer(hdr)))[:]
}
サービス情報をカーネルに送信
普通のsocketが受信データを送信する
func execute(s *nl.NetlinkSocket, req *nl.NetlinkRequest, resType uint16) ([][]byte, error) {
var (
err error
)
if err := s.Send(req); err != nil {
return nil, err
}
pid, err := s.GetPid()
if err != nil {
return nil, err
}
var res [][]byte
done:
for {
msgs, err := s.Receive()
if err != nil {
return nil, err
}
for _, m := range msgs {
if m.Header.Seq != req.Seq {
continue
}
if m.Header.Pid != pid {
return nil, fmt.Errorf("Wrong pid %d, expected %d", m.Header.Pid, pid)
}
if m.Header.Type == syscall.NLMSG_DONE {
break done
}
if m.Header.Type == syscall.NLMSG_ERROR {
error := int32(native.Uint32(m.Data[0:4]))
if error == 0 {
break done
}
return nil, syscall.Errno(-error)
}
if resType != 0 && m.Header.Type != resType {
continue
}
res = append(res, m.Data)
if m.Header.Flags&syscall.NLM_F_MULTI == 0 {
break done
}
}
}
return res, nil
}
サービスデータパッケージ
ここでは細かく、コア思想はカーネルが一定のフォーマットの標準データしか認識していないことであり、サービス情報をその標準に従ってカーネルにパッケージして送信すればよい.
どのように梱包するかについては詳しく説明しません.
func fillService(s *Service) nl.NetlinkRequestData {
cmdAttr := nl.NewRtAttr(ipvsCmdAttrService, nil)
nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrAddressFamily, nl.Uint16Attr(s.AddressFamily))
if s.FWMark != 0 {
nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrFWMark, nl.Uint32Attr(s.FWMark))
} else {
nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrProtocol, nl.Uint16Attr(s.Protocol))
nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrAddress, rawIPData(s.Address))
// Port needs to be in network byte order.
portBuf := new(bytes.Buffer)
binary.Write(portBuf, binary.BigEndian, s.Port)
nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPort, portBuf.Bytes())
}
nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrSchedName, nl.ZeroTerminated(s.SchedName))
if s.PEName != "" {
nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPEName, nl.ZeroTerminated(s.PEName))
}
f := &ipvsFlags{
flags: s.Flags,
mask: 0xFFFFFFFF,
}
nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrFlags, f.Serialize())
nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrTimeout, nl.Uint32Attr(s.Timeout))
nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrNetmask, nl.Uint32Attr(s.Netmask))
return cmdAttr
}
まとめ
サービスは全体的にコードが簡単ですが、実装が少し迂回しているところがあり、直接的ではないと思います.全体的にapiserverイベントを傍受し、処理よりも定期的に同期ポリシーを実行する.