はじめてのSpark
導入から実行まで
1.scalaをinstallする
brew install scala
2.sparkをclone
git clone https://github.com/apache/spark.git
3.spark直下へ移動
cd spark
4.versionを1.3.1に変更
git checkout -b version2 v1.3.1
5.mvnを使用し、install
mvn -DskipTests clean package
6.work/logs/test.txtにテストデータを作成
7.spark-shellを実行
macbook:spark user$ ./bin/spark-shell
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.3.1
/_/
8.ローカルのファイルからRDDを生成
scala> val file = sc.textFile("work/logs/test.txt")
file: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21
9.user1というカラムでmaping
scala> val filter = file.filter(_.contains("user1"))
filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at filter at <console>:23
10.print処理
scala> println(filter.count)
2
spark-submitを使用しstandaloneを作成
1.installしたsparkにpathを通す
echo 'alias spark-shell="~/work/spark/bin/spark-shell"' >>~/.bash_aliases
echo 'alias spark-submit="~/work/spark/bin/spark-submit"'>>~/.bash_aliases
source ~/.bash_alias
2.sbtをinstall
brew install sbt
3.file構成
$ mkdir ~/work
$ cd work
$ mkdir spark
$ mkdir spark/src
$ mkdir spark/src/main
$ mkdir spark/src/main/scala
$ mkdir spark/project
$ mkdir spark/log
$ vi spark/log/access_log #access_logは適宜用意
$ mkdir spark/target
4.build.sbtを作成 ※1
//build-version
version := "0.1"
//scalaVersion
scalaVersion := "2.11.7"
5.依存ライブラリのinstall
libraryDependencies ++= Seq(
"org.xerial" % "sqlite-jdbc" % "3.7.2",
"org.apache.spark" %% "spark-core" % "1.3.1" % "provided",
//"org.apache.spark" %% "spark-streaming" % "1.3.1"
//"org.apache.spark" %% "spark-streaming-kafka" % "1.3.1"
)
//※3 重複防止
assemblyMergeStrategy in assembly := {
case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith ".properties" => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith ".xml" => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith ".types" => MergeStrategy.first
case PathList(ps @ _*) if ps.last endsWith ".class" => MergeStrategy.first
case "application.conf" => MergeStrategy.concat
case "unwanted.txt" => MergeStrategy.discard
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
}
6.plugin.sbtを作成
logLevel := Level.Warn
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")
7.access_logをParseし件数を取得(Example.scala)
import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
//import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
object ExampleApp {
def main(args: Array[String]) {
//用意したaccess_log
val logFile = "log/access_log"
//AppNameの設定コメントで書いているがsubmitコマンド押下時に読み込まれるAppNameはファイル名
val conf = new SparkConf().setAppName("Example Application")
//sparkcontextのインスタンス生成
val sc = new SparkContext(conf)
//access_logデータを取得と同時にキャッシュに読み込み
val logData = sc.textFile(logFile, 2).cache()
//logDataの中からlinkを取得+count
val index = logData.filter(line => line.contains("index.html")).count()
val html1 = logData.filter(line => line.contains("1.html")).count()
val html2 = logData.filter(line => line.contains("2.html")).count()
val gif1 = logData.filter(line => line.contains("1.gif")).count()
val gif2 = logData.filter(line => line.contains("2.gif")).count()
//表示
println("Lines with index.html: %s".format(index))
println("Lines with 1.html: %s".format(html1))
println("Lines with 2.html: %s".format(html2))
println("Lines with 1.gif: %s".format(gif1))
println("Lines with 2.gif: %s".format(gif2))
}
}
8.jarファイルの作成 ※1
sbt assembly
9.spark-submitの実行
spark-submit --class ExampleApp target/scala-2.11/scala-assembly-0.1.jar
10.実行結果(結果のみ)
Lines with index.html: 141882
Lines with 1.html: 19996
Lines with 2.html: 18567
Lines with 1.gif: 20996
Lines with 2.gif: 52393
11.etc
※1
Name, Version, ScalaVersion, LibraryDependencies などを
間に空行を入れて記述します。最後にも空行を入れます。
※2
sbtのversionによって記述が必要なようです。
今回のversionではあるとエラー
※3
初期設定時この記述が無く、依存ライブラリが重複しjarファイル作成時にエラーが出ていた
Author And Source
この問題について(はじめてのSpark), 我々は、より多くの情報をここで見つけました https://qiita.com/gozuqi/items/979598ccad7866c2bc28著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .