汎関数プログラミング(37)−汎関数Stream IO:汎用IO処理プロセス−Free Process
16694 ワード
前の2つの議論では,IO Process:Process[I,O],その動作原理,関数組合せなどを紹介した.完全なIOプログラムは、データソース+処理プロセス+データ終点:Source->Process->Sinkから構成されていることが容易に想像できます.プロセス[I,O]自体がSourceとSinkを両立させることができない機能であることが分かった.一方,独立して付加されたSourceとSinkはProcess[I,O]との関数の組合せを効率的に行うことができない.
実際にはProcess[I,O]は固定単一入力タイプ(single input process):単一とはProcess[I,O]がIタイプ入力のみを受信し、固定とは外部へのコミュニケーション方式が固定されていることを意味する:Halt,Emit,Awaitの3つの状態のみを指す.このような状況はプロセス[I,O]が有効なIOプログラムコンポーネントにならないことを招き,より概括的でより汎用的なIOプロセスの開発を試みなければならない.新しいIOタイプ構造を見てみましょう:Process[F[],O]
この新しいタイプは一連のOタイプ元素を生成できることを表す.これをList[O]としてコンポーネントを記述することができる.重要なのは、ファイルやデータベースの内容を読み取るなど、F[A]を演算することで外部と連絡を取ることができることです.演算F[A]で返されたデータはrf関数の処理を経て新しい状態に移行する:データ入力が正常に完了したときにfbを実行して新しい状態に入り、データ入力中に異常が発生した場合にflを実行して戦場を整理し、資源を解放する.Haltを改善し,終了状況情報を返すようにした.これによりプロセスは全体的に透明になり、より安全になります.さらに重要なのはProcessが拡張できることです.
安全で利用可能なIOタイプとして、まずProcess[F,O]の基本コンポーネントを設計します.
プロセスを演算するときにTryを使用して異常情報をキャプチャし、制御可能な状態Halt(err)に戻ります.なぜなら、終了状況-End:正常終了、Kill:強制終了およびThrowable:異常終了が発生したことを戻り状態から知ることができるからです.
呼び出しが容易なヘルプ関数(helper functions)を設計します.
さらにヘルプ関数があります.
2つのprocessを接続するために++関数も呼び出しました.例えば、p.asFinalizer++Halt(err).次に、この関数を実装します.
私たちがよく知っているmap、flatmap、repeatもあります.
注意:O値の遷移のみを対象とします.ちなみに、演算の安全を保証するいくつかのヘルプ関数を列挙します(Tryを使用してみました):
プロセス[F[]と言う以上、O]はより一般的なIOタイプであり、プロセス[I,O]はプロセス[F[]であるべきである.O]の特殊なケースです.今最も重要なのは、このFを形成してIタイプの入力しか受け入れられないように制限する必要があります.
プロセス[I,O]に対する以前のコンポーネントも同様に導出できる.
注意:コンポーネント実装における書き方は,従来のProcess[I,O]と同様であるが,今回我々の戻りタイプはProcess 1[I,O]である.
IO Processドッキングの最も重要なコンポーネントカテーテル(pipe|>)コンポーネントに来ました.
下流pが入力awaitを待つと同時に上流thisがemitを送信している状態を考慮してドッキングを実現するだけである.他の状態は、自分でポートを調整してからドッキングします.
この|>があれば、プロセス1コンポーネントのペアをプロセス[F,O]に接続できます.
以上のコンポーネントはいずれも出力Oをフィルタリングするものである.
プロセス[F,O]により,アルファベットTのように,入力が上から左右に入る2つの入力を実現することもできる.
まず、左のIクラス、右のI 2クラスの入力のみを許容するように、F[A]を再構築します.
Teeタイプは2頭入力のIO Processタイプで、左はIのみ、右はI 2のみ入力できます.
Teeを構築する基本コンポーネントを再定義する必要があります.
zipは両側入力挿入動作です.このTeeタイプでzipを実現することができます.
次はTeeの一般的なヘルプ関数です.これは2つの入力の代表的な形式です.
zipWithは次のように書くことができます.
完全なIOプログラムはデータソースSourceとデータ終点Sinkの操作を含まなければならないが、Process[F,O]はデータソース(Source)タイプを表すことができるだろうか.Process[F,O]の読み取りAwaitを見てみましょう.
もし私たちがF[A]をIO[A]に変えたら:
実際にProcess[I,O]はProcess[F,O]の一例である.IOを演算するだけでデータソースからデータを読み取ることができます(run IOは結果を返します).まずIOタイプを見てみましょう.前にこのタイプについて議論しました.
データソースSourceからデータを読み込むプログラムを書くことができます.
なお、上述したプロセス[IO,O]は、プロセス[F,O]の特別な例として、Sourceからのデータの読み出しが可能である.次の章では、具体的な実行可能な案について説明します.
次に,データ終点SinkとProcess[F,O]の関係を見る.Process[F,O]タイプで出力機能を実現し,つまりSourceの入力をSinkに送信することを望んでいる.Sinkを次のように表します.
type Sink[F[_],O] = Process[F, O => F[Unit]]
これはSinkが出力Oに一連の関数を提供していることを理解するのは難しくないだろう.これらの関数は入力パラメータOを受信してFを実行し、FはIO演算のような結果を返さない演算である.
SourceとSinkタイプの実用化については、次号「IOプロセス実用化-IOプロセスin action」で具体的に説明します.
実際にはProcess[I,O]は固定単一入力タイプ(single input process):単一とはProcess[I,O]がIタイプ入力のみを受信し、固定とは外部へのコミュニケーション方式が固定されていることを意味する:Halt,Emit,Awaitの3つの状態のみを指す.このような状況はプロセス[I,O]が有効なIOプログラムコンポーネントにならないことを招き,より概括的でより汎用的なIOプロセスの開発を試みなければならない.新しいIOタイプ構造を見てみましょう:Process[F[],O]
trait Process[F[_],O]{}
object Process {
case class Halt[F[_],O](err: Throwable) extends Process[F,O] // ,err
case class Emit[F[_],O](os: Seq[O], ns: Process[F,O]) extends Process[F,O] // os,ns=nextState
case class Await[F[_],A,O]( //
rq: F[A], // 。
rf: Either[Throwable,A] => Process[F,O], // F[A]
fb: Process[F,O], //fallback fb
fl: Process[F,O] //finalizer ,
) extends Process[F,O]
}
この新しいタイプは一連のOタイプ元素を生成できることを表す.これをList[O]としてコンポーネントを記述することができる.重要なのは、ファイルやデータベースの内容を読み取るなど、F[A]を演算することで外部と連絡を取ることができることです.演算F[A]で返されたデータはrf関数の処理を経て新しい状態に移行する:データ入力が正常に完了したときにfbを実行して新しい状態に入り、データ入力中に異常が発生した場合にflを実行して戦場を整理し、資源を解放する.Haltを改善し,終了状況情報を返すようにした.これによりプロセスは全体的に透明になり、より安全になります.さらに重要なのはProcessが拡張できることです.
安全で利用可能なIOタイプとして、まずProcess[F,O]の基本コンポーネントを設計します.
case object End extends Exception //
case object Kill extends Exception //
// p, Halt
def Try[F[_],O](p: Process[F,O]): Process[F,O] =
try p
catch { case e: Throwable => Halt[F,O](e) } //
プロセスを演算するときにTryを使用して異常情報をキャプチャし、制御可能な状態Halt(err)に戻ります.なぜなら、終了状況-End:正常終了、Kill:強制終了およびThrowable:異常終了が発生したことを戻り状態から知ることができるからです.
呼び出しが容易なヘルプ関数(helper functions)を設計します.
// emit , ,
// emitAll(os1, Emit(os2,ns)) == Emit(os1 ++ os2, ns)
def emitAll[F[_],O](outseq: Seq[O], nxtState: Process[F,O] = Halt[F,O](End)): Process[F,O] =
nxtState match {
case Emit(os,ns) => Emit(outseq ++ os, ns) // Emit,
case _ => Emit(outseq,nxtState) //
}
// O
def emit[F[_],O](o: O, nxs: Process[F,O] = Halt[F,O](End)): Process[F,O] = emitAll[F,O](Seq(o),nxs)
//Await
def await[F[_],A,O](req: F[A])(rcfn: Either[Throwable,A] => Process[F,O])(
fallback: Process[F,O] = Halt[F,O](End),
finalizer: Process[F,O] = Halt[F,O](End)) = Await(req,rcfn,fallback,finalizer)
さらにヘルプ関数があります.
// f
def onHalt(f: Throwable => Process[F,O]): Process[F,O] = this match {
case Halt(e) => Try { f(e) } // , f
case Emit(os,ns) => emitAll(os,ns.onHalt(f)) // , 。
case Await(rq,rf,fb,fl) => await(rq)(rf andThen (_ onHalt(f)))(fb,fl) // ,
}
// 。
def drain[O2]: Process[F,O2] = this match {
case Halt(e) => Halt(e)
case Emit(os,ns) => ns.drain
case Await(rq,rf,fb,fl) => Await(rq,rf andThen (_ drain))
}
// , , ,
def asFinalizer: Process[F,O] = this match {
case Halt(e) => Halt(e)
case Emit(os,ns) => Emit(os, ns.asFinalizer)
case Await(rq,rf,fb,fl) => await(rq){
case Left(Kill) => this.asFinalizer // ,
case x => rf(x)
}(fb,fl)
}
// p,
def onComplete(p: => Process[F,O]) =
this.onHalt {
case End => p.asFinalizer //
case err => p.asFinalizer ++ Halt(err) // , p,
}
2つのprocessを接続するために++関数も呼び出しました.例えば、p.asFinalizer++Halt(err).次に、この関数を実装します.
// process, this p
def ++(p: Process[F,O]): Process[F,O] = this match {
case Halt(e) => Try { p } // , p
case Emit(os,ns) => emitAll(os, ns ++ p) // ,
case Await(rq,rf,fb,fl) =>Await(rq, rf andThen (_ ++ p), fb ++ p, fl ++ p) // ,
}
//++ 。
def append(p: Process[F,O]): Process[F,O] =
onHalt {
case End => Try { p } // p
case err => Halt(err)
}
私たちがよく知っているmap、flatmap、repeatもあります.
def map[O2](f: O => O2): Process[F,O2] = this match {
case Halt(e) => Halt(e)
case Emit(os,ns) => Try { if (os.isEmpty) ns map f
else emit(f(os.head), emitAll(os.tail, ns) map f) }
case Await(rq,rf,fb,fl) => Await(rq, rf andThen (_ map f), fb map f, fl map f)
}
def flatMap[O2](f: O => Process[F,O2]): Process[F,O2] = this match {
case Halt(e) => Halt(e)
case Emit(os,ns) => Try { if (os.isEmpty) ns flatMap f
else f(os.head) ++ emitAll(os.tail, ns).flatMap(f) }
case Await(rq,rf,fb,fl) => Await(rq,rf andThen(_ flatMap f), fb flatMap f, fl flatMap f)
}
def repeat: Process[F,O] = this ++ this.repeat
注意:O値の遷移のみを対象とします.ちなみに、演算の安全を保証するいくつかのヘルプ関数を列挙します(Tryを使用してみました):
// p, Halt
def Try[F[_],O](p: Process[F,O]): Process[F,O] =
try p
catch { case e: Throwable => Halt[F,O](e) } //
// p, fl
def TryOr[F[_],O](p: Process[F,O], fl: Process[F,O]): Process[F,O] =
try p
catch {case e: Throwable => fl ++ Halt[F,O](e) }
// p, , fb, fl
def TryAwait[F[_],O](p: Process[F,O])(fb: Process[F,O], fl: Process[F,O]): Process[F,O] =
try p
catch {
case End => fb //
case e: Throwable => fl ++ Halt(e)
}
プロセス[F[]と言う以上、O]はより一般的なIOタイプであり、プロセス[I,O]はプロセス[F[]であるべきである.O]の特殊なケースです.今最も重要なのは、このFを形成してIタイプの入力しか受け入れられないように制限する必要があります.
case class Is[I]() {
sealed trait f[X] {}
case object get extends f[I] //f[X] , f[I], X I
}
def Get[I] = Is[I]().get
type Process1[I,O] = Process[Is[I]#f,O]
def halt1[I,O]: Process1[I,O] = Halt[Is[I]#f,O](End)
def emit1[I,O](o: O, ns: Process1[I,O] = halt1[I,O]): Process1[I,O] = emit(o,ns)
def emitAll1[I,O](os: Seq[O], ns: Process1[I,O] = halt1[I,O]): Process1[I,O] = emitAll(os,ns)
def await1[I,O](rf: I => Process1[I,O],fb: Process1[I,O] = halt1[I,O],fl: Process1[I,O] = halt1[I,O]) =
await(Get[I]){(ei: Either[Throwable,I]) => ei match { //F[A]>>>Get[I], I
case Left(End) => fb
case Left(err) => halt1[I,O]
case Right(i) => Try { rf(i) }
}}(fb,fl)
プロセス[I,O]に対する以前のコンポーネントも同様に導出できる.
def lift[I,O](f: I => O): Process1[I,O] =
await1[I,O](i => emit(f(i))) repeat
def filter[I](f: I => Boolean): Process1[I,I] =
await1[I,I](i => if (f(i)) emit(i) else halt1)
def take[I](n: Int): Process1[I,I] =
if (n > 0) await1[I,I](i => emit(i, take(n-1)))
else halt1
def takeWhile[I](f: I => Boolean): Process1[I,I] =
await1[I,I](i => if (f(i)) emit(i, takeWhile(f)) else halt1)
//
def id[I]: Process1[I,I] = await1[I,I](i => emit(i,id))
def passUnchanged[I]: Process1[I,I] = lift[I,I](identity)
def drop[I](n: Int): Process1[I,I] =
if (n > 0) await1[I,I](i => drop[I](n-1))
else passUnchanged
def dropWhile[I](f: I => Boolean): Process1[I,I] =
await1[I,I](i => if (f(i)) id else emit(i, dropWhile(f)))
注意:コンポーネント実装における書き方は,従来のProcess[I,O]と同様であるが,今回我々の戻りタイプはProcess 1[I,O]である.
IO Processドッキングの最も重要なコンポーネントカテーテル(pipe|>)コンポーネントに来ました.
// process
@annotation.tailrec
final def kill[O2]: Process[F,O2] = this match {
case Halt(e) => Halt(e) //
case Emit(os,ns) => ns.kill // , kill
case Await(rq,rf,fb,fl) => rf(Left(Kill)).drain.onHalt { // , Kill
case Kill => Halt(End) // Kill
case err => Halt(err) //
}
}
// process. emit await
def |>[O2](p2: Process1[O,O2]): Process[F,O2] = {
@annotation.tailrec
// Seq[O] O O p2
def feed(os: Seq[O], ns: Process[F,O], rf: Either[Throwable,O] => Process1[O,O2],
fb: Process1[O,O2] = halt1[O,O2], fl: Process1[O,O2] = halt1[O,O2]): Process[F,O2] =
if (os.isEmpty) ns |> await(Get[O])(rf)(fb,fl)
else rf(Right(os.head)) match { //
case Await(rq1,rf1,fb1,fl1) => feed(os.tail,ns,rf,fb,fl)
case p => Emit(os.tail,ns) |> p
}
p2 match {
case Halt(e) => this.kill.onHalt {e2 => Halt(e) ++ Halt(e2)} // 。
case Emit(os,ns) => emitAll(os, this |> ns)
case Await(rq,rf,fb,fl) => this match {
case Halt(e) => Halt(e) |> p2
case Await(rq0,rf0,fb0,fl0) => await(rq0)(rf0 andThen (_ |> p2))(fb0 |> p2, fl0 |> p2)
case Emit(os,ns) => Try {feed(os,ns,rf,fb,fl)} // 。
}
}
}
下流pが入力awaitを待つと同時に上流thisがemitを送信している状態を考慮してドッキングを実現するだけである.他の状態は、自分でポートを調整してからドッキングします.
この|>があれば、プロセス1コンポーネントのペアをプロセス[F,O]に接続できます.
def filter(f: O => Boolean): Process[F,O] = this |> Process.filter(f)
def take(n: Int): Process[F,O] = this |> Process.take(n)
def takeWhile(f: O => Boolean): Process[F,O] = this |> Process.takeWhile(f)
def drop(n: Int): Process[F,O] = this |> Process.drop(n)
def dropWhile(f: O => Boolean): Process[F,O] = this |> Process.dropWhile(f)
以上のコンポーネントはいずれも出力Oをフィルタリングするものである.
プロセス[F,O]により,アルファベットTのように,入力が上から左右に入る2つの入力を実現することもできる.
まず、左のIクラス、右のI 2クラスの入力のみを許容するように、F[A]を再構築します.
case class T[I,I2]() {
sealed trait f[X] {def get: Either[I => X, I2 => X]}
val L = new f[I] { def get = Left(identity) }
val R = new f[I2] { def get = Right(identity) }
}
def L[I,I2] = T[I,I2]().L
def R[I,I2] = T[I,I2]().R
type Tee[I,I2,O] = Process[T[I,I2]#f,O]
Teeタイプは2頭入力のIO Processタイプで、左はIのみ、右はI 2のみ入力できます.
Teeを構築する基本コンポーネントを再定義する必要があります.
def haltT[I,I2,O] = Halt[T[I,I2]#f,O](End)
def emitT[I,I2,O](o: O, ns: Tee[I,I2,O] = haltT) = emit[T[I,I2]#f,O](o,ns)
//
def awaitL[I,I2,O](rf: I => Tee[I,I2,O], fb: Tee[I,I2,O] = haltT, fl: Tee[I,I2,O] = haltT): Tee[I,I2,O] =
await[T[I,I2]#f,I,O](L){ //F[A] >>> L >>> I
case Left(End) => fb
case Left(err) => fl ++ Halt(err)
case Right(i) => Try { rf(i) }
}(fb,fl)
//
def awaitR[I,I2,O](rf: I2 => Tee[I,I2,O], fb: Tee[I,I2,O] = haltT, fl: Tee[I,I2,O] = haltT): Tee[I,I2,O] =
await[T[I,I2]#f,I2,O](R) { //F[A] >>> R >>> I2
case Left(End) => fb
case Left(err) => fl ++ Halt(err)
case Right(i2) => Try { rf(i2) }
}(fb,fl)
zipは両側入力挿入動作です.このTeeタイプでzipを実現することができます.
def zipWith[I,I2,O](f: (I,I2) => O): Tee[I,I2,O] =
awaitL[I,I2,O](i => awaitR[I,I2,O](i2 => emitT(f(i,i2)))) repeat
// pair
def zip[I,I2]: Tee[I,I2,(I,I2)] = zipWith[I,I2,(I,I2)]((_,_))
//
def interleave[I]: Tee[I,I,I] = awaitL[I,I,I](i => awaitR(i2 => emitT(i) ++ emitT(i2))) repeat
// ,
def passL[I,I2]: Tee[I,I2,I] = awaitL(emitT(_,passL))
// ,
def passR[I,I2]: Tee[I,I2,I2] = awaitR(emitT(_,passR))
// fb
def awaitLOr[I,I2,O](fb: Tee[I,I2,O])(rf: I => Tee[I,I2,O]): Tee[I,I2,O] =
awaitL(rf,fb)
// fb
def awaitROr[I,I2,O](fb: Tee[I,I2,O])(rf: I2 => Tee[I,I2,O]): Tee[I,I2,O] =
awaitR(rf,fb)
// padI padI2
def zipWithAll[I,I2,O](padI: I, padI2: I2)(f: (I,I2) => O): Tee[I,I2,O] = {
val fbL = passL[I,I2] map {f(_, padI2)} // , fbL, padI2 Tee
val fbR = passR[I,I2] map {f(padI, _)} // , fbR, padI Tee
awaitLOr(fbL)(i =>
awaitROr(fbR)(i2 => emitT(f(i,i2)))) repeat
}
次はTeeの一般的なヘルプ関数です.これは2つの入力の代表的な形式です.
//
def tee[O2,O3](p2: Process[F,O2])(t: Tee[O,O2,O3]): Process[F,O3] = {
@annotation.tailrec
def feedL(os: Seq[O], ns: Process[F,O], p: Process[F,O2], t: Tee[O,O2,O3], //
rf: Either[Throwable,O] => Tee[O,O2,O3]): Process[F,O3] = // O
if (os.isEmpty) (ns tee p)(t) //
else rf(Right(os.head)) match { // O
case Await(rq0,rf0,fb0,fl0) => feedL(os.tail,ns,p,t,rf0) // await os.tail
case t0 => (Emit(os.tail,ns) tee p)(t0) //tee
}
@annotation.tailrec
def feedR(os: Seq[O2], ns: Process[F,O2], p: Process[F,O], t: Tee[O,O2,O3], //
rf: Either[Throwable,O2] => Tee[O,O2,O3]): Process[F,O3] = // O2
if (os.isEmpty) (p tee ns)(t) //
else rf(Right(os.head)) match { // O2
case Await(rq0,rf0,fb0,fl0) => feedR(os.tail,ns,p,t,rf0) // await os.tail
case t0 => (p tee Emit(os.tail,ns))(t0) //tee
}
t match {
case Halt(e) => this.kill onComplete p2.kill onComplete Halt(e) //T process
case Emit(os,ns) => emitAll(os, (this tee p2)(ns)) //T , tee
case Await(side,rf,fb,fl) => side.get match { //T ,
case Left(isO) => this match { // , O
case Halt(e) => p2.kill onComplete Halt(e) // ,
case Await(rqL,rfL,fbL,flL) => await(rqL)(rfL andThen (this2 => this2.tee(p2)(t)))() //
case Emit(os,ns) => Try {feedL(os,ns,p2,t,rf)} // , feedL O
}
case Right(isO2) => p2 match {
case Halt(e) => this.kill onComplete Halt(e) // ,
case Await(rqR,rfR,fbR,flR) => await(rqR)(rfR andThen (p => this.tee(p)(t)))()
case Emit(os,ns) => Try {feedR(os,ns,this,t,rf)} // , feedL O
}
}
}
}
zipWithは次のように書くことができます.
// tee zipWith
def zipWith[O2,O3](p2: Process[F,O2])(f: (O,O2) => O3): Process[F,O3] =
(this tee p2)(Process.zipWith(f))
完全なIOプログラムはデータソースSourceとデータ終点Sinkの操作を含まなければならないが、Process[F,O]はデータソース(Source)タイプを表すことができるだろうか.Process[F,O]の読み取りAwaitを見てみましょう.
case class Await[F[_],A,O]( //
rq: F[A], // 。
rf: Either[Throwable,A] => Process[F,O], // F[A]
fb: Process[F,O] = Halt[F,O](End), //fallback fb
fl: Process[F,O] = Halt[F,O](End) //finalizer ,
) extends Process[F,O]
もし私たちがF[A]をIO[A]に変えたら:
case class Await[IO[_],A,O]( //
rq: IO[A], // 。
rf: Either[Throwable,A] => Process[IO,O], // F[A]
fb: Process[IO,O] = Halt[IO,O](End), //fallback fb
fl: Process[IO,O] = Halt[IO,O](End) //finalizer ,
) extends Process[IO,O]
実際にProcess[I,O]はProcess[F,O]の一例である.IOを演算するだけでデータソースからデータを読み取ることができます(run IOは結果を返します).まずIOタイプを見てみましょう.前にこのタイプについて議論しました.
// IO ,
trait IO[A] { self =>
def run: A
def map[B](f: A => B): IO[B] =
new IO[B] { def run = f(self.run) }
def flatMap[B](f: A => IO[B]): IO[B] =
new IO[B] { def run = f(self.run).run }
}
object IO {
def unit[A](a: => A): IO[A] = new IO[A] { def run = a }
def flatMap[A,B](fa: IO[A])(f: A => IO[B]) = fa flatMap f
def apply[A](a: => A): IO[A] = unit(a) // syntax for IO { .. }
}
データソースSourceからデータを読み込むプログラムを書くことができます.
// src
def collect[O](src: Process[IO,O]): IndexedSeq[O] = { // src IndexedSeq
def go(curState: Process[IO,O], accSeq: IndexedSeq[O]): IndexedSeq[O] =
curState match {
case Halt(e) => accSeq // ,
case Emit(os,ns) => go(ns, accSeq ++ os) // , accSeq
case Await(rq,rf,fb,fl) => go(Try { rf(Right(rq.run)) }, accSeq) //
// Try Halt(err)
}
go(src,IndexedSeq())
}
なお、上述したプロセス[IO,O]は、プロセス[F,O]の特別な例として、Sourceからのデータの読み出しが可能である.次の章では、具体的な実行可能な案について説明します.
次に,データ終点SinkとProcess[F,O]の関係を見る.Process[F,O]タイプで出力機能を実現し,つまりSourceの入力をSinkに送信することを望んでいる.Sinkを次のように表します.
type Sink[F[_],O] = Process[F, O => F[Unit]]
これはSinkが出力Oに一連の関数を提供していることを理解するのは難しくないだろう.これらの関数は入力パラメータOを受信してFを実行し、FはIO演算のような結果を返さない演算である.
SourceとSinkタイプの実用化については、次号「IOプロセス実用化-IOプロセスin action」で具体的に説明します.