ZooKeeperによる分散ロックの実装(公平ロック/非公平ロック)

13856 ワード

偶然、とても優れた人工知能のチュートリアルを見つけたので共有します。予備知識ゼロから学べて分かりやすく、ユーモアもあって、小説を読むように楽しめます。素晴らしい内容だと思ったので、皆さんにも紹介します。ここをクリックするとチュートリアルに移動します。
ZooKeeper(以下zk)で分散ロックを実装する資料は数多くありますが、安心して薦められるものは多くありません。Twitterが実装している分散ロックも確認しましたが、とてもよくできています。ただし、TwitterがパッケージしているZooKeeperツール群は内容がやや多く、分散ロックだけを使いたい場合には少し冗長に感じます。そこで、公平ロックと非公平ロックの両方を含む、zkで分散ロックを実現するユーティリティクラスを独自に実装しました。以下にコードとテストクラスをそのまま掲載します。
設定クラス(propertiesファイルから設定プロパティを読み込む)
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;

/**
 * ZooKeeper settings bound from {@code classpath:zookeeper.properties}, using the
 * keys that start with the {@code zk} prefix. Getters and setters are generated by
 * Lombok's {@code @Data}.
 *
 * @author zc.ding
 * @since 2019/3/23
 */
@Component
@PropertySource("classpath:zookeeper.properties")
@ConfigurationProperties(prefix = "zk")
@Data
public class ZkConfig {
    // Connect string handed to the ZooKeeper client constructor by DistributedLock.
    private String address;
    // Session timeout handed to the ZooKeeper client constructor
    // (presumably milliseconds, as the ZooKeeper API expects - confirm in the properties file).
    private int sessionTimeout;
    // How long DistributedLock waits for the session to connect; it is awaited in milliseconds.
    private long waitConnTimeout;
    // Lock mode selector; DistributedLock compares it with 1 to pick its waiting strategy.
    private int fairLock;
       
}

分散ロックの実装クラス:
import com.base.framework.core.commons.Constants;
import com.base.framework.core.utils.ApplicationContextUtils;
import com.base.framework.core.utils.ThreadPoolUtil;
import com.base.framework.zk.config.ZkConfig;
import com.base.framework.zk.exceptions.ZkFrameworkExpception;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * zk    /       
 * @author zc.ding
 * @since 2019/3/23
 */
@Component
@DependsOn({"zkConfig", "applicationContextUtils"})
public class DistributedLock implements InitializingBean {

    private static final Logger LOG = LoggerFactory.getLogger(DistributedLock.class);
    private static final ThreadLocal THREAD_LOCAL = new ThreadLocal<>();
    private static final ThreadLocal ZK_THREAD_LOCAL = new ThreadLocal<>();
    /**       **/
    private static final String LOCK_ROOT_PATH = "/locks";
    /**     **/
    private static final String LOCK_SUFFIX = "_NO_";
    /**          **/
    private static final String CREATE_ROOT_LOCK = "LOCK";
    /**     **/
    private static final int LOCK_FAIR = 1;
    private final static byte[] BUF = new byte[0];
    private static ZkConfig zkConfig;

    /**
    *      
    *  @param key       ‘/’
    *  @return boolean
    *  @since                   :2019/3/23
    *  @author                  :[email protected]
    */
    public static boolean tryLock(String key) {
        return tryLock(key, Constants.LOCK_EXPIRES, Constants.LOCK_WAITTIME);    
    }
    
    /**
    *     
    *  @param key    /  
    *  @param expire        
    *  @param wait      
    *  @return boolean
    *  @since                   :2019/3/23
    *  @author                  :[email protected]
    */
    public static boolean tryLock(String key, long expire, long wait) {
        ZooKeeper zooKeeper = getZooKeeper();
        ZK_THREAD_LOCAL.set(zooKeeper);
        return tryLock(zooKeeper, key, expire, wait);
    }
    
