Akka Httpの中で使われているFastFutureがおもしろかったので紹介

akka-httpの中で使われている FastFuture が面白かったので紹介します。 Scalaの標準の scala.concurrent.Future と基本的な挙動は同じですが、パフォーマンス面で有利になるような実装になっています。

既存のFutureのどこがパフォーマンス的に不利なのか

Future(123).map(_ * 2).map(_ + 234)

のような処理があった時、123 を生成するスレッドと、その値を2倍するスレッドと、 234 を加算するスレッドが別になる。これは flatMap にも言えて

for {
  a <- Future { calculateA() }
  b <- Future { calculateB(a) }
  c <- Future { calculateC(b) }
} yield c

みたいな処理があった時、 calculateA, calculateB, calculateC 3つのメソッドが別のスレッドで実行される。 map でチェーンするのも、flatMap をチェーンさせる(for文)のも Future が作られるもののこの書き方だと直列に実行されるので別スレッドを割り当てるのはコンテキストスイッチを発生するのとCPUのキャッシュが汚染されるのでパフォーマンス的に無駄がある。 型をあわせるだけの目的であれば Future.successful を使えばスレッドが割り当てられないので型をあわせるだけならこれが使われることが多い。 しかしながら、実は Future#map を呼ぶと内部で別スレッドの割当が行われるので Future.successful を使っていてもスレッドの切り替わりは完全に防げているわけではない。

例えば、 scala.concurrent.Future の実装

  def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S] = { // transform(f, identity)
    val p = Promise[S]()
    onComplete { v => p complete (v map f) }
    p.future
  }

map した中身を処理するときに新たに Future を生成しているのでスレッドの切り替わりが発生している。

akka.http.scaladsl.util.FastFuture

実装

https://github.com/akka/akka-http/blob/master/akka-http-core/src/main/scala/akka/http/scaladsl/util/FastFuture.scala

implicit class EnhancedFuture[T](val future: Future[T]) extends AnyVal {
  def fast: FastFuture[T] = new FastFuture[T](future)
}

このimplicitを使って Future#fast を呼べば FastFuture が作れる。

Future(123).fast
=> FastFuture[Int]

FastFuture の一番のポイントは以下で

private case class FulfilledFuture[+A](a: A) extends Future[A] { ... }
private case class ErrorFuture[+A](a: A) extends Future[A] { ... }

def map[B](f: A ⇒ B)(implicit ec: ExecutionContext): Future[B] =
  transformWith(a ⇒ FastFuture.successful(f(a)), FastFuture.failed)

def transformWith[B](s: A ⇒ Future[B], f: Throwable ⇒ Future[B])(implicit executor: ExecutionContext): Future[B] = {
  def strictTransform[T](x: T, f: T ⇒ Future[B]) =
    try f(x)
    catch { case NonFatal(e) ⇒ ErrorFuture(e) }

  future match {
    case FulfilledFuture(a) ⇒ strictTransform(a, s)
    case ErrorFuture(e)     ⇒ strictTransform(e, f)
    case _ ⇒ future.value match {
      case None ⇒
        val p = Promise[B]()
        future.onComplete {
          case Success(a) ⇒ p completeWith strictTransform(a, s)
          case Failure(e) ⇒ p completeWith strictTransform(e, f)
        }
        p.future
      case Some(Success(a)) ⇒ strictTransform(a, s)
      case Some(Failure(e)) ⇒ strictTransform(e, f)
    }
  }
}

transformWith メソッド内で Future を継承した FulfilledFuture もしくは ErrorFuture であれば Future ではなく Try として実行される。それ以外(ふつうのFuture)であれば通常の Future と同じ挙動をする。

いいこと

Future#fast を呼ぶだけで FastFuture が作れて、これを使うと別スレッドの割り当てが発生しないので直列に実行される場合の Future のチェーンにおいて無駄なコンテキストスイッチが発生せずパフォーマンス面で効率がよい。

わるいこと

サービス層とリポジトリ層で ExecutionContext を分けている場合に意図的に割り当てるスレッドプールを変えたいような場合でもスレッドが切り替わらないのでimplicitで ExecutionContext を渡してあげても意図したとおりにスレッドが変わらない。

scala.concurrent.Futureに取り込まれないのか?

議論はありました。が、 map だけ特別扱いするのはおかしいということになり、取り込まれてはいません。 https://contributors.scala-lang.org/t/upates-to-scala-concurrent-future-wrt-breaking-changes/1281/26