golangはcsvを解析してelastic searchを導入します.
3706 ワード
今日はcsvファイルからelastic searchにデータを導入する小さいプログラムを書きましょう.
準備工作
gopathのsrcフォルダの下でcsv 2 esフォルダを作成し、main.goファイルを作成します.いくつかのcsvファイルをダウンロードします.
解析コマンドラインパラメータ
まず、私達はflags packageを使ってコマンドラインのパラメータを解析したいです.コードは以下の通りです.
elasticこのオープンソースプロジェクトはelasticを接続するのを助けてくれます.
ここは注意が必要です.第一に、Macでは\rの最後のファイルの問題がありますので、macreaderというバッグを使って、io.Readerに包んでいます.興味がある学生は前の文章「mac上のファイルは毒があります」を見てもいいです.第二に、csvファイルの第一行為を黙認します.
能率を上げる
elastic searchにはbulk appiがあります.いくつかの操作を統合して、elastic searchに伝えながら処理して返します.
本論文のコードはgithubで開始されました.ご利用またはご意見をどうぞ.
準備工作
gopathのsrcフォルダの下でcsv 2 esフォルダを作成し、main.goファイルを作成します.いくつかのcsvファイルをダウンロードします.
解析コマンドラインパラメータ
まず、私達はflags packageを使ってコマンドラインのパラメータを解析したいです.コードは以下の通りです.
func main() {
//
host := flag.String("host", "http://localhost:9200", "host, e.g. http://localhost:9200")
file := flag.String("file", "", "file path")
esIndex := flag.String("index", "", "elastic search index")
esType := flag.String("type", "", "elastic search type")
flag.Parse()
if *file == "" {
fmt.Println("please set which csv file you want to import clearly")
return
}
if *esIndex == "" {
fmt.Println("please set elastic search index")
return
}
if *esType == "" {
fmt.Println("please set elastic search type")
return
}
...
}
主な解析のパラメータにはesのアドレスがあり、導入するファイルのパス、インポートするesのindexとtypeがあります.go buildを実行し、実行可能ファイルcsv 2 goを生成し、実行します.csv2go -h
-file string
file path (default "")
-host string
host, e.g. http://localhost:9200 (default "http://localhost:9200")
-index string
elastic search index (default "")
-type string
elastic search type (default "")
接続eselasticこのオープンソースプロジェクトはelasticを接続するのを助けてくれます.
// es
ctx := context.Background()
client, err := elastic.NewClient(
elastic.SetURL(*host),
elastic.SetSniff(false))
if err != nil {
panic(err)
}
// index , index
exists, err := client.IndexExists(*esIndex).Do(ctx)
if err != nil {
panic(err)
}
if !exists {
createIndex, err := client.CreateIndex(*esIndex).Do(ctx)
if err != nil {
panic(err)
}
}
csvを解析してelastic searchに導入するここは注意が必要です.第一に、Macでは\rの最後のファイルの問題がありますので、macreaderというバッグを使って、io.Readerに包んでいます.興味がある学生は前の文章「mac上のファイルは毒があります」を見てもいいです.第二に、csvファイルの第一行為を黙認します.
// csv
f, _ := os.Open(*file)
r := csv.NewReader(macreader.New(bufio.NewReader(f)))
keys, err := r.Read()
for { //1
record, err := r.Read()
if err == io.EOF {
break
}
m := make(map[string]string)
for i, key := range keys {
m[key] = record[i]
}
jsonStr, err := json.Marshal(m)
if err != nil {
panic(err)
}
put1, err := client.Index().
Index(*esIndex).
Type(*esType).
BodyString(string(jsonStr)).
Do(ctx)
if err != nil {
// Handle error
panic(err)
}
fmt.Printf("Indexed tweet %s to index %s, type %s
", put1.Id, put1.Index, put1.Type)
} //2
ok、基本的にcsvのデータをelastic searchに導入するプログラムが完成しました.性能をテストしましょう.上の(1)行のコードの前にプラスします.start := time.Now().Unix()
上の(2)行のコードの後にプラスします.end := time.Now().Unix()
fmt.Println(end, start)
61567本の記録ファイルを走って、全部で36分走った.昼寝はもういいです.能率を上げる
elastic searchにはbulk appiがあります.いくつかの操作を統合して、elastic searchに伝えながら処理して返します.
// Bulk
bulkRequest := client.Bulk()
for {
...
// IndexRequest Bulk
req := elastic.NewBulkIndexRequest().Index(*esIndex).Type(*esType).Doc(string(jsonStr))
bulkRequest.Add(req)
}
//
bulkResponse, err := bulkRequest.Do(ctx)
if err != nil {
}
indexed := bulkResponse.Indexed()
fmt.Println(" es ",len(indexed)," ")
最適化後、同じ6万本以上のレコードを挿入すると数秒しかかかりません.クール.本論文のコードはgithubで開始されました.ご利用またはご意見をどうぞ.