Anki/tests/test_sync.py
2011-10-03 13:42:53 +09:00

364 lines
11 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# coding: utf-8
import nose, os, tempfile, shutil, time
from tests.shared import assertException
from anki.errors import *
from anki import Deck
from anki.utils import intTime
from anki.sync import Syncer, FullSyncer, LocalServer, RemoteServer, \
MediaSyncer, RemoteMediaServer
from anki.facts import Fact
from anki.cards import Card
from tests.shared import getEmptyDeck
# Local tests
##########################################################################
# Shared fixtures; the setup_* functions below rebind them before each test.
deck1 = deck2 = None
client = server = None
def setup_basic():
    """Build the shared fixtures: two one-fact decks joined by a local sync.

    deck1 is handed to the Syncer (client side) and deck2, created with
    server=True, is wrapped in a LocalServer. Both decks are given the same
    schema time and the same mod time, so an immediate sync does nothing.
    """
    global deck1, deck2, client, server

    def add_answered_fact(deck, front, tag):
        # add a single fact to `deck`, then answer its card with ease 4
        fact = deck.newFact()
        fact['Front'] = front
        fact['Back'] = u"bar"
        fact.tags = [tag]
        deck.addFact(fact)
        deck.reset()
        deck.sched.answerCard(deck.sched.getCard(), 4)

    deck1 = getEmptyDeck()
    add_answered_fact(deck1, u"foo", u"foo")
    # repeat for deck2
    deck2 = getEmptyDeck(server=True)
    add_answered_fact(deck2, u"bar", u"bar")
    # start with same schema and sync time
    deck1.scm = deck2.scm = 0
    # and same mod time, so sync does nothing
    stamp = intTime(1000)
    deck1.save(mod=stamp)
    deck2.save(mod=stamp)
    server = LocalServer(deck2)
    client = Syncer(deck1, server)
def setup_modified():
    """Run setup_basic, then save deck1 again to mark it as changed."""
    setup_basic()
    deck1.save()
@nose.with_setup(setup_basic)
def test_nochange():
    # both decks were saved with the same mod time, so nothing needs syncing
    outcome = client.sync()
    assert outcome == "noChanges"
@nose.with_setup(setup_modified)
def test_changedSchema():
    # once the client's schema time differs from the server's, a normal
    # sync is refused and a full sync is requested instead
    deck1.scm = deck1.scm + 1
    outcome = client.sync()
    assert outcome == "fullSync"
@nose.with_setup(setup_modified)
def test_sync():
    # Merge two independently created decks and check that both sides end
    # up with the same contents, mod time and usn.
    counted_tables = ("revlog", "facts", "cards", "fsums")

    def check(num):
        # on both decks, every counted table and the tag list must hold
        # `num` entries, and there must be twice as many models
        for deck in (deck1, deck2):
            for table in counted_tables:
                assert deck.db.scalar("select count() from %s" % table) == num
            assert len(deck.models.all()) == num*2
            # the default group and config have an id of 1, so there is
            # always exactly one of each
            assert len(deck.groups.all()) == 1
            assert len(deck.groups.gconf) == 1
            assert len(deck.tags.all()) == num

    check(1)
    usn_before = deck1.usn()
    assert client.sync() == "success"
    # last sync times and mod times should agree
    assert deck1.mod == deck2.mod
    assert deck1._usn == deck2._usn
    assert deck1.mod == deck1.ls
    assert deck1._usn != usn_before
    # because everything was created separately it will be merged in. in
    # actual use, a full sync is used to ensure a common starting point.
    check(2)
    # repeating it does nothing
    assert client.sync() == "noChanges"
    # if we bump mod time, everything is copied across again because of the
    # 600 second sync leeway. but the decks should remain the same.
    deck1.save()
    assert client.sync() == "success"
    check(2)
