@maki_daisukeさんに教えてもらったのを読んで実装してみた。
use strict;
use warnings;

# ---------------------------------------------------------------------------
# AsyncArrow
#
# A tiny continuation-passing "arrow" abstraction on top of AnyEvent.
# An arrow wraps a code ref with the calling convention
#
#     $code->($value, $progress, $cont)
#
# where $cont is the continuation, itself called as $cont->($value, $progress),
# and $progress is a ProgressArrow that collects cancellers and observers.
# ---------------------------------------------------------------------------
{
    package AsyncArrow;
    use Scalar::Util qw/weaken/;
    use AnyEvent;
    use Exporter qw/import/;
    use Class::Accessor::Lite new => 1, rw => ['code'];

    # Export the request constructors that actually exist. (The list used to
    # name "repeat", which is a method, and "done", which is not defined.)
    our @EXPORT = qw/repeat_request done_request/;

    # Build the marker hash that tells repeat() to run another iteration,
    # feeding the optional value into it.
    sub repeat_request(;$) {
        my ($value) = @_;
        return +{ repeat => 1, value => $value };
    }

    # Build the marker hash that tells repeat() to stop and pass the optional
    # value on to the continuation.
    sub done_request(;$) {
        my ($value) = @_;
        return +{ done => 1, value => $value };
    }

    # Lift a plain function $f into an arrow: the arrow applies $f to the
    # incoming value and hands the result straight to the continuation.
    sub arr {
        my ($class, $f) = @_;
        return $class->new(code => sub {
            my ($v, $progress, $cont) = @_;

            # Call $f in scalar context. In list context an empty or
            # multi-element return would shift $progress out of the second
            # argument slot of the continuation.
            my $result = $f->($v);
            $cont->($result, $progress);
        });
    }

    # Sequential composition: run $self, then feed its result into $other.
    sub compose {
        my ($self, $other) = @_;
        return (ref $self)->new(code => sub {
            my ($v, $progress, $cont) = @_;
            $self->code->($v, $progress, sub {
                my ($v, $progress) = @_;
                $other->code->($v, $progress, $cont);
            });
        });
    }

    # Start the arrow with the initial value $v. A fresh ProgressArrow is
    # created unless one is supplied. Returns the ProgressArrow.
    sub run {
        my ($self, $v, $progress) = @_;
        $progress ||= ProgressArrow->new_without_args;

        # The final continuation has nothing left to do.
        $self->code->($v, $progress, sub { return });

        return $progress;
    }

    # Build an arrow that runs $self over and over. After each run the result
    # must be a repeat_request (schedule another iteration via a zero-delay
    # timer) or a done_request (pass the value on to the continuation).
    sub repeat {
        my $self = shift;

        # The loop has to refer to itself. A strong self-reference captured
        # in the closure would be a permanent cycle, so the shared variable
        # is weak ...
        my $weak_loop;
        my $loop = sub {
            my ($v, $progress, $cont) = @_;

            # ... but a weak reference alone keeps nothing alive. The arrow
            # returned by repeat() is often a temporary in a method chain, so
            # it may be gone by the time a pending timer fires. Pin the loop
            # with a strong reference owned by this iteration only; it is
            # released together with the iteration's callbacks.
            my $pinned_loop = $weak_loop;

            $self->code->($v, $progress, sub {
                my ($v, $progress) = @_;
                if ($v->{repeat}) {
                    my $canceller;
                    my $t = AE::timer 0, 0, sub {
                        $progress->advance($canceller);

                        # The one-shot timer has fired. Drop the watcher so
                        # the watcher -> callback -> canceller -> watcher
                        # cycle does not leak once per iteration.
                        $canceller->();

                        $pinned_loop->($v->{value}, $progress, $cont);
                    };
                    $progress->add_canceller($canceller = sub { undef $t });
                }
                elsif ($v->{done}) {
                    $cont->($v->{value}, $progress);
                }
                else {
                    die "AsyncArrow::repeat: result is neither a repeat_request nor a done_request";
                }
            });
        };
        weaken($weak_loop = $loop);

        return (ref $self)->new(code => $loop);
    }
}

