spark高度データ分析-scala学習(学習ノート)

18321 ワード

1.sparkに入る
cmd後に「spark-shell」と入力してscala版のsparkを行います
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_121)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
...
ark/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar."
17/04/06 17:17:24 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
17/04/06 17:17:25 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
SQL context available as sqlContext.

scala>

2.ファイルを開く
#    
val a=sc.textFile("D:/spark/PythonApplication1/PythonApplication1/README.md")#     #
#   a: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at :27             ,string

var a=sc.textFile("readme.md")#var// Detected repl transcript paste: ctrl-D to finish.
// Replaying 1 commands from transcript.
scala> var a=sc.textFile("readme.md")
#  a: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at :27

3.クラスタからクライアントにデータを取得する
#          first  ,         RDD       :
a.first()
res1: String = # Apache Spark

#    take   ,     first  collect         ,        
#             。         take              
#10    :
val head=a.take(10)
'''
   head: Array[String] = Array(# Apache Spark, "", Spark is a fast and general cluster computing system for Big Data. It provides, high-level APIs in Scala, Java, Python, and R, and an optimized engine that, supports general computation graphs for data analysis. It also supports a, rich set of higher-level tools including Spark SQL for SQL and DataFrames,, MLlib for machine learning, GraphX for graph processing,, and Spark Streaming for stream processing., "", 
'''
head.length
#   res3: Int = 10

#  
a.count()  #count     a      
#  res8: Long = 95

a.collect() #collect         RDD       Array(  ):
'''  
res9: Array[String] = Array(# Apache Spark, "", Spark is a fast and general cluster computing system for Big Data. It provides, high-level APIs in Scala, Java, Python, and R, and an optimized engine that, supports general computation graphs for data analysis. It also supports a, rich set of higher-level tools including Spark SQL for SQL and DataFrames,, MLlib for machine learning, GraphX for graph processing,, and Spark Streaming for stream processing., "", , "", "", ## Online Documentation, "", You can find the latest Spark documentation, including a programming, guide, on the [project web page](http://spark.apache.org/documentation.html), and [project wiki](https://cwiki.apache.org/confluence/display/SPARK)., This README file only contains basic setup instruc...
'''

a.saveAsTextFile("D:/spark/PythonApplication1/PythonApplication2/README.md")#saveAsTextFile    RDD            (  HDFS) 

#            ,     foreach      println            ,          :
 head.foreach(println)
 '''  
# Apache Spark

Spark is a fast and general cluster computing system for Big Data. It provides
high-level APIs in Scala, Java, Python, and R, and an optimized engine that
supports general computation graphs for data analysis. It also supports a
rich set of higher-level tools including Spark SQL for SQL and DataFrames,
MLlib for machine learning, GraphX for graph processing,
and Spark Streaming for stream processing.

//spark.apache.org/>
'''

'''
              ,                  。  ,CSV
             ,        。            "id_1"  
        ,       Scala                   , 
   :
'''
def isHeader(line: String) = line.contains("id_1")
isHeader: (line: String)Boolean
'''
 Python   ,Scala         def。 Python   ,           
  :    ,    line    String。       String   contains  
 ,            "id_1"     ,              。  
      line      ,              ,    Scala    
   String      String  contains     true  false          
      。
'''
'''
                   ,          、        
  return      。   ,Scala                  。   
        ,            。            ,    
          。                 ,    :
'''
def isHeader(line: String): Boolean = {
line.contains("id_1")
}
isHeader: (line: String)Boolean
#   Scala  Array   filter        ,   head          Scala   :
head.filter(isHeader).foreach(println)
#  "id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1",...
'''
      isHeader        :  filter    isHeader    head   
 ,             。                。      
  ,Scala      。       Array   filterNot   :
head.filterNot(isHeader).length
...
res: Int = 9
     Scala         , filter      isHeader     :
head.filter(x => !isHeader(x)).length
...
res: Int = 9
Scala           Python  lambda   。               x
        isHeader   ,  isHeader         。   ,    
      x        ,Scala        head     Array[String]  
  x  String  。
Scala              。  Scala              ,    
       ,                ,      x=>。     
      ,         :Scala        (_)         ,
         4    :
head.filter(!isHeader(_)).length
...
res: Int = 9
               ,              ,        
   。             ,           。
'''

4.クライアントからクラスタへのコード送信
さっき、Scala言語の定義と関数の実行の様々な方法を見ました.私たちが実行するコードはhead配列のデータに作用し、これらのデータはクライアントマシンにあります.今、Sparkで書いたばかりのコードを関連記録データセットRDD rawblocksに適用するつもりです.このデータセットはクラスタにあり、数百万件が記録されています.次はサンプルコードですが、特によく知っていると思いますか?val noheader = rawblocks.filter(x=>!isHeader(x))は、クラスタ上のデータセット全体をフィルタする構文と、ローカルマシン上のhead配列をフィルタする構文がそっくりです.フィルタリングルールが正しいかどうかをnoheaderというRDDで検証できます:noheader.first … res: String = 37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUEこれは強すぎる!これは、クラスタからサンプリングして小さなデータセットを得ることができ、小さなデータセットでデータ処理コードを開発し、デバッグすることができ、すべてが準備されたら、クラスタに送信して完全なデータセットを処理すればよいことを意味します.一番すごいのは、shellインタフェースを離れる必要がないことです.Spark以外に、このような体験を与える道具はありません.
5.メタグループとcase classでデータを構造化する
 val line = head(5)#      
