Apache Sparkでバイナリファイルのデータを16進数文字列に変換し、レコード分割する方法


環境

  • mac上のdocker(コンテナイメージはcloudera/quickstart)
  • CDH 5.7
  • spark 1.6

準備

1.hdfs上の/tmp/in配下に適当なデータを配置する
2.spark-shellを起動する
(spark-shellはrootで実行すると、permissionエラーがでて面倒なため、hdfsユーザで起動する)

実行するsparkプログラム

//binaryからHexStrにConvert機能を使用するためにimport(Javaのライブラリ)
import javax.xml.bind.DatatypeConverter

//1レコード当たりのカラム区切り位置
//この場合1レコードは2+4+6+4=16バイトとなる
var record_split_arr = Array(2,4,6,4)

//SparkのbinaryRecordsメソッドを使用して、引数1のパス配下のバイナリファイルを読み込み、
//引数2のレコード長でバイナリデータを分割する。
//第1引数はHDFS上のバイナリファイルが保存されたパスで、
//パス配下に1つ以上のバイナリファイルが存在すること
var binary_rdd = sc.binaryRecords("/tmp/in/",record_split_arr.sum/2)

//読み込んだバイナリデータを16進数文字列に変換する
var hex_string_rdd = binary_rdd.map{case(bytes) => DatatypeConverter.printHexBinary(bytes)}

//とりあえずキャッシュしたがさほど意味があるか分かっていない
var hex_string_rdd_cache = hex_string_rdd.cache

//16進数文字列のレコードに分割したrddを、カンマ区切りのカラム単位に分割する
var hex_string_records_rdd = hex_string_rdd_cache.map{str =>
  var record_str_arr = new Array[String](record_split_arr.size)
  var length_postion = 0
  for (i <- 0 until record_split_arr.length){
     record_str_arr(i) = str.substring(length_postion, length_postion +record_split_arr(i))
     length_postion += record_split_arr(i)
  }
  record_str_arr.mkString(",")
}

//先頭3レコードの中身を確認
hex_string_records_rdd.collect.take(3).foreach(println)

//カンマ区切りcsvのテキストファイルとして保存する
hex_string_records_rdd.saveAsTextFile("/tmp/out")

--テキストファイル用テーブル
--impala等でテーブル作成すれば、データを検索する可能
create table sample_table
(
a1 string ,
a2 string ,
a3 string ,
a4 string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
LOCATION '/tmp/out';