Zookeeperによる分散ロック


1.第1の態様において、スレッドsleep 1秒クロックサイクルによるロックの取り方は、性能が劣る上に、併発量が大きい場合にはスレッドがロックを取得できない場合があるという欠点がある

/**
 *  zookeeper ,  sleep1 ,
 *  , 。
 */
public class ZookeeperDistributeLock_01 {

    private static final String connectString = "xx:2181,xx:2182";

    private ZooKeeper zooKeeper;

    private static final String lock = "/lock";

    private static int count = 0;

    public ZookeeperDistributeLock_01() {
        try {
            this.zooKeeper = new ZooKeeper(connectString, 5000, null);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void lock() {
        try {
            zooKeeper.create(lock, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } catch (Exception e) {
            //  , 
            try {
                Thread.sleep(1000);
                lock();
            } catch (InterruptedException e1) {
            }
        }
    }

    public void unLock() {
        try {
            zooKeeper.delete(lock, 0);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     *  
     *
     * @param args
     */
    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {
            ZookeeperDistributeLock_01 distributeLock = new ZookeeperDistributeLock_01();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    distributeLock.lock();
                    for (int j = 0; j < 1000; j++) {
                        count++;
                    }
                    distributeLock.unLock();
                    countDownLatch.countDown();
                }
            }).start();
        }
        try {
            countDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(count);
    }
}

2. zookeeper watch , , , , io


/**
 *  Zookeeper watch ,
 *  , , , IO 。
 */
public class ZookeeperDistributeLock_02 implements Watcher {

    private static final String connectString = "xx:2181,xx:2182";

    private ZooKeeper zooKeeper;

    private static final String lock = "/lock";

    private static int count = 0;

    private CountDownLatch countDownLatch = null;

    public ZookeeperDistributeLock_02() {
        try {
            this.zooKeeper = new ZooKeeper(connectString, 5000, this);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void lock() {
        countDownLatch = new CountDownLatch(1);
        try {
            zooKeeper.create(lock, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } catch (Exception e) {
            try {
                // lock  
                zooKeeper.exists(lock,true);
                // node , 
                countDownLatch.await();
                lock();
            } catch (Exception e1) {
                e1.printStackTrace();
            }
        }
    }

    public void unLock() {
        try {
            zooKeeper.delete(lock, 0);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void process(WatchedEvent watchedEvent) {
        countDownLatch.countDown();
        System.out.println(" :" + watchedEvent);
    }

    /**
     *  
     *
     * @param args
     */
    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(10);
        for (int i = 0; i < 10; i++) {
            ZookeeperDistributeLock_02 distributeLock = new ZookeeperDistributeLock_02();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    distributeLock.lock();
                    for (int j = 0; j < 1000; j++) {
                        count++;
                    }
                    distributeLock.unLock();
                    countDownLatch.countDown();
                }
            }).start();
        }
        try {
            countDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(count);
    }


}

3. zookeeper watch , , zookeeper


/**
 *  
 */
public class ZookeeperDistributeLock_03 implements Watcher {

    private static final String connectString = "xx:2181,xx:2182";

    private ZooKeeper zooKeeper;

    private static final String parent = "/parent";

    private static final String lock = "/lock";

    private static int count = 0;

    private CountDownLatch countDownLatch = null;

    private String currentNodePath;

    public ZookeeperDistributeLock_03() {
        try {
            this.zooKeeper = new ZooKeeper(connectString, 5000, this);
            //create parent znode
            if (zooKeeper.exists(parent, false) == null) {
                zooKeeper.create(parent, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void lock() {
        //get all child znode in parent
        try {
            currentNodePath = zooKeeper.create(parent + lock, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            acquireLock(zooKeeper);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void acquireLock(ZooKeeper zooKeeper) {
        try {
            List childs = zooKeeper.getChildren(parent, false);
            Collections.sort(childs);

            // currentNodePath childs 
            if (currentNodePath.equals(parent + "/" + childs.get(0))) {
                // 
                System.out.println(" :  " + currentNodePath);
                return;
            }
            //  , watch
            countDownLatch = new CountDownLatch(1);
            String leftNodePath = "";
            for (int i = 0; i < childs.size(); i++) {
                if (currentNodePath.equals(parent + "/" + childs.get(i))) {
                    leftNodePath = parent + "/" + childs.get(i - 1);
                    break;
                }
            }
            zooKeeper.exists(leftNodePath, true);
            countDownLatch.await();
            acquireLock(zooKeeper);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void unLock() {
        try {
            System.out.println(" : " + currentNodePath);
            zooKeeper.delete(currentNodePath, 0);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void process(WatchedEvent watchedEvent) {
        System.out.println(" :" + watchedEvent);
        countDownLatch.countDown();
    }

    /**
     *  
     *
     * @param args
     */
    public static void main(String[] args) {
        final CountDownLatch countDownLatch = new CountDownLatch(10);

        for (int i = 0; i < 10; i++) {

            new Thread(new Runnable() {
                @Override
                public void run() {
                    ZookeeperDistributeLock_03 distributeLock = new ZookeeperDistributeLock_03();
                    distributeLock.lock();
                    for (int j = 0; j < 10000; j++) {
                        count++;
                    }
                    distributeLock.unLock();
                    countDownLatch.countDown();
                }
            }).start();
        }
        try {
            countDownLatch.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(count);
    }


}