Mysqlインクリメンタル書き込みHdfs(二)--Storm+hdfsのフロー処理
13089 ワード
一.概要
前回はmysqlからkafkaにデータを投げ込む方法を紹介しましたが、今回はstormを利用してhdfsにデータを書き込む過程に専念しました.stormがhdfsに書き込むカスタマイズ可能なものが多いので、kafkaから読み取るのではなく、Spoutデータを自分で定義してデータソースとして使用し、次の章で統合します.ここではデフォルトでstormの知識の基礎を持っています.少なくともSpoutとboltが何なのか知っています.
hdfsへの書き込みには、次のカスタムポリシーがあります.カスタム書き込みファイルの名前 書き込みコンテンツフォーマット を定義する.所定の条件が満たされた後に書き込むファイル を変更する.ファイルへの書き込み時にトリガーされるアクション の変更
この記事では、まずstormでHDFSを書き込む方法、いくつかのAPIを書き込む方法、および最後に与えられた例について説明します.
stormは10個のTupleを受け取るたびにhdfs書き込みファイルを変更し、新しいファイルの名前は何回目に変更されます.
ps:stormバージョン:1.1.1.Hadoopバージョン:2.7.4.
次に、まずStormがHDFSにどのように書き込むかを見てみましょう.
二.Storm書き込みHDFS
Storm公式には対応するAPIが提供されており、使用できます.HDFSは、HdfsBoltを作成し、対応するルールを定義することで書き込むことができます.
まずmavenで依存およびプラグインを構成します.
ここでは、クラスタにパッケージ化する場合、パッケージ化されたプラグインはmaven-shade-pluginというプラグインを使用し、maven Lifecycleのpackageを使用してパッケージ化する必要があります.Maven-assembly-pluginプラグインでパッケージ化するのではなく、
Maven-assembly-pluginを使用すると、依存するすべてのパッケージがunpackされ、packになるため、同じファイルが上書きされる場合があります.クラスタに公開されるとNo FileSystem for scheme:hdfsのエラーが報告されます.
次にHdfsBoltを使用してHdfsを書き込みます.公式文書の例を見てみましょう.
ここまでで終わりです.HdfsBoltは1つのStormの中の特殊ないくつかのboltとすることができる.このboltの役割は,受信情報に基づいてHdfsに書き込まれてもよい.
新しいHdfsBoltでは、Stormはかなり柔軟性を提供しています.ある条件が達成されたときに書き込みファイルを変換したり、新しい書き込みファイルの名前を変換したり、書き込み時のセパレータを定義したりすることができます.
使用を選択すると、Stormには一部のインタフェースが用意されていますが、豊富でないと思ったら、対応するクラスをカスタマイズすることもできます.これらの戦略をどのように制御するかを見てみましょう.
RecordFormat
これはインタフェースで、受信したコンテンツのフォーマットを自由に定義できます.
StormはDelimitedRecordFormatを提供しており、使用方法は上記のとおりです.このクラスのデフォルトの分割子はカンマ「,」であり、withFieldDelimiterメソッドで区切り子を変更できます.最初の区切り文字がカンマでない場合は、クラスインプリメンテーションRecordFormatインタフェースを書き換えることもできます.
FileNameFormat
同じインタフェースです.
Stormが提供するデフォルトはorgである.apache.storm.hdfs.format.DefaultFileNameFormat.デフォルトで使用される変換ファイル名は少し長いです.フォーマットは次のとおりです.
{prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}
例:
MyBolt-5-7-1390579837830.txt
デフォルトでは、接頭辞は空で、拡張識別子は「.txt」です.
SyncPolicy
同期ポリシーによりbuffered dataをHdfsファイルにバッファリング(クライアントがデータを読み出すことができる)orgを実装することができる.apache.storm.hdfs.sync.SyncPolicyインタフェース:
FileRotationPolicy
このインタフェースでは、書き込みファイルをどのように変換するかを制御できます.
Stormには、インタフェースを実装する3つのクラスがあります.最も簡単なのは変換を行うorgである.apache.storm.hdfs.bolt.rotation.NoRotationPolicy、何もしない. ファイルサイズにより変換をトリガorg.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy. は、時間条件により変換のorgをトリガ.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.
もっと複雑なニーズがあれば、自分で定義することもできます.
RotationAction
これは主に1つ以上のhookを提供し、追加しても追加しなくてもいいです.主に書き込みファイル変換がトリガーされると起動します.
三.一例を実現する
上記の状況を理解すると、書き込み記録の数に応じて書き込み変換(書き込みファイルの変更)を制御し、変換後のファイルの名前が現在何回目の変換であるかを示す例を実現します.
まずHdfsBoltの内容を見てみましょう.
次に、各ポリシーのクラスをそれぞれ見てみましょう.
FileRotationPolicy
FileNameFormat
RotationAction
OK、これで大成功です.上のコードで、10個のTupleを受け取るたびに書き込みファイルが変換され、新しいファイルの名前は何回目の変換です.
完全なコードには、私のgithubで表示できるランダムに生成された文字列のSpoutが含まれています.
StormHdfsDemo:https://github.com/shezhiming/StormHdfsDemo
推荐阅读:Mysqlストリームインクリメンタル書込みHdfs(一)--mysqlからkafka Spark SQLまで、DataFrameをjsonフォーマットに変換する方法
前回はmysqlからkafkaにデータを投げ込む方法を紹介しましたが、今回はstormを利用してhdfsにデータを書き込む過程に専念しました.stormがhdfsに書き込むカスタマイズ可能なものが多いので、kafkaから読み取るのではなく、Spoutデータを自分で定義してデータソースとして使用し、次の章で統合します.ここではデフォルトでstormの知識の基礎を持っています.少なくともSpoutとboltが何なのか知っています.
hdfsへの書き込みには、次のカスタムポリシーがあります.
この記事では、まずstormでHDFSを書き込む方法、いくつかのAPIを書き込む方法、および最後に与えられた例について説明します.
stormは10個のTupleを受け取るたびにhdfs書き込みファイルを変更し、新しいファイルの名前は何回目に変更されます.
ps:stormバージョン:1.1.1.Hadoopバージョン:2.7.4.
次に、まずStormがHDFSにどのように書き込むかを見てみましょう.
二.Storm書き込みHDFS
Storm公式には対応するAPIが提供されており、使用できます.HDFSは、HdfsBoltを作成し、対応するルールを定義することで書き込むことができます.
まずmavenで依存およびプラグインを構成します.
1.1.1
org.apache.storm
storm-core
${storm.version}
org.slf4j
log4j-over-slf4j
commons-collections
commons-collections
3.2.1
com.google.guava
guava
15.0
org.apache.hadoop
hadoop-client
2.7.4
org.slf4j
slf4j-log4j12
org.apache.hadoop
hadoop-hdfs
2.7.4
org.slf4j
slf4j-log4j12
org.apache.storm
storm-hdfs
1.1.1
org.apache.maven.plugins
maven-compiler-plugin
3.5.1
1.8
1.8
org.codehaus.mojo
exec-maven-plugin
1.2.1
exec
java
true
false
compile
com.learningstorm.kafka.KafkaTopology
org.apache.maven.plugins
maven-shade-plugin
1.7
true
package
shade
ここでは、クラスタにパッケージ化する場合、パッケージ化されたプラグインはmaven-shade-pluginというプラグインを使用し、maven Lifecycleのpackageを使用してパッケージ化する必要があります.Maven-assembly-pluginプラグインでパッケージ化するのではなく、
Maven-assembly-pluginを使用すると、依存するすべてのパッケージがunpackされ、packになるため、同じファイルが上書きされる場合があります.クラスタに公開されるとNo FileSystem for scheme:hdfsのエラーが報告されます.
次にHdfsBoltを使用してHdfsを書き込みます.公式文書の例を見てみましょう.
// "|" ",",
RecordFormat format = new DelimitedRecordFormat()
.withFieldDelimiter("|");
// 1k Hdfs
SyncPolicy syncPolicy = new CountSyncPolicy(1000);
// 5MB , ,
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
// ,
FileNameFormat fileNameFormat = new DefaultFileNameFormat()
.withPath("/foo/");
HdfsBolt bolt = new HdfsBolt()
.withFsUrl("hdfs://localhost:9000")
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy);
// bolt
topologyBuilder.setBolt("hdfsBolt", bolt, 5).globalGrouping("randomStrSpout");
ここまでで終わりです.HdfsBoltは1つのStormの中の特殊ないくつかのboltとすることができる.このboltの役割は,受信情報に基づいてHdfsに書き込まれてもよい.
新しいHdfsBoltでは、Stormはかなり柔軟性を提供しています.ある条件が達成されたときに書き込みファイルを変換したり、新しい書き込みファイルの名前を変換したり、書き込み時のセパレータを定義したりすることができます.
使用を選択すると、Stormには一部のインタフェースが用意されていますが、豊富でないと思ったら、対応するクラスをカスタマイズすることもできます.これらの戦略をどのように制御するかを見てみましょう.
RecordFormat
これはインタフェースで、受信したコンテンツのフォーマットを自由に定義できます.
public interface RecordFormat extends Serializable {
byte[] format(Tuple tuple);
}
StormはDelimitedRecordFormatを提供しており、使用方法は上記のとおりです.このクラスのデフォルトの分割子はカンマ「,」であり、withFieldDelimiterメソッドで区切り子を変更できます.最初の区切り文字がカンマでない場合は、クラスインプリメンテーションRecordFormatインタフェースを書き換えることもできます.
FileNameFormat
同じインタフェースです.
public interface FileNameFormat extends Serializable {
void prepare(Map conf, TopologyContext topologyContext);
String getName(long rotation, long timeStamp);
String getPath();
}
Stormが提供するデフォルトはorgである.apache.storm.hdfs.format.DefaultFileNameFormat.デフォルトで使用される変換ファイル名は少し長いです.フォーマットは次のとおりです.
{prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}
例:
MyBolt-5-7-1390579837830.txt
デフォルトでは、接頭辞は空で、拡張識別子は「.txt」です.
SyncPolicy
同期ポリシーによりbuffered dataをHdfsファイルにバッファリング(クライアントがデータを読み出すことができる)orgを実装することができる.apache.storm.hdfs.sync.SyncPolicyインタフェース:
public interface SyncPolicy extends Serializable {
boolean mark(Tuple tuple, long offset);
void reset();
}
FileRotationPolicy
このインタフェースでは、書き込みファイルをどのように変換するかを制御できます.
public interface FileRotationPolicy extends Serializable {
boolean mark(Tuple tuple, long offset);
void reset();
}
Stormには、インタフェースを実装する3つのクラスがあります.
もっと複雑なニーズがあれば、自分で定義することもできます.
RotationAction
これは主に1つ以上のhookを提供し、追加しても追加しなくてもいいです.主に書き込みファイル変換がトリガーされると起動します.
public interface RotationAction extends Serializable {
void execute(FileSystem fileSystem, Path filePath) throws IOException;
}
三.一例を実現する
上記の状況を理解すると、書き込み記録の数に応じて書き込み変換(書き込みファイルの変更)を制御し、変換後のファイルの名前が現在何回目の変換であるかを示す例を実現します.
まずHdfsBoltの内容を見てみましょう.
RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(" ");
// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);
// FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(1.0f, FileSizeRotationPolicy.Units.KB);
/** rotate file with Date,every month create a new file
* format:yyyymm.txt
*/
FileRotationPolicy rotationPolicy = new CountStrRotationPolicy();
FileNameFormat fileNameFormat = new TimesFileNameFormat().withPath("/test/");
RotationAction action = new NewFileAction();
HdfsBolt bolt = new HdfsBolt()
.withFsUrl("hdfs://127.0.0.1:9000")
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy)
.addRotationAction(action);
次に、各ポリシーのクラスをそれぞれ見てみましょう.
FileRotationPolicy
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.tuple.Tuple;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* Hdfs , 10 , , “TimesFileNameFormat”
*
*/
public class CountStrRotationPolicy implements FileRotationPolicy {
private SimpleDateFormat df = new SimpleDateFormat("yyyyMM");
private String date = null;
private int count = 0;
public CountStrRotationPolicy(){
this.date = df.format(new Date());
// this.date = df.format(new Date());
}
/**
* Called for every tuple the HdfsBolt executes.
*
* @param tuple The tuple executed.
* @param offset current offset of file being written
* @return true if a file rotation should be performed
*/
@Override
public boolean mark(Tuple tuple, long offset) {
count ++;
if(count == 10) {
System.out.print("num :" +count + " ");
count = 0;
return true;
}
else {
return false;
}
}
/**
* Called after the HdfsBolt rotates a file.
*/
@Override
public void reset() {
}
@Override
public FileRotationPolicy copy() {
return new CountStrRotationPolicy();
}
}
FileNameFormat
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.task.TopologyContext;
import java.util.Map;
/**
*
* ,
*/
public class TimesFileNameFormat implements FileNameFormat {
//
private String path = "/storm";
//
private String extension = ".txt";
private Long times = new Long(0);
public TimesFileNameFormat withPath(String path){
this.path = path;
return this;
}
@Override
public void prepare(Map conf, TopologyContext topologyContext) {
}
@Override
public String getName(long rotation, long timeStamp) {
times ++ ;
// ,
return times.toString() + this.extension;
}
public String getPath(){
return this.path;
}
}
RotationAction
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.storm.hdfs.common.rotation.RotationAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
/**
hook , 。
*/
public class NewFileAction implements RotationAction {
private static final Logger LOG = LoggerFactory.getLogger(NewFileAction.class);
@Override
public void execute(FileSystem fileSystem, Path filePath) throws IOException {
LOG.info("Hdfs change the written file!!");
return;
}
}
OK、これで大成功です.上のコードで、10個のTupleを受け取るたびに書き込みファイルが変換され、新しいファイルの名前は何回目の変換です.
完全なコードには、私のgithubで表示できるランダムに生成された文字列のSpoutが含まれています.
StormHdfsDemo:https://github.com/shezhiming/StormHdfsDemo
推荐阅读:Mysqlストリームインクリメンタル書込みHdfs(一)--mysqlからkafka Spark SQLまで、DataFrameをjsonフォーマットに変換する方法