KoalasとElasticsearchが連携できるか試してみた


この記事は、NTTテクノクロスAdvent Calnder 2020の18日目です。

こんにちは、NTTテクノクロスの@yuyhiraka (平川) と申します。

普段は仮想化/コンテナ/クラウド基盤、小規模ネットワークあたりの先進技術のPoCを主に担当しています。

この記事に記載の内容は個人的な取り組みの内容であり、所属する組織とは関係ありません。

はじめに

KoalasとElasticsearchが連携できるかを試してみました。
世の中的にApache Spark TMとElasticsearchの連携を試されている方々がいるのでその応用となる動作検証となります。

Elasticsearchとは

Elasticsearchは検索・分析のための検索エンジンおよびデータベースおよびエコシステムです。

Elasticsearchについての参考情報

Apache Spark TMとは

Apache Spark TMは高速なビッグデータ用の分散処理フレームワークです。Pythonにも対応しており特にPySparkと呼びます。

Apache Spark TMについての参考情報

pandasとは

pandasはPython用の強力なデータ分析ライブラリです。

pandasについての参考情報

Koalasとは

KoalasはApache Spark TMでpandasライクのデータ操作が可能になるラッパーライブラリです。 Apache Spark TMにはSpark Dataset/DataFrameと呼ばれるPandasのDataFrameに近い概念が存在しますが各種APIが異なるためpandas ⇔ Spark Dataset/DataFrame間でオブジェクト変換した際に混乱します。それを解決するアプローチがKoalasになります。

Koalasについての参考情報

ElasticsearchとApache Spark TMの連携について

同じバージョンが振られていることからElasticsearch-Hadoopプラグイン (elasticsearch-hadoop 7.10) を使えばElasticsearch 7.10とHadoopエコシステム (Apache Spark TM, Koalasを含む) の連携ができそうです。一方で2020年12月現在においてElasticsearch-Hadoopプラグインを用いてElasticsearchとApache Spark TM 3.0.xの連携ができないようです。

そこで今回は以下を満たすApache Spark TM 2.4.7を利用することにします。

  • Koalasのサポートバージョン
  • Elasticsearch-Hadoopプラグインのサポートバージョン

Elasticsearch-HadoopプラグインとElasticsearchのバージョンについての参考情報

ElasticsearchとApache Spark TM 3.0.xの連携についての参考情報

KoalasのDependenciesについての参考情報

検証環境の情報について

マシンスペック

  • VirtualBox 6.1.10上のUbuntu 20.04 LTS
  • vCPU 6コア
  • vMem 32GB
  • SSD 100GB
  • via HTTP/HTTPS Proxy
    (※HTTP/HTTPS Proxyの各種設定については手順上省略します。)

Dockerバージョン

# docker version
Client: Docker Engine - Community
 Version:           20.10.0

Apache Spark TM 2.4.7のコンテナイメージを作成する

検証のための環境構築稼働を節約するためBuild an Image with a Different Version of Sparkを参考にPySpark2.4.7とJupyterLabがインストール済のコンテナイメージを作成します。

# mkdir ~/pyspark-notebook
# curl -O https://raw.githubusercontent.com/jupyter/docker-stacks/master/pyspark-notebook/Dockerfile
# mv Dockerfile ~/pyspark-notebook
# docker build --rm --force-rm \
    -t jupyter/pyspark-notebook:spark-2.4.7 ./pyspark-notebook \
    --build-arg spark_version=2.4.7 \
    --build-arg hadoop_version=2.7 \
    --build-arg spark_checksum=0F5455672045F6110B030CE343C049855B7BA86C0ECB5E39A075FF9D093C7F648DA55DED12E72FFE65D84C32DCD5418A6D764F2D6295A3F894A4286CC80EF478 \
    --build-arg openjdk_version=8

