zookeeperは分布式ロックを実現する(公平ロック|非公平ロック)
13856 ワード
何気なく巨牛の人工知能のチュートリアルを見つけて、思わず共有してあげました.教程は基礎がゼロで、分かりやすくて、しかもとても面白くてユーモアがあって、小説を読むようです!すごいと思って、みんなに分かち合いました.ここをクリックしてチュートリアルにジャンプします.
zkは分散ロックを実現しており、このようなリソースは多いが、耐えられる推奨は少ない.twitterで実現されている分散ロックも見て、吊り下げている.しかしtwitterにパッケージされているzookeepreツールパッケージは、内容が少し多く、分散型を使いたいと思っていたら、少し冗長に見えます.従って、zkが分散ロックを実現するツールクラスを独自に実現し、公平ロックと非公品ロックを含む.コードとテストクラスを直接貼り付けます.
クラスの構成、propertiesの構成プロパティの読み出し
分散ロック実装クラス、
「アプリケーションContextUtils」に依存しています.これはカスタマイズされたツールクラスで、「アプリケーションContextAware」の代わりに実装できます.
テストクラス:
以上がzk実装などのプロセスおよびユニットテストである.
PS:この実装には、フェアロック、非フェアロックが含まれます.公平とは、ノードの削除時に隣接するノードにのみ通知して取得するロックである.非フェアロック(驚きのグループ効果)は、ノードがロックを待つすべてのスレッドを削除して再競合します.具体的な実装は実装コードを参照してください.
zkは分散ロックを実現しており、このようなリソースは多いが、耐えられる推奨は少ない.twitterで実現されている分散ロックも見て、吊り下げている.しかしtwitterにパッケージされているzookeepreツールパッケージは、内容が少し多く、分散型を使いたいと思っていたら、少し冗長に見えます.従って、zkが分散ロックを実現するツールクラスを独自に実現し、公平ロックと非公品ロックを含む.コードとテストクラスを直接貼り付けます.
クラスの構成、propertiesの構成プロパティの読み出し
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.PropertySource;
import org.springframework.stereotype.Component;
/**
* zk
* @author zc.ding
* @since 2019/3/23
*/
@Component
@PropertySource("classpath:zookeeper.properties")
@ConfigurationProperties(prefix = "zk")
@Data
public class ZkConfig {
private String address;
private int sessionTimeout;
private long waitConnTimeout;
private int fairLock;
}
分散ロック実装クラス、
import com.base.framework.core.commons.Constants;
import com.base.framework.core.utils.ApplicationContextUtils;
import com.base.framework.core.utils.ThreadPoolUtil;
import com.base.framework.zk.config.ZkConfig;
import com.base.framework.zk.exceptions.ZkFrameworkExpception;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* zk /
* @author zc.ding
* @since 2019/3/23
*/
@Component
@DependsOn({"zkConfig", "applicationContextUtils"})
public class DistributedLock implements InitializingBean {
private static final Logger LOG = LoggerFactory.getLogger(DistributedLock.class);
private static final ThreadLocal THREAD_LOCAL = new ThreadLocal<>();
private static final ThreadLocal ZK_THREAD_LOCAL = new ThreadLocal<>();
/** **/
private static final String LOCK_ROOT_PATH = "/locks";
/** **/
private static final String LOCK_SUFFIX = "_NO_";
/** **/
private static final String CREATE_ROOT_LOCK = "LOCK";
/** **/
private static final int LOCK_FAIR = 1;
private final static byte[] BUF = new byte[0];
private static ZkConfig zkConfig;
/**
*
* @param key ‘/’
* @return boolean
* @since :2019/3/23
* @author :[email protected]
*/
public static boolean tryLock(String key) {
return tryLock(key, Constants.LOCK_EXPIRES, Constants.LOCK_WAITTIME);
}
/**
*
* @param key /
* @param expire
* @param wait
* @return boolean
* @since :2019/3/23
* @author :[email protected]
*/
public static boolean tryLock(String key, long expire, long wait) {
ZooKeeper zooKeeper = getZooKeeper();
ZK_THREAD_LOCAL.set(zooKeeper);
return tryLock(zooKeeper, key, expire, wait);
}
/**
*
* @param zooKeeper zk
* @param key
* @param expire
* @param wait
* @return boolean
* @since :2019/3/23
* @author :[email protected]
*/
private static boolean tryLock(ZooKeeper zooKeeper, String key, long expire, long wait) {
expire = expire * 1000;
wait = wait * 1000;
final String currNode;
String path = LOCK_ROOT_PATH + "/" + key + LOCK_SUFFIX;
try {
currNode = zooKeeper.create(path, BUF, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
//
List nodes = zooKeeper.getChildren(LOCK_ROOT_PATH, false);
//
nodes = nodes.stream().filter(o -> o.startsWith(key)).collect(Collectors.toList());
nodes.sort(String::compareTo);
// session , zk ,
if (nodes.size() == 0) {
return false;
}
//
if (currNode.endsWith(nodes.get(0))) {
runExpireThread(zooKeeper, currNode, expire);
return true;
}
//
CountDownLatch countDownLatch = new CountDownLatch(1);
//
if(zkConfig.getFairLock() == LOCK_FAIR){
for (int i = 0; i < nodes.size(); i++) {
String node = nodes.get(i);
if (currNode.endsWith(node)) {
runExpireThread(zooKeeper, currNode, expire);
return true;
}
Stat stat = zooKeeper.exists(LOCK_ROOT_PATH + "/" + node, new LockWatcher(countDownLatch));
if (stat != null) {
delPath(zooKeeper);
//
if(!countDownLatch.await(wait, TimeUnit.MILLISECONDS)){
return tryLock(zooKeeper, key, expire, wait);
}
}
}
}else{
for (int i = 0; i < nodes.size(); i++) {
String node = nodes.get(i);
if (currNode.endsWith(node)) {
runExpireThread(zooKeeper, currNode, expire);
return true;
}
//
if (currNode.endsWith(nodes.get(i + 1))) {
Stat stat = zooKeeper.exists(LOCK_ROOT_PATH + "/" + node, new LockWatcher(countDownLatch));
if (stat != null) {
// , , 2 ,
//
if(!countDownLatch.await(wait * 2, TimeUnit.MILLISECONDS)){
delPath(zooKeeper);
return false;
}
return true;
}
}
}
}
} catch (KeeperException | InterruptedException e) {
LOG.error("create '{}' node fail.", key, e);
}
return false;
}
/**
*
* @since :2019/3/23
* @author :[email protected]
*/
public static void unLock() {
ZooKeeper zooKeeper = ZK_THREAD_LOCAL.get();
delPath(zooKeeper);
close(ZK_THREAD_LOCAL.get());
THREAD_LOCAL.remove();
ZK_THREAD_LOCAL.remove();
}
/**
*
* @since :2019/3/23
* @author :[email protected]
*/
private static void createLockRootPath() {
ZooKeeper zooKeeper = getZooKeeper();
try {
Stat stat = zooKeeper.exists(LOCK_ROOT_PATH, false);
if (stat == null) {
synchronized (CREATE_ROOT_LOCK) {
stat = zooKeeper.exists(LOCK_ROOT_PATH, false);
if (stat == null) {
LOG.info("create lock root path '{}'", LOCK_ROOT_PATH);
zooKeeper.create(LOCK_ROOT_PATH, BUF, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
}
}
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
/**
* zooke
* @return org.apache.zookeeper.ZooKeeper
* @since :2019/3/23
* @author :[email protected]
*/
private static ZooKeeper getZooKeeper() {
final CountDownLatch countDownLatch = new CountDownLatch(1);
try {
final ZooKeeper zooKeeper = new ZooKeeper(zkConfig.getAddress(), zkConfig.getSessionTimeout(), null);
zooKeeper.register((watchedEvent) -> {
switch (watchedEvent.getState()) {
case Expired:
close(zooKeeper);
break;
case SyncConnected:
countDownLatch.countDown();
default:
}
});
if(!countDownLatch.await(zkConfig.getWaitConnTimeout(), TimeUnit.MILLISECONDS)){
close(zooKeeper);
throw new ZkFrameworkExpception("wait for creating zookeeper connection timeout, timeout is [" + zkConfig.getWaitConnTimeout() + "]");
}
return zooKeeper;
} catch (IOException | InterruptedException e) {
throw new ZkFrameworkExpception("create Zookeeper instance fail.", e);
}
}
/**
* , ,zk
* @param zooKeeper zk
* @param currNode
* @since :2019/3/23
* @author :[email protected]
*/
private static void runExpireThread(final ZooKeeper zooKeeper, String currNode, long expire){
THREAD_LOCAL.set(currNode);
ThreadPoolUtil.callFixedThreadPool(() -> {
Thread.sleep(expire * 1000);
LOG.info(" {} , .", expire);
delPath(zooKeeper);
return null;
});
}
/**
*
* @param zooKeeper zk
* @since :2019/3/23
* @author :[email protected]
*/
private static void delPath(ZooKeeper zooKeeper) {
try {
// ,
zooKeeper.delete(THREAD_LOCAL.get(), -1);
} catch (Exception e){
LOG.error("lock expire, delete lock");
}
}
/**
*
* @param zooKeeper zk
* @since :2019/3/23
* @author :[email protected]
*/
private static void close(ZooKeeper zooKeeper) {
if (zooKeeper != null) {
try {
zooKeeper.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void afterPropertiesSet() {
zkConfig = ApplicationContextUtils.getBean(ZkConfig.class);
createLockRootPath();
}
/**
*
*
* @since :2019/3/23
* @author :[email protected]
*/
static class LockWatcher implements Watcher {
private CountDownLatch latch;
public LockWatcher(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void process(WatchedEvent event) {
if(event.getType() == Event.EventType.NodeDeleted){
latch.countDown();
}
}
}
}
「アプリケーションContextUtils」に依存しています.これはカスタマイズされたツールクラスで、「アプリケーションContextAware」の代わりに実装できます.
テストクラス:
import com.base.framework.zk.utils.DistributedLock;
import org.apache.zookeeper.*;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.MethodSorters;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* zookeeper
*
* @author zc.ding
* @since 2019/3/23
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = AppServerApplicationTest.class)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class TestDistributedLock {
@Test
public void testConcurrentGetLock() throws Exception{
int count = 50;
ExecutorService executorService = Executors.newFixedThreadPool(count);
for( int i = 0; i < count; i++) {
executorService.execute(() -> {
try {
if(DistributedLock.tryLock("lock")){
System.out.println(" ");
if(sum == 0){
System.out.println("sum 0");
return;
}
System.out.println(" ,sum=" + sum);
sum = sum - 1;
// System.out.println(Thread.currentThread().getName() + ": 6 , .");
// Thread.sleep(6000);
// System.out.println(" , .");
}else{
System.out.println("zk !");
}
} catch (Exception e) {
e.printStackTrace();
}finally{
DistributedLock.unLock();
}
});
}
executorService.shutdown();
Thread.sleep(30000);
System.out.println("finish, sum=" + sum);
}
}
以上がzk実装などのプロセスおよびユニットテストである.
PS:この実装には、フェアロック、非フェアロックが含まれます.公平とは、ノードの削除時に隣接するノードにのみ通知して取得するロックである.非フェアロック(驚きのグループ効果)は、ノードがロックを待つすべてのスレッドを削除して再競合します.具体的な実装は実装コードを参照してください.