flumeのHFS Sink詳細解(転載)
原文の住所:http://www.aboutyun.com/thread-21422-1-1.html
一、重要な問題点
1.どのような構成がhdfsファイルのクローズに影響しますか?2.HFS Sinkで毎回トリガするイベントは何ですか?3.HFS Sinkにおけるパラメータの優先度はどうなりますか?
二、HFS Sinkとファイルを書く関連配置
hdfs.path->hdfsディレクトリパス
hdfs.filePrefix->ファイルプレフィックス。標準値FlueData
hdfs.fileSuffix->ファイルの拡張子
hdfs.rollInterval->どのぐらいの時間後にclose hdfsファイルを作成しますか?単位は秒で、デフォルトは30秒です。0に設定すると時間によってはclose hdfsファイルが表示されません。
hdfs.rollSize->ファイルサイズが一定値を超えたら、closeファイル。デフォルトの値1024、単位はバイトです。0に設定するとファイルサイズに基づいていないことを示します。
hdfs.rollCount->いくつかのイベントを書き込んだ後、closeファイルを作成します。標準値は10個です。0に設定するとイベント数に基づいていないことを表します。
hdfs.fileType->ファイルフォーマットは、3つのフォーマットがあります。選択できます。
hdfs.batch Size->バッチ数は、HFS Sinkが毎回Channelから取るイベントの個数です。標準値100
hdfs.minBlockReplicas->HFSの各ブロックの最小のreplicas数字は、セットしないとhadoopの中の配置を取ります。
hdfs.maxOpenFiles->最大開いているファイル数を許可します。デフォルトは5000です。この値を超えると、早いファイルは閉じられます。
serializer->HFS Sinkはファイルを書く時、プログレッシブ操作を行います。対応するSerializer言い訳を呼び出しますので、ご希望のSerializerをカスタマイズできます。
hdfs.retryInterval->HFSファイルのクローズに失敗した後、再度オフを試みる遅延数は、単位が秒です。
hdfs.callTimeout->HFS操作が許容される時間、例えばhdfsファイルのopen、write、flash、close操作。単位はミリ秒で、標準値は10000です。
三、HDFSSイベントSink分析
hdfs.pathで、hdfs.filePrefixとhdfs.fileSuffixはそれぞれ/data/%Y/%m/%H,flume,txtを例にして、ソースを分析します。
HDCFEventSinkのprocess方法を直接見る:
次にBucketWriterのアプリとflushの方法を見ます。
apped方法:
各BucketWriterに対応するのは一つのHFS Writerだけです。
HDFSS Writerは一つのインターフェースで、三つの具体的な実装クラスがあります。それぞれfileTypeに対応するのは、DataStream、SequenceFile、CommpresedStreamです。HDFSS Data Streamを例にとって、BUCET Writerで使用するHFSS Writerのいくつかの方法を分析します。
apped方法、hdfsファイルを書く:
hdfs.rollInterval,hdfs.rollSize,hdfs.rollCount,hdfs.minBlockReplicas,hdfs.batSizeの5つの構成はhdfsファイルのクローズに影響しています。
なお、この5つの構成はhdfsファイルであり、hdfsファイルである。hdfsファイルが閉じたら、これらの構成指標は計算を再開します。BucketWriterのopenメソッドでは、reetCountersメソッドを呼び出しますので、この方法はカウンタをリセットします。また、hdfs.rollIntervalに基づくtimedRollFutureスレッドの戻り値は、close方式では破棄されている。したがって、この5つの属性は、closeファイルであり、openファイルである限り、計算を再開します。
hdfs.rollIntervalは時間と関係があり、時間がhdfs.rollInterval配置の秒数に達したら、closeファイルになります。
hdfs.rollSizeは、各eventのバイトサイズに関連しています。一つのeventのバイトが合計されてhdfs.rollSizeに等しい場合、closeファイルになります。
hdfs.rollCountは、イベントの数がhdfs.rollCountに等しい場合、closeファイルになります。
hdfs.batSizeは、イベントがhdfs.batSize個に追加された場合、つまりHDFS Sinkはhdfs.batSize個のイベントを毎回持つということで、これらのすべてのイベントが同じhdfsファイルに書き込まれているため、今回の条件をトリガし、他の4つの構成はすべて達成されていない。そしてファイルをcloseします。
hdfs.minBlockReplicasは、ファイルに対して最も小さいコピーブロック数を期待していることを示しています。したがって、hdfs.rollInterval,hdfs.rollSize,hdfs.rollCountの3つのパラメータを設定し、これらの3つのパラメータは条件に合致していませんでしたが、複数のファイルを生成しました。このパラメータの優先度はhdfs.rollSize,hdfs.rollCountよりも高いです。
一、重要な問題点
1.どのような構成がhdfsファイルのクローズに影響しますか?2.HFS Sinkで毎回トリガするイベントは何ですか?3.HFS Sinkにおけるパラメータの優先度はどうなりますか?
二、HFS Sinkとファイルを書く関連配置
hdfs.path->hdfsディレクトリパス
hdfs.filePrefix->ファイルプレフィックス。標準値FlueData
hdfs.fileSuffix->ファイルの拡張子
hdfs.rollInterval->どのぐらいの時間後にclose hdfsファイルを作成しますか?単位は秒で、デフォルトは30秒です。0に設定すると時間によってはclose hdfsファイルが表示されません。
hdfs.rollSize->ファイルサイズが一定値を超えたら、closeファイル。デフォルトの値1024、単位はバイトです。0に設定するとファイルサイズに基づいていないことを示します。
hdfs.rollCount->いくつかのイベントを書き込んだ後、closeファイルを作成します。標準値は10個です。0に設定するとイベント数に基づいていないことを表します。
hdfs.fileType->ファイルフォーマットは、3つのフォーマットがあります。選択できます。
hdfs.batch Size->バッチ数は、HFS Sinkが毎回Channelから取るイベントの個数です。標準値100
hdfs.minBlockReplicas->HFSの各ブロックの最小のreplicas数字は、セットしないとhadoopの中の配置を取ります。
hdfs.maxOpenFiles->最大開いているファイル数を許可します。デフォルトは5000です。この値を超えると、早いファイルは閉じられます。
serializer->HFS Sinkはファイルを書く時、プログレッシブ操作を行います。対応するSerializer言い訳を呼び出しますので、ご希望のSerializerをカスタマイズできます。
hdfs.retryInterval->HFSファイルのクローズに失敗した後、再度オフを試みる遅延数は、単位が秒です。
hdfs.callTimeout->HFS操作が許容される時間、例えばhdfsファイルのopen、write、flash、close操作。単位はミリ秒で、標準値は10000です。
三、HDFSSイベントSink分析
hdfs.pathで、hdfs.filePrefixとhdfs.fileSuffixはそれぞれ/data/%Y/%m/%H,flume,txtを例にして、ソースを分析します。
HDCFEventSinkのprocess方法を直接見る:
public Status process() throws EventDeliveryException {
// Channel
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
// BucketWriter ,BucketWriter hdfs
List<BucketWriter> writers = Lists.newArrayList();
// Channel
transaction.begin();
try {
int txnEventCount = 0;
// batchSize 。 batchSize hdfs.batchSize
for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
Event event = channel.take();
if (event == null) {
break;
}
// hdfs
String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
timeZone, needRounding, roundUnit, roundValue, useLocalTime);
// hdfs , fileName hdfs.filePrefix, flume
String realName = BucketPath.escapeString(fileName, event.getHeaders(),
timeZone, needRounding, roundUnit, roundValue, useLocalTime);
// hdfs , path,filePrefix,fileSuffix
// lookupPath /data/2015/07/20/15/flume
String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
BucketWriter bucketWriter;
HDFSWriter hdfsWriter = null;
//
WriterCallback closeCallback = new WriterCallback() {
@Override
public void run(String bucketPath) {
LOG.info("Writer callback called.");
synchronized (sfWritersLock) {
// sfWriters HashMap, maxOpenFiles 。 maxOpenFiles
// hdfs close sfWriters 。 maxOpenFiles
// sfWriters Map key hdfs ,value BucketWriter
sfWriters.remove(bucketPath);
}
}
};
synchronized (sfWritersLock) {
// sfWriters key /data/2015/07/20/15/flume BucketWriter
bucketWriter = sfWriters.get(lookupPath);
if (bucketWriter == null) {
// BucketWriter
// fileType HDFSWriter,fileType 3 , SequenceFile, DataStream or CompressedStream
hdfsWriter = writerFactory.getWriter(fileType);
// BucketWriter, hdfsWriter ,BucketWriter hdfs HDFSWriter
bucketWriter = initializeBucketWriter(realPath, realName,
lookupPath, hdfsWriter, closeCallback);
// BucketWriter sfWriters
sfWriters.put(lookupPath, bucketWriter);
}
}
// BucketWriter writers
if (!writers.contains(bucketWriter)) {
writers.add(bucketWriter);
}
// hdfs
try {
bucketWriter.append(event);
} catch (BucketClosedException ex) {
LOG.info("Bucket was closed while trying to append, " +
"reinitializing bucket and writing event.");
hdfsWriter = writerFactory.getWriter(fileType);
bucketWriter = initializeBucketWriter(realPath, realName,
lookupPath, hdfsWriter, closeCallback);
synchronized (sfWritersLock) {
sfWriters.put(lookupPath, bucketWriter);
}
bucketWriter.append(event);
}
}
if (txnEventCount == 0) {
sinkCounter.incrementBatchEmptyCount();
} else if (txnEventCount == batchSize) {
sinkCounter.incrementBatchCompleteCount();
} else {
sinkCounter.incrementBatchUnderflowCount();
}
// flush hdfs
for (BucketWriter bucketWriter : writers) {
bucketWriter.flush();
}
//
transaction.commit();
if (txnEventCount < 1) {
return Status.BACKOFF;
} else {
sinkCounter.addToEventDrainSuccessCount(txnEventCount);
return Status.READY;
}
} catch (IOException eIO) {
//
transaction.rollback();
LOG.warn("HDFS IO error", eIO);
return Status.BACKOFF;
} catch (Throwable th) {
//
transaction.rollback();
LOG.error("process failed", th);
if (th instanceof Error) {
throw (Error) th;
} else {
throw new EventDeliveryException(th);
}
} finally {
//
transaction.close();
}
}
四、BucketWriter分析次にBucketWriterのアプリとflushの方法を見ます。
apped方法:
public synchronized void append(final Event event)
throws IOException, InterruptedException {
checkAndThrowInterruptedException();
// If idleFuture is not null, cancel it before we move forward to avoid a
// close call in the middle of the append.
if(idleFuture != null) {
idleFuture.cancel(false);
// There is still a small race condition - if the idleFuture is already
// running, interrupting it can cause HDFS close operation to throw -
// so we cannot interrupt it while running. If the future could not be
// cancelled, it is already running - wait for it to finish before
// attempting to write.
if(!idleFuture.isDone()) {
try {
idleFuture.get(callTimeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException ex) {
LOG.warn("Timeout while trying to cancel closing of idle file. Idle" +
" file close may have failed", ex);
} catch (Exception ex) {
LOG.warn("Error while trying to cancel closing of idle file. ", ex);
}
}
idleFuture = null;
}
// hdfs
if (!isOpen) {
// hdfs
if (closed) {
throw new BucketClosedException("This bucket writer was closed and " +
"this handle is thus no longer valid");
}
// hdfs
open();
}
//
if (shouldRotate()) {
boolean doRotate = true;
if (isUnderReplicated) {
if (maxConsecUnderReplRotations > 0 &&
consecutiveUnderReplRotateCount >= maxConsecUnderReplRotations) {
doRotate = false;
if (consecutiveUnderReplRotateCount == maxConsecUnderReplRotations) {
LOG.error("Hit max consecutive under-replication rotations ({}); " +
"will not continue rolling files under this path due to " +
"under-replication", maxConsecUnderReplRotations);
}
} else {
LOG.warn("Block Under-replication detected. Rotating file.");
}
consecutiveUnderReplRotateCount++;
} else {
consecutiveUnderReplRotateCount = 0;
}
if (doRotate) {
// , 。 close ,
close();
open();
}
}
// event
try {
sinkCounter.incrementEventDrainAttemptCount();
callWithTimeout(new CallRunner<Void>() {
@Override
public Void call() throws Exception {
// HDFSWriter append
writer.append(event); // could block
return null;
}
});
} catch (IOException e) {
LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file (" +
bucketPath + ") and rethrowing exception.",
e.getMessage());
try {
close(true);
} catch (IOException e2) {
LOG.warn("Caught IOException while closing file (" +
bucketPath + "). Exception follows.", e2);
}
throw e;
}
// +
processSize += event.getBody().length;
// +1
eventCounter++;
// +1
batchCounter++;
// hdfs.batchSize flush
if (batchCounter == batchSize) {
flush();
}
}
まずopen方法を見て、hdfsファイルを開く方法:private void open() throws IOException, InterruptedException {
// hdfs HDFSWriter
if ((filePath == null) || (writer == null)) {
throw new IOException("Invalid file settings");
}
final Configuration config = new Configuration();
// disable FileSystem JVM shutdown hook
config.setBoolean("fs.automatic.close", false);
// Hadoop is not thread safe when doing certain RPC operations,
// including getFileSystem(), when running under Kerberos.
// open() must be called by one thread at a time in the JVM.
// NOTE: tried synchronizing on the underlying Kerberos principal previously
// which caused deadlocks. See FLUME-1231.
synchronized (staticLock) {
checkAndThrowInterruptedException();
try {
// fileExtensionCounter AtomicLong ,
// , , open 。 BucketWriter open 1
// counter
long counter = fileExtensionCounter.incrementAndGet();
// , flume
// fullFileName flume.1437375933234
String fullFileName = fileName + "." + counter;
// , fullFileName flume.1437375933234.txt
if (fileSuffix != null && fileSuffix.length() > 0) {
fullFileName += fileSuffix;
} else if (codeC != null) {
fullFileName += codeC.getDefaultExtension();
}
// inUsePrefix inUseSuffix。 "" ".tmp"
// buckerPath /data/2015/07/20/15/flume.1437375933234.txt.tmp
bucketPath = filePath + "/" + inUsePrefix
+ fullFileName + inUseSuffix;
// targetPath /data/2015/07/20/15/flume.1437375933234.txt
targetPath = filePath + "/" + fullFileName;
LOG.info("Creating " + bucketPath);
callWithTimeout(new CallRunner<Void>() {
@Override
public Void call() throws Exception {
if (codeC == null) {
// Need to get reference to FS using above config before underlying
// writer does in order to avoid shutdown hook &
// IllegalStateExceptions
if(!mockFsInjected) {
fileSystem = new Path(bucketPath).getFileSystem(
config);
}
// HDFSWriter
writer.open(bucketPath);
} else {
// need to get reference to FS before writer does to
// avoid shutdown hook
if(!mockFsInjected) {
fileSystem = new Path(bucketPath).getFileSystem(
config);
}
// HDFSWriter
writer.open(bucketPath, codeC, compType);
}
return null;
}
});
} catch (Exception ex) {
sinkCounter.incrementConnectionFailedCount();
if (ex instanceof IOException) {
throw (IOException) ex;
} else {
throw Throwables.propagate(ex);
}
}
}
isClosedMethod = getRefIsClosed();
sinkCounter.incrementConnectionCreatedCount();
//
resetCounters();
// hdfs.rollInterval , close
if (rollInterval > 0) {
Callable<Void> action = new Callable<Void>() {
public Void call() throws Exception {
LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.",
bucketPath, rollInterval);
try {
// Roll the file and remove reference from sfWriters map.
close(true);
} catch(Throwable t) {
LOG.error("Unexpected error", t);
}
return null;
}
};
// 。 timedRollFuture
timedRollFuture = timedRollerPool.schedule(action, rollInterval,
TimeUnit.SECONDS);
}
isOpen = true;
}
flashメソッドは、closeとappedメソッド(処理されたイベント数がバッチ数に等しい)でのみ呼び出されます。
public synchronized void flush() throws IOException, InterruptedException {
checkAndThrowInterruptedException();
if (!isBatchComplete()) { //isBatchComplete batchCount 0。 batchCount 0,
doFlush(); // doFlush HDFSWriter sync , batchCount 0
// idleTimeout ,
if(idleTimeout > 0) {
// if the future exists and couldn't be cancelled, that would mean it has already run
// or been cancelled
if(idleFuture == null || idleFuture.cancel(false)) {
Callable<Void> idleAction = new Callable<Void>() {
public Void call() throws Exception {
LOG.info("Closing idle bucketWriter {} at {}", bucketPath,
System.currentTimeMillis());
if (isOpen) {
close(true);
}
return null;
}
};
idleFuture = timedRollerPool.schedule(idleAction, idleTimeout,
TimeUnit.SECONDS);
}
}
}
}
close方法:public synchronized void close(boolean callCloseCallback)
throws IOException, InterruptedException {
checkAndThrowInterruptedException();
try {
// close flush , batchCount, HDFSWriter sync
flush();
} catch (IOException e) {
LOG.warn("pre-close flush failed", e);
}
boolean failedToClose = false;
LOG.info("Closing {}", bucketPath);
// , HDFSWriter close
CallRunner<Void> closeCallRunner = createCloseCallRunner();
if (isOpen) { //
try {
// HDFSWriter close
callWithTimeout(closeCallRunner);
sinkCounter.incrementConnectionClosedCount();
} catch (IOException e) {
LOG.warn(
"failed to close() HDFSWriter for file (" + bucketPath +
"). Exception follows.", e);
sinkCounter.incrementConnectionFailedCount();
failedToClose = true;
// ,retryInterval
final Callable<Void> scheduledClose =
createScheduledCloseCallable(closeCallRunner);
timedRollerPool.schedule(scheduledClose, retryInterval,
TimeUnit.SECONDS);
}
isOpen = false;
} else {
LOG.info("HDFSWriter is already closed: {}", bucketPath);
}
// timedRollFuture hdfs.rollInterval 。 hdfs.rollInterval 0,
// close , hdfs.rollInterval flush , ,
// timedRollFuture
if (timedRollFuture != null && !timedRollFuture.isDone()) {
timedRollFuture.cancel(false); // do not cancel myself if running!
timedRollFuture = null;
}
// hdfs.idleTimeout,
if (idleFuture != null && !idleFuture.isDone()) {
idleFuture.cancel(false); // do not cancel myself if running!
idleFuture = null;
}
// , ,
if (bucketPath != null && fileSystem != null && !failedToClose) {
// /data/2015/07/20/15/flume.1437375933234.txt.tmp /data/2015/07/20/15/flume.1437375933234.txt
renameBucket(bucketPath, targetPath, fileSystem);
}
if (callCloseCallback) { // callCloseCallback close
// , BucketWriter onCloseCallback
// onCloseCallback HDFSEventSink closeCallback。 sfWriters.remove(bucketPath);
// onCloseCallback true, BucketWriter open 。 。
// onCloseCallback append shouldRotate close false, true
runCloseAction();
closed = true;
}
}
振り返ってみますと、appedメソッドのshuldRotate方法は、shuldRotate方法が実行されていくとファイルを閉じて新しいファイルを再度開くことになります。private boolean shouldRotate() {
boolean doRotate = false;
// HDFSWriter isUnderReplicated , hdfs 。
if (writer.isUnderReplicated()) {
this.isUnderReplicated = true;
doRotate = true;
} else {
this.isUnderReplicated = false;
}
// rollCount hdfs.rollCount。 eventCounter rollCount , close ,
if ((rollCount > 0) && (rollCount <= eventCounter)) {
LOG.debug("rolling: rollCount: {}, events: {}", rollCount, eventCounter);
doRotate = true;
}
// rollSize hdfs.rollSize。processSize 。 processSize rollSize , close ,
if ((rollSize > 0) && (rollSize <= processSize)) {
LOG.debug("rolling: rollSize: {}, bytes: {}", rollSize, processSize);
doRotate = true;
}
return doRotate;
}
五、HDFSS Writer分析各BucketWriterに対応するのは一つのHFS Writerだけです。
HDFSS Writerは一つのインターフェースで、三つの具体的な実装クラスがあります。それぞれfileTypeに対応するのは、DataStream、SequenceFile、CommpresedStreamです。HDFSS Data Streamを例にとって、BUCET Writerで使用するHFSS Writerのいくつかの方法を分析します。
apped方法、hdfsファイルを書く:
@Override
public void append(Event e) throws IOException {
// , serializer write
// serializer org.apache.flume.serialization.EventSerializer
// Serializer BodyTextEventSerializer
serializer.write(e);
}
openメソッド:@Override
public void open(String filePath) throws IOException {
Configuration conf = new Configuration();
// hdfs
Path dstPath = new Path(filePath);
FileSystem hdfs = getDfs(conf, dstPath);
// doOpen
doOpen(conf, dstPath, hdfs);
}
protected void doOpen(Configuration conf,
Path dstPath, FileSystem hdfs) throws
IOException {
if(useRawLocalFileSystem) {
if(hdfs instanceof LocalFileSystem) {
hdfs = ((LocalFileSystem)hdfs).getRaw();
} else {
logger.warn("useRawLocalFileSystem is set to true but file system " +
"is not of type LocalFileSystem: " + hdfs.getClass().getName());
}
}
boolean appending = false;
// FSDataOutputStream, outStream
if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile
(dstPath)) {
outStream = hdfs.append(dstPath);
appending = true;
} else {
outStream = hdfs.create(dstPath);
}
// Serializer
serializer = EventSerializerFactory.getInstance(
serializerType, serializerContext, outStream);
if (appending && !serializer.supportsReopen()) {
outStream.close();
serializer = null;
throw new IOException("serializer (" + serializerType +
") does not support append");
}
// must call superclass to check for replication issues
registerCurrentStream(outStream, hdfs, dstPath);
if (appending) {
serializer.afterReopen();
} else {
serializer.afterCreate();
}
}
close方法:@Override
public void close() throws IOException {
serializer.flush();
serializer.beforeClose();
outStream.flush();
outStream.sync();
outStream.close();
unregisterCurrentStream();
}
syncメソッド:@Override
public void sync() throws IOException {
serializer.flush();
outStream.flush();
outStream.sync();
}
isUnider Replicated方法は、Abstract HDCFSRで定義されています。@Override
public boolean isUnderReplicated() {
try {
// replication
int numBlocks = getNumCurrentReplicas();
if (numBlocks == -1) {
return false;
}
int desiredBlocks;
if (configuredMinReplicas != null) {
// hdfs.minBlockReplicas
desiredBlocks = configuredMinReplicas;
} else {
// hdfs.minBlockReplicas hdfs
desiredBlocks = getFsDesiredReplication();
}
// , true
return numBlocks < desiredBlocks;
} catch (IllegalAccessException e) {
logger.error("Unexpected error while checking replication factor", e);
} catch (InvocationTargetException e) {
logger.error("Unexpected error while checking replication factor", e);
} catch (IllegalArgumentException e) {
logger.error("Unexpected error while checking replication factor", e);
}
return false;
}
六、まとめhdfs.rollInterval,hdfs.rollSize,hdfs.rollCount,hdfs.minBlockReplicas,hdfs.batSizeの5つの構成はhdfsファイルのクローズに影響しています。
なお、この5つの構成はhdfsファイルであり、hdfsファイルである。hdfsファイルが閉じたら、これらの構成指標は計算を再開します。BucketWriterのopenメソッドでは、reetCountersメソッドを呼び出しますので、この方法はカウンタをリセットします。また、hdfs.rollIntervalに基づくtimedRollFutureスレッドの戻り値は、close方式では破棄されている。したがって、この5つの属性は、closeファイルであり、openファイルである限り、計算を再開します。
hdfs.rollIntervalは時間と関係があり、時間がhdfs.rollInterval配置の秒数に達したら、closeファイルになります。
hdfs.rollSizeは、各eventのバイトサイズに関連しています。一つのeventのバイトが合計されてhdfs.rollSizeに等しい場合、closeファイルになります。
hdfs.rollCountは、イベントの数がhdfs.rollCountに等しい場合、closeファイルになります。
hdfs.batSizeは、イベントがhdfs.batSize個に追加された場合、つまりHDFS Sinkはhdfs.batSize個のイベントを毎回持つということで、これらのすべてのイベントが同じhdfsファイルに書き込まれているため、今回の条件をトリガし、他の4つの構成はすべて達成されていない。そしてファイルをcloseします。
hdfs.minBlockReplicasは、ファイルに対して最も小さいコピーブロック数を期待していることを示しています。したがって、hdfs.rollInterval,hdfs.rollSize,hdfs.rollCountの3つのパラメータを設定し、これらの3つのパラメータは条件に合致していませんでしたが、複数のファイルを生成しました。このパラメータの優先度はhdfs.rollSize,hdfs.rollCountよりも高いです。