Zookeeper入門の3-Javaクライアントcuratorの使用
7895 ワード
ZKのjavaクライアント-curator基本使用
一般的な添削変更の実装--同期インタフェース
作成された非同期インプリメンテーション
実行結果は次のとおりです.
Event[code:0, type:CREATE Thread of processResult:pool-3-thread-1 Event[code:-110, type:CREATE Thread of processResult:main-EventThread
次の点に注意してください.最初の非同期コールバックはスレッドプールtpに転送され、オンラインスレッドプールで対応するコールバックを実行するために使用され、結果から、実行スレッドはMainスレッド ではないことがわかる.第2の非同期コールバックは、スレッドプールtpに転送されないため、メインスレッドMain が実行する. countDownLatchここでは、スレッドの実行が終了することを保証するために、shutdownスレッドプール を使用することができる.から返されるevent.getResultCodeが0の場合、操作が成功したことを示し、他の値の場合、成功しなかったことを示します.たとえば-110は、データノードが存在していることを示します.
一般的な添削変更の実装--同期インタフェース
public class CuratorConTest {
static RetryPolicy policy = new ExponentialBackoffRetry(1000, 3); // 3
// --
/* CuratorFramework zkClient = CuratorFrameworkFactory.newClient("localhost:32770",
5000,
3000,
policy);*/
// --
static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
.connectString("localhost:32770")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(3000)
.retryPolicy(policy)
.namespace("zk-jsy") // ,
// namespace, base /base。 /, 。
.build();
public static void main(String[] args) throws Exception {
//
// zkClient.start();
zkFluentClient.start();
//
CuratorConTest test = new CuratorConTest();
test.testCreate();
//
test.testGet();
//
test.testUpdate();
//
test.testDelete();
// Thread.sleep(Integer.MAX_VALUE);
}
private void testCreate() throws Exception {
// 1、 , , ip
//org.apache.zookeeper.KeeperException$UnimplementedException: KeeperErrorCode = Unimplemented for /zk-jsy/book
// zookeeper curator , zk 3.5.1-Alpha, 3.4.8
zkFluentClient.create().forPath("/book1-" + ThreadLocalRandom.current().nextInt());
// 2、
zkFluentClient.create().forPath("/book2-" + ThreadLocalRandom.current().nextFloat(), "mytestbook2Create".getBytes());
// 3、 ,
zkFluentClient.create().withMode(CreateMode.EPHEMERAL).
forPath("/book3-" + ThreadLocalRandom.current().nextInt());
// 4、 , , 。
zkFluentClient.create().creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL).
forPath("/test/book4-test" + ThreadLocalRandom.current().nextInt());
}
private void testGet() throws Exception {
String path = "/getData/mydata-" + ThreadLocalRandom.current().nextInt();
zkFluentClient.create().creatingParentsIfNeeded().forPath(path,
("sogetdata" + ThreadLocalRandom.current().nextInt()).getBytes());
// 1、 , bytes
String value = new String(zkFluentClient.getData().forPath(path));
System.out.println(value);
// 2、
Stat stat = new Stat();
String value11 = new String(zkFluentClient.getData().storingStatIn(stat).forPath(path));
System.out.println(stat.toString());
System.out.println(value11);
}
private void testUpdate() throws Exception {
String path = "/updateData/mydata-" + ThreadLocalRandom.current().nextInt();
zkFluentClient.create().creatingParentsIfNeeded().forPath(path,
("toBeUpdate" + ThreadLocalRandom.current().nextInt()).getBytes());
System.out.println("originData:" + new String(zkFluentClient.getData().forPath(path)));
// 1、 update, version
Stat stat = zkFluentClient.setData().forPath(path, ("newData" + ThreadLocalRandom.current().nextInt()).getBytes());
System.out.println("newData:" + new String(zkFluentClient.getData().forPath(path)));
// 2、 , CAS, version ,
zkFluentClient.setData().withVersion(stat.getVersion()).forPath(path, ("UpdateByVersion:" + stat.getVersion()).getBytes());
System.out.println("updateByVersionData:" + new String(zkFluentClient.getData().forPath(path)));
// 2.1 cas, version=1, 2
zkFluentClient.setData().withVersion(1).forPath(path,"error".getBytes());
System.out.println("updateByErrorVersionData:" + new String(zkFluentClient.getData().forPath(path)));
// KeeperErrorCode = BadVersion for /zk-jsy/updateData/mydata--1431282676 ,version
}
private void testDelete() throws Exception {
// 1、 ,
String path = "/book/forDelete" + ThreadLocalRandom.current().nextInt();
zkFluentClient.create().creatingParentsIfNeeded().forPath(path);
//Thread.sleep(20000); // sleep
zkFluentClient.delete().forPath(path); //
// 2、 , , /, namespace
zkFluentClient.delete().deletingChildrenIfNeeded().forPath("/book");
// 3、 , , , ,
zkFluentClient.delete().guaranteed().deletingChildrenIfNeeded().forPath("/");
}
}
作成された非同期インプリメンテーション
/**
*
*/
public class AsyncCuratorTest {
static RetryPolicy policy = new ExponentialBackoffRetry(1000, 3); // 3
// --
static CuratorFramework zkFluentClient = CuratorFrameworkFactory.builder()
.connectString("localhost:32770")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(3000)
.retryPolicy(policy)
.namespace("zk-asyncjsy") // ,
// namespace, base /base。 /, 。
.build();
static CountDownLatch countDownLatch = new CountDownLatch(2); // countDownLatch
static ExecutorService tp = Executors.newFixedThreadPool(2); // ThreadPool
public static void main(String[] args) throws Exception {
String path = "/asyncCreate" + ThreadLocalRandom.current().nextInt();
zkFluentClient.start();
// -1
zkFluentClient.create().creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
System.out.println("Event[code:" + curatorEvent.getResultCode() + ", type:" + curatorEvent.getType());
System.out.println("Thread of processResult:" + Thread.currentThread().getName());
countDownLatch.countDown();
}
}, tp).forPath(path, "createInfo".getBytes());
// -2 , ,
zkFluentClient.create().creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
System.out.println("Event[code:" + curatorEvent.getResultCode() + ", type:" + curatorEvent.getType());
System.out.println("Thread of processResult:" + Thread.currentThread().getName());
countDownLatch.countDown();
}
}).forPath(path, "createAgain".getBytes());
countDownLatch.await();
tp.shutdown();
}
}
実行結果は次のとおりです.
Event[code:0, type:CREATE Thread of processResult:pool-3-thread-1 Event[code:-110, type:CREATE Thread of processResult:main-EventThread
次の点に注意してください.