Extensible Effectsでトランザクションモナド“Fujitask”を作る

はじめに

ScalaやHaskellなどではモナドを利用して副作用1を抽象化するということがしばしば行われる。FujitaskはScalaで実装された1つのモナドで、データベースへのアクセスに関するトランザクション制御を抽象化した。ところがモナドはReader[Future[Either[Error, Option[A]]]]のようにいくつものモナドが入れ子になってしまったとき、Scalaのfor式のようなモナド構文では内側のモナドへのアクセスが難しくなってしまう。 この問題へのアプローチとして有名なものにモナドトランスフォーマーがある。あるモナドトランスフォーマーTは任意モナドMを引数に取ってT[M]となる新しい1つのモナドとなり、このモナドT[M]TMの両方のモナドの能力を持つ。たとえばエラーと成功を表すようなモナドトランスフォーマーEitherTと、非同期実行を表すモナドFutureを合成したEitherT[Future]Future[Either]のような能力となり、これはEitherFutureの機能を同時に使えるようなモナドとなる。ただしモナドトランスフォーマーはEitherに対してEitherTReaderに対してReaderTなど次々と生みだしていく必要がある。 Extensible Effectsとはモナドトランスフォーマーをよりよく改良したものであり、モナドトランスフォーマーのような新しい構造を必要とせずに異なるモナドを合成できる。ScalaではExtensible Effectsの実装としてatnos-effkits-effがあり、今回はkits-effを利用してExtensible EffectsによるFujitaskを実装した。 この記事ではまずFujitaskについて簡単な解説を行い、そして今回のExtensible Effects版Fujitaskの利用例を見せる。そして具体的な実装を紹介し、最後にまとめと参考文献の紹介を行う。

この記事について間違った点などがあれば気軽にコメントなどで教えてほしい。なお完全なソースコードは下記のリポジトリに置かれている。

Fujitaskとは?

Fujitaskはデータベースなどのトランザクションを管理するモナドであり、次のような特徴を持つ。

  • トランザクションが読み込みなのか、読み込みと書き込みの両方ができるのかを型レベルの計算により決定する

たとえばFujitaskでは読み込みしかできないReadトランザクションと、読み込みと書き込みができるReadWriteトランザクションを合成すると、型レベルの計算によりReadWriteのトランザクションとなる。実現の詳しい解説は割愛するが、これはサブタイプにより実現されている。それゆえにHaskell由来のExtensible Effectsへ持ち込めるのかは興味深い話題であり、かつて色々と考えたもののうまくいかなかった。 kits-effではモナドスタック2を型の積として表すため、こちらであればサブタイプを利用したFujitaskを作成できると考えた。atnos-effが用いるような型レベルのリスト的な構造ではFujitaskのように2つ型の共通のサブタイプを見つけてそれに置き換えるといったことが難しい。一方でkits-effではwithを利用した型の積を利用しているので、うまく工夫することでサブタイプを利用することができるのではないかと考えた。

利用例

解説のまえに、まずはどのように利用できるのかを見てみる。普通のモナドと同じようにfor-yield式の中で利用できる。

case class User(id: Long, name: String)
// create table `user` (
//   `id` bigint not null auto_increment,
//   `name` varchar(64) not null
// )

val logger: Logger = LoggerFactory.getLogger(Main.getClass)

val eff1 = for {
  user1 <- userRepository.read(1L)
  _     <- userRepository.create("test")
  user2 <- userRepository.read(1L)
} yield {
  logger.info(s"user1 is $user1")
  logger.info(s"user2 is $user2")
}
Fujitask.run(eff1)

この結果は次のようになる。

02:08:31.610 [run-main-1] INFO  repository.impl.jdbc.package$ - ReadWriteRunner begin --------->
02:08:31.672 [scala-execution-context-global-82] INFO  Main$ - user1 is None
02:08:31.672 [scala-execution-context-global-82] INFO  Main$ - user2 is Some(User(1,test))
02:08:31.674 [scala-execution-context-global-82] INFO  repository.impl.jdbc.package$ - <--------- ReadWriteRunner end

id = 1となるユーザーが存在しないため最初はNoneとなり、その後userRepository.createでユーザーを作ったことでuser2ではSomeとなっている。ここでは読み込み(read)と書き込み(create)を行っているためログとしてReadWriteRunner begin/ReadWriteRunner endが出力されている。 この後たとえば次のように読み込みだけを行ってみる。

