Spark SQL統合ElasticSearchの事例が実戦

2387 ワード

Spark SQL統合ElasticSearchの事例が実戦
ElasticSearch概念の回顧
ElasticSearchはLuceneに基づく検索サーバです。これは、RESTfulウェブインターフェースに基づいて、分散型多ユーザ能力の全文検索エンジンを提供します。ElasticsearchはJavaで開発され、Apacheの許可条項の下のオープンソースとして発表され、現在流行している企業級の検索エンジンです。クラウドコンピューティングに使用する設計で、リアルタイム検索、安定、信頼性、高速、便利なインストールができます。
SparkとElasticSearchの統合
Javaバージョン
public class SparkESJavaOps {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName(SparkESJavaOps.class.getSimpleName()).setMaster("local");
        // spark     es    
        conf.set("es.index.auto.create","true");
        //   spark   es url   
        conf.set("es.nodes","master");
        conf.set("es.port","9200");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD  jsonRDD = sc.textFile("E:/test/scala/sqldf.json");
        // es     
        JavaEsSpark.saveJsonToEs(jsonRDD,"spark3/person");
        // es     
        /***
         * es        
         * index/type/id
         * es
         * index -->db
         * type -->table
         * document -->row
         * field --> column
         * esJsonRDD      JavaPairRDD  -->t._1  es    document id,    
         * t._2  document  
         * */
        JavaEsSpark.esJsonRDD(sc,"spark3/person").foreach(new VoidFunction>(){
            @Override
            public void call(Tuple2 tuple) throws Exception {
                System.out.println(tuple._1()+" ===> " +tuple._2());
            }
        });

    }
}
Scaraバージョン
/**
  * Spark   ES    
  */
object SparkESOps extends App{
  val conf  = new SparkConf().setAppName("SparkESOps").setMaster("local")
  // spark     es    
  conf.set("es.index.auto.creaete","true")
  //   Spark     es url   
  conf.set("es.nodes","master")
  conf.set("es.port","9200")
  val sc = new SparkContext(conf)

  // es     
  val linesRDD = sc.textFile("E:/test/scala/sqldf.json")
  //     resources     es           :index/type
  linesRDD.saveJsonToEs("spark2/person")
  //    
  val personRDD = sc.esJsonRDD("spark2/person")
  personRDD.foreach(println)

  sc.stop()

}