flumeのカスタムsinkコンポーネント
18339 ワード
flumeの内部には、logger、file_など、多くのsinkがあります.roll、avro、hdfs、kafak、esなど、eventデータをローカルディスク、または他のサードパーティストレージに直接接続するのに便利です.場合によっては、特別なニーズを達成するためにsourceをカスタマイズする必要があります.この文書では、eventデータをMysqlに格納するためにカスタムsinkを開発する方法について説明します.
1、pom.xml
2、カスタムsinkを開発し、AbstractSinkを継承する
説明:
1)configureメソッドでは,flumeのプロファイルから対応するプロファイル情報を読み取ることができる.プロファイルが変更されるとflumeフレームワークが自動的に再ロードされ、configureメソッドが呼び出されます.
2)startとstopメソッドはflumeの起動と停止時に実行する.
3、梱包:
maven packageを使用してテープをttbrain-log-flume-MysqlSink-jar-with-dependenciesにパッケージします.jar
4、配置:
1)flumeプロファイルの構成:
説明:
A、agent1.sinks.mysql.typeカスタムsinkクラスのフルパスを指定します.
B、agent1.sinks.mysql.deviceKeysは、カスタムsinkのconfigureメソッドで取得できるカスタム構成情報です.
2)ttbrain-log-flume-MysqlSink-jar-with-dependencies.JArをflume_に配置ホームのlibディレクトリの下で;
3)flumeを起動する:
1、pom.xml
4.0.0
com.abc
ttbrain-log
0.0.1-SNAPSHOT
com.abc
ttbrain-log-flume
0.0.1-SNAPSHOT
ttbrain-log-flume
1.7.0
junit
junit
test
org.slf4j
slf4j-log4j12
org.apache.flume
flume-ng-core
${version.flume}
org.apache.flume
flume-ng-configuration
${version.flume}
c3p0
c3p0
com.alibaba
druid
mysql
mysql-connector-java
dev
dev
true
test
test
product
product
ttbrain-log-flume-MysqlSink
${basedir}/filters/filter-${profile.env.name}.properties
src/main/resources
true
**/*.xml
conf/*.properties
**/*.properties
**/*.json
org.apache.maven.plugins
maven-assembly-plugin
2.4
jar-with-dependencies
com.abc.ttbrain.log.flume.sink.MysqlSink
make-assembly
package
single
2、カスタムsinkを開発し、AbstractSinkを継承する
package com.abc.ttbrain.log.flume.sink;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.abc.ttbrain.log.flume.sink.db.DataSourceUtils;
/**
* MysqlSink
* @author kevinliu
*
*/
public class MysqlSink extends AbstractSink implements Configurable {
private static final Logger logger = LoggerFactory.getLogger(MysqlSink.class);
private String tableName = "ttengine_history";
private int batchSize = 100;
private Set deviceKeySet = new HashSet<>();
private String sql = "";
public MysqlSink() {
logger.info("MysqlSink start...");
}
@Override
public void start() {
super.start();
deviceKeySet.add("0B466578-F542-4928-A66B-B12FCFA6DA23");
deviceKeySet.add("CB13721AE4668DEA151F3F72877A2256");
deviceKeySet.add("033C179D-8987-4D08-AF4F-94070F8632E0");
deviceKeySet.add("0CD570CE-4B62-45AA-B86A-7828CFE4F195");
deviceKeySet.add("D9157D45-C17E-4CF9-9C28-EE573F8CAD19");
deviceKeySet.add("3ACE3422774FEE190304604277E389F9");
deviceKeySet.add("2C4B1E38-2629-4AF9-BA0E-2DA17C5F3A35");
deviceKeySet.add("A1AD2FAF-86DD-45DE-B85D-7FA23CCDE4C4");
logger.info("deviceKey:"+deviceKeySet.toString());
StringBuilder sb = new StringBuilder();
sb.append("insert into ").append(tableName)
.append(" (uid,ppuid,ch_id,f_num,cost,usg,prior,req_id,vers,rg,rh,pg,ph,sg,sh,time_stamp,host,"
+ "rec_feed_ids,txt,vedio,gallery,p_1,p_2,p_3,p_4,p_5,p_6,p_7,p_8,p_9,p_10,p_11,p_12,p_13,p_14,p_15"
+ ",p_16,p_17,p_18,p_19,p_20,p_21,p_22,p_23,p_24,p_25,p_26,p_27,p_28,p_29,p_30,p_31,p_32"
+ ",p_33,p_34,p_35,p_36,p_37,p_38,p_39,p_40,p_41,p_42,p_43,p_44,p_45,p_46,p_47,p_48,p_49,p_50"
+ ") values")
.append(" (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,"
+ "?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?"
+ ",?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ");
sql = sb.toString();
}
@Override
public void stop() {
super.stop();
//DataSourceUtils.closeDs();
}
@Override
public Status process() throws EventDeliveryException {
Status result = Status.READY;
Transaction transaction = null;
Event event = null;
String content = "";
List actions = Lists.newArrayList();
PreparedStatement preparedStatement = null;
Connection conn = null;
try {
//db
conn = DataSourceUtils.getConnection();
preparedStatement = conn.prepareStatement(sql);
//flume
Channel channel = getChannel();
transaction = channel.getTransaction();
transaction.begin();
for (int i = 0; i < batchSize; i++) {
event = channel.take();
if (event == null) {
result = Status.BACKOFF;
break;
} else {
content = new String(event.getBody(),"UTF-8");
String[] split = content.split("\t");
if (split.length < 37) {
continue;
}
String uid = split[0];
boolean contains = deviceKeySet.contains(uid);
if (!contains) {
continue;
}
//logger.info(content);
actions.add(content);
}
}
if (actions.size() > 0) {
conn.setAutoCommit(false);
preparedStatement.clearBatch();
//logger.info(actions.get(0));
for (String line : actions) {
String[] split = line.split("\t");
//uid,ppuid,ch_id,f_num,cost,
preparedStatement.setString(1, split[0]);//uid
preparedStatement.setString(2, split[1]);//ppuid
preparedStatement.setString(3, split[2]);//ch_id
preparedStatement.setInt(4, Integer.parseInt(split[3]));//f_num
preparedStatement.setInt(5, Integer.parseInt(split[4]));//cost
//usg,prior,req_id,vers,rg,rh,pg,ph,sg,sh,time_stamp,host,
preparedStatement.setInt(6, Integer.parseInt(split[5]));//usg
preparedStatement.setString(7, split[6]);//prior
preparedStatement.setString(8, split[7]);//req_id
preparedStatement.setString(9, split[8]);//vers
preparedStatement.setString(10, split[9]);//rg
preparedStatement.setInt(11, Integer.parseInt(split[10]));//rh
preparedStatement.setString(12, split[11]);//pg
preparedStatement.setInt(13, Integer.parseInt(split[12]));//ph
preparedStatement.setString(14, split[13]);//sg
preparedStatement.setInt(15, Integer.parseInt(split[14]));//sh
String time_stamp = split[15];
if ("null".equals(time_stamp) || StringUtils.isBlank(time_stamp)) {
time_stamp = "0";
}
preparedStatement.setLong(16, Long.parseLong(time_stamp));//time_stamp
preparedStatement.setString(17, split[16]);//host
//rec_feed_ids,txt,vedio,gallery,p_1,p_2,p_3,p_4,p_5,p_6,p_7,p_8,p_9,p_10,p_11,p_12,p_13,p_14,p_15
preparedStatement.setString(18, split[17]);//rec_feed_ids
preparedStatement.setString(19, split[18]);//txt
preparedStatement.setString(20, split[19]);//vedio
preparedStatement.setString(21, split[20]);//gallery
preparedStatement.setString(22, split[21]);//p_1
preparedStatement.setString(23, split[22]);//p_1
preparedStatement.setString(24, split[23]);//p_1
preparedStatement.setString(25, split[24]);//p_1
preparedStatement.setString(26, split[25]);//p_1
preparedStatement.setString(27, split[26]);//p_1
preparedStatement.setString(28, split[27]);//p_1
preparedStatement.setString(29, split[28]);//p_1
preparedStatement.setString(30, split[29]);//p_1
preparedStatement.setString(31, split[30]);//p_1
preparedStatement.setString(32, split[31]);//p_1
preparedStatement.setString(33, split[32]);//p_1
preparedStatement.setString(34, split[33]);//p_1
preparedStatement.setString(35, split[34]);//p_1
preparedStatement.setString(36, split[35]);//p_1
preparedStatement.setString(37, split[36]);
preparedStatement.setString(38, split[37]);
preparedStatement.setString(39, split[38]);
preparedStatement.setString(40, split[39]);
preparedStatement.setString(41, split[40]);
preparedStatement.setString(42, split[41]);
preparedStatement.setString(43, split[42]);
preparedStatement.setString(44, split[43]);
preparedStatement.setString(45, split[44]);
preparedStatement.setString(46, split[45]);
preparedStatement.setString(47, split[46]);
preparedStatement.setString(48, split[47]);
preparedStatement.setString(49, split[48]);
preparedStatement.setString(50, split[49]);
preparedStatement.setString(51, split[50]);
preparedStatement.setString(52, split[51]);
preparedStatement.setString(53, split[52]);
preparedStatement.setString(54, split[53]);
preparedStatement.setString(55, split[54]);
preparedStatement.setString(56, split[55]);
preparedStatement.setString(57, split[56]);
preparedStatement.setString(58, split[57]);
preparedStatement.setString(59, split[58]);
preparedStatement.setString(60, split[59]);
preparedStatement.setString(61, split[60]);
preparedStatement.setString(62, split[61]);
preparedStatement.setString(63, split[62]);
preparedStatement.setString(64, split[63]);
preparedStatement.setString(65, split[64]);
preparedStatement.setString(66, split[65]);
preparedStatement.setString(67, split[66]);
preparedStatement.setString(68, split[67]);
preparedStatement.setString(69, split[68]);
preparedStatement.setString(70, split[69]);
preparedStatement.setString(71, split[70]);
preparedStatement.addBatch();
logger.info("device:{}",split[0]);
}
preparedStatement.executeBatch();
conn.commit();
}
transaction.commit();
} catch (Throwable e) {
try {
if (transaction != null) {
transaction.rollback();
}
} catch (Exception e2) {
logger.error("flume transaction rollback error.", e2);
}
logger.error("Failed to commit flume transaction," +"Transaction rolled back.", e);
//Throwables.propagate(e);
} finally {
if (transaction != null) {
transaction.close();
}
if (preparedStatement != null) {
try {
preparedStatement.close();
} catch (SQLException e) {
logger.error("statement close error.", e);
}
}
if (conn != null) {
try {
conn.close();
} catch (SQLException e) {
logger.error("connection close error.", e);
}
}
}
return result;
}
@Override
public void configure(Context context) {
String deviceKeys = context.getString("deviceKeys");
if (StringUtils.isNotBlank(deviceKeys)) {
String[] split = deviceKeys.split(",");
for (String deviceKey : split) {
deviceKeySet.add(deviceKey);
}
logger.info("sink configure deivceKeys:"+deviceKeys);
} else {
logger.info("sink configure deivceKeys is empty...");
}
}
}
説明:
1)configureメソッドでは,flumeのプロファイルから対応するプロファイル情報を読み取ることができる.プロファイルが変更されるとflumeフレームワークが自動的に再ロードされ、configureメソッドが呼び出されます.
2)startとstopメソッドはflumeの起動と停止時に実行する.
3、梱包:
maven packageを使用してテープをttbrain-log-flume-MysqlSink-jar-with-dependenciesにパッケージします.jar
4、配置:
1)flumeプロファイルの構成:
agent1.sources = ngrinder
agent1.channels = mc2
agent1.sinks = mysql
#source
agent1.sources.ngrinder.type = exec
agent1.sources.ngrinder.command = tail -F /data/logs/ttbrain/ttbrain-recommend-api.log
agent1.sources.ngrinder.channels = mc2
#filter
agent1.sources.ngrinder.interceptors=filt1 filt2 filt3 filt4
agent1.sources.ngrinder.interceptors.filt1.type=regex_filter
agent1.sources.ngrinder.interceptors.filt1.regex=.*recId.*
agent1.sources.ngrinder.interceptors.filt2.type=host
agent1.sources.ngrinder.interceptors.filt2.hostHeader=hostname
agent1.sources.ngrinder.interceptors.filt2.useIP=true
agent1.sources.ngrinder.interceptors.filt3.type=timestamp
agent1.sources.ngrinder.interceptors.filt4.type=com.abc.ttbrain.log.flume.interceptor.MyInterceptor$Builder
#channel2
agent1.channels.mc2.type = file
agent1.channels.mc2.checkpointDir = /data/flume/ckdir/mc2_ck
agent1.channels.mc2.dataDirs = /data/flume/datadir/mc2_data
#sink2
agent1.sinks.mysql.type = com.abc.ttbrain.log.flume.sink.MysqlSink
agent1.sinks.mysql.deviceKeys = 2DCFE0C8-2DD6-4FB7-A2E6-1A210F7C7C07,3F4DA241-B827-4FF8-BB3F-624CFDEDA58D,89C574A2-1E44-468F-9AB4-96737D2FF7F2,f614ca8a42bd121a8bb971d89a078a08267b4df2,A78DB69352E74B744EDD15DE2B91BE40,2C4B1E38-2629-4AF9-BA0E-2DA17C5F3A35,B9D0A82EE8A4BB9B20CEEBB0977A9CFC,3BD40EB0-8171-4481-8B6E-002DB8C6924D,81F37B3C-5E29-40BC-A030-424E137268C2,E59909BC-D62E-4393-9D06-41BF03F81DA9
agent1.sinks.mysql.channel = mc2
説明:
A、agent1.sinks.mysql.typeカスタムsinkクラスのフルパスを指定します.
B、agent1.sinks.mysql.deviceKeysは、カスタムsinkのconfigureメソッドで取得できるカスタム構成情報です.
2)ttbrain-log-flume-MysqlSink-jar-with-dependencies.JArをflume_に配置ホームのlibディレクトリの下で;
3)flumeを起動する:
nohup flume-ng agent -c /usr/local/apache-flume-1.7.0-bin/conf -f /usr/local/apache-flume-1.7.0-bin/conf/engine-api-log.conf -n agent1 >/dev/null 2>&1 &