linuxの下でzookeeper C APIを使用してzookeeperアプリケーションを開発する方法の紹介

17667 ワード

1、開発環境を構成してZooKeeperローカルライブラリを構築する最も簡単な方法はant構築ツールを使用することである.解凍したZooKeeperリリースパッケージのディレクトリにbuild.xmlというファイルがあります.
このファイルにはant構築に必要な構築手順が含まれています.また、Linuxオペレーティングシステムを使用する場合は、ホストで使用できるようにする必要があります.automake、autoconf、cppunitなどのツールも必要です.
必要なツールがすべてインストールされると、以下の方法でZooKeeperのライブラリを構築できます.
ant compile-native

構築が完了すると、build/c/build/usr/libにリンクライブラリファイルを見つけ、build/c/build/usr/include/zookeeeperに必要なヘッダファイルを見つけます.
2、ZooKeeperとセッションを開始する前に、まずzhandle_が必要です.tハンドル.zookeeperを呼び出すことでInit関数を使用してハンドルを取得します.関数の定義は次のとおりです.
ZOOAPI zhandle_t* zookeeper_init(const char* host, //  ZooKeeper             ,     host:port,         。
                                 watcher_fn fn,		//            
                                 int recv_timeout,	//      ,      
                                 const clientid_t* clientid,//              ID,         
                                 void* context,//   zkhandle_t           
                                 int flags);//         ,     0  
zookeeper_Init呼び出しは、実際にセッションの確立が完了する前に返される可能性があるため、ZOO_を受信した場合にのみ返されます.CONNECTED_STATEイベントの場合のみ、セッション確立が完了したと考えられます.このイベントは、ポイント関数の実装を監視することによって処理できます.この関数の定義は次のとおりです.
typedef void (* watcher_fn)(zhandle_t* zh, //        ZooKeeper  。
                            int type,	   //    :ZOO_CREATED_EVENT、ZOO_DELETED_EVENT、ZOO_CHANGED_EVENT、ZOO_CHILD_EVENT、ZOO_SESSION_EVENT。	
                            int state,		//    
                            const char* path,//         znode    ,         ,   null
                            void* watcherCtx);//         
は、監視ポイント関数の実装例です.
static int connected = 0;
static int expired = 0;
void main_watcher(zhandle_t* zkh,
                  int type,
                  int state,
                  const char* path,
                  void* context) {
  if (type == ZOO_SESSION_EVENT) {
    if (state == ZOO_CONNECTED_STATE) {
      connected = 1;//    ZOO_CONNECTED_STATE             
    } else if (state == ZOO_NOTCONNECTED_STATE) {
      connected = 0;
    } else if (state == ZOO_EXPIRED_SESSION_STATE) {
      expired = 1;//    ZOO_EXPIRED_SESSION_STATE          (       )
      connected = 0;
      zookeeper_close(zkh);
    }
  }
}
はすべての操作を直列に接続し、私たちのプライマリノードのinit関数は次のようになります.
static int server_id;
int init(char* hostPort) {
  srand(time(NULL));
  server_id = rand();//     ID
  zoo_set_debug_level(ZOO_LOG_LEVEL_INFO);//  log       
  zh = zookeeper_init(hostPort, //       
                      main_watcher,
                      15000,
                      0,
                      0,
                      0);
  return errno;
}
コードの最初の2行には、乱数を生成するシードとプライマリノードの識別子が設定されています.server_を使用します.idは異なるプライマリノードを識別します.その後、ログメッセージの出力レベルを設定します.
最後にzookeeperを呼び出しましたInit関数の初期化
3、マスターノードを起動する
ブート(Bootstraping)プライマリノードとは、プライマリ・スレーブ・モードの例で使用されるいくつかのznodeノードを作成し、プライマリ・プライマリ・ノードを選挙するプロセスです.まず、必要なznodeノードを4つ作成します.
void bootstrap() {
  if (!connected) {//      ,       。
    LOG_WARN(("Client not connected to ZooKeeper"));
    return;
  }
  create_parent("/workers", "");//       :/workers、/assign、/tasks、/status
  create_parent("/assign", "");
  create_parent("/tasks", "");
  create_parent("/status", "");
  ...
}
以下はcreate_parent関数:
void create_parent(const char* path,
                   const char* value) {
               zoo_acreate(zh,//      znode   ,     zhandle_t     ,      ,          
              path,//    path     const char*  ,path          znode          ,
              value,//             znode       。
              0,//          (     )    ,   ,   0。
              &ZOO_OPEN_ACL_UNSAFE,//   ,      ACL    ,        unsafe  。
              0,//  znode            ,              。
              create_parent_completion,//        ,            ,ZooKeeper                 。
              NULL);//            ,   ,            。
}
このメソッドは非同期呼び出しのため、ZooKeeperクライアントが操作要求の完了時にこの関数を呼び出す完了関数を入力する必要があります.以下は完了関数の定義です.
typedef void
(*string_completion_t)(int rc,//rc    ,              
const char *value,//value       
const void *data);//data                 ,  ,                           

