More refactoring

Sam Ruby 2006-11-20 15:55:43 -05:00
parent c6c9bed994
commit e85ae48722


@@ -336,6 +336,74 @@ def writeCache(feed_uri, feed_info, data):
    write(xdoc.toxml().encode('utf-8'), filename(sources, feed_uri))
    xdoc.unlink()

def httpThread(thread_index, input_queue, output_queue, log):
    from Queue import Empty
    import httplib2
    from socket import gaierror, error
    from httplib import BadStatusLine

    http_cache = config.http_cache_directory()
    h = httplib2.Http(http_cache)
    try:
        while True:
            # The non-blocking get will throw an exception when the queue
            # is empty which will terminate the thread.
            uri, feed_info = input_queue.get(block=False)
            log.info("Fetching %s via %d", uri, thread_index)

            feed = StringIO('')
            setattr(feed, 'url', uri)
            setattr(feed, 'headers',
                feedparser.FeedParserDict({'status':'500'}))
            try:
                # map IRI => URI
                try:
                    if isinstance(uri,unicode):
                        idna = uri.encode('idna')
                    else:
                        idna = uri.decode('utf-8').encode('idna')
                    if idna != uri: log.info("IRI %s mapped to %s", uri, idna)
                except:
                    log.info("unable to map %s to a URI", uri)
                    idna = uri

                # issue request
                (resp, content) = h.request(idna)
                if resp.status == 200 and resp.fromcache:
                    resp.status = 304

                # build a file-like object
                feed = StringIO(content)
                setattr(feed, 'url', resp.get('content-location', uri))
                if resp.has_key('content-encoding'):
                    del resp['content-encoding']
                setattr(feed, 'headers', resp)
            except gaierror:
                log.error("Fail to resolve server name %s via %d",
                    uri, thread_index)
            except BadStatusLine:
                log.error("Bad Status Line received for %s via %d",
                    uri, thread_index)
            except error, e:
                if e.__class__.__name__.lower()=='timeout':
                    feed.headers['status'] = '408'
                    log.warn("Timeout in thread-%d", thread_index)
                else:
                    log.error("HTTP Error: %s in thread-%d",
                        str(e), thread_index)
            except Exception, e:
                import sys, traceback
                type, value, tb = sys.exc_info()
                log.error('Error processing %s', uri)
                for line in (traceback.format_exception_only(type, value) +
                        traceback.format_tb(tb)):
                    log.error(line.rstrip())
                continue

            output_queue.put(block=True, item=(uri, feed_info, feed))

    except Empty, e:
        log.info("Thread %d finished", thread_index)

def spiderPlanet(only_if_new = False):
    """ Spider (fetch) an entire planet """
    log = planet.getLogger(config.log_level(),config.log_format())
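
The new httpThread worker above drains a shared input queue of (uri, feed_info) pairs, fetches each feed with httplib2, and hands a file-like result to an output queue; the thread exits when the non-blocking get() raises Queue.Empty. A minimal, self-contained sketch of that producer/consumer shape, in the same Python 2 style as the commit (fake_fetch and run_worker are illustrative stand-ins, not names from this codebase):

from Queue import Queue, Empty
from threading import Thread

def fake_fetch(uri):
    # stand-in for httplib2.Http().request(uri)
    return {'status': '200'}, '<feed/>'

def run_worker(index, fetch_queue, parse_queue):
    try:
        while True:
            # Non-blocking get: once the queue drains, Empty ends the thread.
            uri = fetch_queue.get(block=False)
            resp, content = fake_fetch(uri)
            parse_queue.put((uri, resp, content))
    except Empty:
        pass

fetch_queue, parse_queue = Queue(), Queue()
for uri in ('http://example.com/a', 'http://example.com/b'):
    fetch_queue.put(uri)

workers = [Thread(target=run_worker, args=(i, fetch_queue, parse_queue))
           for i in range(2)]
for w in workers: w.start()
for w in workers: w.join()

while not parse_queue.empty():
    print parse_queue.get(block=False)

Because every URI is queued before the workers start, get(block=False) cannot race with the producer here; the refactored spiderPlanet below keeps the same property by filling fetch_queue before starting the threads.
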
@@ -355,111 +423,55 @@ def spiderPlanet(only_if_new = False):
log.warning("Timeout set to invalid value '%s', skipping", timeout)
if int(config.spider_threads()):
from Queue import Queue, Empty
from Queue import Queue
from threading import Thread
import httplib2
from socket import gaierror, error
from httplib import BadStatusLine
work_queue = Queue()
awaiting_parsing = Queue()
fetch_queue = Queue()
parse_queue = Queue()
http_cache = config.http_cache_directory()
if not os.path.exists(http_cache):
os.makedirs(http_cache, 0700)
def _spider_proc(thread_index):
h = httplib2.Http(http_cache)
try:
while True:
# The non-blocking get will throw an exception when the queue
# is empty which will terminate the thread.
uri = work_queue.get(block=False)
log.info("Fetching %s via %d", uri, thread_index)
resp = feedparser.FeedParserDict({'status':'500'})
feed_info = None
content = None
try:
# read cached feed info
sources = config.cache_sources_directory()
feed_source = filename(sources, uri)
feed_info = feedparser.parse(feed_source)
if feed_info.feed and only_if_new:
log.info("Feed %s already in cache", uri)
continue
if feed_info.feed.get('planet_http_status',None) == '410':
log.info("Feed %s gone", uri)
continue
# Load the fetch_queue with all the HTTP(S) uris.
log.info("Building work queue")
for uri in config.subscriptions():
if _is_http_uri(uri):
# read cached feed info
sources = config.cache_sources_directory()
feed_source = filename(sources, uri)
feed_info = feedparser.parse(feed_source)
try:
if isinstance(uri,unicode):
idna = uri.encode('idna')
else:
idna = uri.decode('utf-8').encode('idna')
if idna != uri: log.info("IRI %s mapped to %s", uri, idna)
except:
log.info("unable to map %s to a URI", uri)
idna = uri
(resp, content) = h.request(idna)
except gaierror:
log.error("Fail to resolve server name %s via %d", uri, thread_index)
except BadStatusLine:
log.error("Bad Status Line received for %s via %d", uri, thread_index)
except error, e:
if e.__class__.__name__.lower()=='timeout':
resp['status'] = '408'
log.warn("Timeout in thread-%d", thread_index)
else:
log.error("HTTP Error: %s in thread-%d", str(e), thread_index)
except Exception, e:
import sys, traceback
type, value, tb = sys.exc_info()
log.error('Error processing %s', uri)
for line in (traceback.format_exception_only(type, value) +
traceback.format_tb(tb)):
log.error(line.rstrip())
continue
if feed_info.feed and only_if_new:
log.info("Feed %s already in cache", uri)
continue
if feed_info.feed.get('planet_http_status',None) == '410':
log.info("Feed %s gone", uri)
continue
if feed_info:
if resp.status == 200 and resp.fromcache:
resp.status = 304
awaiting_parsing.put(block=True,
item=(resp, content, uri, feed_info))
except Empty, e:
log.info("Thread %d finished", thread_index)
pass
# Load the work_queue with all the HTTP(S) uris.
map(work_queue.put, [uri for uri in config.subscriptions() if _is_http_uri(uri)])
fetch_queue.put(item=((uri, feed_info)))
# Start all the worker threads
threads = dict([(i, Thread(target=_spider_proc, args=(i,))) for i in range(int(config.spider_threads()))])
threads = dict([(i, Thread(target=httpThread,
args=(i,fetch_queue, parse_queue, log)))
for i in range(int(config.spider_threads()))])
for t in threads.itervalues():
t.start()
# Process the results as they arrive
while work_queue.qsize() or awaiting_parsing.qsize() or threads:
while awaiting_parsing.qsize() == 0 and threads:
while fetch_queue.qsize() or parse_queue.qsize() or threads:
while parse_queue.qsize() == 0 and threads:
time.sleep(0.1)
while awaiting_parsing.qsize():
item = awaiting_parsing.get(False)
while parse_queue.qsize():
(uri, feed_info, feed) = parse_queue.get(False)
try:
(resp_headers, content, uri, feed_info) = item
# spiderFeed(uri, only_if_new=only_if_new, content=content, resp_headers=resp_headers)
if int(resp_headers.status) < 300:
# httplib2 was used to get the content, so prepare a
# proper object to pass to feedparser.
f = StringIO(content)
setattr(f, 'url', resp_headers.get('content-location', uri))
if resp_headers.has_key('content-encoding'):
del resp_headers['content-encoding']
setattr(f, 'headers', resp_headers)
data = feedparser.parse(f)
if int(feed.headers.status) < 300:
data = feedparser.parse(feed)
else:
data = feedparser.FeedParserDict({'status': int(resp_headers.status),
'headers':resp_headers, 'version':None, 'entries': []})
data = feedparser.FeedParserDict({'version':None,
'headers':feed.headers, 'entries': [],
'status': int(feed.headers.status)})
writeCache(uri, feed_info, data)
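
On the consumer side, the loop above pulls (uri, feed_info, feed) off parse_queue, parses the prefetched body when the stored status is below 300, and otherwise builds a stub FeedParserDict so writeCache can still record the failure. The hand-off works because the worker wrapped the response in a StringIO carrying url and headers attributes. A small sketch of that object shape, mirroring the commit's own pattern (the URL and feed content are placeholders):

from StringIO import StringIO
import feedparser

content = '<rss version="2.0"><channel><title>t</title></channel></rss>'
headers = feedparser.FeedParserDict({'status': '200',
    'content-location': 'http://example.com/feed'})

feed = StringIO(content)
setattr(feed, 'url', headers.get('content-location'))
setattr(feed, 'headers', headers)

if int(feed.headers.status) < 300:
    data = feedparser.parse(feed)          # parse the already-fetched body
else:
    # error responses become a stub result so the cache records the status
    data = feedparser.FeedParserDict({'version': None, 'entries': [],
        'headers': feed.headers, 'status': int(feed.headers.status)})

Keeping the HTTP status on the wrapper object, rather than re-fetching inside the parser, is what lets a single writeCache path handle both successful and failed fetches.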