Different approach to threading

Joe Gregorio 2006-11-02 11:59:25 -05:00
parent 405290aaab
commit b9604d8330
3 changed files with 89 additions and 24 deletions

View File

@@ -100,6 +100,7 @@ def __init__():
define_planet('owner_email', '')
define_planet('output_theme', '')
define_planet('output_dir', 'output')
define_planet('spider_threads', 0)
define_planet_list('template_files')
define_planet_list('bill_of_materials')
@@ -282,6 +283,11 @@ def downloadReadingList(list, orig_config, callback, use_cache=True, re_read=Tru
except:
logger.exception("Unable to read %s readinglist", list)
def http_cache_directory():
if parser.has_option('Planet', 'http_cache_directory'):
return parser.get('Planet', 'http_cache_directory')
else:
return os.path.join(cache_directory(), 'sources/http')
def cache_sources_directory():
if parser.has_option('Planet', 'cache_sources_directory'):
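
The config hunk above adds a spider_threads option (default 0, i.e. no threading) and an http_cache_directory() accessor that falls back to sources/http under the cache directory. A minimal sketch of how a caller might read the new settings, assuming the usual planet.config.load() entry point (the config file path here is illustrative):

from planet import config

config.load('config.ini')                   # illustrative config file
threads = config.spider_threads()           # new option; 0 keeps the old serial behaviour
http_cache = config.http_cache_directory()  # defaults to <cache_directory>/sources/http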

View File

@@ -11,7 +11,7 @@ Recommended: Python 2.3 or later
Recommended: CJKCodecs and iconv_codec <http://cjkpython.i18n.org/>
"""
__version__ = "4.2-pre-" + "$Revision: 1.144 $"[11:16] + "-cvs"
__version__ = "4.2-pre-" + "$Revision: 1.142 $"[11:16] + "-cvs"
__license__ = """Copyright (c) 2002-2006, Mark Pilgrim, All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
@@ -218,9 +218,6 @@ class FeedParserDict(UserDict):
def __getitem__(self, key):
if key == 'category':
return UserDict.__getitem__(self, 'tags')[0]['term']
if key == 'enclosures':
norel = lambda link: FeedParserDict([(name,value) for (name,value) in link.items() if name!='rel'])
return [norel(link) for link in UserDict.__getitem__(self, 'links') if link['rel']=='enclosure']
if key == 'categories':
return [(tag['scheme'], tag['term']) for tag in UserDict.__getitem__(self, 'tags')]
realkey = self.keymap.get(key, key)
@@ -1306,15 +1303,15 @@ class _FeedParserMixin:
attrsD.setdefault('type', 'application/atom+xml')
else:
attrsD.setdefault('type', 'text/html')
context = self._getContext()
attrsD = self._itsAnHrefDamnIt(attrsD)
if attrsD.has_key('href'):
attrsD['href'] = self.resolveURI(attrsD['href'])
if attrsD.get('rel')=='enclosure' and not context.get('id'):
context['id'] = attrsD.get('href')
expectingText = self.infeed or self.inentry or self.insource
context = self._getContext()
context.setdefault('links', [])
context['links'].append(FeedParserDict(attrsD))
if attrsD.get('rel') == 'enclosure':
self._start_enclosure(attrsD)
if attrsD.has_key('href'):
expectingText = 0
if (attrsD.get('rel') == 'alternate') and (self.mapContentType(attrsD.get('type')) in self.html_types):
@@ -1360,7 +1357,6 @@ class _FeedParserMixin:
self._start_content(attrsD)
else:
self.pushContent('description', attrsD, 'text/html', self.infeed or self.inentry or self.insource)
_start_dc_description = _start_description
def _start_abstract(self, attrsD):
self.pushContent('description', attrsD, 'text/plain', self.infeed or self.inentry or self.insource)
@@ -1372,7 +1368,6 @@ class _FeedParserMixin:
value = self.popContent('description')
self._summaryKey = None
_end_abstract = _end_description
_end_dc_description = _end_description
def _start_info(self, attrsD):
self.pushContent('info', attrsD, 'text/plain', 1)
@@ -1432,8 +1427,7 @@ class _FeedParserMixin:
def _start_enclosure(self, attrsD):
attrsD = self._itsAnHrefDamnIt(attrsD)
context = self._getContext()
attrsD['rel']='enclosure'
context.setdefault('links', []).append(FeedParserDict(attrsD))
context.setdefault('enclosures', []).append(FeedParserDict(attrsD))
href = attrsD.get('href')
if href and not context.get('id'):
context['id'] = href
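
Since _start_enclosure (and _start_link for rel='enclosure') now appends to an 'enclosures' list on the context instead of synthesizing enclosures from 'links', callers would read them directly from the entry, roughly like this (a sketch; the feed URL is illustrative):

import feedparser

d = feedparser.parse('http://example.org/podcast.xml')    # illustrative URL
for entry in d.entries:
    for enclosure in entry.get('enclosures', []):
        # each enclosure is a FeedParserDict built from the element's attributes
        print enclosure.get('href'), enclosure.get('type'), enclosure.get('length')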
@@ -3254,7 +3248,7 @@ def _stripDoctype(data):
return version, data, dict(replacement and safe_pattern.findall(replacement))
def parse(url_file_stream_or_string, etag=None, modified=None, agent=None, referrer=None, handlers=[]):
def parse(url_file_stream_or_string, etag=None, modified=None, agent=None, referrer=None, handlers=[], resp_headers=None):
'''Parse a feed from a URL, file, stream, or string'''
result = FeedParserDict()
result['feed'] = FeedParserDict()
@@ -3263,6 +3257,9 @@ def parse(url_file_stream_or_string, etag=None, modified=None, agent=None, refer
result['bozo'] = 0
if type(handlers) == types.InstanceType:
handlers = [handlers]
if resp_headers:
f = None
data = url_file_stream_or_string
try:
f = _open_resource(url_file_stream_or_string, etag, modified, agent, referrer, handlers)
data = f.read()
@@ -3307,6 +3304,8 @@ def parse(url_file_stream_or_string, etag=None, modified=None, agent=None, refer
result['status'] = f.status
if hasattr(f, 'headers'):
result['headers'] = f.headers.dict
if resp_headers:
result['headers'] = resp_headers
if hasattr(f, 'close'):
f.close()
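
The new resp_headers argument lets a caller hand parse() a body that was fetched elsewhere, with the supplied response headers placed into result['headers'] rather than being read off the internal urllib handler. A sketch of that calling pattern using httplib2, as the spider below does (URL and cache path are illustrative):

import httplib2, feedparser

h = httplib2.Http('.cache')                                # illustrative cache directory
resp, content = h.request('http://example.org/feed.xml')   # resp is a dict-like header object
result = feedparser.parse(content, resp_headers=resp)
print result['headers'].get('content-type')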

View File

@@ -4,7 +4,7 @@ and write each as a set of entries in a cache directory.
"""
# Standard library modules
import time, calendar, re, os
import time, calendar, re, os, urlparse
from xml.dom import minidom
# Planet modules
import planet, config, feedparser, reconstitute, shell
@@ -116,8 +116,11 @@ def scrub(feed, data):
source.author_detail.has_key('name'):
source.author_detail['name'] = \
str(stripHtml(source.author_detail.name))
def _is_http_uri(uri):
parsed = urlparse.urlparse(uri)
return parsed[0] in ['http', 'https']
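
_is_http_uri() is what routes subscriptions: HTTP and HTTPS uris can go to the fetcher threads, while everything else stays on the existing serial path. For example:

_is_http_uri('http://example.org/feed.xml')     # True
_is_http_uri('file:///var/feeds/local.xml')     # False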
def spiderFeed(feed, only_if_new=0):
def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None):
""" Spider (fetch) a single feed """
log = planet.logger
@@ -125,6 +128,7 @@ def spiderFeed(feed, only_if_new=0):
sources = config.cache_sources_directory()
if not os.path.exists(sources):
os.makedirs(sources, 0700)
feed_source = filename(sources, feed)
feed_info = feedparser.parse(feed_source)
if feed_info.feed and only_if_new:
@@ -135,6 +139,9 @@ def spiderFeed(feed, only_if_new=0):
return
# read feed itself
if content:
data = feedparser.parse(content, resp_headers=resp_headers)
else:
modified = None
try:
modified=time.strptime(
@@ -319,12 +326,62 @@ def spiderFeed(feed, only_if_new=0):
def spiderPlanet(only_if_new = False):
""" Spider (fetch) an entire planet """
log = planet.getLogger(config.log_level(),config.log_format())
planet.setTimeout(config.feed_timeout())
global index
index = True
for feed in config.subscriptions():
if config.spider_threads():
    import Queue
    from threading import Thread
    import httplib2

    work_queue = Queue.Queue()
    awaiting_parsing = Queue.Queue()

    def _spider_proc():
        h = httplib2.Http(config.http_cache_directory())
        while True:
            # The non-blocking get raises Queue.Empty once the queue
            # is drained, which terminates the thread.
            uri = work_queue.get(block=False)
            log.info("Fetching %s", uri)
            (resp, content) = h.request(uri)
            awaiting_parsing.put((resp, content, uri), block=True)

    # Load the work_queue with all the HTTP(S) uris.
    map(work_queue.put, [uri for uri in config.subscriptions() if _is_http_uri(uri)])

    # Start all the worker threads
    threads = dict([(i, Thread(target=_spider_proc)) for i in range(config.spider_threads())])
    for t in threads.itervalues():
        t.start()

    # Process the results as they arrive
    while work_queue.qsize() or awaiting_parsing.qsize() or threads:
        try:
            item = awaiting_parsing.get(block=False)
        except Queue.Empty:
            item = None
        if not item and threads:
            time.sleep(1)
        while item:
            try:
                (resp_headers, content, uri) = item
                spiderFeed(uri, only_if_new=only_if_new, content=content, resp_headers=resp_headers)
            except Exception, e:
                import sys, traceback
                type, value, tb = sys.exc_info()
                log.error('Error processing %s', uri)
                for line in (traceback.format_exception_only(type, value) +
                        traceback.format_tb(tb)):
                    log.error(line.rstrip())
            try:
                item = awaiting_parsing.get(block=False)
            except Queue.Empty:
                item = None
        # Reap worker threads that have exited
        for index in threads.keys():
            if not threads[index].isAlive():
                del threads[index]

planet.setTimeout(config.feed_timeout())

# Process non-HTTP uris if we are threading, otherwise process *all* uris here.
unthreaded_work_queue = [uri for uri in config.subscriptions() if not config.spider_threads() or not _is_http_uri(uri)]
for feed in unthreaded_work_queue:
    try:
        spiderFeed(feed, only_if_new=only_if_new)
    except Exception, e:
@@ -334,3 +391,6 @@ def spiderPlanet(only_if_new = False):
for line in (traceback.format_exception_only(type, value) +
traceback.format_tb(tb)):
log.error(line.rstrip())
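
Taken together, the "different approach to threading" is a small producer/consumer pool: worker threads drain a queue of HTTP(S) uris with httplib2 (which also provides the on-disk cache), push (response, body, uri) tuples onto a second queue, and the main thread parses results as they arrive, falling back to the old serial loop for everything else. Stripped of the planet-specific pieces, the shape is roughly the following standalone sketch (not the committed code; it joins the workers instead of polling, and assumes httplib2 is installed):

import Queue
from threading import Thread
import httplib2

def fetch_all(uris, num_threads=4, cache='.cache'):
    """Fetch uris concurrently; return a list of (uri, response, content)."""
    work = Queue.Queue()
    done = Queue.Queue()
    for uri in uris:
        work.put(uri)

    def worker():
        h = httplib2.Http(cache)             # one connection/cache handle per thread
        while True:
            try:
                uri = work.get(block=False)
            except Queue.Empty:
                return                       # queue drained; thread exits
            resp, content = h.request(uri)
            done.put((uri, resp, content))

    threads = [Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    results = []
    while not done.empty():
        results.append(done.get())
    return results

The committed version instead polls awaiting_parsing while workers are still alive, so parsing overlaps with fetching rather than waiting for every request to complete.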