Spark reads HBase data for analysis and stores the results in an HBase result table and in Redis (Java code)


HBase tables to be analyzed with Spark (a table-creation sketch follows below):

Customer deposit table customer_in_list
  Serial number: serial_num
  Customer number: customer_num
  Deposit currency: currency_in
  Deposit amount: amount_in
  Date: date_in
  Time: time_in

Analysis result table result_analysis
  Customer number: customer_num
  Total amount: amount_count

Requirement: aggregate the deposit amounts in the customer deposit table by customer number, save the totals to the analysis result table, and update the customer ranking in Redis.
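For reference, the two tables can be created up front with the HBase 1.x Admin API. The sketch below is an assumption-based helper (the class name CreateTables is hypothetical); the table names, the fileInfo column family, and the ZooKeeper quorum are taken from the Spark code that follows.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

// Hypothetical one-off helper that creates the two tables used in this article.
public class CreateTables {

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // Same ZooKeeper quorum as the Spark job below
        conf.set("hbase.zookeeper.quorum", "node02,node03,node04");

        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        String[] tableNames = {"customer_in_list", "result_analysis"};
        for (String name : tableNames) {
            TableName tableName = TableName.valueOf(name);
            if (!admin.tableExists(tableName)) {
                HTableDescriptor descriptor = new HTableDescriptor(tableName);
                // Both tables use the single column family "fileInfo", as in the Spark code
                descriptor.addFamily(new HColumnDescriptor("fileInfo"));
                admin.createTable(descriptor);
            }
        }
        admin.close();
        connection.close();
    }
}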
The Spark code is as follows:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.io.IOException;
import java.util.*;

public class SparkToHbase {


    public static void main(String[] args) {
        SparkConf sparkconf = new SparkConf();
        sparkconf.setAppName("test").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(sparkconf);

        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node02,node03,node04");

        final Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("fileInfo"));
        scan.addColumn(Bytes.toBytes("fileInfo"), Bytes.toBytes("customer_num"));
        scan.addColumn(Bytes.toBytes("fileInfo"), Bytes.toBytes("amount_in"));

        try {
            ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
            String tableName = "customer_in_list";
            conf.set(TableInputFormat.INPUT_TABLE, tableName);
            String scanToString = Base64.encodeBytes(proto.toByteArray());
            conf.set(TableInputFormat.SCAN, scanToString);
        } catch (IOException e) {
            e.printStackTrace();
        }


        // Read the HBase table as an RDD of (row key, Result) pairs
        JavaPairRDD<ImmutableBytesWritable, Result> hbaseRDD = jsc.newAPIHadoopRDD(conf, TableInputFormat.class,
                ImmutableBytesWritable.class, Result.class);

        // Extract (customer_num, amount_in) pairs from each HBase Result
        JavaPairRDD<String, Double> mapToPair1 = hbaseRDD.mapToPair(new PairFunction<Tuple2<ImmutableBytesWritable, Result>, String, Double>() {
            @Override
            public Tuple2<String, Double> call(Tuple2<ImmutableBytesWritable, Result> resultTuple2) throws Exception {
                byte[] o1 = resultTuple2._2.getValue(Bytes.toBytes("fileInfo"), Bytes.toBytes("customer_num"));
                byte[] o2 = resultTuple2._2.getValue(Bytes.toBytes("fileInfo"), Bytes.toBytes("amount_in"));
                return new Tuple2<String, Double>(new String(o1), Double.valueOf(new String(o2)));
            }
        });

        // Sum the deposit amounts per customer number
        JavaPairRDD<String, Double> reduceByKey = mapToPair1.reduceByKey(new Function2<Double, Double, Double>() {
            @Override
            public Double call(Double aDouble, Double aDouble2) throws Exception {
                return aDouble + aDouble2;
            }
        });

        // Swap to (total amount, customer number) so the pairs can be sorted by amount
        JavaPairRDD<Double, String> mapToPair2 = reduceByKey.mapToPair(new PairFunction<Tuple2<String, Double>, Double, String>() {
            @Override
            public Tuple2<Double, String> call(Tuple2<String, Double> stringDoubleTuple2) throws Exception {
                return new Tuple2<Double, String>(stringDoubleTuple2._2, stringDoubleTuple2._1);
            }
        });

        // Sort by total amount in descending order (customer ranking)
        JavaPairRDD<Double, String> sortByKey = mapToPair2.sortByKey(false);


        // Swap back to (customer number, total amount), preserving the ranking order
        JavaPairRDD<String, Double> result = sortByKey.mapToPair(new PairFunction<Tuple2<Double, String>, String, Double>() {
            @Override
            public Tuple2<String, Double> call(Tuple2<Double, String> doubleStringTuple2) throws Exception {
                return new Tuple2<String, Double>(doubleStringTuple2._2, doubleStringTuple2._1);
            }
        });

        // Convert the total amount to a String so it can be written to HBase as a cell value
        JavaPairRDD<String, String> result1 = result.mapToPair(new PairFunction<Tuple2<String, Double>, String, String>() {
            @Override
            public Tuple2<String, String> call(Tuple2<String, Double> stringDoubleTuple2) throws Exception {
                return new Tuple2<String, String>(stringDoubleTuple2._1, String.valueOf(stringDoubleTuple2._2));
            }
        });

/*
        // Debug output of the aggregated (customer number, total amount) pairs
        result.foreach(new VoidFunction<Tuple2<String, Double>>() {
            @Override
            public void call(Tuple2<String, Double> stringDoubleTuple2) throws Exception {
                System.out.println(stringDoubleTuple2);
            }
        });
*/


        // Write the results to the HBase table result_analysis
        List<Tuple2<String, String>> collect = result1.collect();
        for (Tuple2<String, String> tuple : collect) {
            System.out.println(tuple._1() + " " + tuple._2());
            try {
                Table table = HBaseConn.getTable("result_analysis");
                Put put = new Put(Bytes.toBytes(tuple._1));
                put.addColumn(Bytes.toBytes("fileInfo"), Bytes.toBytes("customer_num"), Bytes.toBytes(tuple._1));
                put.addColumn(Bytes.toBytes("fileInfo"), Bytes.toBytes("amount_count"), Bytes.toBytes(tuple._2));
                table.put(put);

            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        // Update the customer ranking in Redis as a sorted set (score = total amount)
        List<Tuple2<String, String>> collect1 = result1.collect();

        for (Tuple2<String, String> tuple2 : collect1) {
            RedisTest.add("key2", Double.parseDouble(tuple2._2), tuple2._1);
        }
        jsc.close();
    }
}

Redis utility class (direct Jedis connection):
import redis.clients.jedis.Jedis;

public class RedisTest {

    // Adds a member with the given score to a Redis sorted set (used for the customer ranking)
    public static boolean add(String key, Double v1, String v2) {
        Jedis jedis = new Jedis("192.168.198.21", 7003);
        jedis.zadd(key, v1, v2);
        jedis.close();
        return true;
    }

}
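To check the ranking stored by the Spark job, the sorted set can be read back highest-score-first with ZREVRANGE. A minimal sketch follows; the class name RedisRankingReader is hypothetical, while the host, port, and the key "key2" mirror the code above.

import java.util.Set;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;

// Hypothetical reader that prints the customer ranking stored in the Redis sorted set.
public class RedisRankingReader {

    public static void main(String[] args) {
        Jedis jedis = new Jedis("192.168.198.21", 7003);
        // ZREVRANGE with scores returns members ordered by score, highest first
        Set<Tuple> ranking = jedis.zrevrangeWithScores("key2", 0, -1);
        for (Tuple t : ranking) {
            System.out.println(t.getElement() + " -> " + t.getScore());
        }
        jedis.close();
    }
}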

HBase utility class: https://blog.csdn.net/Godlike77/article/details/80913252
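The HBaseConn helper used in the Spark code above is defined in the linked article. As a rough, assumption-based sketch (not the linked implementation), such a helper could look like this:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;

// Hypothetical sketch of the HBaseConn helper; the real class is in the linked article.
public class HBaseConn {

    private static Connection connection;

    // Lazily creates a shared connection and returns a Table handle by name
    public static synchronized Table getTable(String tableName) throws IOException {
        if (connection == null || connection.isClosed()) {
            Configuration conf = HBaseConfiguration.create();
            // Assumed to be the same ZooKeeper quorum as the Spark job
            conf.set("hbase.zookeeper.quorum", "node02,node03,node04");
            connection = ConnectionFactory.createConnection(conf);
        }
        return connection.getTable(TableName.valueOf(tableName));
    }
}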
pom.xml:
xml version="1.0" encoding="UTF-8"?>
xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    4.0.0

    com.yt.test
    sparkTest
    1.0-SNAPSHOT
    
        
            
                org.apache.maven.plugins
                maven-compiler-plugin
                
                    1.6
                    1.6
                
            
        
    


    
        
            org.apache.spark
            spark-core_2.10
            1.6.0
        
        
            org.apache.hbase
            hbase
            1.2.4
        
        
            org.apache.hbase
            hbase-client
            1.2.4
        
        
            org.apache.hbase
            hbase-common
            1.2.4
        
        
            junit
            junit
            4.12
            test
        
        
            org.apache.hadoop
            hadoop-common
            2.6.5
        
        
            org.apache.hbase
            hbase-server
            1.2.4
        
    

Reprinted; original article link: https://blog.csdn.net/Godlike77/article/details/80912928