sparkstreamingの完全な例
7248 ワード
サマリ
本文は主に1つの簡単なsparkstreamingの小さい栗を実現して、全体の流れはkafkaからリアルタイムにデータを読み取って、pv、uvを計算して、sum(money)の操作、最後に計算結果をredisの中に保存して、sqlで大体
select time,page,count(*),count(distinct user) uv,sum(money) from test group by page,time
サンプルデータフォーマット:
user,page,money,time
データをkafkaにpushする
スタートkafka
データの作成
コンソール消費は以下の通りです.
pv,uvおよび累計金額の計算
データがredisに格納されるため、redisクライアントコードは以下のように取得される.
sparkstreamingはbatchでデータを処理し、例えばbatchDuration=10を設定すると、ロットごとに10秒以内に受信したデータを処理し、pvを計算する際に、直接countを加算すればよい.しかしuvを計算する場合、この10秒以内に現れるユーザは、前のbatchにも現れる可能性がありますが、sparkはbatchでデータを処理しているので、前のユーザが現れたことがあるかどうかはわかりませんが、簡単な積算だけでは、一日でuvのデータが実際のuvよりずっと大きいので、この問題を解決するにはHyperLogLogを導入し、redisはこの機能を提供しているので、具体的な使用状況は栗を直接見ることができます.
a b c d e f g h i jこれらはuserと理解でき、userが来るたびにpfadd user操作を実行する.pfcount keyを使用すると、直接重量除去後のuvを得ることができますが、このアルゴリズムには誤差があることに注意してください.関連ドキュメントの誤差は約0.8%程度で、uvを計算するために使用されています.この誤差は受け入れられます.具体的な誤差はテストしてもいいです.ここでは測りません.
リアルタイム計算コードは次のとおりです.
結果をredisで表示
pvの表示
sumの表示
UVを見ると、テストデータは8つのuserしかないので、uvはすべて8です.
今データはすでにredisの中で、タイミングタスクを書いてmysqlにデータpushを書くことができて、フロントエンドは展示することができて、リアルタイム計算は大体このような考え方です
本文は主に1つの簡単なsparkstreamingの小さい栗を実現して、全体の流れはkafkaからリアルタイムにデータを読み取って、pv、uvを計算して、sum(money)の操作、最後に計算結果をredisの中に保存して、sqlで大体
select time,page,count(*),count(distinct user) uv,sum(money) from test group by page,time
サンプルデータフォーマット:
user,page,money,time
smith,iphone4.html,578.02,1500618981283
andrew,mac.html,277.62,1500618981285
smith,note.html,388.56,1500618981285
データをkafkaにpushする
スタートkafka
データの作成
package com.fan.spark.stream
import java.text.DecimalFormat
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.util.Random
/**
* Created by http://www.fanlegefan.com on 17-7-21.
*/
object ProduceMessage {
def main(args: Array[String]): Unit = {
val props = newProperties()
props.put("bootstrap.servers","localhost:9092")
props.put("acks","all")
props.put("retries","0")
props.put("batch.size","16384")
props.put("linger.ms","1")
props.put("buffer.memory","33554432")
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer")
val producer = newKafkaProducer[String, String](props)
val users = Array("jack","leo","andy","lucy","jim","smith","iverson","andrew")
val pages = Array("iphone4.html","huawei.html","mi.html","mac.html","note.html","book.html","fanlegefan.com")
val df = newDecimalFormat("#.00")
val random = newRandom()
val num = 10
for(i
コンソール消費は以下の通りです.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
andrew,book.html,309.58,1500620213384
jack,book.html,954.01,1500620213456
iverson,book.html,823.07,1500620213456
iverson,iphone4.html,486.76,1500620213456
lucy,book.html,14.00,1500620213457
iverson,note.html,206.30,1500620213457
jack,book.html,25.30,1500620213457
jim,iphone4.html,513.82,1500620213457
lucy,mac.html,677.29,1500620213457
smith,mi.html,571.30,1500620213457
lucy,iphone4.html,113.83,1500620213457
pv,uvおよび累計金額の計算
データがredisに格納されるため、redisクライアントコードは以下のように取得される.
package com.fan.spark.stream
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool
/**
* Created by http://www.fanlegefan.com on 17-7-21.
*/
object RedisClient {
val redisHost = "127.0.0.1"
val redisPort = 6379
val redisTimeout = 30000
lazy val pool = newJedisPool(newGenericObjectPoolConfig(), redisHost, redisPort, redisTimeout)
lazy val hook = newThread {
override def run = {
println("Execute hook thread: " + this)
pool.destroy()
}
}
sys.addShutdownHook(hook.run)
}
sparkstreamingはbatchでデータを処理し、例えばbatchDuration=10を設定すると、ロットごとに10秒以内に受信したデータを処理し、pvを計算する際に、直接countを加算すればよい.しかしuvを計算する場合、この10秒以内に現れるユーザは、前のbatchにも現れる可能性がありますが、sparkはbatchでデータを処理しているので、前のユーザが現れたことがあるかどうかはわかりませんが、簡単な積算だけでは、一日でuvのデータが実際のuvよりずっと大きいので、この問題を解決するにはHyperLogLogを導入し、redisはこの機能を提供しているので、具体的な使用状況は栗を直接見ることができます.
redis 127.0.0.1:6379> PFADD mykey a b c d e f g h i j
(integer) 1
redis 127.0.0.1:6379> PFCOUNT mykey
(integer) 10
a b c d e f g h i jこれらはuserと理解でき、userが来るたびにpfadd user操作を実行する.pfcount keyを使用すると、直接重量除去後のuvを得ることができますが、このアルゴリズムには誤差があることに注意してください.関連ドキュメントの誤差は約0.8%程度で、uvを計算するために使用されています.この誤差は受け入れられます.具体的な誤差はテストしてもいいです.ここでは測りません.
リアルタイム計算コードは次のとおりです.
package com.fan.spark.stream
import java.text.SimpleDateFormat
import java.util.Date
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by http://www.fanlegefan.com on 17-7-21.
*/
object UserActionStreaming {
def main(args: Array[String]): Unit = {
val df = newSimpleDateFormat("yyyyMMdd")
val group = "test"
val topics = "test"
val sparkConf = newSparkConf().setAppName("pvuv").setMaster("local[3]")
val sc = newSparkContext(sparkConf)
val ssc = newStreamingContext(sc, Seconds(10))
ssc.checkpoint("/home/work/IdeaProjects/sparklearn/checkpoint")
val topicSets = topics.split(",").toSet
val kafkaParams = Map[String, String](
"metadata.broker.list"-> "localhost:9092",
"group.id"-> group
)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,
kafkaParams, topicSets)
stream.foreachRDD(rdd=>rdd.foreachPartition(partition=>{
val jedis = RedisClient.pool.getResource
partition.foreach(tuple=>{
val line = tuple._2
val arr = line.split(",")
val user = arr(0)
val page = arr(1)
val money = arr(2)
val day = df.format(newDate(arr(3).toLong))
//uv
jedis.pfadd(day + "_"+ page , user)
//pv
jedis.hincrBy(day+"_pv", page, 1)
//sum
jedis.hincrByFloat(day+"_sum", page, money.toDouble)
})
}))
ssc.start()
ssc.awaitTermination()
}
}
結果をredisで表示
127.0.0.1:6379> keys *
1)"20170721_note.html"
2)"20170721_book.html"
3)"20170721_fanlegefan.com"
4)"20170721_mac.html"
5)"20170721_pv"
6)"20170721_mi.html"
7)"20170721_iphone4.html"
8)"20170721_sum"
9)"20170721_huawei.html"
pvの表示
127.0.0.1:6379> HGETALL 20170721_pv
1)"mi.html"
2)"112"
3)"note.html"
4)"107"
5)"fanlegefan.com"
6)"124"
7)"huawei.html"
8)"122"
9)"iphone4.html"
10)"92"
11)"mac.html"
12)"103"
13)"book.html"
14)"135"
sumの表示
127.0.0.1:6379> HGETALL 20170721_sum
1)"mi.html"
2)"56949.65999999999998948"
3)"note.html"
4)"56803.50999999999999801"
5)"fanlegefan.com"
6)"59622.50999999999999801"
7)"huawei.html"
8)"64456.50000000000000711"
9)"iphone4.html"
10)"48643.07000000000001094"
11)"mac.html"
12)"51693.17999999999998906"
13)"book.html"
14)"67724.17999999999999261"
UVを見ると、テストデータは8つのuserしかないので、uvはすべて8です.
127.0.0.1:6379> PFCOUNT 20170721_huawei.html
(integer) 8
127.0.0.1:6379> PFCOUNT 20170721_fanlegefan.com
(integer) 8
今データはすでにredisの中で、タイミングタスクを書いてmysqlにデータpushを書くことができて、フロントエンドは展示することができて、リアルタイム計算は大体このような考え方です