【Flume】【ソース分析】flumeでsinkからhdfsまで、ファイルシステムが頻繁にファイルを生成し、ファイルスクロール構成が機能しませんか?
4830 ワード
本人はhdfsのsinkをテストして、sink端のファイルのスクロール構成項目が何の役にも立たないことを発見して、構成は以下の通りです:
ここで配置するのは60秒で、ファイルは1回スクロールして、つまり60秒ごとに、新しい1つのファイルが発生します【前提、flumeのsource端はデータが来ます】
ここで注意
この属性の目的はタイムスタンプに相当するブロックです.そうしないと%Yなどは認識できません.
このプロパティを使用するか、タイムスタンプブロッカーを使用します.
しかし、flumeを起動すると、十数秒実行し、データを書き続け、hdfs側で頻繁にファイルが生成され、数秒おきに新しいファイルが生成され、flumeのログ出力で頻繁にこの文を見ることができます.
[WARN] Block Under-replication detected. Rotating file.
これさえあれば、新しいファイルが生まれます.
コピーブロックがファイルをスクロールしていることを検出し、ソースコードと組み合わせて見ます.
中には固定変数m a x C o n s e c U nderReplRotations=30、つまりコピー中のブロックが読み込まれています.最大30ファイルまでスクロールできます.30回を超えると、このブロックがコピー中であれば、データもスクロールされません.doRotate=false、スクロールされません.
しばらく実行すると30個のファイルが表示されることに気づいた人もいます
上のソースコードと合わせて見てみましょう.
10秒で1回スクロールし、2秒書いた場合、ちょうどそのファイルの内容があるブロックがコピー中であれば、10秒未満でもファイルをスクロールします.ファイルサイズ、イベント数の構成は同じです.
上記の問題を解決するために、プログラムに書かれたファイルがコピーされていることを感知させなければいいのですが、どうすればいいのでしょうか.
isUnderReplicated()メソッドを常にfalseに戻せばよい
この方法は、現在コピーされているブロックと構成で読み込まれたコピーブロックの数を比較することで、構成項目のコピーブロックの数だけを変更できますが、公式に与えられたflume構成項目にはこの項目があります.
hdfs.minBlockReplicas
Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath. デフォルトはhadoopのdfsです.replicationプロパティ、このプロパティのデフォルト値は3です.
ここではこのhadoopの構成もせず、flumeに上記の属性を1として追加すればよい
次のように構成されています.
各位のネットユーザーが指导を惜しまないことを望みます!!!
a1.sinks.k1.type=hdfs
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.path=hdfs://192.168.11.177:9000/flume/events/%Y/%m/%d/%H/%M
a1.sinks.k1.hdfs.filePrefix=XXX
a1.sinks.k1.hdfs.rollInterval=60
a1.sinks.k1.hdfs.rollSize=0
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=0
ここで配置するのは60秒で、ファイルは1回スクロールして、つまり60秒ごとに、新しい1つのファイルが発生します【前提、flumeのsource端はデータが来ます】
ここで注意
useLocalTimeStamp
この属性の目的はタイムスタンプに相当するブロックです.そうしないと%Yなどは認識できません.
このプロパティを使用するか、タイムスタンプブロッカーを使用します.
しかし、flumeを起動すると、十数秒実行し、データを書き続け、hdfs側で頻繁にファイルが生成され、数秒おきに新しいファイルが生成され、flumeのログ出力で頻繁にこの文を見ることができます.
[WARN] Block Under-replication detected. Rotating file.
これさえあれば、新しいファイルが生まれます.
コピーブロックがファイルをスクロールしていることを検出し、ソースコードと組み合わせて見ます.
private boolean shouldRotate() {
boolean doRotate = false;
if (writer.isUnderReplicated()) {
this.isUnderReplicated = true;
doRotate = true;
} else {
this.isUnderReplicated = false;
}
if ((rollCount > 0) && (rollCount <= eventCounter)) {
LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter);
doRotate = true;
}
if ((rollSize > 0) && (rollSize <= processSize)) {
LOG.debug("rolling: rollSize: {}, bytes: {}", rollSize, processSize);
doRotate = true;
}
return doRotate;
}
ファイルをスクロールするかどうかを判断しますが、この中の第1の判断条件は、現在のHDFSWriterがブロックをコピーしているかどうかを判断することです.public boolean isUnderReplicated() {
try {
int numBlocks = getNumCurrentReplicas();
if (numBlocks == -1) {
return false;
}
int desiredBlocks;
if (configuredMinReplicas != null) {
desiredBlocks = configuredMinReplicas;
} else {
desiredBlocks = getFsDesiredReplication();
}
return numBlocks < desiredBlocks;
} catch (IllegalAccessException e) {
logger.error("Unexpected error while checking replication factor", e);
} catch (InvocationTargetException e) {
logger.error("Unexpected error while checking replication factor", e);
} catch (IllegalArgumentException e) {
logger.error("Unexpected error while checking replication factor", e);
}
return false;
}
読み出した構成コピーブロック数と現在コピー中のブロックとを比較することにより、コピー中か否かを判定するif (shouldRotate()) {
boolean doRotate = true;
if (isUnderReplicated) {
if (maxConsecUnderReplRotations > 0 &&
consecutiveUnderReplRotateCount >= maxConsecUnderReplRotations) {
doRotate = false;
if (consecutiveUnderReplRotateCount == maxConsecUnderReplRotations) {
LOG.error("Hit max consecutive under-replication rotations ({}); " +
"will not continue rolling files under this path due to " +
"under-replication", maxConsecUnderReplRotations);
}
} else {
LOG.warn("Block Under-replication detected. Rotating file.");
}
consecutiveUnderReplRotateCount++;
} else {
consecutiveUnderReplRotateCount = 0;
}
以上の方法で、入り口はshouldRotate()の方法で、つまりrollcountを配置して、rollsizeが0より大きい場合、あなたの配置に従ってスクロールしますが、入り口が入ってきた後、発見して、またブロックがコピーされているかどうかを判断しました.中には固定変数m a x C o n s e c U nderReplRotations=30、つまりコピー中のブロックが読み込まれています.最大30ファイルまでスクロールできます.30回を超えると、このブロックがコピー中であれば、データもスクロールされません.doRotate=false、スクロールされません.
しばらく実行すると30個のファイルが表示されることに気づいた人もいます
上のソースコードと合わせて見てみましょう.
10秒で1回スクロールし、2秒書いた場合、ちょうどそのファイルの内容があるブロックがコピー中であれば、10秒未満でもファイルをスクロールします.ファイルサイズ、イベント数の構成は同じです.
上記の問題を解決するために、プログラムに書かれたファイルがコピーされていることを感知させなければいいのですが、どうすればいいのでしょうか.
isUnderReplicated()メソッドを常にfalseに戻せばよい
この方法は、現在コピーされているブロックと構成で読み込まれたコピーブロックの数を比較することで、構成項目のコピーブロックの数だけを変更できますが、公式に与えられたflume構成項目にはこの項目があります.
hdfs.minBlockReplicas
Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath. デフォルトはhadoopのdfsです.replicationプロパティ、このプロパティのデフォルト値は3です.
ここではこのhadoopの構成もせず、flumeに上記の属性を1として追加すればよい
次のように構成されています.
a1.sinks.k1.type=hdfs
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.path=hdfs://192.168.11.177:9000/flume/events/%Y/%m/%d/%H/%M
a1.sinks.k1.hdfs.filePrefix=cmcc
a1.sinks.k1.hdfs.minBlockReplicas=1
#a1.sinks.k1.hdfs.fileType=DataStream
#a1.sinks.k1.hdfs.writeFormat=Text
a1.sinks.k1.hdfs.rollInterval=60
a1.sinks.k1.hdfs.rollSize=0
a1.sinks.k1.hdfs.rollCount=0
a1.sinks.k1.hdfs.idleTimeout=0
このようにプログラムは永遠にファイルのあるブロックの複製のためにファイルをスクロールすることはできなくて、あなたの構成項目によってファイルをスクロールするだけで、試してみましょう!!各位のネットユーザーが指导を惜しまないことを望みます!!!