北海道苫小牧市出身の初老PGが書くブログ

永遠のプログラマを夢見る、苫小牧市出身のおじさんのちらしの裏

I/Oストリーミングライブラリの実装の基礎 - 後編

前回の続きである。予告通り、2つのストリーミングの「出力」と「入力」をつなげる処理、そして、ストリームの命令列を解釈する処理系を実装する。

2つのストリームを1つにする(考え方)

「出力」と「入力」をどうくっつけるといいのか。それぞれのストリームは命令の列なので、2つの命令の列があると考えられる。ちょっと考えると、これらをうまく並べ替えることで、1つの命令の列を作れることがわかる。

例えば、「Notify(1), Wait(2), Done(3)」という列の出力を、「Wait(4), Notify(5), Done(6)」という2つの列の入力につなぐことを考えよう。様々な処理をつなげる場合、取り出したい演算結果は下流で生成されると考えられるだろう。そこで、合成後の命令の列は下流のWait(4)から開始される。Wait(4)では上流からのデータを期待するので、次は上流の命令であるNotify(1)を実行する必要がある。このNotify(1)によってWait(4)がデータを受け取れたので、下流側のNotify(5)を実行することができる。その後、Done(6)を実行すれば処理は完了だ。ここでWait(4)とNotify(1)は互いにデータを受け渡すことで満たされているので、特に命令列として残す必要はない。よってこの場合の合成結果は、「Notify(5), Done(6)」という命令の列となる。Wait(2)やDone(3)は実行する必要がないことに注意されたい。

では、順番を変えて「Wait(4), Notify(5), Done(6)」の出力を「Notify(1), Wait(2), Done(3)」の入力に渡す場合はどうなるか。先ほどと同じように考えると、Notify(1)、Wait(2)、Wait(4)、Notify(5)、Done(3)という順で命令が辿られる。ここでは、Wait(2)でデータを待っているところへNotify(5)でデータを送っており、これらは相殺されていることからこの2つの命令を合成後の列へ残す必要はない。結論としては、「Notify(1), Wait(4), Done(3)」という1つの命令の列に合成できる。

このように2つのストリームは、1つのストリームにまとめることができる。ところで、何気なくNotifyとWaitが相殺されると書いているが、相殺された結果はどこへ行ってしまうのだろうか。Waitは継続持つデータだったため、上流のNotifyで出力されたデータは下流のWaitの継続に渡ってくることになる。そして、以降の処理では変数に束縛されたデータを参照することができ、最終的には何らかの演算が施されてDoneの値として表に出てくることになる。この次の実装の節で詳しく見る。または、今回のライブラリの実装では考えていないが、モナドを扱えるストリームの場合は、モナドの文脈*1に随時影響を与えることもできる。できるというか、むしろその方が普通ではある。

2つのストリームを1つにする(実装)

先ほどの考え方を実装に落としこもう。上流のNotifyと下流のWaitで値を受け渡しできなければいけないので、上流の「出力」の型と下流の「入力」の型は同じものになる。また、「結果」については、上流と下流のどちらからも出てくる可能性がある。Eitherで表現することも可能だろうが、ストリームをたくさん組み合わせた時に型が複雑になり過ぎるので、ここでは「結果」の型は上流と下流で同じものとする。これらの考察をまとめると、2つのストリームを繋ぐ演算 |==| の型は以下のようになる。左側が上流であることを想定している。

(|==|) :: Stream i x a -> Stream x o a -> Stream i o a

考え方のところで書いたように、合成後の命令列は下流の命令から始まる。下流の命令の列の先頭要素によって場合分けを行う。まず、下流の命令が全て終わっていてDoneのみの場合は、上流の命令は無視して列を終えてしまってよい。

p1 |==| Done x = Done x

下流の命令がNotifyの場合は、合成後も「出力」の命令として残す必要がある。よって、合成後の列にもそのままNotifyとして残す。その後に続く命令の列であるnextにはWaitが含まれるかもしれないので、上流の列と再帰的にさらに合成させる。

p1 |==| Notify x next = Notify x (p1 |==| next)

下流の命令がWaitの場合は、上流の命令を解釈する必要がある。今度は上流の命令の列の先頭要素について場合分けする。上流がDoneの場合は、下流の残りの命令は無視してDoneを置いて終わりとする。

Done x |==| Wait cont = Done x

上流がWaitだった場合は、合成後も「入力」の命令として残す。上流の後に続く命令にNotifyが含まれるかもしれないので、後に続く命令の列であるcont' iと下流の命令の列をさらに再帰的に合成する。Waitは継続渡しで表現される命令のため、後に続く命令列を取り出すための継続を記述する必要があることに注意する。

Wait cont' |==| Wait cont = Wait (\i -> (cont' i) |==| Wait cont)

上流がNotifyであれば、下流のWaitに値を受け渡すことができる。上流でNotifyしようとしている値であるxを、下流のWaitの継続であるcontに渡す。下流の残りの命令列では、cont内の束縛変数に束縛された上流からの値を利用できる。このNotifyとWaitは合成後の列には含めずに除去し、上流と下流で残っている命令の列をさらに再帰的に合成する。

