spark + cassandraの始め方


spark

sparkをdownloadして、buildする。

cd spark-1.5.0
make_distribution.sh

spark-cassandra-connector

spark-cassandra-connectorをgit cloneして build※ここで、ビルド失敗してだめ

git clone https://github.com/datastax/spark-cassandra-connector
cd spark-cassandra-connector
git checkout b1.5
sbt package
sbt assembly

これで、spark-cassandra-connector/target/scala-2.10以下にassembly jarが生成される。

spark-cassandra-connectorのビルドに失敗する場合は、maven cetralからjarを個別にdownloadして使う。

downloadしたのは、

  • guava-0.18
  • spark-cassandra-connector_2.10-1.5.0-M2.jar
  • cassandra-driver-core-2.2.0-rc2.jar

guava-0.18はなぜかないと怒られるので、追加した。

spark-shellを起動

spark-cassandra-connectorのビルドに成功した場合

bin/spark-shell --conf spark.cassandra.connection.host=127.0.0.1
--jars spark-cassandra-connector-assembly-1.5.0-M2-SNAPSHOT.jar 

spark-cassandra-connectorのビルドに失敗した場合

bin/spark-shell --jars spark-cassandra-connector_2.10-1.5.0-M2.jar,cassandra-driver-core-2.2.0-rc2.jar,guava-18.0.jar --conf spark.cassandra.connection.host=127.0.0.1

on mac and linux

  • macでlocalのcassandraでの動作確認は 127.0.0.1, cassandra.yamlのseedsには 127.0.0.1を記述
  • linuxで分散モードでcassandraでの動作確認は host=ipaddress of host, このipはcassandra.yamlのseedsに書いてあるip

cassandra keyspace, tableの作成動作確認

spark-shellで以下を実行

import com.datastax.spark.connector._ //Imports basic rdd functions
import com.datastax.spark.connector.cql._ //(Optional) Imports java driver helper functions

val c = CassandraConnector(sc.getConf)
c.withSessionDo ( session => session.execute("CREATE KEYSPACE test_from_spark WITH replication={'class':'SimpleStrategy', 'replication_factor':1}"))
c.withSessionDo ( session => session.execute("CREATE TABLE test_from_spark.fun (k int PRIMARY KEY, v int)"))

cqlshでkeyspace, tableができているかの確認

cqlsh:mykeyspace> describe keyspaces;

system_auth  mykeyspace          test           test_from_spark
system       system_distributed  system_traces

data insertの動作確認

scala> c.withSessionDo ( session => session.execute("insert into test_from_spark.fun (k,v) values(1, 10)"))
res6: com.datastax.driver.core.ResultSet = ResultSet[ exhausted: true, Columns[]]

cqlshでtableに入っているかの確認

cqlsh:test_from_spark> select * from fun;

 k | v
---+----
 1 | 10
 2 | 20

(2 rows)

cassandraTableでtableをRDDとして取得動作確認

scala> val d = sc.cassandraTable("test_from_spark", "fun");
d: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15

scala> d.count
res4: Long = 2                                                                  
                                                                          scala> d
res7: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15

scala> d.collect
res9: Array[com.datastax.spark.connector.CassandraRow] = Array(CassandraRow{k: 1, v: 10}, CassandraRow{k: 2, v: 20})

scala> d.collect.foreach(println)
CassandraRow{k: 1, v: 10}                                                       
CassandraRow{k: 2, v: 20}

実施中のエラー情報

  • com.google.util.concurrent.**がclasspathにありませんエラー => guave.0.18.jarを自前でjarsに追加すると治った。

  • cassandra connectの動作確認中に以下のエラー ※cassandraの設定をいじったら治った?

scala
scala> c.withSessionDo ( session => session.execute("CREATE KEYSPACE test WITH replication={'class':'SimpleStrategy', 'replication_factor':1}"))
15/11/14 11:21:10 WARN NettyUtil: Found Netty's native epoll transport, but not running on linux-based operating system. Using NIO instead.
java.io.IOException: Failed to open native connection to Cassandra at {192.168.11.2}:9042
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:164)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:150)
    at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)
    at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)
    at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:49)
    at $iwC$$iwC$$iwC.<init>(<console>:51)
    at $iwC$$iwC.<init>(<console>:53)
    at $iwC.<init>(<console>:55)
    at <init>(<console>:57)
    at .<init>(<console>:61)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:680)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.11.2:9042 (com.datastax.driver.core.TransportException: [/192.168.11.2:9042] Cannot connect))
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:229)
    at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:84)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1264)
    at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:338)
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:157)
    ... 56 more


scala>