Spark SQL実行メモ
spark-shellからバッチに変えた際に少しハマったので、その時のメモ。
事前準備
環境
Proxy環境下にあるPCでVirtualBOXを利用して作成した仮想マシン
HadoopはCDH5.5.1を利用。擬似分散環境。
SparkもCDHのものを利用。インストール方法は以下を参照。
http://www.cloudera.com/documentation/enterprise/latest/topics/cdh_ig_spark_install.html
Scala
CDHのSparkがscala 2.10.4でコンパイルされたもののようなので、同じバージョンのScalaをインストールする。以下からダウンロードし、PATHを通しておく。
※バージョンを合わせないとコンパイル時にエラーが発生
http://www.scala-lang.org/download/2.10.4.html
sbt
以下からダウンロードし、解凍後、sbt-launch.jarを~/bin配下に配置する。
http://www.scala-sbt.org/download.html
~bin/sbtを作成
SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M"
java $JAVA_OPTS $SBT_OPTS -jar `dirname $0`/sbt-launch.jar "$@"
JAVA_OPTS環境変数にProxy情報を設定
JAVA_OPTS= -Dhttp.proxyHost={Proxyサーバ名} -Dhttp.proxyPort={Port番号} -Dhttps.proxyHost={Proxyサーバ名} -Dhttps.proxyPort={Port番号}
sbt-assemblyプラグインを利用するために、project/plugins.sbtに以下を記載
resolvers += "Bintray sbt plugin releases" at "http://dl.bintray.com/sbt/sbt-plugin-releases/"
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
以下のディレクトリを作成。ソースはsrc/main/scala配下に配置。
src/main/scala/
src/test/scala/
lib
project
target
SparkSQLサンプルプログラムコンパイル及び実行
test.scala
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.DataFrame
object Test{
case class TUser(tId: String, attribute: String)
case class NUser(nId: String)
def exec1(dirPath:String, sc: SparkContext,sqlContext: SQLContext, nDF:DataFrame){
try {
val filePath = dirPath+"t.txt"
import sqlContext.implicits._
val tDF = sc.textFile(filePath).map { record =>
val splitRecord = record.split(",")
val tId = splitRecord(0)
val attribute = splitRecord(1)
TUser(tId,attribute)
}.toDF
val tuser = tDF.distinct().count()
printf("The number of user is %s \n",tuser)
val nuser = nDF.distinct().count()
printf("The number of nuser is %s \n",nuser)
val tnDF = tDF.join(nDF,tDF("tId") === nDF("nId"),"inner").select($"tId")
val numtnuser = tnDF.distinct().count()
printf("The number of tnuser is %s \n",numtnuser)
}
}
def main(args: Array[String]){
require(args.length >=1, "Pls specify path")
val dirPath = args(0)
val conf = new SparkConf
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
try {
val filePath = dirPath+"n.txt"
import sqlContext.implicits._
val nDF = sc.textFile(filePath).map { record =>
val splitRecord = record.split(",")
val nId = splitRecord(0)
NUser(nId)
}.toDF
exec1(dirPath,sc,sqlContext,nDF)
}
}
}
build.sbt
name := "Test"
version := "0.1"
scalaVersion := "2.10.4"
libraryDependencies ++= Seq("org.apache.spark" % "spark-core_2.10" % "1.5.0" % "provided", "org.apache.spark" % "spark-sql_2.10" % "1.5.0" % "provided")
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
コンパイル
$ sbt assembly
ジョブ実行
$ spark-submit --class Test--name Test target/scala-2.10/Test-assembly-0.1.jar /user/yotsu/input/
import org.apache.spark.{SparkConf,SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.DataFrame
object Test{
case class TUser(tId: String, attribute: String)
case class NUser(nId: String)
def exec1(dirPath:String, sc: SparkContext,sqlContext: SQLContext, nDF:DataFrame){
try {
val filePath = dirPath+"t.txt"
import sqlContext.implicits._
val tDF = sc.textFile(filePath).map { record =>
val splitRecord = record.split(",")
val tId = splitRecord(0)
val attribute = splitRecord(1)
TUser(tId,attribute)
}.toDF
val tuser = tDF.distinct().count()
printf("The number of user is %s \n",tuser)
val nuser = nDF.distinct().count()
printf("The number of nuser is %s \n",nuser)
val tnDF = tDF.join(nDF,tDF("tId") === nDF("nId"),"inner").select($"tId")
val numtnuser = tnDF.distinct().count()
printf("The number of tnuser is %s \n",numtnuser)
}
}
def main(args: Array[String]){
require(args.length >=1, "Pls specify path")
val dirPath = args(0)
val conf = new SparkConf
val sc = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
try {
val filePath = dirPath+"n.txt"
import sqlContext.implicits._
val nDF = sc.textFile(filePath).map { record =>
val splitRecord = record.split(",")
val nId = splitRecord(0)
NUser(nId)
}.toDF
exec1(dirPath,sc,sqlContext,nDF)
}
}
}
name := "Test"
version := "0.1"
scalaVersion := "2.10.4"
libraryDependencies ++= Seq("org.apache.spark" % "spark-core_2.10" % "1.5.0" % "provided", "org.apache.spark" % "spark-sql_2.10" % "1.5.0" % "provided")
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
$ sbt assembly
$ spark-submit --class Test--name Test target/scala-2.10/Test-assembly-0.1.jar /user/yotsu/input/
Author And Source
この問題について(Spark SQL実行メモ), 我々は、より多くの情報をここで見つけました https://qiita.com/t-yotsu/items/e5f70e69fee046b62994著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .