SparkのMLlibは、ローカルベクトル、ラベル付きポイント(LabeledPoint)、ローカル行列、分散行列といった基本データ型を使用します。また、疎データのファイル形式としてlibSVMフォーマットをサポートしています。


Spark MLlibライブラリは多くの機械学習アルゴリズムを実装しています。以下に、その基本的なデータ型の解説とコード例を示します。
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, IndexedRow, IndexedRowMatrix, MatrixEntry, RowMatrix}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.junit.Test

/**
 * Demonstrations of the basic data types of the RDD-based Spark MLlib API.
 *
 * Covered types: local vectors, labeled points, libSVM-formatted sparse data, local matrices,
 * and the distributed matrix types (RowMatrix, IndexedRowMatrix, CoordinateMatrix, BlockMatrix).
 *
 * Only the methods annotated with `@Test` are run by JUnit; the others are plain examples.
 */
class DataType {

  val conf: SparkConf = new SparkConf()
    .setMaster("local[4]")
    .setAppName("datatype")
    // NOTE(review): presumably set so the local driver passes Spark's minimum-memory check; confirm.
    .set("spark.testing.memory", "471859200")

  // JUnit 4 creates a fresh instance of this class for every @Test method, and Spark allows only
  // one active SparkContext per JVM. `new SparkContext(conf)` would therefore fail for the second
  // test; `getOrCreate` reuses the context that is already running.
  val sc: SparkContext = SparkContext.getOrCreate(conf)

  /**
   * Local vectors.
   *
   * A local vector has integer-typed, 0-based indices and double-typed values. MLlib supports two
   * representations: dense and sparse.
   */
  def localVectorDemo(): Unit = {
    // Dense vector (1.0, 0.0, 3.0).
    val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
    // Sparse vector of size 3 given as parallel arrays: indices (0, 2) hold values (1.0, 3.0).
    val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
    // The same sparse vector given as (index, value) pairs: index 0 -> 1.0, index 2 -> 3.0.
    val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
  }

  /**
   * Labeled points.
   *
   * A labeled point pairs a (dense or sparse) feature vector with a double-typed label, as used by
   * supervised learning algorithms.
   */
  def labelPointDemo(): Unit = {
    // Positive example: label 1.0 with a dense feature vector.
    val pos: LabeledPoint = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
    // Negative example: label 0.0 with a sparse feature vector.
    val neg: LabeledPoint = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
  }

  /**
   * Sparse data in libSVM format.
   *
   * MLlib can load training data stored in the libSVM format, the default input format of LIBSVM
   * and LIBLINEAR. Each line of the file is one record of the form
   * {{{
   * label index1:value1 index2:value2 ...
   * }}}
   * The call below loads such a file into an RDD of labeled points and prints the first 10.
   */
  @Test
  def sparseDateDemo(): Unit = {
    // NOTE(review): the path is relative to the working directory of the test run.
    val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
    examples.take(10).foreach(println)
  }

