ZooKeeper分散ロック

15214 ワード

Redis分散ロックの本文では,著者らはRedisを用いて分散ロックを開発する方法を紹介した.
Redis分散ロックは軽量でスループットが高いという特徴があるが,一貫性保証は弱い.Zookeeperを用いて分散ロックを開発し,高い整合性に対する要求を満たすことができる.
Zookeeper分布式ロック原理
Zookeeperノードには、分散ロックの開発に役立ついくつかの性質があります.
  • テンポラリノード:クライアントはテンポラリノードを作成することができ、クライアントセッションが終了またはタイムアウトするとZookeeperは自動的にテンポラリノードを削除します.このプロパティは、デッドロックを回避するために使用できます.
  • フリップフロップ:ノードの状態が変化すると、Zookeeperは対応するイベントをリスニングするクライアントに通知します.この特性は、ブロック待機ロックを実現するために使用することができる.
  • 順序ノード:クライアントはノードの下にサブノードを作成できます.Zookeeperは、データベースの自己増加プライマリ・キーと同様に、サブノード数に基づいて整数シーケンス番号を自動的に生成します.

  • 比較的容易に考えられる分散ロックの実装方法は、次のとおりです.
  • ロックノードが作成されたかどうかを確認し、作成しない場合は一時ノード
  • を作成しようとする.
  • 一時ノードが正常に作成された場合は、ロックが正常に行われました.ロックを持つクライアントがクラッシュしたり、ネットワークが異常になってセッションを維持できない場合、ロックノードは削除され、デッドロックは発生しません.
  • 一時ノードの作成に失敗した場合は、ロックが失敗したことを示し、ロックを待つ.watchロックノードexistsイベントは、ノードが削除されたという通知を受信した後、再びロックを試みます.
  • ZookeeperのWatchは使い捨てなので、もう一度ロックを試みて失敗したら、Watchを再設定する必要があります.
  • 操作が完了すると、ロックノードを削除してロックを解除します.

  • このスキームでは、ロックが解放されると、Zookeeperがイベントを大量に購読したクライアントに通知する必要があるという問題があり、この現象を「驚群現象」または「羊群効果」と呼ぶ.
    驚きの群れ現象はZookeeperが正常にサービスを提供するのに非常に不利であるため、実際には通常別の案が採用されている.
  • は、ロックノードとして永続ノードを作成し、ロックしようとするクライアントは、ロックノードの下で一時的な順序ノードを作成する.Zookeeperはサブノードの秩序性を保証する.
  • ロックノードの下でidが最も小さいノードが現在のクライアントのために作成されたノードである場合、現在のクライアントが正常にロックされたことを示す.
  • そうでなければロックに失敗し、前のシーケンスノードを購読します.前のノードが削除されると、現在のノードが最小になり、ロックが成功したことを示します.
  • 操作が完了すると、ロックノードを削除してロックを解除します.

  • このスキームは、ロックが解放されるたびに、驚くべき現象を回避するために、クライアントに通知するだけです.
    このスキームの特徴は、優先的にキューに並んで待っているクライアントが先にロックを取得することであり、このロックをフェアロックと呼ぶ.ロックが解放されると、すべてのクライアントがロックを再競合するスキームを非公平ロックと呼ぶ.
    Demo
    このセクションでは、著者らはZookeeper公式Java APIを使用して簡単な公平なロックを実現します.
    Mavenを使用して依存管理、プロジェクト依存Zookeeper公式java sdkおよびapache commons-lang 3ツールパッケージ:
    
        org.apache.commons
        commons-lang3
        3.6
    
    
        org.apache.zookeeper
        zookeeper
        3.4.5
        pom
    
    

    クリックして完全なコードを表示します.
    package zk;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import org.apache.commons.lang3.StringUtils;
    import org.apache.zookeeper.*;
    
    /**
     * @author finley
     */
    public class ZKLock {
    
        private ZooKeeper zk;
    
        private String basePath;
    
        private String lockPath;
    
        private static final byte[] LOCK_DATA = "".getBytes();
    
        // zk         , basePath       ,     basePath         
        public ZKLock(ZooKeeper zk, String basePath) {
            //    zk      , '/'  ,   '/'  
            if (basePath.endsWith("/") || !basePath.startsWith("/")) {
                throw new IllegalArgumentException("base path must start with '/', and must not end with '/'");
            }
            this.zk = zk;
            this.basePath = basePath;
        }
    
        //    basePath       ,        
        private void ensureBasePath() throws KeeperException, InterruptedException {
            if (zk.exists(basePath, false) == null) {
                // basePath    ,    
                List pathParts = new ArrayList<>(Arrays.asList(basePath.split("/"))); //           
                pathParts.remove(0); //   basePath  '/'  , pathParts[0]      ,    
    
                //     ,              
                int last = 0;
                for (int i = pathParts.size() - 1; i >= 0; i--) {
                    String path = "/" + StringUtils.join(pathParts.subList(0, i), '/');
                    if (zk.exists(path, false) != null) {
                        last = i;
                        break;
                    }
                }
    
                //             ,      
                for (int i = last; i < pathParts.size(); i++) {
                    String path = "/" + StringUtils.join(pathParts.subList(0, i + 1), '/');
                    try {
                        zk.create(path, LOCK_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    } catch (KeeperException.NodeExistsException ignore) {} // may created by other thread
                }
    
            }
        }
    
        //         
        public void lock() throws KeeperException, InterruptedException {
            ensureBasePath();
    
            //   basePath           
            String lockPath = zk.create(basePath + "/lock_", LOCK_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    
            System.out.println(Thread.currentThread().getName() + " create: " + lockPath);
    
            //           
            while(true) {
                //    basePath              
                //             ,            lockPath   ,   create   getChildren                 
                List children = zk.getChildren(basePath,false);
                Collections.sort(children);
                String minNode = children.get(0);
    
                //             ,    
                if (StringUtils.isNotBlank(lockPath) && StringUtils.isNotBlank(minNode) && StringUtils.equals(lockPath, basePath + "/" + minNode) {
                    this.lockPath = lockPath; //     ,     
                    return;
                }
    
                //     ,   watch
                String watchNode = null;
                String node = lockPath.substring(lockPath.lastIndexOf("/") + 1);
                for (int i = children.size() - 1; i >= 0; i--) {
                    String child = children.get(i);
                    if (child.compareTo(node) < 0) {
                        watchNode = child;
                        break;
                    }
                }
    
                //          ,   watch
                if (watchNode != null) {
                    System.out.println(Thread.currentThread().getName() + " watch: " + watchNode);
    
                    String watchPath = basePath + "/" + watchNode;
                    
                    //    getData    exists     :           watch      ,            (   /     )
                    // exists        ,       NodeDeleted  (         ,          )。           
                    //          ,getData      ,          
    
                    //       watch                   
                    try {
                        zk.getData(watchPath, event -> {
                            if(event.getType() == Watcher.Event.EventType.NodeDeleted) {
                                //        this.wait()
                                // fixme:      bug,        NodeDeleted      。            ,            
                                synchronized (this) {
                                    notifyAll();
                                }
                            }
                        }, null);
                    } catch(KeeperException.NoNodeException e) {
                        //              getData watch   ,        ,            
                        continue;
                    }
            
    
                    synchronized (this) {
                        //     watch   ,          ,            
                        wait();
                        System.out.println(Thread.currentThread().getName() + " notified");
                    }
                }
            }    
    
        }
    
        //    
        public void unlock() throws KeeperException, InterruptedException {
            //              lockPath
            if (StringUtils.isNotBlank(lockPath)) {
                zk.delete(lockPath, -1); //         
            } else {
                throw new IllegalStateException("don't has lock"); //                
            }
        }
    
        public static void main(String[] args) {
            int concurrent = 10;
            ExecutorService service = Executors.newFixedThreadPool(concurrent);
            for (int i = 0; i < concurrent; i++) {
                service.execute(() -> {
                    //             ,           zookeeper   
                    ZooKeeper zk;
                    try {
    
                        zk = new ZooKeeper("localhost:2181", 6000, watchedEvent -> {
                            if (Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState())
                                System.out.println("connection is established...");
                        });
    
                        ZKLock lock = new ZKLock(zk, "/test/node1");
    
                        lock.lock();
                        System.out.println(Thread.currentThread().getName() + "  acquire success");
    
                        Thread.sleep(1000);
                        System.out.println("do sth, thread: " + Thread.currentThread().getName());
    
                        lock.unlock();
                        System.out.println(Thread.currentThread().getName() + "  release success");
    
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
            service.shutdown();
        }
    
    }
    

    Curator
    CruatorはZookeeperツールセットであり、分散ロックを含む一般的なアプリケーションのパッケージを提供し、本稿ではCruatorの分散ロック実装ソースコードを例に分析する.
    mavenを使用して依存をインストールするには、次の手順に従います.
    
        org.apache.curator
        curator-recipes
        4.0.1
    
    

    ロックコードの作成:
    public class ZkLock {
    
        public static void main(String[] args) throws Exception {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
            client.start();
    
            //      /curator/mutex
            InterProcessMutex mutex = new InterProcessMutex(client, "/curator/mutex");
            try {
                //     
                mutex.acquire();
                //     
                System.out.println("foo bar");
            } finally {
                //    
                mutex.release();
                client.close();
            }
    
        }
    
    }
    

    次に、InterProcessMutex.acquire()の実装を分析する.
    /**
     * Acquire the mutex - blocking until it's available. Note: the same thread
     * can call acquire re-entrantly. Each call to acquire must be balanced by a call
     * to {@link #release()}
     *
     * @throws Exception ZK errors, connection interruptions
    */
    @Override
    public void acquire() throws Exception
    {
        if ( !internalLock(-1, null) )
        {
            throw new IOException("Lost connection while trying to acquire lock: " + basePath);
        }
    }
    

    次にinternalLockの方法を見てみましょう.
    private boolean internalLock(long time, TimeUnit unit) throws Exception
    {
        Thread currentThread = Thread.currentThread();
    
        // threadData     ConcurrentMap,          
        LockData lockData = threadData.get(currentThread);
        if ( lockData != null ) // lockData    ,          
        {
            //    ,       
            lockData.lockCount.incrementAndGet();
            return true;
        }
    
        // internals.attemptLock        Zookeeper      
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if ( lockPath != null )
        {
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }
    
        return false;
    }
    

    実際にロック操作を実行するinternals.attemptLockの方法を分析します.
    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
    {
        final long      startMillis = System.currentTimeMillis();
        final Long      millisToWait = (unit != null) ? unit.toMillis(time) : null;
        final byte[]    localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        int             retryCount = 0;
    
        String          ourPath = null;
        boolean         hasTheLock = false;
        boolean         isDone = false;
    
        //     
        while ( !isDone )
        {
            isDone = true;
    
            try
            {
                //              
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                //               ,     
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            }
            catch ( KeeperException.NoNodeException e )
            {
                //   session        ,             
                if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
                {
                    isDone = false;
                }
                else
                {
                    throw e;
                }
            }
        }
    
        if ( hasTheLock )
        {
            return ourPath;
        }
    
        return null;
    }
    

    まずStandardLockInternalsDriver.createsTheLock()ソースを読みます.
    public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
    {
        String ourPath;
        if ( lockNodeBytes != null )
        {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
        }
        else
        {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
        }
        return ourPath;
    }
    

    テンポラリシーケンスノードを作成し、説明しません.
    次に、internalLockLoopを参照します.
    while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
    {
        //        ,       
        List        children = getSortedChildren();
    
        //               
        String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
        PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
        if ( predicateResults.getsTheLock() )
        {
            haveTheLock = true;
        }
        else
        {
            //           
            String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
    
            //           wait(),          notifyall()   
            synchronized(this)
            {
                try 
                {
                    //   getData()  exists()       :
                    //             exists()      ,       (                  )。           ,   Zookeeper    
                    //          getData()      
                    client.getData().usingWatcher(watcher).forPath(previousSequencePath);
    
                    //         
                    if ( millisToWait != null )
                    {
                        millisToWait -= (System.currentTimeMillis() - startMillis);
                        startMillis = System.currentTimeMillis();
                        if ( millisToWait <= 0 )
                        {
                            doDelete = true;    // timed out - delete our node
                            break;
                        }
                        //        
                        wait(millisToWait);
                    }
                    else
                    {
                        //     
                        wait();
                    }
                }
                catch ( KeeperException.NoNodeException e ) 
                {
                    // getData()                 ,        。
                }
            }
        }
    }