SparkのUDTF
13548 ワード
UDTF のテスト用ファイル udtf.txt の内容
要求出力
実装コード
01//zs//Hadoop scala spark hive hbase
02//ls//Hadoop scala kafka hive hbase Oozie
03//ww//Hadoop scala spark hive sqoop
要求出力
type
Hadoop
scala
kafka
hive
hbase
Oozie
実装コード
package nj.zb.kb09.sql
import java.util
import org.apache.hadoop.hive.ql.exec.UDFArgumentException
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory
import org.apache.hadoop.hive.serde2.objectinspector.{
ObjectInspector, ObjectInspectorFactory, StructObjectInspector}
import org.apache.spark.sql.SparkSession
/**
 * Demo driver: loads `in/udtf.txt`, keeps the record whose name field is "ls",
 * exposes it as the temporary view `student`, registers the Hive UDTF
 * `nj.zb.kb09.sql.myUDTF` under the name `MyUDTF`, and prints the result of
 * applying it to the `subject` column.
 *
 * Each input line is expected to look like `id//name//subject words...`.
 */
object SparkUDTFDemo {
  /**
   * Runs the demo end to end on a local Spark session.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]")
      .enableHiveSupport() // Hive support is required to register a Hive GenericUDTF
      .appName("SparkStudy").getOrCreate()
    try {
      import spark.implicits._
      val sc = spark.sparkContext
      val lines = sc.textFile("in/udtf.txt")
      val stuDF = lines.map(_.split("//"))
        // Guard on the field count: blank lines or lines with an empty trailing
        // field split into fewer than three parts and would otherwise throw
        // ArrayIndexOutOfBoundsException.
        .filter(fields => fields.length >= 3 && fields(1) == "ls")
        .map(fields => (fields(0), fields(1), fields(2)))
        .toDF("id", "name", "subject")
      // Uncomment to inspect the intermediate DataFrame:
      // stuDF.printSchema()
      // stuDF.show()
      stuDF.createOrReplaceTempView("student")
      // Register the UDTF as a temporary function; the AS clause must be the
      // fully qualified name of the class that implements it.
      spark.sql("CREATE TEMPORARY FUNCTION MyUDTF AS 'nj.zb.kb09.sql.myUDTF'")
      val resDF = spark.sql("select MyUDTF(subject) from student")
      resDF.printSchema()
      resDF.show()
    } finally {
      // Always release the session, even when a step above fails.
      spark.stop()
    }
  }
}
/**
 * Hive generic UDTF that explodes a space-separated string into one output
 * row per word. The output has a single string column named `type`.
 *
 * The lowercase class name is kept because the function is registered by its
 * fully qualified class name in SQL.
 */
class myUDTF extends GenericUDTF {
  /**
   * Validates the call signature and declares the output schema.
   *
   * @param argOIs object inspectors describing the arguments of the call
   * @return a struct inspector with one string field called `type`
   * @throws UDFArgumentException if the function is not called with exactly
   *                              one argument of a primitive category
   */
  override def initialize(argOIs: Array[ObjectInspector]): StructObjectInspector = {
    if (argOIs.length != 1) {
      throw new UDFArgumentException("myUDTF takes exactly one argument")
    }
    if (argOIs(0).getCategory != ObjectInspector.Category.PRIMITIVE) {
      throw new UDFArgumentException("myUDTF takes a primitive type argument")
    }
    val fieldNames = new util.ArrayList[String]()
    val fieldOIs = new util.ArrayList[ObjectInspector]()
    // Output column name.
    fieldNames.add("type")
    // Output column type: Java String.
    fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector)
    ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs)
  }

  /**
   * Splits the first argument on single spaces and forwards each non-empty
   * word as its own one-column row.
   *
   * @param objects argument values of the current input row; only the first
   *                one is read
   */
  override def process(objects: Array[AnyRef]): Unit = {
    // A NULL column value arrives as a null reference; emit no rows for it
    // instead of failing with a NullPointerException.
    Option(objects(0)).foreach { value =>
      value.toString
        .split(" ")
        .filter(_.nonEmpty) // consecutive spaces would otherwise yield blank rows
        .foreach(word => forward(Array[String](word)))
    }
  }

  /** Nothing to release: this UDTF holds no resources. */
  override def close(): Unit = {}
}