SparkでCLIを使ってCassandraデータを読み取ります.
最近はSparkをCassandraに架ける研究をしています.この方面の情報が比較的に少ないことを発見して、学習の過程の中で多くの問題にも出会って、そのためここで記録して、みんなと分かち合います.この例は最も古典的なWordCountの例である.
まず、私が使っている環境とバージョンを説明します.SparkとCassandraの更新が速いので、もし後のバージョンが違ったら運行がうまくいかないかもしれません.いくつかの微調整が必要です.
とりあえずWindows 7を使って、その後Linuxに移行しますが、この影響はあまりありません.Scal 2.9.3、Spark 0.8、Cassandra 1.2.10、sbt 0.3.0、Java 7を使用します.
まず私達が自分でSparkのjarカバンを作る必要があります.これは私たちがsbtコマンドを実行して得る必要があります.Sparkのディレクトリに移動して実行します.
sbt\sbt assmbly(Linuxならsbt/sbt assiebly)です.運行が終わったら、spark\assmbly\target\scala-2.9.3の下にspark-assiembly-08.0-*.jarと似た名前のカバンを見つけられます.私たちはこのカバンをコンパイルパスに追加する必要があります.
次に、私たちはCassandraにデータを挿入する必要があります.まずコマンドラインでcassandra/binの下のcassandraコマンドを実行してサービスを開始する必要があります.そしてcassandra-cliを実行して、私達は必要なデータを入力することができます.本明細書の終わりには、この例で使用されるデータの例を見つけることができる.
そして私たちは始められます.
また、他のパラメータはslice predicateなどに設定できます.ここでは省略して、簡単な設定だけを紹介します.
そして私たちはSpark独自のRDDオブジェクトを作成し、それを使って私たちの仕事を完成させます.
これで簡単なWordCount方法を完成しました.
私たちは以下のコードで結果をプリントアウトして見ることができます.
ここではSparkでCassandraデータを読み出して処理することを紹介します.次のセクションでは、書き込みデータを紹介します.
まず、私が使っている環境とバージョンを説明します.SparkとCassandraの更新が速いので、もし後のバージョンが違ったら運行がうまくいかないかもしれません.いくつかの微調整が必要です.
とりあえずWindows 7を使って、その後Linuxに移行しますが、この影響はあまりありません.Scal 2.9.3、Spark 0.8、Cassandra 1.2.10、sbt 0.3.0、Java 7を使用します.
まず私達が自分でSparkのjarカバンを作る必要があります.これは私たちがsbtコマンドを実行して得る必要があります.Sparkのディレクトリに移動して実行します.
sbt\sbt assmbly(Linuxならsbt/sbt assiebly)です.運行が終わったら、spark\assmbly\target\scala-2.9.3の下にspark-assiembly-08.0-*.jarと似た名前のカバンを見つけられます.私たちはこのカバンをコンパイルパスに追加する必要があります.
次に、私たちはCassandraにデータを挿入する必要があります.まずコマンドラインでcassandra/binの下のcassandraコマンドを実行してサービスを開始する必要があります.そしてcassandra-cliを実行して、私達は必要なデータを入力することができます.本明細書の終わりには、この例で使用されるデータの例を見つけることができる.
そして私たちは始められます.
val sc = new SparkContext("local[3]", "casDemo")
新しいSparkコンテキストオブジェクトを作成します.最初のパラメータは接続するClusterアドレスです.ここではlocal hostだけを使って実行しますので、簡単にlocal[*]に設定できます.*は1、2、3などの数字です.二つ目はただ一つの表示パラメータです.val job = new Job()
新しいHadoop jobを作成します.Sparkは直接APIを提供していないので、Cassandraにアクセスしますが、SparkはHadoopの上に建てられています.CassandraはHadoopにアクセスするインターフェースを提供しています.job.setInputFormatClass(classOf[ColumnFamilyInputFormat])
inputのクラスを設定します.これは他のオプションがありません.これはCassandraのデフォルトのjarパッケージに提供されているインターフェースです.ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost")
ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160")
ConfigHelper.setInputColumnFamily(job.getConfiguration(), "casDemo", "Words")
ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner")
Cassandraが提供する静的なクラスConfigHelperによって対応するいくつかのパラメータを設定する.「cas Demo「この例で使われているkeyspaceです.wordsはcolumn familyです.9160はCassandraのデフォルトで使用されています.最後の一つは複数のマシンが動作する時に使用するPartioner hashアルゴリズムです.org.apphe.cssandrar.Mumar 3 Partner、org.apper 3番目の値が選択できます.紹介します.ここで初めて使います.また、他のパラメータはslice predicateなどに設定できます.ここでは省略して、簡単な設定だけを紹介します.
そして私たちはSpark独自のRDDオブジェクトを作成し、それを使って私たちの仕事を完成させます.
val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
classOf[ColumnFamilyInputFormat],
classOf[ByteBuffer],//key
classOf[SortedMap[ByteBuffer, IColumn]]) //value
ここでRDDオブジェクトが作成されているのを見ることができます.最初のパラメータは私たちが配置したパラメータです.二つ目は前に述べたCassandraが提供したインターフェースです.3番目と4番目のパラメータは他のオプションがありません.この2つのパラメータはColumnFamilyInputFormatに制限されています.これはSparkAPIを見ても分かります.ここでは多くは言いません.val paraRdd = casRdd flatMap {
case (key, value) => {
value.filter(v => {
ByteBufferUtil.string(v._1).compareTo("paragraph") == 0
}).map(v => ByteBufferUtil.string(v._2.value()))
}
}
ここでmapperを実行する方法です.hadoopの中のmapperと同じ概念です.ここのkeyはByteBufferの対象です.valueはSorteMap[ByteBuffer、IColum]です.この二つは前にRDD設定を作成した最後の二つのパラメータです.ここで作ったのはcolomn nameをフィルタリングしたのです.paragrapphとは等しくないです.そしてcolomnをparagrapphの値をIcolumnからByteBufferのタイプに変えました.val counts = paraRdd.flatMap(p => p.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
最後にreduce方法です.段落をスペースで崩して単語にして、ワードを元のグループに変えます.最後にワードの出現回数を統計します.これで簡単なWordCount方法を完成しました.
私たちは以下のコードで結果をプリントアウトして見ることができます.
counts.collect() foreach {
case (word, count) => println(word + ":" + count)
}
ここではSparkでCassandraデータを読み出して処理することを紹介します.次のセクションでは、書き込みデータを紹介します.