Pixel Pedals of Tomakomai

北海道苫小牧市出身の初老の日常

Tatsumaki でブラウザベースの Twitter Streaming

それってHamakiなんですが、試しに作ってみたので一応。ただし、Tatsumakiは、

This is considered as alpha quality software. Most of the stuff are undocumented since it's considered unstable and will likely to change. You should sometimes look at the source code or example apps in eg directory to see how this thing works.

Tatsumaki

なので自己責任で:-p 後、このエントリより eg/chat を見た方が勉強になりますw

Tatsumaki概要

TatsumakiPlackAnyEventをベースにしたTornadoっぽくWEBアプリを作れるフレームワークです。PSGIに対応しているので、psgi.streamingやらpsgi.nonblockingを実装しているPSGIサーバで動きます。現状ではPlack::Server::AnyEventが一番適してると思います。

使うもの

* Tatsumaki::Handler : 継承して各アクションを実装
* Tatsumaki::Application : ルーティングとかの面倒を見てくれる
* Tatsumaki::MessageQueue : comet や mxhr に適したキュー

後、 AnyEvent::Twitter::Stream とか。

ルーティング

Tatsumaki::Application のコンストラクタに正規表現とハンドラ名を指定してルーチングできます。テンプレと静的ファイルの置き場もTatsumaki::Applicationで指定できます。

package main;
use Tatsumaki::Application;

my $app = Tatsumaki::Application->new([
    qr'/(\d+)'      => 'HtmlHandler',
    qr'/poll/(\d+)' => 'TweetHandler',
]);

$app->template_path( "template" );
$app->static_path( "static" );

return $app;

HTMLのハンドラ

Tatsumaki::Handler を継承して get とか post とか適するメソッド名で実装します。後は render メソッドにテンプレ名を渡すだけです。テンプレに渡す引数も渡したければ渡せます。

package HtmlHandler;
use Moose;

extends 'Tatsumaki::Handler';

sub get {
	my $self = shift;
	my ( $uid ) = @_;
	$self->render( 'index.html' );
}

no  Moose;

テンプレはText::MicroTemplate形式です。cometにはjquery.ev.jsがあると便利なので、staticディレクトリに置きました。

また、handlerにHandlerオブジェクトが入ってて、こいつはPATH_INFOをargsに持ってるので使ってます。

% my $uid = $_[0]->{handler}->args->[0];
<html>

<head>
<title>Tweets of <%= $uid %></title>
<script type="text/javascript" src="static/jquery-1.3.2.min.js"></script>
<script type="text/javascript" src="static/jquery.ev.js"></script>
<script type="text/javascript">
jQuery( function ($) {
    $.ev.loop('/poll/' + <%= $uid %> + '?session=' + Math.random(), {
        tweet : function(ev) {
            if(! ev.tweet.user || ! ev.tweet.text) return;
            $( "#tweets" ).prepend( 
                ev.tweet.user.screen_name + ": " + 
                ev.tweet.text + "<br>" 
            );
        },
        message : function(ev) {
            $( "#tweets" ).prepend( 
                '<span style="color: #F00;">' + ev.text + "</span><br>" 
            );
        }
    });
} );
</script>
</head>

<body>
<h1>Tweets of <%= $uid %></h1>

<div id="tweets"></div>

</body>
</html>

Ajaxの部分

ここではnon-blockingなレスポンスを実現するために asynchronous を有効にします。こうすると ハンドラを抜けてもブラウザにレスポンスが返らなくなるので、comet の場合は適当なタイミングで $self->finish を呼んでレスポンスを返します。*1

ブラウザに送信するデータのキューとして、Tatsumaki::MessageQueueを使うと便利です。キューには channel によって分けることができます。また、ブラウザはキューからデータを取得するのに、どこまでデータをもらっているかを保持するために session というクライアントごとのID値が必要です。

今回の例では、初アクセスの時は AnyEvent::Twitter::Stream をセットアップしてます。で、tweetがある度に publish でキューにデータを押し込んでます。取得側は poll_once でキューから適切にデータを拾い、write でJSONとしてブラウザに返してます。

package TweetHandler;
use Moose;
use Config::Pit;
use AnyEvent::Twitter::Stream;
use Tatsumaki::MessageQueue;
use Tatsumaki::Error;

extends 'Tatsumaki::Handler';
__PACKAGE__->asynchronous(1);

my ($username, $password) = do {
	@{ Config::Pit::get( 'twitter.com', require => {
		'username' => 'your username on twitter',
		'password' => 'your password on twitter'
	} ) }{ qw/username password/ };
};

my %streams;

sub create_stream {
	my $self = shift;
	my ( $uid ) = @_;

	my $mq = Tatsumaki::MessageQueue->instance( $uid );

	$streams{ $uid } ||= AnyEvent::Twitter::Stream->new(
		username => $username,
		password => $password,
		method   => 'filter',
		follow   => $uid,
		on_tweet => sub {
			my $tweet = shift;
			$mq->publish( { type => 'tweet', tweet => $tweet, } );
		},
		on_error => sub {
			my $error = join ',', @_;
			$mq->publish( { type => 'message', text => $error, } );
			delete $streams{ $uid };
		},
		on_eof   => sub {
			$mq->publish( { type => 'message', text => 'disconnected', } );
			delete $streams{ $uid };
		},
	);
}


sub get {
	my $self = shift;
	my ( $uid ) = @_;

	my $session = $self->request->param('session')
	                 or Tatsumaki::Error::HTTP->throw(500, "'session' needed");

	$streams{ $uid } or $self->create_stream( $uid );
	my $mq = Tatsumaki::MessageQueue->instance( $uid );
	$mq->poll_once( $session, sub {
		my @events_published = @_;
		$self->write( \@events_published );
		$self->finish;
	} );
}

no  Moose;

起動

Tatsumaki の podにあるようにplackupで起動可能です。app.psgiのあるディレクトリで、 plackup -s AnyEvent を実行して下さい。 その後、 http://localhost:5000/"ユーザID" へアクセスすれば、各ユーザのtweetをリアルタイムで見れます。

*1:mxhrの時は multipart_xhr_push を有効にして stream_write でデータを送る。