汎関数プログラミング(37)−汎関数Stream IO:汎用IO処理プロセス−Free Process


前の2つの議論では,IO Process:Process[I,O],その動作原理,関数組合せなどを紹介した.完全なIOプログラムは、データソース+処理プロセス+データ終点:Source->Process->Sinkから構成されていることが容易に想像できます.プロセス[I,O]自体がSourceとSinkを両立させることができない機能であることが分かった.一方,独立して付加されたSourceとSinkはProcess[I,O]との関数の組合せを効率的に行うことができない.
実際にはProcess[I,O]は固定単一入力タイプ(single input process):単一とはProcess[I,O]がIタイプ入力のみを受信し、固定とは外部へのコミュニケーション方式が固定されていることを意味する:Halt,Emit,Awaitの3つの状態のみを指す.このような状況はプロセス[I,O]が有効なIOプログラムコンポーネントにならないことを招き,より概括的でより汎用的なIOプロセスの開発を試みなければならない.新しいIOタイプ構造を見てみましょう:Process[F[],O]
trait Process[F[_],O]{}
object Process {
  case class Halt[F[_],O](err: Throwable) extends Process[F,O] //  ,err      
  case class Emit[F[_],O](os: Seq[O], ns: Process[F,O]) extends Process[F,O] //  os,ns=nextState 
  case class Await[F[_],A,O](  //    
    rq: F[A],  //       。        
    rf: Either[Throwable,A] => Process[F,O], //   F[A]        
    fb: Process[F,O], //fallback         fb  
    fl: Process[F,O]  //finalizer     ,    
    ) extends Process[F,O]
}

この新しいタイプは一連のOタイプ元素を生成できることを表す.これをList[O]としてコンポーネントを記述することができる.重要なのは、ファイルやデータベースの内容を読み取るなど、F[A]を演算することで外部と連絡を取ることができることです.演算F[A]で返されたデータはrf関数の処理を経て新しい状態に移行する:データ入力が正常に完了したときにfbを実行して新しい状態に入り、データ入力中に異常が発生した場合にflを実行して戦場を整理し、資源を解放する.Haltを改善し,終了状況情報を返すようにした.これによりプロセスは全体的に透明になり、より安全になります.さらに重要なのはProcessが拡張できることです.
安全で利用可能なIOタイプとして、まずProcess[F,O]の基本コンポーネントを設計します.
  case object End extends Exception   //      
  case object Kill extends Exception  //          
  //  p,        Halt       
  def Try[F[_],O](p: Process[F,O]): Process[F,O] =
    try p
    catch { case e: Throwable => Halt[F,O](e) }  //      

プロセスを演算するときにTryを使用して異常情報をキャプチャし、制御可能な状態Halt(err)に戻ります.なぜなら、終了状況-End:正常終了、Kill:強制終了およびThrowable:異常終了が発生したことを戻り状態から知ることができるからです.
呼び出しが容易なヘルプ関数(helper functions)を設計します.
  //         emit	,          ,      
  // emitAll(os1, Emit(os2,ns)) == Emit(os1 ++ os2, ns)
  def emitAll[F[_],O](outseq: Seq[O], nxtState: Process[F,O] = Halt[F,O](End)): Process[F,O] =
    nxtState match {
    	case Emit(os,ns) => Emit(outseq ++ os, ns)  //      Emit,     
    	case _ => Emit(outseq,nxtState)  //      
    }  
  //   O      
  def emit[F[_],O](o: O, nxs: Process[F,O] = Halt[F,O](End)): Process[F,O] = emitAll[F,O](Seq(o),nxs)
  //Await        
  def await[F[_],A,O](req: F[A])(rcfn: Either[Throwable,A] => Process[F,O])(
      fallback: Process[F,O] = Halt[F,O](End), 
      finalizer: Process[F,O] = Halt[F,O](End)) = Await(req,rcfn,fallback,finalizer)

さらにヘルプ関数があります.
  //          f
  def onHalt(f: Throwable => Process[F,O]): Process[F,O] = this match {
    case Halt(e) => Try { f(e) }     //       ,  f
    case Emit(os,ns) => emitAll(os,ns.onHalt(f))  //     ,        。        
    case Await(rq,rf,fb,fl) => await(rq)(rf andThen (_ onHalt(f)))(fb,fl) //     ,        
  }
  //        。        
  def drain[O2]: Process[F,O2] = this match {
  	case Halt(e) => Halt(e)
  	case Emit(os,ns) => ns.drain
  	case Await(rq,rf,fb,fl) => Await(rq,rf andThen (_ drain))
  }
  //        ,     ,    ,      
  def asFinalizer: Process[F,O] = this match {
  	case Halt(e) => Halt(e)
  	case Emit(os,ns) => Emit(os, ns.asFinalizer)
  	case Await(rq,rf,fb,fl) => await(rq){
  		case Left(Kill) => this.asFinalizer   //      ,      
  		case x => rf(x)
  	}(fb,fl)
  }
  //      p,          
  def onComplete(p: => Process[F,O]) =
    this.onHalt {
    	case End => p.asFinalizer  //    
    	case err => p.asFinalizer ++ Halt(err) //     ,   p,          
    }

