TachyonでSparkアプリケーションを実行する

8444 ワード

「Tachyon 0.7.0擬似分散クラスタのインストールとテスト」では、擬似分散Tachyonクラスタの構築方法について説明します.公式文書によるとSpark 1.4.xとTachyon 0.6.4バージョンは互換性があり、最新版のTachyon 0.7.1とSpark 1.5である.xは互換性があり、現在最新版のSparkは1.4.1であるため、以下の操作手順はすべてTachyon 0.6.4プラットフォームに基づいており、Tachyon 0.6.4の構築手順はTachyon 0.7.0と類似している.
くだらないことは言わないで、紹介しましょう.まずHDFSにiteblogなどのファイルをアップロードします.txt、格納ディレクトリ/data:
[blog@node1 hadoop]$  bin/hadoop fs -put blog.txt /data
Spark-shellを起動
[blog@node1 spark]$  bin/spark-shell
Tachyonでiteblogを取得できますtxtファイルは、次のとおりです.
scala>  val s = sc.textFile("tachyon://localhost:19998/data/iteblog.txt")
15/08/31 14:15:24 INFO storage.MemoryStore: ensureFreeSpace(156896) called with curMem=216700, maxMem=280248975
15/08/31 14:15:24 INFO storage.MemoryStore: Block broadcast_3 stored as values in memory (estimated size 153.2 KB, free 266.9 MB)
15/08/31 14:15:24 INFO storage.MemoryStore: ensureFreeSpace(14945) called with curMem=373596, maxMem=280248975
15/08/31 14:15:24 INFO storage.MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 14.6 KB, free 266.9 MB)
15/08/31 14:15:24 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:55566 (size: 14.6 KB, free: 267.2 MB)
15/08/31 14:15:24 INFO spark.SparkContext: Created broadcast 3 from textFile at :21
s: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at textFile at :21
tachyon://localhost:19998あなたのTachyonの通信アドレスです.ji/data/iteblog.txtはHDFS上のファイルに対応していますが、これはどうやって取得したのでしょうか?実はTachyonのconf/tachyon-envです.shファイルに配置されている、export TACHYON_UNDERFS_ADDRESS=hdfs://iteblog.com:8020構成、これは私たちのHDFSクラスタの通信アドレスを構成して、このように私たちはtachyonを通じてそのファイルを見つけることができます.では、アクションを実行します.
scala> s.count()
15/08/31 14:15:45 INFO : getFileStatus(/data/iteblog.txt): HDFS Path: hdfs://localhost:8020/data/iteblog.txt TPath: tachyon://localhost:19998/data/iteblog.txt
15/08/31 14:15:45 INFO : Loading to /data/iteblog.txt hdfs://localhost:8020/data/iteblog.txt 
15/08/31 14:15:45 INFO : Loading: hdfs://localhost:8020/data/iteblog.txt
15/08/31 14:15:46 INFO : Create tachyon file /data/iteblog.txt/iteblog.txt with file id 15 and checkpoint location hdfs://localhost:8020/data/iteblog.txt
15/08/31 14:15:46 INFO : listStatus(tachyon://localhost:19998/data/iteblog.txt): HDFS Path: hdfs://localhost:8020/data/iteblog.txt
15/08/31 14:15:46 INFO : getFileStatus(tachyon://localhost:19998/data/iteblog.txt/iteblog.txt): 
HDFS Path: hdfs://localhost:8020/data/iteblog.txt/iteblog.txt TPath: tachyon://localhost:19998/data/iteblog.txt/iteblog.txt
15/08/31 14:15:46 INFO mapred.FileInputFormat: Total input paths to process : 1
15/08/31 14:15:46 INFO spark.SparkContext: Starting job: count at :24
15/08/31 14:15:46 INFO scheduler.DAGScheduler: Got job 2 (count at :24) with 2 output partitions (allowLocal=false)
15/08/31 14:15:46 INFO scheduler.DAGScheduler: Final stage: ResultStage 2(count at :24)
15/08/31 14:15:46 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/08/31 14:15:46 INFO scheduler.DAGScheduler: Missing parents: List()
15/08/31 14:15:46 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[4] at textFile at :21), which has no missing parents
15/08/31 14:15:46 INFO storage.MemoryStore: ensureFreeSpace(2992) called with curMem=388541, maxMem=280248975
15/08/31 14:15:46 INFO storage.MemoryStore: Block broadcast_4 stored as values in memory (estimated size 2.9 KB, free 266.9 MB)
15/08/31 14:15:46 INFO storage.MemoryStore: ensureFreeSpace(1828) called with curMem=391533, maxMem=280248975
15/08/31 14:15:46 INFO storage.MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 1828.0 B, free 266.9 MB)
15/08/31 14:15:46 INFO storage.BlockManagerInfo: Added broadcast_4_piece0 in memory on localhost:55566 (size: 1828.0 B, free: 267.2 MB)
15/08/31 14:15:46 INFO spark.SparkContext: Created broadcast 4 from broadcast at DAGScheduler.scala:874
15/08/31 14:15:46 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 2 (MapPartitionsRDD[4] at textFile at :21)
15/08/31 14:15:46 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 2 tasks
15/08/31 14:15:46 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 2.0 (TID 4, localhost, PROCESS_LOCAL, 1427 bytes)
15/08/31 14:15:46 INFO executor.Executor: Running task 0.0 in stage 2.0 (TID 4)
15/08/31 14:15:46 INFO rdd.HadoopRDD: Input split: tachyon://localhost:19998/data/iteblog.txt/iteblog.txt:0+5840
15/08/31 14:15:46 INFO : open(tachyon://localhost:19998/data/iteblog.txt/iteblog.txt, 65536)
15/08/31 14:15:46 INFO : /mnt/ramdisk/tachyonworker/users/2/16106127360 was created!
15/08/31 14:15:46 INFO : Try to find remote worker and read block 16106127360 from 0, with len 11680
15/08/31 14:15:46 INFO : Block locations:[NetAddress(mHost:localhost, mPort:-1, mSecondaryPort:-1)]
15/08/31 14:15:46 INFO : Block locations:[NetAddress(mHost:localhost, mPort:-1, mSecondaryPort:-1)]
15/08/31 14:15:46 INFO : Opening stream from underlayer fs: hdfs://localhost:8020/data/iteblog.txt
15/08/31 14:15:46 INFO executor.Executor: Finished task 0.0 in stage 2.0 (TID 4). 1830 bytes result sent to driver
15/08/31 14:15:46 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 2.0 (TID 5, localhost, PROCESS_LOCAL, 1427 bytes)
15/08/31 14:15:46 INFO executor.Executor: Running task 1.0 in stage 2.0 (TID 5)
15/08/31 14:15:46 INFO rdd.HadoopRDD: Input split: tachyon://localhost:19998/data/iteblog.txt/iteblog.txt:5840+5840
15/08/31 14:15:46 INFO : open(tachyon://localhost:19998/data/iteblog.txt/iteblog.txt, 65536)
15/08/31 14:15:46 INFO executor.Executor: Finished task 1.0 in stage 2.0 (TID 5). 1830 bytes result sent to driver
15/08/31 14:15:46 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 2.0 (TID 4) in 543 ms on localhost (1/2)
15/08/31 14:15:46 INFO scheduler.DAGScheduler: ResultStage 2 (count at :24) finished in 0.555 s
15/08/31 14:15:46 INFO scheduler.DAGScheduler: Job 2 finished: count at :24, took 0.651055 s
res2: Long = 212
でiteblogを入手しましたtxtファイルの行数.実際にはTachyon自身がiteblogをtxtファイルはメモリにロードされ、自分のファイルシステムに保存されます.tachyon://localhost:19998/data/iteblog.txt/iteblog.txtTachyonのWEB UIインタフェース(私のパスではhttp://localhost:19999/browse?path=%2Fdata%2Fiteblog.txt%offset=0&limit=1)このファイルが見えます.
計算結果をTachyonに保存することもできます.
scala> s.saveAsTextFile("tachyon://localhost:19998/blog")
TachyonのWEB UIインタフェースでこのファイルを見ることができ、iteblogフォルダを作成しました.中のデータはRDDのデータです.注意深い読者はまた、HDFS上にRDDのデータが格納されているフォルダも生成されていることを発見します.以下のようにします.
[blog@node1 hadoop]$  bin/hadoop fs -ls /tachyon/data
Found 6 items
-rwxrwxrwx   3 blog supergroup      13367 2015-08-31 14:02 /tachyon/data/11
-rwxrwxrwx   3 blog supergroup          0 2015-08-31 14:02 /tachyon/data/12
-rwxrwxrwx   3 blog supergroup       5890 2015-08-31 14:21 /tachyon/data/21
-rwxrwxrwx   3 blog supergroup       5790 2015-08-31 14:21 /tachyon/data/23
-rwxrwxrwx   3 blog supergroup          0 2015-08-31 14:21 /tachyon/data/24
-rwxrwxrwx   3 blog supergroup      13491 2015-08-31 14:02 /tachyon/data/9
実はこの経路はconf/tachyon-envを通る.-中のdata.folder=$TACHYON_UNDERFS_ADDRESS/tachyon/data構成.
また、RDD cacheをTachyonに設定することで、ストレージレベルを設定することもできます.OFF_HEAPでいいです.これはGCの周波数を減らし、executorsが占有するリソースを減らすことができ、異なるApplication間でRDDのデータを共有できるようにすることが望ましい.次に例を示します.
scala> import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel

scala> val data=sc.parallelize(List("www", "iteblog", "com"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[6] at parallelize at :22

scala> val tmp = data.map(item => item +"good")
tmp: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[8] at map at :24

scala> tmp.persist(StorageLevel.OFF_HEAP)
res9: tmp.type = MapPartitionsRDD[8] at map at :24

scala> tmp.count
では、TachyonのWEB UIインタフェースにキャッシュされたRDDストレージディレクトリが表示されます.また、これらのRDDはメモリファイルシステムに格納されている.使用中にsparkを通過することができます.externalBlockStore.urlパラメータ設定Tachyon filesystemのURL、デフォルトはtachyon://localhost:19998;sparkを通ります.externalBlockStore.BaseDirは、Tachyon File SystemにRDDを格納するベースパスを設定し、デフォルトはSystemである.getProperty(「java.io.tmpdir」)の値.