Pixel Pedals of Tomakomai

北海道苫小牧市出身の初老の日常

I/Oストリーミングライブラリの実装の基礎 - 後編

前回の続きである。予告通り、2つのストリーミングの「出力」と「入力」をつなげる処理、そして、ストリームの命令列を解釈する処理系を実装する。

2つのストリームを1つにする(考え方)

「出力」と「入力」をどうくっつけるといいのか。それぞれのストリームは命令の列なので、2つの命令の列があると考えられる。ちょっと考えると、これらをうまく並べ替えることで、1つの命令の列を作れることがわかる。

例えば、「Notify(1), Wait(2), Done(3)」という列の出力を、「Wait(4), Notify(5), Done(6)」という2つの列の入力につなぐことを考えよう。様々な処理をつなげる場合、取り出したい演算結果は下流で生成されると考えられるだろう。そこで、合成後の命令の列は下流のWait(4)から開始される。Wait(4)では上流からのデータを期待するので、次は上流の命令であるNotify(1)を実行する必要がある。このNotify(1)によってWait(4)がデータを受け取れたので、下流側のNotify(5)を実行することができる。その後、Done(6)を実行すれば処理は完了だ。ここでWait(4)とNotify(1)は互いにデータを受け渡すことで満たされているので、特に命令列として残す必要はない。よってこの場合の合成結果は、「Notify(5), Done(6)」という命令の列となる。Wait(2)やDone(3)は実行する必要がないことに注意されたい。

では、順番を変えて「Wait(4), Notify(5), Done(6)」の出力を「Notify(1), Wait(2), Done(3)」の入力に渡す場合はどうなるか。先ほどと同じように考えると、Notify(1)、Wait(2)、Wait(4)、Notify(5)、Done(3)という順で命令が辿られる。ここでは、Wait(2)でデータを待っているところへNotify(5)でデータを送っており、これらは相殺されていることからこの2つの命令を合成後の列へ残す必要はない。結論としては、「Notify(1), Wait(4), Done(3)」という1つの命令の列に合成できる。

このように2つのストリームは、1つのストリームにまとめることができる。ところで、何気なくNotifyとWaitが相殺されると書いているが、相殺された結果はどこへ行ってしまうのだろうか。Waitは継続持つデータだったため、上流のNotifyで出力されたデータは下流のWaitの継続に渡ってくることになる。そして、以降の処理では変数に束縛されたデータを参照することができ、最終的には何らかの演算が施されてDoneの値として表に出てくることになる。この次の実装の節で詳しく見る。または、今回のライブラリの実装では考えていないが、モナドを扱えるストリームの場合は、モナドの文脈*1に随時影響を与えることもできる。できるというか、むしろその方が普通ではある。

2つのストリームを1つにする(実装)

先ほどの考え方を実装に落としこもう。上流のNotifyと下流のWaitで値を受け渡しできなければいけないので、上流の「出力」の型と下流の「入力」の型は同じものになる。また、「結果」については、上流と下流のどちらからも出てくる可能性がある。Eitherで表現することも可能だろうが、ストリームをたくさん組み合わせた時に型が複雑になり過ぎるので、ここでは「結果」の型は上流と下流で同じものとする。これらの考察をまとめると、2つのストリームを繋ぐ演算 |==| の型は以下のようになる。左側が上流であることを想定している。

(|==|) :: Stream i x a -> Stream x o a -> Stream i o a

考え方のところで書いたように、合成後の命令列は下流の命令から始まる。下流の命令の列の先頭要素によって場合分けを行う。まず、下流の命令が全て終わっていてDoneのみの場合は、上流の命令は無視して列を終えてしまってよい。

p1 |==| Done x = Done x

下流の命令がNotifyの場合は、合成後も「出力」の命令として残す必要がある。よって、合成後の列にもそのままNotifyとして残す。その後に続く命令の列であるnextにはWaitが含まれるかもしれないので、上流の列と再帰的にさらに合成させる。

p1 |==| Notify x next = Notify x (p1 |==| next)

