Sparkアプリケーションのテストとリリース

3981 ワード

Sparkアプリケーションのテストとリリース
1、テストはローカルの方法を採用し、masterをlocalに設定すればよい
def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("NewsTopNByDayAndHour").setMaster("local[3]");
    conf.set("spark.streaming.blockInterval", "50ms");

    val sc = new SparkContext(conf);
    val ssc = new StreamingContext(sc, Seconds(25));
    // 36 
    ssc.remember(Minutes(2160));
    val sqlContext = new HiveContext(sc);

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);

    //1. UDF
    val udf = UDFUtils();
    udf.registerUdf(sqlContext);

    //2.kafka 
    val kafkaService = KakfaService();
    val urlClickLogPairsDStream = kafkaService.kafkaDStream(ssc);

    //3. hive 
    val cacheUtils = CacheUtils();
    cacheUtils.cacheHiveData(sqlContext);

    //4. 
    val urlClickCountDaysDStream = urlClickLogPairsDStream.reduceByKeyAndWindow(
      (v1: Int, v2: Int) => {
        v1 + v2
      },
      Seconds(3600 * 24),
      Seconds(250));

    //5. 24h 
    urlClickCntDay(urlClickCountDaysDStream, sqlContext);

    
    // .  
    val urlClickCountsHourDStream = urlClickLogPairsDStream.reduceByKeyAndWindow(
      (v1: Int, v2: Int) => {
        v1 + v2
      },
      Minutes(60),
      Seconds(175));

    // . 1h 
    urlClickCntHour(urlClickCountsHourDStream, sqlContext);

    //6. streaming 
    ssc.start();
    ssc.awaitTermination();

  }

ここでlocal[3]は、ローカルが3つのスレッドで実行されていることを示す.
スクリプトのコミット
#!/bin/bash
source /etc/profile


nohup /opt/modules/spark/bin/spark-submit \
--conf "spark.executor.extraJavaOptions=-XX:PermSize=8m -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
--driver-memory 3g \
--executor-memory 3g \
--total-executor-cores 32 \
--conf spark.ui.port=5666  \
--jars /opt/bin/sparkJars/kafka_2.10-0.8.2.1.jar,/opt/bin/sparkJars/spark-streaming-kafka_2.10-1.4.1.jar,/opt/bin/sparkJars/metrics-core-2.2.0.jar,/opt/bin/sparkJars/mysql-connector-java-5.1.26-bin.jar,/opt/bin/sparkJars/spark-streaming-
kafka_2.10-1.4.1.jar \
--class com.hexun.streaming.StockerRealRank \
StockerRealRank.jar \
>stocker.log 2>&1 & \

2、生産環境を発表し、クラスター方式で運行する
コードの
val conf = new SparkConf().setAppName("NewsTopNByDayAndHour").setMaster("local[3]");

次のように変更
val conf = new SparkConf().setAppName("NewsTopNByDayAndHour");

スクリプトのコミット
#!/bin/bash
source /etc/profile


nohup /opt/modules/spark/bin/spark-submit \
--master spark://10.130.2.20:7077 \
--conf "spark.executor.extraJavaOptions=-XX:PermSize=8m -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
--driver-memory 3g \
--executor-memory 3g \
--total-executor-cores 32 \
--conf spark.ui.port=5666  \
--jars /opt/bin/sparkJars/kafka_2.10-0.8.2.1.jar,/opt/bin/sparkJars/spark-streaming-kafka_2.10-1.4.1.jar,/opt/bin/sparkJars/metrics-core-2.2.0.jar,/opt/bin/sparkJars/mysql-connector-java-5.1.26-bin.jar,/opt/bin/sparkJars/spark-streaming-
kafka_2.10-1.4.1.jar \
--class com.hexun.streaming.StockerRealRank \
StockerRealRank.jar \
>stocker.log 2>&1 & \
ローカル機器に比べてmasterノードを設定する構成が多くなっています
3、典型的な問題と解答1)言語とツールの選択Scala+eclipseは実際にIntelliJ IDEAはscalaに対するサポートがより完備しており、後でIDEAに移動することを提案する2)キャッシュウィンドウのサイズに対してキャッシュのデータ時間を設定する必要がある
// 2 
ssc.remember(Minutes(60 * 48));

3)Kafkaのデータは複数回消費することができ、1つのreduceByKeyAndwindowは1回の消費に対応する
// 
val urlClickCountsDStream = urlClickLogPairsDStream.reduceByKeyAndWindow(
(v1: Int, v2: Int) => {
v1 + v2
},
Minutes(60 * 2),
Seconds(25));

// kafka 
val urlClickCountsDStreamByDay = urlClickLogPairsDStream.reduceByKeyAndWindow(
(v1: Int, v2: Int) => {
v1 + v2
},
Minutes(60 * 48),
Seconds(35));