分散ロックとその実装

58841 ワード

分散ロックとその実装
なぜ分散ロックが必要なのか
スタンドアロンに配備されたプロジェクトでは、マルチスレッド間の同時制御は、Java関連の同時処理APIによってスレッド間の通信および反発を制御することができる.しかし、分散クラスタのシステムでは、単機配置の場合の同時制御戦略は失効し、単純なJava APIは分散環境での同時制御能力を備えていない.これには、共有リソースへのアクセスを制御するJVMにまたがる反発メカニズムが必要です.これが分散ロックの問題です.
分散シーンでは、CAP理論は,どの分散系でもコンシステンシ(Consistency),可用性(Availability),およびパーティションフォールトトレランスを同時に満たすことができないことを実証した.(Partition tolerance)は、最大2つしか同時に満たすことができないため、分散環境でのデータの最終的な一貫性を保証するためには、分散トランザクション、分散ロックなど、多くの技術案がサポートされる必要があります.
分散ロックのプロパティ
  • 分散システム環境において、1つの方法は、同じ時間に1つのマシンの1つのスレッドによって
  • しか実行できない.
  • 高可用性、高性能の取得ロックと解放ロック
  • は、再入可能な特性
  • を備える.
  • は、デッドロック
  • を防止するロック失効機構を備える.
  • は、非ブロッキングロック特性を備える、すなわち、ロックを取得することなく、取得ロック失敗
  • に直接戻る.
    分散ロックの3つの実装スキーム
    データベース実装
    データベースによる分散ロックの実装は、主に一意のインデックスに依存します.
    (一意のインデックス:同じインデックス値を持つ行は許可されず、重複するインデックスまたはキー値は禁止されます.データベースは、インデックスの作成時に重複するキー値があるかどうかを確認し、INSERTまたはUPDATE文を使用するたびにチェックします)
    実装の考え方:データベースに表を作成し、表にメソッド名などのフィールドを含み、メソッド名フィールドに一意のインデックスを作成します.あるメソッドを実行するには、このメソッド名を使用して表にデータを挿入します.一意のインデックスを作成したので、複数のリクエストが同時にデータベースに提出されても、1つの操作だけが成功することを保証します.挿入に成功するとメソッドのロックが取得され、実行が完了すると対応する行データ解放ロックが削除されます.
    CREATE TABLE `distributed_lock` (
      `id` int(11) NOT NULL COMMENT '  ',
      `method_name` varchar(64) NOT NULL COMMENT '   (        )',
      `desc` varchar(255) NOT NULL COMMENT '    ',
      `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,  
      `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`),
      UNIQUE KEY `index_method_name` (`method_name`) USING BTREE
    ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;
    

    データベースによる分散ロックの強化
  • 分散ロックはデータベースの可用性に依存し、データベースが単一のポイントで停止している場合、分散ロック機能は無効になります.
  • ソリューション:
  • マルチマシン配置、データ同期、データベースマスター切替

  • 同じスレッドはロックを解除する前に行データがずっと存在し、再びデータを挿入できない.この場合、この分散ロックは再入性を備えていない
  • ソリューション:現在ロックを取得しているマシンとスレッドの情報を記録するためのテーブルの新しい列が追加され、このスレッドが再びロックを取得するときは、テーブル内のマシンとスレッドの情報が現在のマシンとスレッドと同じかどうかをクエリーし、同じであればロック
  • を直接取得します.
  • ロックが無効になっていないメカニズムは、ロックを取得した後、データベースがダウンタイムし、対応するローデータが削除されていない可能性があります.データベースサーバが回復した後も、テーブルのデータは存在し、ロックを取得できません.またはロック解除に失敗しました
  • ソリューション:
  • 表には、失効時間を記録するための列が追加され、これらの失効したデータを消去するタイミングタスクが必要である.この場合もビジネスニーズに応じてタイミングタスクの実行時間を考慮する必要がある、長すぎたり短すぎたりすることはできない.
  • マルチマシン配置、データ同期、データベースマスター切替

  • ロック特性をブロックし、コードロジックに失敗再試行メカニズム(whileループ)を追加し、ビジネスニーズに応じて成功するまでロックを複数回取得したり、失敗回数に達した後に戻るなど
  • .
    データベースが分散ロックを実装する問題
    僕らはmethod_nameは一意のインデックスを使用し、for updateを使用して行レベルロックを使用することを示します.
    ただし、MySqlはクエリを最適化します.条件にインデックスフィールドが使用されていても、インデックスを使用してデータを取得するかどうかは、MySQLが実行計画の異なる代価を判断することによって決定されます.MySQLが、ロー・ロックではなくテーブル全体のスキャン効率が高いと考えている場合、インデックスは使用されません.この場合、InnoDBはテーブル・ロックを使用します.
    Redisによる分散ロックの実装
    実現構想:
  • setnx:keyが存在しない場合にのみ、keyがvalの文字列をセットし、1を返します.keyが存在する場合、何もしないで0
  • を返します.
  • ロックを取得する場合、setnxロックを使用し、ロックのvalue値はランダムに生成されたUUIDであってもよく、expireコマンドを使用してロックにタイムアウト時間を追加し、それを超えると自動的にロック
  • を解放する.
  • がロックを取得するときに1つの取得ロックのタイムアウト時間を設定し、それを超えると取得ロック
  • を破棄する.
  • がロックを解除する場合、ランダムに生成するUUIDによってロックのキー値ペアが対応するか否かを照合し、delete解除ロック
  • を実行する.
    /** * redis       */
    @Component
    public class DistributedLock {
        @Autowired
        private JedisPool jedisPool;
    
        /** *    * @param lockName   redis  key * @param acquireTimeOut           * @param timeout          * @return */
        public String lockWithTimeOut(String lockName, int acquireTimeOut, long timeout) {
            /** *  setnx key    ; *         (UUID),        ,           * *              ,                       */
            String identifier = UUID.randomUUID().toString().replaceAll("-", "");
            timeout = System.currentTimeMillis() + timeout;
            String reIdentifier = "";
    
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                jedis.select(0);
                //                  
                while(System.currentTimeMillis() < timeout){
                    Long setnx = jedis.setnx(lockName, identifier);
                    if (setnx != null && setnx == 1){
                        //      
                        jedis.expire(lockName, acquireTimeOut);
                        reIdentifier = identifier;
                        break;
                    }else {
                      //       
                     //  key    ,      ,   key              ,      
                        Long ttl = jedis.ttl(lockName);
                        if (ttl == -1){
                            jedis.expire(lockName, acquireTimeOut);
                        }
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            System.out.println("    ");
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            } catch (Exception e) {
                //TODO     
                e.printStackTrace();
            } finally {
                if (jedis != null)
                    jedis.close();
            }
            return reIdentifier;
        }
    
        /** *     * @param lockName      * @param identifier     (       val    ) * @return */
        public boolean releaseLock(String lockName, String identifier){
            Jedis jedis = null;
            boolean flag = false;
            try {
                jedis = jedisPool.getResource();
                jedis.select(0);
                jedis.watch(lockName);
                String result = jedis.get(lockName);
                if (result != null && identifier.equals(result)){
                    Transaction multi = jedis.multi();
                    multi.del(lockName);
                    List<Object> exec = multi.exec();
                    if (exec != null && exec.size() > 0){
                        flag = true;
                    }
                }
                jedis.unwatch();
                return flag;
            } catch (Exception e) {
                //TODO     
                e.printStackTrace();
                return false;
            } finally {
                if (jedis != null)
                    jedis.close();
            }
        }
    
    }
    

    Redis分散ロックの強化
  • ロック失効時間
  • ロックが無効になる時間は、実際のビジネスニーズに応じて適切な値
  • を設定する必要がある.
  • 設定の失効時間が短すぎて、方法が実行を待たずにロックが自動的に解放されると、同時問題
  • が発生する.
  • 設定時間が長すぎると、他のロックを取得するスレッドが白くなる可能性があります.
  • はwhileループを利用してロックを取得することができ、再試行間隔時間と最大再試行時間を設定してロックブロック特性
  • を実現することができる.
  • 再入力不可
  • ソリューション:
  • スレッドはロックを取得した後、現在のホスト情報とスレッド情報を保存し、次回取得する前に自分が現在のロックの所有者かどうかを確認します.ロックを解除するときにこれらの情報を
  • 削除する.

  • 単一障害
  • ソリューション:
  • Redisクラスタ、Redisマスター


  • Redisが分散ロックを実現する上での問題点
    このような最大の欠点は、1つのRedisノードにのみロックが適用され、sentinelによってRedisが高可用性を保証しても、このmasterノードが何らかの理由でプライマリ・セカンダリ・スイッチングが発生した場合、ロックが失われることです.
  • Redisのmasterノードでロック
  • を取得する.
  • しかし、このロックされたkeyはslaveノード
  • に同期していない.
  • master障害、フェイルオーバ、slaveノードがmasterノード
  • にアップグレード
  • によりロックが失われる
  • Zookeeperによる分散ロック
    実現構想:
    各クライアントがメソッドにロックをかけると、zookeeper上のメソッドに対応する指定されたノードのディレクトリの下で、一意の瞬時秩序ノード(EPHEMERAL_SEQUENTIAL)が生成されます.
    Zookeeperを使用して実装できる分散ロックはブロックされており、クライアントはZKで瞬時秩序ノードを作成し、ノードにリスナーをバインドすることができ、ノードが変化すると、ZKはクライアントに通知し、クライアントは自分が作成したノードが**現在のすべてのノードの中で最も番号の小さい**であるかどうかを確認することができ、もしそうであれば自分でロックを取得することができる.逆に待ち続ける
    ロックを解除する場合は、この瞬時ノードを削除するだけでよい.同時に、一時的にノードがセッションが切断されると自動的に削除されるため、サービスダウンタイムによるロックが解放されないことを回避できます.
    /** * Zookeeper        */
    public class ZooKeeperLock implements Watcher {
    
        // ZK  
        private ZooKeeper zk = null;
        //         
        private String rootLockNode;
        //     ,         
        private String lockName;
        //    
        private String currentLock;
        //     (    )
        private String waitLock;
        //    (              )
        private CountDownLatch countDownLatch;
        //     
        private int sessionTimeout = 30000;
    
    
        /** *       ZK  ,        * * @param zkAddress ZK    * @param rootLockNode       * @param lockName       */
        public ZooKeeperLock(String zkAddress, String rootLockNode, String lockName) {
            this.rootLockNode = rootLockNode;
            this.lockName = lockName;
            try {
                /** *     ,zkAddress   :IP:PORT * watcher       */
                zk = new ZooKeeper(zkAddress, this.sessionTimeout, this);
                /** *            ,       */
                Stat stat = zk.exists(rootLockNode, false);
                if (null == stat) {
                    zk.create(rootLockNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } 
        }
    
        /** *     ,     ,               * * @return */
        public boolean lock() {
            if (this.tryLock()) {
                System.out.println("  【" + Thread.currentThread().getName() + "】  (" + this.currentLock + ")  !");
                return true;
            } else {
                return waitOtherLock(this.waitLock, this.sessionTimeout);
            }
        }
    
        public boolean tryLock() {
            //    
            String split = "_lock_";
            if (this.lockName.contains("_lock_")) {
                throw new RuntimeException("lockName can't contains '_lock_' ");
            }
            try {
                /** *      (      )         * * path:    /    +    */
                this.currentLock = zk.create(this.rootLockNode + "/" + this.lockName + split, new byte[0],
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    
                System.out.println("  【" + Thread.currentThread().getName()
                        + "】     (" + this.currentLock + ")  ,    ...");
    
                /** *         */
                List<String> nodes = zk.getChildren(this.rootLockNode, false);
                /** *         lockName   */
                List<String> lockNodes = new ArrayList<String>();
                for (String nodeName : nodes) {
                    if (nodeName.split(split)[0].equals(this.lockName)) {
                        lockNodes.add(nodeName);
                    }
                }
                Collections.sort(lockNodes);
    
                /** *                  * *                           *             */
                String currentLockPath = this.rootLockNode + "/" + lockNodes.get(0);
                if (this.currentLock.equals(currentLockPath)) {
                    return true;
                }
    
                /** *     ,             */
                String currentLockNode = this.currentLock.substring(this.currentLock.lastIndexOf("/") + 1);
                int preNodeIndex = Collections.binarySearch(lockNodes, currentLockNode) - 1;
                this.waitLock = lockNodes.get(preNodeIndex);
    
            } catch (Exception e) {
                e.printStackTrace();
            }
            return false;
        }
    
        /** *      ,      * * @param waitLock           * @param sessionTimeout            * @return */
        private boolean waitOtherLock(String waitLock, int sessionTimeout) {
            boolean islock = false;
            try {
                //        
                String waitLockNode = this.rootLockNode + "/" + waitLock;
                Stat stat = zk.exists(waitLockNode, true);
                if (null != stat) {
                    System.out.println("  【" + Thread.currentThread().getName()
                            + "】 (" + this.currentLock + ")    ,   (" + waitLockNode + ")  ...");
                    /** *      ,         ,      */
                    this.countDownLatch = new CountDownLatch(1);
                    islock = this.countDownLatch.await(sessionTimeout, TimeUnit.MILLISECONDS);
                    this.countDownLatch = null;
                    if (islock) {
                        System.out.println("  【" + Thread.currentThread().getName() + "】 ("
                                + this.currentLock + ")    , (" + waitLockNode + ")    ");
                    } else {
                        System.out.println("  【" + Thread.currentThread().getName() + "】 ("
                                + this.currentLock + ")    ...");
                    }
                } else {
                    islock = true;
                }
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return islock;
        }
    
        /** *        * * @throws InterruptedException */
        public void unlock() throws InterruptedException {
            try {
                Stat stat = zk.exists(this.currentLock, false);
                if (null != stat) {
                    System.out.println("  【" + Thread.currentThread().getName() + "】    " + this.currentLock);
                    zk.delete(this.currentLock, -1);
                    this.currentLock = null;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            } finally {
                zk.close();
            }
        }
    
        /** *         * * @param watchedEvent */
        @Override
        public void process(WatchedEvent watchedEvent) {
            /** *           *      ,       */
            if (null != this.countDownLatch && watchedEvent.getType() == Event.EventType.NodeDeleted) {
                this.countDownLatch.countDown();
            }
        }
    }
    

    Curatorの分散ロック
    /** * Curator        : * InterProcessMutex:           * InterProcessSemaphoreMutex:        * InterProcessReadWriteLock:        * InterProcessMultiLock:                 */
    public class CuratorLock {
    
        public static void main(String[] args) {
            /** *       ,  zk    * curator  zookeeper   :ExponentialBackoffRetry * baseSleepTimeMs:  sleep    * maxRetries:       * maxSleepMs:       */
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = 
                CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
            //      
            client.start();
            /** *            ,      client,      /locks */
            InterProcessMutex mutex = new InterProcessMutex(client, "/locks");
            try {
                /** *      * public boolean acquire(long time, TimeUnit unit) *            *             */
                mutex.acquire(3, TimeUnit.SECONDS);
    
                /** *     */
                mutex.release();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                client.close();
            }
        }
    }
    

    ZKが分散ロックを実現する問題
    **Zookeeperを使用すると、同時性の問題が発生する可能性があります:**ネットワークのジッタにより、クライアントはZKクラスタのセッション接続が切断され、zkはクライアントが停止したと思って一時ノードを削除します.このとき、他のクライアントは分散ロックを取得することができ、同時性の問題が発生する可能性があります.
    この問題は、zkには再試行メカニズムがあり、zkクラスタがクライアントの心拍数を検出できないと再試行され、Curatorクライアントは複数の再試行ポリシーをサポートするため、一般的ではない.テンポラリノードは、複数回再試行してもだめな場合に削除されます(ロックの粒度と同時性のバランスをとるには、適切な再試行ポリシーを選択することも重要です).
    三者の比較
  • パフォーマンスの観点から(高いものから低いものまで)
  • キャッシュ>Zookeeper>=データベース
  • 信頼性の観点から(高いものから低いものまで)
  • Zookeeper>キャッシュ>データベース