Elastic4s の SearchIterator で ElasticSearch から大量のデータを取得する

2023年3月12日 engineering

こんにちは、 @kz_morita です。

Scala で ElasticSearch から大量の件数のデータを取得する必要があり実装をしました。 ライブラリは Elastic4s を使用しています。

要求として、1万件以上の数の多いデータを ElasticSearch から取得する必要があったのですが、デフォルトでは 1万件ずつしか取得できません。 軽く調べたところアプリケーション側で繰り返し取得するか、ElasticSearch の Scroll API を使用する必要がありそうでした。

Elastic4s に ScrollAPI への繰り返しリクエストを Wrap してくれる SearchIterator があったのでそれを利用してみました。

SearchIterator を使った取得

以下のような SearchIterator を使用したコードで1万件以上のデータを取得することができました。

import com.sksamuel.elastic4s._
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.circe._
import com.sksamuel.elastic4s.http.JavaClient
import scala.util.{Failure, Success, Using}

// ...
// ..
// .

// ElasticSearch の Client を生成
private def buildClient: ElasticClient = ElasticClient(JavaClient(ElasticProperties(s"https://$host")))

// ElasticSearch から取得した結果から読み込むための case class
case class FetchItemIdResult(itemId: String)
object FetchItemIdResult {
    implicit val hitReader: HitReader[FetchItemIdResult] = hitReaderWithCirce[FetchItemIdResult]
}

// itemCategory から itemId を種痘する
override def fetchItemIds(itemCategory: ItemCategory): IO[Error, Seq[ItemId]] = {
  ZIO.fromEither {
    val response = Using(buildClient) { client =>
      implicit val timeout = 180.seconds
      SearchIterator
        .iterate[FetchItemIdResult](
          client,
          search(index)
            .bool(
              must(
                termQuery("itemCategory", itemCategory.value)
              )
            )
            .sourceInclude("itemId")
            .sortBy(fieldSort("itemid"))
            .from(0)
            .size(10000)
            .scroll("1m")
          )
          .toList
   }
   
   response match {
     case Failure(exception) => Left(RepositoryError(exception))
     case Success(res: List[FetchItemIdResult]) =>
       Right(res.map(_.itemId).map(ItemId))
     }
   }
}

上記の例は、サンプルコードですが、Item という何らかの商品があり、それを ItemCategory で取得してきて、ItemId を取得するといったコードになります。

SearchIterator クラスの iterate メソッドは以下のような実装になっています。

  def iterate[T](client: ElasticClient, searchReq: SearchRequest)(implicit
                                                                      reader: HitReader[T],
                                                                      timeout: Duration): Iterator[T] = 
    hits(client, searchReq).map(_.to[T])

第一引数に、ElasticSearch の Client を、第二引数に検索リクエストを受け取ります。implicit として、HitReader と timeout 時間である Duration 型が必要です。

  implicit val timeout = 180.seconds

implicit で受け取っている変数はそれぞれ以下のように宣言しています。

// ElasticSearch から取得した結果から読み込むための case class
case class FetchItemIdResult(itemId: String)
object FetchItemIdResult {
    implicit val hitReader: HitReader[FetchItemIdResult] = hitReaderWithCirce[FetchItemIdResult]
}

hitReader は、ElasticSearch で hit したドキュメントのレスポンスの Json から case class へと変換する処理になります。Json ライブラリの ここでは Elastic4s のライブラリで用意されている Circe を利用した hitReader を使用しています。

iterate メソッドの中で呼ばれている、hits メソッドをみると内部で、ElasticSearch から返される scrollId がある場合はそれを使用して実行していることがわかります。

↓上記 GitHub から引用します。

def fetchNext(): Iterator[SearchHit] = {

  // we're either advancing a scroll id or issuing the first query w/ the keep alive set
  val f = scrollId match {
    case Some(id) => client.execute(searchScroll(id, searchreq.keepAlive.get))
    case None     => client.execute(searchreq)
  }

  val resp = Await.result(f, timeout)

  // in a search scroll we must always use the last returned scrollId
  val response = resp match {
    case RequestSuccess(_, _, _, result) => result
    case failure: RequestFailure         => sys.error(failure.toString)
  }

  scrollId = response.scrollId
  response.hits.hits.iterator
}

検索リクエストは以下のように送っています。

search(index)
  .bool(
    must(
      termQuery("itemCategory", itemCategory.value)
      .value)
    )
  )
  .sourceInclude("itemId")
  .sortBy(fieldSort("itemid"))
  .from(0)
  .size(10000)
  .scroll("1m")
)

色々条件やらを指定していますが、scroll("1m") と keepAlive を設定しつつ呼ぶことで、Scroll リクエストとしています。

このようにすると、10000 件を超えるデータでも、SearchIterator が内部で逐次 ScrollAPI を利用してデータを取得してくれるため取得することができました。

まとめ

今回は、Elastic4s を使って Scala で ElasticSearch から 10000件以上の大量のデータを fetch する方法についてまとめました。 ElasticSearch のクエリの形式を知っていれば、Elastic4s は DSL があり直感的にかけるのでとても便利です。

さらに、今回あつかった SearchIterator のような便利な Wrapper も用意されているため非常に楽に実装することができました。

この記事をシェア