上記のベースイメージに対してElasticsearch-HadoopプラグインとKoalasをインストールするためのDockerfileを作成します。しかし、PySpark2.4はそのままだとPython3.8.xで動作しないため対策としてPython3.7.xのconda仮想環境を作っておきます。

※cloudpickle.pyを差し替えたうえで改造するという方法もあるようですが今回は試しません。

# mkdir ~/koalas-spark
# vi ~/koalas-spark/Dockerfile
FROM jupyter/pyspark-notebook:spark-2.4.7
USER root
RUN apt-get update
RUN apt-get install -y curl
USER jovyan
RUN mkdir ~/jars
RUN curl https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch-hadoop/7.10.1/{elasticsearch-hadoop-7.10.1.jar} --output "/home/jovyan/jars/#1"
RUN conda create -n py37 -c conda-forge python=3.7 jupyter pyspark=2.4 koalas=1.5 openjdk=8 -y

作成したDockerfileを使ってコンテナイメージを作成します。

# docker image build --rm --force-rm -t koalas-spark:0.1 ~/koalas-spark/

Elasticsearchのコンテナイメージをローカルに取得

大きめのコンテナイメージなので先に取得しておきます。

# docker pull elasticsearch:7.10.1

Docker Composeでコンテナ起動

必要ディレクトリ作成とdocker-compose.yamlの作成を行います。

# mkdir /opt/es
# mkdir /opt/koalas-spark/
# コンテナからアクセスできるようにパーミッションを緩める (手抜き)
# chmod 777 /opt/es /opt/koalas-spark/
# vi docker-compose.yaml
version: '3'

services:
  elasticsearch:
    image: elasticsearch:7.10.1
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    ports:
      - 9200:9200
    volumes:
      - /opt/es/:/usr/share/elasticsearch/data
    networks:
      - devnet

  koalas-spark:
    build: ./koalas-spark
    container_name: koalas-spark
    working_dir: '/home/jovyan/work/'
    tty: true
    volumes:
      - /opt/koalas-spark/:/home/jovyan/work/
    networks:
      - devnet

networks:
  devnet:

Docker Composeを用いてKoalasコンテナとElasticsearchコンテナを立ち上げます。また、Elasticsearchコンテナが正常に起動していることを確認します。