この具体的な例では、次のように実現します.
void create_parent_completion(int rc, const char* value, const void* data) {
  switch (rc) {//              
    case ZCONNECTIONLOSS:
      create_parent(value, (const char*)data);//            。
      break;
    case ZOK:
      LOG_INFO(("Created parent node", value));
      break;
    case ZNODEEXISTS:
      LOG_WARN(("Node already exists"));
      break;
    default:
      LOG_ERROR(("Something went wrong when running for master"));
      break;
  }
}
多くの完了関数には、現在のシステムが何をしているかを示す簡単なログ記録機能が含まれています.
一般に,完成関数はより複雑になり,上記の例で示したように,各完成関数の方法における機能を分離することが望ましい.
次のタスクは、プライマリ・ノードの役割をロックするために/masterノードを作成しようとするプライマリ・ノードです.非同期呼び出しcreateメソッドは、以前に議論したものとは異なり、コードは次のとおりです.
void run_for_master() {
  if (!connected) {
    LOG_WARN(LOGCALLBACK(zh),
             "Client not connected to ZooKeeper");
    return;
  }
  char server_id_string[9];
  snprintf(server_id_string, 9, "%x", server_id);
  zoo_acreate(zh,
              "/master",
              (const char*)server_id_string, // /master             。
              sizeof(int),//             ,   ,       ,     int    。
              &ZOO_OPEN_ACL_UNSAFE,
              ZOO_EPHEMERAL,// znode        ,                。
              master_create_completion,
              NULL);
}

完了関数は、これまでのバージョンよりも複雑になります.
void master_create_completion(int rc, const char* value, const void* data) {
  switch (rc) {
    case ZCONNECTIONLOSS:
      check_master();//     ,      znode          ,             。
      break;
    case ZOK:
      take_leadership();//           ,         
      break;
    case ZNODEEXISTS:
      master_exists();//     znode    (              ),          ,               。
 
      break;
    default:
      LOG_ERROR(LOGCALLBACK(zh),
                "Something went wrong when running for master.");
      break;
  }
}
プライマリノードプロセスが/masterノードがすでに存在することを発見した場合、zoo_awexistsメソッドで監視ポイントを設定します.
void master_exists() {
  zoo_awexists(zh,
               "/master",
               master_exists_watcher,//  /master      。
               NULL,
               master_exists_completion,// exists          。
               NULL);
}
は、この方法の呼び出しでコンテキストオブジェクトを入力して、監視ポイントオブジェクトに送信して使用することができることに注意するが、この例では使用しない.
znodeノードが削除されると、監視ポイント関数が通知メッセージを処理する実装のための通知メッセージが受信されます.
void master_exists_watcher(zhandle_t* zh,
                           int type,
                           int state,
                           const char* path,
                           void* watcherCtx) {  
                            if (type == ZOO_DELETED_EVENT) {
                                assert(!strcmp(path, "/master"));
                                run_for_master(); //  /master     ,         。
                            } else {
                                LOG_DEBUG(LOGCALLBACK(zh),
                                         "Watched event:", type2string(type));
                            }
                    }
