zookeeper分散ロックの実装

10737 ワード

zookeeper分散ロックの実装
一時的な順序ノードです.このタイプのノードには、次のいくつかの特性があります.
  • ノードのライフサイクルは、ノードを作成するクライアント・セッションが失効すると、このノードも消去されます.
  • 各親ノードは、その子ノードが作成した順序を維持し、順序ノード(SEQUENTIAL)が作成されると、親ノードは自動的にそのノードに整形数値を割り当て、接尾辞の形でノード名に自動的に追加され、このノードの最終ノード名として使用される.

  • 上記の2つの特性を用いて,分散ロックを実現するための基本論理を取得する.
  • クライアント呼び出しcreate()メソッド「_locknode_/guid-lock-」という名前のノードを作成するには、ノードの作成タイプをEPHEMERAL_に設定する必要があることに注意してください.SEQUENTIAL.
  • クライアント呼び出しgetChildren("_locknode")メソッドは、作成したすべてのサブノードを取得し、このノードにサブノード変更通知のWatcherを登録します.
  • クライアントがすべてのサブノードpathを取得した後、ステップ1で作成したノードがすべてのノードの中で最もシーケンス番号が小さいことに気づいた場合、このクライアントはロックを取得したと考えられる.
  • ステップ3で自分がすべてのサブノードの中で最も小さいものではないことを発見し、自分がまだロックを取得していないことを説明したら、次のサブノード変更通知まで待ち始め、サブノードの取得を行い、ロックを取得するか否かを判断する.

  • ロックを解除するプロセスは比較的簡単で、自分で作成したサブノードを削除すればいいです.
    以上の情報は次のとおりです.http://jm-blog.aliapp.com/?p=2554
    この考え方に基づいてzookeeperベースの分散ロックを実現する.
    直接コードを貼って、以下のように、不適切な点や改善すべき点があれば、ご指導ください.
    package com.usfot;
    
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;
    
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.List;
    import java.util.SortedSet;
    import java.util.TreeSet;
    import java.util.concurrent.ThreadLocalRandom;
    
    /**
     * Shared          ,         。
     * DistributedSharedLock         。    
     * Created by liyanxin on 2015/3/18.
     */
    public class DistributedSharedLock implements Watcher {
    
        private static final String ADDR = "127.0.0.1:2181";
        private static final String LOCK_NODE = "guid-lock-";
        private String rootLockNode; //   
        private ZooKeeper zk = null;
        private Integer mutex;
        private Integer currentLock;
    
        /**
         *       
         *   zk   
         *   zk   
         *
         * @param rootLockNode
         */
        public DistributedSharedLock(String rootLockNode) {
            this.rootLockNode = rootLockNode;
            try {
                //  zk   
                zk = new ZooKeeper(ADDR, 10 * 10000, this);
            } catch (IOException e) {
                e.printStackTrace();
            }
            mutex = new Integer(-1);
            // Create ZK node name
            if (zk != null) {
                try {
                    //       
                    Stat s = zk.exists(rootLockNode, false);
                    if (s == null) {
                        zk.create(rootLockNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    System.out.println("Keeper exception when instantiating queue: " + e.toString());
                } catch (InterruptedException e) {
                    System.out.println("Interrupted exception");
                }
            }
        }
    
        /**
         *   zk   ,   
         *
         * @throws KeeperException
         * @throws InterruptedException
         */
        public void acquire() throws KeeperException, InterruptedException {
            ByteBuffer b = ByteBuffer.allocate(4);
            byte[] value;
            // Add child with value i
            b.putInt(ThreadLocalRandom.current().nextInt(10));
            value = b.array();
    
            //      
            String lockName = zk.create(rootLockNode + "/" + LOCK_NODE, value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
    
            synchronized (mutex) {
                while (true) {
                    //         number,         
                    Integer acquireLock = new Integer(lockName.substring(lockName.lastIndexOf('-') + 1));
                    List<String> childLockNode = zk.getChildren(rootLockNode, true);
    
                    SortedSet<Integer> sortedLock = new TreeSet<Integer>();
                    for (String temp : childLockNode) {
                        Integer tempLockNumber = new Integer(temp.substring(temp.lastIndexOf('-') + 1));
                        sortedLock.add(tempLockNumber);
                    }
    
                    currentLock = sortedLock.first();
    
                    //                            
                    if (currentLock >= acquireLock) {
                        System.err.println("thread_name=" + Thread.currentThread().getName() + "|attend lcok|lock_num=" + currentLock);
                        return;
                    } else {
                        //               
                        System.err.println("thread_name=" + Thread.currentThread().getName() + "|wait lcok|lock_num=" + currentLock);
                        mutex.wait();
                    }
                }
            }
        }
    
    
        /**
         *    
         *
         * @throws KeeperException
         * @throws InterruptedException
         */
        public void release() throws KeeperException, InterruptedException {
            String lockName = String.format("%010d", currentLock);
            zk.delete(rootLockNode + "/" + LOCK_NODE + lockName, -1);
            System.err.println("thread_name=" + Thread.currentThread().getName() + "|release lcok|lock_num=" + currentLock);
        }
    
        @Override
        public void process(WatchedEvent event) {
            synchronized (mutex) {
                mutex.notify();
            }
        }
    }

    テストコードは以下の通りです.
    package com.usfot;
    
    import org.apache.zookeeper.KeeperException;
    
    /**
     *   10   ,        ,           。
     *      zk      。       ,   。
     *          ,           。
     * Created by liyanxin on 2015/3/18.
     */
    public class DistributedSharedLockTest {
    
        public static void main(String args[]) {
            for (int i = 0; i < 10; i++) {
                Thread t = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        DistributedSharedLock lock = new DistributedSharedLock("/_locknode_");
                        try {
                            lock.acquire();
                            Thread.sleep(1000); //              
                            System.out.println("======           ======");
                            lock.release();
                            System.err.println("=============================");
                        } catch (KeeperException e) {
                            e.printStackTrace();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
                t.start();
            }
        }
    }

    テストの状況を見てみると、
    thread_name=Thread-5|wait lcok|lock_num=300
    thread_name=Thread-1|attend lcok|lock_num=300
    thread_name=Thread-9|wait lcok|lock_num=300
    thread_name=Thread-3|wait lcok|lock_num=300
    thread_name=Thread-7|wait lcok|lock_num=300
    thread_name=Thread-6|wait lcok|lock_num=300
    thread_name=Thread-2|wait lcok|lock_num=300
    thread_name=Thread-0|wait lcok|lock_num=300
    thread_name=Thread-8|wait lcok|lock_num=300
    thread_name=Thread-4|wait lcok|lock_num=300
    ======           ======
    thread_name=Thread-2|wait lcok|lock_num=301
    thread_name=Thread-6|wait lcok|lock_num=301
    thread_name=Thread-1|release lcok|lock_num=300
    =============================
    .................................

    これは一部の印刷ログです.
    ロックオブジェクトのスレッドセキュリティを別の方法でテストします.次のテストコードは、
    package com.usfot;
    
    import org.apache.zookeeper.KeeperException;
    
    /**
     * Created by liyanxin on 2015/3/18.
     */
    public class DistributedSharedLockTest2 {
    
    
        public static void main(String args[]) {
            final DistributedSharedLock lock = new DistributedSharedLock("/_locknode_");
    
            /**
             *              ,           
             *      
             */
            for (int i = 0; i < 10; i++) {
                Thread t = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            lock.acquire();
                            Thread.sleep(1000); //              
                            System.out.println("======           ======");
                            lock.release();
                            System.err.println("=============================");
                        } catch (KeeperException e) {
                            e.printStackTrace();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
                t.start();
            }
        }
    }

    テストの結果、デッドロックdeadlockの問題が発見されましたが、この問題はどのようにして発生しましたか?複数のスレッドがmutexオブジェクトの内蔵ロックで競合するためです.スレッドAがmutexオブジェクトの内蔵ロックを取得すると、同期コードブロックに入り、zkサーバの分散ロックを取得する操作を行い、分散ロックを取得すると、同期コードブロックを終了し、mutexの内蔵ロックもスレッドAによって解放される.多くのスレッドがmutexオブジェクトの内蔵ロックを競合しています.このとき,スレッドBはmutexの内蔵ロックを取得し,同期コードブロックに入るが,分散ロックが取得されていないため,スレッドBは待機する.その後、スレッドAが分散ロックを解放し、zkサーバロックノードを削除すると、watcherイベントがトリガーされ、mutexオブジェクト内蔵ロックで待機しているスレッドが起動し、
    注意notifyで起動します.notify皆さんは、すべての待機スレッドの1つしか起動できないことを知っているはずです.ちょうどこの時に起動したのはスレッドBではないかもしれませんが、deadlockが来ました.
    どうやって解決しますか?私はnotifyをnotifyAllに変えて試してみましたが、プログラムは順調に実行でき、デッドロックはありません.
    次のログでコードを再実行します.
    thread_name=Thread-9|wait lcok|lock_num=360
    thread_name=Thread-0|wait lcok|lock_num=360
    thread_name=Thread-1|wait lcok|lock_num=360
    thread_name=Thread-5|wait lcok|lock_num=360
    thread_name=Thread-7|wait lcok|lock_num=360
    thread_name=Thread-3|wait lcok|lock_num=360
    thread_name=Thread-6|wait lcok|lock_num=360
    thread_name=Thread-2|attend lcok|lock_num=360
    thread_name=Thread-4|wait lcok|lock_num=360
    thread_name=Thread-8|wait lcok|lock_num=360
    ======           ======
    thread_name=Thread-2|release lcok|lock_num=360
    =============================
    thread_name=Thread-8|attend lcok|lock_num=361
    thread_name=Thread-4|wait lcok|lock_num=361
    thread_name=Thread-6|wait lcok|lock_num=361
    ..............

    一部の印刷ログ
    参照先:http://blog.csdn.net/java2000_wl/article/details/8694270
    ========================================
    改善された分散ロックの実装
    以下は改良された分散ロック実装であり、従来の実装と唯一異なる点は、ここではロック競合他社ごとに設計され、注目する必要があることである」locknode_”ノードの下のシーケンス番号が自分より小さいノードが存在するかどうか.次のようになります.
  • クライアント呼び出しcreate()メソッド「_locknode_/guid-lock-」という名前のノードを作成するには、ノードの作成タイプをEPHEMERAL_に設定する必要があることに注意してください.SEQUENTIAL.
  • クライアント呼び出しgetChildren("_locknode")メソッドは、作成されたすべてのサブノードを取得します.ここではWatcherは登録されていません.
  • クライアントがすべてのサブノードpathを取得した後、ステップ1で作成したノード番号が最も小さいことに気づいた場合、このクライアントはロックを取得したとみなされる.
  • は、ステップ3において、自分がすべてのサブノードの中で最も小さいものではないことを発見した場合、自分がまだロックを取得していないことを示す.クライアントは、自分より小さいノードを見つけてexist()メソッドを呼び出し、イベントリスニングを登録する必要があります.
  • 以降、注目されるノードが除去されると、クライアントは対応する通知を受信する.このときクライアントはgetChildrenを再度呼び出す必要がある("_locknode_")メソッドは、作成したすべてのサブノードを取得し、自分が本当に最小のノードであることを確認し、ステップ3に進みます.

  • ==================END==================