diff --git a/planet/config.py b/planet/config.py
index 5c8ffe3..9526f36 100644
--- a/planet/config.py
+++ b/planet/config.py
@@ -100,6 +100,7 @@ def __init__():
     define_planet('owner_email', '')
     define_planet('output_theme', '')
     define_planet('output_dir', 'output')
+    define_planet('spider_threads', 0)
 
     define_planet_list('template_files')
     define_planet_list('bill_of_materials')
@@ -282,6 +283,11 @@ def downloadReadingList(list, orig_config, callback, use_cache=True, re_read=Tru
     except:
         logger.exception("Unable to read %s readinglist", list)
 
+def http_cache_directory():
+    if parser.has_option('Planet', 'http_cache_directory'):
+        return parser.get('Planet', 'http_cache_directory')
+    else:
+        return os.path.join(cache_directory(), 'sources/http')
 
 def cache_sources_directory():
     if parser.has_option('Planet', 'cache_sources_directory'):
diff --git a/planet/feedparser.py b/planet/feedparser.py
index 00675e1..7bb7c60 100755
--- a/planet/feedparser.py
+++ b/planet/feedparser.py
@@ -11,7 +11,7 @@ Recommended: Python 2.3 or later
 Recommended: CJKCodecs and iconv_codec
 """
 
-__version__ = "4.2-pre-" + "$Revision: 1.144 $"[11:16] + "-cvs"
+__version__ = "4.2-pre-" + "$Revision: 1.142 $"[11:16] + "-cvs"
 __license__ = """Copyright (c) 2002-2006, Mark Pilgrim, All rights reserved.
 
 Redistribution and use in source and binary forms, with or without modification,
@@ -218,9 +218,6 @@ class FeedParserDict(UserDict):
     def __getitem__(self, key):
         if key == 'category':
             return UserDict.__getitem__(self, 'tags')[0]['term']
-        if key == 'enclosures':
-            norel = lambda link: FeedParserDict([(name,value) for (name,value) in link.items() if name!='rel'])
-            return [norel(link) for link in UserDict.__getitem__(self, 'links') if link['rel']=='enclosure']
         if key == 'categories':
             return [(tag['scheme'], tag['term']) for tag in UserDict.__getitem__(self, 'tags')]
         realkey = self.keymap.get(key, key)
@@ -1306,15 +1303,15 @@
             attrsD.setdefault('type', 'application/atom+xml')
         else:
             attrsD.setdefault('type', 'text/html')
-        context = self._getContext()
         attrsD = self._itsAnHrefDamnIt(attrsD)
         if attrsD.has_key('href'):
             attrsD['href'] = self.resolveURI(attrsD['href'])
-            if attrsD.get('rel')=='enclosure' and not context.get('id'):
-                context['id'] = attrsD.get('href')
         expectingText = self.infeed or self.inentry or self.insource
+        context = self._getContext()
         context.setdefault('links', [])
         context['links'].append(FeedParserDict(attrsD))
+        if attrsD['rel'] == 'enclosure':
+            self._start_enclosure(attrsD)
         if attrsD.has_key('href'):
             expectingText = 0
             if (attrsD.get('rel') == 'alternate') and (self.mapContentType(attrsD.get('type')) in self.html_types):
@@ -1360,7 +1357,6 @@
             self._start_content(attrsD)
         else:
             self.pushContent('description', attrsD, 'text/html', self.infeed or self.inentry or self.insource)
-    _start_dc_description = _start_description
 
     def _start_abstract(self, attrsD):
         self.pushContent('description', attrsD, 'text/plain', self.infeed or self.inentry or self.insource)
@@ -1372,7 +1368,6 @@
         value = self.popContent('description')
         self._summaryKey = None
     _end_abstract = _end_description
-    _end_dc_description = _end_description
 
     def _start_info(self, attrsD):
         self.pushContent('info', attrsD, 'text/plain', 1)
@@ -1432,8 +1427,7 @@
     def _start_enclosure(self, attrsD):
         attrsD = self._itsAnHrefDamnIt(attrsD)
         context = self._getContext()
-        attrsD['rel']='enclosure'
-        context.setdefault('links', []).append(FeedParserDict(attrsD))
+        context.setdefault('enclosures', []).append(FeedParserDict(attrsD))
         href = attrsD.get('href')
         if href and not context.get('id'):
             context['id'] = href
@@ -3254,7 +3248,7 @@ def _stripDoctype(data):
     return version, data, dict(replacement and safe_pattern.findall(replacement))
 
-def parse(url_file_stream_or_string, etag=None, modified=None, agent=None, referrer=None, handlers=[]):
+def parse(url_file_stream_or_string, etag=None, modified=None, agent=None, referrer=None, handlers=[], resp_headers=None):
     '''Parse a feed from a URL, file, stream, or string'''
     result = FeedParserDict()
     result['feed'] = FeedParserDict()
@@ -3263,6 +3257,9 @@ def parse(url_file_stream_or_string, etag=None, modified=None, agent=None, refer
     result['bozo'] = 0
     if type(handlers) == types.InstanceType:
         handlers = [handlers]
+    if resp_headers:
+        f = None
+        data = url_file_stream_or_string
     try:
         f = _open_resource(url_file_stream_or_string, etag, modified, agent, referrer, handlers)
         data = f.read()
@@ -3307,6 +3304,8 @@ def parse(url_file_stream_or_string, etag=None, modified=None, agent=None, refer
         result['status'] = f.status
     if hasattr(f, 'headers'):
         result['headers'] = f.headers.dict
+    if resp_headers:
+        result['headers'] = resp_headers
     if hasattr(f, 'close'):
         f.close()
diff --git a/planet/spider.py b/planet/spider.py
index 7e6d91b..ce6cbdd 100644
--- a/planet/spider.py
+++ b/planet/spider.py
@@ -4,7 +4,7 @@ and write each as a set of entries in a cache directory.
 """
 
 # Standard library modules
-import time, calendar, re, os
+import time, calendar, re, os, urlparse
 from xml.dom import minidom
 # Planet modules
 import planet, config, feedparser, reconstitute, shell
@@ -116,8 +116,11 @@
            source.author_detail.has_key('name'):
             source.author_detail['name'] = \
                 str(stripHtml(source.author_detail.name))
 
+def _is_http_uri(uri):
+    parsed = urlparse.urlparse(uri)
+    return parsed[0] in ['http', 'https']
 
-def spiderFeed(feed, only_if_new=0):
+def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None):
     """ Spider (fetch) a single feed """
     log = planet.logger
@@ -125,6 +128,7 @@
     sources = config.cache_sources_directory()
     if not os.path.exists(sources):
         os.makedirs(sources, 0700)
+
     feed_source = filename(sources, feed)
     feed_info = feedparser.parse(feed_source)
     if feed_info.feed and only_if_new:
@@ -135,14 +139,17 @@
         return
 
     # read feed itself
-    modified = None
-    try:
-        modified=time.strptime(
-            feed_info.feed.get('planet_http_last_modified', None))
-    except:
-        pass
-    data = feedparser.parse(feed_info.feed.get('planet_http_location',feed),
-        etag=feed_info.feed.get('planet_http_etag',None), modified=modified)
+    if content:
+        data = feedparser.parse(content, resp_headers=resp_headers)
+    else:
+        modified = None
+        try:
+            modified=time.strptime(
+                feed_info.feed.get('planet_http_last_modified', None))
+        except:
+            pass
+        data = feedparser.parse(feed_info.feed.get('planet_http_location',feed),
+            etag=feed_info.feed.get('planet_http_etag',None), modified=modified)
 
     # capture http status
     if not data.has_key("status"):
@@ -319,12 +326,62 @@
 def spiderPlanet(only_if_new = False):
     """ Spider (fetch) an entire planet """
     log = planet.getLogger(config.log_level(),config.log_format())
-    planet.setTimeout(config.feed_timeout())
 
     global index
     index = True
 
-    for feed in config.subscriptions():
+    if config.spider_threads():
+        from Queue import Queue
+        from threading import Thread
+        import httplib2
+
+        work_queue = Queue()
+        awaiting_parsing = Queue()
+
+        def _spider_proc():
+            h = httplib2.Http(config.http_cache_directory())
+            while True:
+                # The non-blocking get will throw an exception when the queue
+                # is empty which will terminate the thread.
+                uri = work_queue.get(block=False)
+                log.info("Fetching %s", uri)
+                (resp, content) = h.request(uri)
+                awaiting_parsing.put((resp, content, uri), block=True)
+
+        # Load the work_queue with all the HTTP(S) uris.
+        map(work_queue.put, [uri for uri in config.subscriptions() if _is_http_uri(uri)])
+
+        # Start all the worker threads
+        threads = dict([(i, Thread(target=_spider_proc)) for i in range(config.spider_threads())])
+        for t in threads.itervalues():
+            t.start()
+
+        # Process the results as they arrive
+        while work_queue.qsize() and awaiting_parsing.qsize() and threads:
+            item = awaiting_parsing.get(False)
+            if not item and threads:
+                time.sleep(1)
+            while item:
+                try:
+                    (resp_headers, content, uri) = item
+                    spiderFeed(uri, only_if_new=only_if_new, content=content, resp_headers=resp_headers)
+                except Exception, e:
+                    import sys, traceback
+                    type, value, tb = sys.exc_info()
+                    log.error('Error processing %s', uri)
+                    for line in (traceback.format_exception_only(type, value) +
+                        traceback.format_tb(tb)):
+                        log.error(line.rstrip())
+                item = awaiting_parsing.get(False)
+            for index in threads.keys():
+                if not threads[index].isAlive():
+                    del threads[index]
+
+
+    planet.setTimeout(config.feed_timeout())
+    # Process non-HTTP uris if we are threading, otherwise process *all* uris here.
+    unthreaded_work_queue = [uri for uri in config.subscriptions() if not config.spider_threads() or not _is_http_uri(uri)]
+    for feed in unthreaded_work_queue:
         try:
             spiderFeed(feed, only_if_new=only_if_new)
         except Exception,e:
@@ -334,3 +391,6 @@
             for line in (traceback.format_exception_only(type, value) +
                 traceback.format_tb(tb)):
                 log.error(line.rstrip())
+
+
+
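
Note on the threaded branch of spiderPlanet(): it is a plain producer/consumer split. Worker threads drain a queue of HTTP(S) feed URIs, fetch each one with httplib2 (using the cache returned by http_cache_directory(), which defaults to sources/http under the cache directory), and push (response, content, uri) tuples onto a second queue that the main thread hands to spiderFeed() for parsing. The branch is only taken when spider_threads is set to a non-zero value in the [Planet] section. The following standalone sketch shows the same pattern outside the patch; the cache path '.cache', the thread count of 4, the fetcher() helper name, and the example feed URIs are placeholder assumptions, not part of the code above.

    # Sketch of the fetch-worker pattern (Python 2, as in the codebase).
    import Queue
    from threading import Thread
    import httplib2

    work_queue = Queue.Queue()        # URIs waiting to be fetched
    awaiting_parsing = Queue.Queue()  # (response, content, uri) tuples to parse

    def fetcher():
        h = httplib2.Http('.cache')   # each worker gets its own client, shared disk cache
        while True:
            try:
                uri = work_queue.get(block=False)
            except Queue.Empty:
                return                # queue drained: the worker exits
            resp, content = h.request(uri)
            awaiting_parsing.put((resp, content, uri))

    for uri in ['http://example.com/a.xml', 'http://example.com/b.xml']:
        work_queue.put(uri)

    workers = [Thread(target=fetcher) for i in range(4)]
    for t in workers:
        t.start()
    for t in workers:
        t.join()                      # the sketch just waits; the patch polls instead

    while not awaiting_parsing.empty():
        resp, content, uri = awaiting_parsing.get()
        print uri, resp.status, len(content)   # parsing would happen here

The patch itself polls awaiting_parsing while workers are still alive so that parsing overlaps with fetching; the sketch joins the workers first only to stay short.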