val eff2 = for {
  user3 <- userRepository.read(1L)
} yield {
  logger.info(s"user3 is $user3")
}
Fujitask.run(eff2)
02:08:31.675 [scala-execution-context-global-84] INFO  repository.impl.jdbc.package$ - ReadRunner begin --------->
02:08:31.676 [scala-execution-context-global-104] INFO  Main$ - user3 is Some(User(1,test))
02:08:31.677 [scala-execution-context-global-104] INFO  repository.impl.jdbc.package$ - <--------- ReadRunner end

さきほど作ったユーザーが表示された。ここでは読み込みしか行っていないのでReadRunner begin/ReadRunner endがログに出力されている。

これだけではモナド版Fujitaskと変わりないが、Extensible Effects版はモナドトランスフォーマーを定義することなくたとえばReaderモナドと組み合わせて次のように使うことができる。

val eff3 = for {
  name <- Reader.ask[String]
  user <- userRepository.create(name)
  user4 <- userRepository.read(user.id)
} yield {
  logger.info(s"user4 is $user4")
}
Fujitask.run(Reader.run("piyo")(eff3))
02:20:15.892 [scala-execution-context-global-134] INFO  repository.impl.jdbc.package$ - ReadWriteRunner begin --------->
02:20:15.893 [scala-execution-context-global-134] INFO  Main$ - user4 is Some(User(2,piyo))
02:20:15.893 [scala-execution-context-global-133] INFO  repository.impl.jdbc.package$ - <--------- ReadWriteRunner end

Reader.runの引数piyonameに持つユーザーが作成され、それが表示されている。かつ読み込み・書き込みの両方なのでReadWriteRunnerが利用されている。 このFujitaskは、Reader以外にもkits-effで定義されている任意のモナドと組み合わせることができる。

実装

ここでは詳しい実装について解説する。ただモナド版Fujitaskの記事と比較すると、ほとんどが同じように作られていることが分かると思う。

エフェクトの定義

まずはFujitaskを表すエフェクト3Fujitaskを定義する。

sealed abstract class Fujitask extends Product with Serializable

object Fujitask {
  final case class Execute[A](f: ExecutionContext => Future[A])
    extends Fujitask with Fx[A]
  
  abstract case class Transaction() extends Fujitask with Fx[Transaction]
  
  final case class Ask[I <: Transaction]() extends Fujitask with Fx[I]
}
Execute
ExecutionContextを利用して最終的な計算を行う。
Ask
Readerモナドのaskに相当し、トランザクション内のデータベースセッションを取得する。
Session
具体的なトランザクション用の情報のための抽象的なデータ構造を表す。

Askがあるのはモナド版のFujitaskと変わらず、Executeはモナド版のTask.applyに相当する。 またこれらをインスタンシエイトするための関数を次のように作っておく。

object Fujitask {
  def apply[I, A](a: => A): Eff[I, A] =
    Eff(Execute(Future(a)(_)))

  def ask[R <: Transaction, I <: R]: Eff[R, I] =
    Eff(Ask())
}

トランザクションの定義

次にモナド版Fujitaskと同様に抽象的なトランザクションと具体的なトランザクションを定義する。トランザクションの管理に必要なデータベースとのセッションなどはデータベースの種類やデータベース用のライブラリーに依存する。そこで抽象的な部分でReadトランザクションやReadWriteトランザクションなどと定義しておいて、具体的なものとしてたとえばScalikeJDBCの実装を利用するものを定義することで、インターフェースと実装を分離しやすくなる。 まずは抽象的なトランザクションの定義である。

trait ReadTransaction extends Transaction

trait ReadWriteTransaction extends ReadTransaction

Transactionはさきほど作成したFujitask.Transactionである。Transactionが別の場所にあること除いて、モナド版Fujitaskと全く同じである。 次に今回は例としてScalikeJDBCを利用する。

trait ScalikeJDBCTransaction extends Transaction {
  val ctx: DBSession
}

class ScalikeJDBCReadTransaction(val ctx: DBSession)
    extends ScalikeJDBCTransaction
      with ReadTransaction

class ScalikeJDBCWriteTransaction(override val ctx: DBSession)
  extends ScalikeJDBCReadTransaction(ctx)
    with ReadWriteTransaction

これについてもモナド版とほとんど同じである。

リポジトリ層の定義