下流の命令がWaitの場合は、上流の命令を解釈する必要がある。今度は上流の命令の列の先頭要素について場合分けする。上流がDoneの場合は、下流の残りの命令は無視してDoneを置いて終わりとする。

Done x |==| Wait cont = Done x

上流がWaitだった場合は、合成後も「入力」の命令として残す。上流の後に続く命令にNotifyが含まれるかもしれないので、後に続く命令の列であるcont' iと下流の命令の列をさらに再帰的に合成する。Waitは継続渡しで表現される命令のため、後に続く命令列を取り出すための継続を記述する必要があることに注意する。

Wait cont' |==| Wait cont = Wait (\i -> (cont' i) |==| Wait cont)

上流がNotifyであれば、下流のWaitに値を受け渡すことができる。上流でNotifyしようとしている値であるxを、下流のWaitの継続であるcontに渡す。下流の残りの命令列では、cont内の束縛変数に束縛された上流からの値を利用できる。このNotifyとWaitは合成後の列には含めずに除去し、上流と下流で残っている命令の列をさらに再帰的に合成する。

Notify x next |==| Wait cont = next |==| (cont x)
== 再帰は毎回上流または下流の命令を1つ以上消費するので、有限の列同士であれば必ず停止する。少なくとも、同じ命令が延々と続くことはない。

ストリームを実行して結果を得る

前にも書いたが、ここまではあくまでも命令の列を定義しただけで、その列を解釈して実行する部分はまだない。言わば、ストリームというプログラムを記述するDSLだけができている状態なので、次はその処理系を作らなければならない。

幸い、ストリームにはWaitとNotifyしかないので処理系を作るのは簡単である。が、意図せぬ動作を防ぐためには、すべてのストリームを実行可能とするのではなく、特定の型を持つストリームのみ実行可能であるとしておいた方が都合がよい。そのためには、処理系がWaitとNotifyという命令を読み込んだ際に何をすべきかなのかを考える必要がある。まず、Waitであるが、これは何か上流からの入力を期待している状態である。しかし、漠然と入力が欲しいと言われたからと言って、処理系から適当な値を渡してもいい結果は得られないであろう。例えば「入力」の型がIntであるストリームからWait命令を繰り返し出されても、1から順に自然数を供給するのがいいのか、それとも0を渡し続けるのがいいのか、処理系側は判断しかねる。よって、実行できるストリームの「入力」の型を、()であるべきと定める。こうしておけば、Wait命令に対して処理系からは()をひたすら入力すればいいことになる。

ではNotifyはどうだろう。これは下流に値を出力しようとしている状態であり、処理系がその値をすべて集めるということも可能である。しかし、命令列は無限かもしれなく、従ってNotifyを無限に含む可能性があることを考えると、「出力」をすべて処理系が集めるのはあまり懸命ではなさそうだ。逆に、Notify命令で出力しようとしている値をすべて捨てるような処理系を実装することもできる。しかし、そもそも捨てても構わないような値が出てくるという状況は、ストリームを合成した際の不具合である可能性が高い。そう考えると、評価できるストリームからは何も出力されないのが理想である。よって、「出力」の型には何も値を持たない型であるVoidを利用する。「出力」の型がVoidであるということは、言い換えるれば、評価できるストリームにはNotify命令は含まれないということだ。

ストリームの命令列をすべて処理すると、最後に「結果」を含むDoneがある。この値をストリームを評価した戻り値とする。ここまでの話を実装したのが以下だ。命令列をすべて辿るために、runStreamは再帰関数として書かれている。Waitが出てきた場合は、継続に対して処理系から機械的に()を供給する。Notifyは定義はしているが、実際は利用されることはない。

runStream :: Stream () Void a -> a
runStream p = case p of
runStream (Done x) = x
runStream (Notify _ next) = runStream next
runStream (Wait cont) = runStream (cont ())

