はじめに
ScalaやHaskellなどではモナドを利用して副作用1を抽象化するということがしばしば行われる。FujitaskはScalaで実装された1つのモナドで、データベースへのアクセスに関するトランザクション制御を抽象化した。ところがモナドはReader[Future[Either[Error, Option[A]]]]
のようにいくつものモナドが入れ子になってしまったとき、Scalaのfor
式のようなモナド構文では内側のモナドへのアクセスが難しくなってしまう。 この問題へのアプローチとして有名なものにモナドトランスフォーマーがある。あるモナドトランスフォーマーT
は任意モナドM
を引数に取ってT[M]
となる新しい1つのモナドとなり、このモナドT[M]
はT
とM
の両方のモナドの能力を持つ。たとえばエラーと成功を表すようなモナドトランスフォーマーEitherT
と、非同期実行を表すモナドFuture
を合成したEitherT[Future]
はFuture[Either]
のような能力となり、これはEither
とFuture
の機能を同時に使えるようなモナドとなる。ただしモナドトランスフォーマーはEither
に対してEitherT
やReader
に対してReaderT
など次々と生みだしていく必要がある。 Extensible Effectsとはモナドトランスフォーマーをよりよく改良したものであり、モナドトランスフォーマーのような新しい構造を必要とせずに異なるモナドを合成できる。ScalaではExtensible Effectsの実装としてatnos-effやkits-effがあり、今回はkits-effを利用してExtensible EffectsによるFujitaskを実装した。 この記事ではまずFujitaskについて簡単な解説を行い、そして今回のExtensible Effects版Fujitaskの利用例を見せる。そして具体的な実装を紹介し、最後にまとめと参考文献の紹介を行う。
この記事について間違った点などがあれば気軽にコメントなどで教えてほしい。なお完全なソースコードは下記のリポジトリに置かれている。
Fujitaskとは?
Fujitaskはデータベースなどのトランザクションを管理するモナドであり、次のような特徴を持つ。
- トランザクションが読み込みなのか、読み込みと書き込みの両方ができるのかを型レベルの計算により決定する
たとえばFujitaskでは読み込みしかできないReadトランザクションと、読み込みと書き込みができるReadWriteトランザクションを合成すると、型レベルの計算によりReadWriteのトランザクションとなる。実現の詳しい解説は割愛するが、これはサブタイプにより実現されている。それゆえにHaskell由来のExtensible Effectsへ持ち込めるのかは興味深い話題であり、かつて色々と考えたもののうまくいかなかった。 kits-effではモナドスタック2を型の積として表すため、こちらであればサブタイプを利用したFujitaskを作成できると考えた。atnos-effが用いるような型レベルのリスト的な構造ではFujitaskのように2つ型の共通のサブタイプを見つけてそれに置き換えるといったことが難しい。一方でkits-effではwith
を利用した型の積を利用しているので、うまく工夫することでサブタイプを利用することができるのではないかと考えた。
利用例
解説のまえに、まずはどのように利用できるのかを見てみる。普通のモナドと同じようにfor-yield
式の中で利用できる。
case class User(id: Long, name: String)
// create table `user` (
// `id` bigint not null auto_increment,
// `name` varchar(64) not null
// )
val logger: Logger = LoggerFactory.getLogger(Main.getClass)
val eff1 = for {
user1 <- userRepository.read(1L)
_ <- userRepository.create("test")
user2 <- userRepository.read(1L)
} yield {
logger.info(s"user1 is $user1")
logger.info(s"user2 is $user2")
}
Fujitask.run(eff1)
この結果は次のようになる。
02:08:31.610 [run-main-1] INFO repository.impl.jdbc.package$ - ReadWriteRunner begin --------->
02:08:31.672 [scala-execution-context-global-82] INFO Main$ - user1 is None
02:08:31.672 [scala-execution-context-global-82] INFO Main$ - user2 is Some(User(1,test))
02:08:31.674 [scala-execution-context-global-82] INFO repository.impl.jdbc.package$ - <--------- ReadWriteRunner end
id = 1
となるユーザーが存在しないため最初はNone
となり、その後userRepository.create
でユーザーを作ったことでuser2
ではSome
となっている。ここでは読み込み(read
)と書き込み(create
)を行っているためログとしてReadWriteRunner begin
/ReadWriteRunner end
が出力されている。 この後たとえば次のように読み込みだけを行ってみる。
val eff2 = for {
user3 <- userRepository.read(1L)
} yield {
logger.info(s"user3 is $user3")
}
Fujitask.run(eff2)
02:08:31.675 [scala-execution-context-global-84] INFO repository.impl.jdbc.package$ - ReadRunner begin --------->
02:08:31.676 [scala-execution-context-global-104] INFO Main$ - user3 is Some(User(1,test))
02:08:31.677 [scala-execution-context-global-104] INFO repository.impl.jdbc.package$ - <--------- ReadRunner end
さきほど作ったユーザーが表示された。ここでは読み込みしか行っていないのでReadRunner begin
/ReadRunner end
がログに出力されている。
これだけではモナド版Fujitaskと変わりないが、Extensible Effects版はモナドトランスフォーマーを定義することなくたとえばReader
モナドと組み合わせて次のように使うことができる。
val eff3 = for {
name <- Reader.ask[String]
user <- userRepository.create(name)
user4 <- userRepository.read(user.id)
} yield {
logger.info(s"user4 is $user4")
}
Fujitask.run(Reader.run("piyo")(eff3))
02:20:15.892 [scala-execution-context-global-134] INFO repository.impl.jdbc.package$ - ReadWriteRunner begin --------->
02:20:15.893 [scala-execution-context-global-134] INFO Main$ - user4 is Some(User(2,piyo))
02:20:15.893 [scala-execution-context-global-133] INFO repository.impl.jdbc.package$ - <--------- ReadWriteRunner end
Reader.run
の引数piyo
をname
に持つユーザーが作成され、それが表示されている。かつ読み込み・書き込みの両方なのでReadWriteRunner
が利用されている。 このFujitaskは、Reader
以外にもkits-effで定義されている任意のモナドと組み合わせることができる。
実装
ここでは詳しい実装について解説する。ただモナド版Fujitaskの記事と比較すると、ほとんどが同じように作られていることが分かると思う。
エフェクトの定義
まずはFujitaskを表すエフェクト3Fujitask
を定義する。
sealed abstract class Fujitask extends Product with Serializable
object Fujitask {
final case class Execute[A](f: ExecutionContext => Future[A])
extends Fujitask with Fx[A]
abstract case class Transaction() extends Fujitask with Fx[Transaction]
final case class Ask[I <: Transaction]() extends Fujitask with Fx[I]
}
Execute
ExecutionContext
を利用して最終的な計算を行う。Ask
- Readerモナドの
ask
に相当し、トランザクション内のデータベースセッションを取得する。 Session
- 具体的なトランザクション用の情報のための抽象的なデータ構造を表す。
Ask
があるのはモナド版のFujitaskと変わらず、Execute
はモナド版のTask.apply
に相当する。 またこれらをインスタンシエイトするための関数を次のように作っておく。
object Fujitask {
def apply[I, A](a: => A): Eff[I, A] =
Eff(Execute(Future(a)(_)))
def ask[R <: Transaction, I <: R]: Eff[R, I] =
Eff(Ask())
}
トランザクションの定義
次にモナド版Fujitaskと同様に抽象的なトランザクションと具体的なトランザクションを定義する。トランザクションの管理に必要なデータベースとのセッションなどはデータベースの種類やデータベース用のライブラリーに依存する。そこで抽象的な部分でReadトランザクションやReadWriteトランザクションなどと定義しておいて、具体的なものとしてたとえばScalikeJDBCの実装を利用するものを定義することで、インターフェースと実装を分離しやすくなる。 まずは抽象的なトランザクションの定義である。
trait ReadTransaction extends Transaction
trait ReadWriteTransaction extends ReadTransaction
Transaction
はさきほど作成したFujitask.Transaction
である。Transaction
が別の場所にあること除いて、モナド版Fujitaskと全く同じである。 次に今回は例としてScalikeJDBCを利用する。
trait ScalikeJDBCTransaction extends Transaction {
val ctx: DBSession
}
class ScalikeJDBCReadTransaction(val ctx: DBSession)
extends ScalikeJDBCTransaction
with ReadTransaction
class ScalikeJDBCWriteTransaction(override val ctx: DBSession)
extends ScalikeJDBCReadTransaction(ctx)
with ReadWriteTransaction
これについてもモナド版とほとんど同じである。
リポジトリ層の定義
次に実際にデータベースへアクセスする部分のインターフェースと実装を作っていく。いま次のようなテーブルuser
によってユーザーのデータを表すUser
がある
create table `user` (
`id` bigint not null auto_increment,
`name` varchar(64) not null
)
case class User(id: Long, name: String)
これへの操作のインターフェースを次のように定義する。
trait UserRepository {
def create(name: String): Eff[ReadWriteTransaction, User]
def read(id: Long): Eff[ReadTransaction, Option[User]]
def update(user: User): Eff[ReadWriteTransaction, Unit]
def delete(id: Long): Eff[ReadWriteTransaction, Unit]
}
ここではインターフェースなので抽象的なトランザクションを表すReadTransaction
やReadWriteTransaction
を利用している。そして具体的な実装は次のようになる。
class UserRepositoryImpl extends UserRepository {
def create(name: String): Eff[ReadWriteTransaction, User] =
Fujitask.ask map { (i: ScalikeJDBCWriteTransaction) =>
implicit val session: DBSession = i.ctx
val sql = sql"""insert into user (name) values ($name)"""
val id = sql.updateAndReturnGeneratedKey.apply()
User(id, name)
}
def read(id: Long): Eff[ReadTransaction, Option[User]] =
Fujitask.ask map { (i: ScalikeJDBCReadTransaction) =>
implicit val session: DBSession = i.ctx
val sql = sql"""select * from user where id = $id"""
sql.map(rs => User(rs.long("id"), rs.string("name"))).single.apply()
}
def update(user: User): Eff[ReadWriteTransaction, Unit] =
Fujitask.ask map { (i: ScalikeJDBCWriteTransaction) =>
implicit val session: DBSession = i.ctx
val sql = sql"""update user set name = ${user.name} where id = ${user.id}"""
sql.update.apply()
}
def delete(id: Long): Eff[ReadWriteTransaction, Unit] =
Fujitask.ask map { (i: ScalikeJDBCWriteTransaction) =>
implicit val session: DBSession = i.ctx
val sql = sql"""delete user where id = $id"""
sql.update.apply()
}
}
Fujitask.ask
によってScalikeJDBCのセッションを取得し、それを用いて具体的なSQLを発行する。
FujitaskRunner
の定義
FujitaskRunner
という型クラスを次のように定義する。
trait FujitaskRunner[I] {
def apply[A](task: I => Future[A]): Future[A]
}
これは型パラメーターI
を引数にとりFuture[A]
となるような関数task
を受けとり、その結果をとしてFuture[A]
を返す。ScalikeJDBCによる実装は次のようになる。
package object jdbc {
lazy val logger: Logger = LoggerFactory.getLogger(this.getClass)
implicit def readRunner[I >: ReadTransaction](implicit ec: ExecutionContext): FujitaskRunner[I] =
new FujitaskRunner[I] {
def apply[A](task: I => Future[A]): Future[A] = {
logger.info("ReadRunner begin --------->")
val session = DB.readOnlySession()
val future = task(new ScalikeJDBCReadTransaction(session))
future.onComplete { _ =>
logger.info("<--------- ReadRunner end")
session.close()
}
future
}
}
implicit def readWriteRunner[I >: ReadWriteTransaction](implicit ec: ExecutionContext): FujitaskRunner[I] =
new FujitaskRunner[I] {
def apply[A](task: I => Future[A]): Future[A] = {
logger.info("ReadWriteRunner begin --------->")
val future = DB.futureLocalTx(session => task(new ScalikeJDBCWriteTransaction(session)))
future.onComplete(_ =>
logger.info("<--------- ReadWriteRunner end")
)
future
}
}
}
このように型パラメーターの下限を利用することでReadトランザクション時にはDB.readOnlySession()
で読み込み専用のセッションを取得し、そして一方でReadWriteトランザクションのときはDB.futureLocalTx
でトランザクションを発生させる。 型パラメーターI
は、後述するインタープリターの実行するコードの呼び出し部分でScalaのコンパイラーがコンパイル時に適切に計算する。
インタープリターの定義
インタープリターは実際にエフェクトスタックに積まれたエフェクトを解析して、副作用を生じさせる。たとえばFuture
を表すようなエフェクトがあるなら、実際にスレッドを起動するとか、あるいはファイルへのIOを行うエフェクトがあるならばscala.io.Source
などを利用してファイルへアクセスするなどである。今考えているFujitask
はデータベースのトランザクションなので、ここでデータベースへトランザクションを発行するといったことが行われる。 これが最も複雑ではあると思うが、次のようになっている。
object Fujitask {
def run[I <: Transaction: Manifest, A](
eff: Eff[I, A]
)(
implicit runner: FujitaskRunner[I],
ec: ExecutionContext
): Future[A] = {
def handle(i: I) = new ApplicativeInterpreter[Fujitask, Any] {
override type Result[T] = Future[T]
def pure[T](a: T): Eff[Any, Result[T]] = Eff.Pure(Future.successful(a))
def flatMap[T, B](fa: Fujitask with Fx[T])(k: T => Eff[Any, Future[B]]): Eff[Any, Future[B]] =
fa match {
case Execute(f) =>
Eff.Pure(f(ec).flatMap(a => Eff.run(k(a))))
case _: Ask[I] =>
k(i.asInstanceOf[T])
}
def ap[T, B](fa: Fujitask with Fx[T])(k: Eff[Any, Result[T => B]]): Eff[Any, Result[B]] =
fa match {
case Execute(f) =>
Eff.Pure(f(ec).flatMap(a => Eff.run(k).map(_(a))))
case _: Ask[I] =>
k.map(_.map(_(i.asInstanceOf[T])))
}
def map[T, B](fa: Future[T])(k: T => B): Future[B] = fa.map(k)
}
runner(i => Eff.run(handle(i)(eff.asInstanceOf[Eff[Fujitask, A]])))
}
}
まず最下部ではさきほどのTaskRunner
を利用している。i
にはScalikeJDBCReadTransaction
などの具体的なトランザクションが入ることになる。 適切なTaskRunner
が呼びだされることを説明するため、型Eff[R, A]
について述べる。R
はエフェクトスタック4を表し、A
はトランザクション内で実行された計算の結果を表す。上記の関数run
は型パラメーターI <: Transaction: Manifest
とA
を取り、引数にeff: Eff[I, A]
を取っている。Manifest
については本質的ではないので説明しないが、この型I
は型Transaction
のサブタイプであるので、エフェクトスタックそのものがTransaction
のサブタイプでなければならない。 この型パラメーターI
が、たとえばReadTransaction
とReadWriteTransaction
をflatMap
したならばReadWriteTransaction
になる必要がある。いまEff[R, A]
のflatMap
は次のように実装されている。
def flatMap[S, B](f: A => Eff[S, B]): Eff[R with S, B] =
this match {
case Eff.Pure(v) =>
f(v)
case Eff.Impure(u, k) =>
Eff.Impure(u.extend[S], k :+ f)
}
エフェクトスタックR
と別のエフェクトスタックS
を合成してR with S
としている。より具体的に、次のような例を考えてみる。
\[ \def\RT{\color{blue}{\text{ReadTransaction}}\;} \def\RWT{\color{red}{\text{ReadWriteTransaction}}\;} \def\with{\text{with}\;} \RT =\, \RT \with \RT \]Eff[ReadTransaction, A]
とEff[ReadTransaction, B]
を合成した場合、Eff[ReadTransaction with ReadTransaction, B]
となる。このときとなり5ここでは
runner
としてreadRunner
が選択される同様にいくつかの合成の結果
\[ \RWT <:\, \RT \]Eff[ReadTransaction with ReadWriteTransaction with ReadTransaction with ReadTransaction, A]
となった場合なので、
\[ \RWT =\, \RT \with \RWT \with \RT \with \RT \]となり、
runner
としてreadWriteRunner
が選択される
このようにしてサブタイプによる型の交わりと型クラスを利用することで巧妙に適したFujitaskRunner
のインスタンスを選択している。
さて、次にインタープリターの内部について説明する。ApplicativeInterpreter
をインスタンシエイトしており、この中で説明するべきところはflatMap
メソッドである。
def flatMap[T, B](fa: Fujitask with Fx[T])(k: T => Eff[Any, Future[B]]): Eff[Any, Future[B]] =
fa match {
case Execute(f) =>
Eff.Pure(f(ec).flatMap(a => Eff.run(k(a))))
case _: Ask[I] =>
k(i.asInstanceOf[T])
}
fa
とはエフェクトであり、この型Fujitask with Fx[T]
は値fa
についての特徴を次のように説明している。
Fujitask
のエフェクトである(Fujitask
)- 継続(残りの計算)
k
を開始するために必要な値の型がT
である(Fx[T]
)
そしてfa
に基づいて継続k
を処理することができる。fa
をExecute
のケースとAsk
のケースで場合わけをして、次のように処理を行っている。
Execute(f)
の場合
関数f: ExecutionContext => Future[A]
をrun
が呼び出されたときに渡されたExecutionContext
を利用して起動する。すると当然Future[A]
となる値が得られる。さて、Execute
の定義はこのようであった。
final case class Execute[A](f: ExecutionContext => Future[A])
extends Fujitask with Fx[A]
Fx[A]
を継承しているので、k: A => Eff[Any, Future[B]]
となる。いまFuture[A]
がf(ec)
で得られたため、これをflatMap
することでA
の値(a
)を得られる。あとはk(a)
で継続を起動すればよい。k
の返り値はEff[Any, Future[B]]
であるが、Eff.run
によって「もしk(a)
の返り値のエフェクトスタックが空になっているならば、結果の型Future[B]
の値を取り出す」ということができる6。これにより処理が終了する。
Ask
の場合
Ask
は次のようであった。
final case class Ask[I <: Transaction]() extends Fujitask with Fx[I]
Fx[I]
なので継続k
を呼びだすための引数の型T
はI
であるということになる。したがって継続k
にFujitaskRunner
から渡されたi
を渡して起動するだけでよい7。
まとめ
このようにしてExtensible Effects版のFujitaskを実装することができた。しかしこの実装は次のような問題があるのではないか?という指摘もある。
Transaction
はエフェクトを表現するが、これがsealed
されていないのでユーザーが任意のエフェクトを作ってしまう可能性がある
エフェクトを任意に作ることができてしまうので、このエフェクトのインタープリター(Fujitaskで言えばFujitask.run
)が意図しないようなエフェクトをエフェクトスタックに詰めこめてしまう。するとそのエフェクトはインタープリターによって処理されず、Eff.run
がランタイムエラーを生じさせる可能性がある。 まだよく考えたわけではないものの、もしTransaction
を継承したよく分からないエフェクトがあったとしても、適切なFujitaskRunner[I]
が見つからずにコンパイルエラーとなるような予感もある。これについてはもう少し考えてみて、インタープリターが処理できないかつランタイムエラーとなるような状態が発生するかを検証しようと思う。
謝辞
Extensible Effects版Fujitaskを作るにあたって、@halcat0x15aさんにはとても有用なコメントやコードの修正など様々な協力をしていただいた。
参考文献
- 進捗大陸05(“ScalaらしいEffを目指して”)
- ドワンゴ秘伝のトランザクションモナドを解説!
- 筆者が社内勉強会でFujitask(モナド版)の説明のために作成したスライド
- kits-eff
- Freer Monads, More Extensible Effects
最近は「副作用」という言葉ではなくて「計算効果(Computational effect)」という言葉で表現されることもある。↩
複数種類のモナドをリストとして表現したものであり、たとえば
Reader[Future[Either[Error, Option[A]]]]
であればStack(Reader, Future, Either, Option)
というようなイメージである。↩エフェクトとは副作用(計算効果)を抽象化したものである。モナドは副作用を抽象化する方法の1つであることから、ここではモナドのようなものと思えばよい。Extensible Effectsは(1)エフェクトをスタックに詰み、計算の合成を行うが、この時点ではまだスレッドを起動したりデータベースへ接続したりといった具体的な副作用は生じさせない。そして次に(2)インタープリターによってエフェクトが入ったスタックを解析し、それに基づいた副作用を生じさせる。 3つのデータ構造は次のような意味を持っている。↩
エフェクト(たとえば
Fujitask
など)をモナドスタックのように並べたものである。モナドスタックと似ているが一応この記事ではモナドはエフェクトの具体的な実装であると区別しているため、呼び分けることとした。↩それほど重要ではないが、型$A, B$が等しい($A = B$)とは$A$は$B$のサブタイプかつ$B$は$A$のサブタイプである($A = B \Leftrightarrow (A <: B \land B <: A)$ことを指す。また$A <: B$とは型$A$は型$B$のサブタイプであることを表す↩
もしエフェクトスタックが空ではない値
eff: Eff[R, A]
によってEff.run(eff)
を実行した場合、ランタイムエラーを送出する。↩パターンマッチの結果
Ask
となった時点でT = I
という型を決定できそうだが、Scalaの現時点でのコンパイラーはそれを検出してくれない。そのためコンパイルを通すためここではk(i.asInstanceOf[T])
としている。↩
コメント
コメントを投稿