ZooKeeper分散ロック
15214 ワード
Redis分散ロックの本文では,著者らはRedisを用いて分散ロックを開発する方法を紹介した.
Redis分散ロックは軽量でスループットが高いという特徴があるが,一貫性保証は弱い.Zookeeperを用いて分散ロックを開発し,高い整合性に対する要求を満たすことができる.
Zookeeper分布式ロック原理
Zookeeperノードには、分散ロックの開発に役立ついくつかの性質があります.テンポラリノード:クライアントはテンポラリノードを作成することができ、クライアントセッションが終了またはタイムアウトするとZookeeperは自動的にテンポラリノードを削除します.このプロパティは、デッドロックを回避するために使用できます. フリップフロップ:ノードの状態が変化すると、Zookeeperは対応するイベントをリスニングするクライアントに通知します.この特性は、ブロック待機ロックを実現するために使用することができる. 順序ノード:クライアントはノードの下にサブノードを作成できます.Zookeeperは、データベースの自己増加プライマリ・キーと同様に、サブノード数に基づいて整数シーケンス番号を自動的に生成します.
比較的容易に考えられる分散ロックの実装方法は、次のとおりです.ロックノードが作成されたかどうかを確認し、作成しない場合は一時ノード を作成しようとする.一時ノードが正常に作成された場合は、ロックが正常に行われました.ロックを持つクライアントがクラッシュしたり、ネットワークが異常になってセッションを維持できない場合、ロックノードは削除され、デッドロックは発生しません. 一時ノードの作成に失敗した場合は、ロックが失敗したことを示し、ロックを待つ.watchロックノードexistsイベントは、ノードが削除されたという通知を受信した後、再びロックを試みます. ZookeeperのWatchは使い捨てなので、もう一度ロックを試みて失敗したら、Watchを再設定する必要があります. 操作が完了すると、ロックノードを削除してロックを解除します.
このスキームでは、ロックが解放されると、Zookeeperがイベントを大量に購読したクライアントに通知する必要があるという問題があり、この現象を「驚群現象」または「羊群効果」と呼ぶ.
驚きの群れ現象はZookeeperが正常にサービスを提供するのに非常に不利であるため、実際には通常別の案が採用されている.は、ロックノードとして永続ノードを作成し、ロックしようとするクライアントは、ロックノードの下で一時的な順序ノードを作成する.Zookeeperはサブノードの秩序性を保証する. ロックノードの下でidが最も小さいノードが現在のクライアントのために作成されたノードである場合、現在のクライアントが正常にロックされたことを示す. そうでなければロックに失敗し、前のシーケンスノードを購読します.前のノードが削除されると、現在のノードが最小になり、ロックが成功したことを示します. 操作が完了すると、ロックノードを削除してロックを解除します.
このスキームは、ロックが解放されるたびに、驚くべき現象を回避するために、クライアントに通知するだけです.
このスキームの特徴は、優先的にキューに並んで待っているクライアントが先にロックを取得することであり、このロックをフェアロックと呼ぶ.ロックが解放されると、すべてのクライアントがロックを再競合するスキームを非公平ロックと呼ぶ.
Demo
このセクションでは、著者らはZookeeper公式Java APIを使用して簡単な公平なロックを実現します.
Mavenを使用して依存管理、プロジェクト依存Zookeeper公式java sdkおよびapache commons-lang 3ツールパッケージ:
クリックして完全なコードを表示します.
Curator
CruatorはZookeeperツールセットであり、分散ロックを含む一般的なアプリケーションのパッケージを提供し、本稿ではCruatorの分散ロック実装ソースコードを例に分析する.
mavenを使用して依存をインストールするには、次の手順に従います.
ロックコードの作成:
次に、
次に
実際にロック操作を実行する
まず
テンポラリシーケンスノードを作成し、説明しません.
次に、
Redis分散ロックは軽量でスループットが高いという特徴があるが,一貫性保証は弱い.Zookeeperを用いて分散ロックを開発し,高い整合性に対する要求を満たすことができる.
Zookeeper分布式ロック原理
Zookeeperノードには、分散ロックの開発に役立ついくつかの性質があります.
比較的容易に考えられる分散ロックの実装方法は、次のとおりです.
このスキームでは、ロックが解放されると、Zookeeperがイベントを大量に購読したクライアントに通知する必要があるという問題があり、この現象を「驚群現象」または「羊群効果」と呼ぶ.
驚きの群れ現象はZookeeperが正常にサービスを提供するのに非常に不利であるため、実際には通常別の案が採用されている.
このスキームは、ロックが解放されるたびに、驚くべき現象を回避するために、クライアントに通知するだけです.
このスキームの特徴は、優先的にキューに並んで待っているクライアントが先にロックを取得することであり、このロックをフェアロックと呼ぶ.ロックが解放されると、すべてのクライアントがロックを再競合するスキームを非公平ロックと呼ぶ.
Demo
このセクションでは、著者らはZookeeper公式Java APIを使用して簡単な公平なロックを実現します.
Mavenを使用して依存管理、プロジェクト依存Zookeeper公式java sdkおよびapache commons-lang 3ツールパッケージ:
org.apache.commons
commons-lang3
3.6
org.apache.zookeeper
zookeeper
3.4.5
pom
クリックして完全なコードを表示します.
package zk;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.lang3.StringUtils;
import org.apache.zookeeper.*;
/**
* @author finley
*/
public class ZKLock {
private ZooKeeper zk;
private String basePath;
private String lockPath;
private static final byte[] LOCK_DATA = "".getBytes();
// zk , basePath , basePath
public ZKLock(ZooKeeper zk, String basePath) {
// zk , '/' , '/'
if (basePath.endsWith("/") || !basePath.startsWith("/")) {
throw new IllegalArgumentException("base path must start with '/', and must not end with '/'");
}
this.zk = zk;
this.basePath = basePath;
}
// basePath ,
private void ensureBasePath() throws KeeperException, InterruptedException {
if (zk.exists(basePath, false) == null) {
// basePath ,
List pathParts = new ArrayList<>(Arrays.asList(basePath.split("/"))); //
pathParts.remove(0); // basePath '/' , pathParts[0] ,
// ,
int last = 0;
for (int i = pathParts.size() - 1; i >= 0; i--) {
String path = "/" + StringUtils.join(pathParts.subList(0, i), '/');
if (zk.exists(path, false) != null) {
last = i;
break;
}
}
// ,
for (int i = last; i < pathParts.size(); i++) {
String path = "/" + StringUtils.join(pathParts.subList(0, i + 1), '/');
try {
zk.create(path, LOCK_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException ignore) {} // may created by other thread
}
}
}
//
public void lock() throws KeeperException, InterruptedException {
ensureBasePath();
// basePath
String lockPath = zk.create(basePath + "/lock_", LOCK_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName() + " create: " + lockPath);
//
while(true) {
// basePath
// , lockPath , create getChildren
List children = zk.getChildren(basePath,false);
Collections.sort(children);
String minNode = children.get(0);
// ,
if (StringUtils.isNotBlank(lockPath) && StringUtils.isNotBlank(minNode) && StringUtils.equals(lockPath, basePath + "/" + minNode) {
this.lockPath = lockPath; // ,
return;
}
// , watch
String watchNode = null;
String node = lockPath.substring(lockPath.lastIndexOf("/") + 1);
for (int i = children.size() - 1; i >= 0; i--) {
String child = children.get(i);
if (child.compareTo(node) < 0) {
watchNode = child;
break;
}
}
// , watch
if (watchNode != null) {
System.out.println(Thread.currentThread().getName() + " watch: " + watchNode);
String watchPath = basePath + "/" + watchNode;
// getData exists : watch , ( / )
// exists , NodeDeleted ( , )。
// ,getData ,
// watch
try {
zk.getData(watchPath, event -> {
if(event.getType() == Watcher.Event.EventType.NodeDeleted) {
// this.wait()
// fixme: bug, NodeDeleted 。 ,
synchronized (this) {
notifyAll();
}
}
}, null);
} catch(KeeperException.NoNodeException e) {
// getData watch , ,
continue;
}
synchronized (this) {
// watch , ,
wait();
System.out.println(Thread.currentThread().getName() + " notified");
}
}
}
}
//
public void unlock() throws KeeperException, InterruptedException {
// lockPath
if (StringUtils.isNotBlank(lockPath)) {
zk.delete(lockPath, -1); //
} else {
throw new IllegalStateException("don't has lock"); //
}
}
public static void main(String[] args) {
int concurrent = 10;
ExecutorService service = Executors.newFixedThreadPool(concurrent);
for (int i = 0; i < concurrent; i++) {
service.execute(() -> {
// , zookeeper
ZooKeeper zk;
try {
zk = new ZooKeeper("localhost:2181", 6000, watchedEvent -> {
if (Watcher.Event.KeeperState.SyncConnected == watchedEvent.getState())
System.out.println("connection is established...");
});
ZKLock lock = new ZKLock(zk, "/test/node1");
lock.lock();
System.out.println(Thread.currentThread().getName() + " acquire success");
Thread.sleep(1000);
System.out.println("do sth, thread: " + Thread.currentThread().getName());
lock.unlock();
System.out.println(Thread.currentThread().getName() + " release success");
} catch (Exception e) {
e.printStackTrace();
}
});
}
service.shutdown();
}
}
Curator
CruatorはZookeeperツールセットであり、分散ロックを含む一般的なアプリケーションのパッケージを提供し、本稿ではCruatorの分散ロック実装ソースコードを例に分析する.
mavenを使用して依存をインストールするには、次の手順に従います.
org.apache.curator
curator-recipes
4.0.1
ロックコードの作成:
public class ZkLock {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
client.start();
// /curator/mutex
InterProcessMutex mutex = new InterProcessMutex(client, "/curator/mutex");
try {
//
mutex.acquire();
//
System.out.println("foo bar");
} finally {
//
mutex.release();
client.close();
}
}
}
次に、
InterProcessMutex.acquire()
の実装を分析する./**
* Acquire the mutex - blocking until it's available. Note: the same thread
* can call acquire re-entrantly. Each call to acquire must be balanced by a call
* to {@link #release()}
*
* @throws Exception ZK errors, connection interruptions
*/
@Override
public void acquire() throws Exception
{
if ( !internalLock(-1, null) )
{
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
次に
internalLock
の方法を見てみましょう.private boolean internalLock(long time, TimeUnit unit) throws Exception
{
Thread currentThread = Thread.currentThread();
// threadData ConcurrentMap,
LockData lockData = threadData.get(currentThread);
if ( lockData != null ) // lockData ,
{
// ,
lockData.lockCount.incrementAndGet();
return true;
}
// internals.attemptLock Zookeeper
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if ( lockPath != null )
{
LockData newLockData = new LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
実際にロック操作を実行する
internals.attemptLock
の方法を分析します.String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
//
while ( !isDone )
{
isDone = true;
try
{
//
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
// ,
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e )
{
// session ,
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
{
isDone = false;
}
else
{
throw e;
}
}
}
if ( hasTheLock )
{
return ourPath;
}
return null;
}
まず
StandardLockInternalsDriver.createsTheLock()
ソースを読みます.public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
String ourPath;
if ( lockNodeBytes != null )
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
}
else
{
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
テンポラリシーケンスノードを作成し、説明しません.
次に、
internalLockLoop
を参照します.while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
// ,
List children = getSortedChildren();
//
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
haveTheLock = true;
}
else
{
//
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
// wait(), notifyall()
synchronized(this)
{
try
{
// getData() exists() :
// exists() , ( )。 , Zookeeper
// getData()
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
//
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
doDelete = true; // timed out - delete our node
break;
}
//
wait(millisToWait);
}
else
{
//
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
// getData() , 。
}
}
}
}