次に実際にデータベースへアクセスする部分のインターフェースと実装を作っていく。いま次のようなテーブルuserによってユーザーのデータを表すUserがある

create table `user` (
  `id` bigint not null auto_increment,
  `name` varchar(64) not null
)
case class User(id: Long, name: String)

これへの操作のインターフェースを次のように定義する。

trait UserRepository {
  def create(name: String): Eff[ReadWriteTransaction, User]

  def read(id: Long): Eff[ReadTransaction, Option[User]]

  def update(user: User): Eff[ReadWriteTransaction, Unit]

  def delete(id: Long): Eff[ReadWriteTransaction, Unit]
}

ここではインターフェースなので抽象的なトランザクションを表すReadTransactionReadWriteTransactionを利用している。そして具体的な実装は次のようになる。


class UserRepositoryImpl extends UserRepository {
  def create(name: String): Eff[ReadWriteTransaction, User] =
    Fujitask.ask map { (i: ScalikeJDBCWriteTransaction) =>
      implicit val session: DBSession = i.ctx

      val sql = sql"""insert into user (name) values ($name)"""
      val id = sql.updateAndReturnGeneratedKey.apply()
      User(id, name)
    }

  def read(id: Long): Eff[ReadTransaction, Option[User]] =
    Fujitask.ask map { (i: ScalikeJDBCReadTransaction) =>
      implicit val session: DBSession = i.ctx

      val sql = sql"""select * from user where id = $id"""
      sql.map(rs => User(rs.long("id"), rs.string("name"))).single.apply()
    }

  def update(user: User): Eff[ReadWriteTransaction, Unit] =
    Fujitask.ask map { (i: ScalikeJDBCWriteTransaction) =>
      implicit val session: DBSession = i.ctx

      val sql = sql"""update user set name = ${user.name} where id = ${user.id}"""
      sql.update.apply()
    }

  def delete(id: Long): Eff[ReadWriteTransaction, Unit] =
    Fujitask.ask map { (i: ScalikeJDBCWriteTransaction) =>
      implicit val session: DBSession = i.ctx

      val sql = sql"""delete user where id = $id"""
      sql.update.apply()
    }
}

Fujitask.askによってScalikeJDBCのセッションを取得し、それを用いて具体的なSQLを発行する。

FujitaskRunnerの定義

FujitaskRunnerという型クラスを次のように定義する。

trait FujitaskRunner[I] {
  def apply[A](task: I => Future[A]): Future[A]
}

これは型パラメーターIを引数にとりFuture[A]となるような関数taskを受けとり、その結果をとしてFuture[A]を返す。ScalikeJDBCによる実装は次のようになる。

package object jdbc {
  lazy val logger: Logger = LoggerFactory.getLogger(this.getClass)

  implicit def readRunner[I >: ReadTransaction](implicit ec: ExecutionContext): FujitaskRunner[I] =
    new FujitaskRunner[I] {
      def apply[A](task: I => Future[A]): Future[A] = {
        logger.info("ReadRunner begin --------->")
        val session = DB.readOnlySession()
        val future = task(new ScalikeJDBCReadTransaction(session))
        future.onComplete { _ =>
          logger.info("<--------- ReadRunner end")
          session.close()
        }
        future
      }
    }

  implicit def readWriteRunner[I >: ReadWriteTransaction](implicit ec: ExecutionContext): FujitaskRunner[I] =
    new FujitaskRunner[I] {
      def apply[A](task: I => Future[A]): Future[A] = {
        logger.info("ReadWriteRunner begin --------->")
        val future = DB.futureLocalTx(session => task(new ScalikeJDBCWriteTransaction(session)))
        future.onComplete(_ =>
          logger.info("<--------- ReadWriteRunner end")
        )
        future
      }
    }
}

このように型パラメーターの下限を利用することでReadトランザクション時にはDB.readOnlySession()で読み込み専用のセッションを取得し、そして一方でReadWriteトランザクションのときはDB.futureLocalTxでトランザクションを発生させる。 型パラメーターIは、後述するインタープリターの実行するコードの呼び出し部分でScalaのコンパイラーがコンパイル時に適切に計算する。

インタープリターの定義

