Scala と ZIO による並行処理

2023年4月3日 engineering

こんにちは、 @kz_morita です。

今回は、Scala 向けの非同期処理ライブラリである ZIO について主に並行処理の使い方についてメモします。

主な並行処理

ZIO を用いて、並行処理のためのメソッドを以下に列挙します。

  • ZIO#zip, ZIO#zipPar
  • ZIO#zipWith, ZIO#zipWithPar
  • ZIO#tupled, ZIO#tupledPar
  • ZIO#foreach, ZIO#foreachPar
  • ZIO#reduceAll, ZIO#reduceAllPar
  • ZIO#mergeAll, ZIO#mergeAllPar

suffix に Par をつけると並列実行となります。

以下のような ZIO を返すメソッドで考えます。

private def numSeq(): ZIO[Clock, Throwable, Seq[Int]] = for {
    _ <- ZIO.effect(println("numSeq start"))
    _ <- zio.clock.sleep(1.seconds)
    _ <- ZIO.effect(println("numSeq end"))
} yield Seq(1,2,3,4,5)

private def stringSeq(): ZIO[Clock, Throwable, Seq[String]] = for {
    _ <- ZIO.effect(println("stringSeq start"))
    _ <- zio.clock.sleep(1.seconds)
    _ <- ZIO.effect(println("stringSeq end"))
} yield Seq("a","b","c","d","e")

ZIO#zip, ZIO#zipPar

zip は、2つの ZIO を返すメソッドの結果を 2要素の tuple にして返します。

    numSeq().zip(stringSeq())

結果は以下の感じ

numSeq start
numSeq end
stringSeq start
stringSeq end
(List(1,2,3,4,5), List(a, b, c, d, e))

zipPar は、上記を並行で実行します。

    numSeq().zipPar(stringSeq())
numSeq start
stringSeq start
stringSeq end
numSeq end
(List(1,2,3,4,5), List(a, b, c, d, e))

ZIO#zipWith, ZIO#zipWithPar

zipWith は、結果を merge する (A, B) => C となるメソッドを第二引数に取る zip です。

    numSeq().zipWith(stringSeq())((nums, strings) => nums.zip(strings))

結果は以下の感じです。

numSeq start
numSeq end
stringSeq start
stringSeq end
List((1, a),(2, b),(3, c),(4, d),(5, e))

zipWithPar

    numSeq().zipWith(stringSeq())((nums, strings) => nums.zip(strings))
numSeq start
stringSeq start
stringSeq end
numSeq end
List((1, a),(2, b),(3, c),(4, d),(5, e))

ZIO#tupled, ZIO#tupledPar

tupled は、ほぼ zip と似てますが、複数の要素も tuple で返すことができます。

    ZIO.tupled(numSeq(), stringSeq())

結果は以下の感じです。

numSeq start
numSeq end
stringSeq start
stringSeq end
(List(1,2,3,4,5), List(a, b, c, d, e))

tupledPar

    ZIO.tupledPar(numSeq(), stringSeq())
numSeq start
stringSeq start
stringSeq end
numSeq end
(List(1,2,3,4,5), List(a, b, c, d, e))

ZIO#foreach, ZIO#foreachPar

foreach は、Collection をとって ZIO を返すような挙動ができます。

    ZIO.foreach(Set(1,2,3))(_ => numSeq())

結果は以下のように、numSeqメソッドを 3 回実行します。

numSeq start
numSeq end 
numSeq start
numSeq end 
numSeq start
numSeq end 
Set(List(1,2,3,4,5))

foreachPar で並行処理です。

numSeq start
numSeq start
numSeq start
numSeq end 
numSeq end 
numSeq end 
Set(List(1,2,3,4,5))

ZIO#reduceAll, ZIO#reduceAllPar

reduceAll は、複数の ZIO を処理して1つの結果を返します。

    ZIO.reduceAll(ZIO.succeed(Nil), Seq(numSeq(), numSeq(), numSeq())) { (num1, num2) =>
        num1 ++ num2
    }
numSeq start
numSeq end 
numSeq start
numSeq end 
numSeq start
numSeq end 
List(1,2,3,4,5,1,2,3,4,5,1,2,3,4,5)

reduceAllPar

    ZIO.reduceAll(ZIO.succeed(Nil), Seq(numSeq(), numSeq(), numSeq())) { (num1, num2) =>
        num1 ++ num2
    }
numSeq start
numSeq start
numSeq start
numSeq end 
numSeq end 
numSeq end 
List(1,2,3,4,5,1,2,3,4,5,1,2,3,4,5)

ZIO#mergeAll, ZIO#mergeAllPar

mergeAll は、複数の ZIO を merge した値を返します。

    ZIO.mergeAll(Seq(numSeq(), stringSeq(), numSeq()))(Seq[String]()) { (acc, num) =>
        acc ++ num.map(_.toString)
    }
numSeq start
numSeq end 
stringSeq start
stringSeq end 
numSeq start
numSeq end 
List(1,2,3,4,5,a,b,c,d,e,1,2,3,4,5)

mergeAllPar

    ZIO.mergeAllPar(Seq(numSeq(), stringSeq(), numSeq()))(Seq[String]()) { (acc, num) =>
        acc ++ num.map(_.toString)
    }
numSeq start
numSeq start
stringSeq start
stringSeq end 
numSeq end 
numSeq end 
List(a,b,c,d,e,1,2,3,4,5,1,2,3,4,5)

並行でなげるので早くおわる stringSeq が先に List に入っているため順序が変わってます。

まとめ

今回は、ZIO の並行処理周りをまとめました。 非同期処理を書く際に、なれると ZIO 使いやすいためどんどん使っていきたいなと思います。

この記事をシェア