pythonでは、pysparkでHbaseデータを読み込み、dataframe形式に変換します
完了する必要があるポイントは、sparkを接続する構成を設定し、sparkによってhbaseからrdd形式にデータを読み込み、rdd形式のデータをdataframe形式に変換することです.
1、まずpyspark接続sparkの構成を設定する必要があり、spark接続にはsparkcontextとsparksessionの2つの方式があり、同時にこの2つの方式の間で互いに変換することができ、接続コードは以下の通りである.
(1)SparkConfとSparkContextで接続
(2)sparkSessionで接続して、個人的にはこの方式がもっと簡潔だと思って、一言で済むので、あははは
2、pysparkはHBASEのデータを読み、読み終わったデータはRDD形式で、コードは以下の通りである.
3、読み取ったrdd形式のデータをdataframe形式に変換し、後続の計算を容易にする
この1枚はかつて何度もピットを採取したが,たぶんhbaseの表構造(非関係型)と関係があるからである.hbase自体がrecordを生成するとき、一部のカラム値は値を付与しないことができ、sparkが読み込んだとき、それらの値はnullに自動的に付与されません.これにより、読み込まれた各レコードのカラム長が異なる可能性があります.hbaseにはいくつかのカラム値が保存されています.読み込まれたカラム値はいくつかのカラム値しかありません.
だからpyspark自体のrdd.toDF()という関数はまったく使えません.この関数は列数が同じデータにしか適していません.次のコードは、ネット上で見つかったバージョンのコードに基づいて修正改善されたコードです.
コードはすべてプロジェクトの中の記録なので、実行結果はありません.今ブログを書く環境にはコード実行の環境はありませんが、コードはすべてプロジェクトコードから直接掘り出したもので、親測で大丈夫です.
というかhbaseのデータを読むのは面倒なので、できれば(データ速度の要求があまり高くない)、hbaseのデータをHiveのテーブルにマッピングして(これは実はとても操作がいい)、pysparkがhiveのデータを読むのは直接dataframe形式で、このように苦労してフォーマットを変換しません.
もっともっと専門的な方法があれば、分かち合うことを期待します!
1、まずpyspark接続sparkの構成を設定する必要があり、spark接続にはsparkcontextとsparksessionの2つの方式があり、同時にこの2つの方式の間で互いに変換することができ、接続コードは以下の通りである.
(1)SparkConfとSparkContextで接続
from pyspark import SparkConf, SparkContext
spark_host = "spark://spark-master:7077" #spark , , local[x],x ,
app_name = "test"
# , 1g
spark_conf = SparkConf().setMaster(spark_host).setAppName(app_name).set("spark.executor.memory", "1g")
spark_context = SparkContext.getOrCreate(conf=spark_conf)
# sparkContext sparkSession
from pyspark.sql import SparkSession
spark_session = SparkSession(spark_context)
(2)sparkSessionで接続して、個人的にはこの方式がもっと簡潔だと思って、一言で済むので、あははは
from pyspark.sql import SparkSession
spark_host = "spark://spark-master:7077"
app_name = "test"
spark_session = SparkSession.builder.master(spark_host).appName(app_name).getOrCreate()
2、pysparkはHBASEのデータを読み、読み終わったデータはRDD形式で、コードは以下の通りである.
hbase_host = ""
table_name = ""
"""
(1)pyspark hbase hbase record row_key ,
(2)hbase row_key , '2019-04-29_' row_key 2019-04-29_ , stop 2019-04-30_ record ( stop record)
(3)hbase.mapreduce.scan.columns hbase { : } ,
"""
conf = {
"hbase.zookeeper.quorum": hbase_host,
"hbase.mapreduce.inputtable": table_name,
"hbase.mapreduce.scan.row.start": '2019-04-29_',
"hbase.mapreduce.scan.row.stop": '2019-04-30_',
"hbase.mapreduce.scan.columns": "family1:column1 family1:column2 family2:column1"
}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
# pysaprk hbase sparkContext , spark_context
hbase_rdd = spark_context.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat",
"org.apache.hadoop.hbase.io.ImmutableBytesWritable",
"org.apache.hadoop.hbase.client.Result",
keyConverter=keyConv,
valueConverter=valueConv,
conf=conf)
# sparkSesssion hbase,
hbase_rdd = spark_session.saprkContext.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat",
"org.apache.hadoop.hbase.io.ImmutableBytesWritable",
"org.apache.hadoop.hbase.client.Result",
keyConverter=keyConv,
valueConverter=valueConv,
conf=conf)
3、読み取ったrdd形式のデータをdataframe形式に変換し、後続の計算を容易にする
この1枚はかつて何度もピットを採取したが,たぶんhbaseの表構造(非関係型)と関係があるからである.hbase自体がrecordを生成するとき、一部のカラム値は値を付与しないことができ、sparkが読み込んだとき、それらの値はnullに自動的に付与されません.これにより、読み込まれた各レコードのカラム長が異なる可能性があります.hbaseにはいくつかのカラム値が保存されています.読み込まれたカラム値はいくつかのカラム値しかありません.
だからpyspark自体のrdd.toDF()という関数はまったく使えません.この関数は列数が同じデータにしか適していません.次のコードは、ネット上で見つかったバージョンのコードに基づいて修正改善されたコードです.
"""
hbase_rdd record, , dict, dict
{column_name1:value1, column_name2:value2, column_name3:value3}, hbase ,
, “Null” , dataframe ( “Null”
, dataframe “Null” , filter , “!=Null”,
“isNotNull()” )
"""
import json
def row_transform(row_cells_info, hbase_structure):
row_cell_info_list = [json.loads(i) for i in row_cells_info]
row_dict = {}
hbase_index = 0
for cell_index in range(len(row_cell_info_list)):
column_name = row_cell_info_list[cell_index]['qualifier']
column_value = row_cell_info_list[cell_index]['value']
if hbase_structure[hbase_index] == column_name:
row_dict[column_name] = column_value
hbase_index += 1
else:
row_dict[hbase_structure[hbase_index]] = "Null"
for j in range(hbase_index + 1, len(hbase_structure)):
if hbase_structure[j] == column_name:
row_dict[column_name] = column_value
hbase_index = j + 1
break
else:
row_dict[hbase_structure[j]] = "Null"
for j in range(hbase_index, len(hbase_structure)):
row_dict[hbase_structure[j]] = "Null"
return row_dict
"""
HBase RDD DataFrame, hbase_structure hbase list, [column_name1,column_name2,column_name3]
hbase
"""
def rdd_to_df(hbase_rdd, hbase_structure):
# RowKey
, split,split dict
data_rdd_split = hbase_rdd.map(lambda x: (x[0], x[1].split('
')))
#
data_rdd_columns = data_rdd_split.map(lambda x: (x[0], row_transform(x[1], hbase_structure)))
data = data_rdd_columns.map(lambda x: [x[0]] + [x[1][i] for i in x[1]])
data_df = sess.createDataFrame(data, ["row_key"] + hbase_structure)
return data_df
if __name__ == '__main__':
hbase_rdd = load_from_hbase() # 1 2 hbase rdd
# rdd dataframe
hbase_structure = [column_name1,column_name2,column_name3]
hbase_df = rdd_to_df(hbase_rdd, hbase_structure)
コードはすべてプロジェクトの中の記録なので、実行結果はありません.今ブログを書く環境にはコード実行の環境はありませんが、コードはすべてプロジェクトコードから直接掘り出したもので、親測で大丈夫です.
というかhbaseのデータを読むのは面倒なので、できれば(データ速度の要求があまり高くない)、hbaseのデータをHiveのテーブルにマッピングして(これは実はとても操作がいい)、pysparkがhiveのデータを読むのは直接dataframe形式で、このように苦労してフォーマットを変換しません.
もっともっと専門的な方法があれば、分かち合うことを期待します!