インタープリターは実際にエフェクトスタックに積まれたエフェクトを解析して、副作用を生じさせる。たとえばFutureを表すようなエフェクトがあるなら、実際にスレッドを起動するとか、あるいはファイルへのIOを行うエフェクトがあるならばscala.io.Sourceなどを利用してファイルへアクセスするなどである。今考えているFujitaskはデータベースのトランザクションなので、ここでデータベースへトランザクションを発行するといったことが行われる。 これが最も複雑ではあると思うが、次のようになっている。

object Fujitask {
  def run[I <: Transaction: Manifest, A](
    eff: Eff[I, A]
  )(
    implicit runner: FujitaskRunner[I],
    ec: ExecutionContext
  ): Future[A] = {
    def handle(i: I) = new ApplicativeInterpreter[Fujitask, Any] {
      override type Result[T] = Future[T]

      def pure[T](a: T): Eff[Any, Result[T]] = Eff.Pure(Future.successful(a))

      def flatMap[T, B](fa: Fujitask with Fx[T])(k: T => Eff[Any, Future[B]]): Eff[Any, Future[B]] =
        fa match {
          case Execute(f) =>
            Eff.Pure(f(ec).flatMap(a => Eff.run(k(a))))
          case _: Ask[I] =>
            k(i.asInstanceOf[T])
        }

      def ap[T, B](fa: Fujitask with Fx[T])(k: Eff[Any, Result[T => B]]): Eff[Any, Result[B]] =
        fa match {
          case Execute(f) =>
            Eff.Pure(f(ec).flatMap(a => Eff.run(k).map(_(a))))
          case _: Ask[I] =>
            k.map(_.map(_(i.asInstanceOf[T])))
        }

      def map[T, B](fa: Future[T])(k: T => B): Future[B] = fa.map(k)
    }

    runner(i => Eff.run(handle(i)(eff.asInstanceOf[Eff[Fujitask, A]])))
  }
}

まず最下部ではさきほどのTaskRunnerを利用している。iにはScalikeJDBCReadTransactionなどの具体的なトランザクションが入ることになる。 適切なTaskRunnerが呼びだされることを説明するため、型Eff[R, A]について述べる。Rはエフェクトスタック4を表し、Aはトランザクション内で実行された計算の結果を表す。上記の関数runは型パラメーターI <: Transaction: ManifestAを取り、引数にeff: Eff[I, A]を取っている。Manifestについては本質的ではないので説明しないが、この型Iは型Transactionのサブタイプであるので、エフェクトスタックそのものがTransactionのサブタイプでなければならない。 この型パラメーターIが、たとえばReadTransactionReadWriteTransactionflatMapしたならばReadWriteTransactionになる必要がある。いまEff[R, A]flatMapは次のように実装されている。

def flatMap[S, B](f: A => Eff[S, B]): Eff[R with S, B] =
  this match {
    case Eff.Pure(v) =>
      f(v)
    case Eff.Impure(u, k) =>
      Eff.Impure(u.extend[S], k :+ f)
  }

エフェクトスタックRと別のエフェクトスタックSを合成してR with Sとしている。より具体的に、次のような例を考えてみる。

  • Eff[ReadTransaction, A]Eff[ReadTransaction, B]を合成した場合、Eff[ReadTransaction with ReadTransaction, B]となる。このとき

    \[ \def\RT{\color{blue}{\text{ReadTransaction}}\;} \def\RWT{\color{red}{\text{ReadWriteTransaction}}\;} \def\with{\text{with}\;} \RT =\, \RT \with \RT \]

    となり5ここではrunnerとしてreadRunnerが選択される

  • 同様にいくつかの合成の結果Eff[ReadTransaction with ReadWriteTransaction with ReadTransaction with ReadTransaction, A]となった場合

    \[ \RWT <:\, \RT \]

    なので、

    \[ \RWT =\, \RT \with \RWT \with \RT \with \RT \]

    となり、runnerとしてreadWriteRunnerが選択される

このようにしてサブタイプによる型の交わりと型クラスを利用することで巧妙に適したFujitaskRunnerのインスタンスを選択している。

さて、次にインタープリターの内部について説明する。ApplicativeInterpreterをインスタンシエイトしており、この中で説明するべきところはflatMapメソッドである。

def flatMap[T, B](fa: Fujitask with Fx[T])(k: T => Eff[Any, Future[B]]): Eff[Any, Future[B]] =
  fa match {
    case Execute(f) =>
      Eff.Pure(f(ec).flatMap(a => Eff.run(k(a))))
    case _: Ask[I] =>
      k(i.asInstanceOf[T])
  }