@nose.with_setup(setup_modified)
def test_models():
    # start from two decks that have already been merged
    test_sync()
    # rename the current model on the client
    model = deck1.models.current()
    model_id = model['id']
    model['name'] = "new"
    time.sleep(1)
    deck1.models.save(model)
    deck1.save()
    # the server keeps the old name until the next sync
    assert deck2.models.get(model_id)['name'] == "Basic"
    assert client.sync() == "success"
    assert deck2.models.get(model_id)['name'] == "new"
    # deleting triggers a full sync
    deck1.scm = deck2.scm = 0
    deck1.models.rem(model)
    deck1.save()
    assert client.sync() == "fullSync"
@nose.with_setup(setup_modified)
def test_facts():
    # start from two decks that have already been merged
    test_sync()
    exists_sql = "select 1 from facts where id = ?"
    # modifications should be synced
    fact_id = deck1.db.scalar("select id from facts")
    edited = deck1.getFact(fact_id)
    assert edited['Front'] != "abc"
    edited['Front'] = "abc"
    edited.flush()
    deck1.save()
    assert client.sync() == "success"
    assert deck2.getFact(fact_id)['Front'] == "abc"
    # deletions too
    assert deck1.db.scalar(exists_sql, fact_id)
    deck1.remFacts([fact_id])
    deck1.save()
    assert client.sync() == "success"
    # the fact must now be gone from both sides
    assert not deck1.db.scalar(exists_sql, fact_id)
    assert not deck2.db.scalar(exists_sql, fact_id)
@nose.with_setup(setup_modified)
def test_cards():
    # start from two decks that have already been merged
    test_sync()
    exists_sql = "select 1 from cards where id = ?"
    fact_id = deck1.db.scalar("select id from facts")
    card = deck1.getFact(fact_id).cards()[0]
    # answer the card locally
    card.startTimer()
    deck1.sched.answerCard(card, 4)
    assert card.reps == 2
    deck1.save()
    # the extra repetition only reaches the server through a sync
    assert deck2.getCard(card.id).reps == 1
    assert client.sync() == "success"
    assert deck2.getCard(card.id).reps == 2
    # if it's modified on both sides, later mod time should win
    for earlier, later in ((deck1, deck2), (deck2, deck1)):
        time.sleep(1)
        changed = earlier.getCard(card.id)
        changed.reps = 5
        changed.flush()
        earlier.save()
        time.sleep(1)
        changed = later.getCard(card.id)
        changed.reps = 3
        changed.flush()
        later.save()
        assert client.sync() == "success"
        assert later.getCard(card.id).reps == 3
        assert earlier.getCard(card.id).reps == 3
    # removals should work too
    deck1.remCards([card.id])
    deck1.save()
    assert deck2.db.scalar(exists_sql, card.id)
    assert client.sync() == "success"
    assert not deck2.db.scalar(exists_sql, card.id)
@nose.with_setup(setup_modified)
def test_tags():
    # start from two decks that have already been merged
    test_sync()
    assert deck1.tags.all() == deck2.tags.all()
    # register a different tag on each side so the tag lists diverge
    deck1.tags.register(["abc"])
    deck2.tags.register(["xyz"])
    assert deck1.tags.all() != deck2.tags.all()
    for deck in (deck1, deck2):
        deck.save()
    # after syncing, both sides must report the same tags again
    assert client.sync() == "success"
    assert deck1.tags.all() == deck2.tags.all()
@nose.with_setup(setup_modified)
def test_groups():
    # start from two decks that have already been merged
    test_sync()
    assert len(deck1.groups.all()) == 1
    assert len(deck1.groups.all()) == len(deck2.groups.all())
    # create a different group on each side
    deck1.groups.id("new")
    assert len(deck1.groups.all()) != len(deck2.groups.all())
    time.sleep(0.1)
    deck2.groups.id("new2")
    for deck in (deck1, deck2):
        deck.save()
    assert client.sync() == "success"
    assert deck1.tags.all() == deck2.tags.all()
    # both sides now have the default group plus the two new ones
    assert len(deck1.groups.all()) == len(deck2.groups.all())
    assert len(deck1.groups.all()) == 3
    # a group config change made on the server must reach the client
    assert deck1.groups.conf(1)['maxTaken'] == 60
    deck2.groups.conf(1)['maxTaken'] = 30
    deck2.groups.save(deck2.groups.conf(1))
    deck2.save()
    assert client.sync() == "success"
    assert deck1.groups.conf(1)['maxTaken'] == 30
