Partial refactoring

parent 52716d99f7
commit c6c9bed994
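In brief: spiderFeed() loses its content/resp_headers parameters and keeps only the feedparser-based fetch, while everything after the fetch moves into a new writeCache(feed_uri, feed_info, data) function. The threaded fetchers in spiderPlanet() now read the cached feed info themselves, queue (resp, content, uri, feed_info), and the parse loop builds the feedparser result and calls writeCache() directly. The snippet below is only a rough, self-contained sketch of that shape, using stand-in dicts and names rather than the real planet, config and feedparser objects:

    # Rough sketch of the refactored flow; CACHE, parseQueued and the literal
    # dicts are stand-ins, not part of planet/spider.py.
    CACHE = {}

    def writeCache(feed_uri, feed_info, data):
        # shared tail: everything spiderFeed() used to do after fetching
        CACHE[feed_uri] = {'previous': feed_info, 'status': data['status'],
                           'entries': data['entries']}

    def spiderFeed(feed_uri, only_if_new=0):
        # single-feed path: fetch via feedparser (simulated here), then cache
        feed_info = CACHE.get(feed_uri)
        data = {'status': 200, 'entries': ['entry one', 'entry two']}
        writeCache(feed_uri, feed_info, data)

    def parseQueued(resp_status, content, uri, feed_info):
        # threaded path: an httplib2 response is turned into a feedparser-style
        # result (or an empty stub for errors) before the shared cache step
        if int(resp_status) < 300:
            data = {'status': int(resp_status), 'entries': content.splitlines()}
        else:
            data = {'status': int(resp_status), 'entries': []}
        writeCache(uri, feed_info, data)

    if __name__ == '__main__':
        spiderFeed('http://example.com/atom.xml')
        parseQueued(304, '', 'http://example.com/atom.xml',
                    CACHE['http://example.com/atom.xml'])
        print(CACHE['http://example.com/atom.xml']['status'])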
planet/spider.py
@@ -121,7 +121,7 @@ def _is_http_uri(uri):
     parsed = urlparse.urlparse(uri)
     return parsed[0] in ['http', 'https']
 
-def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None):
+def spiderFeed(feed_uri, only_if_new=0):
     """ Spider (fetch) a single feed """
     log = planet.logger
 
@@ -130,38 +130,30 @@ def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None):
     if not os.path.exists(sources):
         os.makedirs(sources, 0700)
 
-    feed_source = filename(sources, feed)
+    feed_source = filename(sources, feed_uri)
     feed_info = feedparser.parse(feed_source)
     if feed_info.feed and only_if_new:
-        log.info("Feed %s already in cache", feed)
+        log.info("Feed %s already in cache", feed_uri)
         return
     if feed_info.feed.get('planet_http_status',None) == '410':
-        log.info("Feed %s gone", feed)
+        log.info("Feed %s gone", feed_uri)
         return
 
     # read feed itself
-    if not resp_headers:
-        modified = None
-        try:
-            modified=time.strptime(
-                feed_info.feed.get('planet_http_last_modified', None))
-        except:
-            pass
-        data = feedparser.parse(feed_info.feed.get('planet_http_location',feed),
-            etag=feed_info.feed.get('planet_http_etag',None), modified=modified)
-    elif int(resp_headers.status) < 300:
-        # httplib2 was used to get the content, so prepare a
-        # proper object to pass to feedparser.
-        f = StringIO(content)
-        setattr(f, 'url', resp_headers.get('content-location', feed))
-        if resp_headers:
-            if resp_headers.has_key('content-encoding'):
-                del resp_headers['content-encoding']
-            setattr(f, 'headers', resp_headers)
-        data = feedparser.parse(f)
-    else:
-        data = feedparser.FeedParserDict({'status': int(resp_headers.status),
-            'headers':resp_headers, 'version':None, 'entries': []})
+    modified = None
+    try:
+        modified=time.strptime(
+            feed_info.feed.get('planet_http_last_modified', None))
+    except:
+        pass
+    data = feedparser.parse(feed_info.feed.get('planet_http_location',feed_uri),
+        etag=feed_info.feed.get('planet_http_etag',None), modified=modified)
+
+    writeCache(feed_uri, feed_info, data)
+
+def writeCache(feed_uri, feed_info, data):
+    log = planet.logger
+    sources = config.cache_sources_directory()
 
     # capture http status
     if not data.has_key("status"):
@@ -173,20 +165,20 @@ def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None):
         data.status = 500
 
     activity_horizon = \
-        time.gmtime(time.time()-86400*config.activity_threshold(feed))
+        time.gmtime(time.time()-86400*config.activity_threshold(feed_uri))
 
     # process based on the HTTP status code
     if data.status == 200 and data.has_key("url"):
         data.feed['planet_http_location'] = data.url
-        if feed == data.url:
-            log.info("Updating feed %s", feed)
+        if feed_uri == data.url:
+            log.info("Updating feed %s", feed_uri)
         else:
-            log.info("Updating feed %s @ %s", feed, data.url)
+            log.info("Updating feed %s @ %s", feed_uri, data.url)
     elif data.status == 301 and data.has_key("entries") and len(data.entries)>0:
-        log.warning("Feed has moved from <%s> to <%s>", feed, data.url)
+        log.warning("Feed has moved from <%s> to <%s>", feed_uri, data.url)
         data.feed['planet_http_location'] = data.url
     elif data.status == 304:
-        log.info("Feed %s unchanged", feed)
+        log.info("Feed %s unchanged", feed_uri)
 
         if not feed_info.feed.has_key('planet_message'):
             if feed_info.feed.has_key('planet_updated'):
@@ -199,13 +191,13 @@ def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None):
                 del feed_info.feed['planet_message']
 
     elif data.status == 410:
-        log.info("Feed %s gone", feed)
+        log.info("Feed %s gone", feed_uri)
     elif data.status == 408:
-        log.warning("Feed %s timed out", feed)
+        log.warning("Feed %s timed out", feed_uri)
     elif data.status >= 400:
-        log.error("Error %d while updating feed %s", data.status, feed)
+        log.error("Error %d while updating feed %s", data.status, feed_uri)
     else:
-        log.info("Updating feed %s", feed)
+        log.info("Updating feed %s", feed_uri)
 
     # if read failed, retain cached information
     if not data.version and feed_info.version:
@@ -236,12 +228,12 @@ def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None):
             break
     else:
         data.feed.links.append(feedparser.FeedParserDict(
-            {'rel':'self', 'type':feedtype, 'href':feed}))
-    for name, value in config.feed_options(feed).items():
+            {'rel':'self', 'type':feedtype, 'href':feed_uri}))
+    for name, value in config.feed_options(feed_uri).items():
         data.feed['planet_'+name] = value
 
     # perform user configured scrub operations on the data
-    scrub(feed, data)
+    scrub(feed_uri, data)
 
     from planet import idindex
     global index
@@ -283,7 +275,7 @@ def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None):
         xdoc = reconstitute.reconstitute(data, entry)
         output = xdoc.toxml().encode('utf-8')
         xdoc.unlink()
-        for filter in config.filters(feed):
+        for filter in config.filters(feed_uri):
             output = shell.run(filter, output, mode="filter")
             if not output: break
         if not output: continue
@@ -302,7 +294,7 @@ def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None):
     if index: index.close()
 
     # identify inactive feeds
-    if config.activity_threshold(feed):
+    if config.activity_threshold(feed_uri):
         updated = [entry.updated_parsed for entry in data.entries
             if entry.has_key('updated_parsed')]
         updated.sort()
@@ -314,7 +306,7 @@ def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None):
             updated = [feedparser._parse_date_iso8601(data.feed.planet_updated)]
 
         if not updated or updated[-1] < activity_horizon:
-            msg = "no activity in %d days" % config.activity_threshold(feed)
+            msg = "no activity in %d days" % config.activity_threshold(feed_uri)
             log.info(msg)
             data.feed['planet_message'] = msg
 
@@ -341,7 +333,7 @@ def spiderFeed(feed, only_if_new=0, content=None, resp_headers=None):
     xdoc=minidom.parseString('''<feed xmlns:planet="%s"
       xmlns="http://www.w3.org/2005/Atom"/>\n''' % planet.xmlns)
     reconstitute.source(xdoc.documentElement,data.feed,data.bozo,data.version)
-    write(xdoc.toxml().encode('utf-8'), filename(sources, feed))
+    write(xdoc.toxml().encode('utf-8'), filename(sources, feed_uri))
    xdoc.unlink()
 
 def spiderPlanet(only_if_new = False):
@@ -367,6 +359,7 @@ def spiderPlanet(only_if_new = False):
         from threading import Thread
         import httplib2
         from socket import gaierror, error
+        from httplib import BadStatusLine
 
         work_queue = Queue()
         awaiting_parsing = Queue()
@@ -384,8 +377,20 @@ def spiderPlanet(only_if_new = False):
                     uri = work_queue.get(block=False)
                     log.info("Fetching %s via %d", uri, thread_index)
                     resp = feedparser.FeedParserDict({'status':'500'})
+                    feed_info = None
                     content = None
                     try:
+                        # read cached feed info
+                        sources = config.cache_sources_directory()
+                        feed_source = filename(sources, uri)
+                        feed_info = feedparser.parse(feed_source)
+                        if feed_info.feed and only_if_new:
+                            log.info("Feed %s already in cache", uri)
+                            continue
+                        if feed_info.feed.get('planet_http_status',None) == '410':
+                            log.info("Feed %s gone", uri)
+                            continue
+
                         try:
                             if isinstance(uri,unicode):
                                 idna = uri.encode('idna')
@@ -398,6 +403,8 @@ def spiderPlanet(only_if_new = False):
                             (resp, content) = h.request(idna)
                         except gaierror:
                             log.error("Fail to resolve server name %s via %d", uri, thread_index)
+                        except BadStatusLine:
+                            log.error("Bad Status Line received for %s via %d", uri, thread_index)
                         except error, e:
                             if e.__class__.__name__.lower()=='timeout':
                                 resp['status'] = '408'
@@ -411,7 +418,13 @@ def spiderPlanet(only_if_new = False):
                             for line in (traceback.format_exception_only(type, value) +
                                 traceback.format_tb(tb)):
                                 log.error(line.rstrip())
-                    awaiting_parsing.put(block=True, item=(resp, content, uri))
+                            continue
 
+                    if feed_info:
+                        if resp.status == 200 and resp.fromcache:
+                            resp.status = 304
+                    awaiting_parsing.put(block=True,
+                        item=(resp, content, uri, feed_info))
+
             except Empty, e:
                 log.info("Thread %d finished", thread_index)
@@ -432,10 +445,24 @@ def spiderPlanet(only_if_new = False):
         while awaiting_parsing.qsize():
             item = awaiting_parsing.get(False)
             try:
-                (resp_headers, content, uri) = item
-                if resp_headers.status == 200 and resp_headers.fromcache:
-                    resp_headers.status = 304
-                spiderFeed(uri, only_if_new=only_if_new, content=content, resp_headers=resp_headers)
+                (resp_headers, content, uri, feed_info) = item
+                # spiderFeed(uri, only_if_new=only_if_new, content=content, resp_headers=resp_headers)
+
+                if int(resp_headers.status) < 300:
+                    # httplib2 was used to get the content, so prepare a
+                    # proper object to pass to feedparser.
+                    f = StringIO(content)
+                    setattr(f, 'url', resp_headers.get('content-location', uri))
+                    if resp_headers.has_key('content-encoding'):
+                        del resp_headers['content-encoding']
+                    setattr(f, 'headers', resp_headers)
+                    data = feedparser.parse(f)
+                else:
+                    data = feedparser.FeedParserDict({'status': int(resp_headers.status),
+                        'headers':resp_headers, 'version':None, 'entries': []})
+
+                writeCache(uri, feed_info, data)
+
             except Exception, e:
                 import sys, traceback
                 type, value, tb = sys.exc_info()