#line: String = 4,17,1193
val pieces = line.split(',')  #      
#pieces: Array[String] = Array(4, 17, 1193)

注意head配列要素にアクセスするときは、角カッコではなく丸カッコを使用します.Scala言語アクセス配列要素は、関数呼び出しであり、特別なオペレータではありません.Scalaはクラスに特殊な関数applyを定義することを許可し、オブジェクトを関数として処理すると、このapply関数が呼び出されるのでhead(5)はheadに等しい.apply(5). Java Stringクラスのsplit関数を用いてlineの異なる部分を分解し,Array[String]タイプの配列piecesを返す.次に、Scalaのタイプ変換関数を使用してpiecesの単一要素を適切なタイプに変換します.
val id1 = pieces(0).toInt
#id1: Int = 4
val matched = pieces(2).toBoolean   #   

また,二重精度浮動小数点数タイプの9つの整合スコアフィールドを変換する必要がある.すべての変換を一度に完了するには、まずScala Arrayクラスのsliceメソッドで配列要素の一部を抽出し、高次関数mapを呼び出してsliceの各要素のタイプをStringからDoubleに変換します.
val rawscores = pieces.slice(1,3)
#rawscores: Array[String] = Array(17, 1193)

scala> rawscores.map(s => s.toDouble)
res0: Array[Double] = Array(17.0, 1193.0)
'''  rawscores      ?  ,StringOps  toDouble         ?   
Double。        ,    ?    NaN  ,   rawscores       
   :
'''
def toDouble(s: String) = {
if ("?".equals(s)) Double.NaN else s.toDouble
}
val scores = rawscores.map(toDouble)

次に、すべての解析コードを1つの関数に結合し、1つのメタグループですべての解析された値を返します.
def parse(line: String) = {
val pieces = line.split(',')
val id1 = pieces(0).toInt
val id2 = pieces(1).toInt
val scores = pieces.slice(2, 11).map(toDouble)
val matched = pieces(11).toBoolean
(id1, id2, scores, matched)
}
val tup = parse(line)

メタグループから1つのフィールドの値を取得するには、下付き関数を使用します.1から、またはproductElementメソッドを使用して、0からカウントを開始します.ProductArityメソッドを使用して、メタグループのサイズを取得することもできます.
tup._1
tup.productElement(0)
tup.productArity

下付きではなく名前に基づいてフィールドにアクセスできる簡単なレコードタイプを作成したいと考えています.幸いなことに、Scalaはこのような構文を提供し、case classという記録を簡単に作成することができます.case classは可変クラスの簡単なタイプで、toString、equals、hashCodeなどのすべてのJavaクラスの基本的な方法を内蔵しています.関連データを記録するcase classを定義してみましょう.
case class MatchData(id1: Int, id2: Int,scores: Array[Double], matched: Boolean)

次にparseメソッドを変更してMatchDataインスタンスを返します.このインスタンスはcase classであり、メタグループではありません.
def parse(line: String) = {
val pieces = line.split(',')
val id1 = pieces(0).toInt
val id2 = pieces(1).toInt
val scores = pieces.slice(2, 11).map(toDouble)
val matched = pieces(11).toBoolean
MatchData(id1, id2, scores, matched)
}
val md = parse(line)

ここで注意しなければならないのは2つです.1つは、case classを作成するときにMatchDataの前にキーワードnewを書く必要はありません(Scala開発者がキーボードを叩くのが大嫌いであることを再説明します).二つ目は、MatchDataクラスに内蔵されたtoStringメソッドが実装されており、scores配列フィールドのほか、このメソッドは他のフィールドでよく表現されています.MatchDataのフィールドに名前でアクセスします.
md.matched
md.id1

単一のレコードで解析関数をテストし、head配列のすべての要素(ヘッダー行を除く)に使用します.
val mds = head.filter(x => !isHeader(x)).map(x => parse(x))

はい、通過しました.解析関数をクラスタデータに使用し、noheader RDDでmap関数を使用します.
val parsed = noheader.map(line => parse(line))

ローカルで生成したmds配列とは異なり、parse関数はクラスタデータに実際に適用されていないことを覚えておいてください.parsedというRDD上で出力が必要な呼び出しを実行すると、noheader RDDの各Stringはparse関数でMatchDataクラスのインスタンスに変換されます.parsed RDD上で別の呼び出しを実行して異なる出力を生成する場合、parse関数は入力データ上でもう一度実行されます.これはクラスタリソースを十分に利用していない.データが解析されると、クラスタに解析形式でデータを保存したいので、新しい問題が発生するたびに再解析する必要はありません.Sparkはこのような使用シーンをサポートし,インスタンス上でcacheメソッドを呼び出すことで,メモリにRDDをキャッシュすることを示すことができる.parsedというRDDで実験してみましょう.
parsed.cache()

