flumeのカスタムsinkコンポーネント

18339 ワード

Flumeには、logger、file_roll、avro、hdfs、kafka、esなど多くのsinkが組み込まれており、eventデータをローカルディスクや他のサードパーティ製ストレージへ直接出力できて便利です。ただし、特別な要件を満たすためにsinkをカスタマイズする必要がある場合もあります。この文書では、eventデータをMySQLに格納するカスタムsinkの開発方法について説明します。
1、pom.xml
 


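<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>com.abc</groupId>
    <artifactId>ttbrain-log</artifactId>
    <version>0.0.1-SNAPSHOT</version>
  </parent>

  <groupId>com.abc</groupId>
  <artifactId>ttbrain-log-flume</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>ttbrain-log-flume</name>

  <properties>
    <version.flume>1.7.0</version.flume>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </dependency>

    <!-- flume -->
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>${version.flume}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-configuration</artifactId>
      <version>${version.flume}</version>
    </dependency>

    <!-- db -->
    <dependency>
      <groupId>c3p0</groupId>
      <artifactId>c3p0</artifactId>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid</artifactId>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
    </dependency>
  </dependencies>

  <profiles>
    <profile>
      <id>dev</id>
      <properties>
        <profile.env.name>dev</profile.env.name>
      </properties>
      <activation>
        <activeByDefault>true</activeByDefault>
      </activation>
    </profile>
    <profile>
      <id>test</id>
      <properties>
        <profile.env.name>test</profile.env.name>
      </properties>
    </profile>
    <profile>
      <id>product</id>
      <properties>
        <profile.env.name>product</profile.env.name>
      </properties>
    </profile>
  </profiles>

  <build>
    <finalName>ttbrain-log-flume-MysqlSink</finalName>
    <filters>
      <filter>${basedir}/filters/filter-${profile.env.name}.properties</filter>
    </filters>
    <resources>
      <resource>
        <directory>src/main/resources</directory>
        <filtering>true</filtering>
        <includes>
          <include>**/*.xml</include>
          <include>conf/*.properties</include>
          <include>**/*.properties</include>
          <include>**/*.json</include>
        </includes>
      </resource>
    </resources>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass>com.abc.ttbrain.log.flume.sink.MysqlSink</mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>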


2、カスタムsinkを開発し、AbstractSinkを継承する
 
 
package com.abc.ttbrain.log.flume.sink;

import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
import com.abc.ttbrain.log.flume.sink.db.DataSourceUtils;

/**
 * Flume sink that inserts tab-separated log events into a MySQL table.
 *
 * <p>Each event body is decoded as UTF-8 and split on tab characters. Only events whose
 * first field (the device/user id) is in the configured device-key set are stored; all
 * other events are consumed from the channel and dropped. A stored row needs at least
 * {@link #COLUMN_COUNT} fields; shorter rows and rows whose numeric fields cannot be
 * parsed are skipped with a warning so that one malformed event cannot block the channel.
 *
 * <p>Supported sink properties (all optional):
 * <ul>
 *   <li>{@code deviceKeys} - comma-separated ids, merged with the built-in defaults</li>
 *   <li>{@code tableName} - target table, default {@code ttengine_history}</li>
 *   <li>{@code batchSize} - maximum events taken per {@link #process()} call, default 100</li>
 * </ul>
 *
 * <p>Delivery is at-least-once: the database commit happens before the channel
 * transaction commit, so a failure between the two causes the batch to be re-delivered.
 *
 * @author kevinliu
 */
public class MysqlSink extends AbstractSink implements Configurable {

	private static final Logger logger = LoggerFactory.getLogger(MysqlSink.class);

	private static final String DEFAULT_TABLE_NAME = "ttengine_history";
	private static final int DEFAULT_BATCH_SIZE = 100;

	/** Device keys that are always accepted, in addition to the configured ones. */
	private static final String[] DEFAULT_DEVICE_KEYS = {
		"0B466578-F542-4928-A66B-B12FCFA6DA23",
		"CB13721AE4668DEA151F3F72877A2256",
		"033C179D-8987-4D08-AF4F-94070F8632E0",
		"0CD570CE-4B62-45AA-B86A-7828CFE4F195",
		"D9157D45-C17E-4CF9-9C28-EE573F8CAD19",
		"3ACE3422774FEE190304604277E389F9",
		"2C4B1E38-2629-4AF9-BA0E-2DA17C5F3A35",
		"A1AD2FAF-86DD-45DE-B85D-7FA23CCDE4C4",
	};

	/** Named columns, in the order of the tab-separated fields; followed by p_1..p_50. */
	private static final String[] FIXED_COLUMNS = {
		"uid", "ppuid", "ch_id", "f_num", "cost", "usg", "prior", "req_id", "vers",
		"rg", "rh", "pg", "ph", "sg", "sh", "time_stamp", "host",
		"rec_feed_ids", "txt", "vedio", "gallery",
	};

	/** Number of generic p_N columns that follow the named columns. */
	private static final int P_COLUMN_COUNT = 50;

	/** Total number of inserted columns, i.e. the minimum number of fields per row. */
	private static final int COLUMN_COUNT = FIXED_COLUMNS.length + P_COLUMN_COUNT;

	private String tableName = DEFAULT_TABLE_NAME;
	private int batchSize = DEFAULT_BATCH_SIZE;

	private Set<String> deviceKeySet = new HashSet<>();
	private String sql = "";

	public MysqlSink() {
		logger.info("MysqlSink start...");
	}

	/**
	 * Starts the sink: makes sure the built-in device keys are present and prepares the
	 * insert statement text for the current table name.
	 */
	@Override
	public void start() {
		super.start();
		addDefaultDeviceKeys(deviceKeySet);
		logger.info("deviceKey:" + deviceKeySet.toString());
		sql = buildInsertSql(tableName);
	}

	@Override
	public void stop() {
		super.stop();
		// NOTE(review): the original had a disabled DataSourceUtils.closeDs() call here;
		// confirm whether the data source should be closed when the sink stops.
	}

	/**
	 * Takes up to {@code batchSize} events from the channel inside one channel
	 * transaction and inserts the accepted rows into MySQL as a single JDBC batch.
	 *
	 * @return {@link Status#READY} when a full batch was handled, {@link Status#BACKOFF}
	 *         when the channel ran empty or the batch failed and was rolled back
	 * @throws EventDeliveryException declared by the sink contract; failures are logged
	 *         and reported through {@code BACKOFF} instead
	 */
	@Override
	public Status process() throws EventDeliveryException {
		Status result = Status.READY;
		Transaction transaction = null;
		try {
			Channel channel = getChannel();
			transaction = channel.getTransaction();
			transaction.begin();

			List<String[]> rows = Lists.newArrayList();
			for (int i = 0; i < batchSize; i++) {
				Event event = channel.take();
				if (event == null) {
					// Channel is empty: flush what we have and let the runner back off.
					result = Status.BACKOFF;
					break;
				}
				byte[] body = event.getBody();
				if (body == null) {
					continue;
				}
				// Limit -1 keeps trailing empty fields so the field count stays stable.
				String[] fields = new String(body, StandardCharsets.UTF_8).split("\t", -1);
				if (!deviceKeySet.contains(fields[0])) {
					continue;
				}
				if (fields.length < COLUMN_COUNT) {
					logger.warn("skip event with {} fields (need " + COLUMN_COUNT + "), device:{}",
							fields.length, fields[0]);
					continue;
				}
				rows.add(fields);
			}

			// Only touch the database when there is something to insert.
			if (!rows.isEmpty()) {
				insertRows(rows);
			}

			transaction.commit();
		} catch (Throwable e) {
			// Back off instead of READY so a persistent failure does not spin the runner.
			result = Status.BACKOFF;
			try {
				if (transaction != null) {
					transaction.rollback();
				}
			} catch (Exception e2) {
				logger.error("flume transaction rollback error.", e2);
			}
			logger.error("Failed to commit flume transaction,Transaction rolled back.", e);
			if (e instanceof Error) {
				// Never hide JVM-level errors such as OutOfMemoryError.
				throw (Error) e;
			}
		} finally {
			if (transaction != null) {
				transaction.close();
			}
		}
		return result;
	}

	/**
	 * Reads the sink properties. The device-key set is rebuilt from the built-in
	 * defaults plus the configured keys, so keys removed from the configuration do not
	 * linger after a reconfiguration.
	 *
	 * @param context sink properties supplied by Flume
	 */
	@Override
	public void configure(Context context) {
		String configuredTable = context.getString("tableName", DEFAULT_TABLE_NAME);
		// The table name is concatenated into the SQL text, so accept plain identifiers only.
		if (configuredTable != null && configuredTable.matches("[A-Za-z0-9_$.]+")) {
			tableName = configuredTable;
		} else {
			logger.warn("sink configure tableName '{}' is invalid, using {}", configuredTable, DEFAULT_TABLE_NAME);
			tableName = DEFAULT_TABLE_NAME;
		}
		sql = buildInsertSql(tableName);

		Integer configuredBatchSize = context.getInteger("batchSize", DEFAULT_BATCH_SIZE);
		if (configuredBatchSize != null && configuredBatchSize > 0) {
			batchSize = configuredBatchSize;
		} else {
			logger.warn("sink configure batchSize '{}' is invalid, using {}", configuredBatchSize, DEFAULT_BATCH_SIZE);
			batchSize = DEFAULT_BATCH_SIZE;
		}

		Set<String> keys = new HashSet<>();
		addDefaultDeviceKeys(keys);
		String deviceKeys = context.getString("deviceKeys");
		if (StringUtils.isNotBlank(deviceKeys)) {
			for (String deviceKey : deviceKeys.split(",")) {
				String trimmed = deviceKey.trim();
				if (!trimmed.isEmpty()) {
					keys.add(trimmed);
				}
			}
			logger.info("sink configure deivceKeys:" + deviceKeys);
		} else {
			logger.info("sink configure deivceKeys is empty...");
		}
		deviceKeySet = keys;
	}

	/** Adds the built-in device keys to {@code target}. */
	private static void addDefaultDeviceKeys(Set<String> target) {
		for (String key : DEFAULT_DEVICE_KEYS) {
			target.add(key);
		}
	}

	/** Builds the parameterized insert statement with one placeholder per column. */
	private static String buildInsertSql(String table) {
		StringBuilder columns = new StringBuilder();
		StringBuilder placeholders = new StringBuilder();
		for (int i = 0; i < COLUMN_COUNT; i++) {
			if (i > 0) {
				columns.append(',');
				placeholders.append(',');
			}
			if (i < FIXED_COLUMNS.length) {
				columns.append(FIXED_COLUMNS[i]);
			} else {
				columns.append("p_").append(i - FIXED_COLUMNS.length + 1);
			}
			placeholders.append('?');
		}
		return "insert into " + table + " (" + columns + ") values (" + placeholders + ") ";
	}

	/**
	 * Inserts the rows as one JDBC batch in one database transaction. On failure the
	 * database transaction is rolled back and the exception is rethrown; the connection's
	 * previous auto-commit mode is restored before it is closed.
	 */
	private void insertRows(List<String[]> rows) throws Exception {
		try (Connection conn = DataSourceUtils.getConnection();
				PreparedStatement statement = conn.prepareStatement(sql)) {
			boolean previousAutoCommit = conn.getAutoCommit();
			conn.setAutoCommit(false);
			try {
				int queued = 0;
				for (String[] fields : rows) {
					try {
						bindRow(statement, fields);
					} catch (NumberFormatException e) {
						// A malformed number affects this row only; drop it and go on.
						statement.clearParameters();
						logger.warn("skip event with malformed numeric field, device:" + fields[0], e);
						continue;
					}
					statement.addBatch();
					queued++;
					logger.info("device:{}", fields[0]);
				}
				if (queued > 0) {
					statement.executeBatch();
				}
				conn.commit();
			} catch (Exception e) {
				try {
					conn.rollback();
				} catch (SQLException rollbackError) {
					logger.error("jdbc rollback error.", rollbackError);
				}
				throw e;
			} finally {
				try {
					conn.setAutoCommit(previousAutoCommit);
				} catch (SQLException e) {
					logger.warn("restore auto-commit error.", e);
				}
			}
		}
	}

	/**
	 * Binds one row to the statement. Parameter {@code n} takes field {@code n - 1}.
	 *
	 * @throws NumberFormatException if an integer or timestamp field is not numeric
	 */
	private static void bindRow(PreparedStatement statement, String[] fields) throws SQLException {
		for (int i = 0; i < COLUMN_COUNT; i++) {
			int parameterIndex = i + 1;
			String value = fields[i];
			switch (parameterIndex) {
			case 4:  // f_num
			case 5:  // cost
			case 6:  // usg
			case 11: // rh
			case 13: // ph
			case 15: // sh
				statement.setInt(parameterIndex, Integer.parseInt(value));
				break;
			case 16: // time_stamp: a literal "null" or a blank value is stored as 0
				String timeStamp = value;
				if ("null".equals(timeStamp) || StringUtils.isBlank(timeStamp)) {
					timeStamp = "0";
				}
				statement.setLong(parameterIndex, Long.parseLong(timeStamp));
				break;
			default:
				statement.setString(parameterIndex, value);
				break;
			}
		}
	}
}

説明:
 
1)configureメソッドでは、flumeの設定ファイルから対応する設定値を読み取ることができます。設定ファイルが変更されるとflumeフレームワークが自動的に再読み込みを行い、configureメソッドが呼び出されます。
2)startメソッドとstopメソッドは、それぞれflumeの起動時と停止時に実行されます。
3、梱包:
maven package(mvn package)を実行し、ttbrain-log-flume-MysqlSink-jar-with-dependencies.jarとしてパッケージ化します。
4、配置:
1)flumeプロファイルの構成:
 
# Agent "agent1": one source (ngrinder), one channel (mc2), one sink (mysql).
agent1.sources = ngrinder
agent1.channels = mc2
agent1.sinks = mysql


#source
# Exec source: runs the tail command below and feeds its output into channel mc2.
agent1.sources.ngrinder.type = exec
agent1.sources.ngrinder.command = tail -F /data/logs/ttbrain/ttbrain-recommend-api.log
agent1.sources.ngrinder.channels = mc2

#filter
# Interceptor chain applied to the source, in the listed order: filt1..filt4.
agent1.sources.ngrinder.interceptors=filt1 filt2 filt3 filt4
# filt1: regex filter using the pattern ".*recId.*".
agent1.sources.ngrinder.interceptors.filt1.type=regex_filter
agent1.sources.ngrinder.interceptors.filt1.regex=.*recId.*
# filt2: host interceptor configured with header name "hostname" and useIP=true.
agent1.sources.ngrinder.interceptors.filt2.type=host
agent1.sources.ngrinder.interceptors.filt2.hostHeader=hostname
agent1.sources.ngrinder.interceptors.filt2.useIP=true
# filt3: timestamp interceptor.
agent1.sources.ngrinder.interceptors.filt3.type=timestamp
# filt4: project-specific interceptor (its implementation is not shown in this article).
agent1.sources.ngrinder.interceptors.filt4.type=com.abc.ttbrain.log.flume.interceptor.MyInterceptor$Builder


#channel2
# File channel with its checkpoint and data directories.
agent1.channels.mc2.type = file
agent1.channels.mc2.checkpointDir = /data/flume/ckdir/mc2_ck
agent1.channels.mc2.dataDirs = /data/flume/datadir/mc2_data

#sink2
# Custom sink, referenced by its fully-qualified class name; it reads from channel mc2.
agent1.sinks.mysql.type = com.abc.ttbrain.log.flume.sink.MysqlSink
# Custom property read by MysqlSink.configure(): comma-separated device keys to store.
agent1.sinks.mysql.deviceKeys = 2DCFE0C8-2DD6-4FB7-A2E6-1A210F7C7C07,3F4DA241-B827-4FF8-BB3F-624CFDEDA58D,89C574A2-1E44-468F-9AB4-96737D2FF7F2,f614ca8a42bd121a8bb971d89a078a08267b4df2,A78DB69352E74B744EDD15DE2B91BE40,2C4B1E38-2629-4AF9-BA0E-2DA17C5F3A35,B9D0A82EE8A4BB9B20CEEBB0977A9CFC,3BD40EB0-8171-4481-8B6E-002DB8C6924D,81F37B3C-5E29-40BC-A030-424E137268C2,E59909BC-D62E-4393-9D06-41BF03F81DA9
agent1.sinks.mysql.channel = mc2

説明:
 
A、agent1.sinks.mysql.typeには、カスタムsinkクラスの完全修飾クラス名を指定します。
B、agent1.sinks.mysql.deviceKeysは、カスタムsinkのconfigureメソッドで取得できるカスタム構成情報です.
 
2)ttbrain-log-flume-MysqlSink-jar-with-dependencies.jarをFLUME_HOMEのlibディレクトリ配下に配置します。
3)flumeを起動する:
 
nohup flume-ng agent -c /usr/local/apache-flume-1.7.0-bin/conf -f /usr/local/apache-flume-1.7.0-bin/conf/engine-api-log.conf  -n agent1 >/dev/null 2>&1 &