apphe zookeeper使用方法の例を詳しく説明する。

17789 ワード

本論文はApache Zookeeperの使用方法の実例について詳しく説明した知識に関連しています。次に具体的な内容を見てみます。
概要
Apache ZookeeperはApache HadoopのZookeeperサブプロジェクトから発展してきました。今はApacheのトッププロジェクトになりました。Zookeeperは分散システムに対して効率的で信頼性の高い、使いやすい協同サービスを提供しており、分散アプリケーションに対して、同一の名前付きサービス、構成管理、状態同期、グループサービスなど、かなり多くのサービスを提供することができる。Zookeeperインターフェースは簡単で、開発者は分散システムプログラミングの難しい同期と一致性にあまりこだわる必要はなく、Zookeeperが提供する既製のサービスを使って分散システムの配置管理、グループ管理、Leader選択などの機能を実現できます。
英語の住所:http://zookeeper.apache.org/doc/current/javaExample.html
簡単なZookeeper Watchクライアント
Zookeeper Java APIの基本的な使い方を紹介するために、ここではどのように一歩一歩ずつ機能を実現するかを紹介します。  Zookeeperクライアント。このZookeeperクライアントは、監視されているノードが変化すると、クライアントが起動したり停止したりするZookeeperノードZnodeを指定します。
基本的な要求
このクライアントは4つの基本的な要求を備えています。
(1)クライアントが持つパラメータ:
(2)Zookeeperサービスアドレス。
(3)監視されているZnodeノード名。
(4)実行可能プログラムとそのバンドのパラメータ
クライアントは、監視されているZnodeノードのデータを取得し、指定された実行可能プログラムを起動します。監視されているZnodeノードが変更された場合、クライアントはそのコンテンツを再取得し、指定された実行可能プログラムを再起動する。監視されているZnodeノードが消えたら、クライアントは実行可能なプログラムを殺します。
プログラミング
一般的には、Zookeeperアプリケーションは、サーバー端との接続を維持する部分と、Znodeノードのデータを監視する部分とに分けられている。本プログラムでは、Exectorクラスは、Zookeeper接続を維持し、Data Monitorクラスは、Zookeeperディレクトリツリー内のデータを監視します。同時に、Exectorは、メインスレッドとプログラムの主な実行ロジックを含みます。少量のユーザー間のインタラクションと実行可能なプログラムとのインタラクションを行い、このプログラムは、あなたが着信するパラメータを受け入れることができます。また、監視されているZnodeノードの状態変化に応じて停止または再開されます。
Exectorクラス
Exectorオブジェクトは、本ルーチンの最も基本的な「コンテナ」であり、ZookeeperオブジェクトとDataMonitorオブジェクトを含む。

