Pixel Pedals of Tomakomai

北海道苫小牧市出身の初老の日常

Directing AE with Arrows

@maki_daisukeさんに教えてもらったのを読んで実装してみた。

use strict;
use warnings;

{
    # AsyncArrow wraps a continuation-passing step: a code ref invoked as
    #   $code->($value, $progress, $cont)
    # which eventually calls $cont->($new_value, $progress) to pass its
    # result on to the next step.
    package AsyncArrow;
    use Scalar::Util qw/weaken/;
    use AnyEvent;
    use Exporter qw/import/;
    use Class::Accessor::Lite new => 1, rw  => ['code'];

    # Export the request constructors that actually exist in this package.
    # (The list used to name 'repeat', which is a method, and 'done', which
    # is not defined at all.)
    our @EXPORT = qw/repeat_request done_request/;

    # Build the hash a step returns to tell repeat() to run another
    # iteration with the given value (undef when called without argument).
    sub repeat_request(;$) { return +{repeat => 1, value => shift} }

    # Build the hash a step returns to tell repeat() to stop and hand the
    # given value to the continuation.
    sub done_request(;$) { return +{done => 1, value => shift} }

    # Lift a plain function into an arrow: the function receives the input
    # value and its result is passed straight to the continuation.
    sub arr {
        my ($class, $f) = @_;
        return $class->new(code => sub {
            my ($v, $progress, $cont) = @_;
            # Call $f in scalar context so it always contributes exactly
            # one argument; an empty or multi-element list return would
            # otherwise push $progress out of its slot.
            my $result = $f->($v);
            $cont->($result, $progress);
        });
    }

    # Sequential composition: run $self, then feed its result to $other.
    # The new arrow is created in the class of $self.
    sub compose {
        my ($self, $other) = @_;
        return (ref $self)->new(code => sub {
            my ($v, $progress, $cont) = @_;
            $self->code->($v, $progress, sub {
                my ($next_v, $next_progress) = @_;
                $other->code->($next_v, $next_progress, $cont);
            });
        });
    }

    # Start the arrow with the initial value $v. When no progress object is
    # supplied a fresh ProgressArrow is created. Returns the progress object.
    sub run {
        my ($self, $v, $progress) = @_;
        $progress ||= ProgressArrow->new_without_args;

        # The final continuation just discards the result.
        $self->code->($v, $progress, sub { return });

        return $progress;
    }

    # Turn an arrow whose result is a repeat_request/done_request hash into
    # a loop: on "repeat" the arrow is scheduled again with the carried
    # value through a zero-delay timer; on "done" the carried value goes to
    # the continuation. Any other result is a fatal error.
    sub repeat {
        my $self = shift;

        # The loop closure must not capture a strong reference to itself
        # (that would be a permanent cycle), so it only sees a weak one.
        my $weak_loop;
        my $loop = sub {
            my ($v, $progress, $cont) = @_;

            # Strong copy for this iteration only. The pending continuation
            # below captures it, which keeps the loop alive while an async
            # step is outstanding even if every arrow object that referenced
            # the loop was a temporary that has already been freed. Without
            # it the weak reference may be undef by the time the timer
            # fires on perls where a closure does not keep its enclosing
            # sub alive.
            my $this_loop = $weak_loop;

            $self->code->($v, $progress, sub {
                my ($request, $progress) = @_;
                if ($request->{repeat}) {
                    my ($timer, $canceller);
                    $timer = AE::timer 0, 0, sub {
                        $progress->advance($canceller);
                        # One-shot timer: drop the watcher now, otherwise
                        # the watcher and $canceller keep each other alive.
                        undef $timer;
                        $this_loop->($request->{value}, $progress, $cont);
                    };
                    $progress->add_canceller($canceller = sub { undef $timer });
                } elsif ($request->{done}) {
                    $cont->($request->{value}, $progress);
                } else {
                    die "AsyncArrow::repeat: step returned neither a repeat nor a done request";
                }
            });
        };
        weaken($weak_loop = $loop);

        return (ref $self)->new(code => $loop);
    }
}


