Sparkアプリケーションのテストとリリース
3981 ワード
Sparkアプリケーションのテストとリリース
1、テストはローカルの方法を採用し、masterをlocalに設定すればよい
ここでlocal[3]は、ローカルが3つのスレッドで実行されていることを示す.
スクリプトのコミット
2、生産環境を発表し、クラスター方式で運行する
コードの
次のように変更
スクリプトのコミット
3、典型的な問題と解答1)言語とツールの選択Scala+eclipseは実際にIntelliJ IDEAはscalaに対するサポートがより完備しており、後でIDEAに移動することを提案する2)キャッシュウィンドウのサイズに対してキャッシュのデータ時間を設定する必要がある
3)Kafkaのデータは複数回消費することができ、1つのreduceByKeyAndwindowは1回の消費に対応する
1、テストはローカルの方法を採用し、masterをlocalに設定すればよい
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("NewsTopNByDayAndHour").setMaster("local[3]");
conf.set("spark.streaming.blockInterval", "50ms");
val sc = new SparkContext(conf);
val ssc = new StreamingContext(sc, Seconds(25));
// 36
ssc.remember(Minutes(2160));
val sqlContext = new HiveContext(sc);
Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);
//1. UDF
val udf = UDFUtils();
udf.registerUdf(sqlContext);
//2.kafka
val kafkaService = KakfaService();
val urlClickLogPairsDStream = kafkaService.kafkaDStream(ssc);
//3. hive
val cacheUtils = CacheUtils();
cacheUtils.cacheHiveData(sqlContext);
//4.
val urlClickCountDaysDStream = urlClickLogPairsDStream.reduceByKeyAndWindow(
(v1: Int, v2: Int) => {
v1 + v2
},
Seconds(3600 * 24),
Seconds(250));
//5. 24h
urlClickCntDay(urlClickCountDaysDStream, sqlContext);
// .
val urlClickCountsHourDStream = urlClickLogPairsDStream.reduceByKeyAndWindow(
(v1: Int, v2: Int) => {
v1 + v2
},
Minutes(60),
Seconds(175));
// . 1h
urlClickCntHour(urlClickCountsHourDStream, sqlContext);
//6. streaming
ssc.start();
ssc.awaitTermination();
}
ここでlocal[3]は、ローカルが3つのスレッドで実行されていることを示す.
スクリプトのコミット
#!/bin/bash
source /etc/profile
nohup /opt/modules/spark/bin/spark-submit \
--conf "spark.executor.extraJavaOptions=-XX:PermSize=8m -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
--driver-memory 3g \
--executor-memory 3g \
--total-executor-cores 32 \
--conf spark.ui.port=5666 \
--jars /opt/bin/sparkJars/kafka_2.10-0.8.2.1.jar,/opt/bin/sparkJars/spark-streaming-kafka_2.10-1.4.1.jar,/opt/bin/sparkJars/metrics-core-2.2.0.jar,/opt/bin/sparkJars/mysql-connector-java-5.1.26-bin.jar,/opt/bin/sparkJars/spark-streaming-
kafka_2.10-1.4.1.jar \
--class com.hexun.streaming.StockerRealRank \
StockerRealRank.jar \
>stocker.log 2>&1 & \
2、生産環境を発表し、クラスター方式で運行する
コードの
val conf = new SparkConf().setAppName("NewsTopNByDayAndHour").setMaster("local[3]");
次のように変更
val conf = new SparkConf().setAppName("NewsTopNByDayAndHour");
スクリプトのコミット
#!/bin/bash
source /etc/profile
nohup /opt/modules/spark/bin/spark-submit \
--master spark://10.130.2.20:7077 \
--conf "spark.executor.extraJavaOptions=-XX:PermSize=8m -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
--driver-memory 3g \
--executor-memory 3g \
--total-executor-cores 32 \
--conf spark.ui.port=5666 \
--jars /opt/bin/sparkJars/kafka_2.10-0.8.2.1.jar,/opt/bin/sparkJars/spark-streaming-kafka_2.10-1.4.1.jar,/opt/bin/sparkJars/metrics-core-2.2.0.jar,/opt/bin/sparkJars/mysql-connector-java-5.1.26-bin.jar,/opt/bin/sparkJars/spark-streaming-
kafka_2.10-1.4.1.jar \
--class com.hexun.streaming.StockerRealRank \
StockerRealRank.jar \
>stocker.log 2>&1 & \
ローカル機器に比べてmasterノードを設定する構成が多くなっています3、典型的な問題と解答1)言語とツールの選択Scala+eclipseは実際にIntelliJ IDEAはscalaに対するサポートがより完備しており、後でIDEAに移動することを提案する2)キャッシュウィンドウのサイズに対してキャッシュのデータ時間を設定する必要がある
// 2
ssc.remember(Minutes(60 * 48));
3)Kafkaのデータは複数回消費することができ、1つのreduceByKeyAndwindowは1回の消費に対応する
//
val urlClickCountsDStream = urlClickLogPairsDStream.reduceByKeyAndWindow(
(v1: Int, v2: Int) => {
v1 + v2
},
Minutes(60 * 2),
Seconds(25));
// kafka
val urlClickCountsDStreamByDay = urlClickLogPairsDStream.reduceByKeyAndWindow(
(v1: Int, v2: Int) => {
v1 + v2
},
Minutes(60 * 48),
Seconds(35));