flink実戦(一)flink-sql関連hbase次元データ処理
151414 ワード
概要
最近のプロジェクトではflinkを使用してリアルタイム計算を行い、kafkaからデータを読み出し、次元データであればhbaseに挿入し、リアルタイム計算が必要なデータであればリアルタイム計算を行い、計算結果をMySQLに保存します.リアルタイム計算ではhbaseの次元データが使用される可能性があり、開発の効率化のためにflink-sqlを使用して実現されます.flink-sqlはflinkフロー計算に基づいて高度に抽象化され、開発過程をより簡単に、より効率的にするが、sql実行の背後にある原理を理解するにはflinkフロー計算に関する内容をよく学ぶ必要がある.本文は主にflink-sqlで関連機能を実現する.
次のように仮定します.ユーザデータは、kafkaから読み出し、hbaseに次元データとして保存する必要がある. 商品データは、kafkaから読み出し、hbaseに次元データとして保存する必要がある. オーダーデータは、kafkaから読み取り、リアルタイム計算を行い、最後に計算結果をMySQL(支払い成功後のオーダーデータを示す)に保存します.
ユーザーの毎日の注文数を統計する必要があるとします.
ユーザデータ:userID,userName,sex,address(ユーザID,ユーザ名,性別,アドレス)
商品データ:productID,productName,productType(商品ID,商品名,商品タイプ)
オーダーデータ:orderID,userID,productID,productCount,money,buyDate(オーダーID,ユーザID,商品ID,商品数,商品単価,購入日)
最終計算結果:userName,productType,productName,buyDate,productCount,totalMoney(ユーザー名,商品タイプ,商品名,購入日,商品数量,商品総価格)つまり、ある人がある日あるカテゴリを購入した、ある商品名の商品数を計算するように要求され、いくらかかったか
maven依存
エンティティークラス
実行環境の作成
kafkaが読み出したjson列を具体的にJavaエンティティクラスに変換する
sqlで使用される時間変換関数
hbaseテーブル定義
hbase sink定義
MySQL sink定義
計算タスク定義
じつじかんけいさん
flink-sqlはすごいと思いますが、最初に述べたようにflink-sqlはflinkフロー計算に基づいて高度に抽象化されており、すべての機能を実現することはできません.
注文書の支払いに成功した後、返金できます.返金が完了すると、注文書の状態が失効します.統計結果に返金成功後の関連データが含まれるべきではありません.では、sqlを使用して計算するにはどうすればいいですか.私もわかりませんが、フロー計算を使って操作できるので、次回まとめてみます.
MySQLデータベース接続ツールクラス
最近のプロジェクトではflinkを使用してリアルタイム計算を行い、kafkaからデータを読み出し、次元データであればhbaseに挿入し、リアルタイム計算が必要なデータであればリアルタイム計算を行い、計算結果をMySQLに保存します.リアルタイム計算ではhbaseの次元データが使用される可能性があり、開発の効率化のためにflink-sqlを使用して実現されます.flink-sqlはflinkフロー計算に基づいて高度に抽象化され、開発過程をより簡単に、より効率的にするが、sql実行の背後にある原理を理解するにはflinkフロー計算に関する内容をよく学ぶ必要がある.本文は主にflink-sqlで関連機能を実現する.
次のように仮定します.
ユーザーの毎日の注文数を統計する必要があるとします.
ユーザデータ:userID,userName,sex,address(ユーザID,ユーザ名,性別,アドレス)
商品データ:productID,productName,productType(商品ID,商品名,商品タイプ)
オーダーデータ:orderID,userID,productID,productCount,money,buyDate(オーダーID,ユーザID,商品ID,商品数,商品単価,購入日)
最終計算結果:userName,productType,productName,buyDate,productCount,totalMoney(ユーザー名,商品タイプ,商品名,購入日,商品数量,商品総価格)つまり、ある人がある日あるカテゴリを購入した、ある商品名の商品数を計算するように要求され、いくらかかったか
maven依存
<dependencies>
<dependency>
<groupId>org.slf4jgroupId>
<artifactId>slf4j-apiartifactId>
<version>1.7.7version>
dependency>
<dependency>
<groupId>redis.clientsgroupId>
<artifactId>jedisartifactId>
<version>2.9.0version>
dependency>
<dependency>
<groupId>log4jgroupId>
<artifactId>log4jartifactId>
<version>1.2.17version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-clients_2.11artifactId>
<version>1.10.0version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-javaartifactId>
<version>1.10.0version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-streaming-java_2.11artifactId>
<version>1.10.0version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-connector-kafka-0.10_2.11artifactId>
<version>1.10.0version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-table-commonartifactId>
<version>1.10.0version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-streaming-scala_2.11artifactId>
<version>1.10.0version>
dependency>
<dependency>
<groupId>com.alibabagroupId>
<artifactId>fastjsonartifactId>
<version>1.2.58version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-hbase_2.11artifactId>
<version>1.10.0version>
<exclusions>
<exclusion>
<groupId>org.apache.hbasegroupId>
<artifactId>hbase-clientartifactId>
exclusion>
<exclusion>
<groupId>org.apache.hbasegroupId>
<artifactId>hbase-serverartifactId>
exclusion>
exclusions>
dependency>
<dependency>
<groupId>org.apache.kafkagroupId>
<artifactId>kafka-clientsartifactId>
<version>0.10.2.1version>
dependency>
<dependency>
<groupId>org.apache.hbasegroupId>
<artifactId>hbase-shaded-clientartifactId>
<version>1.4.3version>
dependency>
<dependency>
<groupId>org.apache.hbasegroupId>
<artifactId>hbase-commonartifactId>
<version>1.4.3version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-sql-client_2.11artifactId>
<version>1.10.0version>
<exclusions>
<exclusion>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-shaded-hadoop2artifactId>
exclusion>
<exclusion>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-shaded-hiveartifactId>
exclusion>
<exclusion>
<groupId>org.eclipse.jettygroupId>
<artifactId>jetty-allartifactId>
exclusion>
<exclusion>
<groupId>org.slf4jgroupId>
<artifactId>slf4j-apiartifactId>
exclusion>
<exclusion>
<groupId>log4jgroupId>
<artifactId>log4jartifactId>
exclusion>
<exclusion>
<groupId>org.slf4jgroupId>
<artifactId>slf4j-log4j12artifactId>
exclusion>
exclusions>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-jdbc_2.11artifactId>
<version>1.10.0version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-table-planner-blink_2.11artifactId>
<version>1.10.0version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-table-api-java-bridge_2.11artifactId>
<version>1.10.0version>
dependency>
<dependency>
<groupId>mysqlgroupId>
<artifactId>mysql-connector-javaartifactId>
<version>5.1.46version>
dependency>
<dependency>
<groupId>com.alibabagroupId>
<artifactId>druidartifactId>
<version>1.1.21version>
dependency>
<dependency>
<groupId>cglibgroupId>
<artifactId>cglibartifactId>
<version>2.2version>
dependency>
dependencies>
エンティティークラス
@Data
public class User implements Serializable {
private Integer userID;
private String userName;
private String sex;
private String address;
}
@Data
public class Product implements Serializable {
private Integer productID;
private String productName;
private String productType;
}
@Data
public class Order implements Serializable {
private Integer orderID;
private Integer userID;
private Integer productID;
private Integer productCount;
private Double money;
private String buyDate;
}
@Data
public class Result implements Serializable {
private String userName;
private String productType;
private String productName;
private String buyDate;
private Integer productCount;
private Double totalMoney;
}
実行環境の作成
public class Main {
public static void main(String[] args)throws Exception{
//1.
// ,
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// sql table ,
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
// , ,
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
//
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// checkpoint
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
checkpointConfig.setTolerableCheckpointFailureNumber(0);
// :3 , 60s
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 60000));
Map<String, String> maps = new ImmutableMap.Builder<String, String>().
put("kafka.url", args[0]).
put("hbase.zookeeper.quorum", args[1]).
put("dbUrl", args[2]).
put("dbName", args[3]).
put("user", args[4]).
put("psw", args[5]).build();
// , function
env.getConfig().setGlobalJobParameters(ParameterTool.fromMap(maps));
//table
tEnv.registerFunction("time_format", new DateFormatFunction());
//2. hbase
// hbase zookeeper
String zkUrl = env.getConfig().getGlobalJobParameters().toMap().getOrDefault("hbase.zookeeper.quorum", "");
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", zkUrl);
//2.1 hbase
TableDefine.defineUserHbaseTable(tEnv, conf);//
TableDefine.defineProductHbaseTable(tEnv, conf);//
//3. kafka
//3.1 kafka
String kafkaUrl = env.getConfig().getGlobalJobParameters().toMap().getOrDefault("kafka.url", "");
Properties props = new Properties();
props.setProperty("bootstrap.servers",kafkaUrl);
props.setProperty("group.id", " groupID");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//3.2 Hbase ,
//3.2.1
JobDefine.userJob(env, props);
JobDefine.productJob(env, props);
//
JobDefine.orderJob(env,tEnv,props);
//
env.execute("orderJob");
}
}
kafkaが読み出したjson列を具体的にJavaエンティティクラスに変換する
public class MyKafkaRichFlatMapFunction<OUT> extends RichFlatMapFunction<String, OUT> {
private static Logger logger = LogManager.getLogger(MyKafkaRichFlatMapFunction.class);
@Override
public void flatMap(String value, Collector<OUT> collector) {
try {
if(value != null && !"".equals(value)){
Class<OUT> tClass = (Class<OUT>)((ParameterizedType)getClass().getGenericSuperclass()).getActualTypeArguments()[0];
OUT out = JSONObject.parseObject(value, tClass);
collector.collect(out);
}
} catch (Exception e) {
logger.error("AbstractKafkaRichFlatMapFunction :" + value, e);
}
}
}
sqlで使用される時間変換関数
public class DateFormatFunction extends ScalarFunction {
public String eval(Timestamp time, String format) {
return new SimpleDateFormat(format).format(new Date(time.getTime()));
}
}
hbaseテーブル定義
public class TableDefine {
public static void defineUserHbaseTable(StreamTableEnvironment tEnv,Configuration conf){
HBaseTableSource hBaseTableSource = new HBaseTableSource(conf, "t_user");
// hbase rowKey
hBaseTableSource.setRowKey("rowKey", Integer.class);
// hbase , ( ),
hBaseTableSource.addColumn("f", "uid", Integer.class);
hBaseTableSource.addColumn("f", "uname", String.class);
hBaseTableSource.addColumn("f", "sex", String.class);
hBaseTableSource.addColumn("f", "address", String.class);
// flinktable
// , flink-sql , new HBaseTableSource(conf, "t_user") t_user hbase
// TableFunction, , hbase addColumn , rowkey, rowkey userID
tEnv.registerFunction("t_user", hBaseTableSource.getLookupFunction(new String[]{"rowKey"}));
}
public static void defineProductHbaseTable(StreamTableEnvironment tEnv,Configuration conf){
HBaseTableSource hBaseTableSource = new HBaseTableSource(conf, "t_product");
// hbase rowKey
hBaseTableSource.setRowKey("rowKey", Integer.class);
// hbase , ( ),
hBaseTableSource.addColumn("f", "pid", Integer.class);
hBaseTableSource.addColumn("f", "pname", String.class);
hBaseTableSource.addColumn("f", "pt", String.class);
// flinktable
// , flink-sql , new HBaseTableSource(conf, "t_product") t_product hbase
// TableFunction, , hbase addColumn , rowkey, rowkey userID
tEnv.registerFunction("t_product", hBaseTableSource.getLookupFunction(new String[]{"rowKey"}));
}
}
hbase sink定義
//
public abstract class AbstractHbaseSinkFunction<OUT> extends RichSinkFunction<OUT> {
protected static String cf_String = "f";
protected static byte[] cf = Bytes.toBytes(cf_String);
protected String tableName = null;
private Connection connection;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig()
.getGlobalJobParameters();
org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", globalParams.toMap().get("hbase.zookeeper.quorum"));
if (null == connection) {
connection = ConnectionFactory.createConnection(conf);
}
}
@Override
public void invoke(OUT value, Context context) throws Exception {
HTable table = null;
try {
table = (HTable) connection.getTable(TableName.valueOf(tableName));
handle(value, context, table);
} finally {
table.close();
}
}
protected abstract void handle(OUT value, Context context, HTable table) throws Exception;
@Override
public void close() throws Exception {
connection.close();
}
protected byte[] getByteValue(Object value){
if(value==null){
return Bytes.toBytes("");
}else{
return Bytes.toBytes(value.toString());
}
}
}
// sink
public class UserHbaseSinkFunction extends AbstractHbaseSinkFunction<User> {
public UserHbaseSinkFunction(){
tableName = "t_user";
}
@Override
protected void handle(User value, Context context, HTable table) throws Exception {
Integer rowkey1 = value.getUserID();
Put put1 = new Put(Bytes.toBytes(rowkey1));
//
put1.addColumn(cf, Bytes.toBytes("uid"), getByteValue(value.getUserID()));
put1.addColumn(cf, Bytes.toBytes("uname"), getByteValue(value.getUserName()));
put1.addColumn(cf, Bytes.toBytes("sex"), getByteValue(value.getSex()));
put1.addColumn(cf, Bytes.toBytes("addr"), getByteValue(value.getAddress()));
table.put(put1);
}
}
// sink
public class ProductHbaseSinkFunction extends AbstractHbaseSinkFunction<Product> {
public ProductHbaseSinkFunction(){
tableName = "t_product";
}
@Override
protected void handle(Product value, Context context, HTable table) throws Exception {
//
Integer rowkey1 = value.getProductID();
Put put1 = new Put(Bytes.toBytes(rowkey1));
put1.addColumn(cf, Bytes.toBytes("pid"), getByteValue(value.getProductID()));
put1.addColumn(cf, Bytes.toBytes("pname"), getByteValue(value.getProductName()));
put1.addColumn(cf, Bytes.toBytes("pt"), getByteValue(value.getProductType()));
table.put(put1);
}
}
MySQL sink定義
public class OrderMysqlSinkFunction extends RichSinkFunction<List<Result>> {
public static Logger logger = LogManager.getLogger(OrderMysqlSinkFunction.class);
private DataSource dataSource = null;
@Override
public void open(Configuration parameters) throws Exception {
logger.info("MysqlSinkFunction open");
super.open(parameters);
Map<String, String> globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap();
String dbUrl = globalParams.get("dbUrl");
String dbName = globalParams.get("dbName");
String user = globalParams.get("user");
String psw = globalParams.get("psw");
try {
dataSource = DBUtil.getDataSource(dbUrl, dbName, user, psw);
} catch (Exception e) {
logger.error("" + e);
}
}
@Override
public void invoke(List<Result> results, Context context) throws Exception {
Connection connection = dataSource.getConnection();
PreparedStatement ps = null;
try {
// sql userName, productType, productName, buyDate, productCount, totalMoney
String sql = "INSERT INTO t_result (userName, productType, productName, buyDate, productCount, totalMoney) " +
" values (?,?,?,?,?,?) on duplicate key " +
" update productCount = values(productCount),totalMoney=values(totalMoney) ";
ps = connection.prepareStatement(sql);
for (Result record : results) {
ps.setObject(1, record.getUserName());
ps.setObject(2, record.getProductType());
ps.setObject(3, record.getProductName());
ps.setObject(4, record.getBuyDate());
ps.setObject(5, record.getProductCount());
ps.setObject(6, record.getTotalMoney());
ps.addBatch();
}
ps.executeBatch();
}catch (Exception e){
logger.error(" :"+e);
throw new RuntimeException("gcoll_vmotask_sum :"+e);
}finally {
DBUtil.close(ps);
}
}
@Override
public void close() {
logger.info("MysqlSinkFunction close");
DBUtil.closeDataSource();
}
}
計算タスク定義
public class JobDefine {
// hbase
public static void userJob(StreamExecutionEnvironment env,Properties props){
// kafka source
SingleOutputStreamOperator<String> userSource = env.addSource(new FlinkKafkaConsumer010<>("user topic", new SimpleStringSchema(), props));
// kafka ( json SimpleStringSchema) Java , .returns(User.class),
SingleOutputStreamOperator<User> userMessageStream=userSource.flatMap(new MyKafkaRichFlatMapFunction<User>()).returns(User.class);
// Java hbase
userMessageStream.addSink(new UserHbaseSinkFunction()).name("UserHbaseSinkFunction");
}
// hbase
public static void productJob(StreamExecutionEnvironment env,Properties props){
// kafka source
SingleOutputStreamOperator<String> productSource = env.addSource(new FlinkKafkaConsumer010<>("product topic", new SimpleStringSchema(), props));
// kafka ( json SimpleStringSchema) Java , .returns(Product.class),
SingleOutputStreamOperator<Product> productStream=productSource.flatMap(new MyKafkaRichFlatMapFunction<Product>()).returns(Product.class);
// Java hbase
productStream.addSink(new ProductHbaseSinkFunction()).name("ProductHbaseSinkFunction");
}
//
public static void orderJob(StreamExecutionEnvironment env, StreamTableEnvironment tEnv,Properties props){
SingleOutputStreamOperator orderSource = env.addSource(new FlinkKafkaConsumer010<>("order topic", new SimpleStringSchema(), props));
// kafka ( json SimpleStringSchema) Java , .returns(Order.class),
SingleOutputStreamOperator<Order> orderMessageStream = orderSource.flatMap(new MyKafkaRichFlatMapFunction<Order>()).returns(Order.class);
// flinktable , ,
tEnv.createTemporaryView("t_order", orderMessageStream);
// order MySQL
new OrderFunction().handle(orderMessageStream, env,tEnv);
}
}
じつじかんけいさん
public class OrderFunction {
public void handle(SingleOutputStreamOperator streamOperator,StreamExecutionEnvironment env,
StreamTableEnvironment tEnv){
/**
* , :
* 1. ,select Result , Field types of query result and registered TableSink do not match
* flink Result , select ,
* Result , :[buyDate: STRING, productCount: INT, productName: STRING, productType: STRING, totalMoney: DOUBLE, userName: STRING]
* select :[STRING, INT, STRING, STRING,DOUBLE,STRING]
* sql (sum(o.productCount) ,o.buyDate ):select sum(o.productCount) ,o.buyDate,p.pname,p.pt,sum(o.money*o.productCount) ,u.uname Field types of query result and registered TableSink do not match
* sql (pt pname ):select o.buyDate,sum(o.productCount),p.pt,p.pname,sum(o.money*o.productCount) ,u.uname, Result
* productName pt ,productType pname
* 2. hbase , , username uname, select hbase , p.pname,u.uname
* 3. select result , sql
* 4. t_order JobDefine table 【 】 , tEnv.createTemporaryView("t_order", orderMessageStream);
* 5. t_user TableDefine table 【 】 , :tEnv.registerFunction("t_user", hBaseTableSource.getLookupFunction(new String[]{"rowKey"}));
* 6. t_product TableDefine table 【 】 , :tEnv.registerFunction("t_product", hBaseTableSource.getLookupFunction(new String[]{"rowKey"}));
*
*/
Table collectionTaskResult = tEnv.sqlQuery("select o.buyDate,sum(o.productCount) ,p.pname,p.pt,sum(o.money*o.productCount) ,u.uname" +
" from t_order o,lateral table(t_user(o.userID)) u,lateral table(t_product(o.productID)) p " +
" where o.userID=u.uid and o.productID=p.pid group by u.uname,p.pt,p.pname,o.buyDate");
DataStream<Tuple2<Boolean, Result>> stream = tEnv.toRetractStream(collectionTaskResult, Result.class);
//windAll , 10 , ,
stream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.process(new ProcessAllWindowFunction<Tuple2<Boolean, Result>, List<Result>, TimeWindow>() {
//values
public void process(Context context, Iterable<Tuple2<Boolean, Result>> values, Collector<List<Result>> out) throws Exception {
Iterator<Tuple2<Boolean, Result>> iterator = values.iterator();
Map<String, Result> result = Maps.newHashMap();
while (iterator.hasNext()) {
Tuple2<Boolean, Result> tuple = iterator.next();
//
if (tuple.getField(0)) {
Result item = tuple.getField(1);
// map
result.put(item.getUserName()+":"+item.getProductType()+":"+item.getProductName()+":"+item.getBuyDate(), item);
}
}
out.collect(Lists.newArrayList(result.values()));
}
}).setParallelism(1)// 1,
// Java MySQL
.addSink(new OrderMysqlSinkFunction()).name("OrderMysqlSinkFunction");
}
}
flink-sqlはすごいと思いますが、最初に述べたようにflink-sqlはflinkフロー計算に基づいて高度に抽象化されており、すべての機能を実現することはできません.
注文書の支払いに成功した後、返金できます.返金が完了すると、注文書の状態が失効します.統計結果に返金成功後の関連データが含まれるべきではありません.では、sqlを使用して計算するにはどうすればいいですか.私もわかりませんが、フロー計算を使って操作できるので、次回まとめてみます.
MySQLデータベース接続ツールクラス
public class DBUtil {
private static Logger logger = LogManager.getLogger(DBUtil.class);
private static DruidDataSource dataSource;
public static DruidDataSource getDataSource(String dbUrl, String dbName, String username, String password) {
try {
if (dataSource == null) {
synchronized (DBUtil.class) {
if (dataSource == null) {
dataSource = new DruidDataSource();
dataSource.setDriverClassName("com.mysql.jdbc.Driver");
dataSource.setUrl("jdbc:mysql://#/%?autoReconnect=true&useUnicode=true&useAffectedRows=true&characterEncoding=utf8".replace("#", dbUrl).replace("%", dbName));
dataSource.setUsername(username);
dataSource.setPassword(password);
//configuration
dataSource.setInitialSize(20);
dataSource.setMinIdle(20);
dataSource.setMaxActive(100);
dataSource.setMaxWait(60000);
dataSource.setTimeBetweenEvictionRunsMillis(60000);
dataSource.setMinEvictableIdleTimeMillis(300000);
dataSource.setValidationQuery("SELECT 1 FROM DUAL");
dataSource.setTestWhileIdle(true);
dataSource.setTestOnBorrow(false);
dataSource.setTestOnReturn(false);
dataSource.setPoolPreparedStatements(true);
dataSource.setMaxPoolPreparedStatementPerConnectionSize(20);
dataSource.setFilters("stat,wall,log4j");
dataSource.setConnectionProperties("druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000");
}
}
}
} catch (Exception e) {
logger.error(" ....", e);
}
return dataSource;
}
/**
*
*/
public static DataSource getDataSource() {
return dataSource;
}
/**
*
*/
public static void closeDataSource() {
logger.info("method datasource close !");
if (dataSource != null) {
dataSource.close();
}
}
/**
*
*/
public static Connection getConnection() {
Connection conn = null;
try {
conn = dataSource.getConnection();
} catch (SQLException e) {
logger.error(" ", e);
}
return conn;
}
/**
*
*/
public static void close(Connection conn,Statement st,ResultSet rs) {
if(rs!=null) {
try {
rs.close();
} catch (SQLException e) {
logger.error(" ", e);
}
}
close(conn,st);
}
public static void close(Connection conn,Statement st) {
if(st!=null) {
try {
st.close();
} catch (SQLException e) {
logger.error(" Statement ", e);
}
}
close(conn);
}
public static void close(Statement st) {
if(st!=null) {
try {
st.close();
} catch (SQLException e) {
logger.error(" Statement ", e);
}
}
}
public static void close(Connection conn) {
if(conn!=null) {
try {
conn.close();
} catch (SQLException e) {
logger.error(" SConnection ", e);
}
}
}
public static void commit(Connection conn){
if(conn!=null) {
try {
conn.commit();
} catch (SQLException e) {
logger.error(" ", e);
}
}
}
public static void rollback(Connection conn){
if(conn!=null) {
try {
conn.rollback();
} catch (SQLException e) {
logger.error(" ", e);
}
}
}
}