akka-httpの中で使われている FastFuture
が面白かったので紹介します。
Scalaの標準の scala.concurrent.Future
と基本的な挙動は同じですが、パフォーマンス面で有利になるような実装になっています。
既存のFutureのどこがパフォーマンス的に不利なのか
Future(123).map(_ * 2).map(_ + 234)
のような処理があった時、123
を生成するスレッドと、その値を2倍するスレッドと、 234
を加算するスレッドが別になる。これは flatMap
にも言えて
for { a <- Future { calculateA() } b <- Future { calculateB(a) } c <- Future { calculateC(b) } } yield c
みたいな処理があった時、 calculateA, calculateB, calculateC
3つのメソッドが別のスレッドで実行される。 map
でチェーンするのも、flatMap
をチェーンさせる(for文)のも Future
が作られるもののこの書き方だと直列に実行されるので別スレッドを割り当てるのはコンテキストスイッチを発生するのとCPUのキャッシュが汚染されるのでパフォーマンス的に無駄がある。
型をあわせるだけの目的であれば Future.successful
を使えばスレッドが割り当てられないので型をあわせるだけならこれが使われることが多い。
しかしながら、実は Future#map
を呼ぶと内部で別スレッドの割当が行われるので Future.successful
を使っていてもスレッドの切り替わりは完全に防げているわけではない。
例えば、 scala.concurrent.Future
の実装
def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = { // transform(f, identity) val p = Promise[S]() onComplete { v => p complete (v map f) } p.future }
map
した中身を処理するときに新たに Future
を生成しているのでスレッドの切り替わりが発生している。
akka.http.scaladsl.util.FastFuture
実装
implicit class EnhancedFuture[T](val future: Future[T]) extends AnyVal { def fast: FastFuture[T] = new FastFuture[T](future) }
このimplicitを使って Future#fast
を呼べば FastFuture
が作れる。
Future(123).fast
=> FastFuture[Int]
FastFuture
の一番のポイントは以下で
private case class FulfilledFuture[+A](a: A) extends Future[A] { ... } private case class ErrorFuture[+A](a: A) extends Future[A] { ... } def map[B](f: A ⇒ B)(implicit ec: ExecutionContext): Future[B] = transformWith(a ⇒ FastFuture.successful(f(a)), FastFuture.failed) def transformWith[B](s: A ⇒ Future[B], f: Throwable ⇒ Future[B])(implicit executor: ExecutionContext): Future[B] = { def strictTransform[T](x: T, f: T ⇒ Future[B]) = try f(x) catch { case NonFatal(e) ⇒ ErrorFuture(e) } future match { case FulfilledFuture(a) ⇒ strictTransform(a, s) case ErrorFuture(e) ⇒ strictTransform(e, f) case _ ⇒ future.value match { case None ⇒ val p = Promise[B]() future.onComplete { case Success(a) ⇒ p completeWith strictTransform(a, s) case Failure(e) ⇒ p completeWith strictTransform(e, f) } p.future case Some(Success(a)) ⇒ strictTransform(a, s) case Some(Failure(e)) ⇒ strictTransform(e, f) } } }
transformWith
メソッド内で Future
を継承した FulfilledFuture
もしくは ErrorFuture
であれば Future
ではなく Try
として実行される。それ以外(ふつうのFuture)であれば通常の Future
と同じ挙動をする。
いいこと
Future#fast
を呼ぶだけで FastFuture
が作れて、これを使うと別スレッドの割り当てが発生しないので直列に実行される場合の Future
のチェーンにおいて無駄なコンテキストスイッチが発生せずパフォーマンス面で効率がよい。
わるいこと
サービス層とリポジトリ層で ExecutionContext
を分けている場合に意図的に割り当てるスレッドプールを変えたいような場合でもスレッドが切り替わらないのでimplicitで ExecutionContext
を渡してあげても意図したとおりにスレッドが変わらない。
scala.concurrent.Futureに取り込まれないのか?
議論はありました。が、 map
だけ特別扱いするのはおかしいということになり、取り込まれてはいません。
https://contributors.scala-lang.org/t/upates-to-scala-concurrent-future-wrt-breaking-changes/1281/26