@nose.with_setup(setup_modified)
def test_conf():
    # start from two decks that have already been merged
    test_sync()
    assert deck2.conf['topGroup'] == 1
    # a deck-level config change on the client must reach the server
    deck1.conf['topGroup'] = 2
    deck1.save()
    outcome = client.sync()
    assert outcome == "success"
    assert deck2.conf['topGroup'] == 2
@nose.with_setup(setup_modified)
def test_threeway():
    # start from two decks that have already been merged
    test_sync()
    # clone deck1's file into a third deck that syncs with the same server
    deck1.close(save=False)
    clone_path = deck1.path.replace(".anki", "2.anki")
    shutil.copy2(deck1.path, clone_path)
    deck1.reopen()
    deck3 = Deck(clone_path)
    client2 = Syncer(deck3, server)
    assert client2.sync() == "noChanges"
    # client 1 adds a card at time 1
    time.sleep(1)
    new_fact = deck1.newFact()
    new_fact['Front'] = u"1"
    deck1.addFact(new_fact)
    deck1.save()
    # at time 2, client 2 syncs to server
    time.sleep(1)
    deck3.save()
    assert client2.sync() == "success"
    # at time 3, client 1 syncs, adding the older fact
    time.sleep(1)
    assert client.sync() == "success"
    assert deck1.factCount() == deck2.factCount()
    # syncing client2 should pick it up
    assert client2.sync() == "success"
    assert deck1.factCount() == deck2.factCount() == deck3.factCount()
def _test_speed():
    """Time loading ~/rapid.anki and syncing it into an empty server deck.

    Every usn in the source deck is reset to -1 before the sync. The elapsed
    load and sync times are printed in milliseconds. The leading underscore
    keeps the function from matching the test_* naming used by the tests.
    """
    started = time.time()
    source = Deck(os.path.expanduser("~/rapid.anki"))
    # reset the usn on rows, models, tags and the deck itself
    for tbl in ("revlog", "cards", "facts", "graves"):
        source.db.execute("update %s set usn = -1 where usn != -1"%tbl)
    for model in source.models.all():
        model['usn'] = -1
    for tag in source.tags.all():
        source.tags.tags[tag] = -1
    source._usn = -1
    source.save()
    target = getEmptyDeck(server=True)
    source.scm = target.scm = 0
    local_server = LocalServer(target)
    syncer = Syncer(source, local_server)
    print("load %d" % ((time.time() - started)*1000))
    started = time.time()
    assert syncer.sync() == "success"
    print("sync %d" % ((time.time() - started)*1000))
    started = time.time()
# Remote tests
##########################################################################
import anki.sync

# Point the sync code at a server running on this machine.
anki.sync.SYNC_URL = "http://localhost:8001/sync/"

# Account, password and host key used by the remote tests.
TEST_USER, TEST_PASS = "synctest@ichi2.net", "synctest"
TEST_HKEY = "k14LvSaEtXFITCJz"

# Set to False by test_meta when the server cannot be reached; several of the
# remote tests return early when it is False.
TEST_REMOTE = True
def setup_remote():
    """Prepare the basic fixtures, then point the client at a RemoteServer."""
    global server
    setup_basic()
    # mark deck1 as changed
    deck1.save()
    remote = RemoteServer(TEST_USER, TEST_HKEY)
    server = remote
    client.server = remote
