golangはcsvを解析してelastic searchを導入します.

3706 ワード

今日はcsvファイルからelastic searchにデータを導入する小さいプログラムを書きましょう.
準備工作
gopathのsrcフォルダの下でcsv 2 esフォルダを作成し、main.goファイルを作成します.いくつかのcsvファイルをダウンロードします.
解析コマンドラインパラメータ
まず、私達はflags packageを使ってコマンドラインのパラメータを解析したいです.コードは以下の通りです.
func main() {
  //        
  host := flag.String("host", "http://localhost:9200", "host, e.g. http://localhost:9200")
  file := flag.String("file", "", "file path")
  esIndex := flag.String("index", "", "elastic search index")
  esType := flag.String("type", "", "elastic search type")
  flag.Parse()
  if *file == "" {
    fmt.Println("please set which csv file you want to import clearly")
    return
  }
  if *esIndex == "" {
    fmt.Println("please set elastic search index")
    return
  }
  if *esType == "" {
    fmt.Println("please set elastic search type")
    return
  }
...
}
主な解析のパラメータにはesのアドレスがあり、導入するファイルのパス、インポートするesのindexとtypeがあります.go buildを実行し、実行可能ファイルcsv 2 goを生成し、実行します.
csv2go -h
-file string
        file path (default "")
  -host string
        host, e.g. http://localhost:9200 (default "http://localhost:9200")
  -index string
        elastic search index (default "")
  -type string
        elastic search type (default "")
接続es
elasticこのオープンソースプロジェクトはelasticを接続するのを助けてくれます.
  //   es
  ctx := context.Background()
  client, err := elastic.NewClient(
    elastic.SetURL(*host),
    elastic.SetSniff(false))
  if err != nil {
    panic(err)
  }

  //   index    ,        index
  exists, err := client.IndexExists(*esIndex).Do(ctx)
  if err != nil {
    panic(err)
  }
  if !exists {
    createIndex, err := client.CreateIndex(*esIndex).Do(ctx)
    if err != nil {
      panic(err)
    }
  }
csvを解析してelastic searchに導入する
ここは注意が必要です.第一に、Macでは\rの最後のファイルの問題がありますので、macreaderというバッグを使って、io.Readerに包んでいます.興味がある学生は前の文章「mac上のファイルは毒があります」を見てもいいです.第二に、csvファイルの第一行為を黙認します.
  //   csv
  f, _ := os.Open(*file)
  r := csv.NewReader(macreader.New(bufio.NewReader(f)))
  keys, err := r.Read()
  for {                                            //1
    record, err := r.Read()
    if err == io.EOF {
      break
    }
    m := make(map[string]string)
    for i, key := range keys {
      m[key] = record[i]
    }
    jsonStr, err := json.Marshal(m)
    if err != nil {
      panic(err)
    }
    put1, err := client.Index().
            Index(*esIndex).
            Type(*esType).
            BodyString(string(jsonStr)).
            Do(ctx)
    if err != nil {
      // Handle error
      panic(err)
    }
    fmt.Printf("Indexed tweet %s to index %s, type %s
", put1.Id, put1.Index, put1.Type) } //2
ok、基本的にcsvのデータをelastic searchに導入するプログラムが完成しました.性能をテストしましょう.上の(1)行のコードの前にプラスします.
start := time.Now().Unix()
上の(2)行のコードの後にプラスします.
end := time.Now().Unix()
fmt.Println(end, start)
61567本の記録ファイルを走って、全部で36分走った.昼寝はもういいです.
能率を上げる
elastic searchにはbulk appiがあります.いくつかの操作を統合して、elastic searchに伝えながら処理して返します.
  //     Bulk
  bulkRequest := client.Bulk()
  for {
    ...
    //           IndexRequest   Bulk
    req := elastic.NewBulkIndexRequest().Index(*esIndex).Type(*esType).Doc(string(jsonStr))
    bulkRequest.Add(req)
  }
  //        
  bulkResponse, err := bulkRequest.Do(ctx)
  if err != nil {
  }
  indexed := bulkResponse.Indexed()
  fmt.Println(" es   ",len(indexed),"   ")
最適化後、同じ6万本以上のレコードを挿入すると数秒しかかかりません.クール.
本論文のコードはgithubで開始されました.ご利用またはご意見をどうぞ.