また私たちの前のmasterに戻ります.exists関数では、私たちが実現した完了関数は簡単で、私たちがこれまで採用してきたモードでもあります.詳細には、/masterノードの作成操作とexistsリクエストの実行の間に、/masterノードが削除された可能性があることに注意する必要があります.
たとえば、前のプライマリ・ノード・プロセスが終了した場合など、完了関数では、znodeノードが存在するかどうかを再確認する必要があります.存在しない場合、クライアント・プロセスはプライマリ・ノード・プロセスを再選挙する必要があります.
void master_exists_completion(int rc,
                              const struct Stat* stat,
                              const void* data) {
  switch (rc) {
    case ZCONNECTIONLOSS:
    case ZOPERATIONTIMEOUT:
      master_exists();
      break;
    case ZOK:
      if (stat == NULL) {//       stat   null    znode      。
        LOG_INFO(LOGCALLBACK(zh),
                 "Previous master is gone, running for master");
        run_for_master();//       ,            。
      }
      break;
    default:
      LOG_WARN(LOGCALLBACK(zh),
               "Something went wrong when executing exists: ",
               rc2string(rc));
      break;
  }
}
プライマリ・ノード・プロセスがプライマリ・プライマリ・ノードになると、管理権の行使を開始できます.次の節で説明します.
4、管理権の行使
プライマリ・ノード・プロセスがプライマリ・プライマリ・ノードとして選択されると、ロールの実行を開始できます.まず、ノードから使用可能なすべてのリスト情報を取得します.
void take_leadership() {
  get_workers();
} void get_workers() {
  zoo_awget_children(zh,
                     "/workers",
                     workers_watcher,//                    。
                     NULL,
                     workers_completion,//                。
                     NULL);
}
実装では、前回読み込んだノードリスト情報をキャッシュし、新しいリストを再び読み込むと、古いリストを置き換えます.これらの操作はzoo_awget_childrenが指定した完了関数で完了します.
void workers_completion(int rc,
                        const struct String_vector* strings,
                        const void* data) {
  switch (rc) {
    case ZCONNECTIONLOSS:
    case ZOPERATIONTIMEOUT:
      get_workers();
      break;
    case ZOK:
      struct String_vector* tmp_workers =
          removed_and_set(strings, &workers);//       。
      free_vector(tmp_workers);//      ,            ,            ,          
      get_tasks();//               。
      break;
    default:
      LOG_ERROR(LOGCALLBACK(zh),
                "Something went wrong when checking workers: %s",
                rc2string(rc));
      break;
  }
}
タスク情報を取得するには、サービス側プロセスが/tasksのすべてのサブノードを取得し、前回の読み取り後に新しく導入されたタスクリスト情報を取得する必要があるため、読み取りのたびに取得したリスト情報を区別する必要があります.そうしないと、タスクが2回割り当てられる可能性があります.
void get_tasks() {
  zoo_awget_children(zh,
                     "/tasks",
                     tasks_watcher,
                     NULL,
                     tasks_completion,
                     NULL);
} void tasks_watcher(
    zhandle_t
    *
    zh,
    int type,
    int state,
    const char* path,
    void* watcherCtx) {
  if (type == ZOO_CHILD_EVENT) {
    assert(!strcmp(path, "/tasks"));
    get_tasks();//          ,          。
  } else {
    LOG_INFO(LOGCALLBACK(zh),
             "Watched event: ",
             type2string(type));
  }
} void tasks_completion(
    int rc,
    const struct String_vector* strings,
    const void* data) {
  switch (rc) {
    case ZCONNECTIONLOSS:
    case ZOPERATIONTIMEOUT:
      get_tasks();
      break;
    case ZOK:
      LOG_DEBUG(LOGCALLBACK(zh), "Assigning tasks");
      struct String_vector* tmp_tasks = added_and_set(strings, &tasks);
      assign_tasks(tmp_tasks);//                。
      free_vector(tmp_tasks);
      break;
    default:
      LOG_ERROR(LOGCALLBACK(zh),
                "Something went wrong when checking tasks: %s",
                rc2string(rc));
      break;
  }
}

