sqoop 2単一引用符とカスタム区切り記号の除去
3438 ワード
1、缘由(需要)
oracleからhiveへのデータのインポートをしています.データの区切りの問題に遭遇しました.sqoop 2はデフォルトの区切り式カンマです.私はhiveを使ってもカンマ区切りで、データにカンマが含まれているフィールドを見つけました.hiveはそれをいくつかに分けました.そこでネット上でカスタム区切りの解決方法を探しましたが、仕方なくこの方面の資料は少ないです.この問題を解決しないと、仕事はできません.最後の法宝に勝って、ソースコードをコンパイルします.
2、あなたと使用しているバージョンのsqoop 2ソースコード(最良)をダウンロードし、mavenを使用して構築します(パッケージ化時にtestをスキップしてコンパイルしたほうがいいです.そうしないと、エラーが発生する可能性があります)
3、修正コード:
位置付け:hdfsに書くべきで、いわゆる私はこのhdfs-connectorのコードを書くべきだと知っていて、コードの中にデフォルトの区切り記号を定義していることを発見して、それをタブに変えて、コンパイル->アップロードした後、結果は変わっていません.次に考えてみると、発見類:org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsTextWriterの下にwriteメソッドがあり、HdfsConstantsがある.DEFAULT_RECORD_DELIMITER(行ごとのレコードのデフォルトの区切り)
これは、各レコードがカンマで区切られたデータです.次に、それを区切る方法を書きます(単一引用符をフラグとして、フィールドのカンマと非区切られたカンマを分けます).
4、以下はカンマ区切りから「t」区切りに変更されたコード(org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsTextWriterクラス全体)
5、パッケージ化、およびsqoop 2フォルダ/webapps/sqoop/WEB-INF/lib(バージョンによって違いがあるかもしれませんが、connectorのあるフォルダを選択してください)にアップロードする前にhdfs-connector、mvを他のフォルダ(保険のため)にアップロードし、削除(推奨しない)したり、hdfs-connectorの名前を変更(未試験)、sqoop 2を再起動したりすることができます.タスクを実行して効果を見ます.
6、完成.
oracleからhiveへのデータのインポートをしています.データの区切りの問題に遭遇しました.sqoop 2はデフォルトの区切り式カンマです.私はhiveを使ってもカンマ区切りで、データにカンマが含まれているフィールドを見つけました.hiveはそれをいくつかに分けました.そこでネット上でカスタム区切りの解決方法を探しましたが、仕方なくこの方面の資料は少ないです.この問題を解決しないと、仕事はできません.最後の法宝に勝って、ソースコードをコンパイルします.
2、あなたと使用しているバージョンのsqoop 2ソースコード(最良)をダウンロードし、mavenを使用して構築します(パッケージ化時にtestをスキップしてコンパイルしたほうがいいです.そうしないと、エラーが発生する可能性があります)
3、修正コード:
位置付け:hdfsに書くべきで、いわゆる私はこのhdfs-connectorのコードを書くべきだと知っていて、コードの中にデフォルトの区切り記号を定義していることを発見して、それをタブに変えて、コンパイル->アップロードした後、結果は変わっていません.次に考えてみると、発見類:org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsTextWriterの下にwriteメソッドがあり、HdfsConstantsがある.DEFAULT_RECORD_DELIMITER(行ごとのレコードのデフォルトの区切り)
これは、各レコードがカンマで区切られたデータです.次に、それを区切る方法を書きます(単一引用符をフラグとして、フィールドのカンマと非区切られたカンマを分けます).
4、以下はカンマ区切りから「t」区切りに変更されたコード(org.apache.sqoop.connector.hdfs.hdfsWriter.HdfsTextWriterクラス全体)
package org.apache.sqoop.connector.hdfs.hdfsWriter;
import com.google.common.base.Charsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.sqoop.connector.hdfs.HdfsConstants;
import java.io.BufferedWriter;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
public class HdfsTextWriter extends GenericHdfsWriter {
private BufferedWriter filewriter;
@Override
public void initialize(Path filepath, Configuration conf, CompressionCodec codec) throws IOException {
FileSystem fs = filepath.getFileSystem(conf);
DataOutputStream filestream = fs.create(filepath, false);
if (codec != null) {
filewriter = new BufferedWriter(new OutputStreamWriter(
codec.createOutputStream(filestream, codec.createCompressor()),
Charsets.UTF_8));
} else {
filewriter = new BufferedWriter(new OutputStreamWriter(
filestream, Charsets.UTF_8));
}
}
@Override
public void write(String csv) throws IOException {
filewriter.write(transfer(csv) + HdfsConstants.DEFAULT_RECORD_DELIMITER);
}
@Override
public void destroy() throws IOException {
filewriter.close();
}
private String transfer(String csv){
String words[]=csv.split(",");
StringBuffer sb=new StringBuffer();
boolean flag1=false;
boolean flag2=false;
boolean flag3=false;
String temp="";
for (int i = 0; i < words.length; i++) {
if(words[i].equalsIgnoreCase("NULL")){
words[i]="-1";
}
flag1=words[i].startsWith("'");
flag2=words[i].endsWith("'");
if(flag1&&flag2){
sb.append(words[i]);
sb.append('\t');
}else if(flag1&&!flag2){
temp=words[i]+",";
flag3=true;
}else if(!flag1&&flag2){
temp=temp+","+words[i];
flag3=false;
sb.append(temp);
sb.append('\t');
}else{
if(flag3){
temp=temp+words[i];
}else{
sb.append(words[i]);
sb.append('\t');
}
}
}
return sb.toString().replace("'", "").trim();
//return sb.toString().trim();
}
}
5、パッケージ化、およびsqoop 2フォルダ/webapps/sqoop/WEB-INF/lib(バージョンによって違いがあるかもしれませんが、connectorのあるフォルダを選択してください)にアップロードする前にhdfs-connector、mvを他のフォルダ(保険のため)にアップロードし、削除(推奨しない)したり、hdfs-connectorの名前を変更(未試験)、sqoop 2を再起動したりすることができます.タスクを実行して効果を見ます.
6、完成.