spark整理データをmysqlに入れる

2255 ワード

spark私はまだ初心者で、彼には特に詳しくありませんが、データを整理した後、テーブルを生成して欲しいデータを入れる必要があるので、2つの異なる方法を見つけました.
1.Listの生成
2.JavaRDD
最初の方法:(前の記事で生成したMapに接続)
List rows = new ArrayList<>();
Iterator> itMap = map.entrySet().iterator();
while(itMap.hasNext()){//  map
    Entry entry = itMap.next();
    int key = entry.getKey();
    List list2 = entry.getValue();
    for(int i = 0;i df_target = sparkSession.createDataFrame(rows, schema);
df_target.show();//    
df_target.write().mode(SaveMode.Append).jdbc(properties.getProperty("url_jdbc_master"),"analyze_point", this.properties);
//     ,   ,    

テーブル構造を作成します(テーブル内のフィールドタイプは、追加したコンテンツフィールドタイプと一致する必要があります):
StructType createSchema() {
StructType ret = new StructType()//           ,    
    .add(new StructField("orgId",DataTypes.IntegerType,true,Metadata.empty()).withComment("orgId"))
    .add(new StructField("morePhoneLogin", DataTypes.IntegerType, true, Metadata.empty()).withComment("xx"))
    .add(new StructField("openAccountMore", DataTypes.IntegerType, true, Metadata.empty()).withComment("xx"))
    .add(new StructField("remoteLogin", DataTypes.IntegerType, true, Metadata.empty()).withComment("xx"))
    .add(new StructField("time", DataTypes.TimestampType, true, Metadata.empty()).withComment("  "));
return ret;
}

2つ目:(dsAccountCount:mysql文に基づいてデータベースから整理されたデータを返し、データベース内の新しいテーブルに格納します.タイプはDataSetです)
JavaRDD rdd_target= dsAccountCount.toJavaRDD().map(t2->{
    return RowFactory.create(t2.get(0),t2.get(1),t2.get(2),new Timestamp(new java.util.Date().getTime()));//         
});
StructType schema = createSchema();//              
Dataset df_target = sparkSession.createDataFrame(rdd_target, schema);
df_target.show();
df_target.write().mode(SaveMode.Append).jdbc(this.properties.getProperty(PropertyKeys.URL_JDBC_MASTER),"analyze_point_operator_login", this.properties);