SparkのMLlibは、ベクトル、ラベル付きポイント、行列、libSVM疎フォーマットファイルといった基本データ型を使用します。
4938 ワード
Spark MLlibライブラリは多くの機械学習アルゴリズムを実装しており、その基本的ないくつかのデータ型の解説とコード例は以下の通りである。
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, IndexedRow, IndexedRowMatrix, MatrixEntry, RowMatrix}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.junit.{After, Test}
/**
 * Demonstrations of the basic MLlib data types: local vectors, labeled
 * points, libSVM-formatted sparse data, local matrices, and the
 * distributed matrix family (RowMatrix, IndexedRowMatrix,
 * CoordinateMatrix, BlockMatrix).
 */
class DataType {
  // Local 4-core Spark context for the demos. "spark.testing.memory" is set
  // so the driver-memory sanity check passes when running in a small test JVM.
  val conf = new SparkConf().setMaster("local[4]").setAppName("datatype").set("spark.testing.memory", "471859200")
  val sc = new SparkContext(conf)

  /**
   * Stop the SparkContext after each test. JUnit 4 instantiates this class
   * (and therefore creates a fresh context) once per test method, so without
   * this the local cluster resources would leak between tests.
   */
  @After
  def tearDown(): Unit = sc.stop()

  /**
   * Local vector: stored on a single machine, with 0-based Int indices and
   * Double values. MLlib provides both dense and sparse representations.
   */
  def localVectorDemo(): Unit = {
    // Dense vector (1.0, 0.0, 3.0): every entry stored explicitly.
    val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
    // Sparse vector of size 3: indices (0, 2) carry values (1.0, 3.0).
    val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
    // Same sparse vector, built from (index, value) pairs:
    // index 0 -> 1.0, index 2 -> 3.0.
    val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
  }

  /**
   * Labeled point: a local vector (dense or sparse) paired with a Double
   * label, used as a training example for supervised learning.
   */
  def labelPointDemo(): Unit = {
    // Positive example (label 1.0) with a dense feature vector.
    val pos: LabeledPoint = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
    // Negative example (label 0.0) with a sparse feature vector.
    val neg: LabeledPoint = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
  }

  /**
   * Sparse data: MLlib can read training examples stored in the libSVM text
   * format (the default format of LIBSVM and LIBLINEAR), one example per
   * line:
   *   label index1:value1 index2:value2 ...
   */
  @Test
  def sparseDateDemo(): Unit = { // NOTE: name misspelling ("Date") kept for caller compatibility
    val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
    examples.take(10).foreach(println)
  }

  /**
   * Local matrix: stored on a single machine via the Matrices factory
   * (not RDD-backed). Dense matrices are column-major; sparse matrices use
   * compressed sparse column (CSC) format.
   */
  def localMatrixDemo(): Unit = {
    // Dense 3x2 matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)):
    // values are column-major, col0 = (1,3,5), col1 = (2,4,6).
    val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
    // Sparse 3x2 matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0)) in CSC form:
    // colPtrs (0,1,3), rowIndices (0,2,1), values (9,6,8) — i.e. col0 has
    // 9.0 at row 0; col1 has 6.0 at row 2 and 8.0 at row 1.
    val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))
  }

  /**
   * Distributed matrices — RDD-backed, for data too large for one machine.
   */

  /**
   * RowMatrix: a row-oriented distributed matrix without meaningful row
   * indices, backed by an RDD of its rows (RDD[Vector]).
   */
  @Test
  def rowMatrixDemo(): Unit = {
    // Source RDD: RDD[Vector], one local vector per row.
    val rows: RDD[Vector] = sc.parallelize(Seq(
      Vectors.dense(1.0, 2.0, 3.0),
      Vectors.dense(1.0, 4.0, 3.0),
      Vectors.dense(1.0, 5.0, 3.0),
      Vectors.dense(1.0, 6.0, 3.0)
    ))
    // Build the RowMatrix from the RDD[Vector].
    val mat: RowMatrix = new RowMatrix(rows)
    // Matrix dimensions.
    val m = mat.numRows()
    val n = mat.numCols()
    println(s"rows: $m, cols: $n")
    // QR decomposition (computeQ = true also materializes the Q factor).
    mat.tallSkinnyQR(true)
  }

  /**
   * IndexedRowMatrix: like RowMatrix, but each row carries a meaningful
   * Long index. Backed by RDD[IndexedRow], where IndexedRow wraps
   * (Long, Vector).
   */
  def indexdRowMatrixDemo = { // NOTE: name misspelling ("indexd") kept for caller compatibility
    // Source RDD: RDD[IndexedRow], each row tagged with its index.
    val rows: RDD[IndexedRow] = sc.parallelize(Seq(
      IndexedRow(1L, Vectors.dense(1.0, 0.0, 3.0)),
      IndexedRow(2L, Vectors.dense(1.0, 1.0, 3.0)),
      IndexedRow(3L, Vectors.dense(1.0, 2.0, 3.0))
    ))
    // Build the IndexedRowMatrix from the RDD[IndexedRow].
    val mat = new IndexedRowMatrix(rows)
    // Matrix dimensions.
    val m = mat.numRows()
    val n = mat.numCols()
    // Dropping the row indices yields a plain RowMatrix.
    val rowMat: RowMatrix = mat.toRowMatrix()
  }

  /**
   * CoordinateMatrix: a distributed matrix stored in coordinate (COO) form,
   * backed by RDD[MatrixEntry] where each MatrixEntry is a
   * (i: Long, j: Long, value: Double) triple — row index, column index,
   * value. Best suited to very large, very sparse matrices.
   */
  def corrdinRDD(): Unit = { // NOTE: name misspelling ("corrdin") kept for caller compatibility
    // Source RDD: RDD[MatrixEntry], one entry per non-zero element.
    val entries: RDD[MatrixEntry] = sc.parallelize(Seq(
      MatrixEntry(0, 0, 1.1),
      MatrixEntry(0, 1, 1.2),
      MatrixEntry(1, 0, 1.3),
      MatrixEntry(1, 1, 1.4)
    ))
    // Build the CoordinateMatrix from the entries.
    val mat = new CoordinateMatrix(entries)
    // Matrix dimensions.
    val m = mat.numRows()
    val n = mat.numCols()
    // Convert to an IndexedRowMatrix (rows become sparse vectors).
    val indexedRowMatrixConv = mat.toIndexedRowMatrix()
  }

  /**
   * BlockMatrix: a distributed matrix partitioned into sub-matrix blocks,
   * supporting matrix-matrix operations such as multiply. Built here by
   * converting a CoordinateMatrix.
   */
  def blockMatrixDemo(): Unit = {
    // Source RDD: RDD[MatrixEntry].
    val entries: RDD[MatrixEntry] = sc.parallelize(Seq(
      MatrixEntry(0, 0, 1.1),
      MatrixEntry(0, 1, 1.2),
      MatrixEntry(1, 0, 1.3),
      MatrixEntry(1, 1, 1.4)
    ))
    // Build a CoordinateMatrix first.
    val coordMat = new CoordinateMatrix(entries)
    // Transform the CoordinateMatrix to a BlockMatrix; cache it because it
    // is used more than once below.
    val matA: BlockMatrix = coordMat.toBlockMatrix().cache()
    // Validate whether the BlockMatrix is set up properly. Throws an
    // exception when it is not valid; nothing happens if it is valid.
    matA.validate()
    // Calculate A^T * A.
    val ata = matA.transpose.multiply(matA)
  }
}