{
    # ProgressArrow tracks a running arrow: it keeps the canceller callbacks
    # of pending async steps and a list of observers to call when a step
    # completes. Used as an arrow itself, it registers the continuation as
    # an observer, so the continuation runs on the next advance().
    package ProgressArrow;
    use parent qw/-norequire AsyncArrow/;
    use Scalar::Util qw/weaken refaddr/;
    use Class::Accessor::Lite rw => ['cancellers', 'observers'];

    # Constructor: create a progress object with empty canceller and
    # observer lists and install the observer-registering code.
    sub new_without_args {
        my $class = shift;
        my $self = $class->new(
            cancellers => [],
            observers => [],
        );

        # The code closure is stored inside $self, so it refers back to the
        # object through a weak reference to avoid a reference cycle.
        weaken(my $weaken_self = $self);
        $self->code(sub {
            my ($v, $progress, $cont) = @_;
            push @{$weaken_self->observers}, sub {
                my ($v) = @_;
                $cont->($v, $progress);
            };
        });

        return $self;
    }

    # Register one or more canceller code refs for pending async steps.
    sub add_canceller {
        my $self = shift;
        push @{$self->cancellers}, @_;
    }

    # Signal that an async step finished: remove its canceller (if one is
    # given) from the pending list, then call and remove every observer.
    # Observers are called without arguments.
    sub advance {
        my ($self, $canceller) = @_;

        # Compare by reference address and skip the filter when no
        # canceller was passed, instead of numifying a possibly undefined
        # value with '!='.
        my $finished = defined $canceller ? refaddr($canceller) : undef;
        if (defined $finished) {
            @{$self->cancellers} =
                grep { refaddr($_) != $finished } @{$self->cancellers};
        }

        while (my $observer = shift @{$self->observers}) {
            $observer->();
        }
        return;
    }

    # Cancel everything that is pending: call and remove every registered
    # canceller. The second argument is accepted but not used.
    sub cancel {
        my ($self, $canceller) = @_;
        while (my $cancel = shift @{$self->cancellers}) {
            $cancel->();
        }
        return;
    }
}


use AnyEvent;
use AnyEvent::Handle;

# Build an arrow that waits for one line on STDIN and passes it (without
# its line ending) to the continuation. When the handle reports an error,
# the continuation receives undef instead.
#
# The empty prototype is kept so that "inputted_line->compose(...)" is
# parsed as a call to this sub followed by a method call.
sub inputted_line() {
    return AsyncArrow->new(code => sub {
        my ($v, $progress, $cont) = @_;

        my $canceller;
        my $hdl = AnyEvent::Handle->new(
            fh => \*STDIN,
            on_error => sub {
                my ($hdl, $fatal, $message) = @_;
                $hdl->destroy;
                $! and warn "$message(fatal=$fatal)";
                # Keep the (value, progress) calling convention on this
                # path too; calling $cont with no arguments would leave
                # every downstream arrow without its progress object.
                $cont->(undef, $progress);
            },
            on_read => sub {
                my ($hdl) = @_;
                $hdl->push_read(line => sub {
                    my ($hdl, $line, $eol) = @_;
                    # Notify observers and deregister the canceller first,
                    # then drop the handle, then continue with the line.
                    $progress->advance($canceller);
                    $canceller->();
                    $cont->($line, $progress);
                });
            },
        );

        # Cancelling (or finishing) releases the handle.
        $progress->add_canceller($canceller = sub { undef $hdl });
    });
}


# Condition variable used to keep the event loop running until the input
# pipeline below signals completion with send().
my $cv = AE::cv;

# Running sum of the lengths of all lines read so far.
my $total_length = 0;

# Input pipeline: read a line, and for every defined line add its length to
# the total, echo it and request another iteration; for an undefined value
# print DONE and request the end of the loop. Once the loop is done the
# condition variable is signalled. run() returns the progress object, which
# is kept in $p.
my $p = inputted_line->compose(AsyncArrow->arr(sub {
    if (defined $_[0]) {
        $total_length += length $_[0];
        print "INPUT: $_[0]\n";

        AsyncArrow::repeat_request;
    } else {
        print "DONE\n";
        AsyncArrow::done_request;
    }
}))->repeat
   ->compose(AsyncArrow->arr(sub { $cv->send }))
   ->run;

# Observer pipeline: $p used as an arrow continues whenever the progress
# advances. Each time, print the running total and always request a repeat,
# so the pipeline waits for the next advance again.
# NOTE: inputted_line advances the progress before it passes the line on,
# so the total printed here does not yet include the line just read.
$p->compose(AsyncArrow->arr(sub {
    print "total length: $total_length\n"
}))->compose(AsyncArrow->arr(\&AsyncArrow::repeat_request))
   ->repeat
   ->run;

# Block in the event loop until the input pipeline calls $cv->send.
$cv->recv;

Arrowsは「Generalising Monads to Arrows」に、モナドの一般化として載ってます。ざっくり言えば、高階関数の圏の直積を保存するように作った、新たな圏への関手。


で、このJSの実装なんだけど、CpsA の実装までは美しいのに、 AsyncA の実装がイマイチな気がするんですよね。特に ProgressA って、状態をバリバリに持ってるSubject的なクラスなのに、AsyncA を継承して作る利点がイマイチわからず。