Zookeeper入門の3-Javaクライアントcuratorの使用

7895 ワード

ZKのjavaクライアント-curator基本使用
一般的な添削変更の実装--同期インタフェース
 public class CuratorConTest {

    static RetryPolicy policy = new ExponentialBackoffRetry(1000, 3); //   3 

    //      --     
/*        CuratorFramework zkClient = CuratorFrameworkFactory.newClient("localhost:32770",
                5000,
                3000,
                policy);*/

    //     --     
    static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
            .connectString("localhost:32770")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(3000)
            .retryPolicy(policy)
            .namespace("zk-jsy")  //                     ,
            //     namespace,  base     /base。       /,   。
            .build();


    public static void main(String[] args) throws Exception {

        //     
//        zkClient.start();

        zkFluentClient.start();

        //     
        CuratorConTest test = new CuratorConTest();
        test.testCreate();

        //     
        test.testGet();

        //     
        test.testUpdate();

        //     
        test.testDelete();

//        Thread.sleep(Integer.MAX_VALUE);
    }

    private void testCreate() throws Exception {
        // 1、        ,         ,          ip  
        //org.apache.zookeeper.KeeperException$UnimplementedException: KeeperErrorCode = Unimplemented for /zk-jsy/book
        //    zookeeper    curator         ,  zk    3.5.1-Alpha,   3.4.8  
        zkFluentClient.create().forPath("/book1-" + ThreadLocalRandom.current().nextInt());

        // 2、         
        zkFluentClient.create().forPath("/book2-" + ThreadLocalRandom.current().nextFloat(), "mytestbook2Create".getBytes());

        // 3、      ,        
        zkFluentClient.create().withMode(CreateMode.EPHEMERAL).
            forPath("/book3-" + ThreadLocalRandom.current().nextInt());

        // 4、      ,          ,        。           
        zkFluentClient.create().creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL).
            forPath("/test/book4-test" + ThreadLocalRandom.current().nextInt());
    }

    private void testGet() throws Exception {
        String path = "/getData/mydata-" + ThreadLocalRandom.current().nextInt();
        zkFluentClient.create().creatingParentsIfNeeded().forPath(path,
                ("sogetdata" + ThreadLocalRandom.current().nextInt()).getBytes());

        // 1、  ,      bytes
        String value = new String(zkFluentClient.getData().forPath(path));
        System.out.println(value);

        // 2、    
        Stat stat = new Stat();
        String value11 = new String(zkFluentClient.getData().storingStatIn(stat).forPath(path));
        System.out.println(stat.toString());
        System.out.println(value11);
    }

    private void testUpdate() throws Exception {
        String path = "/updateData/mydata-" + ThreadLocalRandom.current().nextInt();
        zkFluentClient.create().creatingParentsIfNeeded().forPath(path,
                ("toBeUpdate" + ThreadLocalRandom.current().nextInt()).getBytes());
        System.out.println("originData:" + new String(zkFluentClient.getData().forPath(path)));

        // 1、    update,  version
        Stat stat = zkFluentClient.setData().forPath(path, ("newData" + ThreadLocalRandom.current().nextInt()).getBytes());
        System.out.println("newData:" + new String(zkFluentClient.getData().forPath(path)));

        // 2、     ,      CAS,  version   ,      
        zkFluentClient.setData().withVersion(stat.getVersion()).forPath(path, ("UpdateByVersion:" + stat.getVersion()).getBytes());
        System.out.println("updateByVersionData:" + new String(zkFluentClient.getData().forPath(path)));

        // 2.1   cas,  version=1,      2
        zkFluentClient.setData().withVersion(1).forPath(path,"error".getBytes());
        System.out.println("updateByErrorVersionData:" + new String(zkFluentClient.getData().forPath(path)));
        // KeeperErrorCode = BadVersion for /zk-jsy/updateData/mydata--1431282676       ,version  
    }

    private void testDelete() throws Exception {

        // 1、    ,                
        String path = "/book/forDelete" + ThreadLocalRandom.current().nextInt();
        zkFluentClient.create().creatingParentsIfNeeded().forPath(path);
        //Thread.sleep(20000);  // sleep           
        zkFluentClient.delete().forPath(path);  //     

        // 2、    ,          ,    /,     namespace     
        zkFluentClient.delete().deletingChildrenIfNeeded().forPath("/book");

        // 3、    ,         ,      ,      ,                   
        zkFluentClient.delete().guaranteed().deletingChildrenIfNeeded().forPath("/");
    }
}

作成された非同期インプリメンテーション
/**
 *     
 */
public class AsyncCuratorTest {

    static RetryPolicy policy = new ExponentialBackoffRetry(1000, 3); //   3 

    //     --     
    static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
            .connectString("localhost:32770")
            .sessionTimeoutMs(5000)
            .connectionTimeoutMs(3000)
            .retryPolicy(policy)
            .namespace("zk-asyncjsy")  //                     ,
            //     namespace,  base     /base。       /,   。
            .build();

    static CountDownLatch countDownLatch = new CountDownLatch(2); // countDownLatch

    static ExecutorService tp = Executors.newFixedThreadPool(2); // ThreadPool

    public static void main(String[] args) throws Exception {

        String path = "/asyncCreate" + ThreadLocalRandom.current().nextInt();

        zkFluentClient.start();

        //      -1
        zkFluentClient.create().creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .inBackground(new BackgroundCallback() {
                    @Override
                    public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                        System.out.println("Event[code:" + curatorEvent.getResultCode() + ", type:" + curatorEvent.getType());

                        System.out.println("Thread of processResult:" + Thread.currentThread().getName());

                        countDownLatch.countDown();
                    }
                }, tp).forPath(path, "createInfo".getBytes());

        //      -2 ,   ,     
        zkFluentClient.create().creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .inBackground(new BackgroundCallback() {
                    @Override
                    public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                        System.out.println("Event[code:" + curatorEvent.getResultCode() + ", type:" + curatorEvent.getType());

                        System.out.println("Thread of processResult:" + Thread.currentThread().getName());

                        countDownLatch.countDown();
                    }
                }).forPath(path, "createAgain".getBytes());

        countDownLatch.await();
        tp.shutdown();
    }
}

実行結果は次のとおりです.
Event[code:0, type:CREATE Thread of processResult:pool-3-thread-1 Event[code:-110, type:CREATE Thread of processResult:main-EventThread
次の点に注意してください.
  • 最初の非同期コールバックはスレッドプールtpに転送され、オンラインスレッドプールで対応するコールバックを実行するために使用され、結果から、実行スレッドはMainスレッド
  • ではないことがわかる.
  • 第2の非同期コールバックは、スレッドプールtpに転送されないため、メインスレッドMain
  • が実行する.
  • countDownLatchここでは、スレッドの実行が終了することを保証するために、shutdownスレッドプール
  • を使用することができる.
  • から返されるevent.getResultCodeが0の場合、操作が成功したことを示し、他の値の場合、成功しなかったことを示します.たとえば-110は、データノードが存在していることを示します.