# docker-compose build
# docker-compose up -d
# curl -X GET http://localhost:9200
{
  "name" : "6700fb19f202",
  "cluster_name" : "docker-cluster",
  "cluster_uuid" : "P-uVFNu6RZKKxdklnVypbw",
  "version" : {
    "number" : "7.10.1",
    "build_flavor" : "default",
    "build_type" : "docker",
    "build_hash" : "1c34507e66d7db1211f66f3513706fdf548736aa",
    "build_date" : "2020-12-05T01:00:33.671820Z",
    "build_snapshot" : false,
    "lucene_version" : "8.7.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"

Koalasコンテナの中に入る

起動しているコンテナ一覧の中からKoalasコンテナのコンテナIDを確認します。次にコンテナIDを指定しKoalasコンテナの中に入ります。 別解としてdocker-compose execを使う方法もあります。

# docker ps
CONTAINER ID   IMAGE                  COMMAND                  CREATED          STATUS          PORTS                              NAMES
e33681a37aea   root_koalas-spark      "tini -g -- start-no…"   2 minutes ago    Up 2 minutes    8888/tcp                           koalas-spark
fe65e3351bea   elasticsearch:7.10.1   "/tini -- /usr/local…"   16 minutes ago   Up 16 minutes   0.0.0.0:9200->9200/tcp, 9300/tcp   elasticsearch
# docker exec -it e33681a37aea bash

curlコマンドを用いてKoalasコンテナからElasticsearchコンテナへの疎通の確認をします。

$ curl -X GET http://elasticsearch:9200
{
  "name" : "6700fb19f202",
  "cluster_name" : "docker-cluster",
  "cluster_uuid" : "P-uVFNu6RZKKxdklnVypbw",
  "version" : {
    "number" : "7.10.1",
    "build_flavor" : "default",
    "build_type" : "docker",
    "build_hash" : "1c34507e66d7db1211f66f3513706fdf548736aa",
    "build_date" : "2020-12-05T01:00:33.671820Z",
    "build_snapshot" : false,
    "lucene_version" : "8.7.0",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"

KoalasからElasticsearchへの書き込み

引き続きKoalasコンテナで作業を進めます。

Python3.7環境に切り替えてPySpark (IPython) を起動しElasticsearchに対してデータの書き込みを行います。

今回はSpark RDDの機能を用いて4行4列のデータを作成しています。それを一度Spark DataFrameに変換し、さらにKoalas DataFrameに変換しています。

$ conda activate py37
$ export PYARROW_IGNORE_TIMEZONE=1
$ pyspark --jars /home/jovyan/jars/elasticsearch-hadoop-7.10.1.jar
import databricks.koalas as ks
import pandas as pd
import json, os, datetime, collections
from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.types import *

esURL = "elasticsearch"

rdd1 = sc.parallelize([
    Row(col1=1, col2=1, col3=1, col4=1),
    Row(col1=2, col2=2, col3=2, col4=2),
    Row(col1=3, col2=3, col3=3, col4=3),
    Row(col1=4, col2=4, col3=4, col4=4)
])

df1 = rdd1.toDF()
df1.show()
kdf1 = ks.DataFrame(df1)
print(kdf1)

kdf1.to_spark_io(path="sample/test", 
    format="org.elasticsearch.spark.sql", 
    options={"es.nodes.wan.only": "false", 
    "es.port": 9200,
    "es.net.ssl": "false", 
    "es.nodes": esURL}, 
    mode="Overwrite")

PySpark (IPython) からCtrl + Dキー等で抜けます。そして、Elasticsearchにデータが格納されていることを確認します。

curl -X GET http://elasticsearch:9200/sample/test/_search?pretty
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 4,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "sample",
        "_type" : "test",
        "_id" : "kaTbZXYBpKFycpUDLgjO",
        "_score" : 1.0,
        "_source" : {
          "col1" : 4,
          "col2" : 4,
          "col3" : 4,
          "col4" : 4
        }
      },
      {
        "_index" : "sample",
        "_type" : "test",
        "_id" : "kKTbZXYBpKFycpUDLgjG",
        "_score" : 1.0,
        "_source" : {
          "col1" : 2,
          "col2" : 2,
          "col3" : 2,
          "col4" : 2
        }
      },
      {
        "_index" : "sample",
        "_type" : "test",
        "_id" : "j6TbZXYBpKFycpUDLgjG",
        "_score" : 1.0,
        "_source" : {
          "col1" : 3,
          "col2" : 3,
          "col3" : 3,
          "col4" : 3
        }
      },
      {
        "_index" : "sample",
        "_type" : "test",
        "_id" : "jqTbZXYBpKFycpUDLgjD",
        "_score" : 1.0,
        "_source" : {
          "col1" : 1,
          "col2" : 1,
          "col3" : 1,
          "col4" : 1
        }
      }
    ]
  }
}

まとめ

上記の通りElasticsearch-Hadoopプラグインを用いてKoalasからElasticsearchにデータを投入できることを確認することができました。

当初の想定では

 PySpark (IPython) ⇒ JupyterLab

 Docker Compose ⇒ Kubernetes

で検証する予定だったのですが本質ではないところに時間をかけるわけにもいかなかったので今回は妥協しています。

おそらくJupyterLab/Kubernetesで実施しても同様にKoalasからElasticsearchに問題なくデータ投入することができるはずなので今後試してみたいと思います。また、要望がたくさん挙がってることから近いうちに対応しそうですがElasticsearch-HadoopプラグインがApache Spark TM 3.0.xでも利用できるようになることを強く望みます。

明日は@y-ohnukiによるNTTテクノクロスAdvent Calnder 2020の記事です。お楽しみに!