    /**
    *     
    *  @param zooKeeper zk  
    *  @param key     
    *  @param expire        
    *  @param wait      
    *  @return boolean
    *  @since                   :2019/3/23
    *  @author                  :[email protected]
    */
    private static boolean tryLock(ZooKeeper zooKeeper, String key, long expire, long wait) {
        expire = expire * 1000;
        wait = wait * 1000;
        final String currNode;
        String path = LOCK_ROOT_PATH + "/" + key + LOCK_SUFFIX;
        try {
            currNode = zooKeeper.create(path, BUF, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            //   
            List nodes = zooKeeper.getChildren(LOCK_ROOT_PATH, false);
            //                 
            nodes = nodes.stream().filter(o -> o.startsWith(key)).collect(Collectors.toList());
            nodes.sort(String::compareTo);
            //               session         ,          zk     ,         
            if (nodes.size() == 0) {
                return false;
            }
            //                   
            if (currNode.endsWith(nodes.get(0))) {
                runExpireThread(zooKeeper, currNode, expire);
                return true;
            }
            //     
            CountDownLatch countDownLatch = new CountDownLatch(1);
            //    
            if(zkConfig.getFairLock() == LOCK_FAIR){
                for (int i = 0; i < nodes.size(); i++) {
                    String node = nodes.get(i);
                    if (currNode.endsWith(node)) {
                        runExpireThread(zooKeeper, currNode, expire);
                        return true;
                    }
                    Stat stat = zooKeeper.exists(LOCK_ROOT_PATH + "/" + node, new LockWatcher(countDownLatch));
                    if (stat != null) {
                        delPath(zooKeeper);
                        //     
                        if(!countDownLatch.await(wait, TimeUnit.MILLISECONDS)){
                            return tryLock(zooKeeper, key, expire, wait);
                        }
                    }
                }
            }else{
                for (int i = 0; i < nodes.size(); i++) {
                    String node = nodes.get(i);
                    if (currNode.endsWith(node)) {
                        runExpireThread(zooKeeper, currNode, expire);
                        return true;
                    }
                    //          
                    if (currNode.endsWith(nodes.get(i + 1))) {
                        Stat stat = zooKeeper.exists(LOCK_ROOT_PATH + "/" + node, new LockWatcher(countDownLatch));
                        if (stat != null) {
                            //      ,      ,            2 ,                 
                            //           
                            if(!countDownLatch.await(wait * 2, TimeUnit.MILLISECONDS)){
                                delPath(zooKeeper);
                                return false;
                            }
                            return true;
                        }
                    }
                }
            }
        } catch (KeeperException | InterruptedException e) {
            LOG.error("create '{}' node fail.", key, e);
        }
        return false;
    }

    /**
    *     
    *  @since                   :2019/3/23
    *  @author                  :[email protected]
    */
    public static void unLock() {
        ZooKeeper zooKeeper = ZK_THREAD_LOCAL.get();
        delPath(zooKeeper);
        close(ZK_THREAD_LOCAL.get());
        THREAD_LOCAL.remove();
        ZK_THREAD_LOCAL.remove();
    }

    

    /**
    *            
    *  @since                   :2019/3/23
    *  @author                  :[email protected]
    */
    private static void createLockRootPath() {
        ZooKeeper zooKeeper = getZooKeeper();
        try {
            Stat stat = zooKeeper.exists(LOCK_ROOT_PATH, false);
            if (stat == null) {
                synchronized (CREATE_ROOT_LOCK) {
                    stat = zooKeeper.exists(LOCK_ROOT_PATH, false);
                    if (stat == null) {
                        LOG.info("create lock root path '{}'", LOCK_ROOT_PATH);
                        zooKeeper.create(LOCK_ROOT_PATH, BUF, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
                    }
                }
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
    *    zooke    
    *  @return org.apache.zookeeper.ZooKeeper
    *  @since                   :2019/3/23
    *  @author                  :[email protected]
    */
    private static ZooKeeper getZooKeeper() {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        try {
            final ZooKeeper zooKeeper = new ZooKeeper(zkConfig.getAddress(), zkConfig.getSessionTimeout(), null);
            zooKeeper.register((watchedEvent) -> {
                switch (watchedEvent.getState()) {
                    case Expired:
                        close(zooKeeper);
                        break;
                    case SyncConnected:
                        countDownLatch.countDown();
                    default:
                }
            });

            if(!countDownLatch.await(zkConfig.getWaitConnTimeout(), TimeUnit.MILLISECONDS)){
                close(zooKeeper);
                throw new ZkFrameworkExpception("wait for creating zookeeper connection timeout, timeout is [" + zkConfig.getWaitConnTimeout() + "]");
            }
            return zooKeeper;
        } catch (IOException | InterruptedException e) {
            throw new ZkFrameworkExpception("create Zookeeper instance fail.", e);
        }
    }
    
    /**
    *                 ,      ,zk       
    *  @param zooKeeper zk  
    *  @param currNode      
    *  @since                   :2019/3/23
    *  @author                  :[email protected]
    */
    private static void runExpireThread(final ZooKeeper zooKeeper, String currNode, long expire){
        THREAD_LOCAL.set(currNode);
        ThreadPoolUtil.callFixedThreadPool(() -> {
            Thread.sleep(expire * 1000);
            LOG.info("   {} ,     .", expire);
            delPath(zooKeeper);
            return null;
        });
    }

    /**
    *         
    *  @param zooKeeper zk  
    *  @since                   :2019/3/23
    *  @author                  :[email protected]
    */
    private static void delPath(ZooKeeper zooKeeper) {
        try {
            //        ,        
            zooKeeper.delete(THREAD_LOCAL.get(), -1);
        } catch (Exception e){
            LOG.error("lock expire, delete lock");
        }
    }

    /**
    *      
    *  @param zooKeeper zk  
    *  @since                   :2019/3/23
    *  @author                  :[email protected]
    */    
    private static void close(ZooKeeper zooKeeper) {
        if (zooKeeper != null) {
            try {
                zooKeeper.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    @Override
    public void afterPropertiesSet() {
        zkConfig = ApplicationContextUtils.getBean(ZkConfig.class);
        createLockRootPath();
    }

    /**
    * 
    *          
    *  @since                   :2019/3/23
    *  @author                  :[email protected]
    */
    static class LockWatcher implements Watcher {
        private CountDownLatch latch;

        public LockWatcher(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void process(WatchedEvent event) {
            if(event.getType() == Event.EventType.NodeDeleted){
                latch.countDown();
            }
        }
    }
}

この実装は「ApplicationContextUtils」に依存しています。これは独自のユーティリティクラスで、「ApplicationContextAware」を実装することで代替できます。
テストクラス:
import com.base.framework.zk.utils.DistributedLock;
import org.apache.zookeeper.*;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.MethodSorters;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Concurrency smoke test for the ZooKeeper distributed lock: many threads compete
 * for the same lock and decrement a shared counter while holding it.
 *
 * @author zc.ding
 * @since 2019/3/23
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = AppServerApplicationTest.class)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class TestDistributedLock {

    /**
     * Counter decremented under the lock. Volatile because the distributed lock itself
     * gives no Java memory-model guarantee between worker threads.
     * NOTE(review): the declaration was missing from the published snippet; the initial
     * value is a placeholder - confirm the intended value.
     */
    private static volatile int sum = 20;

    /**
     * Starts 50 workers that each try to take the lock named "lock" once, then waits
     * at most 30 seconds for them to finish and prints the remaining counter.
     */
    @Test
    public void testConcurrentGetLock() throws Exception{
        int count = 50;
        ExecutorService executorService = Executors.newFixedThreadPool(count);
        for( int i = 0; i < count; i++) {
            executorService.execute(() -> {
                try {
                    if(DistributedLock.tryLock("lock")){
                        System.out.println("     ");
                        if(sum == 0){
                            System.out.println("sum   0");
                            return;
                        }
                        System.out.println("     ,sum=" + sum);
                        sum = sum - 1;
                    }else{
                        System.out.println("zk      !");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }finally{
                    // Always release: the attempt opens a ZooKeeper session even when it fails.
                    DistributedLock.unLock();
                }
            });
        }
        executorService.shutdown();
        // Return as soon as all workers are done instead of always sleeping 30 seconds.
        executorService.awaitTermination(30, java.util.concurrent.TimeUnit.SECONDS);
        System.out.println("finish, sum=" + sum);
    }
    
    
}

以上が、zkによる分散ロックの実装とそのユニットテストです。
PS:この実装には公平ロックと非公平ロックが含まれます。公平ロックでは、ノードが削除されたときに隣接する(直後の)ノードだけに通知され、そのノードがロックを取得します。非公平ロックでは、ノードが削除されるとロックを待っているすべてのスレッドに通知され、改めて競合します(いわゆるサンダリングハード(thundering herd)効果が発生します)。具体的な実装は上記のコードを参照してください。