zookeeper適用シーン練習(分散ロック)

5302 ワード


通常の高同時プログラムでは、データの一貫性を保証するために、現在のスレッドをロックするためにロックが使用されます.シングル・マシン・オペレーションでは、Synchronized、Lock、または他の読み書きマルチを使用して、現在のスレッドをロックすることができます.しかし,分散型のシステムでは,これを実現することは困難である.したがってzookeeperにおけるノードの特性を用いてこれを満たすことができる.大まかに実現する構想は以下の通りである.
 1.各クライアントはzookeeperに一時的なシーケンスノードを作成します
 2.クライアントは、現在作成されているノードが最小であるかどうかを判断します.
 3.もしそうであれば、現在のタスクを実行するロックが取得されます.
 4.そうでなければ,自分より小さいノードを見つけて傍受し,削除されればロックを得ることができる.
上は大まかな実装構想であり,次にコードによって実装する.
 
package com.test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DistributedLock {

	private String lockName;
	private final int timeOut = 3000;
	private final String root = "/locks";
	private String myZnode;//         
	private String waitZnode;
	private static Logger logger = LoggerFactory
			.getLogger(DistributedLock.class);
	private CuratorFramework client;
	private CountDownLatch latch = new CountDownLatch(1);

	public DistributedLock(String connectString, String lockName) {
		this.lockName = lockName;
		client = CuratorFrameworkFactory.builder().connectionTimeoutMs(timeOut)
				.connectString(connectString)
				.retryPolicy(new RetryNTimes(3, 3000)).build();
		ConnectionStateListener listener = new ConnectionStateListener() {

			public void stateChanged(CuratorFramework client,
					ConnectionState newState) {
				if (newState == ConnectionState.CONNECTED) {
					logger.info("     ");
					latch.countDown();
				}
			}
		};

		client.getConnectionStateListenable().addListener(listener);
		client.start();
		try {
			latch.await();
			createRoot();
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

	}

	/**
	 * @Title:      root
	 * @Description: TODO
	 * @param
	 * @return void
	 * @throws
	 */
	private void createRoot() {
		try {
			Stat stat = client.checkExists().forPath(root);
			if (stat != null) {
				logger.info("root has already exists");
			} else {
				//      
				client.create().creatingParentsIfNeeded().forPath(root);

			}
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	public void getLocks() {

		try {
			myZnode = client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
					.forPath(root + "/" + lockName);
			logger.info(myZnode + "has created");
			//         ,           ,       
			List<String> subNodes = client.getChildren().forPath(root);
			//       lockname     
			List<String> lockObjNodes = new ArrayList<String>();
			for (String node : subNodes) {
				if (node.contains(lockName)) {
					lockObjNodes.add(node);
				}
			}
			//          
			Collections.sort(lockObjNodes);
			//                
			if (myZnode.equals(root + "/" + lockObjNodes.get(0))) {
				doAction();
			} else {
				//                 
				String subMyZone = myZnode
						.substring(myZnode.lastIndexOf("/") + 1);
				waitZnode = lockObjNodes.get(Collections.binarySearch(
						lockObjNodes, subMyZone) - 1);
				//        
				Stat stat = client.checkExists()
						.usingWatcher(deleteNodeWatcher).forPath("/"+waitZnode);
				if (stat != null) {
					System.out.println(Thread.currentThread().getName()
							+ "      ");
				} else {
					doAction();
				}
			}
		} catch (Exception e) {
			logger.error(e.getMessage());
		}
	}

	//          
	CuratorWatcher deleteNodeWatcher = new CuratorWatcher() {

		public void process(WatchedEvent event) throws Exception {

			if (event.getType() == EventType.NodeDeleted) {
				doAction();
			}
		}
	};

	private void doAction() {
		System.out.println(Thread.currentThread().getName() + "    ");
		client.close();
	}
}

テストしてみます
 
/**     
 * @FileName: TestCurrentZk.java   
 * @Package:com.test   
 * @Description: TODO  
 * @author: LUCKY    
 * @date:2016 2 2    11:36:04   
 * @version V1.0     
 */
package com.test;

/**
 * @ClassName: TestCurrentZk
 * @Description: TODO
 * @author: LUCKY
 * @date:2016 2 2    11:36:04
 */
public class TestCurrentZk {

	public static void main(String[] args) throws Exception {
		Thread threads[] = new Thread[10];
		for (int i = 0; i < threads.length; i++) {
			threads[i] = new Thread(new Runnable() {
				public void run() {
					ClientTest clientTest = new ClientTest(
							"100.66.162.36:2181", "locknametest");
					clientTest.getLocks();
				}
			});

			threads[i].start();

		}
		Thread.sleep(Integer.MAX_VALUE);
	}
}