Cats Effect & circe でちょっとモダンな Elasticsearch プログラミング


elastic4s と Cats Effect & circe と併用して、Scala の Elasticsearch コードを少しだけモダンな関数型風味にしてみる記事。

はじめに

elastic4s という、わりと歴史のある Elasticsearch のための Scala クライアントがあるが、core のモジュールだけだと、実行の effectFuture だったり、検索結果が Map ベースの汎用的データ構造だったりする。

幸い付属モジュールで Cats EffectMonix、また circe などとの連携がサポートされているので、この記事ではそれらを使って、「記述と実行の分離」と「を活かしたドキュメント表現」を意識した、モダンな関数型 Scala スタイルでの Elasticsearch プログラミングを考えてみる。

お題と趣向

elastic4s の README.md に簡単なサンプルアプリが載っていて、以下のように Elasticsearch を操作をしている。

  1. Elasticsearch クライアントを生成。
  2. インデクス artists を作成。artists にはマッピング modern が、modern にはフィールド name が含まれる。
  3. artists/modern に、ドキュメント {"name": "L.S. Lowry"} を追加。
  4. artists 内を "lowry" で検索。
  5. 検索結果を表示。
  6. Elasticsearch クライアントをクローズ。

これをベースに、以下のような趣向で少し改善してみる。

  • Elasticsearch クライアントの生成・クローズとその使用は、Cats Effect の Resource として管理。
  • Future に替えて型パラメータ Sync[F] でプログラムを「記述」。「実行」するところで初めて F[_]IO を指定。
  • ドキュメントとケースクラスを circe の自動エンコーダ/デコーダで関連づけ。
    • ドキュメント追加時には、ケースクラスのインスタンスを elastic4s の DSL 構文に与える。
    • 検索結果表示時には、得られたドキュメントをケースクラスに変換してから表示する。

実装

elastic4s のバージョンは 6.5.1 を使った。その他のライブラリを含む依存関係は以下。

libraryDependencies ++= Seq(
  "core", "http", "circe", "cats-effect",
).map(s => "com.sksamuel.elastic4s" %% s"elastic4s-$s" % "6.5.1") ++ Seq(
  "org.typelevel" %% "cats-core" % "1.6.0",
  "org.typelevel" %% "cats-effect" % "1.3.0",
  "ch.qos.logback" % "logback-classic" % "1.2.3"
)

ソースはここにおいた。

Resource としての ElasticClient

elastic4s の Elasticsearch クライアント ElasticClient は、URI を指定した ElasticProperties を与えることで得られるが1、この記事ではクライアント使用後のクローズもまとめて Cats Effect の Resource として扱う。以下のように書いてみた。

def client: Resource[F, ElasticClient] = Resource.make {
  F.delay { ElasticClient(ElasticProperties("http://127.0.0.1:9200")) }
} (c => F.delay { c.close() })

FSync のインスタンスを持つ必要があるが、プログラムの他の部分でも必要なので、トレイト Program で以下のように指定している。

trait Program[F[_]] {
  implicit val F: Sync[F]
  def client: Resource[F, ElasticClient] = ???
  ...

以降、この ProgramSync[F] に依存する他のコードも書いていく。

Elasticsearch リクエストの定義

artists / modern でインデクスされるドキュメントは、以下のようなケースクラスで表すことにする。

case class ModernArtist(name: String)

インデクス生成、ドキュメントの追加/検索リクエストは以下のような DSL で書ける。

val createArtists: CreateIndexRequest = createIndex("artists") mappings {
  mapping("modern") fields textField("name")
}
val indexModern: IndexRequest =
  indexInto("artists" / "modern") source ModernArtist("L.S. Lowry") refreshImmediately

val searchModern: SearchRequest = search("artists") query "lowry"

indexModernsource ModernArtist("L.S. Lowry") の部分で、elastic4s-circe で提供される Indexable が使われるが、その際 circe の Encoder が使われる。

プログラムの記述

以上の ElasticClient リソースと Elasticsearch リクエストは、次のように合成できる。

def program(implicit U: Functor[F], E: Executor[F]): F[Unit] = client use { c =>
  for {
    _ <- c.execute(createArtists)            // インデクス作成
    _ <- c.execute(indexModern)              // ドキュメント追加
    _ <- c.execute(searchModern) >>= report  // ドキュメント検索とレポート
  } yield ()
}

ここで F は、上述の Sync だけではなく Functor2Executor のインスタンスも必要になってくるが、IO についてこれを提供するのが elastic4s-cats-effect モジュールということになる。

検索結果を表示する report は以下のように定義した。

private def puts(s: Any): F[Unit] = F.delay { println(s) }

def report(res: Response[SearchResponse]): F[Unit] = res match {
  case RequestFailure(_, _, _, e) => puts(s"We failed: $e")
  case RequestSuccess(_, _, _, r) => for {
    _ <- r.to[ModernArtist].toList.map(puts(_)).sequence // Cats の traverse 構文
    _ <- puts(s"There were ${r.totalHits} total hits")
  } yield ()
}

r.to[ModernArtist] のところで、elastic4s-circe で提供される HitReader[ModernArtist] が暗黙に参照されるが、HitReader 生成の際に、circe で自動的に導出される Decoder[ModernArtist] が使われる。

プログラムの実行

プログラムを「記述」したここまでのコードでは、あえて具体的な effect を明示せず、必要最小限の制約付きの型パラメータとして抽象化しているが、実行部分では具体的に決める必要がある。ここでは、Cats Effect から IO を採用してみた3

object ArtistIndex extends IOApp with Program[IO] {
  import com.sksamuel.elastic4s.cats.effect.instances._
  val F: Sync[IO] = Sync[IO]

  def run(args: List[String]): IO[ExitCode] = program as ExitCode.Success
}

おわりに

  • elastic4s-cats-effect を使うと、最近の関数型な Scala コードぽく Elasticsearch コードが書ける。
  • elastic4s-circe を使うと、ES ドキュメントとケースクラスの自動的なエンコード/デコードができる。
  • ただし、公式 Github のサンプルが陳腐化してたり、サンプルコードやテストコードが少なかったりして、最新情報を集めるのが意外と難しい。

  1. 本家のサンプルアプリでは LocalNode を使っているが、deprecated になっている。 

  2. Cats の Functor ではなく elastic4s 自前のそれ。 

  3. elastic4s-monix で使っている Monix は ver2.3.3 で、Cats Effect の型クラスのインスタンスが提供されていないらしく、同じことを Task でやろうとしたら、Sync[Task] を自前で書かなければならないっぽい。