JavaのFutureをScalaのFutureへ変換する

はじめに

ScalaでJavaのライブラリを利用することがしばしばあるが、ScalaのFutureとJavaのFutureは別である。Scalaを利用する時はJavaのFutureではなく、ScalaのFutureを利用したい。この記事ではJavaのFutureをScalaのFutureへ変換する方法について述べる。なお、この記事の完全なコードは次のGitHubリポジトリに置かれている。

表記

この記事ではScalaのFutureとJavaのFutureを次のように区別する。

import java.util.concurrent.{Future => JavaFuture}
import scala.concurrent.{Future => ScalaFuture}

つまり、JavaFuture[A]はJavaのFutureであり、一方でScalaFuture[A]はScalaのFutureである。

直感的な実装

Stackoverflowなどでは次のようなコードで変換できると書かれている。

object BrokenJavaFutureConverter {
  def toScala[A](jf: JavaFuture[A])(implicit ec: ExecutionContext): ScalaFuture[A] = {
    val p: Promise[A] = Promise[A]

    ec.execute(
      new Runnable {
        override def run() =
          p.complete(
            Try(jf.get())
          )
      }
    )

    p.future
  }

  implicit class RichJavaFuture[A](val jf: JavaFuture[A]) extends AnyVal {
    def asScala(implicit ec: ExecutionContext): ScalaFuture[A] = toScala(jf)
  }
}

これはJavaのFutureをExecutionContextに基づいて実行しScalaのPromiseで受け取ってそれでScalaのFutureを返すというコードである。これで一見良さそうで、次のように正常に動作するように見える。

class BrokenJavaFutureConverterSpec extends WordSpec {
  import BrokenJavaFutureConverter._

  trait SetupWithFixedThreadPool {
    val timeout = Duration(1, TimeUnit.SECONDS)

    val threadPool: ExecutorService = Executors.newFixedThreadPool(1)

    val executor: Executor = new ExecutorFromExecutorService(threadPool)

    implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(executor)
  }

  "toScala" should {
    "return the value" in new SetupWithFixedThreadPool {
      val javaFuture: JavaFuture[Int] = threadPool.submit { () =>
        Thread.sleep(200)
        10
      }

      assert(Await.result(toScala(javaFuture), timeout) == 10)
    }
  }
}

一方で次のように、作成したScalaのFutureをrecoverしようとすると正しく動作しない。

"not be able to recover the exception" in new SetupWithFixedThreadPool {
  val javaFuture: JavaFuture[Int] = threadPool.submit{ () =>
    throw new TestException()
  }
 
  val recover = javaFuture.asScala.recover {
    case e: TestException => 10
  }
 
  assertThrows[ExecutionException](Await.result(recover, timeout))
}
class TestException(message: String = null, cause: Throwable = null)
  extends Exception(message, cause)

上記のテストではJavaのFutureは実行時に例外TestExceptionを送出し、それをScalaFuture#recoverで変換しているはずだが、これの結果をAwait#resultで取り出すと例外ExecutionExceptionが送出される。このように、JavaのFutureは実行時の例外をExecutionExceptionでラップしてしまう。これではScalaのFutureとしてrecoverがやりずらいので改善を考える。

ExecutionExceptionをアンラップする

次のようなコードで先ほどの問題を解決する。

object JavaFutureConverter {
  def toScala[A](jf: JavaFuture[A])(implicit ec: ExecutionContext): ScalaFuture[A] = {
    val p: Promise[A] = Promise[A]

    ec.execute { () =>
      p.complete(
        Try(jf.get()) match {
          case Failure(e: ExecutionException) =>
            Failure(e.getCause)
          case x =>
            x
        }
      )
    }

    p.future
  }

  implicit class RichJavaFuture[A](val jf: JavaFuture[A]) extends AnyVal {
    def asScala(implicit ec: ExecutionContext): ScalaFuture[A] = toScala(jf)
  }
}

これはJavaのFutureをgetしたものをTryで包み、もし結果がFailureでかつ例外の型がExecutionExceptionである場合は、getCauseを利用してアンラップするようにしている。こうすることで次のようにrecoverが動作する。

"be able to recover the exception" in new SetupWithFixedThreadPool {
  val javaFuture: JavaFuture[Int] = threadPool.submit { () =>
    throw new TestException()
  }
 
  val recover = javaFuture.asScala.recover {
    case e: TestException => 10
  }
 
  assert(Await.result(recover, timeout) == 10)
}

ForkJoinPoolによる問題

これでよいものができたかに見えたが、調べたところForkJoinPool.commonPool()といった方法で作成されたExecutorServiceを利用するとこれは次のように正しく動作しないことがあると分った。

trait SetupWithForkJoinPool {
  val timeout = Duration(1, TimeUnit.SECONDS)
 
  val forkJoinPool: ExecutorService = ForkJoinPool.commonPool()
 
  val executor: Executor = new ExecutorFromExecutorService(forkJoinPool)
 
  implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(executor)
}

"return RuntimeException despite it returns IOException if you use the ForkJoinPool executor" in new SetupWithForkJoinPool {
  val javaFuture: JavaFuture[Unit] = forkJoinPool.submit { () =>
    throw new IOException()
  }
 
  assertThrows[RuntimeException](Await.result(javaFuture.asScala, timeout))
}

これはIOExceptionを送出しているにも関わらず、RuntimeExceptionにラップされていることを示している。このように、このJavaFutureConverter#toScalaは利用するExecutorServiceによっては正しく動作しないことがあることに注意が必要である。もしこのコードを本番で利用する場合は、そのコードがどのような方法で作成したExecutorServiceを利用しているのかを調べて、そのExecutorServiceを利用したテストを実行してからこのコードを利用するべきである。

がくぞさんからのアドバイス

がくぞさんから次のようなコードでもよいという意見をいただいた。

JavaFutureConverter#toScalaExecutionContextを暗黙に受け取ってそれでJavaのFutureを実行するが、ScalaのFutureも同じような動作だったのでこちらの方がシンプルであるという理由でこちらに修正した。

object JavaFutureConverter {
  def toScala[A](jf: JavaFuture[A])(implicit ec: ExecutionContext): ScalaFuture[A] = {
    ScalaFuture(jf.get()).transform {
      case Failure(e: ExecutionException) =>
        Failure(e.getCause)
      case x => x
    }
  }
}

まとめ

JavaのFutureをScalaのFutureへ変換するのは思っていたよりも難しいということが分かった。もしこれよりもよい方法があれば、気軽にこの記事のコメントなどで指摘して欲しい。

コメント