Spark Streaming custom Receiver example — driver program (WinStreamTest):
package order
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Spark Streaming word count fed by a custom [[CustomTestReceiver]].
 *
 * Created by roy, 2019-08-12.
 *
 * Runs locally on two threads with a 1-second batch interval and prints the
 * word counts of every batch to the console. Two local threads are needed
 * because the receiver holds one of them for as long as it runs.
 */
object WinStreamTest {
  // Only show WARN and above from Spark's loggers so the batch output stays readable.
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamTest")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    val customReceiverStream = ssc.receiverStream(new CustomTestReceiver("a a b b c"))
    // Split on runs of whitespace and drop empty tokens. Splitting on a single " "
    // turns consecutive spaces into "" words, which are then counted as a bogus (,1).
    val words = customReceiverStream.flatMap(_.split("\\s+").filter(_.nonEmpty))
    // NOTE: these println markers execute once, while the DStream graph is being
    // defined -- not once per batch.
    println("print start ----")
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    println("wordCounts print start ----")
    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()
    println("wordCounts print end ----")
    ssc.start()
    ssc.awaitTermination()
  }
}
CustomTestReceiver: the custom receiver that feeds the stream used above.
package order
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import scala.util.Random
/**
 * Test receiver that emits one record per start cycle.
 *
 * Each record is `dayTime`, the token "r" and a random number in [0, 20),
 * separated by single spaces. After storing the record the receiver calls
 * `restart`, so Spark stops it and starts it again after the supervisor's
 * restart delay. Each cycle therefore produces a fresh record.
 *
 * @param dayTime text placed at the start of every emitted record
 */
class CustomTestReceiver(dayTime: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  import scala.util.control.NonFatal

  def onStart(): Unit = {
    // onStart must not block, so the record is produced on its own thread.
    new Thread("Socket Receiver") {
      override def run(): Unit = {
        receive(dayTime)
      }
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to clean up: the thread started in onStart stores a single record
    // and then exits on its own.
  }

  /** Build one record, hand it to Spark, then request a restart to produce the next one. */
  private def receive(dayTime: String): Unit = {
    try {
      println("dayTime==" + dayTime)
      val randomNum = Random.nextInt(20)
      // Single spaces only. The previous `" r " + " "` produced a double space,
      // which a split(" ") downstream turned into an empty-string word.
      val returnStr = s"$dayTime r $randomNum"
      println("receive print=" + returnStr)
      store(returnStr)
      restart("Trying to connect again")
    } catch {
      // Let fatal errors (e.g. OutOfMemoryError) and interrupts propagate;
      // restart the receiver on any other failure.
      case NonFatal(t) =>
        restart("Error receiving data", t)
    }
  }
}
Sample console output:
dayTime==a a b b c
receive print=a a b b c r 3
19/08/12 16:52:43 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Trying to connect again
19/08/12 16:52:43 ERROR ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Trying to connect again
19/08/12 16:52:43 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/08/12 16:52:43 WARN BlockManager: Block input-0-1565599963600 replicated to only 0 peer(s) instead of 1 peers
-------------------------------------------
Time: 1565599964000 ms
-------------------------------------------
(b,2)
(,1)
(r,1)
(a,2)
(3,1)
(c,1)
-------------------------------------------
Time: 1565599965000 ms
-------------------------------------------
dayTime==a a b b c
receive print=a a b b c r 10