# coding: utf-8
#
# Tests that exercise syncing against a real sync server reachable at
# anki.sync.SYNC_URL. test_meta() probes the server first; if the connection
# is refused it clears TEST_REMOTE and the remaining tests return early.

import errno
import nose
import os
import tempfile
import shutil
import time
from tests.shared import assertException

from anki.errors import *
from anki import Deck
from anki.utils import intTime
from anki.sync import Syncer, FullSyncer, LocalServer, RemoteServer, \
    MediaSyncer, RemoteMediaServer
from anki.facts import Fact
from anki.cards import Card
from tests.shared import getEmptyDeck

deck1=None
deck2=None
client=None
server=None
server2=None

# Remote tests
##########################################################################

import tests.test_sync as ts
from tests.test_sync import setup_basic
import anki.sync
anki.sync.SYNC_URL = "http://localhost:6543/sync/"
TEST_USER = "synctest@ichi2.net"
TEST_PASS = "synctest"
TEST_HKEY = "k14LvSaEtXFITCJz"
# Cleared by test_meta() when the server refuses the connection.
TEST_REMOTE = True

# errno values treated as "server offline". 61 is the value this file has
# always checked for (ECONNREFUSED on BSD/macOS); errno.ECONNREFUSED covers
# platforms where the constant has a different value.
_OFFLINE_ERRNOS = (61, errno.ECONNREFUSED)

def setup_remote():
    """Run the basic sync setup, then point the client at a RemoteServer."""
    setup_basic()
    # mark deck1 as changed
    ts.deck1.save()
    ts.server = RemoteServer(TEST_USER, TEST_HKEY)
    ts.client.server = ts.server

@nose.with_setup(setup_remote)
def test_meta():
    """Fetch server metadata; disable the remote tests if it is offline."""
    global TEST_REMOTE
    try:
        (mod, scm, usn, tstamp, dummy) = ts.server.meta()
    except Exception as e:
        # Only a refused connection means "no server running". Anything else
        # is a real failure and must propagate instead of being swallowed
        # (which would surface later as a confusing NameError on `mod`).
        if getattr(e, "errno", None) not in _OFFLINE_ERRNOS:
            raise
        TEST_REMOTE = False
        print("aborting; server offline")
        return
    assert mod
    assert scm
    assert mod != ts.client.deck.mod
    assert abs(tstamp - time.time()) < 3

@nose.with_setup(setup_remote)
def test_hkey():
    """A wrong password must raise; the right one must yield TEST_HKEY."""
    if not TEST_REMOTE:
        return
    assertException(Exception, lambda: ts.server.hostKey("wrongpass"))
    ts.server.hkey = "abc"
    k = ts.server.hostKey(TEST_PASS)
    assert k == ts.server.hkey == TEST_HKEY

@nose.with_setup(setup_remote)
def test_download():
    """Downloading with a bad host key must raise; a good key must work."""
    if not TEST_REMOTE:
        return
    f = FullSyncer(ts.client.deck, "abc")
    assertException(Exception, f.download)
    f.hkey = TEST_HKEY
    f.download()

@nose.with_setup(setup_remote)
def test_remoteSync():
    """Walk through full upload, incremental syncs and a full download."""
    if not TEST_REMOTE:
        return
    # not yet associated, so will require a full sync
    assert ts.client.sync() == "fullSync"
    # upload
    f = FullSyncer(ts.client.deck, TEST_HKEY)
    f.upload()
    ts.client.deck.reopen()
    # should report no changes
    assert ts.client.sync() == "noChanges"
    # bump local deck
    ts.client.deck.save()
    assert ts.client.sync() == "success"
    # again, no changes
    assert ts.client.sync() == "noChanges"
    # downloading the remote deck should give us the same mod
    lmod = ts.client.deck.mod
    f = FullSyncer(ts.client.deck, TEST_HKEY)
    f.download()
    d = Deck(ts.client.deck.path)
    assert d.mod == lmod

# Remote media tests
##########################################################################
# We can't run useful tests for local media, because the desktop code assumes
# the current directory is the media folder.

def setup_remoteMedia():
    """Run the basic sync setup, then wire up a media syncer for deck1."""
    setup_basic()
    ts.server = RemoteMediaServer(TEST_HKEY)
    ts.server2 = RemoteServer(TEST_USER, TEST_HKEY)
    ts.client = MediaSyncer(ts.deck1, ts.server)

def _media_sync():
    """Run one media sync, passing the fifth field of the server's meta()."""
    return ts.client.sync(ts.server2.meta()[4])

def _sync_twice():
    """Sync pending changes, then confirm a second sync finds nothing to do."""
    assert _media_sync() == "success"
    assert _media_sync() == "noChanges"

def _check_counts(local, remote):
    """Assert the number of local media files and the server's media count."""
    assert len(os.listdir(ts.deck1.media.dir())) == local
    assert ts.server.mediatest("count") == remote

def _write_media(path, data):
    """Write data to path, closing the file deterministically."""
    with open(path, "wb") as f:
        f.write(data)

@nose.with_setup(setup_remoteMedia)
def test_media():
    """Exercise adding, removing, modifying and recovering synced media."""
    # Same guard as the other remote tests: skip when the server is offline.
    if not TEST_REMOTE:
        return
    ts.server.mediatest("reset")
    _check_counts(0, 0)
    # initially, nothing to do
    assert _media_sync() == "noChanges"
    # add a file
    time.sleep(1)
    os.chdir(ts.deck1.media.dir())
    p = os.path.join(ts.deck1.media.dir(), "foo.jpg")
    _write_media(p, "foo")
    _check_counts(1, 0)
    _sync_twice()
    time.sleep(1)
    # should have been synced
    _check_counts(1, 1)
    # if we remove the file, should be removed
    os.unlink(p)
    _sync_twice()
    _check_counts(0, 0)
    # we should be able to add it again
    time.sleep(1)
    _write_media(p, "foo")
    _sync_twice()
    _check_counts(1, 1)
    # if we modify it, it should get sent too. also we set the zip size very
    # low here, so that we can test splitting into multiple zips
    # (the module-level value is not restored afterwards)
    import anki.media
    anki.media.SYNC_ZIP_SIZE = 1
    time.sleep(1)
    _write_media(p, "bar")
    _write_media(p+"2", "baz")
    assert len(os.listdir(ts.deck1.media.dir())) == 2
    _sync_twice()
    _check_counts(2, 2)
    # if we lose our media db, we should be able to bring it back in sync
    time.sleep(1)
    ts.deck1.media.close()
    os.unlink(ts.deck1.media.dir()+".db")
    ts.deck1.media.connect()
    changes = ts.deck1.media.added().fetchall()
    assert len(changes) == 2
    _sync_twice()
    _check_counts(2, 2)
    # if we send an unchanged file, the server should cope
    time.sleep(1)
    ts.deck1.media.db.execute("insert into log values ('foo.jpg', 0)")
    _sync_twice()
    _check_counts(2, 2)
    # if we remove foo.jpg on the server, the removal should be synced
    assert ts.server.mediatest("removefoo") == "OK"
    assert _media_sync() == "success"
    _check_counts(1, 1)