ここで人によっては次のような疑問が出てくるかもしれない。評価できるストリームにNotifyが含まれてはいけないのなら、そもそもなぜNotify命令があるのか。Notify命令は、評価可能なストリームを組み立てるのに使われる、部品となるストリームの中で用いられる。ところが、すでに説明したように、ストリーム同士を適切に合成することでNotifyはWaitと相殺されて消えてしまう。Notifyだけを行うストリームへWaitだけを行うストリームを合成すれば、Done命令だけを含むストリームができる。今回実装した副作用を含まないストリームであれば、合成後の実行可能な命令列にはDone命令1個しか残らないのがほとんどだ。そのため、処理系を実装すると大げさなことを書いたが、実は合成の時点でほとんどの仕事が終わっており、処理系は何もするべきことが残っていないのである。

最後に、前編で作ったストリームを組み合わせて実行してみよう。コードは以下のようになる。

stream :: Stream () Void Int
stream = sourceString |==| reverseString |==| waitString

result :: Int
result = runStream stream

まとめ

ストリームライブラリの実装について、特にわかりにくいと思われる部分を中心に概要を書いた。ストリームが再帰型による命令の列で表現されること、「出力」と「結果」の概念はきちんと区別すること、そして合成によってNotifyとWaitの命令が相殺されて消えることの3点がポイントとなる。各命令にモナドの値を含めることで、ストリームの表現力をさらに高められる。特にベースにIOのモナドが入っていれば、副作用のある処理をストリームの間で互いに連携しながら実行できるようになる。詳細は、前編でも紹介したPipes to Conduitsに書いてある。I/Oリソースの管理方法などもこちらが詳しい。

(蛇足)命令列とFreeモナド

今回のストリームのような命令列をdo記法で構成する手法は、Freeモナドによって抽象化できる。Freeモナドを使えば、Stream型は以下のように記述できる。

data StreamF i o next =
  Notify o next
  | Wait (i -> next)

instance Functor (StreamF i o) where
  fmap f (Notify x y) = Notify x (f y)
  fmap f (Wait c) = Wait (\x -> f (c x))

type Stream i o = Free (StreamF i o)

DoneはFree型が吸収してくれ、Free型のコンストラクタであるPureを代わりに使える。NotifyやWaitはFree型のコンストラクタであるFreeで包んで使う。Freeモナドを使った実装は、gistにupしてある。FreeモナドについてはWhy free monads matterが詳しい。

*1:わかりにくければ副作用と言ってもいいだろう

I/Oストリーミングライブラリの実装の基礎 - 前編

conduitpipesなどのストリーミングライブラリの実装は結構わかりにくい。Pipes to Conduitsという一連のエントリが分かりやすい解説なのだが、それでも序盤からFunctorやFreeモナドを駆使していてハードルが高めな印象を受ける。

理解するには自分で実装するのが一番の近道だろうから、このエントリでごく簡単なストリーミングライブラリを実装してみよう。ストリーミングライブラリではI/Oを扱うのが目的であるため、本来であればモナドを扱えるように実装しなければ意味がないのだが、ストリーミングの基本的な仕組みとしてはモナドは重要ではないので、ここでは純粋な値のみを扱うストリームを作成する。

ストリームを表す型

ここが一番重要かつわかりにくい部分だと思われる。今回の実装では、「入力」「出力」「結果」の3つの型をストリームで利用する。が、「出力」と「結果」の違いとはいったいなんなのか。

ストリームは、命令の列である再帰型で表現する。ちょうど無限リストを先頭から舐めていくのと同様のイメージで、この命令列を先頭から順次読み込み、それぞれの命令を解釈することで処理を進める。この命令列を全て完了した時に返すのが「結果」の型である。一方で、実装するストリームには2つの命令がある。1つは上流からのデータを受け取るWaitで、もう1つが下流へデータを流すNotifyである。この2つの命令で扱う型を、それぞれ「入力」「出力」の型と呼ぶ。

具体的には以下の型となる。命令列のDoneが終端であり、結果の型であるaを返す。Notify、Waitは命令列を形成するために、次の命令を表す自身の型、つまりStream i o a型の値を保持する必要がある。それとは別に、Waitは入力の型であるi、Notifyは出力の型であるoを扱っているのがわかるだろう。

