From b9604d8330de3a09749682e9888d1c08132ab25b Mon Sep 17 00:00:00 2001 From: Joe Gregorio Date: Thu, 2 Nov 2006 11:59:25 -0500 Subject: [PATCH 01/39] Different approach to threading --- planet/config.py | 6 ++++ planet/feedparser.py | 23 ++++++------ planet/spider.py | 84 +++++++++++++++++++++++++++++++++++++------- 3 files changed, 89 insertions(+), 24 deletions(-) diff --git a/planet/config.py b/planet/config.py index 5c8ffe3..9526f36 100644 --- a/planet/config.py +++ b/planet/config.py @@ -100,6 +100,7 @@ def __init__(): define_planet('owner_email', '') define_planet('output_theme', '') define_planet('output_dir', 'output') + define_planet('spider_threads', 0) define_planet_list('template_files') define_planet_list('bill_of_materials') @@ -282,6 +283,11 @@ def downloadReadingList(list, orig_config, callback, use_cache=True, re_read=Tru except: logger.exception("Unable to read %s readinglist", list) +def http_cache_directory(): + if parser.has_option('Planet', 'http_cache_directory'): + parser.get('Planet', 'http_cache_directory') + else: + return os.path.join(cache_directory(), 'sources/http') def cache_sources_directory(): if parser.has_option('Planet', 'cache_sources_directory'): diff --git a/planet/feedparser.py b/planet/feedparser.py index 00675e1..7bb7c60 100755 --- a/planet/feedparser.py +++ b/planet/feedparser.py @@ -11,7 +11,7 @@ Recommended: Python 2.3 or later Recommended: CJKCodecs and iconv_codec """ -__version__ = "4.2-pre-" + "$Revision: 1.144 $"[11:16] + "-cvs" +__version__ = "4.2-pre-" + "$Revision: 1.142 $"[11:16] + "-cvs" __license__ = """Copyright (c) 2002-2006, Mark Pilgrim, All rights reserved. Redistribution and use in source and binary forms, with or without modification, @@ -218,9 +218,6 @@ class FeedParserDict(UserDict): def __getitem__(self, key): if key == 'category': return UserDict.__getitem__(self, 'tags')[0]['term'] - if key == 'enclosures': - norel = lambda link: FeedParserDict([(name,value) for (name,value) in link.items() if name!='rel']) - return [norel(link) for link in UserDict.__getitem__(self, 'links') if link['rel']=='enclosure'] if key == 'categories': return [(tag['scheme'], tag['term']) for tag in UserDict.__getitem__(self, 'tags')] realkey = self.keymap.get(key, key) @@ -1306,15 +1303,15 @@ class _FeedParserMixin: attrsD.setdefault('type', 'application/atom+xml') else: attrsD.setdefault('type', 'text/html') - context = self._getContext() attrsD = self._itsAnHrefDamnIt(attrsD) if attrsD.has_key('href'): attrsD['href'] = self.resolveURI(attrsD['href']) - if attrsD.get('rel')=='enclosure' and not context.get('id'): - context['id'] = attrsD.get('href') expectingText = self.infeed or self.inentry or self.insource + context = self._getContext() context.setdefault('links', []) context['links'].append(FeedParserDict(attrsD)) + if attrsD['rel'] == 'enclosure': + self._start_enclosure(attrsD) if attrsD.has_key('href'): expectingText = 0 if (attrsD.get('rel') == 'alternate') and (self.mapContentType(attrsD.get('type')) in self.html_types): @@ -1360,7 +1357,6 @@ class _FeedParserMixin: self._start_content(attrsD) else: self.pushContent('description', attrsD, 'text/html', self.infeed or self.inentry or self.insource) - _start_dc_description = _start_description def _start_abstract(self, attrsD): self.pushContent('description', attrsD, 'text/plain', self.infeed or self.inentry or self.insource) @@ -1372,7 +1368,6 @@ class _FeedParserMixin: value = self.popContent('description') self._summaryKey = None _end_abstract = _end_description - _end_dc_description = 
_end_description def _start_info(self, attrsD): self.pushContent('info', attrsD, 'text/plain', 1) @@ -1432,8 +1427,7 @@ class _FeedParserMixin: def _start_enclosure(self, attrsD): attrsD = self._itsAnHrefDamnIt(attrsD) context = self._getContext() - attrsD['rel']='enclosure' - context.setdefault('links', []).append(FeedParserDict(attrsD)) + context.setdefault('enclosures', []).append(FeedParserDict(attrsD)) href = attrsD.get('href') if href and not context.get('id'): context['id'] = href @@ -3254,7 +3248,7 @@ def _stripDoctype(data): return version, data, dict(replacement and safe_pattern.findall(replacement)) -def parse(url_file_stream_or_string, etag=None, modified=None, agent=None, referrer=None, handlers=[]): +def parse(url_file_stream_or_string, etag=None, modified=None, agent=None, referrer=None, handlers=[], resp_headers=None): '''Parse a feed from a URL, file, stream, or string''' result = FeedParserDict() result['feed'] = FeedParserDict() @@ -3263,6 +3257,9 @@ def parse(url_file_stream_or_string, etag=None, modified=None, agent=None, refer result['bozo'] = 0 if type(handlers) == types.InstanceType: handlers = [handlers] + if resp_headers: + f = None + data = url_file_stream_or_string try: f = _open_resource(url_file_stream_or_string, etag, modified, agent, referrer, handlers) data = f.read() @@ -3307,6 +3304,8 @@ def parse(url_file_stream_or_string, etag=None, modified=None, agent=None, refer result['status'] = f.status if hasattr(f, 'headers'): result['headers'] = f.headers.dict + if resp_headers: + result['headers'] = resp_headers if hasattr(f, 'close'): f.close() diff --git a/planet/spider.py b/planet/spider.py index 7e6d91b..ce6cbdd 100644 --- a/planet/spider.py +++ b/planet/spider.py @@ -4,7 +4,7 @@ and write each as a set of entries in a cache directory. 
""" # Standard library modules -import time, calendar, re, os +import time, calendar, re, os, urlparse from xml.dom import minidom # Planet modules import planet, config, feedparser, reconstitute, shell @@ -116,8 +116,11 @@ def scrub(feed, data): source.author_detail.has_key('name'): source.author_detail['name'] = \ str(stripHtml(source.author_detail.name)) +def _is_http_uri(uri): + parsed = urlparse.urlparse(uri) + return parsed[0] in ['http', 'https'] -def spiderFeed(feed, only_if_new=0): +def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None): """ Spider (fetch) a single feed """ log = planet.logger @@ -125,6 +128,7 @@ def spiderFeed(feed, only_if_new=0): sources = config.cache_sources_directory() if not os.path.exists(sources): os.makedirs(sources, 0700) + feed_source = filename(sources, feed) feed_info = feedparser.parse(feed_source) if feed_info.feed and only_if_new: @@ -135,14 +139,17 @@ def spiderFeed(feed, only_if_new=0): return # read feed itself - modified = None - try: - modified=time.strptime( - feed_info.feed.get('planet_http_last_modified', None)) - except: - pass - data = feedparser.parse(feed_info.feed.get('planet_http_location',feed), - etag=feed_info.feed.get('planet_http_etag',None), modified=modified) + if content: + data = feedparser.parse(content, resp_headers) + else: + modified = None + try: + modified=time.strptime( + feed_info.feed.get('planet_http_last_modified', None)) + except: + pass + data = feedparser.parse(feed_info.feed.get('planet_http_location',feed), + etag=feed_info.feed.get('planet_http_etag',None), modified=modified) # capture http status if not data.has_key("status"): @@ -319,12 +326,62 @@ def spiderFeed(feed, only_if_new=0): def spiderPlanet(only_if_new = False): """ Spider (fetch) an entire planet """ log = planet.getLogger(config.log_level(),config.log_format()) - planet.setTimeout(config.feed_timeout()) global index index = True - for feed in config.subscriptions(): + if config.spider_threads(): + import Queue + from threading import Thread + import httplib2 + + work_queue = Queue() + awaiting_parsing = Queue() + + def _spider_proc(): + h = httplib2.Http(config.http_cache_directory()) + while True: + # The non-blocking get will throw an exception when the queue + # is empty which will terminate the thread. + uri = work_queue.get(block=False): + log.info("Fetching %s", uri) + (resp, content) = h.request(uri) + awaiting_parsing.put(block=True, (resp, content, uri)) + + # Load the work_queue with all the HTTP(S) uris. 
+ map(work_queue.put, [uri for uri in config.subscriptions if _is_http_uri(uri)]) + + # Start all the worker threads + threads = dict([(i, Thread(target=_spider_proc)) for i in range(config.spider_threads())]) + for t in threads.itervalues(): + t.start() + + # Process the results as they arrive + while work_queue.qsize() and awaiting_parsing.qsize() and threads: + item = awaiting_parsing.get(False) + if not item and threads: + time.sleep(1) + while item: + try: + (resp_headers, content, uri) = item + spiderFeed(uri, only_if_new=only_if_new, content=content, resp_headers=resp_headers) + except Exception, e: + import sys, traceback + type, value, tb = sys.exc_info() + log.error('Error processing %s', feed) + for line in (traceback.format_exception_only(type, value) + + traceback.format_tb(tb)): + log.error(line.rstrip()) + item = awaiting_parsing.get(False) + for index in threads: + if not threads[index].isAlive(): + del threads[index] + + + planet.setTimeout(config.feed_timeout()) + # Process non-HTTP uris if we are threading, otherwise process *all* uris here. + unthreaded_work_queue = [uri for uri in config.subscriptions if not config.spider_threads() or not _is_http_uri(uri)] + for feed in unthreaded_work_queue: try: spiderFeed(feed, only_if_new=only_if_new) except Exception,e: @@ -334,3 +391,6 @@ def spiderPlanet(only_if_new = False): for line in (traceback.format_exception_only(type, value) + traceback.format_tb(tb)): log.error(line.rstrip()) + + + From 58bb4b6e05400491528d51d4d6df13a03d9d7649 Mon Sep 17 00:00:00 2001 From: Joe Gregorio Date: Thu, 2 Nov 2006 13:29:01 -0500 Subject: [PATCH 02/39] Seems to working now --- planet.py | 5 ++++- planet/spider.py | 43 ++++++++++++++++++++++++------------------- 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/planet.py b/planet.py index 62fb7ac..a285f6c 100755 --- a/planet.py +++ b/planet.py @@ -54,7 +54,10 @@ if __name__ == "__main__": if not offline: from planet import spider - spider.spiderPlanet(only_if_new=only_if_new) + try: + spider.spiderPlanet(only_if_new=only_if_new) + except Exception, e: + print e from planet import splice doc = splice.splice() diff --git a/planet/spider.py b/planet/spider.py index ce6cbdd..3e98365 100644 --- a/planet/spider.py +++ b/planet/spider.py @@ -330,40 +330,45 @@ def spiderPlanet(only_if_new = False): global index index = True - if config.spider_threads(): - import Queue + if int(config.spider_threads()): + from Queue import Queue, Empty from threading import Thread import httplib2 work_queue = Queue() awaiting_parsing = Queue() - def _spider_proc(): + def _spider_proc(thread_index): h = httplib2.Http(config.http_cache_directory()) - while True: - # The non-blocking get will throw an exception when the queue - # is empty which will terminate the thread. - uri = work_queue.get(block=False): - log.info("Fetching %s", uri) - (resp, content) = h.request(uri) - awaiting_parsing.put(block=True, (resp, content, uri)) + try: + while True: + # The non-blocking get will throw an exception when the queue + # is empty which will terminate the thread. + uri = work_queue.get(block=False) + log.info("Fetching %s via %d", uri, thread_index) + (resp, content) = h.request(uri) + awaiting_parsing.put(block=True, item=(resp, content, uri)) + except Empty, e: + log.info("Thread %d finished", thread_index) + pass # Load the work_queue with all the HTTP(S) uris. 
- map(work_queue.put, [uri for uri in config.subscriptions if _is_http_uri(uri)]) + map(work_queue.put, [uri for uri in config.subscriptions() if _is_http_uri(uri)]) # Start all the worker threads - threads = dict([(i, Thread(target=_spider_proc)) for i in range(config.spider_threads())]) + threads = dict([(i, Thread(target=_spider_proc, args=(i,))) for i in range(int(config.spider_threads()))]) for t in threads.itervalues(): t.start() # Process the results as they arrive - while work_queue.qsize() and awaiting_parsing.qsize() and threads: - item = awaiting_parsing.get(False) - if not item and threads: + while work_queue.qsize() or awaiting_parsing.qsize() or threads: + if awaiting_parsing.qsize() == 0 and threads: time.sleep(1) - while item: + while awaiting_parsing.qsize(): + item = awaiting_parsing.get(False) try: (resp_headers, content, uri) = item + log.info("Parsing pre-fetched %s", uri) spiderFeed(uri, only_if_new=only_if_new, content=content, resp_headers=resp_headers) except Exception, e: import sys, traceback @@ -372,15 +377,15 @@ def spiderPlanet(only_if_new = False): for line in (traceback.format_exception_only(type, value) + traceback.format_tb(tb)): log.error(line.rstrip()) - item = awaiting_parsing.get(False) - for index in threads: + for index in threads.keys(): if not threads[index].isAlive(): del threads[index] + log.info("Finished threaded part of processing.") planet.setTimeout(config.feed_timeout()) # Process non-HTTP uris if we are threading, otherwise process *all* uris here. - unthreaded_work_queue = [uri for uri in config.subscriptions if not config.spider_threads() or not _is_http_uri(uri)] + unthreaded_work_queue = [uri for uri in config.subscriptions() if not int(config.spider_threads()) or not _is_http_uri(uri)] for feed in unthreaded_work_queue: try: spiderFeed(feed, only_if_new=only_if_new) From 217e850e41147d3c9274c0bf91b78fcf27ec8971 Mon Sep 17 00:00:00 2001 From: Joe Gregorio Date: Thu, 2 Nov 2006 14:48:47 -0500 Subject: [PATCH 03/39] Still having problems with channel_name. --- planet/feedparser.py | 16 +++++++++++----- planet/spider.py | 8 ++++++-- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/planet/feedparser.py b/planet/feedparser.py index 7bb7c60..d24d82a 100755 --- a/planet/feedparser.py +++ b/planet/feedparser.py @@ -11,7 +11,7 @@ Recommended: Python 2.3 or later Recommended: CJKCodecs and iconv_codec """ -__version__ = "4.2-pre-" + "$Revision: 1.142 $"[11:16] + "-cvs" +__version__ = "4.2-pre-" + "$Revision: 1.145 $"[11:16] + "-cvs" __license__ = """Copyright (c) 2002-2006, Mark Pilgrim, All rights reserved. 
Redistribution and use in source and binary forms, with or without modification, @@ -218,6 +218,9 @@ class FeedParserDict(UserDict): def __getitem__(self, key): if key == 'category': return UserDict.__getitem__(self, 'tags')[0]['term'] + if key == 'enclosures': + norel = lambda link: FeedParserDict([(name,value) for (name,value) in link.items() if name!='rel']) + return [norel(link) for link in UserDict.__getitem__(self, 'links') if link['rel']=='enclosure'] if key == 'categories': return [(tag['scheme'], tag['term']) for tag in UserDict.__getitem__(self, 'tags')] realkey = self.keymap.get(key, key) @@ -1303,15 +1306,15 @@ class _FeedParserMixin: attrsD.setdefault('type', 'application/atom+xml') else: attrsD.setdefault('type', 'text/html') + context = self._getContext() attrsD = self._itsAnHrefDamnIt(attrsD) if attrsD.has_key('href'): attrsD['href'] = self.resolveURI(attrsD['href']) + if attrsD.get('rel')=='enclosure' and not context.get('id'): + context['id'] = attrsD.get('href') expectingText = self.infeed or self.inentry or self.insource - context = self._getContext() context.setdefault('links', []) context['links'].append(FeedParserDict(attrsD)) - if attrsD['rel'] == 'enclosure': - self._start_enclosure(attrsD) if attrsD.has_key('href'): expectingText = 0 if (attrsD.get('rel') == 'alternate') and (self.mapContentType(attrsD.get('type')) in self.html_types): @@ -1357,6 +1360,7 @@ class _FeedParserMixin: self._start_content(attrsD) else: self.pushContent('description', attrsD, 'text/html', self.infeed or self.inentry or self.insource) + _start_dc_description = _start_description def _start_abstract(self, attrsD): self.pushContent('description', attrsD, 'text/plain', self.infeed or self.inentry or self.insource) @@ -1368,6 +1372,7 @@ class _FeedParserMixin: value = self.popContent('description') self._summaryKey = None _end_abstract = _end_description + _end_dc_description = _end_description def _start_info(self, attrsD): self.pushContent('info', attrsD, 'text/plain', 1) @@ -1427,7 +1432,8 @@ class _FeedParserMixin: def _start_enclosure(self, attrsD): attrsD = self._itsAnHrefDamnIt(attrsD) context = self._getContext() - context.setdefault('enclosures', []).append(FeedParserDict(attrsD)) + attrsD['rel']='enclosure' + context.setdefault('links', []).append(FeedParserDict(attrsD)) href = attrsD.get('href') if href and not context.get('id'): context['id'] = href diff --git a/planet/spider.py b/planet/spider.py index 3e98365..2ffad1e 100644 --- a/planet/spider.py +++ b/planet/spider.py @@ -140,7 +140,7 @@ def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None): # read feed itself if content: - data = feedparser.parse(content, resp_headers) + data = feedparser.parse(content, resp_headers=resp_headers) else: modified = None try: @@ -338,8 +338,12 @@ def spiderPlanet(only_if_new = False): work_queue = Queue() awaiting_parsing = Queue() + http_cache = config.http_cache_directory() + if not os.path.exists(http_cache): + os.makedirs(http_cache, 0700) + def _spider_proc(thread_index): - h = httplib2.Http(config.http_cache_directory()) + h = httplib2.Http(http_cache) try: while True: # The non-blocking get will throw an exception when the queue From b2ccc8c1ff6be8e6d2030c67463b3b9e773348e0 Mon Sep 17 00:00:00 2001 From: Joe Gregorio Date: Fri, 3 Nov 2006 11:40:16 -0500 Subject: [PATCH 04/39] added 304 checking before calling spiderFeed() --- planet/spider.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/planet/spider.py b/planet/spider.py index 2ffad1e..3bf0d7b 
100644 --- a/planet/spider.py +++ b/planet/spider.py @@ -372,8 +372,9 @@ def spiderPlanet(only_if_new = False): item = awaiting_parsing.get(False) try: (resp_headers, content, uri) = item - log.info("Parsing pre-fetched %s", uri) - spiderFeed(uri, only_if_new=only_if_new, content=content, resp_headers=resp_headers) + if not resp_headers.fromcache: + log.info("Parsing pre-fetched %s", uri) + spiderFeed(uri, only_if_new=only_if_new, content=content, resp_headers=resp_headers) except Exception, e: import sys, traceback type, value, tb = sys.exc_info() From 72318e770b929c3308f48303e5edd7bacfe52968 Mon Sep 17 00:00:00 2001 From: Joe Gregorio Date: Fri, 3 Nov 2006 11:45:34 -0500 Subject: [PATCH 05/39] Documented spider_threads --- docs/config.html | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/config.html b/docs/config.html index b20d28c..b201b26 100644 --- a/docs/config.html +++ b/docs/config.html @@ -98,6 +98,9 @@ use for logging output. Note: this configuration value is processed
Number of seconds to wait for any given feed
new_feed_items
Number of items to take from new feeds
+spider_threads
+The number of threads to use when spidering. When set to 0, the default,
+no threads are used and spidering follows the traditional algorithm.
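
With spider_threads set to a value greater than zero (for example, "spider_threads = 2" under [Planet] in the ini file), the patches above split the work into a threaded fetch phase and a single-threaded parse phase. A minimal sketch of that queue discipline follows, assuming Python 2 and only the standard library; urllib2 stands in for httplib2 here, and names such as fetch_worker and spider_with_threads are illustrative, not Planet's own:

    from Queue import Queue, Empty
    from threading import Thread
    import urllib2

    def spider_with_threads(uris, thread_count):
        work_queue = Queue()   # URIs still to be fetched
        fetched = Queue()      # (uri, headers, body) tuples awaiting parsing
        map(work_queue.put, uris)

        def fetch_worker():
            try:
                while True:
                    # get(block=False) raises Empty once the queue is drained,
                    # which is what terminates the worker thread.
                    uri = work_queue.get(block=False)
                    f = urllib2.urlopen(uri)
                    fetched.put((uri, f.info(), f.read()))
            except Empty:
                pass

        threads = [Thread(target=fetch_worker) for i in range(thread_count)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # Parsing stays in the main thread: drain the results queue here.
        results = []
        while not fetched.empty():
            results.append(fetched.get())
        return results

The real spider interleaves parsing with fetching instead of joining first, and adds per-thread error handling, but the work-queue/results-queue shape is the same.
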
From 796da216b2eeca13bf16b51e5e7951965a845cce Mon Sep 17 00:00:00 2001 From: Joe Gregorio Date: Fri, 3 Nov 2006 12:00:27 -0500 Subject: [PATCH 06/39] Added httplib2 --- httplib2/__init__.py | 820 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 820 insertions(+) create mode 100644 httplib2/__init__.py diff --git a/httplib2/__init__.py b/httplib2/__init__.py new file mode 100644 index 0000000..83421b4 --- /dev/null +++ b/httplib2/__init__.py @@ -0,0 +1,820 @@ +""" +httplib2 + +A caching http interface that supports ETags and gzip +to conserve bandwidth. + +Requires Python 2.3 or later + +""" + +__author__ = "Joe Gregorio (joe@bitworking.org)" +__copyright__ = "Copyright 2006, Joe Gregorio" +__contributors__ = ["Thomas Broyer (t.broyer@ltgt.net)", + "James Antill", + "Xavier Verges Farrero", + "Jonathan Feinberg", + "Blair Zajac"] +__license__ = "MIT" +__version__ = "$Rev: 204 $" + +import re +import md5 +import rfc822 +import StringIO +import gzip +import zlib +import httplib +import urlparse +import base64 +import os +import copy +import calendar +import time +import random +import sha +import hmac +from gettext import gettext as _ + +__all__ = ['Http', 'Response', 'HttpLib2Error', + 'RedirectMissingLocation', 'RedirectLimit', 'FailedToDecompressContent', + 'UnimplementedDigestAuthOptionError', 'UnimplementedHmacDigestAuthOptionError', + 'debuglevel'] + + +# The httplib debug level, set to a non-zero value to get debug output +debuglevel = 0 + +# Python 2.3 support +if 'sorted' not in __builtins__: + def sorted(seq): + seq.sort() + return seq + +# Python 2.3 support +def HTTPResponse__getheaders(self): + """Return list of (header, value) tuples.""" + if self.msg is None: + print "================================" + raise httplib.ResponseNotReady() + return self.msg.items() + +if not hasattr(httplib.HTTPResponse, 'getheaders'): + httplib.HTTPResponse.getheaders = HTTPResponse__getheaders + +# All exceptions raised here derive from HttpLib2Error +class HttpLib2Error(Exception): pass + +class RedirectMissingLocation(HttpLib2Error): pass +class RedirectLimit(HttpLib2Error): pass +class FailedToDecompressContent(HttpLib2Error): pass +class UnimplementedDigestAuthOptionError(HttpLib2Error): pass +class UnimplementedHmacDigestAuthOptionError(HttpLib2Error): pass + +# Open Items: +# ----------- +# Proxy support + +# Are we removing the cached content too soon on PUT (only delete on 200 Maybe?) + +# Pluggable cache storage (supports storing the cache in +# flat files by default. We need a plug-in architecture +# that can support Berkeley DB and Squid) + +# == Known Issues == +# Does not handle a resource that uses conneg and Last-Modified but no ETag as a cache validator. +# Does not handle Cache-Control: max-stale +# Does not use Age: headers when calculating cache freshness. + + +# The number of redirections to follow before giving up. +# Note that only GET redirects are automatically followed. +# Will also honor 301 requests by saving that info and never +# requesting that URI again. 
+DEFAULT_MAX_REDIRECTS = 5 + +# Which headers are hop-by-hop headers by default +HOP_BY_HOP = ['connection', 'keep-alive', 'proxy-authenticate', 'proxy-authorization', 'te', 'trailers', 'transfer-encoding', 'upgrade'] + +def _get_end2end_headers(response): + hopbyhop = list(HOP_BY_HOP) + hopbyhop.extend([x.strip() for x in response.get('connection', '').split(',')]) + return [header for header in response.keys() if header not in hopbyhop] + +URI = re.compile(r"^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?") + +def parse_uri(uri): + """Parses a URI using the regex given in Appendix B of RFC 3986. + + (scheme, authority, path, query, fragment) = parse_uri(uri) + """ + groups = URI.match(uri).groups() + return (groups[1], groups[3], groups[4], groups[6], groups[8]) + +NORMALIZE_SPACE = re.compile(r'(?:\r\n)?[ \t]+') +def _normalize_headers(headers): + return dict([ (key.lower(), NORMALIZE_SPACE.sub(value, ' ').strip()) for (key, value) in headers.iteritems()]) + +def _parse_cache_control(headers): + retval = {} + if headers.has_key('cache-control'): + parts = headers['cache-control'].split(',') + parts_with_args = [tuple([x.strip() for x in part.split("=")]) for part in parts if -1 != part.find("=")] + parts_wo_args = [(name.strip(), 1) for name in parts if -1 == name.find("=")] + retval = dict(parts_with_args + parts_wo_args) + return retval + +# Whether to use a strict mode to parse WWW-Authenticate headers +# Might lead to bad results in case of ill-formed header value, +# so disabled by default, falling back to relaxed parsing. +# Set to true to turn on, usefull for testing servers. +USE_WWW_AUTH_STRICT_PARSING = 0 + +# In regex below: +# [^\0-\x1f\x7f-\xff()<>@,;:\\\"/[\]?={} \t]+ matches a "token" as defined by HTTP +# "(?:[^\0-\x08\x0A-\x1f\x7f-\xff\\\"]|\\[\0-\x7f])*?" matches a "quoted-string" as defined by HTTP, when LWS have already been replaced by a single space +# Actually, as an auth-param value can be either a token or a quoted-string, they are combined in a single pattern which matches both: +# \"?((?<=\")(?:[^\0-\x1f\x7f-\xff\\\"]|\\[\0-\x7f])*?(?=\")|(?@,;:\\\"/[\]?={} \t]+(?!\"))\"? +WWW_AUTH_STRICT = re.compile(r"^(?:\s*(?:,\s*)?([^\0-\x1f\x7f-\xff()<>@,;:\\\"/[\]?={} \t]+)\s*=\s*\"?((?<=\")(?:[^\0-\x08\x0A-\x1f\x7f-\xff\\\"]|\\[\0-\x7f])*?(?=\")|(?@,;:\\\"/[\]?={} \t]+(?!\"))\"?)(.*)$") +WWW_AUTH_RELAXED = re.compile(r"^(?:\s*(?:,\s*)?([^ \t\r\n=]+)\s*=\s*\"?((?<=\")(?:[^\\\"]|\\.)*?(?=\")|(? 
current_age: + retval = "FRESH" + return retval + +def _decompressContent(response, new_content): + content = new_content + try: + if response.get('content-encoding', None) == 'gzip': + content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read() + if response.get('content-encoding', None) == 'deflate': + content = zlib.decompress(content) + except: + content = "" + raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding')) + return content + +def _updateCache(request_headers, response_headers, content, cache, cachekey): + if cachekey: + cc = _parse_cache_control(request_headers) + cc_response = _parse_cache_control(response_headers) + if cc.has_key('no-store') or cc_response.has_key('no-store'): + cache.delete(cachekey) + else: + f = StringIO.StringIO("") + info = rfc822.Message(StringIO.StringIO("")) + for key, value in response_headers.iteritems(): + info[key] = value + f.write(str(info)) + f.write("\r\n\r\n") + f.write(content) + cache.set(cachekey, f.getvalue()) + +def _cnonce(): + dig = md5.new("%s:%s" % (time.ctime(), ["0123456789"[random.randrange(0, 9)] for i in range(20)])).hexdigest() + return dig[:16] + +def _wsse_username_token(cnonce, iso_now, password): + return base64.encodestring(sha.new("%s%s%s" % (cnonce, iso_now, password)).digest()).strip() + + +# For credentials we need two things, first +# a pool of credential to try (not necesarily tied to BAsic, Digest, etc.) +# Then we also need a list of URIs that have already demanded authentication +# That list is tricky since sub-URIs can take the same auth, or the +# auth scheme may change as you descend the tree. +# So we also need each Auth instance to be able to tell us +# how close to the 'top' it is. + +class Authentication: + def __init__(self, credentials, host, request_uri, headers, response, content, http): + (scheme, authority, path, query, fragment) = parse_uri(request_uri) + self.path = path + self.host = host + self.credentials = credentials + self.http = http + + def depth(self, request_uri): + (scheme, authority, path, query, fragment) = parse_uri(request_uri) + return request_uri[len(self.path):].count("/") + + def inscope(self, host, request_uri): + # XXX Should we normalize the request_uri? + (scheme, authority, path, query, fragment) = parse_uri(request_uri) + return (host == self.host) and path.startswith(self.path) + + def request(self, method, request_uri, headers, content): + """Modify the request headers to add the appropriate + Authorization header. Over-rise this in sub-classes.""" + pass + + def response(self, response, content): + """Gives us a chance to update with new nonces + or such returned from the last authorized response. + Over-rise this in sub-classes if necessary. + + Return TRUE is the request is to be retried, for + example Digest may return stale=true. 
+ """ + return False + + + +class BasicAuthentication(Authentication): + def __init__(self, credentials, host, request_uri, headers, response, content, http): + Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http) + + def request(self, method, request_uri, headers, content): + """Modify the request headers to add the appropriate + Authorization header.""" + headers['authorization'] = 'Basic ' + base64.encodestring("%s:%s" % self.credentials).strip() + + +class DigestAuthentication(Authentication): + """Only do qop='auth' and MD5, since that + is all Apache currently implements""" + def __init__(self, credentials, host, request_uri, headers, response, content, http): + Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http) + challenge = _parse_www_authenticate(response, 'www-authenticate') + self.challenge = challenge['digest'] + qop = self.challenge.get('qop') + self.challenge['qop'] = ('auth' in [x.strip() for x in qop.split()]) and 'auth' or None + if self.challenge['qop'] is None: + raise UnimplementedDigestAuthOptionError( _("Unsupported value for qop: %s." % qop)) + self.challenge['algorithm'] = self.challenge.get('algorithm', 'MD5') + if self.challenge['algorithm'] != 'MD5': + raise UnimplementedDigestAuthOptionError( _("Unsupported value for algorithm: %s." % self.challenge['algorithm'])) + self.A1 = "".join([self.credentials[0], ":", self.challenge['realm'], ":", self.credentials[1]]) + self.challenge['nc'] = 1 + + def request(self, method, request_uri, headers, content, cnonce = None): + """Modify the request headers""" + H = lambda x: md5.new(x).hexdigest() + KD = lambda s, d: H("%s:%s" % (s, d)) + A2 = "".join([method, ":", request_uri]) + self.challenge['cnonce'] = cnonce or _cnonce() + request_digest = '"%s"' % KD(H(self.A1), "%s:%s:%s:%s:%s" % (self.challenge['nonce'], + '%08x' % self.challenge['nc'], + self.challenge['cnonce'], + self.challenge['qop'], H(A2) + )) + headers['Authorization'] = 'Digest username="%s", realm="%s", nonce="%s", uri="%s", algorithm=%s, response=%s, qop=%s, nc=%08x, cnonce="%s"' % ( + self.credentials[0], + self.challenge['realm'], + self.challenge['nonce'], + request_uri, + self.challenge['algorithm'], + request_digest, + self.challenge['qop'], + self.challenge['nc'], + self.challenge['cnonce'], + ) + self.challenge['nc'] += 1 + + def response(self, response, content): + if not response.has_key('authentication-info'): + challenge = _parse_www_authenticate(response, 'www-authenticate')['digest'] + if 'true' == challenge.get('stale'): + self.challenge['nonce'] = challenge['nonce'] + self.challenge['nc'] = 1 + return True + else: + updated_challenge = _parse_www_authenticate(response, 'authentication-info')['digest'] + + if updated_challenge.has_key('nextnonce'): + self.challenge['nonce'] = updated_challenge['nextnonce'] + self.challenge['nc'] = 1 + return False + + +class HmacDigestAuthentication(Authentication): + """Adapted from Robert Sayre's code and DigestAuthentication above.""" + __author__ = "Thomas Broyer (t.broyer@ltgt.net)" + + def __init__(self, credentials, host, request_uri, headers, response, content, http): + Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http) + challenge = _parse_www_authenticate(response, 'www-authenticate') + self.challenge = challenge['hmacdigest'] + print self.challenge + # TODO: self.challenge['domain'] + self.challenge['reason'] = self.challenge.get('reason', 'unauthorized') + if 
self.challenge['reason'] not in ['unauthorized', 'integrity']: + self.challenge['reason'] = 'unauthorized' + self.challenge['salt'] = self.challenge.get('salt', '') + if not self.challenge.get('snonce'): + raise UnimplementedHmacDigestAuthOptionError( _("The challenge doesn't contain a server nonce, or this one is empty.")) + self.challenge['algorithm'] = self.challenge.get('algorithm', 'HMAC-SHA-1') + if self.challenge['algorithm'] not in ['HMAC-SHA-1', 'HMAC-MD5']: + raise UnimplementedHmacDigestAuthOptionError( _("Unsupported value for algorithm: %s." % self.challenge['algorithm'])) + self.challenge['pw-algorithm'] = self.challenge.get('pw-algorithm', 'SHA-1') + if self.challenge['pw-algorithm'] not in ['SHA-1', 'MD5']: + raise UnimplementedHmacDigestAuthOptionError( _("Unsupported value for pw-algorithm: %s." % self.challenge['pw-algorithm'])) + if self.challenge['algorithm'] == 'HMAC-MD5': + self.hashmod = md5 + else: + self.hashmod = sha + if self.challenge['pw-algorithm'] == 'MD5': + self.pwhashmod = md5 + else: + self.pwhashmod = sha + self.key = "".join([self.credentials[0], ":", + self.pwhashmod.new("".join([self.credentials[1], self.challenge['salt']])).hexdigest().lower(), + ":", self.challenge['realm'] + ]) + print response['www-authenticate'] + print "".join([self.credentials[1], self.challenge['salt']]) + print "key_str = %s" % self.key + self.key = self.pwhashmod.new(self.key).hexdigest().lower() + + def request(self, method, request_uri, headers, content): + """Modify the request headers""" + keys = _get_end2end_headers(headers) + keylist = "".join(["%s " % k for k in keys]) + headers_val = "".join([headers[k] for k in keys]) + created = time.strftime('%Y-%m-%dT%H:%M:%SZ',time.gmtime()) + cnonce = _cnonce() + request_digest = "%s:%s:%s:%s:%s" % (method, request_uri, cnonce, self.challenge['snonce'], headers_val) + print "key = %s" % self.key + print "msg = %s" % request_digest + request_digest = hmac.new(self.key, request_digest, self.hashmod).hexdigest().lower() + headers['Authorization'] = 'HMACDigest username="%s", realm="%s", snonce="%s", cnonce="%s", uri="%s", created="%s", response="%s", headers="%s"' % ( + self.credentials[0], + self.challenge['realm'], + self.challenge['snonce'], + cnonce, + request_uri, + created, + request_digest, + keylist, + ) + + def response(self, response, content): + challenge = _parse_www_authenticate(response, 'www-authenticate').get('hmacdigest', {}) + if challenge.get('reason') in ['integrity', 'stale']: + return True + return False + + +class WsseAuthentication(Authentication): + """This is thinly tested and should not be relied upon. + At this time there isn't any third party server to test against. 
+ Blogger and TypePad implemented this algorithm at one point + but Blogger has since switched to Basic over HTTPS and + TypePad has implemented it wrong, by never issuing a 401 + challenge but instead requiring your client to telepathically know that + their endpoint is expecting WSSE profile="UsernameToken".""" + def __init__(self, credentials, host, request_uri, headers, response, content, http): + Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http) + + def request(self, method, request_uri, headers, content): + """Modify the request headers to add the appropriate + Authorization header.""" + headers['Authorization'] = 'WSSE profile="UsernameToken"' + iso_now = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) + cnonce = _cnonce() + password_digest = _wsse_username_token(cnonce, iso_now, self.credentials[1]) + headers['X-WSSE'] = 'UsernameToken Username="%s", PasswordDigest="%s", Nonce="%s", Created="%s"' % ( + self.credentials[0], + password_digest, + cnonce, + iso_now) + +class GoogleLoginAuthentication(Authentication): + def __init__(self, credentials, host, request_uri, headers, response, content, http): + from urllib import urlencode + Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http) + + auth = dict(Email=credentials[0], Passwd=credentials[1], service='cl', source=headers['user-agent']) + resp, content = self.http.request("https://www.google.com/accounts/ClientLogin", method="POST", body=urlencode(auth), headers={'Content-Type': 'application/x-www-form-urlencoded'}) + lines = content.split('\n') + d = dict([tuple(line.split("=", 1)) for line in lines if line]) + if resp.status == 403: + self.Auth = "" + else: + self.Auth = d['Auth'] + + def request(self, method, request_uri, headers, content): + """Modify the request headers to add the appropriate + Authorization header.""" + headers['authorization'] = 'GoogleLogin Auth=' + self.Auth + + +AUTH_SCHEME_CLASSES = { + "basic": BasicAuthentication, + "wsse": WsseAuthentication, + "digest": DigestAuthentication, + "hmacdigest": HmacDigestAuthentication, + "googlelogin": GoogleLoginAuthentication +} + +AUTH_SCHEME_ORDER = ["hmacdigest", "googlelogin", "digest", "wsse", "basic"] + + +class FileCache: + """Uses a local directory as a store for cached files. + Not really safe to use if multiple threads or processes are going to + be running on the same cache. + """ + def __init__(self, cache): + self.cache = cache + if not os.path.exists(cache): + os.makedirs(self.cache) + + def get(self, key): + retval = None + cacheFullPath = os.path.join(self.cache, key) + try: + f = file(cacheFullPath, "r") + retval = f.read() + f.close() + except: + pass + return retval + + def set(self, key, value): + cacheFullPath = os.path.join(self.cache, key) + f = file(cacheFullPath, "w") + f.write(value) + f.close() + + def delete(self, key): + cacheFullPath = os.path.join(self.cache, key) + if os.path.exists(cacheFullPath): + os.remove(cacheFullPath) + +class Http: + """An HTTP client that handles all + methods, caching, ETags, compression, + HTTPS, Basic, Digest, WSSE, etc. + """ + def __init__(self, cache=None): + # Map domain name to an httplib connection + self.connections = {} + # The location of the cache, for now a directory + # where cached responses are held. 
+ if cache and isinstance(cache, str): + self.cache = FileCache(cache) + else: + self.cache = cache + + # tuples of name, password + self.credentials = [] + + # authorization objects + self.authorizations = [] + + self.follow_all_redirects = False + + self.ignore_etag = False + + def _auth_from_challenge(self, host, request_uri, headers, response, content): + """A generator that creates Authorization objects + that can be applied to requests. + """ + challenges = _parse_www_authenticate(response, 'www-authenticate') + for cred in self.credentials: + for scheme in AUTH_SCHEME_ORDER: + if challenges.has_key(scheme): + yield AUTH_SCHEME_CLASSES[scheme](cred, host, request_uri, headers, response, content, self) + + def add_credentials(self, name, password): + """Add a name and password that will be used + any time a request requires authentication.""" + self.credentials.append((name, password)) + + def clear_credentials(self): + """Remove all the names and passwords + that are used for authentication""" + self.credentials = [] + self.authorizations = [] + + def _conn_request(self, conn, request_uri, method, body, headers): + for i in range(2): + try: + conn.request(method, request_uri, body, headers) + response = conn.getresponse() + except: + if i == 0: + conn.close() + conn.connect() + continue + else: + raise + else: + content = response.read() + response = Response(response) + content = _decompressContent(response, content) + + break; + return (response, content) + + + def _request(self, conn, host, absolute_uri, request_uri, method, body, headers, redirections, cachekey): + """Do the actual request using the connection object + and also follow one level of redirects if necessary""" + + auths = [(auth.depth(request_uri), auth) for auth in self.authorizations if auth.inscope(host, request_uri)] + auth = auths and sorted(auths)[0][1] or None + if auth: + auth.request(method, request_uri, headers, body) + + (response, content) = self._conn_request(conn, request_uri, method, body, headers) + + if auth: + if auth.response(response, body): + auth.request(method, request_uri, headers, body) + (response, content) = self._conn_request(conn, request_uri, method, body, headers ) + response._stale_digest = 1 + + if response.status == 401: + for authorization in self._auth_from_challenge(host, request_uri, headers, response, content): + authorization.request(method, request_uri, headers, body) + (response, content) = self._conn_request(conn, request_uri, method, body, headers, ) + if response.status != 401: + self.authorizations.append(authorization) + authorization.response(response, body) + break + + if (self.follow_all_redirects or method in ["GET", "HEAD"]) or response.status == 303: + if response.status in [300, 301, 302, 303, 307]: + # Pick out the location header and basically start from the beginning + # remembering first to strip the ETag header and decrement our 'depth' + if redirections: + if not response.has_key('location') and response.status != 300: + raise RedirectMissingLocation( _("Redirected but the response is missing a Location: header.")) + if response.status == 301 and method in ["GET", "HEAD"]: + response['-x-permanent-redirect-url'] = response['location'] + _updateCache(headers, response, content, self.cache, cachekey) + if headers.has_key('if-none-match'): + del headers['if-none-match'] + if headers.has_key('if-modified-since'): + del headers['if-modified-since'] + if response.has_key('location'): + old_response = copy.deepcopy(response) + location = response['location'] + 
(scheme, authority, path, query, fragment) = parse_uri(location) + if authority == None: + location = urlparse.urljoin(absolute_uri, location) + redirect_method = ((response.status == 303) and (method not in ["GET", "HEAD"])) and "GET" or method + (response, content) = self.request(location, redirect_method, body=body, headers = headers, redirections = redirections - 1) + response.previous = old_response + else: + raise RedirectLimit( _("Redirected more times than rediection_limit allows.")) + elif response.status in [200, 203] and method == "GET": + # Don't cache 206's since we aren't going to handle byte range requests + _updateCache(headers, response, content, self.cache, cachekey) + + return (response, content) + + def request(self, uri, method="GET", body=None, headers=None, redirections=DEFAULT_MAX_REDIRECTS): + """ Performs a single HTTP request. +The 'uri' is the URI of the HTTP resource and can begin +with either 'http' or 'https'. The value of 'uri' must be an absolute URI. + +The 'method' is the HTTP method to perform, such as GET, POST, DELETE, etc. +There is no restriction on the methods allowed. + +The 'body' is the entity body to be sent with the request. It is a string +object. + +Any extra headers that are to be sent with the request should be provided in the +'headers' dictionary. + +The maximum number of redirect to follow before raising an +exception is 'redirections. The default is 5. + +The return value is a tuple of (response, content), the first +being and instance of the 'Response' class, the second being +a string that contains the response entity body. + """ + if headers is None: + headers = {} + else: + headers = _normalize_headers(headers) + + if not headers.has_key('user-agent'): + headers['user-agent'] = "Python-httplib2/%s" % __version__ + + (scheme, authority, path, query, fragment) = parse_uri(uri) + authority = authority.lower() + if not path: + path = "/" + # Could do syntax based normalization of the URI before + # computing the digest. See Section 6.2.2 of Std 66. + request_uri = query and "?".join([path, query]) or path + defrag_uri = scheme + "://" + authority + request_uri + + if not self.connections.has_key(scheme+":"+authority): + connection_type = (scheme == 'https') and httplib.HTTPSConnection or httplib.HTTPConnection + conn = self.connections[scheme+":"+authority] = connection_type(authority) + conn.set_debuglevel(debuglevel) + else: + conn = self.connections[scheme+":"+authority] + + if method in ["GET", "HEAD"] and 'range' not in headers: + headers['accept-encoding'] = 'compress, gzip' + + info = rfc822.Message(StringIO.StringIO("")) + cached_value = None + if self.cache: + cachekey = md5.new(defrag_uri).hexdigest() + cached_value = self.cache.get(cachekey) + if cached_value: + #try: + f = StringIO.StringIO(cached_value) + info = rfc822.Message(f) + content = cached_value.split('\r\n\r\n', 1)[1] + #except: + # self.cache.delete(cachekey) + # cachekey = None + else: + cachekey = None + + if method in ["PUT"] and self.cache and info.has_key('etag') and not self.ignore_etag: + # http://www.w3.org/1999/04/Editing/ + headers['if-match'] = info['etag'] + + if method not in ["GET", "HEAD"] and self.cache and cachekey: + # RFC 2616 Section 13.10 + self.cache.delete(cachekey) + + if method in ["GET", "HEAD"] and self.cache and 'range' not in headers: + if info.has_key('-x-permanent-redirect-url'): + # Should cached permanent redirects be counted in our redirection count? For now, yes. 
+ (response, new_content) = self.request(info['-x-permanent-redirect-url'], "GET", headers = headers, redirections = redirections - 1) + response.previous = Response(info) + response.previous.fromcache = True + else: + # Determine our course of action: + # Is the cached entry fresh or stale? + # Has the client requested a non-cached response? + # + # There seems to be three possible answers: + # 1. [FRESH] Return the cache entry w/o doing a GET + # 2. [STALE] Do the GET (but add in cache validators if available) + # 3. [TRANSPARENT] Do a GET w/o any cache validators (Cache-Control: no-cache) on the request + entry_disposition = _entry_disposition(info, headers) + + if entry_disposition == "FRESH": + if not cached_value: + info['status'] = '504' + content = "" + response = Response(info) + if cached_value: + response.fromcache = True + return (response, content) + + if entry_disposition == "STALE": + if info.has_key('etag') and not self.ignore_etag: + headers['if-none-match'] = info['etag'] + if info.has_key('last-modified'): + headers['if-modified-since'] = info['last-modified'] + elif entry_disposition == "TRANSPARENT": + pass + + (response, new_content) = self._request(conn, authority, uri, request_uri, method, body, headers, redirections, cachekey) + + if response.status == 304 and method == "GET": + # Rewrite the cache entry with the new end-to-end headers + # Take all headers that are in response + # and overwrite their values in info. + # unless they are hop-by-hop, or are listed in the connection header. + + for key in _get_end2end_headers(response): + info[key] = response[key] + merged_response = Response(info) + if hasattr(response, "_stale_digest"): + merged_response._stale_digest = response._stale_digest + _updateCache(headers, merged_response, content, self.cache, cachekey) + response = merged_response + response.status = 200 + response.fromcache = True + + elif response.status == 200: + content = new_content + else: + self.cache.delete(cachekey) + content = new_content + else: + (response, content) = self._request(conn, authority, uri, request_uri, method, body, headers, redirections, cachekey) + return (response, content) + + + +class Response(dict): + """An object more like rfc822.Message than httplib.HTTPResponse.""" + + """Is this response from our local cache""" + fromcache = False + + """HTTP protocol version used by server. 10 for HTTP/1.0, 11 for HTTP/1.1. """ + version = 11 + + "Status code returned by server. " + status = 200 + + """Reason phrase returned by server.""" + reason = "Ok" + + previous = None + + def __init__(self, info): + # info is either an rfc822.Message or + # an httplib.HTTPResponse object. 
+ if isinstance(info, httplib.HTTPResponse): + for key, value in info.getheaders(): + self[key] = value + self.status = info.status + self['status'] = str(self.status) + self.reason = info.reason + self.version = info.version + elif isinstance(info, rfc822.Message): + for key, value in info.items(): + self[key] = value + self.status = int(self['status']) + + From 4569dba5e2184d17200b73b4837c395a4ff7708b Mon Sep 17 00:00:00 2001 From: Joe Gregorio Date: Sat, 4 Nov 2006 11:31:52 -0500 Subject: [PATCH 07/39] Moved httplib2 directory --- {httplib2 => planet/httplib2}/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename {httplib2 => planet/httplib2}/__init__.py (100%) diff --git a/httplib2/__init__.py b/planet/httplib2/__init__.py similarity index 100% rename from httplib2/__init__.py rename to planet/httplib2/__init__.py From 681eb117f8a74e8d321647e57ffcfe43b6f8a7dc Mon Sep 17 00:00:00 2001 From: Joe Gregorio Date: Sat, 4 Nov 2006 16:58:03 -0500 Subject: [PATCH 08/39] Fixed one bug with passing non-2xx responses to feedparser. Also added a try/except to help debug the problem with 'content' undefined in httplib2. --- planet/httplib2/__init__.py | 21 +++++++++++++-------- planet/spider.py | 15 +++++++++++---- 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/planet/httplib2/__init__.py b/planet/httplib2/__init__.py index 83421b4..73e9bf7 100644 --- a/planet/httplib2/__init__.py +++ b/planet/httplib2/__init__.py @@ -35,6 +35,7 @@ import random import sha import hmac from gettext import gettext as _ +from socket import gaierror __all__ = ['Http', 'Response', 'HttpLib2Error', 'RedirectMissingLocation', 'RedirectLimit', 'FailedToDecompressContent', @@ -704,13 +705,13 @@ a string that contains the response entity body. cachekey = md5.new(defrag_uri).hexdigest() cached_value = self.cache.get(cachekey) if cached_value: - #try: - f = StringIO.StringIO(cached_value) - info = rfc822.Message(f) - content = cached_value.split('\r\n\r\n', 1)[1] - #except: - # self.cache.delete(cachekey) - # cachekey = None + try: + f = StringIO.StringIO(cached_value) + info = rfc822.Message(f) + content = cached_value.split('\r\n\r\n', 1)[1] + except: + self.cache.delete(cachekey) + cachekey = None else: cachekey = None @@ -769,7 +770,11 @@ a string that contains the response entity body. merged_response = Response(info) if hasattr(response, "_stale_digest"): merged_response._stale_digest = response._stale_digest - _updateCache(headers, merged_response, content, self.cache, cachekey) + try: + _updateCache(headers, merged_response, content, self.cache, cachekey) + except: + print locals() + raise response = merged_response response.status = 200 response.fromcache = True diff --git a/planet/spider.py b/planet/spider.py index 3bf0d7b..41b2d57 100644 --- a/planet/spider.py +++ b/planet/spider.py @@ -334,6 +334,7 @@ def spiderPlanet(only_if_new = False): from Queue import Queue, Empty from threading import Thread import httplib2 + from socket import gaierror work_queue = Queue() awaiting_parsing = Queue() @@ -350,8 +351,11 @@ def spiderPlanet(only_if_new = False): # is empty which will terminate the thread. 
uri = work_queue.get(block=False) log.info("Fetching %s via %d", uri, thread_index) - (resp, content) = h.request(uri) - awaiting_parsing.put(block=True, item=(resp, content, uri)) + try: + (resp, content) = h.request(uri) + awaiting_parsing.put(block=True, item=(resp, content, uri)) + except gaierror: + log.error("Fail to resolve server name %s via %d", uri, thread_index) except Empty, e: log.info("Thread %d finished", thread_index) pass @@ -373,8 +377,11 @@ def spiderPlanet(only_if_new = False): try: (resp_headers, content, uri) = item if not resp_headers.fromcache: - log.info("Parsing pre-fetched %s", uri) - spiderFeed(uri, only_if_new=only_if_new, content=content, resp_headers=resp_headers) + if resp_headers.status < 300: + log.info("Parsing pre-fetched %s", uri) + spiderFeed(uri, only_if_new=only_if_new, content=content, resp_headers=resp_headers) + else: + log.error("Status code %d from %s", resp_headers.status, uri) except Exception, e: import sys, traceback type, value, tb = sys.exc_info() From b58d815a0d757d4c5bc22e77bcc4f12092d4f44b Mon Sep 17 00:00:00 2001 From: Joe Gregorio Date: Sat, 4 Nov 2006 17:19:59 -0500 Subject: [PATCH 09/39] Fixed very weird bug where we would break on relative 301's, but *only* on the second attempt, i.e. only when reading the cache 301 redirect --- planet/httplib2/__init__.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/planet/httplib2/__init__.py b/planet/httplib2/__init__.py index 73e9bf7..f8bb205 100644 --- a/planet/httplib2/__init__.py +++ b/planet/httplib2/__init__.py @@ -627,6 +627,12 @@ class Http: if redirections: if not response.has_key('location') and response.status != 300: raise RedirectMissingLocation( _("Redirected but the response is missing a Location: header.")) + # Fix-up relative redirects (which violate an RFC 2616 MUST) + if response.has_key('location'): + location = response['location'] + (scheme, authority, path, query, fragment) = parse_uri(location) + if authority == None: + response['location'] = urlparse.urljoin(absolute_uri, location) if response.status == 301 and method in ["GET", "HEAD"]: response['-x-permanent-redirect-url'] = response['location'] _updateCache(headers, response, content, self.cache, cachekey) @@ -635,11 +641,8 @@ class Http: if headers.has_key('if-modified-since'): del headers['if-modified-since'] if response.has_key('location'): - old_response = copy.deepcopy(response) location = response['location'] - (scheme, authority, path, query, fragment) = parse_uri(location) - if authority == None: - location = urlparse.urljoin(absolute_uri, location) + old_response = copy.deepcopy(response) redirect_method = ((response.status == 303) and (method not in ["GET", "HEAD"])) and "GET" or method (response, content) = self.request(location, redirect_method, body=body, headers = headers, redirections = redirections - 1) response.previous = old_response From 4b9e85e4f7bddc6084c97ccddfba3dc56591131d Mon Sep 17 00:00:00 2001 From: Joe Gregorio Date: Sun, 5 Nov 2006 22:00:05 -0500 Subject: [PATCH 10/39] reverted feedparser to HEAD, i.e. it doesn't need changes to be used with an external http client. Made the changes as suggested by Sam on how to get httplib2 and feedparser working together. Added a 'dict' attribute to httplib2.Response to get it to work as feedparser expects. 
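
The integration this commit message describes amounts to handing feedparser a file-like object that already carries the pre-fetched bytes plus the httplib2 response headers, rather than changing feedparser's signature. Roughly, assuming httplib2 and feedparser are importable as in this tree (the feed URI and cache path below are placeholders):

    from StringIO import StringIO
    import httplib2
    import feedparser

    feed_uri = "http://example.org/news.atom"   # placeholder URI
    h = httplib2.Http(".cache")                 # placeholder cache directory
    (resp, content) = h.request(feed_uri)

    if resp.has_key('content-encoding'):
        # httplib2 has already decompressed the body; a later patch in this
        # series drops the header so feedparser does not decompress it again.
        del resp['content-encoding']

    f = StringIO(content)
    setattr(f, 'url', feed_uri)
    setattr(f, 'headers', resp)   # feedparser reads f.headers.dict; the diff
                                  # below adds a 'dict' alias to Response
    data = feedparser.parse(f)
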
--- planet/feedparser.py | 9 ++------- planet/httplib2/__init__.py | 10 +++++++++- planet/spider.py | 21 ++++++++++++++++++--- 3 files changed, 29 insertions(+), 11 deletions(-) diff --git a/planet/feedparser.py b/planet/feedparser.py index d24d82a..1860539 100755 --- a/planet/feedparser.py +++ b/planet/feedparser.py @@ -11,7 +11,7 @@ Recommended: Python 2.3 or later Recommended: CJKCodecs and iconv_codec """ -__version__ = "4.2-pre-" + "$Revision: 1.145 $"[11:16] + "-cvs" +__version__ = "4.2-pre-" + "$Revision: 1.146 $"[11:16] + "-cvs" __license__ = """Copyright (c) 2002-2006, Mark Pilgrim, All rights reserved. Redistribution and use in source and binary forms, with or without modification, @@ -3254,7 +3254,7 @@ def _stripDoctype(data): return version, data, dict(replacement and safe_pattern.findall(replacement)) -def parse(url_file_stream_or_string, etag=None, modified=None, agent=None, referrer=None, handlers=[], resp_headers=None): +def parse(url_file_stream_or_string, etag=None, modified=None, agent=None, referrer=None, handlers=[]): '''Parse a feed from a URL, file, stream, or string''' result = FeedParserDict() result['feed'] = FeedParserDict() @@ -3263,9 +3263,6 @@ def parse(url_file_stream_or_string, etag=None, modified=None, agent=None, refer result['bozo'] = 0 if type(handlers) == types.InstanceType: handlers = [handlers] - if resp_headers: - f = None - data = url_file_stream_or_string try: f = _open_resource(url_file_stream_or_string, etag, modified, agent, referrer, handlers) data = f.read() @@ -3310,8 +3307,6 @@ def parse(url_file_stream_or_string, etag=None, modified=None, agent=None, refer result['status'] = f.status if hasattr(f, 'headers'): result['headers'] = f.headers.dict - if resp_headers: - result['headers'] = resp_headers if hasattr(f, 'close'): f.close() diff --git a/planet/httplib2/__init__.py b/planet/httplib2/__init__.py index f8bb205..b96130b 100644 --- a/planet/httplib2/__init__.py +++ b/planet/httplib2/__init__.py @@ -715,6 +715,7 @@ a string that contains the response entity body. except: self.cache.delete(cachekey) cachekey = None + cached_value = None else: cachekey = None @@ -726,7 +727,7 @@ a string that contains the response entity body. # RFC 2616 Section 13.10 self.cache.delete(cachekey) - if method in ["GET", "HEAD"] and self.cache and 'range' not in headers: + if cached_value and method in ["GET", "HEAD"] and self.cache and 'range' not in headers: if info.has_key('-x-permanent-redirect-url'): # Should cached permanent redirects be counted in our redirection count? For now, yes. 
(response, new_content) = self.request(info['-x-permanent-redirect-url'], "GET", headers = headers, redirections = redirections - 1) @@ -825,4 +826,11 @@ class Response(dict): self[key] = value self.status = int(self['status']) + def __getattr__(self, name): + if name == 'dict': + return self + else: + raise AttributeError, name + + diff --git a/planet/spider.py b/planet/spider.py index 41b2d57..a12cf95 100644 --- a/planet/spider.py +++ b/planet/spider.py @@ -8,6 +8,7 @@ import time, calendar, re, os, urlparse from xml.dom import minidom # Planet modules import planet, config, feedparser, reconstitute, shell +from StringIO import StringIO # Regular expressions to sanitise cache filenames re_url_scheme = re.compile(r'^\w+:/*(\w+:|www\.)?') @@ -140,7 +141,11 @@ def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None): # read feed itself if content: - data = feedparser.parse(content, resp_headers=resp_headers) + f = StringIO(content) + setattr(f, 'url', feed) + if resp_headers: + setattr(f, 'headers', resp_headers) + data = feedparser.parse(f) else: modified = None try: @@ -334,7 +339,7 @@ def spiderPlanet(only_if_new = False): from Queue import Queue, Empty from threading import Thread import httplib2 - from socket import gaierror + from socket import gaierror, error work_queue = Queue() awaiting_parsing = Queue() @@ -356,6 +361,16 @@ def spiderPlanet(only_if_new = False): awaiting_parsing.put(block=True, item=(resp, content, uri)) except gaierror: log.error("Fail to resolve server name %s via %d", uri, thread_index) + except error, e: + log.error("HTTP Error: %s in thread-%d", str(e), thread_index) + except Exception, e: + import sys, traceback + type, value, tb = sys.exc_info() + log.error('Error processing %s', uri) + for line in (traceback.format_exception_only(type, value) + + traceback.format_tb(tb)): + log.error(line.rstrip()) + except Empty, e: log.info("Thread %d finished", thread_index) pass @@ -385,7 +400,7 @@ def spiderPlanet(only_if_new = False): except Exception, e: import sys, traceback type, value, tb = sys.exc_info() - log.error('Error processing %s', feed) + log.error('Error processing %s', uri) for line in (traceback.format_exception_only(type, value) + traceback.format_tb(tb)): log.error(line.rstrip()) From 56a447e1beda4ec67595b5407af3c8df4e87498c Mon Sep 17 00:00:00 2001 From: Joe Gregorio Date: Sun, 5 Nov 2006 22:48:30 -0500 Subject: [PATCH 11/39] Updated to latest httplib2. 
Now deleting 'content-encoding' header from the httplib2 response before passing to feedparser --- planet/httplib2/__init__.py | 5 +++-- planet/spider.py | 4 ++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/planet/httplib2/__init__.py b/planet/httplib2/__init__.py index b96130b..2941c73 100644 --- a/planet/httplib2/__init__.py +++ b/planet/httplib2/__init__.py @@ -16,7 +16,7 @@ __contributors__ = ["Thomas Broyer (t.broyer@ltgt.net)", "Jonathan Feinberg", "Blair Zajac"] __license__ = "MIT" -__version__ = "$Rev: 204 $" +__version__ = "$Rev: 208 $" import re import md5 @@ -232,8 +232,10 @@ def _decompressContent(response, new_content): try: if response.get('content-encoding', None) == 'gzip': content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read() + response['content-length'] = str(len(content)) if response.get('content-encoding', None) == 'deflate': content = zlib.decompress(content) + response['content-length'] = str(len(content)) except: content = "" raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding')) @@ -833,4 +835,3 @@ class Response(dict): raise AttributeError, name - diff --git a/planet/spider.py b/planet/spider.py index a12cf95..a438eeb 100644 --- a/planet/spider.py +++ b/planet/spider.py @@ -141,9 +141,13 @@ def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None): # read feed itself if content: + # httplib2 was used to get the content, so prepare a + # proper object to pass to feedparser. f = StringIO(content) setattr(f, 'url', feed) if resp_headers: + if resp_headers.has_key('content-encoding'): + del resp_headers['content-encoding'] setattr(f, 'headers', resp_headers) data = feedparser.parse(f) else: From daec4769c77046b939f73a459bde8576eff34811 Mon Sep 17 00:00:00 2001 From: Joe Gregorio Date: Tue, 7 Nov 2006 13:19:42 -0500 Subject: [PATCH 12/39] Added in support for '-location' in httlib2 responses --- planet/httplib2/__init__.py | 5 ++++- planet/spider.py | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/planet/httplib2/__init__.py b/planet/httplib2/__init__.py index 2941c73..a92540e 100644 --- a/planet/httplib2/__init__.py +++ b/planet/httplib2/__init__.py @@ -16,7 +16,7 @@ __contributors__ = ["Thomas Broyer (t.broyer@ltgt.net)", "Jonathan Feinberg", "Blair Zajac"] __license__ = "MIT" -__version__ = "$Rev: 208 $" +__version__ = "$Rev: 209 $" import re import md5 @@ -637,6 +637,7 @@ class Http: response['location'] = urlparse.urljoin(absolute_uri, location) if response.status == 301 and method in ["GET", "HEAD"]: response['-x-permanent-redirect-url'] = response['location'] + response['-location'] = absolute_uri _updateCache(headers, response, content, self.cache, cachekey) if headers.has_key('if-none-match'): del headers['if-none-match'] @@ -645,6 +646,7 @@ class Http: if response.has_key('location'): location = response['location'] old_response = copy.deepcopy(response) + old_response['-location'] = absolute_uri redirect_method = ((response.status == 303) and (method not in ["GET", "HEAD"])) and "GET" or method (response, content) = self.request(location, redirect_method, body=body, headers = headers, redirections = redirections - 1) response.previous = old_response @@ -652,6 +654,7 @@ class Http: raise RedirectLimit( _("Redirected more times than rediection_limit allows.")) elif response.status in [200, 203] and method == "GET": # Don't cache 206's since we aren't going to handle byte range requests + response['-location'] = 
absolute_uri _updateCache(headers, response, content, self.cache, cachekey) return (response, content) diff --git a/planet/spider.py b/planet/spider.py index a438eeb..f7a0d28 100644 --- a/planet/spider.py +++ b/planet/spider.py @@ -144,7 +144,7 @@ def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None): # httplib2 was used to get the content, so prepare a # proper object to pass to feedparser. f = StringIO(content) - setattr(f, 'url', feed) + setattr(f, 'url', resp_headers.get('-location', feed)) if resp_headers: if resp_headers.has_key('content-encoding'): del resp_headers['content-encoding'] From 45f0f92110e7fb40a863923c36a63743148a2a35 Mon Sep 17 00:00:00 2001 From: Joe Gregorio Date: Tue, 7 Nov 2006 22:39:35 -0500 Subject: [PATCH 13/39] Switched to standard socket timeouts. http://mail.python.org/pipermail/python-list/2005-May/281697.html --- planet/__init__.py | 21 -- planet/spider.py | 4 +- planet/timeoutsocket.py | 424 ---------------------------------------- 3 files changed, 2 insertions(+), 447 deletions(-) delete mode 100644 planet/timeoutsocket.py diff --git a/planet/__init__.py b/planet/__init__.py index 6be34ed..166acff 100644 --- a/planet/__init__.py +++ b/planet/__init__.py @@ -30,25 +30,4 @@ def getLogger(level, format): return logger -def setTimeout(timeout): - """ time out rather than hang forever on ultra-slow servers.""" - if timeout: - try: - timeout = float(timeout) - except: - logger.warning("Timeout set to invalid value '%s', skipping", timeout) - timeout = None - if timeout: - try: - from planet import timeoutsocket - timeoutsocket.setDefaultSocketTimeout(timeout) - logger.info("Socket timeout set to %d seconds", timeout) - except ImportError: - import socket - if hasattr(socket, 'setdefaulttimeout'): - logger.debug("timeoutsocket not found, using python function") - socket.setdefaulttimeout(timeout) - logger.info("Socket timeout set to %d seconds", timeout) - else: - logger.error("Unable to set timeout to %d seconds", timeout) diff --git a/planet/spider.py b/planet/spider.py index f7a0d28..f4badf2 100644 --- a/planet/spider.py +++ b/planet/spider.py @@ -7,7 +7,7 @@ and write each as a set of entries in a cache directory. import time, calendar, re, os, urlparse from xml.dom import minidom # Planet modules -import planet, config, feedparser, reconstitute, shell +import planet, config, feedparser, reconstitute, shell, socket from StringIO import StringIO # Regular expressions to sanitise cache filenames @@ -338,6 +338,7 @@ def spiderPlanet(only_if_new = False): global index index = True + socket.setdefaulttimeout(float(config.feed_timeout())) if int(config.spider_threads()): from Queue import Queue, Empty @@ -414,7 +415,6 @@ def spiderPlanet(only_if_new = False): log.info("Finished threaded part of processing.") - planet.setTimeout(config.feed_timeout()) # Process non-HTTP uris if we are threading, otherwise process *all* uris here. 
unthreaded_work_queue = [uri for uri in config.subscriptions() if not int(config.spider_threads()) or not _is_http_uri(uri)] for feed in unthreaded_work_queue: diff --git a/planet/timeoutsocket.py b/planet/timeoutsocket.py deleted file mode 100644 index b698df0..0000000 --- a/planet/timeoutsocket.py +++ /dev/null @@ -1,424 +0,0 @@ - -#### -# Copyright 2000,2001 by Timothy O'Malley -# -# All Rights Reserved -# -# Permission to use, copy, modify, and distribute this software -# and its documentation for any purpose and without fee is hereby -# granted, provided that the above copyright notice appear in all -# copies and that both that copyright notice and this permission -# notice appear in supporting documentation, and that the name of -# Timothy O'Malley not be used in advertising or publicity -# pertaining to distribution of the software without specific, written -# prior permission. -# -# Timothy O'Malley DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS -# SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY -# AND FITNESS, IN NO EVENT SHALL Timothy O'Malley BE LIABLE FOR -# ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES -# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, -# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS -# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR -# PERFORMANCE OF THIS SOFTWARE. -# -#### - -"""Timeout Socket - -This module enables a timeout mechanism on all TCP connections. It -does this by inserting a shim into the socket module. After this module -has been imported, all socket creation goes through this shim. As a -result, every TCP connection will support a timeout. - -The beauty of this method is that it immediately and transparently -enables the entire python library to support timeouts on TCP sockets. -As an example, if you wanted to SMTP connections to have a 20 second -timeout: - - import timeoutsocket - import smtplib - timeoutsocket.setDefaultSocketTimeout(20) - - -The timeout applies to the socket functions that normally block on -execution: read, write, connect, and accept. If any of these -operations exceeds the specified timeout, the exception Timeout -will be raised. - -The default timeout value is set to None. As a result, importing -this module does not change the default behavior of a socket. The -timeout mechanism only activates when the timeout has been set to -a numeric value. (This behavior mimics the behavior of the -select.select() function.) - -This module implements two classes: TimeoutSocket and TimeoutFile. - -The TimeoutSocket class defines a socket-like object that attempts to -avoid the condition where a socket may block indefinitely. The -TimeoutSocket class raises a Timeout exception whenever the -current operation delays too long. - -The TimeoutFile class defines a file-like object that uses the TimeoutSocket -class. When the makefile() method of TimeoutSocket is called, it returns -an instance of a TimeoutFile. - -Each of these objects adds two methods to manage the timeout value: - - get_timeout() --> returns the timeout of the socket or file - set_timeout() --> sets the timeout of the socket or file - - -As an example, one might use the timeout feature to create httplib -connections that will timeout after 30 seconds: - - import timeoutsocket - import httplib - H = httplib.HTTP("www.python.org") - H.sock.set_timeout(30) - -Note: When used in this manner, the connect() routine may still -block because it happens before the timeout is set. 
To avoid -this, use the 'timeoutsocket.setDefaultSocketTimeout()' function. - -Good Luck! - -""" - -__version__ = "$Revision: 1.1.1.1 $" -__author__ = "Timothy O'Malley " - -# -# Imports -# -import select, string -import socket -if not hasattr(socket, "_no_timeoutsocket"): - _socket = socket.socket -else: - _socket = socket._no_timeoutsocket - - -# -# Set up constants to test for Connected and Blocking operations. -# We delete 'os' and 'errno' to keep our namespace clean(er). -# Thanks to Alex Martelli and G. Li for the Windows error codes. -# -import os -if os.name == "nt": - _IsConnected = ( 10022, 10056 ) - _ConnectBusy = ( 10035, ) - _AcceptBusy = ( 10035, ) -else: - import errno - _IsConnected = ( errno.EISCONN, ) - _ConnectBusy = ( errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK ) - _AcceptBusy = ( errno.EAGAIN, errno.EWOULDBLOCK ) - del errno -del os - - -# -# Default timeout value for ALL TimeoutSockets -# -_DefaultTimeout = None -def setDefaultSocketTimeout(timeout): - global _DefaultTimeout - _DefaultTimeout = timeout -def getDefaultSocketTimeout(): - return _DefaultTimeout - -# -# Exceptions for socket errors and timeouts -# -Error = socket.error -class Timeout(Exception): - pass - - -# -# Factory function -# -from socket import AF_INET, SOCK_STREAM -def timeoutsocket(family=AF_INET, type=SOCK_STREAM, proto=None): - if family != AF_INET or type != SOCK_STREAM: - if proto: - return _socket(family, type, proto) - else: - return _socket(family, type) - return TimeoutSocket( _socket(family, type), _DefaultTimeout ) -# end timeoutsocket - -# -# The TimeoutSocket class definition -# -class TimeoutSocket: - """TimeoutSocket object - Implements a socket-like object that raises Timeout whenever - an operation takes too long. - The definition of 'too long' can be changed using the - set_timeout() method. - """ - - _copies = 0 - _blocking = 1 - - def __init__(self, sock, timeout): - self._sock = sock - self._timeout = timeout - # end __init__ - - def __getattr__(self, key): - return getattr(self._sock, key) - # end __getattr__ - - def get_timeout(self): - return self._timeout - # end set_timeout - - def set_timeout(self, timeout=None): - self._timeout = timeout - # end set_timeout - - def setblocking(self, blocking): - self._blocking = blocking - return self._sock.setblocking(blocking) - # end set_timeout - - def connect_ex(self, addr): - errcode = 0 - try: - self.connect(addr) - except Error, why: - errcode = why[0] - return errcode - # end connect_ex - - def connect(self, addr, port=None, dumbhack=None): - # In case we were called as connect(host, port) - if port != None: addr = (addr, port) - - # Shortcuts - sock = self._sock - timeout = self._timeout - blocking = self._blocking - - # First, make a non-blocking call to connect - try: - sock.setblocking(0) - sock.connect(addr) - sock.setblocking(blocking) - return - except Error, why: - # Set the socket's blocking mode back - sock.setblocking(blocking) - - # If we are not blocking, re-raise - if not blocking: - raise - - # If we are already connected, then return success. - # If we got a genuine error, re-raise it. - errcode = why[0] - if dumbhack and errcode in _IsConnected: - return - elif errcode not in _ConnectBusy: - raise - - # Now, wait for the connect to happen - # ONLY if dumbhack indicates this is pass number one. - # If select raises an error, we pass it on. - # Is this the right behavior? 
- if not dumbhack: - r,w,e = select.select([], [sock], [], timeout) - if w: - return self.connect(addr, dumbhack=1) - - # If we get here, then we should raise Timeout - raise Timeout("Attempted connect to %s timed out." % str(addr) ) - # end connect - - def accept(self, dumbhack=None): - # Shortcuts - sock = self._sock - timeout = self._timeout - blocking = self._blocking - - # First, make a non-blocking call to accept - # If we get a valid result, then convert the - # accept'ed socket into a TimeoutSocket. - # Be carefult about the blocking mode of ourselves. - try: - sock.setblocking(0) - newsock, addr = sock.accept() - sock.setblocking(blocking) - timeoutnewsock = self.__class__(newsock, timeout) - timeoutnewsock.setblocking(blocking) - return (timeoutnewsock, addr) - except Error, why: - # Set the socket's blocking mode back - sock.setblocking(blocking) - - # If we are not supposed to block, then re-raise - if not blocking: - raise - - # If we got a genuine error, re-raise it. - errcode = why[0] - if errcode not in _AcceptBusy: - raise - - # Now, wait for the accept to happen - # ONLY if dumbhack indicates this is pass number one. - # If select raises an error, we pass it on. - # Is this the right behavior? - if not dumbhack: - r,w,e = select.select([sock], [], [], timeout) - if r: - return self.accept(dumbhack=1) - - # If we get here, then we should raise Timeout - raise Timeout("Attempted accept timed out.") - # end accept - - def send(self, data, flags=0): - sock = self._sock - if self._blocking: - r,w,e = select.select([],[sock],[], self._timeout) - if not w: - raise Timeout("Send timed out") - return sock.send(data, flags) - # end send - - def recv(self, bufsize, flags=0): - sock = self._sock - if self._blocking: - r,w,e = select.select([sock], [], [], self._timeout) - if not r: - raise Timeout("Recv timed out") - return sock.recv(bufsize, flags) - # end recv - - def makefile(self, flags="r", bufsize=-1): - self._copies = self._copies +1 - return TimeoutFile(self, flags, bufsize) - # end makefile - - def close(self): - if self._copies <= 0: - self._sock.close() - else: - self._copies = self._copies -1 - # end close - -# end TimeoutSocket - - -class TimeoutFile: - """TimeoutFile object - Implements a file-like object on top of TimeoutSocket. 
- """ - - def __init__(self, sock, mode="r", bufsize=4096): - self._sock = sock - self._bufsize = 4096 - if bufsize > 0: self._bufsize = bufsize - if not hasattr(sock, "_inqueue"): self._sock._inqueue = "" - - # end __init__ - - def __getattr__(self, key): - return getattr(self._sock, key) - # end __getattr__ - - def close(self): - self._sock.close() - self._sock = None - # end close - - def write(self, data): - self.send(data) - # end write - - def read(self, size=-1): - _sock = self._sock - _bufsize = self._bufsize - while 1: - datalen = len(_sock._inqueue) - if datalen >= size >= 0: - break - bufsize = _bufsize - if size > 0: - bufsize = min(bufsize, size - datalen ) - buf = self.recv(bufsize) - if not buf: - break - _sock._inqueue = _sock._inqueue + buf - data = _sock._inqueue - _sock._inqueue = "" - if size > 0 and datalen > size: - _sock._inqueue = data[size:] - data = data[:size] - return data - # end read - - def readline(self, size=-1): - _sock = self._sock - _bufsize = self._bufsize - while 1: - idx = string.find(_sock._inqueue, "\n") - if idx >= 0: - break - datalen = len(_sock._inqueue) - if datalen >= size >= 0: - break - bufsize = _bufsize - if size > 0: - bufsize = min(bufsize, size - datalen ) - buf = self.recv(bufsize) - if not buf: - break - _sock._inqueue = _sock._inqueue + buf - - data = _sock._inqueue - _sock._inqueue = "" - if idx >= 0: - idx = idx + 1 - _sock._inqueue = data[idx:] - data = data[:idx] - elif size > 0 and datalen > size: - _sock._inqueue = data[size:] - data = data[:size] - return data - # end readline - - def readlines(self, sizehint=-1): - result = [] - data = self.read() - while data: - idx = string.find(data, "\n") - if idx >= 0: - idx = idx + 1 - result.append( data[:idx] ) - data = data[idx:] - else: - result.append( data ) - data = "" - return result - # end readlines - - def flush(self): pass - -# end TimeoutFile - - -# -# Silently replace the socket() builtin function with -# our timeoutsocket() definition. -# -if not hasattr(socket, "_no_timeoutsocket"): - socket._no_timeoutsocket = socket.socket - socket.socket = timeoutsocket -del socket -socket = timeoutsocket -# Finis From 0df474c8ff6feda95506b26e5ea6f352a1b518ba Mon Sep 17 00:00:00 2001 From: Sam Ruby Date: Tue, 14 Nov 2006 10:28:40 -0500 Subject: [PATCH 14/39] Support backlevel versions of Python --- docs/installation.html | 18 +++++++++++++----- planet/__init__.py | 3 ++- planet/spider.py | 4 ++-- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/docs/installation.html b/docs/installation.html index 9994984..6a668d1 100644 --- a/docs/installation.html +++ b/docs/installation.html @@ -69,7 +69,7 @@ right directory.

Build your own themes, templates, or filters! And share!

-

Mac OS X and Fink Instructions

+

Mac OS X and Fink Instructions

The Fink Project packages
@@ -101,12 +101,20 @@ not yet ported to the newer python so Venus will be less featureful.
may want to explicitly specify python2.4.

-

Ubuntu Linux (Edgy Eft) instructions

+

Ubuntu Linux (Edgy Eft) instructions

Before starting, issue the following command:

-
    -
    sudo apt-get install bzr python2.4-librdf
    -
+ +
sudo apt-get install bzr python2.4-librdf
+ +
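To confirm that the Redland RDF bindings installed by python2.4-librdf are importable, a minimal check along these lines should work; the sketch is illustrative only (it is not part of this patch series) and assumes the bindings expose the conventional RDF module name:

try:
    # 'RDF' is the usual module name provided by the Redland Python bindings
    # (an assumption; adjust if your packaging differs).
    import RDF
    print 'librdf Python bindings found'
except ImportError:
    print 'librdf Python bindings not found; re-run the apt-get command above'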

Python 2.2 instructions

+ +

If you are running Python 2.2, you may also need to install pyxml. If the
+following runs without error, you do not have the problem.

+
python -c "__import__('xml.dom.minidom').dom.minidom.parseString('<entry xml:lang=\"en\"/>')"
+

Installation of pyxml varies by platform. For Ubuntu Linux (Dapper Drake), issue the following command:

+ +
sudo apt-get install python2.2-xml
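For convenience, the same pyxml sanity check can be kept as a small standalone script. The sketch below is illustrative only (it is not part of the patch series) and follows the Python 2 idioms used elsewhere in this code base:

import sys
from xml.dom import minidom

try:
    # Parsing an element that carries an xml:lang attribute is exactly what
    # the one-liner above probes; broken installations raise an exception here.
    minidom.parseString('<entry xml:lang="en"/>')
    print 'xml.dom.minidom is OK; pyxml is not required'
except Exception, e:
    print 'minidom check failed (%s); install pyxml as described above' % e
    sys.exit(1)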
diff --git a/planet/__init__.py b/planet/__init__.py index 6be34ed..0902bd8 100644 --- a/planet/__init__.py +++ b/planet/__init__.py @@ -16,10 +16,11 @@ def getLogger(level, format): try: import logging + logging.basicConfig(format=format) except: import compat_logging as logging + logging.basicConfig(format=format) - logging.basicConfig(format=format) logging.getLogger().setLevel(logging.getLevelName(level)) logger = logging.getLogger("planet.runner") try: diff --git a/planet/spider.py b/planet/spider.py index aff3884..72f339c 100644 --- a/planet/spider.py +++ b/planet/spider.py @@ -260,7 +260,7 @@ def spiderFeed(feed, only_if_new=0): # apply any filters xdoc = reconstitute.reconstitute(data, entry) - output = xdoc.toxml('utf-8') + output = xdoc.toxml().encode('utf-8') xdoc.unlink() for filter in config.filters(feed): output = shell.run(filter, output, mode="filter") @@ -320,7 +320,7 @@ def spiderFeed(feed, only_if_new=0): xdoc=minidom.parseString('''\n''' % planet.xmlns) reconstitute.source(xdoc.documentElement,data.feed,data.bozo,data.version) - write(xdoc.toxml('utf-8'), filename(sources, feed)) + write(xdoc.toxml().encode('utf-8'), filename(sources, feed)) xdoc.unlink() def spiderPlanet(only_if_new = False): From ba25b691ff85ebb602c441e6fc39173971c5a81b Mon Sep 17 00:00:00 2001 From: Sam Ruby Date: Tue, 14 Nov 2006 11:05:09 -0500 Subject: [PATCH 15/39] Fix windows regression --- planet/splice.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/planet/splice.py b/planet/splice.py index e50f927..4853619 100644 --- a/planet/splice.py +++ b/planet/splice.py @@ -68,8 +68,8 @@ def splice(): # insert entry information items = 0 for mtime,file in dir: - if index: - base = file.split('/')[-1] + if index != None: + base = os.path.basename(file) if index.has_key(base) and index[base] not in sub_ids: continue try: From 167f0de4da64b1c095759c0b08febcb62ec5474e Mon Sep 17 00:00:00 2001 From: Sam Ruby Date: Wed, 15 Nov 2006 07:46:35 -0500 Subject: [PATCH 16/39] More bullet-proofing --- planet/spider.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/planet/spider.py b/planet/spider.py index a41edc8..32e03ce 100644 --- a/planet/spider.py +++ b/planet/spider.py @@ -257,9 +257,8 @@ def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None): # get updated-date either from the entry or the cache (default to now) mtime = None - if not entry.has_key('updated_parsed'): - if entry.has_key('published_parsed'): - entry['updated_parsed'] = entry['published_parsed'] + if not entry.has_key('updated_parsed') or not entry['updated_parsed']: + entry['updated_parsed'] = entry.get('published_parsed',None) if not entry.has_key('updated_parsed'): try: mtime = calendar.timegm(entry.updated_parsed) @@ -270,7 +269,10 @@ def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None): mtime = os.stat(cache_file).st_mtime except: if data.feed.has_key('updated_parsed'): - mtime = calendar.timegm(data.feed.updated_parsed) + try: + mtime = calendar.timegm(data.feed.updated_parsed) + except: + pass if not mtime or mtime > time.time(): mtime = time.time() entry['updated_parsed'] = time.gmtime(mtime) From ccb5aa4e39fab674cf5456f15f46b15dcbc3c010 Mon Sep 17 00:00:00 2001 From: Harry Fuecks Date: Thu, 16 Nov 2006 16:06:35 +0000 Subject: [PATCH 17/39] Add tests to check entry updated value is preserved --- tests/test_spider.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_spider.py b/tests/test_spider.py index b01eebb..e6ce66f 100644 --- 
a/tests/test_spider.py +++ b/tests/test_spider.py @@ -59,6 +59,7 @@ class SpiderTest(unittest.TestCase): self.assertEqual(['application/atom+xml'], [link.type for link in data.entries[0].source.links if link.rel=='self']) self.assertEqual('one', data.entries[0].source.planet_name) + self.assertEqual('2006-01-01T00:00:00Z', data.entries[0].updated) self.assertEqual(os.stat(files[2]).st_mtime, calendar.timegm(data.entries[0].updated_parsed)) From bf0c7b736d25d32c724272f4ade2dc3300e75a1f Mon Sep 17 00:00:00 2001 From: Sam Ruby Date: Thu, 16 Nov 2006 15:51:27 -0500 Subject: [PATCH 18/39] Fix regression where entry updated was always ignored --- planet/spider.py | 2 +- tests/test_spider.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/planet/spider.py b/planet/spider.py index 32e03ce..4922d76 100644 --- a/planet/spider.py +++ b/planet/spider.py @@ -259,7 +259,7 @@ def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None): mtime = None if not entry.has_key('updated_parsed') or not entry['updated_parsed']: entry['updated_parsed'] = entry.get('published_parsed',None) - if not entry.has_key('updated_parsed'): + if entry.has_key('updated_parsed'): try: mtime = calendar.timegm(entry.updated_parsed) except: diff --git a/tests/test_spider.py b/tests/test_spider.py index e6ce66f..418364c 100644 --- a/tests/test_spider.py +++ b/tests/test_spider.py @@ -59,7 +59,7 @@ class SpiderTest(unittest.TestCase): self.assertEqual(['application/atom+xml'], [link.type for link in data.entries[0].source.links if link.rel=='self']) self.assertEqual('one', data.entries[0].source.planet_name) - self.assertEqual('2006-01-01T00:00:00Z', data.entries[0].updated) + self.assertEqual('2006-01-03T00:00:00Z', data.entries[0].updated) self.assertEqual(os.stat(files[2]).st_mtime, calendar.timegm(data.entries[0].updated_parsed)) From 1ce96ca53b2859fca09f9e214e1f44df90bf3879 Mon Sep 17 00:00:00 2001 From: Sam Ruby Date: Thu, 16 Nov 2006 20:18:34 -0500 Subject: [PATCH 19/39] Assign a css-id to each source --- planet/reconstitute.py | 11 +++++++++++ tests/test_spider.py | 1 + themes/asf/index.html.xslt | 3 ++- themes/asf/personalize.js | 3 ++- 4 files changed, 16 insertions(+), 2 deletions(-) diff --git a/planet/reconstitute.py b/planet/reconstitute.py index 989a107..6d7f43d 100644 --- a/planet/reconstitute.py +++ b/planet/reconstitute.py @@ -50,6 +50,15 @@ def ncr2c(value): value=unichr(int(value)) return value +nonalpha=re.compile('\W+',re.UNICODE) +def cssid(name): + """ generate a css id from a name """ + try: + name = nonalpha.sub('-',name.decode('utf-8')).lower().encode('utf-8') + except: + name = nonalpha.sub('-',name).lower() + return name.strip('-') + def normalize(text, bozo): """ convert everything to well formed XML """ if text.has_key('type'): @@ -198,6 +207,8 @@ def source(xsource, source, bozo, format): if not bozo == None: source['planet_bozo'] = bozo and 'true' or 'false' # propagate planet inserted information + if source.has_key('planet_name') and not source.has_key('planet_css-id'): + source['planet_css-id'] = cssid(source['planet_name']) for key, value in source.items(): if key.startswith('planet_'): createTextElement(xsource, key.replace('_',':',1), value) diff --git a/tests/test_spider.py b/tests/test_spider.py index 418364c..2bef04a 100644 --- a/tests/test_spider.py +++ b/tests/test_spider.py @@ -91,6 +91,7 @@ class SpiderTest(unittest.TestCase): self.assertEqual(['application/rss+xml'], [link.type for link in data.entries[0].source.links if link.rel=='self']) 
         self.assertEqual('three', data.entries[0].source.author_detail.name)
+        self.assertEqual('three', data.entries[0].source['planet_css-id'])
 
     def test_spiderPlanet(self):
         config.load(configfile)
diff --git a/themes/asf/index.html.xslt b/themes/asf/index.html.xslt
index b3d2063..1c7468e 100644
--- a/themes/asf/index.html.xslt
+++ b/themes/asf/index.html.xslt
@@ -1,4 +1,5 @@
-
+
diff --git a/themes/asf/personalize.js b/themes/asf/personalize.js index 83db3a3..afcec17 100644 --- a/themes/asf/personalize.js +++ b/themes/asf/personalize.js @@ -159,7 +159,8 @@ function findEntries() { var date = localizeDate(span[i]); var parent = span[i]; - while (parent && parent.className != 'news') { + while (parent && + (!parent.className || parent.className.split(' ')[0] != 'news')) { parent = parent.parentNode; } From c337597302b228a617e312e0c2c6d1a10158ff18 Mon Sep 17 00:00:00 2001 From: Sam Ruby Date: Thu, 16 Nov 2006 21:46:58 -0500 Subject: [PATCH 20/39] Cleanup --- themes/asf/index.html.xslt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/themes/asf/index.html.xslt b/themes/asf/index.html.xslt index 1c7468e..ca355fe 100644 --- a/themes/asf/index.html.xslt +++ b/themes/asf/index.html.xslt @@ -1,9 +1,9 @@ + xmlns="http://www.w3.org/1999/xhtml" + exclude-result-prefixes="atom planet xhtml"> From 20cb60df7c7e358bed22dcfb08f17eaf1b0e1d50 Mon Sep 17 00:00:00 2001 From: Sam Ruby Date: Sun, 19 Nov 2006 11:56:36 -0500 Subject: [PATCH 21/39] Resync with httplib2 --- planet/httplib2/__init__.py | 138 +++++++++++++++++++++++++----------- planet/spider.py | 61 ++++++++++------ 2 files changed, 134 insertions(+), 65 deletions(-) diff --git a/planet/httplib2/__init__.py b/planet/httplib2/__init__.py index 08c87b9..3ee6b4f 100644 --- a/planet/httplib2/__init__.py +++ b/planet/httplib2/__init__.py @@ -1,3 +1,4 @@ +from __future__ import generators """ httplib2 @@ -8,21 +9,22 @@ Requires Python 2.3 or later """ -from __future__ import generators - __author__ = "Joe Gregorio (joe@bitworking.org)" __copyright__ = "Copyright 2006, Joe Gregorio" __contributors__ = ["Thomas Broyer (t.broyer@ltgt.net)", "James Antill", "Xavier Verges Farrero", "Jonathan Feinberg", - "Blair Zajac"] + "Blair Zajac", + "Sam Ruby"] __license__ = "MIT" -__version__ = "$Rev: 209 $" +__version__ = "$Rev: 217 $" import re import md5 -import rfc822 +import email +import email.Utils +import email.Message import StringIO import gzip import zlib @@ -114,6 +116,49 @@ def parse_uri(uri): groups = URI.match(uri).groups() return (groups[1], groups[3], groups[4], groups[6], groups[8]) +def urlnorm(uri): + (scheme, authority, path, query, fragment) = parse_uri(uri) + authority = authority.lower() + scheme = scheme.lower() + if not path: + path = "/" + # Could do syntax based normalization of the URI before + # computing the digest. See Section 6.2.2 of Std 66. + request_uri = query and "?".join([path, query]) or path + defrag_uri = scheme + "://" + authority + request_uri + return scheme, authority, request_uri, defrag_uri + + +# Cache filename construction (original borrowed from Venus http://intertwingly.net/code/venus/) +re_url_scheme = re.compile(r'^\w+://') +re_slash = re.compile(r'[?/:|]+') + +def safename(filename): + """Return a filename suitable for the cache. + + Strips dangerous and common characters to create a filename we + can use to store the cache in. 
+ """ + + try: + if re_url_scheme.match(filename): + if isinstance(filename,str): + filename=filename.decode('utf-8').encode('idna') + else: + filename=filename.encode('idna') + except: + pass + if isinstance(filename,unicode): + filename=filename.encode('utf-8') + filemd5 = md5.new(filename).hexdigest() + filename = re_url_scheme.sub("", filename) + filename = re_slash.sub(",", filename) + + # limit length of filename + if len(filename)>200: + filename=filename[:200] + return ",".join((filename, filemd5)) + NORMALIZE_SPACE = re.compile(r'(?:\r\n)?[ \t]+') def _normalize_headers(headers): return dict([ (key.lower(), NORMALIZE_SPACE.sub(value, ' ').strip()) for (key, value) in headers.iteritems()]) @@ -211,13 +256,13 @@ def _entry_disposition(response_headers, request_headers): elif cc.has_key('only-if-cached'): retval = "FRESH" elif response_headers.has_key('date'): - date = calendar.timegm(rfc822.parsedate_tz(response_headers['date'])) + date = calendar.timegm(email.Utils.parsedate_tz(response_headers['date'])) now = time.time() current_age = max(0, now - date) if cc_response.has_key('max-age'): freshness_lifetime = int(cc_response['max-age']) elif response_headers.has_key('expires'): - expires = rfc822.parsedate_tz(response_headers['expires']) + expires = email.Utils.parsedate_tz(response_headers['expires']) freshness_lifetime = max(0, calendar.timegm(expires) - date) else: freshness_lifetime = 0 @@ -232,12 +277,14 @@ def _entry_disposition(response_headers, request_headers): def _decompressContent(response, new_content): content = new_content try: - if response.get('content-encoding', None) == 'gzip': - content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read() - response['content-length'] = str(len(content)) - if response.get('content-encoding', None) == 'deflate': - content = zlib.decompress(content) + encoding = response.get('content-encoding', None) + if encoding in ['gzip', 'deflate']: + if encoding == 'gzip': + content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read() + if encoding == 'deflate': + content = zlib.decompress(content) response['content-length'] = str(len(content)) + del response['content-encoding'] except: content = "" raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding')) @@ -250,14 +297,23 @@ def _updateCache(request_headers, response_headers, content, cache, cachekey): if cc.has_key('no-store') or cc_response.has_key('no-store'): cache.delete(cachekey) else: - f = StringIO.StringIO("") - info = rfc822.Message(StringIO.StringIO("")) + info = email.Message.Message() for key, value in response_headers.iteritems(): - info[key] = value - f.write(str(info)) - f.write("\r\n\r\n") - f.write(content) - cache.set(cachekey, f.getvalue()) + if key not in ['status','content-encoding','transfer-encoding']: + info[key] = value + + status = response_headers.status + if status == 304: + status = 200 + + status_header = 'status: %d\r\n' % response_headers.status + + header_str = info.as_string() + + header_str = re.sub("\r(?!\n)|(?0: data.status = 200 - elif data.bozo and data.bozo_exception.__class__.__name__=='Timeout': + elif data.bozo and data.bozo_exception.__class__.__name__.lower()=='timeout': data.status = 408 else: data.status = 500 @@ -380,13 +383,27 @@ def spiderPlanet(only_if_new = False): # is empty which will terminate the thread. 
uri = work_queue.get(block=False) log.info("Fetching %s via %d", uri, thread_index) + resp = feedparser.FeedParserDict({'status':'500'}) + content = None try: - (resp, content) = h.request(uri) - awaiting_parsing.put(block=True, item=(resp, content, uri)) + try: + if isinstance(uri,unicode): + idna = uri.encode('idna') + else: + idna = uri.decode('utf-8').encode('idna') + if idna != uri: log.info("IRI %s mapped to %s", uri, idna) + except: + log.info("unable to map %s to a URI", uri) + idna = uri + (resp, content) = h.request(idna) except gaierror: log.error("Fail to resolve server name %s via %d", uri, thread_index) except error, e: - log.error("HTTP Error: %s in thread-%d", str(e), thread_index) + if e.__class__.__name__.lower()=='timeout': + resp['status'] = '408' + log.warn("Timeout in thread-%d", thread_index) + else: + log.error("HTTP Error: %s in thread-%d", str(e), thread_index) except Exception, e: import sys, traceback type, value, tb = sys.exc_info() @@ -394,6 +411,7 @@ def spiderPlanet(only_if_new = False): for line in (traceback.format_exception_only(type, value) + traceback.format_tb(tb)): log.error(line.rstrip()) + awaiting_parsing.put(block=True, item=(resp, content, uri)) except Empty, e: log.info("Thread %d finished", thread_index) @@ -409,18 +427,15 @@ def spiderPlanet(only_if_new = False): # Process the results as they arrive while work_queue.qsize() or awaiting_parsing.qsize() or threads: - if awaiting_parsing.qsize() == 0 and threads: - time.sleep(1) + while awaiting_parsing.qsize() == 0 and threads: + time.sleep(0.1) while awaiting_parsing.qsize(): item = awaiting_parsing.get(False) try: (resp_headers, content, uri) = item - if not resp_headers.fromcache: - if resp_headers.status < 300: - log.info("Parsing pre-fetched %s", uri) - spiderFeed(uri, only_if_new=only_if_new, content=content, resp_headers=resp_headers) - else: - log.error("Status code %d from %s", resp_headers.status, uri) + if resp_headers.status == 200 and resp_headers.fromcache: + resp_headers.status = 304 + spiderFeed(uri, only_if_new=only_if_new, content=content, resp_headers=resp_headers) except Exception, e: import sys, traceback type, value, tb = sys.exc_info() From 52716d99f7fff0c01c6be4712e817bfb5873ebaf Mon Sep 17 00:00:00 2001 From: Sam Ruby Date: Sun, 19 Nov 2006 12:57:44 -0500 Subject: [PATCH 22/39] Make subscription list collapsible --- themes/asf/default.css | 4 ++ themes/asf/index.html.xslt | 91 +++++++++++++++++++------------------- themes/asf/personalize.js | 53 +++++++++++++++++++--- 3 files changed, 98 insertions(+), 50 deletions(-) diff --git a/themes/asf/default.css b/themes/asf/default.css index c5169f0..2de9db5 100644 --- a/themes/asf/default.css +++ b/themes/asf/default.css @@ -146,6 +146,10 @@ h1 { display: inline; } +#footer img { + display: none; +} + /* ----------------------------- Body ---------------------------- */ #body { diff --git a/themes/asf/index.html.xslt b/themes/asf/index.html.xslt index ca355fe..2beb5d9 100644 --- a/themes/asf/index.html.xslt +++ b/themes/asf/index.html.xslt @@ -30,12 +30,52 @@

- -