【Spark 67】Spark Standalone: Fully Distributed Installation



1. Download and extract Spark 1.2.1 (pre-built for Hadoop 2.4):
http://mirror.bit.edu.cn/apache/spark/spark-1.2.1/spark-1.2.1-bin-hadoop2.4.tgz

 
2. Download and extract Scala 2.10.4:
http://www.scala-lang.org/files/archive/scala-2.10.4.tgz

 
3. Configure the Scala environment variables:
3.1 Set SCALA_HOME:
 
/home/hadoop/software/scala-2.10.4
 
 
3.2 Add $SCALA_HOME/bin to the system PATH variable.
 
3.3 Run scala -version from a console (or start the Scala REPL) to check the installed version:
 
Welcome to Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_67).
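
For reference, a minimal sketch of the two profile lines (using the install path above; adjust to your environment):

export SCALA_HOME=/home/hadoop/software/scala-2.10.4
export PATH=$SCALA_HOME/bin:$PATH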
 
 
 
4. Configure the Spark environment variables (vim /etc/profile):
 
export JAVA_HOME=/home/hadoop/software/jdk1.7.0_67
export HADOOP_HOME=/home/hadoop/software/hadoop-2.5.2
export SPARK_HOME=/home/hadoop/software/spark-1.2.1-bin-hadoop2.4
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
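
After saving, reload the profile so the variables take effect in the current shell:

source /etc/profile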

 
5. Edit the spark-env.sh file that ships with Spark (this must be configured on every node):
 
export JAVA_HOME=/home/hadoop/software/jdk1.7.0_67
export HADOOP_HOME=/home/hadoop/software/hadoop-2.5.2
export SPARK_HOME=/home/hadoop/software/spark-1.2.1-bin-hadoop2.4
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SCALA_HOME=/home/hadoop/software/scala-2.10.4
export SPARK_MASTER_IP=192.168.26.131
export SPARK_WORKER_INSTANCES=1
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=7087
export SPARK_WORKER_PORT=8077
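
Every node needs the same spark-env.sh, so one way to push it out is scp (a sketch, assuming passwordless SSH and the worker hostnames hadoop.slave1/hadoop.slave2 that appear in the logs below; if only spark-env.sh.template exists, copy it to spark-env.sh first):

scp conf/spark-env.sh hadoop@hadoop.slave1:/home/hadoop/software/spark-1.2.1-bin-hadoop2.4/conf/
scp conf/spark-env.sh hadoop@hadoop.slave2:/home/hadoop/software/spark-1.2.1-bin-hadoop2.4/conf/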

 
6. Edit the slaves file on the master node
 
On the master node, edit the conf/slaves file to list the worker hosts. The slave nodes themselves do not need this setting:
 
192.168.26.133
192.168.26.134

 
7. Start the master and the two slaves
7.1 On the master node, start the master and both slaves with the following command:
 
sbin/start-all.sh
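
Alternatively, the daemons can be started individually with the scripts Spark ships (a sketch):

sbin/start-master.sh     # on the master node
sbin/start-slaves.sh     # also run on the master; starts a Worker on each host listed in conf/slaves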
 
 
 
7.2 On the first start, the two slaves came up but the master did not. The master's exception was as follows:
 
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Spark Command: /home/hadoop/software/jdk1.7.0_67/bin/java -cp ::/home/hadoop/software/spark-1.2.1-bin-hadoop2.4/sbin/../conf:/home/hadoop/software/spark-1.2.1-bin-hadoop2.4/lib/spark-assembly-1.2.1-hadoop2.4.0.jar:/home/hadoop/software/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar:/home/hadoop/software/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar:/home/hadoop/software/spark-1.2.1-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar:/home/hadoop/software/hadoop-2.5.2/etc/hadoop:/home/hadoop/software/hadoop-2.5.2/etc/hadoop -XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip 192.168.26.131 --port 7077 --webui-port 7087
========================================

15/02/18 01:47:16 INFO master.Master: Registered signal handlers for [TERM, HUP, INT]
15/02/18 01:47:17 INFO spark.SecurityManager: Changing view acls to: hadoop
15/02/18 01:47:17 INFO spark.SecurityManager: Changing modify acls to: hadoop
15/02/18 01:47:17 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/02/18 01:47:21 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/02/18 01:47:24 INFO Remoting: Starting remoting
Exception in thread "main" java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at akka.remote.Remoting.start(Remoting.scala:180)
        at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
        at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)
        at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)
        at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)
        at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)
        at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
        at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
        at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
        at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
        at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
        at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1765)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1756)
        at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
        at org.apache.spark.deploy.master.Master$.startSystemAndActor(Master.scala:849)
        at org.apache.spark.deploy.master.Master$.main(Master.scala:829)
        at org.apache.spark.deploy.master.Master.main(Master.scala)
15/02/18 01:47:33 ERROR Remoting: Remoting error: [Startup timed out] [
akka.remote.RemoteTransportException: Startup timed out
        at akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:136)
        at akka.remote.Remoting.start(Remoting.scala:198)
        at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
        at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)
        at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)
        at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)
        at akka.actor.ActorSystemImpl.start(ActorSystem.scala:632)
        at akka.actor.ActorSystem$.apply(ActorSystem.scala:141)
        at akka.actor.ActorSystem$.apply(ActorSystem.scala:118)
        at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
        at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
        at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
        at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1765)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
        at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1756)
        at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
        at org.apache.spark.deploy.master.Master$.startSystemAndActor(Master.scala:849)
        at org.apache.spark.deploy.master.Master$.main(Master.scala:829)
        at org.apache.spark.deploy.master.Master.main(Master.scala)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10000 milliseconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at akka.remote.Remoting.start(Remoting.scala:180)
        ... 17 more
]
15/02/18 01:47:37 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/02/18 01:47:37 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.

 
 
7.3 Run stop-all.sh to shut down all master and slave processes, then start everything again with start-all.sh. This time the master and both slaves started normally.
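
For reference, both scripts live under sbin/ in the Spark home:

sbin/stop-all.sh
sbin/start-all.sh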
7.3.1 The master's startup log is as follows:
 
15/02/18 01:54:50 INFO master.Master: Registered signal handlers for [TERM, HUP, INT]
15/02/18 01:54:50 INFO spark.SecurityManager: Changing view acls to: hadoop
15/02/18 01:54:50 INFO spark.SecurityManager: Changing modify acls to: hadoop
15/02/18 01:54:50 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/02/18 01:54:51 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/02/18 01:54:51 INFO Remoting: Starting remoting
15/02/18 01:54:52 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkMaster@192.168.26.131:7077]
15/02/18 01:54:52 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkMaster@192.168.26.131:7077]
15/02/18 01:54:52 INFO util.Utils: Successfully started service 'sparkMaster' on port 7077.
15/02/18 01:54:52 INFO master.Master: Starting Spark master at spark://192.168.26.131:7077
15/02/18 01:54:52 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/02/18 01:54:52 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:7087
15/02/18 01:54:52 INFO util.Utils: Successfully started service 'MasterUI' on port 7087.
15/02/18 01:54:52 INFO ui.MasterWebUI: Started MasterWebUI at http://hadoop.master:7087
15/02/18 01:54:53 INFO master.Master: I have been elected leader! New state: ALIVE
15/02/18 01:54:57 INFO master.Master: Registering worker hadoop.slave2:8077 with 1 cores, 971.0 MB RAM
15/02/18 01:54:57 INFO master.Master: Registering worker hadoop.slave1:8077 with 1 cores, 971.0 MB RAM

This shows that:
7.3.1.1 The master started and is listening on port 7077.
7.3.1.2 The master's web UI started and is reachable at http://hadoop.master:7087.
7.3.1.3 hadoop.slave1 and hadoop.slave2 are both listening on port 8077 and have registered with the master.
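
A quick cross-check is jps on each node (assuming the JDK's bin directory is on the PATH):

jps     # expect a Master process on the master node and a Worker process on each slave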
 
 
7.3.2 The log on the slave side is as follows:
 
15/02/18 01:54:54 INFO worker.Worker: Registered signal handlers for [TERM, HUP, INT]
15/02/18 01:54:54 INFO spark.SecurityManager: Changing view acls to: hadoop
15/02/18 01:54:54 INFO spark.SecurityManager: Changing modify acls to: hadoop
15/02/18 01:54:54 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/02/18 01:54:55 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/02/18 01:54:56 INFO Remoting: Starting remoting
15/02/18 01:54:56 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkWorker@hadoop.slave1:8077]
15/02/18 01:54:56 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkWorker@hadoop.slave1:8077]
15/02/18 01:54:56 INFO util.Utils: Successfully started service 'sparkWorker' on port 8077.
15/02/18 01:54:56 INFO worker.Worker: Starting Spark worker hadoop.slave1:8077 with 1 cores, 971.0 MB RAM
15/02/18 01:54:56 INFO worker.Worker: Spark home: /home/hadoop/software/spark-1.2.1-bin-hadoop2.4
15/02/18 01:54:56 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/02/18 01:54:56 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:8081
15/02/18 01:54:56 INFO util.Utils: Successfully started service 'WorkerUI' on port 8081.
15/02/18 01:54:56 INFO ui.WorkerWebUI: Started WorkerWebUI at http://hadoop.slave1:8081
15/02/18 01:54:56 INFO worker.Worker: Connecting to master spark://192.168.26.131:7077...
15/02/18 01:54:57 INFO worker.Worker: Successfully registered with master spark://192.168.26.131:7077

 
8. Web UI screenshots
 
8.1 The master's web UI (port 7087):
(screenshot: master web UI)
8.2 The worker's web UI (port 8081):
(screenshot: worker web UI)
 
9. Testing the Spark cluster:
 
 
9.1 Start the Spark shell on the master and run the following. All of the results end up on the master rather than on the workers. Why? Because by default bin/spark-shell runs against a local master, i.e. as if it had been started with spark-shell --master local.
 
bin>./spark-shell
scala> val rdd = sc.parallelize(List(1,3,7,7,8,9,11,2,11,33,44,99,111,2432,4311,111,111), 7)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:12

scala> rdd.saveAsTextFile("file:///home/hadoop/output")
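
Inside the shell, sc.master reports which master the SparkContext is actually connected to, which makes this easy to verify:

scala> sc.master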

9.2 Exit the shell above, then run the following:
./spark-shell --master spark://192.168.26.131:7077

If a domain name such as hadoop.master is specified instead of the IP above, the shell reports that it cannot connect; the reason is unclear.
The log shows that the job has been submitted to the workers and executed there:
scala> rdd.saveAsTextFile("file:///home/hadoop/output2")
15/02/18 02:37:55 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
15/02/18 02:37:55 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
15/02/18 02:37:55 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
15/02/18 02:37:55 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
15/02/18 02:37:55 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
15/02/18 02:37:55 INFO spark.SparkContext: Starting job: saveAsTextFile at <console>:15
15/02/18 02:37:55 INFO scheduler.DAGScheduler: Got job 0 (saveAsTextFile at <console>:15) with 7 output partitions (allowLocal=false)
15/02/18 02:37:55 INFO scheduler.DAGScheduler: Final stage: Stage 0(saveAsTextFile at <console>:15)
15/02/18 02:37:55 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/02/18 02:37:55 INFO scheduler.DAGScheduler: Missing parents: List()
15/02/18 02:37:55 INFO scheduler.DAGScheduler: Submitting Stage 0 (MappedRDD[1] at saveAsTextFile at <console>:15), which has no missing parents
15/02/18 02:37:55 INFO storage.MemoryStore: ensureFreeSpace(112056) called with curMem=0, maxMem=280248975
15/02/18 02:37:55 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 109.4 KB, free 267.2 MB)
15/02/18 02:37:55 INFO storage.MemoryStore: ensureFreeSpace(67552) called with curMem=112056, maxMem=280248975
15/02/18 02:37:55 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 66.0 KB, free 267.1 MB)
15/02/18 02:37:55 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on hadoop.master:44435 (size: 66.0 KB, free: 267.2 MB)
15/02/18 02:37:55 INFO storage.BlockManagerMaster: Updated info of block broadcast_0_piece0
15/02/18 02:37:55 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:838
15/02/18 02:37:55 INFO scheduler.DAGScheduler: Submitting 7 missing tasks from Stage 0 (MappedRDD[1] at saveAsTextFile at <console>:15)
15/02/18 02:37:55 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 7 tasks
15/02/18 02:37:55 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, hadoop.slave1, PROCESS_LOCAL, 1208 bytes)
15/02/18 02:37:55 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, hadoop.slave2, PROCESS_LOCAL, 1208 bytes)
15/02/18 02:38:06 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on hadoop.slave1:41802 (size: 66.0 KB, free: 267.2 MB)
15/02/18 02:38:06 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on hadoop.slave2:34337 (size: 66.0 KB, free: 267.2 MB)
15/02/18 02:38:24 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, hadoop.slave1, PROCESS_LOCAL, 1212 bytes)
15/02/18 02:38:24 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 28274 ms on hadoop.slave1 (1/7)
15/02/18 02:38:24 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, hadoop.slave2, PROCESS_LOCAL, 1208 bytes)
15/02/18 02:38:24 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 28719 ms on hadoop.slave2 (2/7)
15/02/18 02:38:26 INFO scheduler.TaskSetManager: Starting task 4.0 in stage 0.0 (TID 4, hadoop.slave2, PROCESS_LOCAL, 1212 bytes)
15/02/18 02:38:26 INFO scheduler.TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 1908 ms on hadoop.slave2 (3/7)
15/02/18 02:38:26 INFO scheduler.TaskSetManager: Starting task 5.0 in stage 0.0 (TID 5, hadoop.slave1, PROCESS_LOCAL, 1208 bytes)
15/02/18 02:38:26 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 2595 ms on hadoop.slave1 (4/7)
15/02/18 02:38:27 INFO scheduler.TaskSetManager: Starting task 6.0 in stage 0.0 (TID 6, hadoop.slave2, PROCESS_LOCAL, 1212 bytes)
15/02/18 02:38:27 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 440 ms on hadoop.slave2 (5/7)
15/02/18 02:38:27 INFO scheduler.TaskSetManager: Finished task 5.0 in stage 0.0 (TID 5) in 336 ms on hadoop.slave1 (6/7)
15/02/18 02:38:27 INFO scheduler.TaskSetManager: Finished task 6.0 in stage 0.0 (TID 6) in 193 ms on hadoop.slave2 (7/7)
15/02/18 02:38:27 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
15/02/18 02:38:27 INFO scheduler.DAGScheduler: Stage 0 (saveAsTextFile at <console>:15) finished in 31.266 s
15/02/18 02:38:27 INFO scheduler.DAGScheduler: Job 0 finished: saveAsTextFile at <console>:15, took 31.932050 s

 
Checking the result shows that slave1 and slave2 each have the output directory, but there is no data underneath it. Bear in mind that with a file:/// path, each executor writes only to the local filesystem of the node it runs on, so the output is scattered across the cluster rather than assembled in one place.
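
For small data sets it is simpler to pull the results back to the driver with collect() instead of writing to local files (only advisable for small RDDs, since everything is brought into the driver's memory):

scala> rdd.collect().foreach(println)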
 
9.3 Running wordcount gives the same result:
bin/spark-shell --master spark://192.168.26.131:7077
scala> var rdd = sc.textFile("file:///home/hadoop/history.txt.used.byspark", 7)
scala> rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _,5).map(x => (x._2, x._1)).sortByKey(false).map(x => (x._2, x._1)).saveAsTextFile("file:///home/hadoop/output")

 
9.4 Run the SparkPi program that ships with Spark:
 
./run-example SparkPi 1000 --master spark://192.168.26.131:7077
The result is printed: Pi is roughly 3.14173708
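
Note that bin/run-example passes trailing arguments to the example program itself; according to the Spark documentation, the master for the examples is set through the MASTER environment variable (a sketch):

MASTER=spark://192.168.26.131:7077 ./bin/run-example SparkPi 1000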
 
9.5 Run wordcount again, this time reading from and writing to HDFS:
bin/spark-shell --master spark://192.168.26.131:7077
scala> var rdd = sc.textFile("/user/hadoop/history.txt.used.byspark", 7)
scala> rdd.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _,5).map(x => (x._2, x._1)).sortByKey(false).map(x => (x._2, x._1)).saveAsTextFile("/user/hadoop/output")
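
The HDFS output can then be inspected from the command line (assuming Hadoop's bin directory is on the PATH):

hdfs dfs -ls /user/hadoop/output
hdfs dfs -cat /user/hadoop/output/part-00000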

The correct result is produced: part-00000 through part-00004 (five files, one per reduce partition from reduceByKey(_ + _, 5)).
 
(screenshot: job UI) slave1 executed three tasks and slave2 executed two, with shuffle reads of 661 B and 1248 B respectively.