5、タスク割当タスク割当は、タスク情報の読み取りを含み、ノードから選択し、ノードタスクリストにznodeノードを追加することによってタスクを割り当て、最後に/tasksのサブノードから削除する.
次のコードは、これらの基本的な操作プロセスを実現し、タスク情報の取得、タスクの割り当て、タスクの削除などの操作は非同期関数で実現され、必要な完了関数を提供します.コードは次のとおりです.
void assign_tasks(const struct String_vector* strings) {
  int i;
  for (i = 0; i < strings->count; i++) {
    get_task_data(strings->data[i]);//      ,          。
  }
}
void get_task_data(const char*
                   task){
  if (task == NULL) return;
  char* tmp_task = strndup(task, 15);
  char* path = make_path(2, "/tasks/", tmp_task);
  zoo_aget(zh,
           path,
           0,
           get_task_data_completion,
           (const void*)tmp_task);//             。
  free(path);
}
struct task_info {	//               。
    char *value;
    int value_len;
    char *worker;
};

void get_task_data_completion(int rc, const char *value, int value_len,
                              const struct Stat *stat, const void *data){
    int worker_index;

    switch (rc) {
    case ZCONNECTIONLOSS:
    case ZOPERATIONTIMEOUT:
        get_task_data((const char* )data);
        break;
    case ZOK:
        if (workers != NULL) {
            worker_index = (rand() % workers->count);	//         ,           。
            struct task_info *new_task;		//      task_info               。
            new_task = ( struct task_info *)malloc(sizeof(struct task_info));

            new_task->name = (char*)data;
            new_task->value = strndup(value, value_len);
            new_task->value_len = value_len;
            
            const char * worker_string  = workers->data[worker_index];
            new_task->worker = strdup(worker_string);
            task_assignment(new_task);	//       ,           。
        }
        break;

    default:
        LOG_ERROR(LOCALLBACK(zh),
                    "something went wrong when checking the master lock:%s", rc2string(rc));
        break;
    }
}

これまで、上記のコードはタスク情報の読み取りとノードからの選択を完了していました.次に、タスクの割り当てを表すznodeノードを作成する必要があります.
void task_assignment(struct task_info* task) {
  char* path = make_path(4, "/assign/", task->worker, "/", task->name);
  zoo_acreate(zh,
              path,
              task->value,
              task->value_len,
              &ZOO_OPEN_ACL_UNSAFE,
              0,
              task_assignment_completion,
              (const void*)task);	//         znode  。
  free(path);
} void task_assignment_completion(
    int rc, const char*
    value, const void*
    data) {
  switch (rc) {
    case ZCONNECTIONLOSS:
    case ZOPERATIONTIMEOUT:
      task_assignment((struct task_info*)data);
      break;
    case ZOK:
      if (data != NULL) {
        char* del_path = "";
        del_path = make_path(2, "/tasks/",
                             ((struct task_info*)data)->name);
        if (del_path != NULL) {
          delete_pending_task(del_path);	//        ,                    。
        }
        free(del_path);
        free_task_info((struct task_info*)data);	//   task_info             ,              。
      }
      break;
    case ZNODEEXISTS:
      LOG_DEBUG(LOGCALLBACK(zh),
                "Assignment has alreasy been created: %s",
                value);
      break;
    default:
      LOG_ERROR(LOGCALLBACK(zh),
                "Something went wrong when checking the master lock: %s",
                rc2string(rc));
      break;
  }
}
最後のステップは、/tasksの下のタスクノードを削除するために、/tasksノードの下に割り当てられていないタスクしかないようにする.
void delete_pending_task(const char* path) {
  if (path == NULL) return;
  char* tmp_path = strdup(path);
  zoo_adelete(zh,
              tmp_path,
              -1,
              delete_task_completion,
              (const void*)tmp_path);//        。
} void delete_task_completion(int rc, const void*
                              data) {
  switch (rc) {
    case ZCONNECTIONLOSS:
    case ZOPERATIONTIMEOUT:
      delete_pending_task((const char*)data);
      break;
    case ZOK:
      free((char*)data);	//          ,          ,                              。
      break;
    default:
      LOG_ERROR(LOGCALLBACK(zh),
                "Something went wrong when deleting task: %s",
                rc2string(rc));
      break;
  }
}

