Updated expunge test cases to pass...
parent a51d09ec07
commit 806b0ee53c
@@ -15,7 +15,7 @@ def expungeCache():
         data=feedparser.parse(filename(sources,sub))
         if not data.feed.has_key('id'): continue
         if config.feed_options(sub).has_key('cache_keep_entries'):
-            entry_count[data.feed.id] = config.feed_options(sub)['cache_keep_entries']
+            entry_count[data.feed.id] = int(config.feed_options(sub)['cache_keep_entries'])
         else:
             entry_count[data.feed.id] = config.cache_keep_entries()
 
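The only functional change above is the int() cast: options read from an ini file come back as strings, and a string keep-count breaks any later numeric comparison. A minimal sketch of the failure mode, assuming a plain Python 2 ConfigParser like the one behind config.feed_options() (the section and option names here are illustrative):

    # Sketch: ini values are strings until explicitly cast.
    from ConfigParser import ConfigParser
    from StringIO import StringIO

    parser = ConfigParser()
    parser.readfp(StringIO("[feed]\ncache_keep_entries = 2\n"))
    keep = parser.get('feed', 'cache_keep_entries')
    assert keep == '2'     # a string, not a number
    assert keep > 10       # Python 2 quirk: any str compares greater than any int
    assert int(keep) == 2  # the cast this hunk adds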
@@ -60,7 +60,7 @@ def expungeCache():
                 log.debug("Removing %s, not subscribed to %s",
                     file, ids[0].childNodes[0].nodeValue)
                 # remove old entry
-                #os.unlink(file)
+                os.unlink(file)
 
         except:
             log.error("Error parsing %s", file)
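This hunk flips the expunge from dry run to live: the unlink was previously commented out, so unsubscribed entries were only logged, never deleted. A hardened variant (not in the commit, purely illustrative) would also tolerate a cache file that vanished between the directory scan and the deletion:

    # Hypothetical hardening, not in the source: ignore already-missing files.
    import os, errno

    def unlink_quietly(path):
        try:
            os.unlink(path)
        except OSError, e:               # Python 2 except syntax, as in the codebase
            if e.errno != errno.ENOENT:  # re-raise anything but "no such file"
                raise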
@@ -3,18 +3,18 @@ name = test planet
 cache_directory = tests/work/expunge/cache
 cache_keep_entries = 1
 
-[tests/data/expunge/testfeed1.atom]
+[tag:bzr.mfd-consult.dk,2007:venus-expunge-testfeed1]
 name = no source
 
-[tests/data/expunge/testfeed2.atom]
+[tag:bzr.mfd-consult.dk,2007:venus-expunge-testfeed2]
 name = no source id
 
-[tests/data/expunge/testfeed3.atom]
+[tag:bzr.mfd-consult.dk,2007:venus-expunge-testfeed3]
 name = global setting
 
-[tests/data/expunge/testfeed4.atom]
+[tag:bzr.mfd-consult.dk,2007:venus-expunge-testfeed4]
 name = local setting
 cache_keep_entries = 2
 
-#[tests/data/expunge/testfeed5.atom]
+#[tag:bzr.mfd-consult.dk,2007:venus-expunge-testfeed5]
 #name = unsubbed
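The ini change re-keys the per-feed sections from cache file paths to the feeds' atom:id values, which is what expungeCache() passes to config.feed_options(). Roughly the lookup the patched code performs, using section names from the file above (a sketch mirroring the first hunk, not a description of the full Venus config API):

    from planet import config

    config.load('tests/data/expunge/config.ini')
    sub = 'tag:bzr.mfd-consult.dk,2007:venus-expunge-testfeed4'
    if config.feed_options(sub).has_key('cache_keep_entries'):
        keep = int(config.feed_options(sub)['cache_keep_entries'])  # per-feed override: 2
    else:
        keep = config.cache_keep_entries()                          # global default: 1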
@@ -1,12 +1,15 @@
 #!/usr/bin/env python
 
-#import unittest, os, glob, calendar, shutil, time
-#from planet.spider import filename, spiderPlanet, writeCache
-#from planet import feedparser, config
-#import planet
+import unittest, os, glob, shutil, time
+from planet.spider import filename
+from planet import feedparser, config
+from planet.expunge import expungeCache
+from xml.dom import minidom
+import planet
 
 workdir = 'tests/work/expunge/cache'
 testfeed = 'tests/data/expunge/testfeed%s.atom'
 sourcesdir = 'tests/work/expunge/cache/sources'
 testentries = 'tests/data/expunge/test*.entry'
 testfeeds = 'tests/data/expunge/test*.atom'
 configfile = 'tests/data/expunge/config.ini'
 
 class ExpungeTest(unittest.TestCase):
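The rewritten test stores and locates cache files through planet.spider.filename(), which flattens a feed or entry id into a filesystem-safe name. The mapping is documented by the assertions in the test_filename method being removed in the next hunk, for example:

    from planet.spider import filename

    filename('.', u'tag:planet.intertwingly.net,2006:testfeed1,1')
    # -> './planet.intertwingly.net,2006,testfeed1,1'
    filename('.', u'urn:uuid:00000000-0000-0000-0000-000000000000')
    # -> './00000000-0000-0000-0000-000000000000'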
@@ -16,130 +19,65 @@ class ExpungeTest(unittest.TestCase):
         planet.getLogger('CRITICAL',None)
 
         try:
-            os.makedirs(workdir)
+            os.makedirs(workdir)
+            os.makedirs(sourcesdir)
         except:
-            self.tearDown()
-            os.makedirs(workdir)
+            self.tearDown()
+            os.makedirs(workdir)
+            os.makedirs(sourcesdir)
 
     def tearDown(self):
         shutil.rmtree(workdir)
         os.removedirs(os.path.split(workdir)[0])
 
-    def test_filename(self):
-        self.assertEqual(os.path.join('.', 'example.com,index.html'),
-            filename('.', 'http://example.com/index.html'))
-        self.assertEqual(os.path.join('.',
-            'planet.intertwingly.net,2006,testfeed1,1'),
-            filename('.', u'tag:planet.intertwingly.net,2006:testfeed1,1'))
-        self.assertEqual(os.path.join('.',
-            '00000000-0000-0000-0000-000000000000'),
-            filename('.', u'urn:uuid:00000000-0000-0000-0000-000000000000'))
+    def test_expunge(self):
+        config.load(configfile)
 
-        # Requires Python 2.3
-        try:
-            import encodings.idna
-        except:
-            return
-        self.assertEqual(os.path.join('.', 'xn--8ws00zhy3a.com'),
-            filename('.', u'http://www.\u8a79\u59c6\u65af.com/'))
+        # create test entries in cache with correct timestamp
+        for entry in glob.glob(testentries):
+            e=minidom.parse(entry)
+            e.normalize()
+            eid = e.getElementsByTagName('id')
+            efile = filename(workdir, eid[0].childNodes[0].nodeValue)
+            eupdated = e.getElementsByTagName('updated')[0].childNodes[0].nodeValue
+            emtime = time.mktime(feedparser._parse_date_w3dtf(eupdated))
+            if not eid or not eupdated: continue
+            shutil.copyfile(entry, efile)
+            os.utime(efile, (emtime, emtime))
 
-    def spiderFeed(self, feed_uri):
-        feed_info = feedparser.parse('<feed/>')
-        data = feedparser.parse(feed_uri)
-        writeCache(feed_uri, feed_info, data)
+        # create test feeds in cache
+        sources = config.cache_sources_directory()
+        for feed in glob.glob(testfeeds):
+            f=minidom.parse(feed)
+            f.normalize()
+            fid = f.getElementsByTagName('id')
+            if not fid: continue
+            ffile = filename(sources, fid[0].childNodes[0].nodeValue)
+            shutil.copyfile(feed, ffile)
 
-    def verify_spiderFeed(self):
-        # verify that exactly nine entries + one source dir were produced
-        files = glob.glob(workdir+"/*")
-        files.sort()
-        self.assertEqual(10, len(files))
+        # verify that exactly four files + one sources dir were produced
+        files = glob.glob(workdir+"/*")
+        self.assertEqual(5, len(files))
+        # verify that exactly four feeds were produced in source dir
+        files = glob.glob(sources+"/*")
+        self.assertEqual(4, len(files))
 
-        # verify that the file names are as expected
-        self.assertTrue(os.path.join(workdir,
-            'planet.intertwingly.net,2006,testfeed1,1') in files)
+        # expunge...
+        expungeCache()
 
-        # verify that the file timestamps match atom:updated
-        data = feedparser.parse(files[2])
-        self.assertEqual(['application/atom+xml'], [link.type
-            for link in data.entries[0].source.links if link.rel=='self'])
-        self.assertEqual('one', data.entries[0].source.planet_name)
-        self.assertEqual('2006-01-03T00:00:00Z', data.entries[0].updated)
-        self.assertEqual(os.stat(files[2]).st_mtime,
-            calendar.timegm(data.entries[0].updated_parsed))
+        # verify that five entries and one source dir are left
+        files = glob.glob(workdir+"/*")
+        self.assertEqual(6, len(files))
 
-    def test_spiderFeed(self):
-        config.load(configfile)
-        self.spiderFeed(testfeed % '1b')
-        self.verify_spiderFeed()
-
-    def test_spiderUpdate(self):
-        config.load(configfile)
-        self.spiderFeed(testfeed % '1a')
-        self.spiderFeed(testfeed % '1b')
-        self.verify_spiderFeed()
-
-    def verify_spiderPlanet(self):
-        # verify that exactly eight files + 1 source dir were produced
-        files = glob.glob(workdir+"/*")
-        self.assertEqual(14, len(files))
-
-        # verify that the file names are as expected
-        self.assertTrue(os.path.join(workdir,
-            'planet.intertwingly.net,2006,testfeed1,1') in files)
-        self.assertTrue(os.path.join(workdir,
-            'planet.intertwingly.net,2006,testfeed2,1') in files)
-
-        data = feedparser.parse(workdir +
-            '/planet.intertwingly.net,2006,testfeed3,1')
-        self.assertEqual(['application/rss+xml'], [link.type
-            for link in data.entries[0].source.links if link.rel=='self'])
-        self.assertEqual('three', data.entries[0].source.author_detail.name)
-        self.assertEqual('three', data.entries[0].source['planet_css-id'])
-
-    def test_spiderPlanet(self):
-        config.load(configfile)
-        spiderPlanet()
-        self.verify_spiderPlanet()
-
-    def test_spiderThreads(self):
-        config.load(configfile.replace('config','threaded'))
-        _PORT = config.parser.getint('Planet','test_port')
-
-        log = []
-        from SimpleHTTPServer import SimpleHTTPRequestHandler
-        class TestRequestHandler(SimpleHTTPRequestHandler):
-            def log_message(self, format, *args):
-                log.append(args)
-
-        from threading import Thread
-        class TestServerThread(Thread):
-            def __init__(self):
-                self.ready = 0
-                self.done = 0
-                Thread.__init__(self)
-            def run(self):
-                from BaseHTTPServer import HTTPServer
-                httpd = HTTPServer(('',_PORT), TestRequestHandler)
-                self.ready = 1
-                while not self.done:
-                    httpd.handle_request()
-
-        httpd = TestServerThread()
-        httpd.start()
-        while not httpd.ready:
-            time.sleep(0.1)
-
-        try:
-            spiderPlanet()
-        finally:
-            httpd.done = 1
-            import urllib
-            urllib.urlopen('http://127.0.0.1:%d/' % _PORT).read()
-
-        status = [int(rec[1]) for rec in log if str(rec[0]).startswith('GET ')]
-        status.sort()
-        self.assertEqual([200,200,200,200,404], status)
-
-        self.verify_spiderPlanet()
+        # verify that the right five entries are left
+        self.assertTrue(os.path.join(workdir,
+            'bzr.mfd-consult.dk,2007,venus-expunge-test1,1') in files)
+        self.assertTrue(os.path.join(workdir,
+            'bzr.mfd-consult.dk,2007,venus-expunge-test2,1') in files)
+        self.assertTrue(os.path.join(workdir,
+            'bzr.mfd-consult.dk,2007,venus-expunge-test3,3') in files)
+        self.assertTrue(os.path.join(workdir,
+            'bzr.mfd-consult.dk,2007,venus-expunge-test4,2') in files)
+        self.assertTrue(os.path.join(workdir,
+            'bzr.mfd-consult.dk,2007,venus-expunge-test4,3') in files)
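Taken together, test_expunge seeds the cache with dated entries and their source feeds, runs expungeCache(), and asserts on the survivors: the global cache_keep_entries = 1 leaves one entry per feed, testfeed4's local cache_keep_entries = 2 leaves two, giving five entries plus the sources directory. A condensed sketch of that flow, assuming the repository root as the working directory:

    # Condensed flow of the test above; paths come from the module constants.
    import glob
    from planet import config
    from planet.expunge import expungeCache

    config.load('tests/data/expunge/config.ini')
    expungeCache()                                    # apply the keep counts
    files = glob.glob('tests/work/expunge/cache/*')
    assert len(files) == 6   # five surviving entries + the sources/ dir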