はじめに
ScalaやHaskellなどではモナドを利用して副作用1を抽象化するということがしばしば行われる。FujitaskはScalaで実装された1つのモナドで、データベースへのアクセスに関するトランザクション制御を抽象化した。ところがモナドはReader[Future[Either[Error, Option[A]]]]のようにいくつものモナドが入れ子になってしまったとき、Scalaのfor式のようなモナド構文では内側のモナドへのアクセスが難しくなってしまう。 この問題へのアプローチとして有名なものにモナドトランスフォーマーがある。あるモナドトランスフォーマーTは任意モナドMを引数に取ってT[M]となる新しい1つのモナドとなり、このモナドT[M]はTとMの両方のモナドの能力を持つ。たとえばエラーと成功を表すようなモナドトランスフォーマーEitherTと、非同期実行を表すモナドFutureを合成したEitherT[Future]はFuture[Either]のような能力となり、これはEitherとFutureの機能を同時に使えるようなモナドとなる。ただしモナドトランスフォーマーはEitherに対してEitherTやReaderに対してReaderTなど次々と生みだしていく必要がある。 Extensible Effectsとはモナドトランスフォーマーをよりよく改良したものであり、モナドトランスフォーマーのような新しい構造を必要とせずに異なるモナドを合成できる。ScalaではExtensible Effectsの実装としてatnos-effやkits-effがあり、今回はkits-effを利用してExtensible EffectsによるFujitaskを実装した。 この記事ではまずFujitaskについて簡単な解説を行い、そして今回のExtensible Effects版Fujitaskの利用例を見せる。そして具体的な実装を紹介し、最後にまとめと参考文献の紹介を行う。
この記事について間違った点などがあれば気軽にコメントなどで教えてほしい。なお完全なソースコードは下記のリポジトリに置かれている。
Fujitaskとは?
Fujitaskはデータベースなどのトランザクションを管理するモナドであり、次のような特徴を持つ。
- トランザクションが読み込みなのか、読み込みと書き込みの両方ができるのかを型レベルの計算により決定する
たとえばFujitaskでは読み込みしかできないReadトランザクションと、読み込みと書き込みができるReadWriteトランザクションを合成すると、型レベルの計算によりReadWriteのトランザクションとなる。実現の詳しい解説は割愛するが、これはサブタイプにより実現されている。それゆえにHaskell由来のExtensible Effectsへ持ち込めるのかは興味深い話題であり、かつて色々と考えたもののうまくいかなかった。 kits-effではモナドスタック2を型の積として表すため、こちらであればサブタイプを利用したFujitaskを作成できると考えた。atnos-effが用いるような型レベルのリスト的な構造ではFujitaskのように2つ型の共通のサブタイプを見つけてそれに置き換えるといったことが難しい。一方でkits-effではwithを利用した型の積を利用しているので、うまく工夫することでサブタイプを利用することができるのではないかと考えた。
利用例
解説のまえに、まずはどのように利用できるのかを見てみる。普通のモナドと同じようにfor-yield式の中で利用できる。
case class User(id: Long, name: String)
// create table `user` (
// `id` bigint not null auto_increment,
// `name` varchar(64) not null
// )
val logger: Logger = LoggerFactory.getLogger(Main.getClass)
val eff1 = for {
user1 <- userRepository.read(1L)
_ <- userRepository.create("test")
user2 <- userRepository.read(1L)
} yield {
logger.info(s"user1 is $user1")
logger.info(s"user2 is $user2")
}
Fujitask.run(eff1)
この結果は次のようになる。
02:08:31.610 [run-main-1] INFO repository.impl.jdbc.package$ - ReadWriteRunner begin --------->
02:08:31.672 [scala-execution-context-global-82] INFO Main$ - user1 is None
02:08:31.672 [scala-execution-context-global-82] INFO Main$ - user2 is Some(User(1,test))
02:08:31.674 [scala-execution-context-global-82] INFO repository.impl.jdbc.package$ - <--------- ReadWriteRunner end
id = 1となるユーザーが存在しないため最初はNoneとなり、その後userRepository.createでユーザーを作ったことでuser2ではSomeとなっている。ここでは読み込み(read)と書き込み(create)を行っているためログとしてReadWriteRunner begin/ReadWriteRunner endが出力されている。 この後たとえば次のように読み込みだけを行ってみる。
val eff2 = for {
user3 <- userRepository.read(1L)
} yield {
logger.info(s"user3 is $user3")
}
Fujitask.run(eff2)
02:08:31.675 [scala-execution-context-global-84] INFO repository.impl.jdbc.package$ - ReadRunner begin --------->
02:08:31.676 [scala-execution-context-global-104] INFO Main$ - user3 is Some(User(1,test))
02:08:31.677 [scala-execution-context-global-104] INFO repository.impl.jdbc.package$ - <--------- ReadRunner end
さきほど作ったユーザーが表示された。ここでは読み込みしか行っていないのでReadRunner begin/ReadRunner endがログに出力されている。
これだけではモナド版Fujitaskと変わりないが、Extensible Effects版はモナドトランスフォーマーを定義することなくたとえばReaderモナドと組み合わせて次のように使うことができる。
val eff3 = for {
name <- Reader.ask[String]
user <- userRepository.create(name)
user4 <- userRepository.read(user.id)
} yield {
logger.info(s"user4 is $user4")
}
Fujitask.run(Reader.run("piyo")(eff3))
02:20:15.892 [scala-execution-context-global-134] INFO repository.impl.jdbc.package$ - ReadWriteRunner begin --------->
02:20:15.893 [scala-execution-context-global-134] INFO Main$ - user4 is Some(User(2,piyo))
02:20:15.893 [scala-execution-context-global-133] INFO repository.impl.jdbc.package$ - <--------- ReadWriteRunner end
Reader.runの引数piyoをnameに持つユーザーが作成され、それが表示されている。かつ読み込み・書き込みの両方なのでReadWriteRunnerが利用されている。 このFujitaskは、Reader以外にもkits-effで定義されている任意のモナドと組み合わせることができる。
実装
ここでは詳しい実装について解説する。ただモナド版Fujitaskの記事と比較すると、ほとんどが同じように作られていることが分かると思う。
エフェクトの定義
まずはFujitaskを表すエフェクト3Fujitaskを定義する。
sealed abstract class Fujitask extends Product with Serializable
object Fujitask {
final case class Execute[A](f: ExecutionContext => Future[A])
extends Fujitask with Fx[A]
abstract case class Transaction() extends Fujitask with Fx[Transaction]
final case class Ask[I <: Transaction]() extends Fujitask with Fx[I]
}
ExecuteExecutionContextを利用して最終的な計算を行う。Ask- Readerモナドの
askに相当し、トランザクション内のデータベースセッションを取得する。 Session- 具体的なトランザクション用の情報のための抽象的なデータ構造を表す。
Askがあるのはモナド版のFujitaskと変わらず、Executeはモナド版のTask.applyに相当する。 またこれらをインスタンシエイトするための関数を次のように作っておく。
object Fujitask {
def apply[I, A](a: => A): Eff[I, A] =
Eff(Execute(Future(a)(_)))
def ask[R <: Transaction, I <: R]: Eff[R, I] =
Eff(Ask())
}
トランザクションの定義
次にモナド版Fujitaskと同様に抽象的なトランザクションと具体的なトランザクションを定義する。トランザクションの管理に必要なデータベースとのセッションなどはデータベースの種類やデータベース用のライブラリーに依存する。そこで抽象的な部分でReadトランザクションやReadWriteトランザクションなどと定義しておいて、具体的なものとしてたとえばScalikeJDBCの実装を利用するものを定義することで、インターフェースと実装を分離しやすくなる。 まずは抽象的なトランザクションの定義である。
trait ReadTransaction extends Transaction
trait ReadWriteTransaction extends ReadTransaction
Transactionはさきほど作成したFujitask.Transactionである。Transactionが別の場所にあること除いて、モナド版Fujitaskと全く同じである。 次に今回は例としてScalikeJDBCを利用する。
trait ScalikeJDBCTransaction extends Transaction {
val ctx: DBSession
}
class ScalikeJDBCReadTransaction(val ctx: DBSession)
extends ScalikeJDBCTransaction
with ReadTransaction
class ScalikeJDBCWriteTransaction(override val ctx: DBSession)
extends ScalikeJDBCReadTransaction(ctx)
with ReadWriteTransaction
これについてもモナド版とほとんど同じである。
リポジトリ層の定義
次に実際にデータベースへアクセスする部分のインターフェースと実装を作っていく。いま次のようなテーブルuserによってユーザーのデータを表すUserがある
create table `user` (
`id` bigint not null auto_increment,
`name` varchar(64) not null
)
case class User(id: Long, name: String)
これへの操作のインターフェースを次のように定義する。
trait UserRepository {
def create(name: String): Eff[ReadWriteTransaction, User]
def read(id: Long): Eff[ReadTransaction, Option[User]]
def update(user: User): Eff[ReadWriteTransaction, Unit]
def delete(id: Long): Eff[ReadWriteTransaction, Unit]
}
ここではインターフェースなので抽象的なトランザクションを表すReadTransactionやReadWriteTransactionを利用している。そして具体的な実装は次のようになる。
class UserRepositoryImpl extends UserRepository {
def create(name: String): Eff[ReadWriteTransaction, User] =
Fujitask.ask map { (i: ScalikeJDBCWriteTransaction) =>
implicit val session: DBSession = i.ctx
val sql = sql"""insert into user (name) values ($name)"""
val id = sql.updateAndReturnGeneratedKey.apply()
User(id, name)
}
def read(id: Long): Eff[ReadTransaction, Option[User]] =
Fujitask.ask map { (i: ScalikeJDBCReadTransaction) =>
implicit val session: DBSession = i.ctx
val sql = sql"""select * from user where id = $id"""
sql.map(rs => User(rs.long("id"), rs.string("name"))).single.apply()
}
def update(user: User): Eff[ReadWriteTransaction, Unit] =
Fujitask.ask map { (i: ScalikeJDBCWriteTransaction) =>
implicit val session: DBSession = i.ctx
val sql = sql"""update user set name = ${user.name} where id = ${user.id}"""
sql.update.apply()
}
def delete(id: Long): Eff[ReadWriteTransaction, Unit] =
Fujitask.ask map { (i: ScalikeJDBCWriteTransaction) =>
implicit val session: DBSession = i.ctx
val sql = sql"""delete user where id = $id"""
sql.update.apply()
}
}
Fujitask.askによってScalikeJDBCのセッションを取得し、それを用いて具体的なSQLを発行する。
FujitaskRunnerの定義
FujitaskRunnerという型クラスを次のように定義する。
trait FujitaskRunner[I] {
def apply[A](task: I => Future[A]): Future[A]
}
これは型パラメーターIを引数にとりFuture[A]となるような関数taskを受けとり、その結果をとしてFuture[A]を返す。ScalikeJDBCによる実装は次のようになる。
package object jdbc {
lazy val logger: Logger = LoggerFactory.getLogger(this.getClass)
implicit def readRunner[I >: ReadTransaction](implicit ec: ExecutionContext): FujitaskRunner[I] =
new FujitaskRunner[I] {
def apply[A](task: I => Future[A]): Future[A] = {
logger.info("ReadRunner begin --------->")
val session = DB.readOnlySession()
val future = task(new ScalikeJDBCReadTransaction(session))
future.onComplete { _ =>
logger.info("<--------- ReadRunner end")
session.close()
}
future
}
}
implicit def readWriteRunner[I >: ReadWriteTransaction](implicit ec: ExecutionContext): FujitaskRunner[I] =
new FujitaskRunner[I] {
def apply[A](task: I => Future[A]): Future[A] = {
logger.info("ReadWriteRunner begin --------->")
val future = DB.futureLocalTx(session => task(new ScalikeJDBCWriteTransaction(session)))
future.onComplete(_ =>
logger.info("<--------- ReadWriteRunner end")
)
future
}
}
}
このように型パラメーターの下限を利用することでReadトランザクション時にはDB.readOnlySession()で読み込み専用のセッションを取得し、そして一方でReadWriteトランザクションのときはDB.futureLocalTxでトランザクションを発生させる。 型パラメーターIは、後述するインタープリターの実行するコードの呼び出し部分でScalaのコンパイラーがコンパイル時に適切に計算する。
インタープリターの定義
インタープリターは実際にエフェクトスタックに積まれたエフェクトを解析して、副作用を生じさせる。たとえばFutureを表すようなエフェクトがあるなら、実際にスレッドを起動するとか、あるいはファイルへのIOを行うエフェクトがあるならばscala.io.Sourceなどを利用してファイルへアクセスするなどである。今考えているFujitaskはデータベースのトランザクションなので、ここでデータベースへトランザクションを発行するといったことが行われる。 これが最も複雑ではあると思うが、次のようになっている。
object Fujitask {
def run[I <: Transaction: Manifest, A](
eff: Eff[I, A]
)(
implicit runner: FujitaskRunner[I],
ec: ExecutionContext
): Future[A] = {
def handle(i: I) = new ApplicativeInterpreter[Fujitask, Any] {
override type Result[T] = Future[T]
def pure[T](a: T): Eff[Any, Result[T]] = Eff.Pure(Future.successful(a))
def flatMap[T, B](fa: Fujitask with Fx[T])(k: T => Eff[Any, Future[B]]): Eff[Any, Future[B]] =
fa match {
case Execute(f) =>
Eff.Pure(f(ec).flatMap(a => Eff.run(k(a))))
case _: Ask[I] =>
k(i.asInstanceOf[T])
}
def ap[T, B](fa: Fujitask with Fx[T])(k: Eff[Any, Result[T => B]]): Eff[Any, Result[B]] =
fa match {
case Execute(f) =>
Eff.Pure(f(ec).flatMap(a => Eff.run(k).map(_(a))))
case _: Ask[I] =>
k.map(_.map(_(i.asInstanceOf[T])))
}
def map[T, B](fa: Future[T])(k: T => B): Future[B] = fa.map(k)
}
runner(i => Eff.run(handle(i)(eff.asInstanceOf[Eff[Fujitask, A]])))
}
}
まず最下部ではさきほどのTaskRunnerを利用している。iにはScalikeJDBCReadTransactionなどの具体的なトランザクションが入ることになる。 適切なTaskRunnerが呼びだされることを説明するため、型Eff[R, A]について述べる。Rはエフェクトスタック4を表し、Aはトランザクション内で実行された計算の結果を表す。上記の関数runは型パラメーターI <: Transaction: ManifestとAを取り、引数にeff: Eff[I, A]を取っている。Manifestについては本質的ではないので説明しないが、この型Iは型Transactionのサブタイプであるので、エフェクトスタックそのものがTransactionのサブタイプでなければならない。 この型パラメーターIが、たとえばReadTransactionとReadWriteTransactionをflatMapしたならばReadWriteTransactionになる必要がある。いまEff[R, A]のflatMapは次のように実装されている。
def flatMap[S, B](f: A => Eff[S, B]): Eff[R with S, B] =
this match {
case Eff.Pure(v) =>
f(v)
case Eff.Impure(u, k) =>
Eff.Impure(u.extend[S], k :+ f)
}
エフェクトスタックRと別のエフェクトスタックSを合成してR with Sとしている。より具体的に、次のような例を考えてみる。
\[ \def\RT{\color{blue}{\text{ReadTransaction}}\;} \def\RWT{\color{red}{\text{ReadWriteTransaction}}\;} \def\with{\text{with}\;} \RT =\, \RT \with \RT \]Eff[ReadTransaction, A]とEff[ReadTransaction, B]を合成した場合、Eff[ReadTransaction with ReadTransaction, B]となる。このときとなり5ここでは
runnerとしてreadRunnerが選択される同様にいくつかの合成の結果
\[ \RWT <:\, \RT \]Eff[ReadTransaction with ReadWriteTransaction with ReadTransaction with ReadTransaction, A]となった場合なので、
\[ \RWT =\, \RT \with \RWT \with \RT \with \RT \]となり、
runnerとしてreadWriteRunnerが選択される
このようにしてサブタイプによる型の交わりと型クラスを利用することで巧妙に適したFujitaskRunnerのインスタンスを選択している。
さて、次にインタープリターの内部について説明する。ApplicativeInterpreterをインスタンシエイトしており、この中で説明するべきところはflatMapメソッドである。
def flatMap[T, B](fa: Fujitask with Fx[T])(k: T => Eff[Any, Future[B]]): Eff[Any, Future[B]] =
fa match {
case Execute(f) =>
Eff.Pure(f(ec).flatMap(a => Eff.run(k(a))))
case _: Ask[I] =>
k(i.asInstanceOf[T])
}
faとはエフェクトであり、この型Fujitask with Fx[T]は値faについての特徴を次のように説明している。
Fujitaskのエフェクトである(Fujitask)- 継続(残りの計算)
kを開始するために必要な値の型がTである(Fx[T])
そしてfaに基づいて継続kを処理することができる。faをExecuteのケースとAskのケースで場合わけをして、次のように処理を行っている。
Execute(f)の場合
関数f: ExecutionContext => Future[A]をrunが呼び出されたときに渡されたExecutionContextを利用して起動する。すると当然Future[A]となる値が得られる。さて、Executeの定義はこのようであった。
final case class Execute[A](f: ExecutionContext => Future[A])
extends Fujitask with Fx[A]
Fx[A]を継承しているので、k: A => Eff[Any, Future[B]]となる。いまFuture[A]がf(ec)で得られたため、これをflatMapすることでAの値(a)を得られる。あとはk(a)で継続を起動すればよい。kの返り値はEff[Any, Future[B]]であるが、Eff.runによって「もしk(a)の返り値のエフェクトスタックが空になっているならば、結果の型Future[B]の値を取り出す」ということができる6。これにより処理が終了する。
Askの場合
Askは次のようであった。
final case class Ask[I <: Transaction]() extends Fujitask with Fx[I]
Fx[I]なので継続kを呼びだすための引数の型TはIであるということになる。したがって継続kにFujitaskRunnerから渡されたiを渡して起動するだけでよい7。
まとめ
このようにしてExtensible Effects版のFujitaskを実装することができた。しかしこの実装は次のような問題があるのではないか?という指摘もある。
Transactionはエフェクトを表現するが、これがsealedされていないのでユーザーが任意のエフェクトを作ってしまう可能性がある
エフェクトを任意に作ることができてしまうので、このエフェクトのインタープリター(Fujitaskで言えばFujitask.run)が意図しないようなエフェクトをエフェクトスタックに詰めこめてしまう。するとそのエフェクトはインタープリターによって処理されず、Eff.runがランタイムエラーを生じさせる可能性がある。 まだよく考えたわけではないものの、もしTransactionを継承したよく分からないエフェクトがあったとしても、適切なFujitaskRunner[I]が見つからずにコンパイルエラーとなるような予感もある。これについてはもう少し考えてみて、インタープリターが処理できないかつランタイムエラーとなるような状態が発生するかを検証しようと思う。
謝辞
Extensible Effects版Fujitaskを作るにあたって、@halcat0x15aさんにはとても有用なコメントやコードの修正など様々な協力をしていただいた。
参考文献
- 進捗大陸05(“ScalaらしいEffを目指して”)
- ドワンゴ秘伝のトランザクションモナドを解説!
- 筆者が社内勉強会でFujitask(モナド版)の説明のために作成したスライド
- kits-eff
- Freer Monads, More Extensible Effects
最近は「副作用」という言葉ではなくて「計算効果(Computational effect)」という言葉で表現されることもある。↩
複数種類のモナドをリストとして表現したものであり、たとえば
Reader[Future[Either[Error, Option[A]]]]であればStack(Reader, Future, Either, Option)というようなイメージである。↩エフェクトとは副作用(計算効果)を抽象化したものである。モナドは副作用を抽象化する方法の1つであることから、ここではモナドのようなものと思えばよい。Extensible Effectsは(1)エフェクトをスタックに詰み、計算の合成を行うが、この時点ではまだスレッドを起動したりデータベースへ接続したりといった具体的な副作用は生じさせない。そして次に(2)インタープリターによってエフェクトが入ったスタックを解析し、それに基づいた副作用を生じさせる。 3つのデータ構造は次のような意味を持っている。↩
エフェクト(たとえば
Fujitaskなど)をモナドスタックのように並べたものである。モナドスタックと似ているが一応この記事ではモナドはエフェクトの具体的な実装であると区別しているため、呼び分けることとした。↩それほど重要ではないが、型$A, B$が等しい($A = B$)とは$A$は$B$のサブタイプかつ$B$は$A$のサブタイプである($A = B \Leftrightarrow (A <: B \land B <: A)$ことを指す。また$A <: B$とは型$A$は型$B$のサブタイプであることを表す↩
もしエフェクトスタックが空ではない値
eff: Eff[R, A]によってEff.run(eff)を実行した場合、ランタイムエラーを送出する。↩パターンマッチの結果
Askとなった時点でT = Iという型を決定できそうだが、Scalaの現時点でのコンパイラーはそれを検出してくれない。そのためコンパイルを通すためここではk(i.asInstanceOf[T])としている。↩
コメント
コメントを投稿