Notify x next |==| Wait cont = next |==| (cont x)
== 再帰は毎回上流または下流の命令を1つ以上消費するので、有限の列同士であれば必ず停止する。少なくとも、同じ命令が延々と続くことはない。

ストリームを実行して結果を得る

前にも書いたが、ここまではあくまでも命令の列を定義しただけで、その列を解釈して実行する部分はまだない。言わば、ストリームというプログラムを記述するDSLだけができている状態なので、次はその処理系を作らなければならない。

幸い、ストリームにはWaitとNotifyしかないので処理系を作るのは簡単である。が、意図せぬ動作を防ぐためには、すべてのストリームを実行可能とするのではなく、特定の型を持つストリームのみ実行可能であるとしておいた方が都合がよい。そのためには、処理系がWaitとNotifyという命令を読み込んだ際に何をすべきかなのかを考える必要がある。まず、Waitであるが、これは何か上流からの入力を期待している状態である。しかし、漠然と入力が欲しいと言われたからと言って、処理系から適当な値を渡してもいい結果は得られないであろう。例えば「入力」の型がIntであるストリームからWait命令を繰り返し出されても、1から順に自然数を供給するのがいいのか、それとも0を渡し続けるのがいいのか、処理系側は判断しかねる。よって、実行できるストリームの「入力」の型を、()であるべきと定める。こうしておけば、Wait命令に対して処理系からは()をひたすら入力すればいいことになる。

ではNotifyはどうだろう。これは下流に値を出力しようとしている状態であり、処理系がその値をすべて集めるということも可能である。しかし、命令列は無限かもしれなく、従ってNotifyを無限に含む可能性があることを考えると、「出力」をすべて処理系が集めるのはあまり懸命ではなさそうだ。逆に、Notify命令で出力しようとしている値をすべて捨てるような処理系を実装することもできる。しかし、そもそも捨てても構わないような値が出てくるという状況は、ストリームを合成した際の不具合である可能性が高い。そう考えると、評価できるストリームからは何も出力されないのが理想である。よって、「出力」の型には何も値を持たない型であるVoidを利用する。「出力」の型がVoidであるということは、言い換えるれば、評価できるストリームにはNotify命令は含まれないということだ。

ストリームの命令列をすべて処理すると、最後に「結果」を含むDoneがある。この値をストリームを評価した戻り値とする。ここまでの話を実装したのが以下だ。命令列をすべて辿るために、runStreamは再帰関数として書かれている。Waitが出てきた場合は、継続に対して処理系から機械的に()を供給する。Notifyは定義はしているが、実際は利用されることはない。

runStream :: Stream () Void a -> a
runStream p = case p of
runStream (Done x) = x
runStream (Notify _ next) = runStream next
runStream (Wait cont) = runStream (cont ())

ここで人によっては次のような疑問が出てくるかもしれない。評価できるストリームにNotifyが含まれてはいけないのなら、そもそもなぜNotify命令があるのか。Notify命令は、評価可能なストリームを組み立てるのに使われる、部品となるストリームの中で用いられる。ところが、すでに説明したように、ストリーム同士を適切に合成することでNotifyはWaitと相殺されて消えてしまう。Notifyだけを行うストリームへWaitだけを行うストリームを合成すれば、Done命令だけを含むストリームができる。今回実装した副作用を含まないストリームであれば、合成後の実行可能な命令列にはDone命令1個しか残らないのがほとんどだ。そのため、処理系を実装すると大げさなことを書いたが、実は合成の時点でほとんどの仕事が終わっており、処理系は何もするべきことが残っていないのである。

最後に、前編で作ったストリームを組み合わせて実行してみよう。コードは以下のようになる。

stream :: Stream () Void Int
stream = sourceString |==| reverseString |==| waitString

result :: Int
result = runStream stream

まとめ

ストリームライブラリの実装について、特にわかりにくいと思われる部分を中心に概要を書いた。ストリームが再帰型による命令の列で表現されること、「出力」と「結果」の概念はきちんと区別すること、そして合成によってNotifyとWaitの命令が相殺されて消えることの3点がポイントとなる。各命令にモナドの値を含めることで、ストリームの表現力をさらに高められる。特にベースにIOのモナドが入っていれば、副作用のある処理をストリームの間で互いに連携しながら実行できるようになる。詳細は、前編でも紹介したPipes to Conduitsに書いてある。I/Oリソースの管理方法などもこちらが詳しい。

(蛇足)命令列とFreeモナド

今回のストリームのような命令列をdo記法で構成する手法は、Freeモナドによって抽象化できる。Freeモナドを使えば、Stream型は以下のように記述できる。

data StreamF i o next =
  Notify o next
  | Wait (i -> next)

instance Functor (StreamF i o) where
  fmap f (Notify x y) = Notify x (f y)
  fmap f (Wait c) = Wait (\x -> f (c x))

type Stream i o = Free (StreamF i o)

DoneはFree型が吸収してくれ、Free型のコンストラクタであるPureを代わりに使える。NotifyやWaitはFree型のコンストラクタであるFreeで包んで使う。Freeモナドを使った実装は、gistにupしてある。FreeモナドについてはWhy free monads matterが詳しい。

*1:わかりにくければ副作用と言ってもいいだろう