POEの練習代わりに、POEを使ってmap関数を並行化してみました。今回は MooseX::POE を使っています。
逐次処理バージョン
POEで逐次処理版のmap処理を書いてみると、こんな感じになりました。
package MySession; use MooseX::POE; sub transform { sleep 1; return (shift) * 2; }; sub START{ my $self = shift; my @odds = map { transform($_) } 1 .. 10; print join(",", @odds), "\n"; print "done\n"; } no MooseX::POE; package main; use strict; use warnings; use POE; MySession->new; POE::Kernel->run;
念のため簡単な解説をしておくと、MySessionがMooseX::POEを使って書いたPOE::Sessionクラスです。POE::Kernelをrunすると、各POE::Sessionの_start状態(START関数)から処理が始まります。
このコードではビルトインのmapを使って10個の要素を処理してますが、transform関数が1秒かかる処理なのでこの処理の終了には10秒かかります。
それでは、これを並行処理する自前のPmapを作っててみましょう。
サブルーチン ではなく event を使う
POE内では、サブルーチン(sub {})は並行化の単位となりません。eventの起動が並行化の単位となりますので、なるべくeventで処理を書きます。
eventは、yeildやpostやdelay、alermなどのコマンドで呼び出します。これは、「オブジェクト(POE::Session)に対するメソッド(event)呼び出し」と見なすと理解しやすいです。ただし、この呼び出しでは戻り値を得ることができません。戻り値を得るためには継続(≒callback)を渡すように設計する必要があります。
なお、eventは歴史的な経緯からstateと呼ばれることが多いので覚えておくと混乱がないと思います。
ブロックする処理は書かない
実は速度的な話だけを言えば、この項目だけで解決されてしまいます。
POEはシングルスレッドで動作するため、ブロックされる処理があるとパイプラインが詰まってしまいます。具体的には、今回の例では sleep がNGです。代わりに後続の処理を別のeventにし、delayでそれらの処理が始まるのを遅らせてやります。*1
# package MyFilter; event exec => sub { my $self = shift; my ($val) = @_; # 1秒後に続きの処理を呼ぶ $POE::Kernel::poe_kernel->delay_add( 'exec_after' => 1 => $val ); }; event exec_after => sub { my $self = shift; my ($val) = @_; # ... 後略(続きの処理) ... };
sleepだけでなく、通信系の処理もPOEに準拠している物(POE::Wheel::*)を使わないとブロックしてしまいます。勝手に通信処理を書かないように気をつけたほうがいいです。
複数のPOE::Sessionへ分離
POEではPOE::Sessionごとにメモリを共有することになってますので、並行処理させるためにPOE::Sessionを分離させました。
POE::Sessionは、親が必要になった時点で作成します。例えば、MySessionがmap処理を行いたくなった時点でMyPmapを作成し、execイベントを投げるようなイメージです。execが終われば、POE::Sessionは消滅します。
MySession - MyPmap - MyFilter + MyFilter | .... + MyFilter
eventの呼び出し順序は、以下のようになります。
- MySession ---exec--> MyPmap
- MyPmap ---exec--> MyFilter
- 2を複数回投げる
- MyFilter -recieve-> MyPmap
- 4を複数回受け取る
- MyPmap -recieve-> MySession
ただし、以下の理由からeventさえ細かく分離すればPOE::Sessionの分離は不要かもしれません。
- 現状のPOEの実装はマルチスレッドじゃないので、メモリを共有してても害はない
- POE::Sessionの生成と消滅のコストが高い(多分)
継続
MyPmapやMyFilterは、処理が終わったらどこかにその結果をレポートしなければなりません。今回はこれをcontプロパティとしてPOE::Session作成時に指定しています。
# package MyPmap; has cont => ( isa => 'ArrayRef', is => 'ro', required => 1, );
POEでは POE::SessionのセッションID(sid)とevent名がわかればイベントを投げれますので、それを無名配列に並べたものを継続として使っています。
# package MySession; sub START{ my $self = shift; MyPmap->new( cont => [$self->get_session_id, 'recieve'], filter => 'MyFilter', )->yield('exec', 1 .. 10); }
execイベントを受け取ったPOE::Sessionは、contプロパティで指定された先に結果を送り込む、と言うルールです。この「contプロパティで指定された先に結果を送り込む」と言う行為が、通常の関数の return の処理に当たります。
# package MyPmap; event recieve => sub { # ... 前略 ... $POE::Kernel::poe_kernel->post( @{ $self->cont } => @{ $self->result } ); # ... 後略 ... };
なお、POEでは$_[SENDER]と$_[CALLER_STATE]が入ってますので、宛先にこれらの値を使うと言う手もあります。今回は自由にreturn先を決められるようにsidもeventも外から指定するようにしました。
並行処理された結果をまとめる
MyPmapからMyFilterが複数立ち上げられますが、MyPmapはこれを一つにまとめてMySessionに返す必要があります。幸い、MyPmapは立ち上げたMyFilterの個数を知ってますので、recieveの時に戻って来たMyFilterの結果を数え、依頼した分が全部戻って来たらMySessionに結果を返します。
# package MyPmap; event exec => sub { my $self = shift; my ( @args ) = @_; # MyFilter の数を記録して... $self->count(scalar @args); # ...後略(MyFilterの立ち上げ処理)... }; event recieve => sub { my $self = shift; my ($ret) = @_; # 結果が来る度にカウントを減らして・・・ $self->count($self->count - 1); push @{ $self->result }, $ret; if($self->count == 0){ # カウントが0になったら発火 $POE::Kernel::poe_kernel->post( @{ $self->cont } => @{ $self->result } ); } };
ただし、今回の実装は実はmapとしては不完全です。なぜなら、MyFilterからの結果の返却が、依頼した順に返ってくる保証はないからです。よって、1,2,3 をこのmapで処理すると 2,4,6じゃなく 2,6,4や4,2,6 のようになる可能性があります。これをきちんとするには、依頼したMyFilterのsidを控えておき、receiveされた時に$_[SENDER]のsidを元に並び替えることでできます。
参考文献
- POE
- POE Cookbook
- プログラミングErlang
- 20.2 逐次コードを並列化する
- プログラミングGauche
- 19章 継続
*1:call/ccがあれば一つの関数で書けそう。POE::Session::YieldCCなんてあるのだが未検証。