6、シングルスレッドとマルチスレッドクライアントZooKeeperのリリースパッケージでは、C言語コンポーネントに対して2つのオプション、マルチスレッドとシングルスレッドバージョンが提供されています.単一スレッドバージョンは、いくつかの履歴的な理由にすぎないため、マルチスレッドバージョンを使用することをお勧めします.
int initialized = 0;
int run = 0;
fd_set rfds, wfds, efds;
FD_ZERO(& rfds);
FD_ZERO(& wfds);
FD_ZERO(& efds);
while (!is_expired()) {
  int fd;
  int interest;
  int events;
  struct timeval tv;
  int rc;
  zookeeper_interest(zh, &fd, &interest, &tv);//              
  if (fd != -1) {
    if (interest & ZOOKEEPER_READ) {//  ZOOKEEPER_READ           
      FD_SET(fd, &rfds);
    } else {
      FD_CLR(fd, &rfds);
    }
    if (interest & ZOOKEEPER_WRITE) {//  ZOOKEEPER_WRITE           
      FD_SET(fd, &wfds);
    } else {
      FD_CLR(fd, &wfds);
    }
  } else {
    fd = 0;
  }
/*
* Master call to get a ZooKeeper handle.
*/
  if (!initialized) {
    if (init(argv[1])) {//       ,     ZooKeeper  
      LOG_ERROR(("Error while initializing the master: ", errno));
    }
    initialized = 1;
  }
/*
* The next if block contains
* calls to bootstrap the master
* and run for master. We only
* get into it when the client
* has established a session and
* is_connected is true.
*/
  if (is_connected() && !run) {//         ZooKeeper    ,    ZOO_CONNECTED_EVENT  ,          ,          
    LOG_INFO(("Connected, going to bootstrap and run for master"));
/*
* Create parent znodes
*/
    bootstrap();
/*
* Run for master
*/
    run_for_master();
    run = 1;
  }
  rc = select(fd+1, &rfds, &wfds, &efds, &tv);//  select        
  events = 0;
  if (rc > 0) {
    if (FD_ISSET(fd, &rfds)) {
      events |= ZOOKEEPER_READ;//       fd      
    }
    if (FD_ISSET(fd, &wfds)) {
      events |= ZOOKEEPER_WRITE;//       fd      
    }
  }
  zookeeper_process(zh, events);//    ZooKeeper  ,             
}
このイベントループは、コールバックやセッションイベントなど、関連するZooKeeperイベントに注目します.
マルチスレッドバージョンライブラリを使用する場合、クライアントアプリケーションをコンパイルするには-l zookeeper_を使用する必要があります.mtは、−DTHREADEDによってTHREADEDマクロオプションを定義する.
コードでは、マルチスレッドライブラリバージョンのプログラムをコンパイルする際に、THREADEDマクロ定義によって、呼び出し実行時に使用するコードフラグメントが示されます.
シングルスレッドバージョンライブラリを使用する場合は-l zookeeper_を使用します.stはプログラムをコンパイルするとともに、-DTHREADEDオプションを指定することはできません.ZooKeeperのリリースパッケージでは、手順指示をコンパイルして対応するライブラリを使用できます.
参考:「Zookeeer分布式過程協同技術詳細」機械工業出版社