@nose.with_setup(setup_remote)
def test_meta():
    # Fetch the server's meta data. If the connection is refused, clear
    # TEST_REMOTE so the remote tests that check it return early.
    global TEST_REMOTE
    import errno
    try:
        (mod, scm, usn, ts) = server.meta()
    except Exception as e:
        # 61 is ECONNREFUSED on BSD/macOS; errno.ECONNREFUSED is the value
        # for the platform running the tests (e.g. 111 on Linux). getattr
        # is used because not every exception carries an errno attribute.
        if getattr(e, "errno", None) in (61, errno.ECONNREFUSED):
            TEST_REMOTE = False
            print("aborting; server offline")
            return
        # any other failure is a real error; re-raise it rather than
        # falling through to a NameError on the unassigned names below
        raise
    assert mod
    assert scm
    assert mod != client.deck.mod
    # the server's timestamp should be within 3 seconds of the local clock
    assert abs(ts - time.time()) < 3
@nose.with_setup(setup_remote)
def test_hkey():
    # nothing to do when the server has been flagged as unreachable
    if not TEST_REMOTE:
        return

    def wrong_password():
        return server.hostKey("wrongpass")

    # a wrong password must raise
    assertException(Exception, wrong_password)
    # with the right password, the bogus key is replaced by the real one
    server.hkey = "abc"
    key = server.hostKey(TEST_PASS)
    assert key == server.hkey == TEST_HKEY
@nose.with_setup(setup_remote)
def test_download():
    # nothing to do when the server has been flagged as unreachable
    if not TEST_REMOTE:
        return
    full = FullSyncer(client.deck, "abc")
    # downloading with a bogus host key must raise
    assertException(Exception, full.download)
    # with the real key the download goes through
    full.hkey = TEST_HKEY
    full.download()
@nose.with_setup(setup_remote)
def test_remoteSync():
    # nothing to do when the server has been flagged as unreachable
    if not TEST_REMOTE:
        return
    local = client.deck
    # not yet associated, so will require a full sync
    assert client.sync() == "fullSync"
    # upload
    uploader = FullSyncer(local, TEST_HKEY)
    uploader.upload()
    client.deck.reopen()
    # should report no changes
    assert client.sync() == "noChanges"
    # bump local deck
    client.deck.save()
    assert client.sync() == "success"
    # again, no changes
    assert client.sync() == "noChanges"
    # downloading the remote deck should give us the same mod
    local_mod = client.deck.mod
    downloader = FullSyncer(client.deck, TEST_HKEY)
    downloader.download()
    reopened = Deck(client.deck.path)
    assert reopened.mod == local_mod
# Media tests
##########################################################################
# We can't run many tests for local media, because the desktop code assumes
# the current directory is the media folder
def setup_media():
    """Prepare the basic fixtures, then replace client and server with media syncers."""
    global client, server
    setup_basic()
    # deck2's MediaSyncer plays the server role for deck1's MediaSyncer
    media_server = MediaSyncer(deck2)
    server = media_server
    client = MediaSyncer(deck1, media_server)
@nose.with_setup(setup_media)
def test_mediaNothing():
    # smoke test: a local media sync must run without raising
    client.sync()
# Remote media tests
##########################################################################
def setup_remoteMedia():
    """Prepare the basic fixtures, then sync deck1's media against a RemoteMediaServer."""
    global client, server
    setup_basic()
    remote = RemoteMediaServer(TEST_HKEY)
    server = remote
    client = MediaSyncer(deck1, remote)
@nose.with_setup(setup_remoteMedia)
def test_remoteMediaNothing():
    # smoke test: a remote media sync must run without raising
    client.sync()
@nose.with_setup(setup_remoteMedia)
def test_mediaAdd():
    # Add one file to deck1's media folder and sync it to the remote server.
    # The test works from inside the media folder and does not restore the
    # previous working directory afterwards.
    os.chdir(deck1.media.dir())
    # close the file explicitly so its contents are on disk before the
    # directory is listed and synced
    with open(os.path.join(deck1.media.dir(), "foo.jpg"), "wb") as media_file:
        media_file.write("foo")
    assert len(os.listdir(deck1.media.dir())) == 1
    # NOTE(review): mediatest(1) presumably reports how many media files the
    # server holds -- confirm against the server implementation
    assert server.mediatest(1) == 0
    client.sync()
    # the local folder is unchanged and the server-side check now reports 1
    assert len(os.listdir(deck1.media.dir())) == 1
    assert server.mediatest(1) == 1