JstormのTopologyコミットクライアント
5374 ワード
1つのtopologyには1つ以上のspout boltが含まれており、spoutはデータソースでデータを取得してboltに送信し、各boltは処理を終えて次のboltに送信する.通常、topologyの作成は、どのspout boltが含まれているかを記録し、各コンポーネントにid競合があるかどうかを検証するTopologyBuilderによって作成されます.検証方法は次のとおりです.
private void validateUnusedId(String id) {
if (_bolts.containsKey(id)) {
throw new IllegalArgumentException(
"Bolt has already been declared for id " + id);
}
if (_spouts.containsKey(id)) {
throw new IllegalArgumentException(
"Spout has already been declared for id " + id);
}
if (_stateSpouts.containsKey(id)) {
throw new IllegalArgumentException(
"State spout has already been declared for id " + id);
}
} <span style="font-family: 'Courier New'; background-color: rgb(255, 255, 255);"> </span>
TopologyBuilderは、次のように各コンポーネントを対応するデータ構造に保存します.public class TopologyBuilder {
// bolt
private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>();
// spout
private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>();
//
private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>();
..........
}
コンポーネント構成情報の格納方法は、次のとおりです.private void initCommon(String id, IComponent component, Number parallelism) {
ComponentCommon common = new ComponentCommon();
common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
if (parallelism != null)
common.set_parallelism_hint(parallelism.intValue());
else {
common.set_parallelism_hint(Integer.valueOf(1));
}
Map conf = component.getComponentConfiguration();
if (conf != null)
common.set_json_conf(Utils.to_json(conf));
_commons.put(id, common);
}
情報を保存すると、topologyフェーズでbuilderはこれらの情報に基づいてStormTopologyインスタンスを作成し、StormSubmitterr.submitTopologyは、1、jarファイルのアップロード2、ジョブのコミットの2段階に分けてコミットします.public static void submitTopology(String name, Map stormConf,
StormTopology topology, SubmitOptions opts)
throws AlreadyAliveException, InvalidTopologyException {
//
if (!Utils.isValidConf(stormConf)) {
throw new IllegalArgumentException(
"Storm conf is not valid. Must be json-serializable");
}
stormConf = new HashMap(stormConf);
stormConf.putAll(Utils.readCommandLineOpts());
Map conf = Utils.readStormConfig();
conf.putAll(stormConf);
putUserInfo(conf, stormConf);
try {
String serConf = Utils.to_json(stormConf);
if (localNimbus != null) {
LOG.info("Submitting topology " + name + " in local mode");
localNimbus.submitTopology(name, null, serConf, topology);
} else {
NimbusClient client = NimbusClient.getConfiguredClient(conf);
// topology
if (topologyNameExists(conf, name)) {
throw new RuntimeException("Topology with name `" + name
+ "` already exists on cluster");
}
// jar ,
submitJar(conf);
try {
LOG.info("Submitting topology " + name
+ " in distributed mode with conf " + serConf);
// topology, ServiceHandler submitTopology , topology,
if (opts != null) {
client.getClient().submitTopologyWithOpts(name, path,
serConf, topology, opts);
} else {
// this is for backwards compatibility
client.getClient().submitTopology(name, path, serConf,
topology);
}
} finally {
client.close();
}
}
LOG.info("Finished submitting topology: " + name);
} catch (InvalidTopologyException e) {
.......
}
}
jarファイルのアップロードは2つの部分を含み、jarファイル自体とそれに依存するライブラリファイルはサービス側に転送され、デフォルトのアップロードbufサイズは512 Kで、nimbusを通過することができる.thrift.max_buffer_sizeはbufサイズを調整し、サービス側が保存するディレクトリ構造は以下の通りです.[hongmin.lhm@rt2l02045 ~]$tree /home/hongmin.lhm/jstorm_data/nimbus/inbox/
/home/hongmin.lhm/jstorm_data/nimbus/inbox/
`-- 7c1b7d1e-9134-4ed8-b664-836271b49bd3
`-- stormjar-7c1b7d1e-9134-4ed8-b664-836271b49bd3.jar
private static void submitJar(Map conf) {
if (submittedJar == null) {
NimbusClient client = NimbusClient.getConfiguredClient(conf);
try {
LOG.info("Jar not uploaded to master yet. Submitting jar...");
String localJar = System.getProperty("storm.jar");
path = client.getClient().beginFileUpload();
String[] pathCache = path.split("/");
String uploadLocation = path + "/stormjar-"
+ pathCache[pathCache.length - 1] + ".jar";
List<String> lib = (List<String>) conf
.get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
Map<String, String> libPath = (Map<String, String>) conf
.get(GenericOptionsParser.TOPOLOGY_LIB_PATH);
if (lib != null && lib.size() != 0) {
for (String libName : lib) {
String jarPath = path + "/" + libName;
client.getClient().beginLibUpload(jarPath);
submitJar(conf, libPath.get(libName), jarPath, client);
}
if (localJar != null)
submittedJar = submitJar(conf, localJar,
uploadLocation, client);
} else {
submittedJar = submitJar(conf, localJar, uploadLocation,
client);
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
client.close();
}
} else {
LOG.info("Jar already uploaded to master. Not submitting jar.");
}
}