stormクラスタタスク移行問題スムーズ移行
5337 ワード
最近は古いstormクラスタの一部のタスクを,新しく構築したstormクラスタに移行したい.
いずれも1つのkafkaクラスタから数をとるが,新旧stormクラスタに対応するZKアドレスは異なり,移行後メッセージ消費は続かなかった.
古いstormクラスタで実行されるタスクは、zkクラスタ上のtopic消費オフセットが更新されています.
[zk: localhost(CONNECTED) 21] get/ad_***@***/online-1/partition_9
{"topology":{"id":"c412eaff-42c4-4bc6-baf0-5fff4633be03","name":"ad_***"},"offset":1148171,"partition":9,"broker":{"host":"***","port":9092},"topic":"ad_***"}
cZxid = 0x1a9a2a6758
ctime = Wed Jun 1514:24:58CST 2016
mZxid = 0x1e001f6e2f
mtime = Tue Nov 1518:57:22CST 2016
pZxid = 0x1a9a2a6758
cversion = 0
dataVersion = 836004
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 204
numChildren = 0
[zk: localhost(CONNECTED) 22] get/ad_***@***/online-1/partition_9
{"topology":{"id":"c412eaff-42c4-4bc6-baf0-5fff4633be03","name":"ad_***"},"offset":1148177,"partition":9,"broker":{"host":"***","port":9092},"topic":"ad_***"}
cZxid = 0x1a9a2a6758
ctime = Wed Jun 1514:24:58CST 2016
mZxid = 0x1e00202046
mtime = Tue Nov 1518:58:38CST 2016
pZxid = 0x1a9a2a6758
cversion = 0
dataVersion = 836009
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 204
numChildren = 0
新しいstormクラスタに移行したタスクは、zk上のオフセット量で更新されませんでした.
[zk: localhost(CONNECTED) 29] get/***_log_hz@***/OnLine-2/partition_9
{"topology":{"id":"aeef3c11-4f58-4baa-9f25-cae3bf25e7d2","name":"***_log_hz@***"},"offset":4860271981,"partition":9,"broker":{"host":"10.***","port":9092},"topic":"***_log"}
cZxid = 0x1c0cc06551
ctime = Thu Oct 2011:30:10CST 2016
mZxid = 0x1c49d95a71
mtime = Wed Nov 0916:47:03CST 2016
pZxid = 0x1c0cc06551
cversion = 0
dataVersion = 513318
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 199
numChildren = 0
kafkaspoutソースコードから更新オフセット量の関数が表示されます.
publicvoidcommit() {
...
_state.writeJSON(committedPath(), data);
...
}
_stateの初期化:
Map stateConf = newHashMap(conf);
List zkServers = _spoutConfig.zkServers;
if(zkServers == null) {
zkServers = (List) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
}
Integer zkPort = _spoutConfig.zkPort;
if(zkPort == null) {
zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
}
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
_state = newZkState(stateConf);
そして_spoutConfig.zkServerはプロアクティブに割り当てられていないので、Configを取る.STORM_ZOOKEEPER_SERVERSの構成項目、すなわち新しいstormクラスタ用のzkアドレス:storm.zookeeper.serversこのzkクラスタではstormタスクのzkパスは作成されず、オフセット量も更新されません.
したがって、zkServer変数を元のzookeeperのアドレスにコードで割り当て、オフセット量が更新されるかどうかを確認します.
BrokerHosts hosts = new ZkHosts(Constants.ZOOKEEPER_HOSTS);
いずれも1つのkafkaクラスタから数をとるが,新旧stormクラスタに対応するZKアドレスは異なり,移行後メッセージ消費は続かなかった.
古いstormクラスタで実行されるタスクは、zkクラスタ上のtopic消費オフセットが更新されています.
[zk: localhost(CONNECTED) 21] get/ad_***@***/online-1/partition_9
{"topology":{"id":"c412eaff-42c4-4bc6-baf0-5fff4633be03","name":"ad_***"},"offset":1148171,"partition":9,"broker":{"host":"***","port":9092},"topic":"ad_***"}
cZxid = 0x1a9a2a6758
ctime = Wed Jun 1514:24:58CST 2016
mZxid = 0x1e001f6e2f
mtime = Tue Nov 1518:57:22CST 2016
pZxid = 0x1a9a2a6758
cversion = 0
dataVersion = 836004
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 204
numChildren = 0
[zk: localhost(CONNECTED) 22] get/ad_***@***/online-1/partition_9
{"topology":{"id":"c412eaff-42c4-4bc6-baf0-5fff4633be03","name":"ad_***"},"offset":1148177,"partition":9,"broker":{"host":"***","port":9092},"topic":"ad_***"}
cZxid = 0x1a9a2a6758
ctime = Wed Jun 1514:24:58CST 2016
mZxid = 0x1e00202046
mtime = Tue Nov 1518:58:38CST 2016
pZxid = 0x1a9a2a6758
cversion = 0
dataVersion = 836009
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 204
numChildren = 0
新しいstormクラスタに移行したタスクは、zk上のオフセット量で更新されませんでした.
[zk: localhost(CONNECTED) 29] get/***_log_hz@***/OnLine-2/partition_9
{"topology":{"id":"aeef3c11-4f58-4baa-9f25-cae3bf25e7d2","name":"***_log_hz@***"},"offset":4860271981,"partition":9,"broker":{"host":"10.***","port":9092},"topic":"***_log"}
cZxid = 0x1c0cc06551
ctime = Thu Oct 2011:30:10CST 2016
mZxid = 0x1c49d95a71
mtime = Wed Nov 0916:47:03CST 2016
pZxid = 0x1c0cc06551
cversion = 0
dataVersion = 513318
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 199
numChildren = 0
kafkaspoutソースコードから更新オフセット量の関数が表示されます.
publicvoidcommit() {
...
_state.writeJSON(committedPath(), data);
...
}
_stateの初期化:
Map stateConf = newHashMap(conf);
List zkServers = _spoutConfig.zkServers;
if(zkServers == null) {
zkServers = (List) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
}
Integer zkPort = _spoutConfig.zkPort;
if(zkPort == null) {
zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
}
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
_state = newZkState(stateConf);
そして_spoutConfig.zkServerはプロアクティブに割り当てられていないので、Configを取る.STORM_ZOOKEEPER_SERVERSの構成項目、すなわち新しいstormクラスタ用のzkアドレス:storm.zookeeper.serversこのzkクラスタではstormタスクのzkパスは作成されず、オフセット量も更新されません.
したがって、zkServer変数を元のzookeeperのアドレスにコードで割り当て、オフセット量が更新されるかどうかを確認します.
#publicfinalstatic String STORM_ZOOKEEPER_SERVERS = "10.***0,10.***1,10.***2";
spoutConfig.zkServers = Lists.newArrayList(Constants.STORM_ZOOKEEPER_SERVERS.split(","));
KafkaSpout kafkaSpout = newKafkaSpout(spoutConfig);
を するため、kafkaspoutは のオフセット を み り、groupidをOnLine-3に し、offsetが を したことがわかります.
[zk: localhost(CONNECTED) 35] get/***_log_hz@***/OnLine-3/partition_9
{"topology":{"id":"f67da6a9-d136-449e-b00c-ed14eefe6a9c","name":"***_log_hz@***"},"offset":5957447236,"partition":9,"broker":{"host":"10.***","port":9092},"topic":"***_log"}
cZxid = 0x1e0050ca16
ctime = Tue Nov 1520:30:32CST 2016
mZxid = 0x1e00516f84
mtime = Tue Nov 1520:31:43CST 2016
pZxid = 0x1e0050ca16
cversion = 0
dataVersion = 34
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 199
numChildren = 0
[zk: localhost(CONNECTED) 36] get/***_log_hz@***/OnLine-3/partition_9
{"topology":{"id":"f67da6a9-d136-449e-b00c-ed14eefe6a9c","name":"***_log_hz@***"},"offset":5957454675,"partition":9,"broker":{"host":"10.***","port":9092},"topic":"***_log"}
cZxid = 0x1e0050ca16
ctime = Tue Nov 1520:30:32CST 2016
mZxid = 0x1e00517378
mtime = Tue Nov 1520:31:45CST 2016
pZxid = 0x1e0050ca16
cversion = 0
dataVersion = 35
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 199
numChildren = 0
したがって, いstormクラスタ を るタスクが, しいstormクラスタ に する ,kafkaspoutのzkパスが わらないことを し,zkアドレスを すれば, をスムーズにすることができる.