LSFシリーズ-zookeeperによるシンプルなクラスタサービス管理
10997 ワード
実現する機能は簡単で、クラスタ登録サービスである.あるサービスに対して、対応する複数のサービスアドレスを持つことができ、あるサービス機器がサービスを提供し始めると、自分のIPアドレスを登録し、対応クライアントにとって、対応するサービス機器のIPリストを取得する.zkは各サービス機器のサービス状態を知る.
このコードはオンラインで検証されていません..ご参考まで
対応するインタフェースは簡単です.
対応する実装クラスは以下の通りである.
AddressComponent.java
ZookeeperWatcher.java
このコードはオンラインで検証されていません..ご参考まで
対応するインタフェースは簡単です.
package zhenghui.lsf.configserver.service;
/**
* User: zhenghui
* Date: 13-12-22
* Time: 4:57
* .
*/
public interface AddressService {
/**
*
* serviceUniqueName
*
*/
public void setServiceAddresses(String serviceUniqueName,
String address);
/**
*
*
* @param serviceUniqueName
* @return String , null
*/
public String getServiceAddress(String serviceUniqueName);
}
対応する実装クラスは以下の通りである.
AddressComponent.java
package zhenghui.lsf.configserver.impl;
import org.springframework.beans.factory.InitializingBean;
import zhenghui.lsf.configserver.service.AddressService;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* User: zhenghui
* Date: 13-12-22
* Time: 4:58
* zk . , . .
*/
public class AddressComponent extends ZookeeperWatcher implements AddressService, InitializingBean {
private AtomicBoolean inited = new AtomicBoolean(false);
private static final int DEFAULT_TIME_OUT = 30000;
/**
* cache
*/
private ConcurrentHashMap<String, Future<List<String>>> serviceAddressCache = new ConcurrentHashMap<String, Future<List<String>>>();
/**
* zk .
*/
private String zkAdrress = "10.125.195.174:2181";
@Override
public void setServiceAddresses(String serviceUniqueName, String address) {
String path = DEFAULT_SERVER_PATH + separator + serviceUniqueName;
createPath(path, address);
}
private void init() throws Exception {
//
if (!inited.compareAndSet(false, true)) {
return;
}
createConnection(zkAdrress, DEFAULT_TIME_OUT);
}
@Override
public String getServiceAddress(String serviceUniqueName) throws ExecutionException, InterruptedException {
final String path = DEFAULT_SERVER_PATH + separator + serviceUniqueName;
List<String> addressList;
Future<List<String>> future = serviceAddressCache.get(path);
if(future == null){
FutureTask<List<String>> futureTask = new FutureTask(new Callable<List<String>>() {
public List<String> call() {
return getChildren(path, true);
}
});
Future<List<String>> old = serviceAddressCache.putIfAbsent(path, futureTask);
if (old == null) {
futureTask.run();
addressList = futureTask.get();
} else {
addressList = old.get();
}
} else {
addressList = future.get();
}
return addressList.get(new Random().nextInt(addressList.size()));
}
@Override
public void afterPropertiesSet() throws Exception {
init();
}
public void setZkAdrress(String zkAdrress) {
this.zkAdrress = zkAdrress;
}
@Override
protected void addressChangeHolder(String path) {
serviceAddressCache.remove(path);
}
}
ZookeeperWatcher.java
package zhenghui.lsf.configserver.impl;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zhenghui.lsf.exception.LSFException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
/**
* User: zhenghui
* Date: 13-12-25
* Time: 4:13
* zk
* , zk path. ,path "/zhenghui/lsf/address/interfacename:1.0.0" "/zhenghui/lsf/address" , .
*/
public abstract class ZookeeperWatcher implements Watcher {
private Logger logger = LoggerFactory.getLogger(ZookeeperWatcher.class);
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
private ZooKeeper zk;
protected static final String DEFAULT_SERVER_PATH = "/zhenghui/lsf/address";
/**
* path
*/
private static final String DEFAULT_PATH_SUFFIX = "zhenghui";
protected static final String separator = "/";
private static final String charset_utf8 = "utf-8";
private Stat stat = new Stat();
/**
* watch
*/
AtomicInteger seq = new AtomicInteger();
/**
* , .
*/
abstract protected void addressChangeHolder(String path);
/**
* zk
*
*/
protected void createConnection(String connectString, int sessionTimeout) throws LSFException {
//
releaseConnection();
try {
zk = new ZooKeeper(connectString, sessionTimeout, this);
logger.info(connectString + " ZK ");
connectedSemaphore.await();
} catch (Exception e) {
logger.error("zhenghui.lsf.configserver.impl.AddressComponent.createConnection error");
throw new LSFException("zhenghui.lsf.configserver.impl.ZookeeperWatcher.createConnection error", e);
}
}
/**
* ZK
*/
protected void releaseConnection() {
if (zk != null) {
try {
this.zk.close();
} catch (Exception e) {
logger.error("zhenghui.lsf.configserver.impl.AddressComponent.releaseConnection error");
}
}
}
/**
* .
*/
protected boolean createPath(String path, String data) {
try {
// path
Stat stat = exists(path, true);
// ,
if(stat == null){
this.zk.create(path,"zhenghui".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
logger.info(" .path= " + path);
}
String childPath = path + separator + DEFAULT_PATH_SUFFIX;
this.zk.create(childPath,data.getBytes(charset_utf8),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
logger.info(" .path= " + childPath);
return true;
} catch (Exception e) {
logger.error("zhenghui.lsf.configserver.impl.ZookeeperWatcher.createPath",e);
return false;
}
}
protected Stat exists(String path, boolean needWatch) {
try {
return this.zk.exists(path, needWatch);
} catch (Exception e) {
return null;
}
}
/**
*
*
* @param path path
*/
protected List<String> getChildren(String path, boolean needWatch) {
try {
List<String> newServerList = new ArrayList<String>();
List<String> subList = this.zk.getChildren(path, needWatch);
if(subList != null && !subList.isEmpty()){
for (String subNode : subList) {
// server
byte[] data = zk.getData(path + separator + subNode, false, stat);
newServerList.add(new String(data, charset_utf8));
}
}
return newServerList;
} catch (Exception e) {
logger.error("zhenghui.lsf.configserver.impl.ZookeeperWatcher.getChildren", e);
return null;
}
}
@Override
public void process(WatchedEvent event){
// try {
// Thread.sleep(300);
// } catch (Exception e) {}
if (event == null) return;
String logPrefix = "Watch-" + seq.incrementAndGet() + ":";
logger.info(logPrefix + event.toString());
//
Watcher.Event.KeeperState keeperState = event.getState();
//
Watcher.Event.EventType eventType = event.getType();
// path
String path = event.getPath();
if (Watcher.Event.KeeperState.SyncConnected == keeperState) {
// ZK
if (Watcher.Event.EventType.None == eventType) {
logger.info(logPrefix + " ZK ");
connectedSemaphore.countDown();
} else if (Watcher.Event.EventType.NodeChildrenChanged == eventType) {
logger.info(logPrefix + " ");
// DEFAULT_SERVER_PATH , , holder
if(!path.equals(DEFAULT_SERVER_PATH)){
addressChangeHolder(path);
}
}
}
// .
else if (Watcher.Event.KeeperState.Disconnected == keeperState) {
logger.error(logPrefix + " ZK ");
} else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {
logger.error(logPrefix + " ");
} else if (Watcher.Event.KeeperState.Expired == keeperState) {
logger.error(logPrefix + " ");
}
}
}