scalaでjsonネストjson
1751 ワード
最近では、神策を使うときに、自分のデータの一部を神策に取り込むようになりました
問題:
1.hdfsImporterはデータを直接kuduにインポートできないため、ユーザデータはすべてkuduに存在する.だからkafkaからkuduまで
現在のメカニズムは、hdfsImporterがインポートしたユーザーデータは、kafkaを経由するものであり、これは最下位のメカニズムであり、一時的に変更しにくい.
解決:そちらで購読した後、対応する条件に従って、ユーザー画像のデータをフィルタリングする必要があります.
2.神策導入メカニズムはデータフォーマットに対して非常に厳格で、ここにjsonがjsonをネストするメカニズムがある.
問題:
1.hdfsImporterはデータを直接kuduにインポートできないため、ユーザデータはすべてkuduに存在する.だからkafkaからkuduまで
現在のメカニズムは、hdfsImporterがインポートしたユーザーデータは、kafkaを経由するものであり、これは最下位のメカニズムであり、一時的に変更しにくい.
解決:そちらで購読した後、対応する条件に従って、ユーザー画像のデータをフィルタリングする必要があります.
2.神策導入メカニズムはデータフォーマットに対して非常に厳格で、ここにjsonがjsonをネストするメカニズムがある.
val value = readDF.rdd.map(p => {
val distinct_id = p.getAs[String]("distinct_id")
val `type` = "profile_set"
val time = p.getAs[Long]("time")
val project = "default"
val a= p.getAs[String]("a")
val b = p.getAs[String]("b")
val properties = Properties(a,b)
val ups = Ups(distinct_id, time, `type`, project, properties)
val gson = new Gson()
val jsonStr: String = gson.toJson(ups)
jsonStr
})
// hdfs
val writePath = getWritePath(startDate)
// ,
val bool = HdfsUtil.pathIsExist(writePath)
if (bool) {
val conf = new Configuration()
val fs = FileSystem.get(conf)
fs.delete(new Path(writePath), true)
value.repartition(10).saveAsTextFile(writePath)
} else {
value.repartition(10).saveAsTextFile(writePath)
}
}
def getWritePath(dateStr: String) = {
val finallyPath = "hdfs "
finallyPath
}
case class Properties(
a:String,
b: String
)
case class Ups(
distinct_id: String,
time: Long,
`type`: String,
project: String,
properties: Properties
)