SparkでHBaseのデータを読み込んで解析し、結果をHBaseの解析結果テーブルおよびRedisに格納する(Javaコード)
24854 ワード
Spark分析が必要なHBaseテーブル:
顧客入金表customer_in_List流水番号:serial_num
顧客番号:customer_num
入金通貨:currency_in
入金金額:amount_in
日付:date_in
時間:time_in
解析結果テーブルresult_analysis顧客番号:customer_num
合計金額:amount_count
需要:顧客入金表を顧客番号によって入金金額を統計し、結果を分析結果表に保存し、顧客ランキングをRedisに更新する
Sparkコードは次のとおりです.
Redisツールクラス(Jedis直結方式)
HBaseツールクラス:https://blog.csdn.net/Godlike77/article/details/80913252
pom.xml:
転載添付リンク:https://blog.csdn.net/Godlike77/article/details/80912928
顧客入金表customer_in_List流水番号:serial_num
顧客番号:customer_num
入金通貨:currency_in
入金金額:amount_in
日付:date_in
時間:time_in
解析結果テーブルresult_analysis顧客番号:customer_num
合計金額:amount_count
需要:顧客入金表を顧客番号によって入金金額を統計し、結果を分析結果表に保存し、顧客ランキングをRedisに更新する
Sparkコードは次のとおりです.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.io.IOException;
import java.util.*;
public class SparkToHbase {
public static void main(String[] args) {
SparkConf sparkconf = new SparkConf();
sparkconf.setAppName("test").setMaster("local");
JavaSparkContext jsc = new JavaSparkContext(sparkconf);
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "node02,node03,node04");
final Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("fileInfo"));
scan.addColumn(Bytes.toBytes("fileInfo"), Bytes.toBytes("customer_num"));
scan.addColumn(Bytes.toBytes("fileInfo"), Bytes.toBytes("amount_in"));
try {
ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
String tableName = "customer_in_list";
conf.set(TableInputFormat.INPUT_TABLE, tableName);
String scanToString = Base64.encodeBytes(proto.toByteArray());
conf.set(TableInputFormat.SCAN, scanToString);
} catch (IOException e) {
e.printStackTrace();
}
// HBase RDD
JavaPairRDD, Result> hbaseRDD = jsc.newAPIHadoopRDD(conf, TableInputFormat.class,
ImmutableBytesWritable.class, Result.class);
JavaPairRDD, Double> mapToPair1 = hbaseRDD.mapToPair(new PairFunction, Result>, String, Double>() {
@Override
public Tuple2, Double> call(Tuple2, Result> resultTuple2) throws Exception {
byte[] o1 = resultTuple2._2.getValue(Bytes.toBytes("fileInfo"), Bytes.toBytes("customer_num"));
byte[] o2 = resultTuple2._2.getValue(Bytes.toBytes("fileInfo"), Bytes.toBytes("amount_in"));
return new Tuple2, Double>(new String(o1), new Double(new String(o2)));
}
});
JavaPairRDD, Double> reduceByKey = mapToPair1.reduceByKey(new Function2, Double, Double>() {
@Override
public Double call(Double aDouble, Double aDouble2) throws Exception {
return aDouble + aDouble2;
}
});
JavaPairRDD, String> mapToPair2 = reduceByKey.mapToPair(new PairFunction, Double>, Double, String>() {
@Override
public Tuple2, String> call(Tuple2, Double> stringDoubleTuple2) throws Exception {
return new Tuple2, String>(stringDoubleTuple2._2, stringDoubleTuple2._1);
}
});
JavaPairRDD, String> sortByKey = mapToPair2.sortByKey(false);
JavaPairRDD, Double> result = sortByKey.mapToPair(new PairFunction, String>, String, Double>() {
@Override
public Tuple2, Double> call(Tuple2, String> doubleStringTuple2) throws Exception {
return new Tuple2, Double>(doubleStringTuple2._2, doubleStringTuple2._1);
}
});
JavaPairRDD, String> result1 = result.mapToPair(new PairFunction, Double>, String, String>() {
@Override
public Tuple2, String> call(Tuple2, Double> stringDoubleTuple2) throws Exception {
return new Tuple2, String>(stringDoubleTuple2._1, new String(String.valueOf(stringDoubleTuple2._2)));
}
});
/*
result.foreach(new VoidFunction>() {
@Override
public void call(Tuple2 stringDoubleTuple2) throws Exception {
System.out.println(stringDoubleTuple2);
}
});
*/
// Hbase
List, String>> collect = result1.collect();
for (Tuple2 tuple : collect) {
System.out.println(tuple._1() + " " + tuple._2());
try {
Table table = HBaseConn.getTable("result_analysis");
Put put = new Put(Bytes.toBytes((String) tuple._1));
put.addColumn(Bytes.toBytes("fileInfo"), Bytes.toBytes("customer_num"), Bytes.toBytes((String) tuple._1));
put.addColumn(Bytes.toBytes("fileInfo"), Bytes.toBytes("amount_count"), Bytes.toBytes((String) tuple._2));
table.put(put);
} catch (IOException e) {
e.printStackTrace();
}
}
// redis
List, String>> collect1 = result1.collect();
for (Tuple2 tuple2 :collect1){
RedisTest.add("key2",Double.parseDouble((String) tuple2._2), String.valueOf(tuple2._1));
}
jsc.close();
}
}
Redisツールクラス(Jedis直結方式)
import redis.clients.jedis.Jedis;
/**
 * Minimal Jedis helper used by the Spark job to publish the customer ranking.
 */
public class RedisTest {
    /**
     * Adds one member to a Redis sorted set.
     *
     * @param key sorted-set key
     * @param v1  score (the customer's total deposit amount)
     * @param v2  member (the customer number)
     * @return true on success
     */
    public static boolean add(String key, Double v1, String v2) {
        // NOTE(review): one connection per call is wasteful but preserved;
        // the fix here is closing it — the original leaked a socket per call.
        Jedis jedis = new Jedis("192.168.198.21", 7003);
        try {
            jedis.zadd(key, v1, v2);
            return true;
        } finally {
            jedis.close();
        }
    }
}
HBaseツールクラス:https://blog.csdn.net/Godlike77/article/details/80913252
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.yt.test</groupId>
    <artifactId>sparkTest</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>1.2.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>1.2.4</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.4</version>
        </dependency>
    </dependencies>
</project>
転載添付リンク:https://blog.csdn.net/Godlike77/article/details/80912928