北海道苫小牧市出身の初老PGが書くブログ

永遠のプログラマを夢見る、苫小牧市出身のおじさんのちらしの裏

POEでmapを並行化してみる

POEの練習代わりに、POEを使ってmap関数を並行化してみました。今回は MooseX::POE を使っています。

逐次処理バージョン

POEで逐次処理版のmap処理を書いてみると、こんな感じになりました。

package MySession;
use MooseX::POE;

sub transform {
	sleep 1;
	return (shift) * 2;
};

sub START{
	my $self = shift;
	my @odds = map { transform($_) } 1 .. 10;
	print join(",", @odds), "\n";
	print "done\n";
}

no MooseX::POE;

package main;
use strict;
use warnings;
use POE;
MySession->new;
POE::Kernel->run;

念のため簡単な解説をしておくと、MySessionがMooseX::POEを使って書いたPOE::Sessionクラスです。POE::Kernelをrunすると、各POE::Sessionの_start状態(START関数)から処理が始まります。

このコードではビルトインのmapを使って10個の要素を処理してますが、transform関数が1秒かかる処理なのでこの処理の終了には10秒かかります。

それでは、これを並行処理する自前のPmapを作っててみましょう。

並行処理バージョン

書き換えたらこうなりました。

コードだけでは何なんで、一応解説してみます。

サブルーチン ではなく event を使う

POE内では、サブルーチン(sub {})は並行化の単位となりません。eventの起動が並行化の単位となりますので、なるべくeventで処理を書きます。

eventは、yeildやpostやdelay、alermなどのコマンドで呼び出します。これは、「オブジェクト(POE::Session)に対するメソッド(event)呼び出し」と見なすと理解しやすいです。ただし、この呼び出しでは戻り値を得ることができません。戻り値を得るためには継続(≒callback)を渡すように設計する必要があります。

なお、eventは歴史的な経緯からstateと呼ばれることが多いので覚えておくと混乱がないと思います。

ブロックする処理は書かない

実は速度的な話だけを言えば、この項目だけで解決されてしまいます。

POEはシングルスレッドで動作するため、ブロックされる処理があるとパイプラインが詰まってしまいます。具体的には、今回の例では sleep がNGです。代わりに後続の処理を別のeventにし、delayでそれらの処理が始まるのを遅らせてやります。*1

# package MyFilter;

event exec => sub {
	my $self = shift;
	my ($val) = @_;

	# 1秒後に続きの処理を呼ぶ
	$POE::Kernel::poe_kernel->delay_add(
		'exec_after' => 1 =>
		$val
	);
};

event exec_after => sub {
	my $self = shift;
	my ($val) = @_;
	# ... 後略(続きの処理) ...
};

sleepだけでなく、通信系の処理もPOEに準拠している物(POE::Wheel::*)を使わないとブロックしてしまいます。勝手に通信処理を書かないように気をつけたほうがいいです。

複数のPOE::Sessionへ分離

POEではPOE::Sessionごとにメモリを共有することになってますので、並行処理させるためにPOE::Sessionを分離させました。

POE::Sessionは、親が必要になった時点で作成します。例えば、MySessionがmap処理を行いたくなった時点でMyPmapを作成し、execイベントを投げるようなイメージです。execが終われば、POE::Sessionは消滅します。

MySession - MyPmap - MyFilter
                   + MyFilter
                   |   ....
                   + MyFilter

eventの呼び出し順序は、以下のようになります。

  1. MySession ---exec--> MyPmap
  2. MyPmap ---exec--> MyFilter
  3. 2を複数回投げる
  4. MyFilter -recieve-> MyPmap
  5. 4を複数回受け取る
  6. MyPmap -recieve-> MySession

ただし、以下の理由からeventさえ細かく分離すればPOE::Sessionの分離は不要かもしれません。

  • 現状のPOEの実装はマルチスレッドじゃないので、メモリを共有してても害はない
  • POE::Sessionの生成と消滅のコストが高い(多分)

継続

MyPmapやMyFilterは、処理が終わったらどこかにその結果をレポートしなければなりません。今回はこれをcontプロパティとしてPOE::Session作成時に指定しています。

# package MyPmap;

has cont => (
	isa => 'ArrayRef',
	is  => 'ro',
	required => 1,
);

POEでは POE::SessionのセッションID(sid)とevent名がわかればイベントを投げれますので、それを無名配列に並べたものを継続として使っています。

# package MySession;

sub START{
	my $self = shift;
	MyPmap->new(
		cont   => [$self->get_session_id, 'recieve'],
		filter => 'MyFilter',
	)->yield('exec', 1 .. 10);
}

execイベントを受け取ったPOE::Sessionは、contプロパティで指定された先に結果を送り込む、と言うルールです。この「contプロパティで指定された先に結果を送り込む」と言う行為が、通常の関数の return の処理に当たります。

# package MyPmap;

event recieve => sub {
	# ... 前略 ...
		$POE::Kernel::poe_kernel->post(
			@{ $self->cont } =>
			@{ $self->result }
		);
	# ... 後略 ...
};

なお、POEでは$_[SENDER]と$_[CALLER_STATE]が入ってますので、宛先にこれらの値を使うと言う手もあります。今回は自由にreturn先を決められるようにsidもeventも外から指定するようにしました。

並行処理された結果をまとめる

MyPmapからMyFilterが複数立ち上げられますが、MyPmapはこれを一つにまとめてMySessionに返す必要があります。幸い、MyPmapは立ち上げたMyFilterの個数を知ってますので、recieveの時に戻って来たMyFilterの結果を数え、依頼した分が全部戻って来たらMySessionに結果を返します。

# package MyPmap;

event exec => sub {
	my $self = shift;
	my ( @args ) = @_;

	# MyFilter の数を記録して...
	$self->count(scalar @args);

	# ...後略(MyFilterの立ち上げ処理)... 
};

event recieve => sub {
	my $self = shift;
	my ($ret) = @_;

	# 結果が来る度にカウントを減らして・・・
	$self->count($self->count - 1);
	push @{ $self->result }, $ret;

	if($self->count == 0){
		# カウントが0になったら発火
		$POE::Kernel::poe_kernel->post(
			@{ $self->cont } =>
			@{ $self->result }
		);
	}
};

ただし、今回の実装は実はmapとしては不完全です。なぜなら、MyFilterからの結果の返却が、依頼した順に返ってくる保証はないからです。よって、1,2,3 をこのmapで処理すると 2,4,6じゃなく 2,6,4や4,2,6 のようになる可能性があります。これをきちんとするには、依頼したMyFilterのsidを控えておき、receiveされた時に$_[SENDER]のsidを元に並び替えることでできます。

参考文献

*1:call/ccがあれば一つの関数で書けそう。POE::Session::YieldCCなんてあるのだが未検証。