public static void main(String[] args) {
    if (args.length < 4) {
      System.err
          .println("USAGE: Executor hostPort znode filename program [args ...]");
      System.exit(2);
    }
    String hostPort = args[0];
    String znode = args[1];
    String filename = args[2];
    String exec[] = new String[args.length - 3];
    System.arraycopy(args, 3, exec, 0, exec.length);
    try {
      new Executor(hostPort, znode, filename, exec).run();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
  public Executor(String hostPort, String znode, String filename,
      String exec[]) throws KeeperException, IOException {
    this.filename = filename;
    this.exec = exec;
    zk = new ZooKeeper(hostPort, 3000, this);
    dm = new DataMonitor(zk, znode, null, this);
  }
  public void run() {
    try {
      synchronized (this) {
        while (!dm.dead) {
          wait();
        }
      }
    } catch (InterruptedException e) {
    }
  }
Exectorのタスクは、ZookeeperにおけるZnodeノードの状態変化によってトリガされたイベントを起動し、停止します。コマンドラインで指定された実行可能プログラムは、上のコードで見られます。Exector類は、その構造関数でZookeeperオブジェクトを実例化したときに、自身の参照をWatchパラメータとしてZookeeperの構造関数に伝達します。同時にそれ自身の引用もData Monitor ListenerパラメータとしてData Monitorの構造関数に伝達されます。Exector自身は以下のインターフェースを実現した。

public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener {
...
WatchインターフェースはZoo Keeper Java APIで定義されている。Zoo Keeperはこれを用いて「容器」と通信しています。Watchは一つの方法しかサポートしていません。すなわちプロcess()はZookeeperがこの関数を使ってメインスレッドに関心があるイベントを処理しています。例えばZookeeper接続や会話の状態です。この例では「容器」Exectorはイベントを簡単に下にDanitatorに伝えます。具体的にどのように事件を処理するかはData Monitorによって決められます。本論文では、Watchの使用方法を簡単に説明しただけで、通常はExectorまたはExectorと同様のオブジェクトがZookeeperサーバとの接続を持っているが、イベントは他のオブジェクトに伝達され、他のオブジェクトがこのイベントを処理することができる。

public void process(WatchedEvent event) {
    dm.process(event);
  }
Datamonitoristenerインターフェース自体はZookeeper APIの一部ではなく、完全にカスタムインターフェースであり、このプログラムのために設計されたものと言える。DataMonitorオブジェクトは、このインターフェースと「コンテナ」(Exectorクラス)を使って通信します。Data Monitor Listenerインターフェースは以下の通りです。

public interface DataMonitorListener {
  /**
  * The existence status of the node has changed.
  */
  void exists(byte data[]);
  /**
  * The ZooKeeper session is no longer valid.
  * 
  * @param rc
  * the ZooKeeper reason code
  */
  void closing(int rc);
}
このインターフェースはData Monitorで定義されており、Exector類はこのインターフェースを実現し、Exector.existsが呼び出された時に、Exectorは事前に指定されたアプリケーションを起動または停止するかどうかを決定する。
Exector.closingが呼び出されると、ExectorはZookeeper接続によって永久的に消失し、自分を閉じるかどうかを決定します。
Data MonitorのオブジェクトがZookeeperの状態によって変化するということはもう推測しましたか?
以下はExectorクラスでData Monitor Listener.exists()とData Monitor Listener.closing()を実現するコードです。

public void exists( byte[] data ) {
  if (data == null) {
    if (child != null) {
      System.out.println("Killing process");
      child.destroy();
      try {
        child.waitFor();
      } catch (InterruptedException e) {
      }
    }
    child = null;
  } else {
    if (child != null) {
      System.out.println("Stopping child");
      child.destroy();
      try {
        child.waitFor();
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
    }
    try {
      FileOutputStream fos = new FileOutputStream(filename);
      fos.write(data);
      fos.close();
    } catch (IOException e) {
      e.printStackTrace();
    }
    try {
      System.out.println("Starting child");
      child = Runtime.getRuntime().exec(exec);
      new StreamWriter(child.getInputStream(), System.out);
      new StreamWriter(child.getErrorStream(), System.err);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}
public void closing(int rc) {
  synchronized (this) {
    notifyAll();
  }
}
Data Monitor類
Data Monitorクラスは本プログラムのZookeeperロジックの中核であり、ほぼ非同期であり、イベントによって駆動される。Data Monitor構造関数は以下の通りである。

public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
    DataMonitorListener listener) {
  this.zk = zk;
  this.znode = znode;
  this.chainedWatcher = chainedWatcher;
  this.listener = listener;
  // Get things started by checking if the node exists. We are going
  // to be completely event driven
  zk.exists(znode, true, this, null);
}
ZooKeeper.exists()を呼び出し、指定されたZnodeが存在するかどうかを確認し、監視を設置し、自身の引用を伝達することがコールバックの対象となり、ある意味、ウォッチトリガ時に真実の処理フローを引き起こします。
Zoo Keeper.exists()動作がサーバで終了すると、Zoo Keeper APIはクライアントでcomplection calbackを呼び出す。

public void processResult(int rc, String path, Object ctx, Stat stat) {
  boolean exists;
  switch (rc) {
  case Code.Ok:
    exists = true;
    break;
  case Code.NoNode:
    exists = false;
    break;
  case Code.SessionExpired:
  case Code.NoAuth:
    dead = true;
    listener.closing(rc);
    return;
  default:
    // Retry errors
    zk.exists(znode, true, this, null);
    return;
  }
  byte b[] = null;
  if (exists) {
    try {
      b = zk.getData(znode, false, null);
    } catch (KeeperException e) {
      // We don't need to worry about recovering now. The watch
      // callbacks will kick off any exception handling
      e.printStackTrace();
    } catch (InterruptedException e) {
      return;
    }
  }   
  if ((b == null && b != prevData)
      || (b != null && !Arrays.equals(prevData, b))) {
    listener.exists(b);
    prevData = b;
  }
}
上記のコードはまず、Znodeが存在するかどうか、および他の重大な回復不能なエラーをチェックします。ファイル(またはZnode)が存在する場合、Znodeからデータを取得し、状態が変化したらExectorのexists()コールバック関数を呼び出します。注意して、ゲットData関数は本省で何かの異常処理をしなければなりません。それ自体は監視があります。ZooKeeper.getData()を呼び出す前に削除されたら、ZooKeeper.exists()はコールバック関数をトリガします。通信エラーがある場合、接続上の監視は接続再構築前に該当イベントをトリガします。同時に対応する処理を誘発する。
最後に、Data Monitorが監視イベントを処理するコードは以下の通りです。

public void process(WatchedEvent event) {
    String path = event.getPath();
    if (event.getType() == Event.EventType.None) {
      // We are are being told that the state of the
      // connection has changed
      switch (event.getState()) {
      case SyncConnected:
        // In this particular example we don't need to do anything
        // here - watches are automatically re-registered with 
        // server and any watches triggered while the client was 
        // disconnected will be delivered (in order of course)
        break;
      case Expired:
        // It's all over
        dead = true;
        listener.closing(KeeperException.Code.SessionExpired);
        break;
      }
    } else {
      if (path != null && path.equals(znode)) {
        // Something has changed on the node, let's find out
        zk.exists(znode, true, this, null);
      }
    }
    if (chainedWatcher != null) {
      chainedWatcher.process(event);
    }
  }
クライアントZookeeperプログラムがセッション失効時に通信チャネルを再確立したら、すべてのセッション監視は自動的にサーバと再接続されます。Data MonitorがZnodeを指定するイベントを獲得したら、ZooKeeper.exists()を呼び出して、何が起こったのかを決定します。
完全なプログラム:
Exector.java:

/**
 * A simple example program to use DataMonitor to start and
 * stop executables based on a znode. The program watches the
 * specified znode and saves the data that corresponds to the
 * znode in the filesystem. It also starts the specified program
 * with the specified arguments when the znode exists and kills
 * the program if the znode goes away.
 */
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
public class Executor
  implements Watcher, Runnable, DataMonitor.DataMonitorListener
{
  String znode;
  DataMonitor dm;
  ZooKeeper zk;
  String filename;
  String exec[];
  Process child;
  public Executor(String hostPort, String znode, String filename,
      String exec[]) throws KeeperException, IOException {
    this.filename = filename;
    this.exec = exec;
    zk = new ZooKeeper(hostPort, 3000, this);
    dm = new DataMonitor(zk, znode, null, this);
  }
  /**
   * @param args
   */
  public static void main(String[] args) {
    if (args.length < 4) {
      System.err
          .println("USAGE: Executor hostPort znode filename program [args ...]");
      System.exit(2);
    }
    String hostPort = args[0];
    String znode = args[1];
    String filename = args[2];
    String exec[] = new String[args.length - 3];
    System.arraycopy(args, 3, exec, 0, exec.length);
    try {
      new Executor(hostPort, znode, filename, exec).run();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
  /***************************************************************************
   * We do process any events ourselves, we just need to forward them on.
   *
   * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.proto.WatcherEvent)
   */
  public void process(WatchedEvent event) {
    dm.process(event);
  }
  public void run() {
    try {
      synchronized (this) {
        while (!dm.dead) {
          wait();
        }
      }
    } catch (InterruptedException e) {
    }
  }
  public void closing(int rc) {
    synchronized (this) {
      notifyAll();
    }
  }
  static class StreamWriter extends Thread {
    OutputStream os;
    InputStream is;
    StreamWriter(InputStream is, OutputStream os) {
      this.is = is;
      this.os = os;
      start();
    }
    public void run() {
      byte b[] = new byte[80];
      int rc;
      try {
        while ((rc = is.read(b)) > 0) {
          os.write(b, 0, rc);
        }
      } catch (IOException e) {
      }
    }
  }
  public void exists(byte[] data) {
    if (data == null) {
      if (child != null) {
        System.out.println("Killing process");
        child.destroy();
        try {
          child.waitFor();
        } catch (InterruptedException e) {
        }
      }
      child = null;
    } else {
      if (child != null) {
        System.out.println("Stopping child");
        child.destroy();
        try {
          child.waitFor();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
      try {
        FileOutputStream fos = new FileOutputStream(filename);
        fos.write(data);
        fos.close();
      } catch (IOException e) {
        e.printStackTrace();
      }
      try {
        System.out.println("Starting child");
        child = Runtime.getRuntime().exec(exec);
        new StreamWriter(child.getInputStream(), System.out);
        new StreamWriter(child.getErrorStream(), System.err);
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }
}
Data Monitor.java:

/**
 * A simple class that monitors the data and existence of a ZooKeeper
 * node. It uses asynchronous ZooKeeper APIs.
 */
import java.util.Arrays;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.data.Stat;
public class DataMonitor implements Watcher, StatCallback {
  ZooKeeper zk;
  String znode;
  Watcher chainedWatcher;
  boolean dead;
  DataMonitorListener listener;
  byte prevData[];
  public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,
      DataMonitorListener listener) {
    this.zk = zk;
    this.znode = znode;
    this.chainedWatcher = chainedWatcher;
    this.listener = listener;
    // Get things started by checking if the node exists. We are going
    // to be completely event driven
    zk.exists(znode, true, this, null);
  }
  /**
   * Other classes use the DataMonitor by implementing this method
   */
  public interface DataMonitorListener {
    /**
     * The existence status of the node has changed.
     */
    void exists(byte data[]);
    /**
     * The ZooKeeper session is no longer valid.
     *
     * @param rc
     *        the ZooKeeper reason code
     */
    void closing(int rc);
  }
  public void process(WatchedEvent event) {
    String path = event.getPath();
    if (event.getType() == Event.EventType.None) {
      // We are are being told that the state of the
      // connection has changed
      switch (event.getState()) {
      case SyncConnected:
        // In this particular example we don't need to do anything
        // here - watches are automatically re-registered with 
        // server and any watches triggered while the client was 
        // disconnected will be delivered (in order of course)
        break;
      case Expired:
        // It's all over
        dead = true;
        listener.closing(KeeperException.Code.SessionExpired);
        break;
      }
    } else {
      if (path != null && path.equals(znode)) {
        // Something has changed on the node, let's find out
        zk.exists(znode, true, this, null);
      }
    }
    if (chainedWatcher != null) {
      chainedWatcher.process(event);
    }
  }
  public void processResult(int rc, String path, Object ctx, Stat stat) {
    boolean exists;
    switch (rc) {
    case Code.Ok:
      exists = true;
      break;
    case Code.NoNode:
      exists = false;
      break;
    case Code.SessionExpired:
    case Code.NoAuth:
      dead = true;
      listener.closing(rc);
      return;
    default:
      // Retry errors
      zk.exists(znode, true, this, null);
      return;
    }
    byte b[] = null;
    if (exists) {
      try {
        b = zk.getData(znode, false, null);
      } catch (KeeperException e) {
        // We don't need to worry about recovering now. The watch
        // callbacks will kick off any exception handling
        e.printStackTrace();
      } catch (InterruptedException e) {
        return;
      }
    }
    if ((b == null && b != prevData)
        || (b != null && !Arrays.equals(prevData, b))) {
      listener.exists(b);
      prevData = b;
    }
  }
}
締め括りをつける
本論文では、Apache Zookeeperの使い方の実例について詳しく紹介します。もし何か問題があったら、メッセージを残してください。編集者はすぐに皆さんに返事します。