雑談etcd(三)etcdの使用


せつぞく
client
type Client struct {
    Cluster
    KV
    Lease
    Watcher
    Auth
    Maintenance

    // Username is a user name for authentication.
    Username string
    // Password is a password for authentication.
    Password string
}
  • Cluster:クラスタにetcdサービス・エンド・ノードを追加するなど、管理者操作に属します.
  • KV:主にK-Vを操作する機能を使用しています.
  • Lease:TTL=10秒のリースを申請するなど、リース関連操作.
  • Watcher:サブスクリプションを観察し、最新のデータの変化を監視します.
  • Auth:etcdのユーザーと権限を管理し、管理者の操作に属します.
  • Maintenance:etcdをアクティブに移行するリーダーノードなど、etcdを維持し、管理者操作に属します.

  • インスタンス化client
    import (
    	"fmt"
    	"go.etcd.io/etcd/clientv3"
    	"time"
    )
    
    //  
    func main() {
    	//     
    	config := clientv3.Config{
    		Endpoints:   []string{"192.168.1.109:2379"},
    		DialTimeout: 5 * time.Second,
    	}
    
    	//    
    	if client, err := clientv3.New(config); err != nil {
    		fmt.Println(err)
    		return
    	}
     
       fmt.Println("connect success")
    	defer client.Close()
    }
  • Endpoints:etcdの複数のノードサービスアドレス.
  • DialTimeout:clientの最初の接続タイムアウトを作成します.ここでは5秒転送され、5秒も接続に成功しなければerrに戻ります.クライアントの作成に成功すると、後続の下位接続の状態に関心を持つ必要がなくなり、クライアント内部が再接続されます.

  • k-vアクセス
    put
       //    
    	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
       // -  
    	_, err = client.Put(ctx, "/demo/demo1_key", "demo1_value")
    	//    ,cancel 
    	cancel()
    	if err != nil {
    		fmt.Println("put failed, err:", err)
    		return
    	}
    
    Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
    
  • ctx:Contextパッケージオブジェクト、タイムアウト制御
  • などのコンテキストを追跡するための列
  • key:オブジェクトを格納するkey
  • val:オブジェクトを格納するvalue
  • opts:可変パラメータ、追加オプション
  • get
    ctx, cancel = context.WithTimeout(context.Background(), time.Second)
    	resp, err := client.Get(ctx, "/demo/demo1_key")
       // Get       WithPrefix  ,             
       //eg: resp, err := client.Get(ctx, "/demo/", clientv3.WithPrefix())
    	cancel()
    	if err != nil {
    		fmt.Println("get failed err:", err)
    		return
    	}
    
    	for _, item := range resp.Kvs { //Kvs   key   
    		fmt.Printf("%s : %s 
    ", item.Key, item.Value) }

    delete
    ctx, _ = context.WithTimeout(context.Background(), time.Second)
       resp, err := client.Delete(ctx, "/demo/demo1_key")
       if err != nil {
           fmt.Println(err)
       }
       fmt.Println(resp.PrevKvs)

    ウォッチ監視
    WatchメソッドはWatchChanのような変数を返します.WatchChanはchannelです.
    import (
    	"context"
    	"fmt"
    	"go.etcd.io/etcd/clientv3"
    	"time"
    )
    
    func main() {
    	client, err := clientv3.New(clientv3.Config{
    		Endpoints:   []string{"192.168.1.109:2379"},
    		DialTimeout: time.Second,
    	})
    	if err != nil {
    		fmt.Println("connect failed err : ", err)
    		return
    	}
    	defer client.Close()
    
    	client.Put(context.Background(), "/demo/demo2_key", "demo2_value")
    	go func() {
    		//watch
    		watchKey := client.Watch(context.Background(), "/demo/demo2_key")
    		for resp := range watchKey {
    			for _, item := range resp {
    				fmt.Printf("%s %q : %q 
    ", item.Type, item.Kv.key, item.Kv.Value) } } } if resp, err := client.Put(context.TODO(), "/demo/demo2_key/", "demo2_watch"); err != nil { fmt.Println(err) } else { fmt.Println(resp) } }

    Transactionトランザクション
    import (
    	"context"
    	"fmt"
    	"go.etcd.io/etcd/clientv3"
    	"log"
    	"sync"
    	"time"
    )
    
    func main() {
    	client, err := clientv3.New(clientv3.Config{
    		Endpoints:   5 * time.Second,
    		DialTimeout: 3 * time.Second,
    	})
    	if err != nil {
    		fmt.Println("connect failed err: ", err)
    		return
    	}
    	client.Close()
    
    	var w sync.WaitGroup
    	w.Add(10)
    	key10 := "setnx"
    	for i := 0; i < 10; i++ {
    		go func(i int) {
    			time.Sleep(5 * time.Millisecond)
    			//  key Create_Revision     0    key    。  If,Then    Else            。
    			//         successed  ,   true     If   
    			_, err := client.Txn(context.Background()).
    				If(clientv3.Compare(clientv3.CreateRevision(key10), "=", 0)).
    				Then(clientv3.OpPut(key10, fmt.Sprintf("%d", i))).
    				Commit()
    			if err != nil {
    				fmt.Println(err)
    			}
    
    			w.Done()
    		}(i)
    	}
    	w.Wait()
    
    	if resp, err := client.Get(context.TODO(), key10); err != nil {
    		log.Fatal(err)
    	} else {
    		log.Println(resp)
    	}
    }

    leaseリース
  • Grant:リースを割り当てます.
  • Revoke:リースをリリースします.
  • TimeToLive:残りのTTL時間を取得します.
  • Leases:etcdのすべてのリースを列挙します.
  • KeepAlive:自動タイミングの継続あるリース.
  • KeepAliveOnce:あるリースの契約を1回継続します.
  • Close:現在のクライアントによって確立されたすべてのリースを閉じます.
  • import (
    	"time"
    )
    
    var (
    	dialTimeout    = 2 * time.Second
    	requestTimeout = 10 * time.Second
    )
    
    func main() {
    	ctx, _ := context.WithTimeout(context.Background(), requestTimeout)
    	client, err := clientv3.New(clientv3.Config{
    		DialTimeout: dialTimeout,
    		Endpoints:   []string{"localhost:2379"},
    	})
    
    	if err != nil {
    		log.Fatal(err)
    	}
    
    	kv := clientv3.NewKv(client)
    
    	lease, err := client.Grant(ctx, 3)
    	if err != nil {
    		log.Fatal(err)
    	}
    
    	//Insert key with a lease of 3 second TTL
    	kv.Put(ctx, "/demo/demo1_key", "demo1_value", clientv3.WithLease(lease.ID))
    
    	gr, _ = kv.Get(ctx, "/demo/demo1_key")
    	if len(gr.Kvs) == 1 {
    		fmt.Println("Found key")
    	}
    
    	//let the TTL expire
    	time.Sleep(3 * time.Second)
    
    	gr, _ = kv.Get(ctx, "/demo/demo1_key")
    	if len(gr.Kvs) == 0 {
    		fmt.Println("no more key")
    	}
    }