Zookeeperを用いてリーダー選挙を実現
シーンの適用
分散システムの最も典型的なアーキテクチャは
一主多従
.多くの場合、大規模なデータ、画像、ファイルなどを処理するが、このような仕事は極めて資源を消費し、データ、ファイルなどは共有されており、すべての機械が一度計算処理すれば、貴重な計算資源を浪費する.これらの作業は1台のマシンに任せることができ,他のマシンはデータベース,分散ファイルシステムなどを通じて計算結果Leader(Master)を共有する.また、データベース、キャッシュなどのコンポーネントの読み書き分離は、従来の性能向上方式である.読み書き分離はリーダーにすべて書き、クエリーはfollowerを使用するマシンです.Zookeeperが提供するAPIを使用すると、leader選挙を簡単に実現できます.
具体的な手順
1.クライアント接続時、指定したディレクトリ(ここでは「/leader」と仮定)に1つ作成する
EPHEMERAL_SEQUENTIAL
のノードは、イントラネットのIPデータを作成ノードに格納します.
2、ディレクトリのサブポイントセクションを取得し、シリアル番号が一番小さいノードを取得し、このノードをleaderに設定します.このノードが削除されると、leader断線が証明されます.
3、他の機器はリーダーノードを傍受し、リーダーノードの削除時に、ディレクトリの最小子点節をリーダーとする.
詳細コード
zkクライアントツールクラス
LeaderElection
テスト
mainメソッドを実行し、本機械のある出力結果
注意:ここでは単純なテストのみを行いました.本番環境で使用するには、まず複数のJVMで同時にテストしてください.
参考記事:
http://zookeeper.apache.org/doc/r3.4.9/recipes.html
本文はオリジナルで、転載は出典を明記してください
http://blog.csdn.net/massivestars/article/details/53894551
ZooKeeperは、分散ロックとキューを実装します.参照してください.
Zookeeperによる分散ロック
ZooKeeperによるキューの実装
分散システムの最も典型的なアーキテクチャは
一主多従
.多くの場合、大規模なデータ、画像、ファイルなどを処理するが、このような仕事は極めて資源を消費し、データ、ファイルなどは共有されており、すべての機械が一度計算処理すれば、貴重な計算資源を浪費する.これらの作業は1台のマシンに任せることができ,他のマシンはデータベース,分散ファイルシステムなどを通じて計算結果Leader(Master)を共有する.また、データベース、キャッシュなどのコンポーネントの読み書き分離は、従来の性能向上方式である.読み書き分離はリーダーにすべて書き、クエリーはfollowerを使用するマシンです.Zookeeperが提供するAPIを使用すると、leader選挙を簡単に実現できます.
具体的な手順
1.クライアント接続時、指定したディレクトリ(ここでは「/leader」と仮定)に1つ作成する
EPHEMERAL_SEQUENTIAL
のノードは、イントラネットのIPデータを作成ノードに格納します.
2、ディレクトリのサブポイントセクションを取得し、シリアル番号が一番小さいノードを取得し、このノードをleaderに設定します.このノードが削除されると、leader断線が証明されます.
3、他の機器はリーダーノードを傍受し、リーダーノードの削除時に、ディレクトリの最小子点節をリーダーとする.
詳細コード
zkクライアントツールクラス
package org.massive.common;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* Created by Massive on 2016/12/18.
*/
public class ZooKeeperClient {
private static String connectionString = "localhost:2181";
private static int sessionTimeout = 10000;
public static ZooKeeper getInstance() throws IOException, InterruptedException {
//--------------------------------------------------------------
// zookeeper get/create/exists (KeeperErrorCode = ConnectionLoss)
// Zookeeper
//--------------------------------------------------------------
final CountDownLatch connectedSignal = new CountDownLatch(1);
ZooKeeper zk = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
connectedSignal.countDown();
}
}
});
connectedSignal.await(sessionTimeout, TimeUnit.MILLISECONDS);
return zk;
}
public static int getSessionTimeout() {
return sessionTimeout;
}
public static void setSessionTimeout(int sessionTimeout) {
ZooKeeperClient.sessionTimeout = sessionTimeout;
}
}
LeaderElection
package org.massive.group;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.massive.common.ZooKeeperClient;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.*;
import java.util.*;
import java.util.concurrent.CountDownLatch;
/**
* Created by Massive on 2016/12/25.
*/
public class LeaderElection {
private ZooKeeper zk;
private int sessionTimeout;
private static byte[] DEFAULT_DATA = {0x12,0x34};
private static Object mutex = new Object();
private static String ROOT = "/leader";
private byte[] localhost = getLocalIpAdressBytes();
private String znode;
private static CountDownLatch firstElectionSignal = new CountDownLatch(1);
//----------------------------------------------------
// leader IP
//----------------------------------------------------
private static String leader;
public LeaderElection() throws IOException, InterruptedException, KeeperException {
this.zk = ZooKeeperClient.getInstance();
this.sessionTimeout = zk.getSessionTimeout();
ensureExists(ROOT);
ensureLocalNodeExists();
System.out.println("-------------------------------------");
System.out.println("local IP: " + getLocalIpAddress());
System.out.println("local created node: " + znode);
System.out.println("-------------------------------------");
}
/**
* ,
* @throws KeeperException
* @throws InterruptedException
*/
public void ensureLocalNodeExists() throws KeeperException, InterruptedException {
List list = zk.getChildren(ROOT,new NodeDeleteWatcher());
for (String node : list) {
Stat stat = new Stat();
String path = ROOT + "/" + node;
byte[] data = zk.getData(path,false,stat);
if (Arrays.equals(data,localhost)) {
znode = path;
return;
}
}
znode = zk.create(ROOT + "/", localhost, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
}
public void ensureExists(String path) {
try {
Stat stat = zk.exists(path, false);
if (stat == null) {
zk.create(path, DEFAULT_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void start() throws KeeperException, InterruptedException, UnsupportedEncodingException {
do{
synchronized (mutex) {
System.out.println("begin leader election...");
List nodes = zk.getChildren(ROOT, null);
SortedSet sortedNode = new TreeSet();
for (String node : nodes) {
sortedNode.add(ROOT + "/" + node);
}
//----------------------------------------------------
//
//----------------------------------------------------
String first = sortedNode.first();
leader = first;
//----------------------------------------------------
// ( )
//----------------------------------------------------
NodeDeleteWatcher watcher = znode.equals(first) ? null : new NodeDeleteWatcher();
byte[] data = zk.getData(first, watcher, null);
leader = new String(data, "UTF-8");
System.out.println("leader election end, the leader is : " + leader);
if (firstElectionSignal.getCount() != 0) {
firstElectionSignal.countDown();
}
if (znode.equals(first)) {
return;
}
mutex.wait();
}
} while (true);
}
class NodeDeleteWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDeleted) {
synchronized (mutex) {
mutex.notify();
}
}
}
}
/**
* IP
* @return
* @throws SocketException
*/
public static String getLocalIpAddress() throws SocketException {
//
Enumeration nifs = NetworkInterface.getNetworkInterfaces();
while (nifs.hasMoreElements()) {
NetworkInterface nif = nifs.nextElement();
// IP ,
Enumeration addresses = nif.getInetAddresses();
while (addresses.hasMoreElements()) {
InetAddress addr = addresses.nextElement();
String ip = addr.getHostAddress();
// IPv4
if (addr instanceof Inet4Address && !"127.0.0.1".equals(ip)) {
return ip;
}
}
}
return null;
}
public static byte[] getLocalIpAdressBytes() throws SocketException {
String ip = getLocalIpAddress();
return ip == null ? null : ip.getBytes();
}
public static String getLeader() throws InterruptedException {
//----------------------------------------------------
// leader
//----------------------------------------------------
firstElectionSignal.await();
return leader;
}
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
//-------------------------------------------------------
// leader
//-------------------------------------------------------
new Thread(new Runnable() {
@Override
public void run() {
try {
LeaderElection leaderElection = new LeaderElection();
leaderElection.start();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
String leader = LeaderElection.getLeader();
}
}
テスト
mainメソッドを実行し、本機械のある出力結果
-------------------------------------
local IP: 192.168.1.103
local created node: /leader/0000000017
-------------------------------------
begin leader election...
leader election end, the leader is : 192.168.1.103
注意:ここでは単純なテストのみを行いました.本番環境で使用するには、まず複数のJVMで同時にテストしてください.
参考記事:
http://zookeeper.apache.org/doc/r3.4.9/recipes.html
本文はオリジナルで、転載は出典を明記してください
http://blog.csdn.net/massivestars/article/details/53894551
ZooKeeperは、分散ロックとキューを実装します.参照してください.
Zookeeperによる分散ロック
ZooKeeperによるキューの実装