Zookeeperを用いてリーダー選挙を実現

9643 ワード

シーンの適用
分散システムの最も典型的なアーキテクチャは
一主多従
.多くの場合、大規模なデータ、画像、ファイルなどを処理するが、このような仕事は極めて資源を消費し、データ、ファイルなどは共有されており、すべての機械が一度計算処理すれば、貴重な計算資源を浪費する.これらの作業は1台のマシンに任せることができ,他のマシンはデータベース,分散ファイルシステムなどを通じて計算結果Leader(Master)を共有する.また、データベース、キャッシュなどのコンポーネントの読み書き分離は、従来の性能向上方式である.読み書き分離はリーダーにすべて書き、クエリーはfollowerを使用するマシンです.Zookeeperが提供するAPIを使用すると、leader選挙を簡単に実現できます.
具体的な手順
1.クライアント接続時、指定したディレクトリ(ここでは「/leader」と仮定)に1つ作成する
EPHEMERAL_SEQUENTIAL
のノードは、イントラネットのIPデータを作成ノードに格納します.
2、ディレクトリのサブポイントセクションを取得し、シリアル番号が一番小さいノードを取得し、このノードをleaderに設定します.このノードが削除されると、leader断線が証明されます.
3、他の機器はリーダーノードを傍受し、リーダーノードの削除時に、ディレクトリの最小子点節をリーダーとする.
詳細コード
zkクライアントツールクラス
package org.massive.common;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Created by Massive on 2016/12/18.
 */
public class ZooKeeperClient {

    private static String connectionString = "localhost:2181";
    private static int sessionTimeout = 10000;


    public static ZooKeeper getInstance() throws IOException, InterruptedException {
        //--------------------------------------------------------------
        //             zookeeper get/create/exists     (KeeperErrorCode = ConnectionLoss)
        //    Zookeeper          
        //--------------------------------------------------------------
        final CountDownLatch connectedSignal = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper(connectionString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    connectedSignal.countDown();
                }
            }
        });
        connectedSignal.await(sessionTimeout, TimeUnit.MILLISECONDS);
        return zk;
    }

    public static int getSessionTimeout() {
        return sessionTimeout;
    }

    public static void setSessionTimeout(int sessionTimeout) {
        ZooKeeperClient.sessionTimeout = sessionTimeout;
    }

}

LeaderElection
package org.massive.group;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.massive.common.ZooKeeperClient;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.*;
import java.util.*;
import java.util.concurrent.CountDownLatch;

/**
 * Created by Massive on 2016/12/25.
 */
public class LeaderElection {

    private ZooKeeper zk;
    private int sessionTimeout;

    private static byte[] DEFAULT_DATA = {0x12,0x34};
    private static Object mutex = new Object();

    private static String ROOT = "/leader";

    private byte[] localhost = getLocalIpAdressBytes();

    private String znode;

    private static CountDownLatch firstElectionSignal = new CountDownLatch(1);

    //----------------------------------------------------
    // leader IP  
    //----------------------------------------------------
    private static String leader;

    public LeaderElection() throws IOException, InterruptedException, KeeperException {
        this.zk = ZooKeeperClient.getInstance();
        this.sessionTimeout = zk.getSessionTimeout();
        ensureExists(ROOT);
        ensureLocalNodeExists();

        System.out.println("-------------------------------------");
        System.out.println("local IP: " + getLocalIpAddress());
        System.out.println("local created node: " + znode);
        System.out.println("-------------------------------------");


    }

    /**
     *            ,      
     * @throws KeeperException
     * @throws InterruptedException
     */
    public void ensureLocalNodeExists() throws KeeperException, InterruptedException {
        List list = zk.getChildren(ROOT,new NodeDeleteWatcher());
        for (String node : list) {
            Stat stat = new Stat();
            String path = ROOT + "/" + node;
            byte[] data = zk.getData(path,false,stat);

            if (Arrays.equals(data,localhost)) {
                znode = path;
                return;
            }
        }
        znode = zk.create(ROOT + "/", localhost, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);

    }

    public void ensureExists(String path) {
        try {
            Stat stat = zk.exists(path, false);
            if (stat == null) {
                zk.create(path, DEFAULT_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void start() throws KeeperException, InterruptedException, UnsupportedEncodingException {

        do{
            synchronized (mutex) {

                System.out.println("begin leader election...");

                List nodes = zk.getChildren(ROOT, null);
                SortedSet sortedNode = new TreeSet();
                for (String node : nodes) {
                    sortedNode.add(ROOT + "/" + node);
                }
                //----------------------------------------------------
                //           
                //----------------------------------------------------
                String first = sortedNode.first();
                leader = first;
                //----------------------------------------------------
                //         (       )
                //----------------------------------------------------
                NodeDeleteWatcher watcher = znode.equals(first) ? null : new NodeDeleteWatcher();
                byte[] data = zk.getData(first, watcher, null);
                leader = new String(data, "UTF-8");

                System.out.println("leader election end, the leader is : " + leader);

                if (firstElectionSignal.getCount() != 0) {
                    firstElectionSignal.countDown();
                }

                if (znode.equals(first)) {
                    return;
                }

                mutex.wait();
            }
        } while (true);


    }

    class NodeDeleteWatcher implements Watcher {
        @Override
        public void process(WatchedEvent event) {
            if (event.getType() == Event.EventType.NodeDeleted) {
                synchronized (mutex) {
                    mutex.notify();
                }
            }
        }
    }

    /**
     *       IP  
     * @return
     * @throws SocketException
     */
    public static String getLocalIpAddress() throws SocketException {
        //            
        Enumeration nifs = NetworkInterface.getNetworkInterfaces();

        while (nifs.hasMoreElements()) {
            NetworkInterface nif = nifs.nextElement();

            //             IP   ,      
            Enumeration addresses = nif.getInetAddresses();
            while (addresses.hasMoreElements()) {
                InetAddress addr = addresses.nextElement();

                String ip = addr.getHostAddress();
                //     IPv4   
                if (addr instanceof Inet4Address && !"127.0.0.1".equals(ip)) {
                    return ip;
                }
            }
        }
        return null;
    }

    public static byte[] getLocalIpAdressBytes() throws SocketException {
        String ip = getLocalIpAddress();
        return ip == null ? null : ip.getBytes();
    }

    public static String getLeader() throws InterruptedException {
        //----------------------------------------------------
        //    leader          
        //----------------------------------------------------
        firstElectionSignal.await();
        return leader;
    }

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {

        //-------------------------------------------------------
        //          leader  
        //-------------------------------------------------------
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    LeaderElection leaderElection = new LeaderElection();
                    leaderElection.start();
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        String leader = LeaderElection.getLeader();

    }
}

テスト
mainメソッドを実行し、本機械のある出力結果
-------------------------------------
local IP: 192.168.1.103
local created node: /leader/0000000017
-------------------------------------
begin leader election...
leader election end, the leader is : 192.168.1.103

注意:ここでは単純なテストのみを行いました.本番環境で使用するには、まず複数のJVMで同時にテストしてください.
参考記事:  
http://zookeeper.apache.org/doc/r3.4.9/recipes.html
本文はオリジナルで、転載は出典を明記してください
http://blog.csdn.net/massivestars/article/details/53894551
ZooKeeperは、分散ロックとキューを実装します.参照してください.
Zookeeperによる分散ロック
ZooKeeperによるキューの実装