読者です 読者をやめる 読者になる 読者になる

北海道苫小牧市出身の初老PGが書くブログ

永遠のプログラマを夢見る、苫小牧市出身のおじさんのちらしの裏

Coroでスレッドプールを使う

perl

Coroでスレッドプールを使うにはasync_poolを使えばいいはずなのですが、何も考えずに使うとうまくいかない場合があります。

最初にまとめ

長いのでまずまとめときます。まとめると単純な話で、「async_poolを使う時には、同時にたくさんのスレッドを利用し過ぎないように気をつける」の一点に尽きます。

ベンチマークをとってみる

asyncとasync_pool でベンチマークをとってみると、以下のようになります。

my $tasks = 100;
Benchmark::cmpthese 1000, {
	async => sub {
		my $sem = Coro::Semaphore->new( 1 - $tasks );
		for(1 .. $tasks){
			async { $sem->up };
		}
		$sem->down;
	},
	async_pool => sub {
		my $sem = Coro::Semaphore->new( 1 - $tasks );
		for(1 .. $tasks){
			async_pool { $sem->up };
		}
		$sem->down;
	},
};

__END__
[結果]
            Rate async_pool      async
async_pool 565/s         --       -17%
async      680/s        20%         --

直感に反して、async_poolの方が遅いようです。メモリの使用量も比べてみます。

my $sem = Coro::Semaphore->new( 1 - 10000);
for( 1 .. 10000 ){
	async { $sem->up };
};
$sem->down;

warn `ps -o rss= -p $$`;
# 結果: 9240
my $sem = Coro::Semaphore->new( 1 - 10000);
for( 1 .. 10000 ){
	async_pool { $sem->up };
};
$sem->down;

warn `ps -o rss= -p $$`;
# 結果: 9964

あまり変わりません。

これらの現象は、async_poolのスレッドプールに上限がないことに起因しています。今回の例では、async_poolを使ってもスレッドが1000個作られています。よって、プール内のスレッドが再利用されることもないし、メモリが節約されることもありません。

$Coro::POOL_SIZEの役割

$Coro::POOL_SIZEはスレッドプールのサイズを変更するための変数ですが、これはidle状態のスレッドの数を意味します。先ほども書いたように処理中のスレッドも含めた上限ではありません*1

よって、作成されるスレッドの数を調整したければ、Semaphore等を利用して自前で数を調整しなければなりません。

my $limit = Coro::Semaphore->new( $Coro::POOL_SIZE );
my $sem = Coro::Semaphore->new( 1 - 10000);
for( 1 .. 10000 ){
	$limit->down;
	async_pool { $sem->up; $limit->up; };
};
$sem->down;

warn `ps -o rss= -p $$`;
# 結果: 2280

使用されるメモリが先ほどより減りました。また、このバージョンでもう一度速度を比べてみましょう。

my $tasks = 100;
Benchmark::cmpthese 1000, {
	async => sub {
		my $sem = Coro::Semaphore->new( 1 - $tasks );
		for(1 .. $tasks){
			async { $sem->up };
		}
		$sem->down;
	},
	async_pool_limited => sub {
		my $limit = Coro::Semaphore->new( $Coro::POOL_SIZE );
		my $sem = Coro::Semaphore->new( 1 - $tasks );
		for(1 .. $tasks){
			$limit->down;
			async_pool { $sem->up; $limit->up; };
		}
		$sem->down;
	},
};

__END__
[結果]
                     Rate              async async_pool_limited
async               685/s                 --               -55%
async_pool_limited 1538/s               125%                 --

スレッドを生成しなくて済むようになったので、倍近く速くなっています。「this function is about twice as fast as creating (and destroying) a completely new coro」とperldocに書かれている通りの性能です。

ただし、これはあくまでもタスクの立ち上げが速くなっているというだけで、待ち時間のことを考えるとスレッドをたくさん上げた方が速くなることは十分考えられます。例えば、Coro::Timer::sleep を入れるだけでベンチマークは逆転します。

my $tasks = 100;
Benchmark::cmpthese 1000, {
	async => sub {
		my $sem = Coro::Semaphore->new( 1 - $tasks );
		for(1 .. $tasks){
			async { Coro::Timer::sleep 0.001; $sem->up; };
		}
		$sem->down;
	},
	async_pool_limited => sub {
		my $limit = Coro::Semaphore->new( $Coro::POOL_SIZE );
		my $sem = Coro::Semaphore->new( 1 - $tasks );
		for(1 .. $tasks){
			$limit->down;
			async_pool { Coro::Timer::sleep 0.001; $sem->up; $limit->up; };
		}
		$sem->down;
	},
};

__END__
[結果]
                    Rate async_pool_limited              async
async_pool_limited 292/s                 --               -25%
async              388/s                33%                 --

ここで、CPUの待ち時間がなくなるように、$Coro::POOL_SIZEを調整して同時に走るスレッドを調整すれば、速くなります。例えば、$Coro::POOL_SIZE = 50にすると*2ベンチマークは以下のようになります。

                    Rate              async async_pool_limited
async              388/s                 --               -37%
async_pool_limited 617/s                59%                 --

自前のキューを作る

async_poolは、スレッドを作り過ぎないように気をつけて使うと便利かつ効果的なのですが、async_poolが使うスレッドプールはプログラム全体で共有だという問題点があります。つまり、自分が $Coro::POOL_SIZE を超えないようにスレッドを作っていても、他の誰かがマナー違反をしていると恩恵にあずかることができません。

そこで、以下のような自前のキューを用意して生成コストを抑えるという手も考えられます。

my $size = 10;
my $que  = Coro::Channel->new( $size );
async { $que->get->() while 1; } for 1 .. $size;

sub my_async_pool(&) { $que->put( +shift ); }

この my_async_pool を使えば、async_poolと同様にスレッドの生成コストや、一度に消費するメモリの量を押さえることができます。

my $tasks = 100;
Benchmark::cmpthese 1000, {
	async => sub {
		my $sem = Coro::Semaphore->new( 1 - $tasks );
		for(1 .. $tasks){
			async { Coro::Timer::sleep 0.001; $sem->up; };
		}
		$sem->down;
	},
	my_async_pool => sub {
		my $sem = Coro::Semaphore->new( 1 - $tasks );
		for(1 .. $tasks){
			my_async_pool { Coro::Timer::sleep 0.001; $sem->up; };
		}
		$sem->down;
	},
};

__END__
[結果]
               Rate         async my_async_pool
async         388/s            --          -34%
my_async_pool 592/s           53%            --
my $sem = Coro::Semaphore->new( 1 - 10000);
for( 1 .. 10000 ){
	my_async_pool { $sem->up; };
};
$sem->down;

warn `ps -o rss= -p $$`;
# [結果] 2304

ただし、この実装ではスレッドの生死*3の監視や環境の初期化*4をしていないので、汎用的に使う場合にはそれらも考慮する必要があります。

*1:スレッドプールの枯渇によるデッドロックが起きないようにするためには、適切な仕様だと思う。

*2:デフォルトは8です

*3:勝手にterminateされるとワーカーが減っていく

*4:スレッドに設定したdescやprioが残る