diff --git a/planet/spider.py b/planet/spider.py
index 0337479..db350da 100644
--- a/planet/spider.py
+++ b/planet/spider.py
@@ -336,6 +336,74 @@ def writeCache(feed_uri, feed_info, data):
     write(xdoc.toxml().encode('utf-8'), filename(sources, feed_uri))
     xdoc.unlink()
 
+def httpThread(thread_index, input_queue, output_queue, log):
+    from Queue import Empty
+    import httplib2
+    from socket import gaierror, error
+    from httplib import BadStatusLine
+
+    http_cache = config.http_cache_directory()
+    h = httplib2.Http(http_cache)
+    try:
+        while True:
+            # The non-blocking get will throw an exception when the queue
+            # is empty which will terminate the thread.
+            uri, feed_info = input_queue.get(block=False)
+            log.info("Fetching %s via %d", uri, thread_index)
+            feed = StringIO('')
+            setattr(feed, 'url', uri)
+            setattr(feed, 'headers',
+                feedparser.FeedParserDict({'status':'500'}))
+            try:
+                # map IRI => URI
+                try:
+                    if isinstance(uri,unicode):
+                        idna = uri.encode('idna')
+                    else:
+                        idna = uri.decode('utf-8').encode('idna')
+                    if idna != uri: log.info("IRI %s mapped to %s", uri, idna)
+                except:
+                    log.info("unable to map %s to a URI", uri)
+                    idna = uri
+
+                # issue request
+                (resp, content) = h.request(idna)
+                if resp.status == 200 and resp.fromcache:
+                    resp.status = 304
+
+                # build a file-like object
+                feed = StringIO(content)
+                setattr(feed, 'url', resp.get('content-location', uri))
+                if resp.has_key('content-encoding'):
+                    del resp['content-encoding']
+                setattr(feed, 'headers', resp)
+            except gaierror:
+                log.error("Fail to resolve server name %s via %d",
+                    uri, thread_index)
+            except BadStatusLine:
+                log.error("Bad Status Line received for %s via %d",
+                    uri, thread_index)
+            except error, e:
+                if e.__class__.__name__.lower()=='timeout':
+                    feed.headers['status'] = '408'
+                    log.warn("Timeout in thread-%d", thread_index)
+                else:
+                    log.error("HTTP Error: %s in thread-%d",
+                        str(e), thread_index)
+            except Exception, e:
+                import sys, traceback
+                type, value, tb = sys.exc_info()
+                log.error('Error processing %s', uri)
+                for line in (traceback.format_exception_only(type, value) +
+                    traceback.format_tb(tb)):
+                    log.error(line.rstrip())
+                continue
+
+            output_queue.put(block=True, item=(uri, feed_info, feed))
+
+    except Empty, e:
+        log.info("Thread %d finished", thread_index)
+
 def spiderPlanet(only_if_new = False):
     """ Spider (fetch) an entire planet """
     log = planet.getLogger(config.log_level(),config.log_format())
@@ -355,111 +423,55 @@ def spiderPlanet(only_if_new = False):
             log.warning("Timeout set to invalid value '%s', skipping", timeout)
 
     if int(config.spider_threads()):
-        from Queue import Queue, Empty
+        from Queue import Queue
         from threading import Thread
-        import httplib2
-        from socket import gaierror, error
-        from httplib import BadStatusLine
 
-        work_queue = Queue()
-        awaiting_parsing = Queue()
+        fetch_queue = Queue()
+        parse_queue = Queue()
 
         http_cache = config.http_cache_directory()
         if not os.path.exists(http_cache):
             os.makedirs(http_cache, 0700)
 
-        def _spider_proc(thread_index):
-            h = httplib2.Http(http_cache)
-            try:
-                while True:
-                    # The non-blocking get will throw an exception when the queue
-                    # is empty which will terminate the thread.
-                    uri = work_queue.get(block=False)
-                    log.info("Fetching %s via %d", uri, thread_index)
-                    resp = feedparser.FeedParserDict({'status':'500'})
-                    feed_info = None
-                    content = None
-                    try:
-                        # read cached feed info
-                        sources = config.cache_sources_directory()
-                        feed_source = filename(sources, uri)
-                        feed_info = feedparser.parse(feed_source)
-                        if feed_info.feed and only_if_new:
-                            log.info("Feed %s already in cache", uri)
-                            continue
-                        if feed_info.feed.get('planet_http_status',None) == '410':
-                            log.info("Feed %s gone", uri)
-                            continue
+        # Load the fetch_queue with all the HTTP(S) uris.
+        log.info("Building work queue")
+        for uri in config.subscriptions():
+            if _is_http_uri(uri):
+                # read cached feed info
+                sources = config.cache_sources_directory()
+                feed_source = filename(sources, uri)
+                feed_info = feedparser.parse(feed_source)
-                        try:
-                            if isinstance(uri,unicode):
-                                idna = uri.encode('idna')
-                            else:
-                                idna = uri.decode('utf-8').encode('idna')
-                            if idna != uri: log.info("IRI %s mapped to %s", uri, idna)
-                        except:
-                            log.info("unable to map %s to a URI", uri)
-                            idna = uri
-                        (resp, content) = h.request(idna)
-                    except gaierror:
-                        log.error("Fail to resolve server name %s via %d", uri, thread_index)
-                    except BadStatusLine:
-                        log.error("Bad Status Line received for %s via %d", uri, thread_index)
-                    except error, e:
-                        if e.__class__.__name__.lower()=='timeout':
-                            resp['status'] = '408'
-                            log.warn("Timeout in thread-%d", thread_index)
-                        else:
-                            log.error("HTTP Error: %s in thread-%d", str(e), thread_index)
-                    except Exception, e:
-                        import sys, traceback
-                        type, value, tb = sys.exc_info()
-                        log.error('Error processing %s', uri)
-                        for line in (traceback.format_exception_only(type, value) +
-                            traceback.format_tb(tb)):
-                            log.error(line.rstrip())
-                        continue
+                if feed_info.feed and only_if_new:
+                    log.info("Feed %s already in cache", uri)
+                    continue
+                if feed_info.feed.get('planet_http_status',None) == '410':
+                    log.info("Feed %s gone", uri)
+                    continue
-                    if feed_info:
-                        if resp.status == 200 and resp.fromcache:
-                            resp.status = 304
-                        awaiting_parsing.put(block=True,
-                            item=(resp, content, uri, feed_info))
-
-            except Empty, e:
-                log.info("Thread %d finished", thread_index)
-                pass
-
-        # Load the work_queue with all the HTTP(S) uris.
-        map(work_queue.put, [uri for uri in config.subscriptions() if _is_http_uri(uri)])
+                fetch_queue.put(item=((uri, feed_info)))
 
         # Start all the worker threads
-        threads = dict([(i, Thread(target=_spider_proc, args=(i,))) for i in range(int(config.spider_threads()))])
+        threads = dict([(i, Thread(target=httpThread,
+            args=(i,fetch_queue, parse_queue, log)))
+            for i in range(int(config.spider_threads()))])
        for t in threads.itervalues():
            t.start()
 
        # Process the results as they arrive
-        while work_queue.qsize() or awaiting_parsing.qsize() or threads:
-            while awaiting_parsing.qsize() == 0 and threads:
+        while fetch_queue.qsize() or parse_queue.qsize() or threads:
+            while parse_queue.qsize() == 0 and threads:
                time.sleep(0.1)
-            while awaiting_parsing.qsize():
-                item = awaiting_parsing.get(False)
+            while parse_queue.qsize():
+                (uri, feed_info, feed) = parse_queue.get(False)
                try:
-                    (resp_headers, content, uri, feed_info) = item
-                    # spiderFeed(uri, only_if_new=only_if_new, content=content, resp_headers=resp_headers)
-                    if int(resp_headers.status) < 300:
-                        # httplib2 was used to get the content, so prepare a
-                        # proper object to pass to feedparser.
-                        f = StringIO(content)
-                        setattr(f, 'url', resp_headers.get('content-location', uri))
-                        if resp_headers.has_key('content-encoding'):
-                            del resp_headers['content-encoding']
-                        setattr(f, 'headers', resp_headers)
-                        data = feedparser.parse(f)
+                    if int(feed.headers.status) < 300:
+                        data = feedparser.parse(feed)
                    else:
-                        data = feedparser.FeedParserDict({'status': int(resp_headers.status),
-                            'headers':resp_headers, 'version':None, 'entries': []})
+                        data = feedparser.FeedParserDict({'version':None,
+                            'headers':feed.headers, 'entries': [],
+                            'status': int(feed.headers.status)})
 
                    writeCache(uri, feed_info, data)
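
The control flow this patch extracts is a plain producer/consumer hand-off: the main thread loads `fetch_queue` before any worker starts, so a non-blocking `get` only raises `Empty` once the work is genuinely exhausted, and each `httpThread` pushes a `(uri, feed_info, feed)` tuple onto `parse_queue` for the main thread to parse and cache. Below is a minimal, self-contained sketch of that shape, translated to Python 3 (`queue`/`threading` in place of the patch's Python 2 `Queue`/`Thread`); `fetch_one()` is an illustrative stand-in for the `httplib2` request and is not part of the patch.

```python
import queue
import threading

def fetch_one(uri):
    # Illustrative stand-in for the httplib2 request made in httpThread.
    return "fetched " + uri

def http_thread(thread_index, input_queue, output_queue):
    while True:
        try:
            # Non-blocking get: queue.Empty is the shutdown signal, just as
            # input_queue.get(block=False) ends each thread in the patch.
            uri = input_queue.get(block=False)
        except queue.Empty:
            print("thread %d finished" % thread_index)
            return
        output_queue.put((uri, fetch_one(uri)))

fetch_queue = queue.Queue()
parse_queue = queue.Queue()

# Load the queue *before* starting workers, so an empty queue really means
# "no work left" rather than "work not queued yet".
for uri in ("http://example.com/a", "http://example.com/b", "http://example.com/c"):
    fetch_queue.put(uri)

threads = [threading.Thread(target=http_thread, args=(i, fetch_queue, parse_queue))
           for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

while not parse_queue.empty():
    uri, content = parse_queue.get(block=False)
    print(uri, "->", content)
```

The `join()` here is a simplification: the patch instead polls `qsize()` and sleeps, so it can parse and cache finished feeds while other fetches are still in flight.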
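The other piece worth calling out is the `feed` object handed across the queue: `httpThread` wraps the response body in a `StringIO` and pins `url` and `headers` attributes onto it so `feedparser.parse()` can treat it like a response it fetched itself, falling back to the synthetic `{'status':'500'}` headers when the request never completed. Below is a rough Python 3 rendering of that wrapper, assuming an httplib2-style header mapping; `FeedStream` and `wrap_response` are illustrative names, and a `BytesIO` subclass stands in because, unlike Python 2's `StringIO`, `io.BytesIO` rejects arbitrary attribute assignment.

```python
from io import BytesIO

class FeedStream(BytesIO):
    """BytesIO plus the url/headers attributes the patch attaches via setattr()."""

def wrap_response(uri, resp, content):
    # resp: a mapping of response headers, httplib2-style (lower-case keys,
    # including a 'status' entry).
    feed = FeedStream(content)
    # Prefer the final location after any redirects, as the patch does.
    feed.url = resp.get('content-location', uri)
    if 'content-encoding' in resp:
        # httplib2 hands back an already-decoded body; leaving this header in
        # place would invite a second, failing decompression downstream.
        del resp['content-encoding']
    feed.headers = resp
    return feed

feed = wrap_response('http://example.com/feed',
                     {'status': '200', 'content-type': 'application/atom+xml'},
                     b'<feed xmlns="http://www.w3.org/2005/Atom"/>')
print(feed.url, feed.headers['status'], feed.read())
```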