Complete HttpThread refactoring

Sam Ruby 2006-11-21 09:11:52 -05:00
parent e85ae48722
commit 70f971750b
2 changed files with 132 additions and 127 deletions

View File

@@ -344,68 +344,64 @@ def httpThread(thread_index, input_queue, output_queue, log):
     http_cache = config.http_cache_directory()
     h = httplib2.Http(http_cache)
-    try:
-        while True:
-            # The non-blocking get will throw an exception when the queue
-            # is empty which will terminate the thread.
-            uri, feed_info = input_queue.get(block=False)
-            log.info("Fetching %s via %d", uri, thread_index)
-            feed = StringIO('')
-            setattr(feed, 'url', uri)
-            setattr(feed, 'headers',
-                feedparser.FeedParserDict({'status':'500'}))
-            try:
-                # map IRI => URI
-                try:
-                    if isinstance(uri,unicode):
-                        idna = uri.encode('idna')
-                    else:
-                        idna = uri.decode('utf-8').encode('idna')
-                    if idna != uri: log.info("IRI %s mapped to %s", uri, idna)
-                except:
-                    log.info("unable to map %s to a URI", uri)
-                    idna = uri
-                # issue request
-                (resp, content) = h.request(idna)
-                if resp.status == 200 and resp.fromcache:
-                    resp.status = 304
-                # build a file-like object
-                feed = StringIO(content)
-                setattr(feed, 'url', resp.get('content-location', uri))
-                if resp.has_key('content-encoding'):
-                    del resp['content-encoding']
-                setattr(feed, 'headers', resp)
-            except gaierror:
-                log.error("Fail to resolve server name %s via %d",
-                    uri, thread_index)
-            except BadStatusLine:
-                log.error("Bad Status Line received for %s via %d",
-                    uri, thread_index)
-            except error, e:
-                if e.__class__.__name__.lower()=='timeout':
-                    feed.headers['status'] = '408'
-                    log.warn("Timeout in thread-%d", thread_index)
-                else:
-                    log.error("HTTP Error: %s in thread-%d",
-                        str(e), thread_index)
-            except Exception, e:
-                import sys, traceback
-                type, value, tb = sys.exc_info()
-                log.error('Error processing %s', uri)
-                for line in (traceback.format_exception_only(type, value) +
-                    traceback.format_tb(tb)):
-                    log.error(line.rstrip())
-                continue
-            output_queue.put(block=True, item=(uri, feed_info, feed))
-    except Empty, e:
-        log.info("Thread %d finished", thread_index)
+    uri, feed_info = input_queue.get(block=True)
+    while uri:
+        log.info("Fetching %s via %d", uri, thread_index)
+        feed = StringIO('')
+        setattr(feed, 'url', uri)
+        setattr(feed, 'headers',
+            feedparser.FeedParserDict({'status':'500'}))
+        try:
+            # map IRI => URI
+            try:
+                if isinstance(uri,unicode):
+                    idna = uri.encode('idna')
+                else:
+                    idna = uri.decode('utf-8').encode('idna')
+                if idna != uri: log.info("IRI %s mapped to %s", uri, idna)
+            except:
+                log.info("unable to map %s to a URI", uri)
+                idna = uri
+            # issue request
+            (resp, content) = h.request(idna)
+            if resp.status == 200 and resp.fromcache:
+                resp.status = 304
+            # build a file-like object
+            feed = StringIO(content)
+            setattr(feed, 'url', resp.get('content-location', uri))
+            if resp.has_key('content-encoding'):
+                del resp['content-encoding']
+            setattr(feed, 'headers', resp)
+        except gaierror:
+            log.error("Fail to resolve server name %s via %d",
+                uri, thread_index)
+        except BadStatusLine:
+            log.error("Bad Status Line received for %s via %d",
+                uri, thread_index)
+        except error, e:
+            if e.__class__.__name__.lower()=='timeout':
+                feed.headers['status'] = '408'
+                log.warn("Timeout in thread-%d", thread_index)
+            else:
+                log.error("HTTP Error: %s in thread-%d",
+                    str(e), thread_index)
+        except Exception, e:
+            import sys, traceback
+            type, value, tb = sys.exc_info()
+            log.error('Error processing %s', uri)
+            for line in (traceback.format_exception_only(type, value) +
+                traceback.format_tb(tb)):
+                log.error(line.rstrip())
+            continue
+        output_queue.put(block=True, item=(uri, feed_info, feed))
+        uri, feed_info = input_queue.get(block=True)
 
 def spiderPlanet(only_if_new = False):
     """ Spider (fetch) an entire planet """
-    # log = planet.getLogger(config.log_level(),config.log_format())
     log = planet.getLogger(config.log_level(),config.log_format())
     global index
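The refactored httpThread above no longer relies on a Queue.Empty exception to end the thread: it blocks on the input queue and exits when it receives a falsy sentinel. A minimal sketch of that pattern (not planet code; worker, in_queue and out_queue are made-up names, Python 2 to match the source):

    from Queue import Queue
    from threading import Thread

    def worker(index, in_queue, out_queue):
        item, extra = in_queue.get(block=True)
        while item:                          # a falsy item means "shut down"
            out_queue.put((index, item, extra))
            item, extra = in_queue.get(block=True)

    in_queue, out_queue = Queue(), Queue()
    t = Thread(target=worker, args=(0, in_queue, out_queue))
    t.start()
    in_queue.put(('http://example.com/feed', {}))
    in_queue.put((None, None))               # mark the end of the queue
    t.join()
    print out_queue.get()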
@@ -414,6 +410,7 @@ def spiderPlanet(only_if_new = False):
     timeout = config.feed_timeout()
     try:
         socket.setdefaulttimeout(float(timeout))
+        log.info("Socket timeout set to %d seconds", timeout)
     except:
         try:
             from planet import timeoutsocket
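The added line simply logs the timeout once it has been applied. A minimal sketch of the surrounding pattern (not planet code; the timeout string is a made-up config value):

    import socket
    timeout = '20'                           # hypothetical config value, in seconds
    try:
        socket.setdefaulttimeout(float(timeout))
        print "Socket timeout set to %s seconds" % timeout
    except:
        print "Timeout set to invalid value '%s', skipping" % timeout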
@@ -422,84 +419,87 @@
         except:
             log.warning("Timeout set to invalid value '%s', skipping", timeout)
 
+    from Queue import Queue
+    from threading import Thread
+    fetch_queue = Queue()
+    parse_queue = Queue()
+    threads = {}
     if int(config.spider_threads()):
-        from Queue import Queue
-        from threading import Thread
-        fetch_queue = Queue()
-        parse_queue = Queue()
         http_cache = config.http_cache_directory()
         if not os.path.exists(http_cache):
             os.makedirs(http_cache, 0700)
-        # Load the fetch_queue with all the HTTP(S) uris.
-        log.info("Building work queue")
-        for uri in config.subscriptions():
-            if _is_http_uri(uri):
-                # read cached feed info
-                sources = config.cache_sources_directory()
-                feed_source = filename(sources, uri)
-                feed_info = feedparser.parse(feed_source)
-                if feed_info.feed and only_if_new:
-                    log.info("Feed %s already in cache", uri)
-                    continue
-                if feed_info.feed.get('planet_http_status',None) == '410':
-                    log.info("Feed %s gone", uri)
-                    continue
-                fetch_queue.put(item=((uri, feed_info)))
         # Start all the worker threads
-        threads = dict([(i, Thread(target=httpThread,
-            args=(i,fetch_queue, parse_queue, log)))
-            for i in range(int(config.spider_threads()))])
-        for t in threads.itervalues():
-            t.start()
-        # Process the results as they arrive
-        while fetch_queue.qsize() or parse_queue.qsize() or threads:
-            while parse_queue.qsize() == 0 and threads:
-                time.sleep(0.1)
-            while parse_queue.qsize():
-                (uri, feed_info, feed) = parse_queue.get(False)
-                try:
-                    if int(feed.headers.status) < 300:
-                        data = feedparser.parse(feed)
-                    else:
-                        data = feedparser.FeedParserDict({'version':None,
-                            'headers':feed.headers, 'entries': [],
-                            'status': int(feed.headers.status)})
-                    writeCache(uri, feed_info, data)
-                except Exception, e:
-                    import sys, traceback
-                    type, value, tb = sys.exc_info()
-                    log.error('Error processing %s', uri)
-                    for line in (traceback.format_exception_only(type, value) +
-                        traceback.format_tb(tb)):
-                        log.error(line.rstrip())
-            for index in threads.keys():
-                if not threads[index].isAlive():
-                    del threads[index]
-        log.info("Finished threaded part of processing.")
-    # Process non-HTTP uris if we are threading, otherwise process *all* uris here.
-    unthreaded_work_queue = [uri for uri in config.subscriptions() if not int(config.spider_threads()) or not _is_http_uri(uri)]
-    for feed in unthreaded_work_queue:
-        try:
-            spiderFeed(feed, only_if_new=only_if_new)
-        except Exception,e:
-            import sys, traceback
-            type, value, tb = sys.exc_info()
-            log.error('Error processing %s', feed)
-            for line in (traceback.format_exception_only(type, value) +
-                traceback.format_tb(tb)):
-                log.error(line.rstrip())
+        for i in range(int(config.spider_threads())):
+            threads[i] = Thread(target=httpThread,
+                args=(i,fetch_queue, parse_queue, log))
+            threads[i].start()
+    else:
+        log.info("Building work queue")
+
+    # Load the fetch and parse work queues
+    for uri in config.subscriptions():
+        # read cached feed info
+        sources = config.cache_sources_directory()
+        feed_source = filename(sources, uri)
+        feed_info = feedparser.parse(feed_source)
+        if feed_info.feed and only_if_new:
+            log.info("Feed %s already in cache", uri)
+            continue
+        if feed_info.feed.get('planet_http_status',None) == '410':
+            log.info("Feed %s gone", uri)
+            continue
+        if threads and _is_http_uri(uri):
+            fetch_queue.put(item=(uri, feed_info))
+        else:
+            parse_queue.put(item=(uri, feed_info, uri))
+
+    # Mark the end of the fetch queue
+    for thread in threads.keys():
+        fetch_queue.put(item=(None, None))
+
+    # Process the results as they arrive
+    while fetch_queue.qsize() or parse_queue.qsize() or threads:
+        while parse_queue.qsize() == 0 and threads:
+            time.sleep(0.1)
+        while parse_queue.qsize():
+            (uri, feed_info, feed) = parse_queue.get(False)
+            try:
+                if not hasattr(feed,'headers') or int(feed.headers.status)<300:
+                    options = {}
+                    if hasattr(feed_info,'feed'):
+                        options['etag'] = \
+                            feed_info.feed.get('planet_http_etag',None)
+                        try:
+                            modified=time.strptime(
+                                feed_info.feed.get('planet_http_last_modified',
+                                None))
+                        except:
+                            pass
+                    data = feedparser.parse(feed, **options)
+                else:
+                    data = feedparser.FeedParserDict({'version':None,
+                        'headers':feed.headers, 'entries': [],
+                        'status': int(feed.headers.status)})
+                writeCache(uri, feed_info, data)
+            except Exception, e:
+                import sys, traceback
+                type, value, tb = sys.exc_info()
+                log.error('Error processing %s', uri)
+                for line in (traceback.format_exception_only(type, value) +
+                    traceback.format_tb(tb)):
+                    log.error(line.rstrip())
+        for index in threads.keys():
+            if not threads[index].isAlive():
+                del threads[index]
+                if not threads:
+                    log.info("Finished threaded part of processing.")

View File

@@ -1,7 +1,7 @@
 #!/usr/bin/env python
 import unittest, os, glob, calendar, shutil, time
-from planet.spider import filename, spiderFeed, spiderPlanet
+from planet.spider import filename, spiderPlanet, writeCache
 from planet import feedparser, config
 import planet
@@ -43,6 +43,11 @@ class SpiderTest(unittest.TestCase):
         self.assertEqual(os.path.join('.', 'xn--8ws00zhy3a.com'),
             filename('.', u'http://www.\u8a79\u59c6\u65af.com/'))
 
+    def spiderFeed(self, feed_uri):
+        feed_info = feedparser.parse('<feed/>')
+        data = feedparser.parse(feed_uri)
+        writeCache(feed_uri, feed_info, data)
+
     def verify_spiderFeed(self):
         files = glob.glob(workdir+"/*")
         files.sort()
@@ -65,13 +70,13 @@ class SpiderTest(unittest.TestCase):
     def test_spiderFeed(self):
         config.load(configfile)
-        spiderFeed(testfeed % '1b')
+        self.spiderFeed(testfeed % '1b')
         self.verify_spiderFeed()
 
     def test_spiderUpdate(self):
         config.load(configfile)
-        spiderFeed(testfeed % '1a')
-        spiderFeed(testfeed % '1b')
+        self.spiderFeed(testfeed % '1a')
+        self.spiderFeed(testfeed % '1b')
         self.verify_spiderFeed()
 
     def verify_spiderPlanet(self):
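The test changes replace calls to the module-level spiderFeed with a local helper that parses the feed file and writes it to the cache directly, so the spiderFeed tests no longer exercise the fetch path. A hypothetical standalone use of that helper pattern, reusing the configfile and testfeed names already defined elsewhere in the test module (they are not shown in this diff):

    from planet import config, feedparser
    from planet.spider import writeCache

    config.load(configfile)                  # configfile: module-level test setting
    feed_uri = testfeed % '1b'               # testfeed: module-level test setting
    feed_info = feedparser.parse('<feed/>')  # stand-in for cached feed info
    data = feedparser.parse(feed_uri)
    writeCache(feed_uri, feed_info, data)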