# ---------------------------------------------------------------------------
# ProgressArrow
#
# Tracks the cancellers of in-flight operations and the observers waiting
# for the next advance(). Used as an arrow, it registers its continuation as
# an observer, i.e. it "fires" the next time the progress advances.
# ---------------------------------------------------------------------------
{
    package ProgressArrow;
    use parent qw/-norequire AsyncArrow/;
    use Scalar::Util qw/weaken/;
    use Class::Accessor::Lite rw => ['cancellers', 'observers'];

    # Constructor that needs no arguments: starts with empty canceller and
    # observer lists and installs the observer-registering arrow code.
    sub new_without_args {
        my $class = shift;
        my $self  = $class->new(
            cancellers => [],
            observers  => [],
        );

        # The code closure is stored inside $self, so it must only hold a
        # weak reference back to $self.
        weaken(my $weak_self = $self);
        $self->code(sub {
            my ($v, $progress, $cont) = @_;
            push @{ $weak_self->observers }, sub {
                my ($v) = @_;
                $cont->($v, $progress);
            };
        });

        return $self;
    }

    # Register one or more canceller code refs.
    sub add_canceller {
        my $self = shift;
        push @{ $self->cancellers }, @_;
    }

    # Unregister a canceller without running it and without notifying
    # observers. An undefined canceller is ignored.
    sub remove_canceller {
        my ($self, $canceller) = @_;
        return unless defined $canceller;

        # Code refs are compared by address.
        @{ $self->cancellers } = grep { $_ != $canceller } @{ $self->cancellers };
        return;
    }

    # An operation has finished: unregister its canceller, then call every
    # waiting observer (each observer is called once and removed).
    sub advance {
        my ($self, $canceller) = @_;
        $self->remove_canceller($canceller);

        while (my $observer = shift @{ $self->observers }) {
            $observer->();
        }
    }

    # Abort everything in flight: run and remove every registered canceller.
    sub cancel {
        my ($self) = @_;
        while (my $cancel = shift @{ $self->cancellers }) {
            $cancel->();
        }
    }
}

use AnyEvent;
use AnyEvent::Handle;

# Arrow that waits for one line on STDIN and passes it to the continuation.
# When the handle reports an error the continuation receives undef instead.
sub inputted_line() {
    return AsyncArrow->new(code => sub {
        my ($v, $progress, $cont) = @_;
        my $canceller;
        my $hdl = AnyEvent::Handle->new(
            fh       => \*STDIN,
            on_error => sub {
                my ($hdl, $fatal, $message) = @_;
                $hdl->destroy;

                # Warn only when $! is set.
                # NOTE(review): presumably a plain EOF arrives here with
                # $! == 0 and should stay silent -- confirm against the
                # AnyEvent::Handle documentation.
                $! and warn "$message(fatal=$fatal)";

                # This read is over: unregister its canceller, and keep
                # $progress flowing to the downstream arrows.
                $progress->remove_canceller($canceller);
                $cont->(undef, $progress);
            },
            on_read => sub {
                my ($hdl) = @_;
                $hdl->push_read(line => sub {
                    my ($hdl, $line, $eol) = @_;
                    $progress->advance($canceller);
                    $canceller->();    # release the handle
                    $cont->($line, $progress);
                });
            },
        );

        # Dropping the last reference to the handle cancels the read.
        $progress->add_canceller($canceller = sub { undef $hdl });
    });
}

my $cv           = AE::cv;
my $total_length = 0;

# Main loop: read lines until input ends, echoing each line and summing the
# lengths; then release the condition variable.
my $p = inputted_line()->compose(AsyncArrow->arr(sub {
        if (defined $_[0]) {
            $total_length += length $_[0];
            print "INPUT: $_[0]\n";
            return AsyncArrow::repeat_request();
        }
        else {
            print "DONE\n";
            return AsyncArrow::done_request();
        }
    }))
    ->repeat
    ->compose(AsyncArrow->arr(sub { $cv->send }))
    ->run;

# Observer loop: every time the main loop's progress advances, print the
# running total, then wait for the next advance.
$p->compose(AsyncArrow->arr(sub { print "total length: $total_length\n" }))
    ->compose(AsyncArrow->arr(\&AsyncArrow::repeat_request))
    ->repeat
    ->run;

$cv->recv;
Arrows は、論文「Generalising Monads to Arrows」にモナドの一般化として載ってます。ざっくり言えば、高階関数の圏の直積を保存するように作った、新たな圏への関手。
で、このJSの実装なんだけど、CpsA の実装までは美しいのに、 AsyncA の実装がイマイチな気がするんですよね。特に ProgressA って、状態をバリバリに持ってるSubject的なクラスなのに、AsyncA を継承して作る利点がイマイチわからず。