読者です 読者をやめる 読者になる 読者になる

北海道苫小牧市出身の初老PGが書くブログ

永遠のプログラマを夢見る、苫小牧市出身のおじさんのちらしの裏

I/Oストリーミングライブラリの実装の基礎 - 前編

conduitpipesなどのストリーミングライブラリの実装は結構わかりにくい。Pipes to Conduitsという一連のエントリが分かりやすい解説なのだが、それでも序盤からFunctorやFreeモナドを駆使していてハードルが高めな印象を受ける。

理解するには自分で実装するのが一番の近道だろうから、このエントリでごく簡単なストリーミングライブラリを実装してみよう。ストリーミングライブラリではI/Oを扱うのが目的であるため、本来であればモナドを扱えるように実装しなければ意味がないのだが、ストリーミングの基本的な仕組みとしてはモナドは重要ではないので、ここでは純粋な値のみを扱うストリームを作成する。

ストリームを表す型

ここが一番重要かつわかりにくい部分だと思われる。今回の実装では、「入力」「出力」「結果」の3つの型をストリームで利用する。が、「出力」と「結果」の違いとはいったいなんなのか。

ストリームは、命令の列である再帰型で表現する。ちょうど無限リストを先頭から舐めていくのと同様のイメージで、この命令列を先頭から順次読み込み、それぞれの命令を解釈することで処理を進める。この命令列を全て完了した時に返すのが「結果」の型である。一方で、実装するストリームには2つの命令がある。1つは上流からのデータを受け取るWaitで、もう1つが下流へデータを流すNotifyである。この2つの命令で扱う型を、それぞれ「入力」「出力」の型と呼ぶ。

具体的には以下の型となる。命令列のDoneが終端であり、結果の型であるaを返す。Notify、Waitは命令列を形成するために、次の命令を表す自身の型、つまりStream i o a型の値を保持する必要がある。それとは別に、Waitは入力の型であるi、Notifyは出力の型であるoを扱っているのがわかるだろう。

data Stream i o a =
  Done a
  | Notify o (Stream i o a)
  | Wait (i -> (Stream i o a))

この型の宣言でもう一個わかりにくいのが、Waitが関数を持つことだろう。これは、各命令が引数と戻り値を持つことに関係する。

下流への通知を行う命令であるNotifyは、下流へ渡す値を引数にとるが、戻り値は必要ない。一方、上流から値を受け取る命令であるWaitは、引数は不要だが上流から渡された値を戻り値として受け取りたい。この、戻り値の受け取りが関数となって現れる。いわゆる、継続渡し方式での表現である。

例えばの話だが、Notifyには戻り値はないと書いたが、常に同じ値である()型の値を戻すと見なすこともできる。その場合には、Notifyの定義も継続渡しを用いて以下のようにすることもできる。が、面倒になるだけなので今回の実装では採用しない。

    Notify o (() -> Stream i o a)

ストリームを作る

ストリームは命令の列なので、WaitとNotifyを並べた列を作ることになる。例えば、以下のような入れ子の値ができるとよい。これは、上流から値が来るのを待ち(Wait)、下流へその値を流し(Notify)、その後結果として0を返すという命令の列を表す。なお、ここではまだそういう処理をしたいんだという意志を記述しただけで、実際にこれらの命令を実行する部分はまったく考えていない。

Wait (\x -> Notify x (Done 0))

さて、この程度であればコンストラクタを自分で書いて組み立てることができるが、大きな部品を作るにはこれでは不便過ぎる。Haskellで命令の列を記述すると言えばdo記法なので、これを利用しない手はない。ということで、Streamをモナドインスタンスにする。

instance Monad (Stream i o) where
  return = Done
  m >>= f = case m of
        Done x -> f x
        Notify x next -> Notify x (next >>= f)
        Wait cont -> Wait (\i -> (cont i) >>= f)

この定義でなぜ命令の列を作ることができるのか。これを見てギエーと思う方も居るかもしれないが、実はそんなに難しいことはしていない。アイデアとしては、命令列の末尾にあるはずのDoneを探しに行き、そいつを次の命令列で置換すれば、最初の命令列に次の命令列が連結されるだろうというものだ。WaitとNotifyに対しては、実は何もしていない。次の命令を取り出して、末端のDoneを探すために(>>=)を再帰的に呼び出しているだけである。実際に仕事をしているのはDoneを見つけた時で、これを次に続く命令列に置き換えている。前の命令の列の「結果」であるxは、次に続く命令列を生成するためのfに渡される。この「結果」により、後に続く命令列に影響を与えることができる。

もう一つ、命令列を組立てやすくするための下ごしらえをしておこう。コンストラクタであるWaitとNotifyをそのまま使ってもいいのだが、こいつをもうちょっとそれっぽくラップしておく。具体的に言うと、命令列の末尾にはDoneが必要なので、それを加えるためのラッパーとなる。

notify :: o -> Stream i o ()
notify x = Notify x (Done ())

wait :: Stream i o i
wait = Wait (\x -> Done x)

waitして得られた値は、「結果」として次の命令列に渡す必要がある。そのため、Doneに含める。一方、notifyの命令については、以降の命令に渡す必要がある情報はない。そこで、固定値として()をDoneに含めている。しつこく書いているので大丈夫かとは思うが、waitで得られた値は下流にいるストリームに渡す「出力」ではなく、以降の命令に渡す「結果」であることに注意しよう。

これらの道具さえあれば、命令の列を作るのは簡単だ。文字列を3つ下流へ送り出すストリームは以下のように書ける。

sourceString :: Stream () String Int
sourceString = do
  notify "Hogehoge"
  notify "BarBar"
  notify "Hehhe"
  return (-1)

処理中に上流から文字列を2つ受け、その長さの和を「結果」として返すストリームは以下のように書ける。Voidは値を一つも含まない型で、下流に何も送り出さないことを意味している。

data Void

waitString :: Stream String Void Int
waitString = do
  x <- wait
  y <- wait
  return (length x + length y)

上流から受けた文字列を逆転して下流に送るストリームは以下のようになる。foreverを使うことで、このストリームは命令の無限列となっている。そのため、「結果」の型はIntとなっているものの、いくら命令を先に進めても命令列の終端、つまりDoneを見つけることはできない。

reverseString :: Stream String String Int
reverseString = forever $ do
  x <- wait
  notify $ reverse x

後編の予告

上流からの値を待つこと、下流へ値を渡すこと、この2つの命令を並べたものとしてストリームを記述することができるようになった。後編では複数のストリームを考える。上流からの値を待つという命令と、下流へ値を渡す命令を持つ2つのストリームがあったとき、この2つのストリームを組み合わせて値の受け渡しを表現できそうな気がする。これを実現する。

さらに、ここまでは単なる命令の列であったストリームの実行部分を作り、ストリームライブラリを完成させる。