2つのprocessを接続するために++関数も呼び出しました.例えば、p.asFinalizer++Halt(err).次に、この関数を実装します.
 //    process, this         p   
 def ++(p: Process[F,O]): Process[F,O] = this match {
 	 case Halt(e) => Try { p }    //   ,     p
 	 case Emit(os,ns) => emitAll(os, ns ++ p)  //    ,    
 	 case Await(rq,rf,fb,fl) =>Await(rq, rf andThen (_ ++ p), fb ++ p, fl ++ p) //    ,    
 }
 //++        。          
 def append(p: Process[F,O]): Process[F,O] =
   onHalt {
   		case End => Try { p }  //                 p
   		case err => Halt(err)
   }

私たちがよく知っているmap、flatmap、repeatもあります.
  def map[O2](f: O => O2): Process[F,O2] = this match {
  	case Halt(e) => Halt(e)
  	case Emit(os,ns) => Try { if (os.isEmpty) ns map f
  	                    else emit(f(os.head), emitAll(os.tail, ns) map f) }
  	case Await(rq,rf,fb,fl) => Await(rq, rf andThen (_ map f), fb map f, fl map f)
  }
  def flatMap[O2](f: O => Process[F,O2]): Process[F,O2] = this match {
  	case Halt(e) => Halt(e)
  	case Emit(os,ns) => Try { if (os.isEmpty) ns flatMap f
  	                    else f(os.head) ++ emitAll(os.tail, ns).flatMap(f) }
  	case Await(rq,rf,fb,fl) => Await(rq,rf andThen(_ flatMap f), fb flatMap f, fl flatMap f)
  }
  def repeat: Process[F,O] = this ++ this.repeat

注意:O値の遷移のみを対象とします.ちなみに、演算の安全を保証するいくつかのヘルプ関数を列挙します(Tryを使用してみました):
  //  p,        Halt       
  def Try[F[_],O](p: Process[F,O]): Process[F,O] =
    try p
    catch { case e: Throwable => Halt[F,O](e) }  //      
  //  p,          fl        
  def TryOr[F[_],O](p: Process[F,O], fl: Process[F,O]): Process[F,O] =
    try p
    catch {case e: Throwable => fl ++ Halt[F,O](e) }
  //  p,      ,  fb,          fl
  def TryAwait[F[_],O](p: Process[F,O])(fb: Process[F,O], fl: Process[F,O]): Process[F,O] =
    try p
    catch {
    	case End => fb  //    
    	case e: Throwable => fl ++ Halt(e)
    }