  /**
   * Local matrices.
   *
   * Local matrices are created through the `Matrices` factory. Dense matrices take their entries
   * in column-major order; sparse matrices take Compressed Sparse Column (CSC) arrays.
   */
  def localMatrixDemo(): Unit = {
    // Dense 3x2 matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)); values are listed column by column.
    val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
    // Sparse 3x2 matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0)) in CSC form:
    //   colPtrs    = (0, 1, 3): column 0 owns stored entry 0, column 1 owns entries 1 and 2
    //   rowIndices = (0, 2, 1): row index of each stored entry
    //   values     = (9, 6, 8): value of each stored entry
    val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9.0, 6.0, 8.0))
  }

  // ---------------------------------------------------------------------------------------------
  // Distributed matrices
  // ---------------------------------------------------------------------------------------------

  /**
   * RowMatrix.
   *
   * A row-oriented distributed matrix without meaningful row indices, backed by an RDD whose
   * elements are the rows as local vectors.
   */
  @Test
  def rowMatrixDemo(): Unit = {
    // An RDD of local vectors, one per row.
    val rows: RDD[Vector] = sc.parallelize(Seq(
      Vectors.dense(1.0, 2.0, 3.0),
      Vectors.dense(1.0, 4.0, 3.0),
      Vectors.dense(1.0, 5.0, 3.0),
      Vectors.dense(1.0, 6.0, 3.0)
    ))
    // Wrap the RDD[Vector] as a RowMatrix.
    val mat: RowMatrix = new RowMatrix(rows)
    // Matrix dimensions.
    val m = mat.numRows()
    val n = mat.numCols()
    println(s"numRows: $m, numCols: $n")

    // QR decomposition for tall-and-skinny matrices; computeQ = true also computes Q.
    val qr = mat.tallSkinnyQR(computeQ = true)
    println(s"R:\n${qr.R}")
  }

  /**
   * IndexedRowMatrix.
   *
   * Like a RowMatrix, but every row carries a meaningful long-typed index. It is backed by an
   * RDD[IndexedRow], where an IndexedRow wraps a (Long, Vector) pair. Dropping the indices yields
   * a RowMatrix.
   */
  def indexdRowMatrixDemo: Unit = {
    // An RDD of indexed rows.
    val rows: RDD[IndexedRow] = sc.parallelize(Seq(
      IndexedRow(1L, Vectors.dense(1.0, 0.0, 3.0)),
      IndexedRow(2L, Vectors.dense(1.0, 1.0, 3.0)),
      IndexedRow(3L, Vectors.dense(1.0, 2.0, 3.0))
    ))
    // Wrap the RDD[IndexedRow] as an IndexedRowMatrix.
    val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)
    // Matrix dimensions.
    // NOTE(review): when not given explicitly, numRows is presumably derived from the largest row
    // index (3 here) rather than the count of rows; confirm against the Spark docs.
    val m = mat.numRows()
    val n = mat.numCols()

    // Drop the row indices to obtain a RowMatrix.
    val rowMat: RowMatrix = mat.toRowMatrix()
  }

  /**
   * CoordinateMatrix.
   *
   * A distributed matrix backed by an RDD of its entries, each an (i: Long, j: Long, value: Double)
   * triple of row index, column index and value. It is built from an RDD[MatrixEntry], where a
   * MatrixEntry wraps such a triple.
   */
  def corrdinRDD(): Unit = {
    // An RDD of matrix entries.
    val entries: RDD[MatrixEntry] = sc.parallelize(Seq(
      MatrixEntry(0, 0, 1.1),
      MatrixEntry(0, 1, 1.2),
      MatrixEntry(1, 0, 1.3),
      MatrixEntry(1, 1, 1.4)
    ))
    // Wrap the RDD[MatrixEntry] as a CoordinateMatrix.
    val mat: CoordinateMatrix = new CoordinateMatrix(entries)

    // Matrix dimensions.
    val m = mat.numRows()
    val n = mat.numCols()

    // Convert to an IndexedRowMatrix whose rows are sparse vectors.
    val indexedRowMatrixConv: IndexedRowMatrix = mat.toIndexedRowMatrix()
  }

  /**
   * BlockMatrix.
   *
   * Builds a BlockMatrix from a CoordinateMatrix, validates it, and forms A^T A.
   */
  def blockMatrixDemo(): Unit = {
    // An RDD of matrix entries.
    val entries: RDD[MatrixEntry] = sc.parallelize(Seq(
      MatrixEntry(0, 0, 1.1),
      MatrixEntry(0, 1, 1.2),
      MatrixEntry(1, 0, 1.3),
      MatrixEntry(1, 1, 1.4)
    ))
    // Wrap the RDD[MatrixEntry] as a CoordinateMatrix.
    val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)

    // Transform the CoordinateMatrix to a BlockMatrix and cache it, since it is used twice below.
    val matA: BlockMatrix = coordMat.toBlockMatrix().cache()

    // Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
    // Nothing happens if it is valid.
    matA.validate()

    // Calculate A^T A.
    val ata: BlockMatrix = matA.transpose.multiply(matA)
  }
}