Flink in Practice (1): Processing HBase Dimension Data with Flink SQL


Overview
In a recent project we used Flink for real-time computation: data is read from Kafka; dimension data is written to HBase, while data that needs real-time aggregation is computed on the fly and the results are stored in MySQL. The real-time computation may need to look up dimension data in HBase, and to speed up development we implemented it with Flink SQL. Flink SQL is a high-level abstraction built on Flink's stream processing; it makes development simpler and more productive, but to understand what actually happens behind a SQL statement you still need a solid grasp of Flink streaming itself. This article focuses on implementing the feature with Flink SQL.
Assume the following scenario:
  • User data is read from Kafka and stored in HBase as dimension data.
  • Product data is read from Kafka and stored in HBase as dimension data.
  • Order data is read from Kafka, processed in real time, and the results are written to MySQL (the order data represents orders that were successfully paid).

  • Suppose we need to compute per-user daily purchase statistics.
    User data: userID, userName, sex, address (user ID, user name, sex, address)
    Product data: productID, productName, productType (product ID, product name, product type)
    Order data: orderID, userID, productID, productCount, money, buyDate (order ID, user ID, product ID, quantity, unit price, purchase date)
    Final result: userName, productType, productName, buyDate, productCount, totalMoney (user name, product type, product name, purchase date, quantity, total amount). In other words, for each user, day, product type, and product name we compute how many items were bought and how much was spent; sample payloads are sketched right below.
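    To make the data contract concrete, here are hypothetical sample messages for the three Kafka topics (all field values are invented for illustration; only the field names matter, and they match the entity classes defined below):
    // Hypothetical sample payloads; values are made up, the schemas follow the entity classes below.
    public class SamplePayloads {
        public static void main(String[] args) {
            String userJson    = "{\"userID\":1,\"userName\":\"alice\",\"sex\":\"F\",\"address\":\"Tokyo\"}";
            String productJson = "{\"productID\":100,\"productName\":\"keyboard\",\"productType\":\"electronics\"}";
            String orderJson   = "{\"orderID\":5000,\"userID\":1,\"productID\":100,"
                    + "\"productCount\":2,\"money\":1980.0,\"buyDate\":\"2020-05-01\"}";
            // fastjson maps the JSON fields onto the POJOs by field name, e.g.:
            Order order = com.alibaba.fastjson.JSON.parseObject(orderJson, Order.class);
            System.out.println(order.getUserID() + " bought " + order.getProductCount() + " items");
        }
    }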
    Maven dependencies
    <dependencies>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.7</version>
            </dependency>
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>2.9.0</version>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.11</artifactId>
                <version>1.10.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>1.10.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.11</artifactId>
                <version>1.10.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
                <version>1.10.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>1.10.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.11</artifactId>
                <version>1.10.0</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.58</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-hbase_2.11</artifactId>
                <version>1.10.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.hbase</groupId>
                        <artifactId>hbase-client</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.apache.hbase</groupId>
                        <artifactId>hbase-server</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.10.2.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-shaded-client</artifactId>
                <version>1.4.3</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-common</artifactId>
                <version>1.4.3</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-sql-client_2.11</artifactId>
                <version>1.10.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-shaded-hadoop2</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-shaded-hive</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.eclipse.jetty</groupId>
                        <artifactId>jetty-all</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-api</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>log4j</groupId>
                        <artifactId>log4j</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-jdbc_2.11</artifactId>
                <version>1.10.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_2.11</artifactId>
                <version>1.10.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-bridge_2.11</artifactId>
                <version>1.10.0</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.46</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>druid</artifactId>
                <version>1.1.21</version>
            </dependency>
            <dependency>
                <groupId>cglib</groupId>
                <artifactId>cglib</artifactId>
                <version>2.2</version>
            </dependency>
        </dependencies>
    

    Entity classes (the @Data annotation comes from Lombok, which also needs to be on the classpath; Flink treats these as POJOs, so they need a public no-argument constructor and public getters/setters)
    @Data
    public class User implements Serializable {
        private Integer userID;
        private String userName;
        private String sex;
        private String address;
    }
    @Data
    public class Product implements Serializable {
        private Integer productID;
        private String productName;
        private String productType;
    }
    @Data
    public class Order implements Serializable {
        private Integer orderID;
        private Integer userID;
        private Integer productID;
        private Integer productCount;
        private Double money;
        private String buyDate;
    }
    @Data
    public class Result implements Serializable {
        private String userName;
        private String productType;
        private String productName;
        private String buyDate;
        private Integer productCount;
        private Double totalMoney;
    }
    

    Creating the execution environment
    public class Main {
        public static void main(String[] args)throws Exception{
            //1. Create the execution environments
            // Streaming environment, used for the DataStream jobs
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Table/SQL environment (Blink planner), used for the Flink SQL part
            EnvironmentSettings settings =  EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

            // Enable checkpointing (every 60s, exactly-once) so state can be restored after a failure
            env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
            // Use processing time
            env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
            // Checkpoint configuration
            CheckpointConfig checkpointConfig = env.getCheckpointConfig();
            checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            checkpointConfig.setTolerableCheckpointFailureNumber(0);
            // Restart strategy: at most 3 restarts, 60s between attempts
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 60000));
            Map<String, String> maps = new ImmutableMap.Builder<String, String>().
                    put("kafka.url", args[0]).
                    put("hbase.zookeeper.quorum", args[1]).
                    put("dbUrl", args[2]).
                    put("dbName", args[3]).
                    put("user", args[4]).
                    put("psw", args[5]).build();
            // Register the parameters as global job parameters so they can be read inside the functions
            env.getConfig().setGlobalJobParameters(ParameterTool.fromMap(maps));
            // Register the scalar UDF with the table environment
            tEnv.registerFunction("time_format", new DateFormatFunction());

            //2. Define the HBase tables
            // Read the HBase ZooKeeper quorum from the global parameters
            String zkUrl = env.getConfig().getGlobalJobParameters().toMap().getOrDefault("hbase.zookeeper.quorum", "");
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", zkUrl);

            //2.1 Register the HBase dimension tables
            TableDefine.defineUserHbaseTable(tEnv, conf);// user dimension table
            TableDefine.defineProductHbaseTable(tEnv, conf);// product dimension table


            //3. Define the Kafka sources
            //3.1 Build the Kafka consumer properties from the global parameters
            String kafkaUrl = env.getConfig().getGlobalJobParameters().toMap().getOrDefault("kafka.url", "");
            Properties props = new Properties();
            props.setProperty("bootstrap.servers",kafkaUrl);
            props.setProperty("group.id", "your-consumer-group-id"); // placeholder, use your own group id
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            //3.2 The jobs that write dimension data to HBase are plain DataStream jobs and do not use Flink SQL
            //3.2.1 Dimension data jobs
            JobDefine.userJob(env, props);
            JobDefine.productJob(env, props);

            // Order job: real-time computation
            JobDefine.orderJob(env,tEnv,props);
            // Execute
            env.execute("orderJob");
        }
    }
    

    Converting the JSON strings read from Kafka into concrete Java entity classes
    public class MyKafkaRichFlatMapFunction<OUT> extends RichFlatMapFunction<String, OUT> {
        private static Logger logger = LogManager.getLogger(MyKafkaRichFlatMapFunction.class);
        @Override
        public void flatMap(String value, Collector<OUT> collector) {
            try {
                if(value != null && !"".equals(value)){
                    // Resolve the concrete element type from the generic superclass.
                    // This only works when the function is instantiated as an anonymous subclass
                    // (new MyKafkaRichFlatMapFunction<User>(){}); with a plain "new" the resolved
                    // type argument would be String, the first type parameter of RichFlatMapFunction.
                    Class<OUT> tClass = (Class<OUT>)((ParameterizedType)getClass().getGenericSuperclass()).getActualTypeArguments()[0];
                    OUT out = JSONObject.parseObject(value, tClass);
                    collector.collect(out);
                }
            } catch (Exception e) {
                logger.error("MyKafkaRichFlatMapFunction failed to parse message: " + value, e);
            }
        }
    }
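    As a minimal usage sketch (the topic name is a placeholder), the function is meant to be instantiated as an anonymous subclass, note the trailing {}, so the reflection above can see the concrete element type; .returns(...) is still required because of generic type erasure:
    SingleOutputStreamOperator<User> users = env
            .addSource(new FlinkKafkaConsumer010<>("user-data-topic", new SimpleStringSchema(), props))
            .flatMap(new MyKafkaRichFlatMapFunction<User>() {})  // anonymous subclass
            .returns(User.class);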
    

    Time-formatting function for use in SQL
    public class DateFormatFunction extends ScalarFunction {
        public String eval(Timestamp time, String format) {
            return new SimpleDateFormat(format).format(new Date(time.getTime()));
        }
    }
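    This UDF is registered in Main under the name "time_format". It is not used in the SQL shown later in this article, but after registration it would be invoked in SQL as time_format(someTimestampColumn, 'yyyy-MM-dd') (the column name is hypothetical). It can also be sanity-checked with a plain Java call:
    public class DateFormatFunctionCheck {
        public static void main(String[] args) {
            // Plain Java call of the UDF, outside of any SQL statement
            DateFormatFunction f = new DateFormatFunction();
            System.out.println(f.eval(new java.sql.Timestamp(System.currentTimeMillis()), "yyyy-MM-dd"));
        }
    }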
    

    HBase table definitions
    public class TableDefine {
    
        public static void defineUserHbaseTable(StreamTableEnvironment tEnv,Configuration conf){
            HBaseTableSource hBaseTableSource = new HBaseTableSource(conf, "t_user");
            // Declare the HBase rowKey name and type
            hBaseTableSource.setRowKey("rowKey", Integer.class);
            // Declare the HBase columns (family, qualifier, type) the lookup should return;
            // only the columns declared here can be referenced in Flink SQL
            hBaseTableSource.addColumn("f", "uid", Integer.class);
            hBaseTableSource.addColumn("f", "uname", String.class);
            hBaseTableSource.addColumn("f", "sex", String.class);
            hBaseTableSource.addColumn("f", "address", String.class);
            // Register the lookup with the Flink table environment.
            // The name registered here ("t_user") is the one referenced in Flink SQL; the "t_user"
            // passed to new HBaseTableSource(conf, "t_user") is the HBase table name.
            // getLookupFunction returns a TableFunction keyed by the given lookup keys; here the key
            // is the rowKey, and the rowKey of this table is the userID.
            tEnv.registerFunction("t_user", hBaseTableSource.getLookupFunction(new String[]{"rowKey"}));
        }

        public static void defineProductHbaseTable(StreamTableEnvironment tEnv,Configuration conf){
            HBaseTableSource hBaseTableSource = new HBaseTableSource(conf, "t_product");
            // Declare the HBase rowKey name and type
            hBaseTableSource.setRowKey("rowKey", Integer.class);
            // Declare the HBase columns (family, qualifier, type) the lookup should return
            hBaseTableSource.addColumn("f", "pid", Integer.class);
            hBaseTableSource.addColumn("f", "pname", String.class);
            hBaseTableSource.addColumn("f", "pt", String.class);
            // Register the lookup with the table environment; the rowKey of this table is the productID
            tEnv.registerFunction("t_product", hBaseTableSource.getLookupFunction(new String[]{"rowKey"}));
        }
    }
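    The lookup sources and the sinks below assume that the HBase tables t_user and t_product already exist with a column family named "f". A one-off helper to create them might look like the following sketch, using the HBase 1.4 client API that is already on the classpath (the ZooKeeper quorum is passed as the first argument; imports are omitted as elsewhere in this article):
    public class CreateDimensionTables {
        public static void main(String[] args) throws Exception {
            org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", args[0]); // same quorum that is passed to the Flink job
            try (Connection connection = ConnectionFactory.createConnection(conf);
                 Admin admin = connection.getAdmin()) {
                for (String name : new String[]{"t_user", "t_product"}) {
                    TableName tableName = TableName.valueOf(name);
                    if (!admin.tableExists(tableName)) {
                        HTableDescriptor desc = new HTableDescriptor(tableName);
                        desc.addFamily(new HColumnDescriptor("f")); // column family used by the sources and sinks
                        admin.createTable(desc);
                    }
                }
            }
        }
    }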
    

    HBase sink definitions
    // Base sink class shared by the dimension sinks
    public abstract class AbstractHbaseSinkFunction<OUT> extends RichSinkFunction<OUT> {
        protected static String cf_String = "f";
        protected static byte[] cf = Bytes.toBytes(cf_String);
        protected String tableName = null;
        private Connection connection;
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            ExecutionConfig.GlobalJobParameters globalParams = getRuntimeContext().getExecutionConfig()
                    .getGlobalJobParameters();
            org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", globalParams.toMap().get("hbase.zookeeper.quorum"));
            if (null == connection) {
                connection = ConnectionFactory.createConnection(conf);
            }
        }
    
        @Override
        public void invoke(OUT value, Context context) throws Exception {
            HTable table = null;
            try  {
                table = (HTable) connection.getTable(TableName.valueOf(tableName));
                handle(value, context, table);
            } finally {
                if (table != null) {
                    table.close();
                }
            }
        }
    
        protected abstract void handle(OUT value, Context context, HTable table) throws Exception;
    
        @Override
        public void close() throws Exception {
            connection.close();
        }
    
        protected byte[] getByteValue(Object value){
            if(value==null){
                return Bytes.toBytes("");
            }else{
                return Bytes.toBytes(value.toString());
            }
        }
    
    }
    // User dimension sink
    public class UserHbaseSinkFunction extends AbstractHbaseSinkFunction<User> {
    
        public UserHbaseSinkFunction(){
            tableName = "t_user";
        }
        @Override
        protected void handle(User value, Context context, HTable table) throws Exception {
            Integer rowkey1 = value.getUserID();
            Put put1 = new Put(Bytes.toBytes(rowkey1));
            // The qualifiers written here must match the columns declared on the HBaseTableSource above
            put1.addColumn(cf, Bytes.toBytes("uid"), getByteValue(value.getUserID()));
            put1.addColumn(cf, Bytes.toBytes("uname"), getByteValue(value.getUserName()));
            put1.addColumn(cf, Bytes.toBytes("sex"), getByteValue(value.getSex()));
            put1.addColumn(cf, Bytes.toBytes("address"), getByteValue(value.getAddress()));
            table.put(put1);
        }
    }
    // Product dimension sink
    public class ProductHbaseSinkFunction extends AbstractHbaseSinkFunction<Product> {
        public ProductHbaseSinkFunction(){
            tableName = "t_product";
        }
        @Override
        protected void handle(Product value, Context context, HTable table) throws Exception {
             // The qualifiers written here must match the columns declared on the HBaseTableSource above
            Integer rowkey1 = value.getProductID();
            Put put1 = new Put(Bytes.toBytes(rowkey1));
            put1.addColumn(cf, Bytes.toBytes("pid"), getByteValue(value.getProductID()));
            put1.addColumn(cf, Bytes.toBytes("pname"), getByteValue(value.getProductName()));
            put1.addColumn(cf, Bytes.toBytes("pt"), getByteValue(value.getProductType()));
            table.put(put1);
        }
    }
    

    MySQL sink definition
    public class OrderMysqlSinkFunction extends RichSinkFunction<List<Result>> {
        public static Logger logger = LogManager.getLogger(OrderMysqlSinkFunction.class);
        private DataSource dataSource = null;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            logger.info("MysqlSinkFunction open");
            super.open(parameters);
            Map<String, String> globalParams = getRuntimeContext().getExecutionConfig().getGlobalJobParameters().toMap();
    
            String dbUrl = globalParams.get("dbUrl");
            String dbName = globalParams.get("dbName");
            String user = globalParams.get("user");
            String psw = globalParams.get("psw");
            try {
                dataSource = DBUtil.getDataSource(dbUrl, dbName, user, psw);
            } catch (Exception e) {
                logger.error("" + e);
            }
        }
    
        @Override
        public void invoke(List<Result> results, Context context) throws Exception {
            Connection connection = dataSource.getConnection();
            PreparedStatement ps = null;
            try {
                // Upsert SQL: userName, productType, productName, buyDate, productCount, totalMoney
                String sql = "INSERT INTO t_result (userName, productType, productName, buyDate, productCount, totalMoney) " +
                        " values (?,?,?,?,?,?) on duplicate key " +
                        " update productCount = values(productCount),totalMoney=values(totalMoney) ";
                ps = connection.prepareStatement(sql);
    
                for (Result record : results) {
                    ps.setObject(1, record.getUserName());
                    ps.setObject(2, record.getProductType());
                    ps.setObject(3, record.getProductName());
                    ps.setObject(4, record.getBuyDate());
                    ps.setObject(5, record.getProductCount());
                    ps.setObject(6, record.getTotalMoney());
                    ps.addBatch();
                }
                ps.executeBatch();
            }catch (Exception e){
                logger.error("Failed to write order results to MySQL", e);
                throw new RuntimeException("Failed to write order results to MySQL", e);
            }finally {
                // Close the statement and return the pooled connection as well;
                // closing only the statement would slowly exhaust the pool.
                DBUtil.close(connection, ps);
            }
        }
    
        @Override
        public void close() {
            logger.info("MysqlSinkFunction close");
            DBUtil.closeDataSource();
        }
    }
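    The INSERT ... ON DUPLICATE KEY UPDATE above only behaves as an upsert if t_result has a unique key over the grouping columns. The exact schema is not given here, so the following DDL is an assumption (column types and lengths are guesses); it can be run once using the DBUtil class shown at the end of this article:
    // Hypothetical DDL for t_result; the unique key over (userName, productType, productName, buyDate)
    // is what makes ON DUPLICATE KEY UPDATE work.
    public class CreateResultTable {
        public static void main(String[] args) throws Exception {
            String ddl = "CREATE TABLE IF NOT EXISTS t_result (" +
                    " id BIGINT AUTO_INCREMENT PRIMARY KEY," +
                    " userName VARCHAR(64), productType VARCHAR(64)," +
                    " productName VARCHAR(128), buyDate VARCHAR(32)," +
                    " productCount INT, totalMoney DOUBLE," +
                    " UNIQUE KEY uk_result (userName, productType, productName, buyDate))";
            try (java.sql.Connection conn = DBUtil.getDataSource(args[0], args[1], args[2], args[3]).getConnection();
                 java.sql.Statement st = conn.createStatement()) {
                st.execute(ddl);
            }
        }
    }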
    

    Job definitions
    public class JobDefine {
        // Write user dimension data to HBase
        public static void userJob(StreamExecutionEnvironment env,Properties props){
            // Kafka source stream ("user-data-topic" is a placeholder for the real topic name)
            SingleOutputStreamOperator<String> userSource = env.addSource(new FlinkKafkaConsumer010<>("user-data-topic", new SimpleStringSchema(), props));
            // Convert the Kafka messages (JSON strings read with SimpleStringSchema) into Java entities.
            // The flatMap function is created as an anonymous subclass (note the trailing {}) so its
            // generic element type can be resolved by reflection; .returns(User.class) is still needed
            // because of type erasure, otherwise Flink cannot determine the output type.
            SingleOutputStreamOperator<User> userMessageStream=userSource.flatMap(new MyKafkaRichFlatMapFunction<User>(){}).returns(User.class);
            // Write the Java entities to HBase
            userMessageStream.addSink(new UserHbaseSinkFunction()).name("UserHbaseSinkFunction");
        }
        // Write product dimension data to HBase
        public static void productJob(StreamExecutionEnvironment env,Properties props){
            // Kafka source stream ("product-data-topic" is a placeholder for the real topic name)
            SingleOutputStreamOperator<String> productSource = env.addSource(new FlinkKafkaConsumer010<>("product-data-topic", new SimpleStringSchema(), props));
            // Convert the Kafka messages into Java entities, same pattern as above
            SingleOutputStreamOperator<Product> productStream=productSource.flatMap(new MyKafkaRichFlatMapFunction<Product>(){}).returns(Product.class);
            // Write the Java entities to HBase
            productStream.addSink(new ProductHbaseSinkFunction()).name("ProductHbaseSinkFunction");
        }
        // Order job: real-time computation of the order statistics
        public static void orderJob(StreamExecutionEnvironment env, StreamTableEnvironment tEnv,Properties props){
            // Kafka source stream ("order-data-topic" is a placeholder for the real topic name)
            SingleOutputStreamOperator<String> orderSource = env.addSource(new FlinkKafkaConsumer010<>("order-data-topic", new SimpleStringSchema(), props));
            // Convert the Kafka messages into Java entities, same pattern as above
            SingleOutputStreamOperator<Order> orderMessageStream = orderSource.flatMap(new MyKafkaRichFlatMapFunction<Order>(){}).returns(Order.class);
            // Register the order stream as a Flink table so it can be queried with SQL
            tEnv.createTemporaryView("t_order", orderMessageStream);
            // Join the orders with the HBase dimension tables, aggregate, and write the result to MySQL
            new OrderFunction().handle(orderMessageStream, env,tEnv);
        }
    }
    

    Real-time computation
    public class OrderFunction {

        public void handle(SingleOutputStreamOperator streamOperator,StreamExecutionEnvironment env,
                           StreamTableEnvironment tEnv){
            /**
             * Notes on the query below:
             * 1. The order of the select columns matters. Flink maps the query result onto the Result POJO
             *    by position, against the POJO fields sorted by name:
             *    [buyDate: STRING, productCount: INT, productName: STRING, productType: STRING, totalMoney: DOUBLE, userName: STRING]
             *    so the select list must produce the types [STRING, INT, STRING, STRING, DOUBLE, STRING] in that order.
             *    For example "select sum(o.productCount), o.buyDate, p.pname, p.pt, sum(o.money*o.productCount), u.uname"
             *    fails with "Field types of query result and registered TableSink do not match".
             *    Because pname and pt are both STRING, either order of the two passes the type check, but the
             *    mapping stays positional, so their position alone decides which of them lands in productName
             *    and which in productType.
             * 2. The column names used in the select are the HBase qualifiers registered above (uname, pname, pt, ...),
             *    not the Java field names (userName, productName, ...).
             * 3. The number of selected columns must match the number of Result fields; extra columns cannot be selected.
             * 4. t_order is the temporary view registered in JobDefine: tEnv.createTemporaryView("t_order", orderMessageStream);
             * 5. t_user is the table function registered in TableDefine: tEnv.registerFunction("t_user", hBaseTableSource.getLookupFunction(new String[]{"rowKey"}));
             * 6. t_product is registered the same way: tEnv.registerFunction("t_product", hBaseTableSource.getLookupFunction(new String[]{"rowKey"}));
             */
            Table collectionTaskResult = tEnv.sqlQuery("select o.buyDate,sum(o.productCount) ,p.pname,p.pt,sum(o.money*o.productCount) ,u.uname" +
                    " from t_order o,lateral table(t_user(o.userID)) u,lateral table(t_product(o.productID)) p " +
                    " where o.userID=u.uid and o.productID=p.pid group by u.uname,p.pt,p.pname,o.buyDate");
            DataStream<Tuple2<Boolean, Result>> stream = tEnv.toRetractStream(collectionTaskResult, Result.class);
            // windowAll collects the retract stream into 10-second windows; within a window only the latest
            // value per key is kept, so MySQL is written in batches instead of on every single update
            stream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                    .process(new ProcessAllWindowFunction<Tuple2<Boolean, Result>, List<Result>, TimeWindow>() {

                // "values" holds every retract-stream record that arrived during the window
                @Override
                public void process(Context context, Iterable<Tuple2<Boolean, Result>> values, Collector<List<Result>> out) throws Exception {
                    Iterator<Tuple2<Boolean, Result>> iterator = values.iterator();
                    Map<String, Result> result = Maps.newHashMap();
                    while (iterator.hasNext()) {
                        Tuple2<Boolean, Result> tuple = iterator.next();
                        // true = add/update record, false = retraction; keep only the additions
                        if (tuple.getField(0)) {
                            Result item = tuple.getField(1);
                            // Deduplicate within the window: later records for the same key overwrite earlier ones
                            result.put(item.getUserName()+":"+item.getProductType()+":"+item.getProductName()+":"+item.getBuyDate(), item);
                        }
                    }
                    out.collect(Lists.newArrayList(result.values()));
                }
            }).setParallelism(1)// windowAll is a non-parallel operation, so the parallelism here must be 1
                    // Write the aggregated results to MySQL
                    .addSink(new OrderMysqlSinkFunction()).name("OrderMysqlSinkFunction");


        }
    }
    

    Flink SQL is impressive, but as noted at the beginning, it is a high-level abstraction on top of Flink's stream processing and cannot express everything.
    For example, a paid order can later be refunded; once the refund completes, the order becomes invalid, and the statistics should no longer include it. How do you express that in SQL? I don't know yet, but it can be handled with plain stream processing, which I plan to cover in a follow-up post.
    MySQL database connection utility class
    public class DBUtil {
        private static Logger logger = LogManager.getLogger(DBUtil.class);
        private static DruidDataSource dataSource;
    
        public static DruidDataSource getDataSource(String dbUrl, String dbName, String username, String password) {
            try {
                if (dataSource == null) {
                    synchronized (DBUtil.class) {
                        if (dataSource == null) {
                            dataSource = new DruidDataSource();
                            dataSource.setDriverClassName("com.mysql.jdbc.Driver");
                            dataSource.setUrl("jdbc:mysql://#/%?autoReconnect=true&useUnicode=true&useAffectedRows=true&characterEncoding=utf8".replace("#", dbUrl).replace("%", dbName));
                            dataSource.setUsername(username);
                            dataSource.setPassword(password);
    
                            //configuration
                            dataSource.setInitialSize(20);
                            dataSource.setMinIdle(20);
                            dataSource.setMaxActive(100);
                            dataSource.setMaxWait(60000);
                            dataSource.setTimeBetweenEvictionRunsMillis(60000);
                            dataSource.setMinEvictableIdleTimeMillis(300000);
                            dataSource.setValidationQuery("SELECT 1 FROM DUAL");
                            dataSource.setTestWhileIdle(true);
                            dataSource.setTestOnBorrow(false);
                            dataSource.setTestOnReturn(false);
                            dataSource.setPoolPreparedStatements(true);
                            dataSource.setMaxPoolPreparedStatementPerConnectionSize(20);
                            dataSource.setFilters("stat,wall,log4j");
                            dataSource.setConnectionProperties("druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000");
                        }
                    }
                }
            } catch (Exception e) {
                logger.error("          ....", e);
            }
            return dataSource;
        }
    
        /**
         * Returns the shared data source.
         */
        public static DataSource getDataSource() {
            return dataSource;
        }
    
        /**
         * Closes the data source.
         */
        public static void closeDataSource() {
            logger.info("Closing data source");
            if (dataSource != null) {
                dataSource.close();
            }
        }
    
        /**
         * Returns a connection from the pool.
         */
        public static Connection getConnection() {
            Connection conn = null;
            try {
                conn = dataSource.getConnection();
            } catch (SQLException e) {
                logger.error("         ", e);
            }
            return conn;
        }
    
        /**
         * Closes JDBC resources.
         */
        public static void close(Connection conn,Statement st,ResultSet rs) {
            if(rs!=null) {
                try {
                    rs.close();
                } catch (SQLException e) {
                    logger.error("         ", e);
                }
            }
            close(conn,st);
        }
        public static void close(Connection conn,Statement st) {
            if(st!=null) {
                try {
                    st.close();
                } catch (SQLException e) {
                    logger.error("  Statement  ", e);
                }
            }
            close(conn);
        }
    
        public static void close(Statement st) {
            if(st!=null) {
                try {
                    st.close();
                } catch (SQLException e) {
                    logger.error("  Statement  ", e);
                }
            }
        }
    
        public static void close(Connection conn) {
            if(conn!=null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    logger.error("  SConnection  ", e);
                }
            }
        }
        public static void commit(Connection conn){
            if(conn!=null) {
                try {
                    conn.commit();
                } catch (SQLException e) {
                    logger.error("      ", e);
                }
            }
        }
        public static void rollback(Connection conn){
            if(conn!=null) {
                try {
                    conn.rollback();
                } catch (SQLException e) {
                    logger.error("    ", e);
                }
            }
        }
    }