LSFシリーズ-zookeeperによるシンプルなクラスタサービス管理

10997 ワード

実現する機能はシンプルで、クラスタのサービス登録です。1つのサービスは複数のサービスアドレスを持つことができます。サービスを提供するマシンは、サービスの提供を開始する際に自分のIPアドレスを登録し、クライアントは対応するサービスマシンのIPリストを取得します。ZooKeeper(zk)は各サービスマシンの稼働状態を把握します。
 
このコードは本番環境では検証されていません。ご参考まで。
 
対応するインタフェースは簡単です.
package zhenghui.lsf.configserver.service;

/**
 * User: zhenghui
 * Date: 13-12-22
 * Time:   4:57
 *       .
 */
public interface AddressService {

    /**
     *          
     * serviceUniqueName         
     *
     */
    public void setServiceAddresses(String serviceUniqueName,
                                    String address);

    /**
     *          
     *
     * @param serviceUniqueName
     * @return String              ,    null
     */
    public String getServiceAddress(String serviceUniqueName);

}

 
対応する実装クラスは以下の通りである.
 
 
AddressComponent.java
 
 
package zhenghui.lsf.configserver.impl;

import org.springframework.beans.factory.InitializingBean;
import zhenghui.lsf.configserver.service.AddressService;

import java.util.List;
import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * User: zhenghui
 * Date: 13-12-22
 * Time:   4:58
 *   zk  .      ,        .         .
 */
public class AddressComponent extends ZookeeperWatcher implements AddressService, InitializingBean {

    private AtomicBoolean inited = new AtomicBoolean(false);

    private static final int DEFAULT_TIME_OUT = 30000;

    /**
     *     cache
     */
    private ConcurrentHashMap<String, Future<List<String>>> serviceAddressCache = new ConcurrentHashMap<String, Future<List<String>>>();

    /**
     * zk      .
     */
    private String zkAdrress = "10.125.195.174:2181";

    @Override
    public void setServiceAddresses(String serviceUniqueName, String address) {
        String path = DEFAULT_SERVER_PATH + separator + serviceUniqueName;
        createPath(path, address);
    }

    private void init() throws Exception {
        //         
        if (!inited.compareAndSet(false, true)) {
            return;
        }
        createConnection(zkAdrress, DEFAULT_TIME_OUT);
    }

    @Override
    public String getServiceAddress(String serviceUniqueName) throws ExecutionException, InterruptedException {
        final String path = DEFAULT_SERVER_PATH + separator + serviceUniqueName;
        List<String> addressList;

        Future<List<String>> future = serviceAddressCache.get(path);
        if(future == null){
            FutureTask<List<String>> futureTask = new FutureTask(new Callable<List<String>>() {
                public List<String> call() {
                    return getChildren(path, true);
                }
            });
            Future<List<String>> old = serviceAddressCache.putIfAbsent(path, futureTask);
            if (old == null) {
                futureTask.run();
                addressList = futureTask.get();
            } else {
                addressList = old.get();
            }
        } else {
            addressList = future.get();
        }

        return addressList.get(new Random().nextInt(addressList.size()));
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        init();
    }

    public void setZkAdrress(String zkAdrress) {
        this.zkAdrress = zkAdrress;
    }

    @Override
    protected void addressChangeHolder(String path) {
        serviceAddressCache.remove(path);
    }
}
  
 
 
ZookeeperWatcher.java
 
 
 
package zhenghui.lsf.configserver.impl;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zhenghui.lsf.exception.LSFException;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * User: zhenghui
 * Date: 13-12-25
 * Time:   4:13
 *   zk   
 *     ,      zk path.         ,path  "/zhenghui/lsf/address/interfacename:1.0.0"       "/zhenghui/lsf/address"      ,     .
 */
public abstract class ZookeeperWatcher implements Watcher {

    private Logger logger = LoggerFactory.getLogger(ZookeeperWatcher.class);

    private CountDownLatch connectedSemaphore = new CountDownLatch(1);

    private ZooKeeper zk;

    protected static final String DEFAULT_SERVER_PATH = "/zhenghui/lsf/address";