プロセス[F[]と言う以上、O]はより一般的なIOタイプであり、プロセス[I,O]はプロセス[F[]であるべきである.O]の特殊なケースです.今最も重要なのは、このFを形成してIタイプの入力しか受け入れられないように制限する必要があります.
   case class Is[I]() {
   	sealed trait f[X] {}
   	case object get extends f[I]  //f[X]      ,  f[I],   X    I
   }
   def Get[I] = Is[I]().get
   
   type Process1[I,O] = Process[Is[I]#f,O]   
   def halt1[I,O]: Process1[I,O] = Halt[Is[I]#f,O](End)
   def emit1[I,O](o: O, ns: Process1[I,O] = halt1[I,O]): Process1[I,O] = emit(o,ns)
   def emitAll1[I,O](os: Seq[O], ns: Process1[I,O] = halt1[I,O]): Process1[I,O] = emitAll(os,ns)
   def await1[I,O](rf: I => Process1[I,O],fb: Process1[I,O] = halt1[I,O],fl: Process1[I,O] = halt1[I,O]) =
     await(Get[I]){(ei: Either[Throwable,I]) => ei match {  //F[A]>>>Get[I],    I    
     	case Left(End) => fb
     	case Left(err) => halt1[I,O]
     	case Right(i) => Try { rf(i) }
     }}(fb,fl)

プロセス[I,O]に対する以前のコンポーネントも同様に導出できる.
  def lift[I,O](f: I => O): Process1[I,O] =
    await1[I,O](i => emit(f(i))) repeat
  def filter[I](f: I => Boolean): Process1[I,I] =
    await1[I,I](i => if (f(i)) emit(i) else halt1)
  def take[I](n: Int): Process1[I,I] =
    if (n > 0) await1[I,I](i => emit(i, take(n-1)))
    else halt1
  def takeWhile[I](f: I => Boolean): Process1[I,I] =
    await1[I,I](i => if (f(i)) emit(i, takeWhile(f)) else halt1)
  //         
  def id[I]: Process1[I,I] = await1[I,I](i => emit(i,id))
  def passUnchanged[I]: Process1[I,I] = lift[I,I](identity)
  def drop[I](n: Int): Process1[I,I] =
    if (n > 0) await1[I,I](i => drop[I](n-1))
    else passUnchanged
  def dropWhile[I](f: I => Boolean): Process1[I,I] = 
    await1[I,I](i => if (f(i)) id else emit(i, dropWhile(f)))   

注意:コンポーネント実装における書き方は,従来のProcess[I,O]と同様であるが,今回我々の戻りタイプはProcess 1[I,O]である.
IO Processドッキングの最も重要なコンポーネントカテーテル(pipe|>)コンポーネントに来ました.
 //    process
 @annotation.tailrec
 final def kill[O2]: Process[F,O2] = this match {
 	 case Halt(e) => Halt(e)    //        
 	 case Emit(os,ns) => ns.kill  //   ,    kill    
 	 case Await(rq,rf,fb,fl) => rf(Left(Kill)).drain.onHalt { //        ,       Kill           
 	 	case Kill => Halt(End)    //        Kill           
 	 	case err => Halt(err)  //       
 	 }
 }
   
 //    process.    emit        await  
 def |>[O2](p2: Process1[O,O2]): Process[F,O2] = {
   @annotation.tailrec
   // Seq[O]  O  O  p2
   def feed(os: Seq[O], ns: Process[F,O], rf: Either[Throwable,O] => Process1[O,O2],
            fb: Process1[O,O2] = halt1[O,O2], fl: Process1[O,O2] = halt1[O,O2]): Process[F,O2] =
       if (os.isEmpty) ns |> await(Get[O])(rf)(fb,fl)
       else rf(Right(os.head)) match {  //                
       	 case Await(rq1,rf1,fb1,fl1) => feed(os.tail,ns,rf,fb,fl)
       	 case p => Emit(os.tail,ns) |> p
       }
   p2 match {
 	  case Halt(e) => this.kill.onHalt {e2 => Halt(e) ++ Halt(e2)} //         。           
 	  case Emit(os,ns) => emitAll(os, this |> ns)
 	  case Await(rq,rf,fb,fl) => this match {
 	 	 case Halt(e) => Halt(e) |> p2
 	 	 case Await(rq0,rf0,fb0,fl0) => await(rq0)(rf0 andThen (_ |> p2))(fb0 |> p2, fl0 |> p2)
 	 	 case Emit(os,ns) => Try {feed(os,ns,rf,fb,fl)}   //          。      
 	 }
  }
 }

下流pが入力awaitを待つと同時に上流thisがemitを送信している状態を考慮してドッキングを実現するだけである.他の状態は、自分でポートを調整してからドッキングします.
この|>があれば、プロセス1コンポーネントのペアをプロセス[F,O]に接続できます.
  def filter(f: O => Boolean): Process[F,O] = this |> Process.filter(f)
  def take(n: Int): Process[F,O] = this |> Process.take(n)
  def takeWhile(f: O => Boolean): Process[F,O] = this |> Process.takeWhile(f)
  def drop(n: Int): Process[F,O] = this |> Process.drop(n)
  def dropWhile(f: O => Boolean): Process[F,O] = this |> Process.dropWhile(f)

以上のコンポーネントはいずれも出力Oをフィルタリングするものである.
プロセス[F,O]により,アルファベットTのように,入力が上から左右に入る2つの入力を実現することもできる.
まず、左のIクラス、右のI 2クラスの入力のみを許容するように、F[A]を再構築します.
  case class T[I,I2]() {  
  	sealed trait f[X] {def get: Either[I => X, I2 => X]}
  	val L = new f[I] { def get = Left(identity) }
  	val R = new f[I2] { def get = Right(identity) }
  }
  def L[I,I2] = T[I,I2]().L
  def R[I,I2] = T[I,I2]().R
  type Tee[I,I2,O] = Process[T[I,I2]#f,O]

Teeタイプは2頭入力のIO Processタイプで、左はIのみ、右はI 2のみ入力できます.
Teeを構築する基本コンポーネントを再定義する必要があります.
  def haltT[I,I2,O] = Halt[T[I,I2]#f,O](End)
  def emitT[I,I2,O](o: O, ns: Tee[I,I2,O] = haltT) = emit[T[I,I2]#f,O](o,ns)
  //    
  def awaitL[I,I2,O](rf: I => Tee[I,I2,O], fb: Tee[I,I2,O] = haltT, fl: Tee[I,I2,O] = haltT): Tee[I,I2,O] =
    await[T[I,I2]#f,I,O](L){ //F[A] >>> L >>>    I   
    	case Left(End) => fb
    	case Left(err) => fl ++ Halt(err)
    	case Right(i) => Try { rf(i) }
    }(fb,fl)
  //    
  def awaitR[I,I2,O](rf: I2 => Tee[I,I2,O], fb: Tee[I,I2,O] = haltT, fl: Tee[I,I2,O] = haltT): Tee[I,I2,O] =
    await[T[I,I2]#f,I2,O](R) { //F[A] >>> R >>>    I2   
    	case Left(End) => fb
    	case Left(err) => fl ++ Halt(err)
    	case Right(i2) => Try { rf(i2) }
    }(fb,fl)

zipは両側入力挿入動作です.このTeeタイプでzipを実現することができます.
 def zipWith[I,I2,O](f: (I,I2) => O): Tee[I,I2,O] =
   awaitL[I,I2,O](i => awaitR[I,I2,O](i2 => emitT(f(i,i2)))) repeat
 //            pair
  def zip[I,I2]: Tee[I,I2,(I,I2)] = zipWith[I,I2,(I,I2)]((_,_))
  //         
  def interleave[I]: Tee[I,I,I] = awaitL[I,I,I](i => awaitR(i2 => emitT(i) ++ emitT(i2))) repeat
  //       ,    
  def passL[I,I2]: Tee[I,I2,I] = awaitL(emitT(_,passL))
  //       ,    
  def passR[I,I2]: Tee[I,I2,I2] = awaitR(emitT(_,passR))
  //         fb
  def awaitLOr[I,I2,O](fb: Tee[I,I2,O])(rf: I => Tee[I,I2,O]): Tee[I,I2,O] =
   awaitL(rf,fb)
  //         fb
  def awaitROr[I,I2,O](fb: Tee[I,I2,O])(rf: I2 => Tee[I,I2,O]): Tee[I,I2,O] =
   awaitR(rf,fb)
  //                padI     padI2
  def zipWithAll[I,I2,O](padI: I, padI2: I2)(f: (I,I2) => O): Tee[I,I2,O] = {
  	val fbL = passL[I,I2] map {f(_, padI2)}  //         ,  fbL,    padI2 Tee  
  	val fbR = passR[I,I2] map {f(padI, _)}   //         ,  fbR,    padI Tee  
  	awaitLOr(fbL)(i =>
  	awaitROr(fbR)(i2 => emitT(f(i,i2)))) repeat
  }

次はTeeの一般的なヘルプ関数です.これは2つの入力の代表的な形式です.
  //        
  def tee[O2,O3](p2: Process[F,O2])(t: Tee[O,O2,O3]): Process[F,O3] = {
    @annotation.tailrec
    def feedL(os: Seq[O], ns: Process[F,O], p: Process[F,O2], t: Tee[O,O2,O3], //     
      rf: Either[Throwable,O] => Tee[O,O2,O3]): Process[F,O3] =  //    O
      if (os.isEmpty) (ns tee p)(t)  //        
      else rf(Right(os.head)) match { //    O     
      	case Await(rq0,rf0,fb0,fl0) => feedL(os.tail,ns,p,t,rf0) //       await     os.tail
      	case t0 => (Emit(os.tail,ns) tee p)(t0) //tee     
      }
    @annotation.tailrec
    def feedR(os: Seq[O2], ns: Process[F,O2], p: Process[F,O], t: Tee[O,O2,O3], //     
      rf: Either[Throwable,O2] => Tee[O,O2,O3]): Process[F,O3] =  //    O2
      if (os.isEmpty) (p tee ns)(t)  //        
      else rf(Right(os.head)) match { //    O2     
      	case Await(rq0,rf0,fb0,fl0) => feedR(os.tail,ns,p,t,rf0) //       await     os.tail
      	case t0 => (p tee Emit(os.tail,ns))(t0) //tee     
      }
    t match {
    	case Halt(e) => this.kill onComplete p2.kill onComplete Halt(e) //T           process
    	case Emit(os,ns) => emitAll(os, (this tee p2)(ns)) //T    ,     tee
    	case Await(side,rf,fb,fl) => side.get match {  //T        ,          
    		case Left(isO) => this match {  //    ,  O
    			case Halt(e) => p2.kill onComplete Halt(e) //      ,            
    			case Await(rqL,rfL,fbL,flL) => await(rqL)(rfL andThen (this2 => this2.tee(p2)(t)))() //       
    			case Emit(os,ns) => Try {feedL(os,ns,p2,t,rf)} //     , feedL  O  
    		}
    		case Right(isO2) => p2 match {
    		  case Halt(e) => this.kill onComplete Halt(e) //      ,            
    		  case Await(rqR,rfR,fbR,flR) => await(rqR)(rfR andThen (p => this.tee(p)(t)))()
    		  case Emit(os,ns) => Try {feedR(os,ns,this,t,rf)} //     , feedL  O  
    		}
    			
    	}
    }
  }

zipWithは次のように書くことができます.
  // tee   zipWith
  def zipWith[O2,O3](p2: Process[F,O2])(f: (O,O2) => O3): Process[F,O3] =
   (this tee p2)(Process.zipWith(f))

完全なIOプログラムはデータソースSourceとデータ終点Sinkの操作を含まなければならないが、Process[F,O]はデータソース(Source)タイプを表すことができるだろうか.Process[F,O]の読み取りAwaitを見てみましょう.
  case class Await[F[_],A,O](  //    
    rq: F[A],  //       。        
    rf: Either[Throwable,A] => Process[F,O], //   F[A]        
    fb: Process[F,O] = Halt[F,O](End), //fallback         fb  
    fl: Process[F,O] = Halt[F,O](End)  //finalizer     ,          
    ) extends Process[F,O]

もし私たちがF[A]をIO[A]に変えたら:
  case class Await[IO[_],A,O](  //    
    rq: IO[A],  //       。        
    rf: Either[Throwable,A] => Process[IO,O], //   F[A]        
    fb: Process[IO,O] = Halt[IO,O](End), //fallback         fb  
    fl: Process[IO,O] = Halt[IO,O](End)  //finalizer     ,          
    ) extends Process[IO,O]

実際にProcess[I,O]はProcess[F,O]の一例である.IOを演算するだけでデータソースからデータを読み取ることができます(run IOは結果を返します).まずIOタイプを見てみましょう.前にこのタイプについて議論しました.
//   IO ,      
trait IO[A] { self =>
    def run: A
    def map[B](f: A => B): IO[B] =
      new IO[B] { def run = f(self.run) }
    def flatMap[B](f: A => IO[B]): IO[B] =
      new IO[B] { def run = f(self.run).run }
}
object IO {
    def unit[A](a: => A): IO[A] = new IO[A] { def run = a }
    def flatMap[A,B](fa: IO[A])(f: A => IO[B]) = fa flatMap f
    def apply[A](a: => A): IO[A] = unit(a) // syntax for IO { .. }
}

データソースSourceからデータを読み込むプログラムを書くことができます.
 //    src       
  def collect[O](src: Process[IO,O]): IndexedSeq[O] = {  // src  IndexedSeq
  	def go(curState: Process[IO,O], accSeq: IndexedSeq[O]): IndexedSeq[O] =
  	  curState match {
  	  	case Halt(e) => accSeq     //  ,      
  	  	case Emit(os,ns) => go(ns, accSeq ++ os)  //          ,        accSeq
  	  	case Await(rq,rf,fb,fl) => go(Try { rf(Right(rq.run)) }, accSeq) //       
  	  	                           //        Try       Halt(err)
  	  }
  	  go(src,IndexedSeq())
  }

なお、上述したプロセス[IO,O]は、プロセス[F,O]の特別な例として、Sourceからのデータの読み出しが可能である.次の章では、具体的な実行可能な案について説明します.
次に,データ終点SinkとProcess[F,O]の関係を見る.Process[F,O]タイプで出力機能を実現し,つまりSourceの入力をSinkに送信することを望んでいる.Sinkを次のように表します.
  type Sink[F[_],O] = Process[F, O => F[Unit]] 
これはSinkが出力Oに一連の関数を提供していることを理解するのは難しくないだろう.これらの関数は入力パラメータOを受信してFを実行し、FはIO演算のような結果を返さない演算である.
SourceとSinkタイプの実用化については、次号「IOプロセス実用化-IOプロセスin action」で具体的に説明します.