Seems to be working now

Joe Gregorio 2006-11-02 13:29:01 -05:00
parent b9604d8330
commit 58bb4b6e05
2 changed files with 28 additions and 20 deletions


@@ -54,7 +54,10 @@ if __name__ == "__main__":
     if not offline:
         from planet import spider
-        spider.spiderPlanet(only_if_new=only_if_new)
+        try:
+            spider.spiderPlanet(only_if_new=only_if_new)
+        except Exception, e:
+            print e
     from planet import splice
     doc = splice.splice()
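
What this hunk buys: the fetch stage may now raise without killing the run, so the splice stage below it still executes. A minimal self-contained sketch of the same pattern (Python 2 syntax to match the codebase; fetch_stage and splice_stage are hypothetical stand-ins, not Planet's API):

def fetch_stage():
    # Stand-in for spider.spiderPlanet(); simulate a fetch blowing up.
    raise IOError("simulated fetch failure")

def splice_stage():
    # Stand-in for splice.splice(); should run even when fetching fails,
    # so previously cached entries still produce output.
    return "<merged feed document>"

try:
    fetch_stage()
except Exception, e:     # Python 2 except syntax, as in the hunk above
    print e              # report the error and fall through

doc = splice_stage()     # still reached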


@@ -330,40 +330,45 @@ def spiderPlanet(only_if_new = False):
     global index
     index = True

-    if config.spider_threads():
-        import Queue
+    if int(config.spider_threads()):
+        from Queue import Queue, Empty
         from threading import Thread
         import httplib2

         work_queue = Queue()
         awaiting_parsing = Queue()

-        def _spider_proc():
+        def _spider_proc(thread_index):
             h = httplib2.Http(config.http_cache_directory())
-            while True:
-                # The non-blocking get will throw an exception when the queue
-                # is empty which will terminate the thread.
-                uri = work_queue.get(block=False):
-                log.info("Fetching %s", uri)
-                (resp, content) = h.request(uri)
-                awaiting_parsing.put(block=True, (resp, content, uri))
+            try:
+                while True:
+                    # The non-blocking get will throw an exception when the queue
+                    # is empty which will terminate the thread.
+                    uri = work_queue.get(block=False)
+                    log.info("Fetching %s via %d", uri, thread_index)
+                    (resp, content) = h.request(uri)
+                    awaiting_parsing.put(block=True, item=(resp, content, uri))
+            except Empty, e:
+                log.info("Thread %d finished", thread_index)
+                pass

         # Load the work_queue with all the HTTP(S) uris.
-        map(work_queue.put, [uri for uri in config.subscriptions if _is_http_uri(uri)])
+        map(work_queue.put, [uri for uri in config.subscriptions() if _is_http_uri(uri)])

         # Start all the worker threads
-        threads = dict([(i, Thread(target=_spider_proc)) for i in range(config.spider_threads())])
+        threads = dict([(i, Thread(target=_spider_proc, args=(i,))) for i in range(int(config.spider_threads()))])
         for t in threads.itervalues():
             t.start()

         # Process the results as they arrive
-        while work_queue.qsize() and awaiting_parsing.qsize() and threads:
-            item = awaiting_parsing.get(False)
-            if not item and threads:
+        while work_queue.qsize() or awaiting_parsing.qsize() or threads:
+            if awaiting_parsing.qsize() == 0 and threads:
                 time.sleep(1)
-            while item:
+            while awaiting_parsing.qsize():
+                item = awaiting_parsing.get(False)
                 try:
                     (resp_headers, content, uri) = item
+                    log.info("Parsing pre-fetched %s", uri)
                     spiderFeed(uri, only_if_new=only_if_new, content=content, resp_headers=resp_headers)
                 except Exception, e:
                     import sys, traceback
@@ -372,15 +377,15 @@ def spiderPlanet(only_if_new = False):
                     for line in (traceback.format_exception_only(type, value) +
                         traceback.format_tb(tb)):
                         log.error(line.rstrip())
-                item = awaiting_parsing.get(False)
-            for index in threads:
+            for index in threads.keys():
                 if not threads[index].isAlive():
                     del threads[index]

+        log.info("Finished threaded part of processing.")
     planet.setTimeout(config.feed_timeout())
     # Process non-HTTP uris if we are threading, otherwise process *all* uris here.
-    unthreaded_work_queue = [uri for uri in config.subscriptions if not config.spider_threads() or not _is_http_uri(uri)]
+    unthreaded_work_queue = [uri for uri in config.subscriptions() if not int(config.spider_threads()) or not _is_http_uri(uri)]
     for feed in unthreaded_work_queue:
         try:
             spiderFeed(feed, only_if_new=only_if_new)
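
The threaded path above is a work-queue fan-out: workers drain a shared Queue with a non-blocking get() and terminate on Queue.Empty, while the main thread consumes fetched results and reaps finished threads. A runnable, network-free distillation of that scheme (Python 2, matching the code; fetch() is a stand-in for httplib2's h.request(), and the URIs are placeholders):

from Queue import Queue, Empty
from threading import Thread
import time

def fetch(uri):
    # Stand-in for h.request(uri); the real code uses httplib2 with a cache.
    time.sleep(0.2)
    return ("200 OK", "<feed/>")

work_queue = Queue()
awaiting_parsing = Queue()
map(work_queue.put, ["http://example.com/a", "http://example.com/b",
                     "http://example.com/c"])

def _worker(thread_index):
    try:
        while True:
            uri = work_queue.get(block=False)   # raises Empty once drained
            resp, content = fetch(uri)
            awaiting_parsing.put((resp, content, uri))
    except Empty:
        pass                                    # queue exhausted: thread exits

threads = dict([(i, Thread(target=_worker, args=(i,))) for i in range(2)])
for t in threads.itervalues():
    t.start()

# Main thread: process results as they arrive and reap dead workers.
while work_queue.qsize() or awaiting_parsing.qsize() or threads:
    if awaiting_parsing.qsize() == 0 and threads:
        time.sleep(0.1)                         # nothing ready yet
    while awaiting_parsing.qsize():
        resp, content, uri = awaiting_parsing.get(block=False)
        print "parsed %s: %s" % (uri, resp)
    for i in threads.keys():                    # keys() copies, so del is safe
        if not threads[i].isAlive():
            del threads[i]

Using the Empty exception as the shutdown signal avoids sentinel values, but note it only works because the queue is fully loaded before any worker starts.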