Spark SQL実行メモ


spark-shellからバッチに変えた際に少しハマったので、その時のメモ。

事前準備

環境

Proxy環境下にあるPCでVirtualBOXを利用して作成した仮想マシン
HadoopはCDH5.5.1を利用。擬似分散環境。
SparkもCDHのものを利用。インストール方法は以下を参照。
http://www.cloudera.com/documentation/enterprise/latest/topics/cdh_ig_spark_install.html

Scala

CDHのSparkがscala 2.10.4でコンパイルされたもののようなので、同じバージョンのScalaをインストールする。以下からダウンロードし、PATHを通しておく。
※バージョンを合わせないとコンパイル時にエラーが発生
http://www.scala-lang.org/download/2.10.4.html

sbt

以下からダウンロードし、解凍後、sbt-launch.jarを~/bin配下に配置する。
http://www.scala-sbt.org/download.html

~bin/sbtを作成

SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M"
java $JAVA_OPTS $SBT_OPTS -jar `dirname $0`/sbt-launch.jar "$@"

JAVA_OPTS環境変数にProxy情報を設定

JAVA_OPTS= -Dhttp.proxyHost={Proxyサーバ名} -Dhttp.proxyPort={Port番号} -Dhttps.proxyHost={Proxyサーバ名} -Dhttps.proxyPort={Port番号}

sbt-assemblyプラグインを利用するために、project/plugins.sbtに以下を記載

plugins.sbt
resolvers += "Bintray sbt plugin releases" at "http://dl.bintray.com/sbt/sbt-plugin-releases/"

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")

以下のディレクトリを作成。ソースはsrc/main/scala配下に配置。

src/main/scala/
src/test/scala/
lib
project
target

SparkSQLサンプルプログラムコンパイル及び実行

test.scala

import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.DataFrame

object Test{
  case class TUser(tId: String, attribute: String)
  case class NUser(nId: String)

  def exec1(dirPath:String, sc: SparkContext,sqlContext: SQLContext, nDF:DataFrame){
    try {
      val filePath = dirPath+"t.txt"
      import sqlContext.implicits._
      val tDF = sc.textFile(filePath).map { record =>
        val splitRecord = record.split(",")
        val tId = splitRecord(0)
        val attribute = splitRecord(1)
        TUser(tId,attribute)
      }.toDF

      val tuser = tDF.distinct().count()
      printf("The number of user is %s \n",tuser)

      val nuser = nDF.distinct().count()
      printf("The number of nuser is %s \n",nuser)

      val tnDF = tDF.join(nDF,tDF("tId") === nDF("nId"),"inner").select($"tId")
      val numtnuser = tnDF.distinct().count()
      printf("The number of tnuser is %s \n",numtnuser)
    }
  }

  def main(args: Array[String]){

    require(args.length >=1, "Pls specify path")
    val dirPath = args(0)
    val conf = new SparkConf
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    try {
      val filePath = dirPath+"n.txt"
      import sqlContext.implicits._
      val nDF = sc.textFile(filePath).map { record =>
        val splitRecord = record.split(",")
        val nId = splitRecord(0)
        NUser(nId)
      }.toDF

    exec1(dirPath,sc,sqlContext,nDF)
    }
  }
}
build.sbt
name := "Test"
version := "0.1"
scalaVersion := "2.10.4"
libraryDependencies ++= Seq("org.apache.spark" % "spark-core_2.10" % "1.5.0" % "provided", "org.apache.spark" % "spark-sql_2.10" % "1.5.0" % "provided")
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

コンパイル

$ sbt assembly

ジョブ実行

$ spark-submit --class Test--name Test target/scala-2.10/Test-assembly-0.1.jar /user/yotsu/input/