root/trunk/plagger/lib/Plagger/Plugin/Aggregator/Async.pm

Revision 1941 (checked in by kazeburo, 3 years ago)

added Aggregator::Sync

Line 
1 package Plagger::Plugin::Aggregator::Async;
2 use strict;
3 use base qw( Plagger::Plugin::Aggregator::Simple );
4 use HTTP::Async 0.07;
5
6 __PACKAGE__->mk_accessors( qw/async/ );
7
# Plugin entry point: build the HTTP::Async client and hook into the
# aggregation phases.
#
#   $self    - this plugin instance
#   $context - object on which register_hook is called
#
# The optional "async_args" config hash is passed straight to
# HTTP::Async->new. $self->{_id2feed} maps each HTTP::Async request id to the
# feed object that the request was issued for.
sub register {
    my ( $self, $context ) = @_;

    my $async_args = $self->conf->{async_args} || {};
    $self->async( HTTP::Async->new( %{$async_args} ) );

    $self->{_id2feed} = {};

    $context->register_hook(
        $self,
        'customfeed.handle'   => \&aggregate,
        'aggregator.finalize' => \&finalize,
    );
}
22
# 'customfeed.handle' hook: queue a GET for the feed's URL on the async
# client instead of fetching it immediately.
#
#   $args->{feed} - feed object; its url() is the address to fetch
#
# The request id returned by HTTP::Async is remembered so that finalize()
# can match the eventual response back to this feed. Returns the feed object
# (the value of the final assignment).
sub aggregate {
    my ( $self, $context, $args ) = @_;

    my $feed    = $args->{feed};
    my $request = $self->prep_req( $context, $feed->url );
    my $id      = $self->async->add($request);

    $self->{_id2feed}{$id} = $feed;
}
30
# Build the HTTP::Request used to fetch $url.
#
#   $context - unused here; kept for interface compatibility
#   $url     - address to GET
#
# If the plugin cache holds validators for $url (as stored by
# handle_response), they are attached as If-Modified-Since / If-None-Match so
# the server can answer 304 Not Modified.
#
# Returns the prepared HTTP::Request.
sub prep_req {
    my ( $self, $context, $url ) = @_;

    my $req = HTTP::Request->new( GET => $url );
    $req->user_agent("Plagger/$Plagger::VERSION (http://plagger.org/)");

    my $ref = $self->cache->get($url);
    if ($ref) {
        if ( my $last_modified = $ref->{LastModified} ) {
            if ( $last_modified =~ /\A\d+\z/ ) {
                # Epoch seconds: the date accessor formats it as an HTTP date.
                $req->if_modified_since($last_modified);
            }
            else {
                # handle_response caches the raw Last-Modified header string.
                # if_modified_since() expects epoch seconds and would turn a
                # date string into 1970-01-01, so send the string verbatim.
                $req->header( 'If-Modified-Since' => $last_modified );
            }
        }

        $req->header( 'If-None-Match' => $ref->{ETag} )
            if $ref->{ETag};
    }

    return $req;
}
48
# 'aggregator.finalize' hook: drain the async client, handing every completed
# response to handle_response together with the feed it belongs to.
#
# handle_response may queue further requests on the same client; the loop
# keeps running until wait_for_next_response returns an empty list.
sub finalize {
    my ( $self, $context, $args ) = @_;

    while ( my @next = $self->async->wait_for_next_response ) {
        my ( $response, $id ) = @next;
        my $feed = $self->{_id2feed}{$id};

        $context->log( info => "Fetch " . $feed->url );
        $self->handle_response( $context, $response, $feed );
    }
}
57
# Process one completed response for $feed.
#
#   $context  - used for logging
#   $response - response object returned by the async client
#   $feed     - feed object the originating request was queued for
#
# Outcomes:
#   * 304 or an unsuccessful status: logged, nothing else happens.
#   * The fetched URL is itself the feed: the body is passed to handle_feed.
#   * Feed discovery points at a different URL: a new request for that URL is
#     queued on the async client and tied to the same $feed.
#   * No feed could be discovered: nothing happens.
#
# In the two successful cases the response's ETag / Last-Modified headers are
# cached under the requested URI for use by prep_req.
#
# Returns 1 when the response was consumed or followed up, and an empty
# list / undef otherwise.
sub handle_response {
    my ( $self, $context, $response, $feed ) = @_;
    my $url = $response->request->uri;

    if ( $response->code == 304 ) {
        # Not Modified is the expected answer to a conditional GET, not a
        # failure, so it must not be reported at error level.
        $context->log( info => "Not Modified: $url" );
        return;
    }
    elsif ( !$response->is_success ) {
        $context->log( error => "Fetch for $url failed: " . $response->code );
        return;
    }

    my $ufr      = TO_URI_FETCH_RESPONSE($response);
    my $feed_url = Plagger::FeedParser->discover($ufr);

    # Discovery found nothing usable; checking this first also avoids
    # comparing against an undefined value below.
    return unless $feed_url;

    if ( $url eq $feed_url ) {
        $self->handle_feed( $url, \$response->content, $feed );
    }
    else {
        # The fetched document only points at the feed: fetch the real feed
        # URL and keep it associated with the same feed object.
        my $new_id = $self->async->add( $self->prep_req( $context, $feed_url ) );
        $self->{_id2feed}->{$new_id} = $feed;
    }

    # NOTE(review): validators are also cached for a page that merely links
    # to the feed; a later 304 for that page would skip the feed fetch
    # entirely. Confirm whether that is intended.
    $self->cache->set(
        $response->request->uri,
        {
            ETag         => $response->header('ETag')          || '',
            LastModified => $response->header('Last-Modified') || '',
        }
    );

    return 1;
}
92
93
## XXX copy from Xango
# Convert the HTTP response $r into a URI::Fetch::Response.
#
# The URI::Fetch status is chosen in this order:
#   1. previous response in the chain was 301 -> URI_MOVED_PERMANENTLY
#   2. response code is 410                    -> URI_GONE
#   3. response code is 304                    -> URI_NOT_MODIFIED
#   4. anything else                           -> URI_OK
#
# The HTTP status, the response object itself, ETag, Last-Modified, request
# URI, body and content type are copied across as well.
sub TO_URI_FETCH_RESPONSE {
    my ($r) = @_;

    my $ufr = URI::Fetch::Response->new();
    $ufr->http_status( $r->code );
    $ufr->http_response($r);

    my $status;
    if (   $r->previous
        && $r->previous->code == HTTP::Status::RC_MOVED_PERMANENTLY() )
    {
        $status = URI::Fetch::URI_MOVED_PERMANENTLY();
    }
    elsif ( $r->code == HTTP::Status::RC_GONE() ) {
        $status = URI::Fetch::URI_GONE();
    }
    elsif ( $r->code == HTTP::Status::RC_NOT_MODIFIED() ) {
        $status = URI::Fetch::URI_NOT_MODIFIED();
    }
    else {
        $status = URI::Fetch::URI_OK();
    }
    $ufr->status($status);

    $ufr->etag( $r->header('ETag') );
    $ufr->last_modified( $r->header('Last-Modified') );
    $ufr->uri( $r->request->uri );
    $ufr->content( $r->content );
    $ufr->content_type( $r->content_type );

    return $ufr;
}
116
117
118
119 1;
120 __END__
121
122 =head1 NAME
123
124 Plagger::Plugin::Aggregator::Async - Aggregate with HTTP::Async
125
126 =head1 SYNOPSIS
127
128   - module: Aggregator::Async
129     conf:
130       async_args:
131         slots: 10
132         max_redirect: 7
133
134 =head1 DESCRIPTION
135
136 This plugin implements parallel feed aggregation without blocking.
137
138 =head1 CONFIG
139
140 =over 4
141
142 =item async_args
143
144 =back
145
146 =head1 AUTHOR
147
148 Masahiro Nagano
149
150 =head1 SEE ALSO
151
152 L<Plagger>, L<Plagger::Plugin::Aggregator::Simple>, L<HTTP::Async>
153
154 =cut
Note: See TracBrowser for help on using the browser.