SparkのUDTF

13548 ワード

UDTFのテストに使う udtf.txt テキストの内容:
01//zs//Hadoop scala spark hive hbase
02//ls//Hadoop scala kafka hive hbase Oozie
03//ww//Hadoop scala spark hive sqoop

要求出力
 type   
 Hadoop
 scala
 kafka
 hive
 hbase
 Oozie

実装コード
package nj.zb.kb09.sql

import java.util

import org.apache.hadoop.hive.ql.exec.UDFArgumentException
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
import org.apache.hadoop.hive.serde2.objectinspector.{
     ObjectInspector, ObjectInspectorFactory, StructObjectInspector}
import org.apache.spark.sql.SparkSession

object SparkUDTFDemo {

  /**
   * Demo entry point: reads `in/udtf.txt`, builds a DataFrame of
   * (id, name, subject) rows, registers the Hive UDTF `myUDTF` as a
   * temporary SQL function, and applies it to the `subject` column.
   */
  def main(args: Array[String]): Unit = {
    // Hive support must be enabled so CREATE TEMPORARY FUNCTION is available.
    val session = SparkSession
      .builder()
      .master("local[*]")
      .enableHiveSupport()
      .appName("SparkStudy")
      .getOrCreate()

    import session.implicits._

    // Each input line looks like: 01//zs//Hadoop scala spark hive hbase
    val stuDF = session.sparkContext
      .textFile("in/udtf.txt")
      .map(_.split("//"))
      .filter(fields => fields(1).equals("ls"))
      .map(fields => (fields(0), fields(1), fields(2)))
      .toDF("id", "name", "subject")

//    stuDF.printSchema()
//    stuDF.show()

    stuDF.createOrReplaceTempView("student")

    // Register the UDTF by its fully-qualified class name, then call it from SQL.
    session.sql("CREATE TEMPORARY FUNCTION MyUDTF AS 'nj.zb.kb09.sql.myUDTF'")
    val resDF = session.sql("select MyUDTF(subject) from student")

    resDF.printSchema()
    resDF.show()
  }
}

/**
 * Hive generic UDTF that explodes a space-separated subject string into
 * one output row per token. The output schema is a single string column
 * named "type" (matching the expected output shown above).
 *
 * NOTE(review): the class name is lowercase because the SQL registration
 * string 'nj.zb.kb09.sql.myUDTF' refers to it; renaming would break callers.
 */
class myUDTF extends GenericUDTF {

  /**
   * Validates the argument list and declares the output schema.
   *
   * @param argOIs inspectors for the call-site arguments; exactly one
   *               primitive (string-like) argument is accepted
   * @return a struct inspector with one Java-string column named "type"
   * @throws UDFArgumentException if the argument count or category is wrong
   */
  override def initialize(argOIs: Array[ObjectInspector]): StructObjectInspector = {
    if (argOIs.length != 1) {
      // Was thrown with a blank " " message, leaving callers no diagnostic.
      throw new UDFArgumentException("myUDTF takes exactly one argument")
    }
    if (argOIs(0).getCategory != ObjectInspector.Category.PRIMITIVE) {
      throw new UDFArgumentException("myUDTF's argument must be a primitive type")
    }

    val fieldNames = new util.ArrayList[String]()
    val fieldOIs = new util.ArrayList[ObjectInspector]()

    // Single output column "type" of Java String type.
    fieldNames.add("type")
    fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector)

    ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs)
  }

  /**
   * Splits the single string argument on spaces and forwards one row
   * (a one-element array) per token.
   */
  override def process(objects: Array[AnyRef]): Unit = {
    val tokens = objects(0).toString.split(" ")
    // Original had `for (str  strings)` — missing `<-`, a compile error —
    // and a debug `println(strings)` that printed the array's identity hash.
    for (token <- tokens) {
      forward(Array[String](token))
    }
  }

  // No resources to release.
  override def close(): Unit = {}
}