Pixel Pedals of Tomakomai

北海道苫小牧市出身の初老の日常

POEでmapを並行化してみる(2)

昨日はシングルプロセスで並行mapを作りましたが、今日はforkさせてみます。

細分化できない重い処理

昨日はsleepを使っているサブルーチンをsleepの前後で二つのeventに分けて、代わりにdelayで遅延を起こさせましたが、今日は細分できないような場合をやってみます。

	MyPmap->new(
		cont   => [$self->get_session_id, 'recieve'],
		filter => sub { sleep 1; $_[0] * 2 },
	)->yield('exec', 1 .. 10);

昨日までfilterには変換を行うPOE::Session名を指定してましたが、代わりに遅いサブルーチンを渡すようにしました。シングルプロセスで処理すると、この処理は10秒かかってしまいます。

forkしたバージョン

書き換えてみました。

fork_pmap.pl

MySessionとMyPmapはほとんど書き換えておらず、POE::Sessionの連携の仕方は昨日と同等です。

POE::Wheel::Runでforkさせる

MyPmap::Filter(旧MyFilter)は、先ほどの1秒かかる関数を受け取って実行し、contプロパティで指定された宛先に結果を送信しなければなりません。ここでこの処理を普通に実行するとブロックしてしまって他の処理をストップさせてしまうため、forkして子プロセスに処理をやらせて親プロセスは処理を継続させるようにします。

forkにはPOE::Wheel::Runが使えます。

# package MyPmap::Filter;

has child => (
	isa => 'Maybe[POE::Wheel::Run]',
	is  => 'rw',
);

event exec => sub {
	my $self = shift;
	my ($val) = @_;
	my $filter = $self->filter;  # for closure
	my $task = POE::Wheel::Run->new(
		Program => sub { print $filter->($val), "\n"; },
		StdoutEvent => "stdout",
		StderrEvent => "stderr",
		CloseEvent  => "closed",
	);

	$self->child( $task );
	$POE::Kernel::poe_kernel->sig_child( $task->PID, "reaped" );

};

event stdout => sub {
	my $self = shift;
	my ($val) = @_;
	$POE::Kernel::poe_kernel->post(
		@{ $self->cont } =>
		$val
	);
};

POE::Wheel::RunのProgramにコードリファレンスを渡すと、勝手にforkしてくれます。実行結果はSTDOUT経由で親プロセスに戻すことになるので、コードリファレンス内で戻り値をprintしてやればOKです。

POE::Wheel::Runを使うのに注意点が二つあります。

まず、処理が終わるまでPOE::Wheel::Runのインスタンスを保持してないと、子プロセスが終わってしまって結果を受け取れなくなるので、必ずインスタンスを保持します。保持したインスタンスは、CloseEventで解放するといいです。

# package MyPmap::Filter;

event closed => sub{ 
	my $self = shift;
	$self->child( undef );
};

もう一つ、POE::Kernel->sig_child を呼んでおいて、子プロセスが自動で刈り取られるようにすることです。reapedイベントでは、特に何もする必要はありません(イベント自体もなくてもいい)。

# package MyPmap::Filter;

event reaped => sub{ 
	# NOP
};

注意

この実装ではプロセスの数を制限してないので、要素数の多い配列を食わせるとプロセスの作り過ぎでdieしてしまいます。実際に利用する場合はプロセスの上限を設けて、必要以上に生成しないように制御が必要です。

参考文献