data Stream i o a =
  Done a
  | Notify o (Stream i o a)
  | Wait (i -> (Stream i o a))

この型の宣言でもう一個わかりにくいのが、Waitが関数を持つことだろう。これは、各命令が引数と戻り値を持つことに関係する。

下流への通知を行う命令であるNotifyは、下流へ渡す値を引数にとるが、戻り値は必要ない。一方、上流から値を受け取る命令であるWaitは、引数は不要だが上流から渡された値を戻り値として受け取りたい。この、戻り値の受け取りが関数となって現れる。いわゆる、継続渡し方式での表現である。

例えばの話だが、Notifyには戻り値はないと書いたが、常に同じ値である()型の値を戻すと見なすこともできる。その場合には、Notifyの定義も継続渡しを用いて以下のようにすることもできる。が、面倒になるだけなので今回の実装では採用しない。

    Notify o (() -> Stream i o a)

ストリームを作る

ストリームは命令の列なので、WaitとNotifyを並べた列を作ることになる。例えば、以下のような入れ子の値ができるとよい。これは、上流から値が来るのを待ち(Wait)、下流へその値を流し(Notify)、その後結果として0を返すという命令の列を表す。なお、ここではまだそういう処理をしたいんだという意志を記述しただけで、実際にこれらの命令を実行する部分はまったく考えていない。

Wait (\x -> Notify x (Done 0))

さて、この程度であればコンストラクタを自分で書いて組み立てることができるが、大きな部品を作るにはこれでは不便過ぎる。Haskellで命令の列を記述すると言えばdo記法なので、これを利用しない手はない。ということで、Streamをモナドインスタンスにする。

instance Monad (Stream i o) where
  return = Done
  m >>= f = case m of
        Done x -> f x
        Notify x next -> Notify x (next >>= f)
        Wait cont -> Wait (\i -> (cont i) >>= f)

この定義でなぜ命令の列を作ることができるのか。これを見てギエーと思う方も居るかもしれないが、実はそんなに難しいことはしていない。アイデアとしては、命令列の末尾にあるはずのDoneを探しに行き、そいつを次の命令列で置換すれば、最初の命令列に次の命令列が連結されるだろうというものだ。WaitとNotifyに対しては、実は何もしていない。次の命令を取り出して、末端のDoneを探すために(>>=)を再帰的に呼び出しているだけである。実際に仕事をしているのはDoneを見つけた時で、これを次に続く命令列に置き換えている。前の命令の列の「結果」であるxは、次に続く命令列を生成するためのfに渡される。この「結果」により、後に続く命令列に影響を与えることができる。

もう一つ、命令列を組立てやすくするための下ごしらえをしておこう。コンストラクタであるWaitとNotifyをそのまま使ってもいいのだが、こいつをもうちょっとそれっぽくラップしておく。具体的に言うと、命令列の末尾にはDoneが必要なので、それを加えるためのラッパーとなる。

notify :: o -> Stream i o ()
notify x = Notify x (Done ())

wait :: Stream i o i
wait = Wait (\x -> Done x)

waitして得られた値は、「結果」として次の命令列に渡す必要がある。そのため、Doneに含める。一方、notifyの命令については、以降の命令に渡す必要がある情報はない。そこで、固定値として()をDoneに含めている。しつこく書いているので大丈夫かとは思うが、waitで得られた値は下流にいるストリームに渡す「出力」ではなく、以降の命令に渡す「結果」であることに注意しよう。

これらの道具さえあれば、命令の列を作るのは簡単だ。文字列を3つ下流へ送り出すストリームは以下のように書ける。

sourceString :: Stream () String Int
sourceString = do
  notify "Hogehoge"
  notify "BarBar"
  notify "Hehhe"
  return (-1)

処理中に上流から文字列を2つ受け、その長さの和を「結果」として返すストリームは以下のように書ける。Voidは値を一つも含まない型で、下流に何も送り出さないことを意味している。

data Void

