Flumeクライアントflume-ng-log 4 jappenderロードバランシング(LoadBalancingLog 4 jAppender)
6103 ワード
ホストDNS構成:
hadoop-maserとmachine-2はホストであり、他のマシンはcollectorマシンとしてHDFSに格納される.
hadoop-masterとmachine-2マシンのflume構成:
machine-1とmachine-0のflume構成:
log 4 jの構成:
テストコード:
まとめ:
最終的には負荷等化の役割を果たすことができるが,性能にはまだ不十分である.
1台のマシンが死ぬと、クライアントはリンクを試み続け、データを他のマシンに転送することに影響します.死んだマシンが回復すると、クライアントがバックアップしたデータがflume agentに再送信されます.データの正確性は達成されていますが、もしこのアプリが落ちたら、対応するログ情報は失われてしまうのではないでしょうか.これは問題であり、さらなる改善が必要である.
192.168.177.167 machine-1
192.168.177.168 machine-2
192.168.177.158 machine-0
192.168.177.174 hadoop-master hbase-master
hadoop-maserとmachine-2はホストであり、他のマシンはcollectorマシンとしてHDFSに格納される.
hadoop-masterとmachine-2マシンのflume構成:
agent.sources=s1
agent.channels=c1
agent.sinks=k1 k2
agent.sinkgroups = g1
agent.sinkgroups.g1.sinks = k1 k2
agent.sinkgroups.g1.processor.type = load_balance
agent.sinkgroups.g1.processor.selector = round_robin
agent.sinkgroups.g1.processor.backoff = true
agent.sources.s1.type=avro
agent.sources.s1.channels=c1
agent.sources.s1.bind=0.0.0.0
agent.sources.s1.port=51515
agent.sources.s1.interceptors=i1
agent.sources.s1.interceptors.i1.type=timestamp
agent.channels.c1.type=jdbc
agent.sinks.k1.channel = c1
agent.sinks.k1.type = avro
agent.sinks.k1.hostname = machine-0
agent.sinks.k1.port = 51515
agent.sinks.k2.channel = c1
agent.sinks.k2.type = avro
agent.sinks.k2.hostname = machine-1
agent.sinks.k2.port = 51515
machine-1とmachine-0のflume構成:
agent.sources=s1
agent.channels=c1
agent.sinks=k1
agent.sources.s1.type=avro
agent.sources.s1.channels=c1
agent.sources.s1.bind=0.0.0.0
agent.sources.s1.port=51515
agent.channels.c1.type=jdbc
agent.sinks.k1.type=hdfs
agent.sinks.k1.channel=c1
agent.sinks.k1.hdfs.path=/flume/%Y/%m
agent.sinks.k1.hdfs.filePrefix=flume
agent.sinks.k1.hdfs.fileSuffix=.log
agent.sinks.k1.hdfs.rollInterval=3600
agent.sinks.k1.hdfs.rollCount=0
agent.sinks.k1.hdfs.rollSize=0
agent.sinks.k1.hdfs.fileType=DataStream
agent.sinks.k1.hdfs.writeFormat=Text
agent.sinks.k1.hdfs.useLocalTimeStamp=false
log 4 jの構成:
# File Appender rootLog
log4j.rootLogger=DEBUG,stdout,rootLog
#console configure for DEV environment
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n
log4j.appender.rootLog=org.apache.log4j.RollingFileAppender
log4j.appender.rootLog.File= rootLog.log
log4j.appender.rootLog.MaxFileSize=5000KB
log4j.appender.rootLog.MaxBackupIndex=20
log4j.appender.rootLog.layout=org.apache.log4j.PatternLayout
log4j.appender.rootLog.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n
# File Appender boentel
#log4j.logger.com.boentel=DEBUG,boentel
#log4j.additivity.com.boentel=true
#log4j.appender.boentel=org.apache.log4j.RollingFileAppender
#log4j.appender.boentel.File= boentel.log
#log4j.appender.boentel.MaxFileSize=2000KB
#log4j.appender.boentel.MaxBackupIndex=20
#log4j.appender.boentel.layout=org.apache.log4j.PatternLayout
#log4j.appender.boentel.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n
log4j.logger.com.loadbalance= DEBUG,loadbalance
log4j.additivity.com.loadbalance= true
log4j.appender.loadbalance = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.loadbalance.Hosts =machine-2:51515 hadoop-master:51515
#log4j.appender.loadbalance.UnsafeMode = true
log4j.appender.out2.MaxBackoff = 30000
#FQDN RANDOM ,default is ROUND_ROBIN
log4j.appender.loadbalance.Selector = RANDOM
log4j.appender.loadbalance.layout=org.apache.log4j.PatternLayout
log4j.appender.loadbalance.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p (%c:%L) - %m%n
テストコード:
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
public class Worker implements Runnable{
private static final Logger LOG = Logger.getLogger(Worker.class);
private String command;
/**
* @param args
*/
public static void main(String[] args) {
new Worker("0").init();
}
public void init(){
int numWorkers = 1;
int threadPoolSize = 3 ;
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(threadPoolSize);
//schedule to run after sometime
System.out.println("Current Time = "+new Date());
Worker worker = null;
for(int i=0; i< numWorkers; i++){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
worker = new Worker("do heavy processing");
// scheduledThreadPool.schedule(worker, 10, TimeUnit.SECONDS);
//scheduleAtFixedRate
// scheduledThreadPool.scheduleAtFixedRate(worker, 0, 1, TimeUnit.SECONDS);
scheduledThreadPool.scheduleWithFixedDelay(worker, 5, 10,
TimeUnit.SECONDS);
}
//add some delay to let some threads spawn by scheduler
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
scheduledThreadPool.shutdown();
while(!scheduledThreadPool.isTerminated()){
//wait for all tasks to finish
}
LOG.info("Finished all threads");
}
public Worker(String command){
this.command = command;
}
@Override
public void run() {
LOG.info(Thread.currentThread().getName()+" Start. Command = "+command);
processCommand();
LOG.info(Thread.currentThread().getName()+" End.");
}
private void processCommand() {
try {
for(int i = 1000; i < 1200; i++){
LOG.info("sequence:" + i);
}
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString(){
return this.command;
}
}
まとめ:
最終的には負荷等化の役割を果たすことができるが,性能にはまだ不十分である.
1台のマシンが死ぬと、クライアントはリンクを試み続け、データを他のマシンに転送することに影響します.死んだマシンが回復すると、クライアントがバックアップしたデータがflume agentに再送信されます.データの正確性は達成されていますが、もしこのアプリが落ちたら、対応するログ情報は失われてしまうのではないでしょうか.これは問題であり、さらなる改善が必要である.