ZooKeeperによる分散ロック

7583 ワード

1.分散ロックの由来:
プログラム開発の過程で考えざるを得ないのは同時問題である.Javaでは、同じjvmに対して、jdkはlockや同期などを提供しています.しかし、分散型の場合、複数のプロセスがいくつかのリソースに競争関係を生じることが多いが、これらのプロセスは異なるマシン上で、jdkで提供されるものはすでに満たされていないことが多い.分散ロックは,分散の場合の同時ロックを満たすことができることを考慮している.次に,zkを用いて分散ロックを実現する方法について説明する.
2.実現構想:
2.1 zk簡単な紹介:
ZooKeeperはApacheソフトウェア財団のソフトウェアプロジェクトで、大規模な分散コンピューティングにオープンソースの分散構成サービス、同期サービス、ネーミング登録を提供しています.ZooKeeperでは、ノードタイプは、永続ノード(PERSISTENT)、一時ノード(EPHEMERAL)、およびシーケンスノード(SEQUENTIAL)に分けられます.具体的には、ノード作成中に一般的に組み合わせて使用され、4つのノードタイプを生成できます.永続ノード(PERSISTENT)、永続順序ノード(PERSISTENT_SEQUENTIAL)、一時ノード(EPHEMERAL)、一時順序ノード(EPHEMERAL_SEQUENTIAL);具体的なノードの意味は、グーグルの.
2.2 zkによる実現:
多くのプロセスが共有リソースにアクセスする必要がある場合、zkによって分散ロックを実現することができます.主なステップは:1.名前が「lock」である場合、ノードを作成します.ノードタイプは永続ノード(PERSISTENT)2.プロセスが共有リソースにアクセスする必要があるたびに、分散ロックのlock()メソッドまたはtryLock()メソッドを呼び出してロックを取得します.このとき、最初のステップで作成したlockノードの下で適切な順序のサブノードが確立されます.ノードタイプが一時順序ノード(EPHEMERAL_SEQUENTIAL)は、特定の名前name+lock+シーケンス番号を構成することにより構成される.3.サブノードが確立された後、lockの下にあるすべてのnameで始まるサブノードを並べ替え、確立されたばかりのサブノードシーケンス番号が最小のノードであるか否かを判断し、最小のノードであればロックを取得してリソースにアクセスする.4.このノードでなければ、そのノードの前の順序のノードで、そのノードに登録リスニングイベントがあるかどうかを指定します.同時にここで渋滞しています.リスニングイベントの発生を待機し、ロック制御権を取得します.  5.共有リソースを呼び出した後、unlock()メソッドを呼び出し、zkを閉じ、さらにリスニングイベントを開始してロックを解放することができる.実装される分散ロックは、厳格に順次アクセスされる同時ロックである.
3.コード実装:
Javaによる分散ロックの実装について説明します:1.クラスDistributedLockを確立し、javaを実現する.util.concurrent.locks.Lock;とorg.apache.zookeeper.Watcherインタフェース2.lockを実現するには、主にlock、tryLock、unlockなど3.watcherインタフェースの下でのprocessメソッドを実現します.  4.コンストラクタでzkを初期化します.  5.詳細はコードコメントを参照
package cn.wpeace.zktest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
/**
 * @author peace
 *
 */
public class DistributedLock implements Lock, Watcher{
    private ZooKeeper zk;
    private String root = "/locks";// 
    private String lockName;//       
    private String waitNode;//      
    private String myZnode;//   
    private CountDownLatch latch;//   
    private CountDownLatch connectedSignal=new CountDownLatch(1);
    private int sessionTimeout = 30000; 
    /**
     *       ,      config   zookeeper    
     * @param config 192.168.1.127:2181
     * @param lockName       ,lockName       _lock_
     */
    public DistributedLock(String config, String lockName){
        this.lockName = lockName;
        //            
         try {
            zk = new ZooKeeper(config, sessionTimeout, this);
            connectedSignal.await();
            Stat stat = zk.exists(root, false);//      Watcher
            if(stat == null){
                //      
                zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); 
            }
        } catch (IOException e) {
            throw new LockException(e);
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
    }
    /**
     * zookeeper      
     */
    public void process(WatchedEvent event) {
        //     
        if(event.getState()==KeeperState.SyncConnected){
            connectedSignal.countDown();
            return;
        }
        //          
        if(this.latch != null) {  
            this.latch.countDown();  
        }
    }

    public void lock() {   
        try {
            if(this.tryLock()){
                System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");
                return;
            }
            else{
                waitForLock(waitNode, sessionTimeout);//   
            }
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        } 
    }
    public boolean tryLock() {
        try {
            String splitStr = "_lock_";
            if(lockName.contains(splitStr))
                throw new LockException("lockName can not contains \\u000B");
            //       
            myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(myZnode + " is created ");
            //       
            List subNodes = zk.getChildren(root, false);
            //    lockName  
            List lockObjNodes = new ArrayList();
            for (String node : subNodes) {
                String _node = node.split(splitStr)[0];
                if(_node.equals(lockName)){
                    lockObjNodes.add(node);
                }
            }
            Collections.sort(lockObjNodes);

            if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
                //        ,      
                System.out.println(myZnode + "==" + lockObjNodes.get(0));
                return true;
            }
            //         ,      1   
            String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
            waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);//        
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
        return false;
    }
    public boolean tryLock(long time, TimeUnit unit) {
        try {
            if(this.tryLock()){
                return true;
            }
            return waitForLock(waitNode,time);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }
    private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {
        Stat stat = zk.exists(root + "/" + lower,true);//      。
        //                ,           ,      
        if(stat != null){
            System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);
            this.latch = new CountDownLatch(1);
            this.latch.await(waitTime, TimeUnit.MILLISECONDS);//  ,               
            this.latch = null;
        }
        return true;
    }
    public void unlock() {
        try {
            System.out.println("unlock " + myZnode);
            zk.delete(myZnode,-1);
            myZnode = null;
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
    public void lockInterruptibly() throws InterruptedException {
        this.lock();
    }
    public Condition newCondition() {
        return null;
    }

    public class LockException extends RuntimeException {
        private static final long serialVersionUID = 1L;
        public LockException(String e){
            super(e);
        }
        public LockException(Exception e){
            super(e);
        }
    }
}