waitString :: Stream String Void Int
waitString = do
  x <- wait
  y <- wait
  return (length x + length y)

上流から受けた文字列を逆転して下流に送るストリームは以下のようになる。foreverを使うことで、このストリームは命令の無限列となっている。そのため、「結果」の型はIntとなっているものの、いくら命令を先に進めても命令列の終端、つまりDoneを見つけることはできない。

reverseString :: Stream String String Int
reverseString = forever $ do
  x <- wait
  notify $ reverse x

後編の予告

上流からの値を待つこと、下流へ値を渡すこと、この2つの命令を並べたものとしてストリームを記述することができるようになった。後編では複数のストリームを考える。上流からの値を待つという命令と、下流へ値を渡す命令を持つ2つのストリームがあったとき、この2つのストリームを組み合わせて値の受け渡しを表現できそうな気がする。これを実現する。

さらに、ここまでは単なる命令の列であったストリームの実行部分を作り、ストリームライブラリを完成させる。

auで持ち込み機種変した

5Sを買ったので、妻の4Sを余ったiPhone 5を持ち込んで機種変更した。

4Sを買った時の毎月割は継続可能。その点も含めて、全体的にLTE関連のプランに変更が必要なものの、ショップの店員さん曰く、今までと月額の支払いは変わらないとのこと。ただし、プラン変更の影響で無料通話分はなくなった。後、事務手数料が3,000円ちょいとちょっとお高め。

自宅を光回線にした

スマートバリュー適用のためにeAccessのADSLauひかりにした。5Mbps程度だった回線速度が最大値で500Mbpsを記録するようになった。

始めUSENインターコネクトスピードテストを使ってたんだけど、これだと100Mbps以上の回線速度をうまく測れてない気がする。最終的にはRadish Network Speed Testingを使ったけど、こちらはappletが必要となる。

最高速度は有線の場合なので、普段は無線で50Mbps程度。これでも十分な速度なので満足。

今日はYAPC::Asia Tokyo 2013の2日目です

2日目です。地元開催なので、地の利を生かして朝からタリーズでのんびり朝食などとっております。今日もgihyo.jpさんのスペシャルレポートにレポーターとして参加しています。

続きを読む

YAPC::Asia Tokyo 2013 を終えて

今年もYAPCが終わった。スタッフを始めとした関係者の皆様には、本当にありがたいと思っている。また、gihyo.jpの担当の@k_takaさんには、今年も大変お世話になった。

さて、今年のYAPCに参加して感じたことが2つある。まず、世の中は自分が思った以上に進んでいて、他社もどんどん楽しそうなことをやっているのだなということ。Yasuhiro Onishiさんのはてなの開発フローのトークや、handlenameさんのLobiの開発フローのトークを聞いて、自分が最近業務に追われて思考停止してしまっていることに気がつけた。日々の開発で改善できることはいくらでもあるし、億劫がらずにもっと手を動かす必要がある。

もう1つは、Perlの閉塞感だ。座談会でRubyの利点として取り上げられていたコミュニティの活発さは、2、3年前まではPerlの利点として特筆されていた部分だ。Perlに対応していない開発者向けのサービスもこのYAPCだけで何個挙げられただろう。Perlに対する風評被害については長年いろいろ思うところが溜まってはいるが、たとえそれが風評による結果であろうともマイノリティが人手の多さという点でハンデとなるというのは紛れもない事実だ。人が多く集まる活発なコミュニティが形成されているのであれば、そちらから学べることの方が多くあるはずである。

ただ、だからと言ってPerlを使うことを辞める必要があるとは思わない。きちんと機能しているPerlで書かれたシステムは数多く存在しているし、「やり直したい症候群」がいい結果をもたらさないことは歴史が物語っている。それらの資源を今後も生かし続けるために、しっかりとアンテナを伸ばして世間の動向を把握し、必要なものを見つけたら怠けずにどんどん作っていくことが大事なことだろう。

Perlに縛られて視野が狭くならないように気をつけ、有益な情報は逃さないようにすべきであるというのが、自分の中での結論である。