faとはエフェクトであり、この型Fujitask with Fx[T]は値faについての特徴を次のように説明している。

  • Fujitaskのエフェクトである(Fujitask
  • 継続(残りの計算)kを開始するために必要な値の型がTである(Fx[T]

そしてfaに基づいて継続kを処理することができる。faExecuteのケースとAskのケースで場合わけをして、次のように処理を行っている。

Execute(f)の場合

関数f: ExecutionContext => Future[A]runが呼び出されたときに渡されたExecutionContextを利用して起動する。すると当然Future[A]となる値が得られる。さて、Executeの定義はこのようであった。

final case class Execute[A](f: ExecutionContext => Future[A])
  extends Fujitask with Fx[A]

Fx[A]を継承しているので、k: A => Eff[Any, Future[B]]となる。いまFuture[A]f(ec)で得られたため、これをflatMapすることでAの値(a)を得られる。あとはk(a)で継続を起動すればよい。kの返り値はEff[Any, Future[B]]であるが、Eff.runによって「もしk(a)の返り値のエフェクトスタックが空になっているならば、結果の型Future[B]の値を取り出す」ということができる6。これにより処理が終了する。

Askの場合

Askは次のようであった。

final case class Ask[I <: Transaction]() extends Fujitask with Fx[I]

Fx[I]なので継続kを呼びだすための引数の型TIであるということになる。したがって継続kFujitaskRunnerから渡されたiを渡して起動するだけでよい7

まとめ

このようにしてExtensible Effects版のFujitaskを実装することができた。しかしこの実装は次のような問題があるのではないか?という指摘もある。

  • Transactionはエフェクトを表現するが、これがsealedされていないのでユーザーが任意のエフェクトを作ってしまう可能性がある

エフェクトを任意に作ることができてしまうので、このエフェクトのインタープリター(Fujitaskで言えばFujitask.run)が意図しないようなエフェクトをエフェクトスタックに詰めこめてしまう。するとそのエフェクトはインタープリターによって処理されず、Eff.runがランタイムエラーを生じさせる可能性がある。 まだよく考えたわけではないものの、もしTransactionを継承したよく分からないエフェクトがあったとしても、適切なFujitaskRunner[I]が見つからずにコンパイルエラーとなるような予感もある。これについてはもう少し考えてみて、インタープリターが処理できないかつランタイムエラーとなるような状態が発生するかを検証しようと思う。

謝辞

Extensible Effects版Fujitaskを作るにあたって、@halcat0x15aさんにはとても有用なコメントやコードの修正など様々な協力をしていただいた。

参考文献


  1. 最近は「副作用」という言葉ではなくて「計算効果(Computational effect)」という言葉で表現されることもある。

  2. 複数種類のモナドをリストとして表現したものであり、たとえばReader[Future[Either[Error, Option[A]]]]であればStack(Reader, Future, Either, Option)というようなイメージである。

  3. エフェクトとは副作用(計算効果)を抽象化したものである。モナドは副作用を抽象化する方法の1つであることから、ここではモナドのようなものと思えばよい。Extensible Effectsは(1)エフェクトをスタックに詰み、計算の合成を行うが、この時点ではまだスレッドを起動したりデータベースへ接続したりといった具体的な副作用は生じさせない。そして次に(2)インタープリターによってエフェクトが入ったスタックを解析し、それに基づいた副作用を生じさせる。 3つのデータ構造は次のような意味を持っている。

  4. エフェクト(たとえばFujitaskなど)をモナドスタックのように並べたものである。モナドスタックと似ているが一応この記事ではモナドはエフェクトの具体的な実装であると区別しているため、呼び分けることとした。

  5. それほど重要ではないが、型$A, B$が等しい($A = B$)とは$A$は$B$のサブタイプかつ$B$は$A$のサブタイプである($A = B \Leftrightarrow (A <: B \land B <: A)$ことを指す。また$A <: B$とは型$A$は型$B$のサブタイプであることを表す

  6. もしエフェクトスタックが空ではない値eff: Eff[R, A]によってEff.run(eff)を実行した場合、ランタイムエラーを送出する。

  7. パターンマッチの結果Askとなった時点でT = Iという型を決定できそうだが、Scalaの現時点でのコンパイラーはそれを検出してくれない。そのためコンパイルを通すためここではk(i.asInstanceOf[T])としている。

コメント