6.重合
ここではこれまで見られなかったが,本章では主にScalaとSparkでデータを処理する方法について述べ,これらの方法はローカルデータとクラスタデータに似ている.このセクションでは、ScalaとSpark APIのいくつかの違いを見てみましょう.特に、データパケットと集約の面で見てみましょう.多くの違いは効率です.データが1台のマシンのメモリに格納される場合に比べて、大規模なデータセットが複数のマシンに分布し、集約する場合、データ転送の効率がより心配されます.この違いを説明するために,Sparkを用いて,マッチングと不整合の記録数を計算することを目的として,MatchDataに対してそれぞれローカルクライアントとクラスタ上で簡単な集約操作を行った.mds配列におけるローカルMatchDataレコードについては、キー値がMatchDataクラスのフィールドmatched:val grouped=mdsに基づくScala Map[Boolean,Array[MatchData]]をgroupByメソッドで作成する.groupBy(md=>md.matched)はgrouped変数の値を得た後,groupedでmapValues法を用いてカウントを上げることができる.mapValueメソッドはmapメソッドと似ていますが、Mapオブジェクトに作用する値は、各配列のサイズ:grouped.mapValues(x => x.size).foreach(println)は、ローカルデータのエントリが一致するため、mapが返す唯一のエントリはメタグループ(true,9)である.もちろん、ローカルデータはレコード関連データセット全体の一部にすぎません.このパケット操作がデータ全体で実行されると、多くの不一致なレコードを見つけることが期待されます.クラスタデータを集約する場合は、私たちが分析したデータが複数のマシンに格納されていることを常に覚えておき、集約はマシンのネットワークに接続してデータを移動する必要があります.ネットワーク間でデータを移動するには、各レコードがどのサーバに転送されるか、データシーケンス化、データ圧縮を決定し、ネットワークを介してデータを送信し、解凍し、シーケンス化結果をシーケンス化し、最後に集約されたデータ上で演算を実行するなど、多くの計算リソースが必要です.速度を向上させるためには、できるだけ少ないデータ移動が必要です.集約前にフィルタできるデータが多ければ多いほど、問題の答えが早く得られます.
7.ヒストグラムの作成
まず、parsedのMatchDataレコードのmatchedフィールド値がtrueまたはfalseであるかどうかを計算する簡単なヒストグラムを作成してみましょう.幸いなことに、RDD[T]クラスはカウントクラスの演算効率が非常に高く、クライアントにMap[T,Long]タイプの結果を返すcountByValueという動作を定義しています.MatchDataレコードのmatchedフィールドマップにcountByValueを呼び出すとSparkジョブが実行され、クライアントに結果が返されます.val matchCounts=parsed.map(md => md.matched).countByValue()Sparkクライアントでヒストグラムを作成したり、他の類似の値グループを作成したりする場合、特に関連するタイプの変数に多くの値がある場合、ボタンのアルファベット順や値の個数順など、異なる方法でヒストグラムをソートしたいと考えています.また、ソートは昇順でも降順でも構いません.matchCountsというMapに含まれるキーはtrueとfalseだけですが、コンテンツを異なる方法でソートする方法を簡単に見たいと思います.ScalaのMapクラスは、コンテンツのキーや値に基づいてソートする方法を提供していませんが、MapをScalaのSeqタイプに変換することができ、Seqクラスはソートをサポートします.ScalaのSeqクラスは、JavaのListインタフェースと同様に反復可能な集合である、すなわち、所定の長さを有し、値を下付き文字で検索することができる:val matchCountsSeq=matchCounts.toSeq*Scala集合Scala集合クラスライブラリはlist、set、map、arrayなど膨大です.toList,toSet,toArray手法により,様々な集合タイプを容易に相互変換できる.matchCountsSeqシーケンスは(String,Long)タイプの要素からなり、sortBy法でどの指標でソートするかを制御することができる:matchCountsSeq.sortBy(_._1).foreach(println) … (false,5728201) (true,20931) matchCountsSeq.sortBy(_._2).foreach(println)…(true,20931)(false,5728201)デフォルトではsortBy関数は数値を昇順に並べ替えますが、降順並べ替えがヒストグラムデータに役立つ場合が多いです.シーケンス上でreverseメソッドを呼び出すことにより、印刷前にソート方式:matchCountsSeqを変更することができる.sortBy(_._2).reverse.foreach(println)…(false,5728201)(true,20931)データセット全体のマッチングカウントを見ると,マッチングした記録数と不マッチングした記録数の差が大きいことが分かった.0.4%未満の入力ペアのみが一致します.この相違は記録関連モデルに大きな影響を及ぼす:我々が提案した整合スコア関数の誤報率が高い可能性が高い(すなわち,多くの記録対は整合しているように見えるが,実際には整合していない).
未完待機