Resync with httplib2

Sam Ruby 2006-11-19 11:56:36 -05:00
parent c337597302
commit 20cb60df7c
2 changed files with 134 additions and 65 deletions

View File

@@ -1,3 +1,4 @@
+from __future__ import generators
 """
 httplib2
@@ -8,21 +9,22 @@ Requires Python 2.3 or later
 """
-from __future__ import generators
 __author__ = "Joe Gregorio (joe@bitworking.org)"
 __copyright__ = "Copyright 2006, Joe Gregorio"
 __contributors__ = ["Thomas Broyer (t.broyer@ltgt.net)",
     "James Antill",
     "Xavier Verges Farrero",
     "Jonathan Feinberg",
-    "Blair Zajac"]
+    "Blair Zajac",
+    "Sam Ruby"]
 __license__ = "MIT"
-__version__ = "$Rev: 209 $"
+__version__ = "$Rev: 217 $"
 
 import re
 import md5
-import rfc822
+import email
+import email.Utils
+import email.Message
 import StringIO
 import gzip
 import zlib
@@ -114,6 +116,49 @@ def parse_uri(uri):
     groups = URI.match(uri).groups()
     return (groups[1], groups[3], groups[4], groups[6], groups[8])
 
+def urlnorm(uri):
+    (scheme, authority, path, query, fragment) = parse_uri(uri)
+    authority = authority.lower()
+    scheme = scheme.lower()
+    if not path:
+        path = "/"
+    # Could do syntax based normalization of the URI before
+    # computing the digest. See Section 6.2.2 of Std 66.
+    request_uri = query and "?".join([path, query]) or path
+    defrag_uri = scheme + "://" + authority + request_uri
+    return scheme, authority, request_uri, defrag_uri
+
+# Cache filename construction (original borrowed from Venus http://intertwingly.net/code/venus/)
+re_url_scheme = re.compile(r'^\w+://')
+re_slash = re.compile(r'[?/:|]+')
+
+def safename(filename):
+    """Return a filename suitable for the cache.
+    Strips dangerous and common characters to create a filename we
+    can use to store the cache in.
+    """
+    try:
+        if re_url_scheme.match(filename):
+            if isinstance(filename,str):
+                filename=filename.decode('utf-8').encode('idna')
+            else:
+                filename=filename.encode('idna')
+    except:
+        pass
+    if isinstance(filename,unicode):
+        filename=filename.encode('utf-8')
+    filemd5 = md5.new(filename).hexdigest()
+    filename = re_url_scheme.sub("", filename)
+    filename = re_slash.sub(",", filename)
+    # limit length of filename
+    if len(filename)>200:
+        filename=filename[:200]
+    return ",".join((filename, filemd5))
+
 NORMALIZE_SPACE = re.compile(r'(?:\r\n)?[ \t]+')
 def _normalize_headers(headers):
     return dict([ (key.lower(), NORMALIZE_SPACE.sub(value, ' ').strip()) for (key, value) in headers.iteritems()])
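
The two helpers added above take over the URI normalization and cache-filename logic that used to live inline in Http.request(). A minimal usage sketch, assuming the module is importable as httplib2 (the import name is an assumption, not part of this diff):

import httplib2

scheme, authority, request_uri, defrag_uri = httplib2.urlnorm("HTTP://Example.COM/feed?x=1")
# scheme      -> 'http'         (lowercased)
# authority   -> 'example.com'  (lowercased)
# request_uri -> '/feed?x=1'
# defrag_uri  -> 'http://example.com/feed?x=1'

# safename() strips the scheme, collapses '/', '?', ':' and '|' into commas,
# truncates to 200 characters, and appends an md5 hex digest of the input.
print httplib2.safename(defrag_uri)
# -> 'example.com,feed,x=1,<32-character md5 hex digest>'
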
@@ -211,13 +256,13 @@ def _entry_disposition(response_headers, request_headers):
     elif cc.has_key('only-if-cached'):
         retval = "FRESH"
     elif response_headers.has_key('date'):
-        date = calendar.timegm(rfc822.parsedate_tz(response_headers['date']))
+        date = calendar.timegm(email.Utils.parsedate_tz(response_headers['date']))
         now = time.time()
         current_age = max(0, now - date)
         if cc_response.has_key('max-age'):
             freshness_lifetime = int(cc_response['max-age'])
         elif response_headers.has_key('expires'):
-            expires = rfc822.parsedate_tz(response_headers['expires'])
+            expires = email.Utils.parsedate_tz(response_headers['expires'])
             freshness_lifetime = max(0, calendar.timegm(expires) - date)
         else:
             freshness_lifetime = 0
@@ -232,12 +277,14 @@ def _entry_disposition(response_headers, request_headers):
 def _decompressContent(response, new_content):
     content = new_content
     try:
-        if response.get('content-encoding', None) == 'gzip':
-            content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
-            response['content-length'] = str(len(content))
-        if response.get('content-encoding', None) == 'deflate':
-            content = zlib.decompress(content)
+        encoding = response.get('content-encoding', None)
+        if encoding in ['gzip', 'deflate']:
+            if encoding == 'gzip':
+                content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
+            if encoding == 'deflate':
+                content = zlib.decompress(content)
             response['content-length'] = str(len(content))
+            del response['content-encoding']
     except:
         content = ""
         raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'))
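
A short sketch of the reworked decompression path, assuming the module is importable as httplib2 and that _decompressContent still returns the inflated body (its return statement falls outside this hunk): gzip and deflate now share one branch, and the content-encoding header is deleted so the body is handled in decompressed form from here on.

import gzip, StringIO
import httplib2

buf = StringIO.StringIO()
gz = gzip.GzipFile(fileobj=buf, mode='wb')
gz.write('hello world')
gz.close()

response = {'status': '200', 'content-encoding': 'gzip'}   # dict stand-in for a Response
body = httplib2._decompressContent(response, buf.getvalue())
print body                                   # 'hello world'
print response.has_key('content-encoding')   # False, header dropped after inflating
print response['content-length']             # '11'
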
@@ -250,14 +297,23 @@ def _updateCache(request_headers, response_headers, content, cache, cachekey):
         if cc.has_key('no-store') or cc_response.has_key('no-store'):
             cache.delete(cachekey)
         else:
-            f = StringIO.StringIO("")
-            info = rfc822.Message(StringIO.StringIO(""))
+            info = email.Message.Message()
             for key, value in response_headers.iteritems():
-                info[key] = value
-            f.write(str(info))
-            f.write("\r\n\r\n")
-            f.write(content)
-            cache.set(cachekey, f.getvalue())
+                if key not in ['status','content-encoding','transfer-encoding']:
+                    info[key] = value
+
+            status = response_headers.status
+            if status == 304:
+                status = 200
+
+            status_header = 'status: %d\r\n' % response_headers.status
+
+            header_str = info.as_string()
+            header_str = re.sub("\r(?!\n)|(?<!\r)\n", "\r\n", header_str)
+            text = "".join([status_header, header_str, content])
+
+            cache.set(cachekey, text)
 
 def _cnonce():
     dig = md5.new("%s:%s" % (time.ctime(), ["0123456789"[random.randrange(0, 9)] for i in range(20)])).hexdigest()
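
The cache entry written above is now plain text: a synthetic status header, the filtered response headers (CRLF-normalized), a blank line, then the raw entity body. A sketch of how the read side, shown later in this diff, pulls such an entry apart (the sample entry is illustrative, not from a real response):

import email

cached_value = ("status: 200\r\n"
                "content-type: application/atom+xml\r\n"
                "\r\n"
                "<feed/>")

# Mirrors the cache-hit path in Http.request() further down.
info = email.message_from_string(cached_value)
content = cached_value.split('\r\n\r\n', 1)[1]
print int(info['status'])   # 200
print content               # <feed/>
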
@@ -498,20 +554,23 @@ AUTH_SCHEME_CLASSES = {
 AUTH_SCHEME_ORDER = ["hmacdigest", "googlelogin", "digest", "wsse", "basic"]
 
-def _md5(s):
-    return
 
 class FileCache:
     """Uses a local directory as a store for cached files.
     Not really safe to use if multiple threads or processes are going to
     be running on the same cache.
     """
-    def __init__(self, cache):
+    def __init__(self, cache, safe=safename): # use safe=lambda x: md5.new(x).hexdigest() for the old behavior
         self.cache = cache
+        self.safe = safe
         if not os.path.exists(cache):
             os.makedirs(self.cache)
 
     def get(self, key):
         retval = None
-        cacheFullPath = os.path.join(self.cache, key)
+        cacheFullPath = os.path.join(self.cache, self.safe(key))
         try:
             f = file(cacheFullPath, "r")
             retval = f.read()
@@ -521,13 +580,13 @@ class FileCache:
         return retval
 
     def set(self, key, value):
-        cacheFullPath = os.path.join(self.cache, key)
+        cacheFullPath = os.path.join(self.cache, self.safe(key))
         f = file(cacheFullPath, "w")
         f.write(value)
         f.close()
 
     def delete(self, key):
-        cacheFullPath = os.path.join(self.cache, key)
+        cacheFullPath = os.path.join(self.cache, self.safe(key))
        if os.path.exists(cacheFullPath):
            os.remove(cacheFullPath)
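
FileCache now takes a safe callable that maps a cache key to a filename, defaulting to safename() above. A sketch, again assuming the module is importable as httplib2:

import md5
import httplib2

# New default: readable filenames derived from the key by safename().
cache = httplib2.FileCache(".cache")

# The old behavior, as the inline comment on __init__ suggests: opaque md5 hex names.
legacy = httplib2.FileCache(".cache", safe=lambda x: md5.new(x).hexdigest())

cache.set("http://example.com/feed", "status: 200\r\n\r\n<feed/>")
print cache.get("http://example.com/feed")   # prints the stored entry
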
@@ -639,7 +698,8 @@ class Http:
                 response['location'] = urlparse.urljoin(absolute_uri, location)
             if response.status == 301 and method in ["GET", "HEAD"]:
                 response['-x-permanent-redirect-url'] = response['location']
-                response['-location'] = absolute_uri
+                if not response.has_key('content-location'):
+                    response['content-location'] = absolute_uri
                 _updateCache(headers, response, content, self.cache, cachekey)
             if headers.has_key('if-none-match'):
                 del headers['if-none-match']
@@ -648,7 +708,8 @@ class Http:
             if response.has_key('location'):
                 location = response['location']
                 old_response = copy.deepcopy(response)
-                old_response['-location'] = absolute_uri
+                if not old_response.has_key('content-location'):
+                    old_response['content-location'] = absolute_uri
                 redirect_method = ((response.status == 303) and (method not in ["GET", "HEAD"])) and "GET" or method
                 (response, content) = self.request(location, redirect_method, body=body, headers = headers, redirections = redirections - 1)
                 response.previous = old_response
@@ -656,7 +717,8 @@ class Http:
                 raise RedirectLimit( _("Redirected more times than rediection_limit allows."))
         elif response.status in [200, 203] and method == "GET":
             # Don't cache 206's since we aren't going to handle byte range requests
-            response['-location'] = absolute_uri
+            if not response.has_key('content-location'):
+                response['content-location'] = absolute_uri
             _updateCache(headers, response, content, self.cache, cachekey)
 
         return (response, content)
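
All three branches above now record the request URI in the standard content-location header (unless the server already sent one) rather than the private -location key; the spider changes below read the same header. A sketch, assuming the module is importable as httplib2 and that Http still accepts a cache directory path:

import httplib2

h = httplib2.Http(".cache")
resp, content = h.request("http://example.com/feed")
print resp.get('content-location')   # URI the entity was ultimately fetched from, for cached GETs and redirects
print resp.fromcache                 # True when a repeat request is answered from the cache
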
@@ -690,14 +752,7 @@ a string that contains the response entity body.
         if not headers.has_key('user-agent'):
             headers['user-agent'] = "Python-httplib2/%s" % __version__
 
-        (scheme, authority, path, query, fragment) = parse_uri(uri)
-        authority = authority.lower()
-        if not path:
-            path = "/"
-        # Could do syntax based normalization of the URI before
-        # computing the digest. See Section 6.2.2 of Std 66.
-        request_uri = query and "?".join([path, query]) or path
-        defrag_uri = scheme + "://" + authority + request_uri
+        (scheme, authority, request_uri, defrag_uri) = urlnorm(uri)
 
         if not self.connections.has_key(scheme+":"+authority):
             connection_type = (scheme == 'https') and httplib.HTTPSConnection or httplib.HTTPConnection
@@ -709,17 +764,16 @@ a string that contains the response entity body.
         if method in ["GET", "HEAD"] and 'range' not in headers:
             headers['accept-encoding'] = 'compress, gzip'
 
-        info = rfc822.Message(StringIO.StringIO(""))
+        info = email.Message.Message()
         cached_value = None
         if self.cache:
-            cachekey = md5.new(defrag_uri).hexdigest()
+            cachekey = defrag_uri
             cached_value = self.cache.get(cachekey)
             if cached_value:
                 try:
-                    f = StringIO.StringIO(cached_value)
-                    info = rfc822.Message(f)
+                    info = email.message_from_string(cached_value)
                     content = cached_value.split('\r\n\r\n', 1)[1]
-                except:
+                except Exception, e:
                     self.cache.delete(cachekey)
                     cachekey = None
                     cached_value = None
@@ -802,7 +856,7 @@ a string that contains the response entity body.
 
 class Response(dict):
-    """An object more like rfc822.Message than httplib.HTTPResponse."""
+    """An object more like email.Message than httplib.HTTPResponse."""
 
     """Is this response from our local cache"""
     fromcache = False
@@ -819,7 +873,7 @@ class Response(dict):
     previous = None
 
     def __init__(self, info):
-        # info is either an rfc822.Message or
+        # info is either an email.Message or
         # an httplib.HTTPResponse object.
         if isinstance(info, httplib.HTTPResponse):
             for key, value in info.getheaders():
@@ -828,7 +882,7 @@ class Response(dict):
             self['status'] = str(self.status)
             self.reason = info.reason
             self.version = info.version
-        elif isinstance(info, rfc822.Message):
+        elif isinstance(info, email.Message.Message):
             for key, value in info.items():
                 self[key] = value
             self.status = int(self['status'])

View File

@@ -140,17 +140,7 @@ def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None):
         return
 
     # read feed itself
-    if content:
-        # httplib2 was used to get the content, so prepare a
-        # proper object to pass to feedparser.
-        f = StringIO(content)
-        setattr(f, 'url', resp_headers.get('-location', feed))
-        if resp_headers:
-            if resp_headers.has_key('content-encoding'):
-                del resp_headers['content-encoding']
-            setattr(f, 'headers', resp_headers)
-        data = feedparser.parse(f)
-    else:
+    if not resp_headers:
         modified = None
         try:
             modified=time.strptime(
@@ -159,12 +149,25 @@ def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None):
             pass
         data = feedparser.parse(feed_info.feed.get('planet_http_location',feed),
             etag=feed_info.feed.get('planet_http_etag',None), modified=modified)
+    elif int(resp_headers.status) < 300:
+        # httplib2 was used to get the content, so prepare a
+        # proper object to pass to feedparser.
+        f = StringIO(content)
+        setattr(f, 'url', resp_headers.get('content-location', feed))
+        if resp_headers:
+            if resp_headers.has_key('content-encoding'):
+                del resp_headers['content-encoding']
+            setattr(f, 'headers', resp_headers)
+        data = feedparser.parse(f)
+    else:
+        data = feedparser.FeedParserDict({'status': int(resp_headers.status),
+            'headers':resp_headers, 'version':None, 'entries': []})
 
     # capture http status
     if not data.has_key("status"):
         if data.has_key("entries") and len(data.entries)>0:
             data.status = 200
-        elif data.bozo and data.bozo_exception.__class__.__name__=='Timeout':
+        elif data.bozo and data.bozo_exception.__class__.__name__.lower()=='timeout':
             data.status = 408
         else:
             data.status = 500
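
spiderFeed() now distinguishes three cases: no resp_headers (fetch the feed itself), a pre-fetched response with a status below 300 (wrap the content for feedparser), and anything else, where it fabricates a result so the HTTP status is still recorded. A sketch of that fallback with stand-in values:

import feedparser

resp_headers = {'status': '404'}   # stand-in; the spider gets an httplib2 response here
data = feedparser.FeedParserDict({'status': 404,
    'headers': resp_headers, 'version': None, 'entries': []})
print data.status, data.version, len(data.entries)   # 404 None 0
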
@@ -380,13 +383,27 @@ def spiderPlanet(only_if_new = False):
                 # is empty which will terminate the thread.
                 uri = work_queue.get(block=False)
                 log.info("Fetching %s via %d", uri, thread_index)
+                resp = feedparser.FeedParserDict({'status':'500'})
+                content = None
                 try:
-                    (resp, content) = h.request(uri)
-                    awaiting_parsing.put(block=True, item=(resp, content, uri))
+                    try:
+                        if isinstance(uri,unicode):
+                            idna = uri.encode('idna')
+                        else:
+                            idna = uri.decode('utf-8').encode('idna')
+                        if idna != uri: log.info("IRI %s mapped to %s", uri, idna)
+                    except:
+                        log.info("unable to map %s to a URI", uri)
+                        idna = uri
+                    (resp, content) = h.request(idna)
                 except gaierror:
                     log.error("Fail to resolve server name %s via %d", uri, thread_index)
                 except error, e:
-                    log.error("HTTP Error: %s in thread-%d", str(e), thread_index)
+                    if e.__class__.__name__.lower()=='timeout':
+                        resp['status'] = '408'
+                        log.warn("Timeout in thread-%d", thread_index)
+                    else:
+                        log.error("HTTP Error: %s in thread-%d", str(e), thread_index)
                 except Exception, e:
                     import sys, traceback
                     type, value, tb = sys.exc_info()
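
The fetch loop now maps IRIs to URIs with the idna codec before calling h.request(), and falls back to the original string if the codec refuses. The same logic as a standalone sketch (to_idna is a hypothetical helper, not part of the commit): ASCII URIs pass through unchanged, while non-ASCII labels get punycoded.

def to_idna(uri):
    # Mirror of the inner try/except block above.
    try:
        if isinstance(uri, unicode):
            idna = uri.encode('idna')
        else:
            idna = uri.decode('utf-8').encode('idna')
    except:
        idna = uri
    return idna

print to_idna('http://example.com/feed')   # unchanged: 'http://example.com/feed'
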
@@ -394,6 +411,7 @@ def spiderPlanet(only_if_new = False):
                     for line in (traceback.format_exception_only(type, value) +
                         traceback.format_tb(tb)):
                         log.error(line.rstrip())
+                awaiting_parsing.put(block=True, item=(resp, content, uri))
 
         except Empty, e:
             log.info("Thread %d finished", thread_index)
@@ -409,18 +427,15 @@ def spiderPlanet(only_if_new = False):
         # Process the results as they arrive
         while work_queue.qsize() or awaiting_parsing.qsize() or threads:
-            if awaiting_parsing.qsize() == 0 and threads:
-                time.sleep(1)
+            while awaiting_parsing.qsize() == 0 and threads:
+                time.sleep(0.1)
             while awaiting_parsing.qsize():
                 item = awaiting_parsing.get(False)
                 try:
                     (resp_headers, content, uri) = item
-                    if not resp_headers.fromcache:
-                        if resp_headers.status < 300:
-                            log.info("Parsing pre-fetched %s", uri)
-                            spiderFeed(uri, only_if_new=only_if_new, content=content, resp_headers=resp_headers)
-                        else:
-                            log.error("Status code %d from %s", resp_headers.status, uri)
+                    if resp_headers.status == 200 and resp_headers.fromcache:
+                        resp_headers.status = 304
+                    spiderFeed(uri, only_if_new=only_if_new, content=content, resp_headers=resp_headers)
                 except Exception, e:
                     import sys, traceback
                     type, value, tb = sys.exc_info()