ZooKeeperアプリケーションの構成サーバの構築
3731 ワード
構成情報の集中管理と共有は、分散アプリケーションの基本的なニーズです.このようなニーズに対して、ZooKeeperは2つのレベルの特性を提供することができる.1つは、分散アプリケーションの参加者がZooKeeperの構成情報を更新、読み取りできるように、高可用性ストレージサービスを提供することである.もう1つは、ZooKeeperの構成情報が変更されたときに、モニタが設定されているクライアントに対して、アクティブな構成サービスで、クライアントが通知を受け取ると、すぐに対応する操作を実行します.
簡略化のため、この例には2つの前置条件があります.第一に、構成項目の名前はznodeのパスで直接表され、znodeに格納されている文字列値は構成項目の値である.第二に、いつでも、構成を更新する操作は、単一のクライアントのみが実行します.
ZooKeeperに対して読み書きロジックを実行してActiveKeyValueにカプセル化し、コードは以下の通りである.
以上のコードでは、writeメソッドが書き込みを担当し、znodeがすでに存在する場合は直接値を設定し、存在しない場合はznodeを作成してから値を設定します.書き込み操作はznodeに保存されている値を返します.最も重要なのはreadメソッドのWacherパラメータです.読み取りクライアントは通知を受信するためにこのモニタを登録する必要があります.ConnectionWatcherベースクラスは、ZooKeeperインスタンス、ユーザー認証、およびZooKeeperサーバへの接続を作成します.
コンフィグUpdaterクラスは、ZooKeeperのコンフィギュレーションアイテムをランダムな時間間隔で更新し、コンフィギュレーションクライアントを更新します.コードは次のとおりです.
コンフィギュレーションサービスの読み取りクライアントとしてConfigWatcherを使用します.コードは次のとおりです.
コードの主な論理はprocessで実現され,構成が変更されるとクライアントは通知を受けて新しい構成内容を表示する.
簡略化のため、この例には2つの前置条件があります.第一に、構成項目の名前はznodeのパスで直接表され、znodeに格納されている文字列値は構成項目の値である.第二に、いつでも、構成を更新する操作は、単一のクライアントのみが実行します.
ZooKeeperに対して読み書きロジックを実行してActiveKeyValueにカプセル化し、コードは以下の通りである.
public class ActiveKeyValueStore extends ConnectionWatcher {
private static final charset CHARSET = charset.forName("UTF-8");
public void write(String path, String value) throws InterruptedException,
KeeperException {
Stat stat = zk.exists(path, false);
if (stat == null) {
zk.create(path, value.getBytes(CHARSET), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} else {
zk.setData(path, value.getBytes(CHARSET), -1);
}
}
public String read(String path, Watcher watcher) throws InteruptedException,
KeeperException {
byte[] data = zk.getData(path, watcher, null/*stat*/);
return new String(data, CHARSET);
}
}
以上のコードでは、writeメソッドが書き込みを担当し、znodeがすでに存在する場合は直接値を設定し、存在しない場合はznodeを作成してから値を設定します.書き込み操作はznodeに保存されている値を返します.最も重要なのはreadメソッドのWacherパラメータです.読み取りクライアントは通知を受信するためにこのモニタを登録する必要があります.ConnectionWatcherベースクラスは、ZooKeeperインスタンス、ユーザー認証、およびZooKeeperサーバへの接続を作成します.
コンフィグUpdaterクラスは、ZooKeeperのコンフィギュレーションアイテムをランダムな時間間隔で更新し、コンフィギュレーションクライアントを更新します.コードは次のとおりです.
public class ConfigUpdate {
public static final String PATH = "/config";
private ActiveKeyValueStore store;
private Random random = new Random();
public ConfigUpdater(String hosts) throws IOException, InterruptedException {
store = new ActiveKeyValueStore();
store.connect(hosts);
}
public void run throws InterruptedException, KeeperException {
while (ture) {
String value = random.nextInt(100) + '';
store.write(PATH,value);
System.out.printf("Set %s to %s
", PATH, value);
TimeUnit.SECONDS.sleep(random.nextInt(10));
}
}
public static void main(String[] args) throws Exception {
ConfigUpdater configUpdater = new ConfigUpdater(args[0]);
configUpdater.run();
}
}
コンフィギュレーションサービスの読み取りクライアントとしてConfigWatcherを使用します.コードは次のとおりです.
public class ConfigWatcher implements Watcher {
private ActiveKeyValueStore store;
public configWatcher(String hosts) throws IOException, InterreptedException {
store = new ActiveKeyValueStore();
store.connect(hosts);
}
public void displayConfig() throws InterruptedException, KeeperException {
String value = store.read(ConfigUpdater.PATH, this);
System.out.printf("Read %s as %s
", ConfigUpdater.PATH, value);
}
@Override
public void process(WatcherEvent event) {
if (event.getType() == EventType.NodeDataChanged) {
try {
displayConfig();
} catch (InterruptedException e) {
System.err.println("Interrupted. Exiting.");
Thread.currentThread().interrupted();
} catch (KeeperException e) {
System.err.printf("KeeperException: %s. Exiting.
", e);
}
}
}
public static void main(String[] args) throws Exception {
ConfigWatcher configWatcher = new ConfigWatcher(args[0]);
configWatcher.displayConfig();
Thread.sleep(Long.MAX_VALUE);
}
}
コードの主な論理はprocessで実現され,構成が変更されるとクライアントは通知を受けて新しい構成内容を表示する.