    /**
     *   path   
     */
    private static final String DEFAULT_PATH_SUFFIX = "zhenghui";

    protected static final String separator = "/";

    private static final String charset_utf8 = "utf-8";

    private Stat stat = new Stat();

    /**
     *     watch     
     */
    AtomicInteger seq = new AtomicInteger();

    /**
     *     ,        .       
     */
    abstract protected void addressChangeHolder(String path);

    /**
     *   zk  
     *
     */
    protected void createConnection(String connectString, int sessionTimeout) throws LSFException {
        //     
        releaseConnection();
        try {
            zk = new ZooKeeper(connectString, sessionTimeout, this);
            logger.info(connectString + "    ZK   ");
            connectedSemaphore.await();
        } catch (Exception e) {
            logger.error("zhenghui.lsf.configserver.impl.AddressComponent.createConnection error");
            throw new LSFException("zhenghui.lsf.configserver.impl.ZookeeperWatcher.createConnection error", e);
        }
    }

    /**
     *   ZK  
     */
    protected void releaseConnection() {
        if (zk != null) {
            try {
                this.zk.close();
            } catch (Exception e) {
                logger.error("zhenghui.lsf.configserver.impl.AddressComponent.releaseConnection error");
            }
        }
    }

    /**
     *        .
     */
    protected boolean createPath(String path, String data) {
        try {
            //   path    
            Stat stat = exists(path, true);
            //     ,   
            if(stat == null){
                this.zk.create(path,"zhenghui".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                logger.info("       .path= " + path);
            }
            String childPath = path + separator + DEFAULT_PATH_SUFFIX;
            this.zk.create(childPath,data.getBytes(charset_utf8),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            logger.info("       .path= " + childPath);
            return true;
        } catch (Exception e) {
            logger.error("zhenghui.lsf.configserver.impl.ZookeeperWatcher.createPath",e);
            return false;
        }
    }

    protected Stat exists(String path, boolean needWatch) {
        try {
            return this.zk.exists(path, needWatch);
        } catch (Exception e) {
            return null;
        }
    }

    /**
     *      
     *
     * @param path   path
     */
    protected List<String> getChildren(String path, boolean needWatch) {
        try {
            List<String> newServerList = new ArrayList<String>();
            List<String> subList = this.zk.getChildren(path, needWatch);
            if(subList != null && !subList.isEmpty()){
                for (String subNode : subList) {
                    //            server  
                    byte[] data = zk.getData(path + separator + subNode, false, stat);
                    newServerList.add(new String(data, charset_utf8));
                }
            }
            return newServerList;
        } catch (Exception e) {
            logger.error("zhenghui.lsf.configserver.impl.ZookeeperWatcher.getChildren", e);
            return null;
        }
    }

    @Override
    public void process(WatchedEvent event){
//        try {
//            Thread.sleep(300);
//        } catch (Exception e) {}
        if (event == null) return;

        String logPrefix = "Watch-" + seq.incrementAndGet() + ":";
        logger.info(logPrefix + event.toString());
        //     
        Watcher.Event.KeeperState keeperState = event.getState();
        //     
        Watcher.Event.EventType eventType = event.getType();
        //     path
        String path = event.getPath();
        if (Watcher.Event.KeeperState.SyncConnected == keeperState) {
            //      ZK   
            if (Watcher.Event.EventType.None == eventType) {
                logger.info(logPrefix + "     ZK   ");
                connectedSemaphore.countDown();
            }  else if (Watcher.Event.EventType.NodeChildrenChanged == eventType) {
                logger.info(logPrefix + "     ");
                //    DEFAULT_SERVER_PATH       ,        ,     holder
                if(!path.equals(DEFAULT_SERVER_PATH)){
                    addressChangeHolder(path);
                }
            }
        }
        //            .
        else if (Watcher.Event.KeeperState.Disconnected == keeperState) {
            logger.error(logPrefix + " ZK       ");
        } else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {
            logger.error(logPrefix + "      ");
        } else if (Watcher.Event.KeeperState.Expired == keeperState) {
            logger.error(logPrefix + "    ");
        }
    }
}