diff --git a/planet/spider.py b/planet/spider.py
index 16595b9..0337479 100644
--- a/planet/spider.py
+++ b/planet/spider.py
@@ -121,7 +121,7 @@ def _is_http_uri(uri):
     parsed = urlparse.urlparse(uri)
     return parsed[0] in ['http', 'https']
 
-def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None):
+def spiderFeed(feed_uri, only_if_new=0):
     """ Spider (fetch) a single feed """
     log = planet.logger
 
@@ -130,38 +130,30 @@ def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None):
     if not os.path.exists(sources):
         os.makedirs(sources, 0700)
 
-    feed_source = filename(sources, feed)
+    feed_source = filename(sources, feed_uri)
     feed_info = feedparser.parse(feed_source)
     if feed_info.feed and only_if_new:
-        log.info("Feed %s already in cache", feed)
+        log.info("Feed %s already in cache", feed_uri)
         return
     if feed_info.feed.get('planet_http_status',None) == '410':
-        log.info("Feed %s gone", feed)
+        log.info("Feed %s gone", feed_uri)
         return
 
     # read feed itself
-    if not resp_headers:
-        modified = None
-        try:
-            modified=time.strptime(
-                feed_info.feed.get('planet_http_last_modified', None))
-        except:
-            pass
-        data = feedparser.parse(feed_info.feed.get('planet_http_location',feed),
-            etag=feed_info.feed.get('planet_http_etag',None), modified=modified)
-    elif int(resp_headers.status) < 300:
-        # httplib2 was used to get the content, so prepare a
-        # proper object to pass to feedparser.
-        f = StringIO(content)
-        setattr(f, 'url', resp_headers.get('content-location', feed))
-        if resp_headers:
-            if resp_headers.has_key('content-encoding'):
-                del resp_headers['content-encoding']
-            setattr(f, 'headers', resp_headers)
-        data = feedparser.parse(f)
-    else:
-        data = feedparser.FeedParserDict({'status': int(resp_headers.status),
-            'headers':resp_headers, 'version':None, 'entries': []})
+    modified = None
+    try:
+        modified=time.strptime(
+            feed_info.feed.get('planet_http_last_modified', None))
+    except:
+        pass
+    data = feedparser.parse(feed_info.feed.get('planet_http_location',feed_uri),
+        etag=feed_info.feed.get('planet_http_etag',None), modified=modified)
+
+    writeCache(feed_uri, feed_info, data)
+
+def writeCache(feed_uri, feed_info, data):
+    log = planet.logger
+    sources = config.cache_sources_directory()
 
     # capture http status
     if not data.has_key("status"):
@@ -173,20 +165,20 @@ def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None):
             data.status = 500
 
     activity_horizon = \
-        time.gmtime(time.time()-86400*config.activity_threshold(feed))
+        time.gmtime(time.time()-86400*config.activity_threshold(feed_uri))
 
     # process based on the HTTP status code
     if data.status == 200 and data.has_key("url"):
         data.feed['planet_http_location'] = data.url
-        if feed == data.url:
-            log.info("Updating feed %s", feed)
+        if feed_uri == data.url:
+            log.info("Updating feed %s", feed_uri)
         else:
-            log.info("Updating feed %s @ %s", feed, data.url)
+            log.info("Updating feed %s @ %s", feed_uri, data.url)
     elif data.status == 301 and data.has_key("entries") and len(data.entries)>0:
-        log.warning("Feed has moved from <%s> to <%s>", feed, data.url)
+        log.warning("Feed has moved from <%s> to <%s>", feed_uri, data.url)
         data.feed['planet_http_location'] = data.url
     elif data.status == 304:
-        log.info("Feed %s unchanged", feed)
+        log.info("Feed %s unchanged", feed_uri)
 
         if not feed_info.feed.has_key('planet_message'):
             if feed_info.feed.has_key('planet_updated'):
@@ -199,13 +191,13 @@ def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None):
                 del feed_info.feed['planet_message']
 
     elif data.status == 410:
-        log.info("Feed %s gone", feed)
+        log.info("Feed %s gone", feed_uri)
     elif data.status == 408:
-        log.warning("Feed %s timed out", feed)
+        log.warning("Feed %s timed out", feed_uri)
     elif data.status >= 400:
-        log.error("Error %d while updating feed %s", data.status, feed)
+        log.error("Error %d while updating feed %s", data.status, feed_uri)
     else:
-        log.info("Updating feed %s", feed)
+        log.info("Updating feed %s", feed_uri)
 
     # if read failed, retain cached information
     if not data.version and feed_info.version:
@@ -236,12 +228,12 @@ def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None):
             break
     else:
         data.feed.links.append(feedparser.FeedParserDict(
-            {'rel':'self', 'type':feedtype, 'href':feed}))
-    for name, value in config.feed_options(feed).items():
+            {'rel':'self', 'type':feedtype, 'href':feed_uri}))
+    for name, value in config.feed_options(feed_uri).items():
         data.feed['planet_'+name] = value
 
     # perform user configured scrub operations on the data
-    scrub(feed, data)
+    scrub(feed_uri, data)
 
     from planet import idindex
     global index
@@ -283,7 +275,7 @@ def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None):
         xdoc = reconstitute.reconstitute(data, entry)
         output = xdoc.toxml().encode('utf-8')
         xdoc.unlink()
-        for filter in config.filters(feed):
+        for filter in config.filters(feed_uri):
             output = shell.run(filter, output, mode="filter")
             if not output: break
         if not output: continue
@@ -302,7 +294,7 @@ def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None):
     if index: index.close()
 
     # identify inactive feeds
-    if config.activity_threshold(feed):
+    if config.activity_threshold(feed_uri):
         updated = [entry.updated_parsed for entry in data.entries
             if entry.has_key('updated_parsed')]
         updated.sort()
@@ -314,7 +306,7 @@ def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None):
             updated = [feedparser._parse_date_iso8601(data.feed.planet_updated)]
 
         if not updated or updated[-1] < activity_horizon:
-            msg = "no activity in %d days" % config.activity_threshold(feed)
+            msg = "no activity in %d days" % config.activity_threshold(feed_uri)
             log.info(msg)
             data.feed['planet_message'] = msg
 
@@ -341,7 +333,7 @@ def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None):
     xdoc=minidom.parseString('''<feed xmlns:planet="%s"
       xmlns="http://www.w3.org/2005/Atom"/>\n''' % planet.xmlns)
     reconstitute.source(xdoc.documentElement,data.feed,data.bozo,data.version)
-    write(xdoc.toxml().encode('utf-8'), filename(sources, feed))
+    write(xdoc.toxml().encode('utf-8'), filename(sources, feed_uri))
     xdoc.unlink()
 
 def spiderPlanet(only_if_new = False):
@@ -367,6 +359,7 @@ def spiderPlanet(only_if_new = False):
         from threading import Thread
         import httplib2
         from socket import gaierror, error
+        from httplib import BadStatusLine
 
         work_queue = Queue()
         awaiting_parsing = Queue()
@@ -384,8 +377,20 @@ def spiderPlanet(only_if_new = False):
                     uri = work_queue.get(block=False)
                     log.info("Fetching %s via %d", uri, thread_index)
                     resp = feedparser.FeedParserDict({'status':'500'})
+                    feed_info = None
                     content = None
                     try:
+                        # read cached feed info
+                        sources = config.cache_sources_directory()
+                        feed_source = filename(sources, uri)
+                        feed_info = feedparser.parse(feed_source)
+                        if feed_info.feed and only_if_new:
+                            log.info("Feed %s already in cache", uri)
+                            continue
+                        if feed_info.feed.get('planet_http_status',None) == '410':
+                            log.info("Feed %s gone", uri)
+                            continue
+
                         try:
                             if isinstance(uri,unicode):
                                 idna = uri.encode('idna')
@@ -398,6 +403,8 @@ def spiderPlanet(only_if_new = False):
                             (resp, content) = h.request(idna)
                         except gaierror:
                             log.error("Fail to resolve server name %s via %d", uri, thread_index)
+                        except BadStatusLine:
+                            log.error("Bad Status Line received for %s via %d", uri, thread_index)
                         except error, e:
                             if e.__class__.__name__.lower()=='timeout':
                                 resp['status'] = '408'
@@ -411,7 +418,13 @@ def spiderPlanet(only_if_new = False):
                         for line in (traceback.format_exception_only(type, value) +
                             traceback.format_tb(tb)):
                             log.error(line.rstrip())
-                    awaiting_parsing.put(block=True, item=(resp, content, uri))
+                        continue
+
+                    if feed_info:
+                        if resp.status == 200 and resp.fromcache:
+                            resp.status = 304
+                        awaiting_parsing.put(block=True,
+                            item=(resp, content, uri, feed_info))
 
             except Empty, e:
                 log.info("Thread %d finished", thread_index)
@@ -432,10 +445,24 @@ def spiderPlanet(only_if_new = False):
             while awaiting_parsing.qsize():
                 item = awaiting_parsing.get(False)
                 try:
-                    (resp_headers, content, uri) = item
-                    if resp_headers.status == 200 and resp_headers.fromcache:
-                        resp_headers.status = 304
-                    spiderFeed(uri, only_if_new=only_if_new, content=content, resp_headers=resp_headers)
+                    (resp_headers, content, uri, feed_info) = item
+                    # spiderFeed(uri, only_if_new=only_if_new, content=content, resp_headers=resp_headers)
+
+                    if int(resp_headers.status) < 300:
+                        # httplib2 was used to get the content, so prepare a
+                        # proper object to pass to feedparser.
+                        f = StringIO(content)
+                        setattr(f, 'url', resp_headers.get('content-location', uri))
+                        if resp_headers.has_key('content-encoding'):
+                            del resp_headers['content-encoding']
+                        setattr(f, 'headers', resp_headers)
+                        data = feedparser.parse(f)
+                    else:
+                        data = feedparser.FeedParserDict({'status': int(resp_headers.status),
+                            'headers':resp_headers, 'version':None, 'entries': []})
+
+                    writeCache(uri, feed_info, data)
+
                 except Exception, e:
                     import sys, traceback
                     type, value, tb = sys.exc_info()
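
For reference, a minimal standalone sketch (Python 2) of the httplib2-to-feedparser handoff that the new parsing loop in spiderPlanet performs before calling writeCache. The feed URI and cache directory below are hypothetical placeholders, not values from the patch; this is an illustration of the handoff, not part of the change itself.

    from StringIO import StringIO
    import httplib2
    import feedparser

    uri = 'http://example.com/feed.atom'    # hypothetical feed URI
    h = httplib2.Http('/tmp/feed-cache')    # hypothetical cache directory
    resp_headers, content = h.request(uri)

    if int(resp_headers.status) < 300:
        # Wrap the body in a file-like object carrying url/headers so
        # feedparser can see the final location and response headers.
        f = StringIO(content)
        f.url = resp_headers.get('content-location', uri)
        if resp_headers.has_key('content-encoding'):
            # httplib2 has already decoded the body
            del resp_headers['content-encoding']
        f.headers = resp_headers
        data = feedparser.parse(f)
    else:
        # Synthesize an empty result so the cache writer still sees the status.
        data = feedparser.FeedParserDict({'status': int(resp_headers.status),
            'headers': resp_headers, 'version': None, 'entries': []})

    print data.get('status'), len(data.entries)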