Zoo Keeper完全解析(五)Zoo Keeperを用いて分散型作業スケジュールシステムのJavaを実現する.


前編では、Zoo Keeperを使って分散型作業スケジュールシステムを実現する原理について説明しました.「Zoo Keeper完全解析(4)Zoo Keeperを使って分散型作業スケジュールシステムを実現する原理」とリンクされています. ,この記事では、Javaを使って実現されるいくつかの詳細について説明します.
  注意したいのは、この実現はまだ基礎的な実現です.
一、メインノードとなる:
  実現する時、私はslaaveノードとマスターノードをすべて抽象的になります. ZooNodeノードは、各ノードがマスターノードとなり得るが、違いは誰が先にマスターノードを作成したかにある.作成が成功すれば、このノードはマスターノードに関するタスクを実行します.作成に失敗したら、slaaveノードに関するタスクを実行します.
public class ZooNode {


    private ZooKeeper zooKeeper;

    private String masterId;

    public boolean isMaster;

    private ZooRole zooRole;

    public ZooNode() {

        this.zooKeeper = ZooService.getZooKeeper();
        if (this.zooKeeper == null) {
            throw new NullPointerException();
        }
    }

    /**
     *      
     */
    public void createMaster() throws InterruptedException{
        Random random = new Random();
        String randomMasterId = Integer.toHexString(random.nextInt()) ;
        masterId = randomMasterId;
        try {
            zooKeeper.create("/master", randomMasterId.getBytes(), OPEN_ACL_UNSAFE , CreateMode.EPHEMERAL);
            isMaster = true;
        } catch (KeeperException.NodeExistsException e) {
            isMaster = false;
        } catch (KeeperException.ConnectionLossException e) {
            //       ,                 loss,         ,    master,     
            checkMaster();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    private void checkMaster() {
        while (true) {
            try {
                byte[] masterIdBytes = zooKeeper.getData("/master", false, new Stat());
                String currentMasterId = new String(masterIdBytes);
                isMaster = masterId.equals(currentMasterId);
                return;
            } catch (KeeperException.ConnectionLossException e) {
                e.printStackTrace();
            }
            catch (KeeperException e) {
                return;
            }
            catch (InterruptedException e) {
                return;
            }
        }
    }


    public void doJob(){

        if (isMaster) {
            zooRole = new ZooMaster();
        } else {
            zooRole = new ZooSlave();
        }

        zooRole.doJob();
    }


}
  見えます createMaster()方法では、マスターノードを作成しようとしますが、作成に成功すれば、isMasterフラグビットはtrueであり、メインノードとなります.作成に失敗したら、isMasterフラグビットはfalseであり、slakeノードです.
 
二、マスターノード
  マスターノードにおいて、私たちはノードを待ち受けます.もし/jobノードに変化が発生したら、割り当てられるノードをアイドルノードの下に割り当てます.具体的な方法は巡回します. /slaaveノードは、その後、対応する/assignノードの下に割り当てられたタスクがあるかどうかを確認し、ない場合は、タスクを割り当てます.
    private void onNodeChange(WatchedEvent event){

    	//          
        if (Watcher.Event.EventType.NodeChildrenChanged != event.getType()) {
            return;
    	}

        try {
        //         ,       ,           true job
        List jobPathList = zooKeeper.getChildren("/job", false);
        if (CollectionUtils.isEmpty(jobPathList)) {
            return;
        }

        JobMessage initJobMessage = null;
        String initJobPath = "";
        for (String jobPath : jobPathList) {
            String jobCurrentPath = "/job/" + jobPath;
            byte[] jobDataByteArray = zooKeeper.getData(jobCurrentPath, false, null);
            String jobData = new String(jobDataByteArray);

            if (StringUtils.isEmpty(jobData)) {
                continue;
            }

            //       jobMessage ,   status  0 ,         ,    
            JobMessage jobMessage = JSON.parseObject(jobData, JobMessage.class);
            if (jobMessage == null) {
                continue;
            }

            //  path    
            if (JobMessage.StatusEnum.INIT.getValue() == jobMessage.getStatus()) {
                initJobMessage = jobMessage;
                initJobPath = jobPath;
                break;
            }
        }

        if (initJobMessage == null) {
            return;
        }

        //    /slave   ,        /assign       ,       / assgin  
        List slaveNodeList = zooKeeper.getChildren("/slave", false);
        if (CollectionUtils.isEmpty(slaveNodeList)) {
            return;
        }

        boolean assignSuccess = false;
        for (String slaveNodePath : slaveNodeList) {

            String assignSlaveCurrentPath = "/assign/" + slaveNodePath;

            //         ,       ,         
            List assignSlaveChildNodeList = zooKeeper.getChildren(assignSlaveCurrentPath, false);
            if (CollectionUtils.isNotEmpty(assignSlaveChildNodeList)) {
                continue;
            }

            //  assign            
            String jobAssignPath = assignSlaveCurrentPath + "/" + initJobPath;
            zooKeeper.create(jobAssignPath, JSON.toJSONString(initJobMessage).getBytes(), OPEN_ACL_UNSAFE , CreateMode.PERSISTENT);
            assignSuccess = true;
            break;
        }
        LogUtils.printLog("    " + (assignSuccess ? "  " : "  "));

    } catch (KeeperException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
以上がマスターの傍受/jobのコールバック方法です.
三、slaaveノード:
  slaaveノードは、自分のサブノードが更新されているかどうかを傍受するために、更新があれば、対応するjobを取得し、タスクを実行し、タスクを実行した後、指定されたタスク状態を変更し、自分のサブノードをクリアする.具体的な実現コードは以下のgithubを見ることができます.
四、対応実現コード:
  Zoo Keeperを用いて実現される分散型作業スケジュールシステムの実現コードは、 コードを実現します.修正意見を歓迎します.
  次のページでは、zooKeeperを使って分散ロックを実現する方法を説明します.「Zoo Keeper完全解析(六)Zoo Keeperを使って分散ロックを実現する原理」とリンクしています.