diff --git a/anki/deck.py b/anki/deck.py
index f448ff7ab..39eefb2ed 100644
--- a/anki/deck.py
+++ b/anki/deck.py
@@ -36,9 +36,10 @@ defaultConf = {
 # this is initialized by storage.Deck
 class _Deck(object):
 
-    def __init__(self, db):
+    def __init__(self, db, server=False):
         self.db = db
         self.path = db._path
+        self.server = server
         self._lastSave = time.time()
         self.clearUndo()
         self.media = MediaManager(self)
@@ -175,7 +176,7 @@ crt=?, mod=?, scm=?, dty=?, usn=?, ls=?, conf=?""",
         self.media.move(olddir)
 
     def usn(self):
-        return self._usn
+        return self._usn if self.server else -1
 
     # Object creation helpers
     ##########################################################################
@@ -204,9 +205,8 @@ crt=?, mod=?, scm=?, dty=?, usn=?, ls=?, conf=?""",
     ##########################################################################
 
     def _logRem(self, ids, type):
-        tbl = "cards" if type == REM_CARD else "facts"
         self.db.executemany("insert into graves values (%d, ?, %d)" % (
-            intTime(), type), ([x] for x in ids))
+            self.usn(), type), ([x] for x in ids))
 
     # Facts
     ##########################################################################
diff --git a/anki/errors.py b/anki/errors.py
index 19e6a0047..9f25339b6 100644
--- a/anki/errors.py
+++ b/anki/errors.py
@@ -11,7 +11,3 @@ class AnkiError(Exception):
         if self.data:
             m += ": %s" % repr(self.data)
         return m
-
-class SyncTooLarge(Exception):
-    pass
-
diff --git a/anki/storage.py b/anki/storage.py
index 20f99a147..2a3bd0b7e 100644
--- a/anki/storage.py
+++ b/anki/storage.py
@@ -13,7 +13,7 @@ from anki.stdmodels import addBasicModel, addClozeModel
 from anki.errors import AnkiError
 from anki.hooks import runHook
 
-def Deck(path, queue=True, lock=True):
+def Deck(path, queue=True, lock=True, server=False):
     "Open a new or existing deck. Path must be unicode."
     path = os.path.abspath(path)
     create = not os.path.exists(path)
@@ -30,7 +30,7 @@ def Deck(path, queue=True, lock=True):
     db.execute("pragma temp_store = memory")
     db.execute("pragma cache_size = 10000")
     # add db to deck and do any remaining upgrades
-    deck = _Deck(db)
+    deck = _Deck(db, server)
     if ver < CURRENT_VERSION:
         _upgradeDeck(deck, ver)
     elif create:
@@ -131,7 +131,7 @@ create table if not exists graves (
 );
 
 insert or ignore into deck
-values(1,0,0,0,%(v)s,0,0,0,'','{}','','','{}');
+values(1,0,0,0,%(v)s,0,1,0,'','{}','','','{}');
 """ % ({'v':CURRENT_VERSION}))
     import anki.deck
     if setDeckConf:
@@ -160,16 +160,16 @@ update deck set conf = ?, groups = ?, gconf = ?""",
 def _updateIndices(db):
     "Add indices to the DB."
     db.executescript("""
--- avoid loading entire facts table in for sync summary
+-- syncing
 create index if not exists ix_facts_usn on facts (usn);
+create index if not exists ix_cards_usn on cards (usn);
+create index if not exists ix_revlog_usn on revlog (usn);
 -- card spacing, etc
 create index if not exists ix_cards_fid on cards (fid);
 -- scheduling and group limiting
 create index if not exists ix_cards_sched on cards (gid, queue, due);
 -- revlog by card
 create index if not exists ix_revlog_cid on revlog (cid);
--- revlog syncing
-create index if not exists ix_revlog_usn on revlog (usn);
 -- field uniqueness check
 create index if not exists ix_fsums_fid on fsums (fid);
 create index if not exists ix_fsums_csum on fsums (csum);
diff --git a/anki/sync.py b/anki/sync.py
index 1fb368c22..5bc269076 100644
--- a/anki/sync.py
+++ b/anki/sync.py
@@ -57,71 +57,108 @@ class Syncer(object):
 
     def sync(self):
         "Returns 'noChanges', 'fullSync', or 'success'."
-        # get local and remote modified, schema and sync times
-        self.lmod, lscm, lsyn, self.minUsn = self.times()
-        self.rmod, rscm, rsyn, self.maxUsn = self.server.times()
+        # step 1: login & metadata
+        self.rmod, rscm, self.maxUsn = self.server.times()
+        self.lmod, lscm, self.minUsn = self.times()
         if self.lmod == self.rmod:
             return "noChanges"
         elif lscm != rscm:
             return "fullSync"
         self.lnewer = self.lmod > self.rmod
-        # get local changes and switch to full sync if there were too many
-        self.status("getLocal")
+        # step 2: deletions and small objects
         lchg = self.changes()
-        if lchg == "fullSync":
-            return "fullSync"
-        # send them to the server, and get the server's changes
-        self.status("getServer")
-        rchg = self.server.changes(minUsn=self.minUsn, lnewer=self.lnewer,
-                                   changes=lchg)
-        if rchg == "fullSync":
-            return "fullSync"
-        # otherwise, merge
-        self.status("merge")
-        self.merge(lchg, rchg)
-        # then tell server to save, and save local
-        self.status("finish")
+        rchg = self.server.applyChanges(
+            minUsn=self.minUsn, lnewer=self.lnewer, changes=lchg)
+        self.mergeChanges(lchg, rchg)
+        # step 3: stream large tables from server
+        while 1:
+            chunk = self.server.chunk()
+            self.applyChunk(chunk)
+            if chunk['done']:
+                break
+        # step 4: stream to server
+        while 1:
+            chunk = self.chunk()
+            self.server.applyChunk(chunk)
+            if chunk['done']:
+                break
+        # step 5: sanity check during beta testing
+        c = self.sanityCheck()
+        s = self.server.sanityCheck()
+        assert c == s
+        # finalize
         mod = self.server.finish()
         self.finish(mod)
         return "success"
 
     def times(self):
-        return (self.deck.mod, self.deck.scm, self.deck.ls, self.deck._usn)
+        return (self.deck.mod, self.deck.scm, self.deck._usn)
 
-    def changes(self, minUsn=None, lnewer=None, changes=None):
-        if minUsn is not None:
-            # we're the server; save info
-            self.maxUsn = self.deck._usn
-            self.minUsn = minUsn
-            self.lnewer = not lnewer
-            self.rchg = changes
-        try:
-            d = dict(revlog=self.getRevlog(),
-                     facts=self.getFacts(),
-                     cards=self.getCards(),
-                     models=self.getModels(),
-                     groups=self.getGroups(),
-                     tags=self.getTags())
-        except SyncTooLarge:
-            return "fullSync"
-        # collection-level configuration from last modified side
+    def changes(self):
+        "Bundle up deletions and small objects, and apply if server."
+        d = dict(models=self.getModels(),
+                 groups=self.getGroups(),
+                 tags=self.getTags(),
+                 graves=self.getGraves())
         if self.lnewer:
             d['conf'] = self.getConf()
-        if minUsn is not None:
-            # we're the server, we can merge our side before returning
-            self.merge(d, self.rchg)
         return d
 
-    def merge(self, lchg, rchg):
-        # order is important here
+    def applyChanges(self, minUsn, lnewer, changes):
+        # we're the server; save info
+        self.maxUsn = self.deck._usn
+        self.minUsn = minUsn
+        self.lnewer = not lnewer
+        self.rchg = changes
+        lchg = self.changes()
+        # merge our side before returning
+        self.mergeChanges(lchg, self.rchg)
+        return lchg
+
+    def mergeChanges(self, lchg, rchg):
+        # first, handle the deletions
+        self.mergeGraves(rchg['graves'])
+        # then the other objects
         self.mergeModels(rchg['models'])
         self.mergeGroups(rchg['groups'])
-        self.mergeRevlog(rchg['revlog'])
-        self.mergeFacts(lchg['facts'], rchg['facts'])
-        self.mergeCards(lchg['cards'], rchg['cards'])
         self.mergeTags(rchg['tags'])
         if 'conf' in rchg:
             self.mergeConf(rchg['conf'])
+        self.prepareToChunk()
+
+    def sanityCheck(self):
+        # some basic checks to ensure the sync went ok. this is slow, so will
+        # be removed before official release
+        assert not self.deck.db.scalar("""
+select count() from cards where fid not in (select id from facts)""")
+        assert not self.deck.db.scalar("""
+select count() from facts where id not in (select distinct fid from cards)""")
+        for t in "cards", "facts", "revlog", "graves":
+            assert not self.deck.db.scalar(
+                "select count() from %s where usn = -1" % t)
+        for g in self.deck.groups.all():
+            assert g['usn'] != -1
+        for t, usn in self.deck.tags.allItems():
+            assert usn != -1
+        for m in self.deck.models.all():
+            assert m['usn'] != -1
+        return [
+            self.deck.db.scalar("select count() from cards"),
+            self.deck.db.scalar("select count() from facts"),
+            self.deck.db.scalar("select count() from revlog"),
+            self.deck.db.scalar("select count() from fsums"),
+            self.deck.db.scalar("select count() from graves"),
+            len(self.deck.models.all()),
+            len(self.deck.tags.all()),
+            len(self.deck.groups.all()),
+            len(self.deck.groups.allConf()),
+        ]
+
+    def usnLim(self):
+        if self.deck.server:
+            return "usn >= %d" % self.minUsn
+        else:
+            return "usn = -1"
 
     def finish(self, mod=None):
         if not mod:
@@ -132,15 +169,108 @@ class Syncer(object):
         self.deck.save(mod=mod)
         return mod
 
+    # Chunked syncing
+    ##########################################################################
+
+    def prepareToChunk(self):
+        self.tablesLeft = ["revlog", "cards", "facts"]
+        self.cursor = None
+
+    def cursorForTable(self, table):
+        lim = self.usnLim()
+        x = self.deck.db.execute
+        d = (self.maxUsn, lim)
+        if table == "revlog":
+            return x("""
+select id, cid, %d, ease, ivl, lastIvl, factor, time, type
+from revlog where %s""" % d)
+        elif table == "cards":
+            return x("""
+select id, fid, gid, ord, mod, %d, type, queue, due, ivl, factor, reps,
+lapses, left, edue, flags, data from cards where %s""" % d)
+        else:
+            return x("""
+select id, guid, mid, gid, mod, %d, tags, flds, '', flags, data
+from facts where %s""" % d)
+
+    def chunk(self):
+        buf = dict(done=False)
+        # gather up to 5000 records
+        lim = 5000
+        while self.tablesLeft and lim:
+            curTable = self.tablesLeft[0]
+            if not self.cursor:
+                self.cursor = self.cursorForTable(curTable)
+            rows = self.cursor.fetchmany(lim)
+            if len(rows) != lim:
+                # short read: this table's cursor is exhausted, go to the next
+                self.tablesLeft.pop(0)
+                self.cursor = None
+                # if we're the client, mark the objects as having been sent
+                if not self.deck.server:
+                    self.deck.db.execute(
+                        "update %s set usn=? where usn=-1"%curTable,
+                        self.maxUsn)
+            buf[curTable] = rows
+            lim -= len(rows)  # count rows fetched, not keys in buf
+        if not self.tablesLeft:
+            buf['done'] = True
+        return buf
+
+    def applyChunk(self, chunk):
+        if "revlog" in chunk:
+            self.mergeRevlog(chunk['revlog'])
+        if "cards" in chunk:
+            self.mergeCards(chunk['cards'])
+        if "facts" in chunk:
+            self.mergeFacts(chunk['facts'])
+
+    # Deletions
+    ##########################################################################
+
+    def getGraves(self):
+        cards = []
+        facts = []
+        groups = []
+        if self.deck.server:
+            curs = self.deck.db.execute(
+                "select oid, type from graves where usn >= ?", self.minUsn)
+        else:
+            curs = self.deck.db.execute(
+                "select oid, type from graves where usn = -1")
+        for oid, type in curs:
+            if type == REM_CARD:
+                cards.append(oid)
+            elif type == REM_FACT:
+                facts.append(oid)
+            else:
+                groups.append(oid)
+        if not self.deck.server:
+            self.deck.db.execute("update graves set usn=? where usn=-1",
+                                 self.maxUsn)
+        return dict(cards=cards, facts=facts, groups=groups)
+
+    def mergeGraves(self, graves):
+        # facts first, so we don't end up with duplicate graves
+        self.deck._remFacts(graves['facts'])
+        self.deck.remCards(graves['cards'])
+        for oid in graves['groups']:
+            self.deck.groups.rem(oid)
+
     # Models
     ##########################################################################
 
     def getModels(self):
-        return [m for m in self.deck.models.all() if m['usn'] >= self.minUsn]
+        if self.deck.server:
+            return [m for m in self.deck.models.all() if m['usn'] >= self.minUsn]
+        else:
+            mods = [m for m in self.deck.models.all() if m['usn'] == -1]
+            for m in mods:
+                m['usn'] = self.maxUsn
+            self.deck.models.save()
+            return mods
 
     def mergeModels(self, rchg):
-        # deletes result in schema mod, so we only have to worry about
-        # added or changed
         for r in rchg:
             l = self.deck.models.get(r['id'])
             # if missing locally or server is newer, update
@@ -151,13 +281,22 @@ class Syncer(object):
     ##########################################################################
 
     def getGroups(self):
-        return [
-            [g for g in self.deck.groups.all() if g['usn'] >= self.minUsn],
-            [g for g in self.deck.groups.allConf() if g['usn'] >= self.minUsn]
+        if self.deck.server:
+            return [
+                [g for g in self.deck.groups.all() if g['usn'] >= self.minUsn],
+                [g for g in self.deck.groups.allConf() if g['usn'] >= self.minUsn]
         ]
+        else:
+            groups = [g for g in self.deck.groups.all() if g['usn'] == -1]
+            for g in groups:
+                g['usn'] = self.maxUsn
+            gconf = [g for g in self.deck.groups.allConf() if g['usn'] == -1]
+            for g in gconf:
+                g['usn'] = self.maxUsn
+            self.deck.groups.save()
+            return [groups, gconf]
 
     def mergeGroups(self, rchg):
-        # like models we rely on schema mod for deletes
         for r in rchg[0]:
             l = self.deck.groups.get(r['id'], False)
             # if missing locally or server is newer, update
@@ -173,84 +312,54 @@ class Syncer(object):
     ##########################################################################
 
     def getTags(self):
-        return self.deck.tags.allSinceUSN(self.minUsn)
+        if self.deck.server:
+            return [t for t, usn in self.deck.tags.allItems()
+                    if usn >= self.minUsn]
+        else:
+            tags = []
+            for t, usn in self.deck.tags.allItems():
+                if usn == -1:
+                    self.deck.tags.tags[t] = self.maxUsn
+                    tags.append(t)
+            self.deck.tags.save()
+            return tags
 
     def mergeTags(self, tags):
-        self.deck.tags.register(tags)
+        self.deck.tags.register(tags, usn=self.maxUsn)
 
-    # Revlog
+    # Cards/facts/revlog
     ##########################################################################
 
-    def getRevlog(self):
-        r = self.deck.db.all("select * from revlog where usn >= ? limit ?",
-                             self.minUsn, self.MAX_REVLOG)
-        if len(r) == self.MAX_REVLOG:
-            raise SyncTooLarge
-        return r
-
     def mergeRevlog(self, logs):
-        for l in logs:
-            l[2] = self.maxUsn
         self.deck.db.executemany(
             "insert or ignore into revlog values (?,?,?,?,?,?,?,?,?)",
             logs)
 
-    # Facts
-    ##########################################################################
-    # we don't actually need to send sflds across as it's recalculated on
-    # merge; may want to change this before final release
+    def newerRows(self, data, table, modIdx):
+        ids = (r[0] for r in data)
+        lmods = {}
+        for id, mod in self.deck.db.execute(
+            "select id, mod from %s where id in %s and %s" % (
+                table, ids2str(ids), self.usnLim())):
+            lmods[id] = mod
+        update = []
+        for r in data:
+            if r[0] not in lmods or lmods[r[0]] < r[modIdx]:
+                update.append(r)
+        return update
 
-    def getFacts(self):
-        f = self.deck.db.all("select * from facts where usn >= ? limit ?",
-                             self.minUsn, self.MAX_FACTS)
-        if len(f) == self.MAX_FACTS:
-            raise SyncTooLarge
-        return [
-            f,
-            self.deck.db.list(
-                "select oid from graves where usn >= ? and type = ?",
-                self.minUsn, REM_FACT)
-        ]
-
-    def mergeFacts(self, lchg, rchg):
-        (toAdd, toRem) = self.findChanges(
-            lchg[0], lchg[1], rchg[0], rchg[1], 4, 5)
-        # add missing
-        self.deck.db.executemany(
-            "insert or replace into facts values (?,?,?,?,?,?,?,?,?,?,?)",
-            toAdd)
-        # update fsums table - fixme: in future could skip sort cache
-        self.deck.updateFieldCache([f[0] for f in toAdd])
-        # remove remotely deleted
-        self.deck._remFacts(toRem)
-
-    # Cards
-    ##########################################################################
-
-    def getCards(self):
-        c = self.deck.db.all("select * from cards where usn >= ? limit ?",
-                             self.minUsn, self.MAX_CARDS)
-        if len(c) == self.MAX_CARDS:
-            raise SyncTooLarge
-        return [
-            c,
-            self.deck.db.list(
-                "select oid from graves where usn >= ? and type = ?",
-                self.minUsn, REM_CARD)
-        ]
-
-    def mergeCards(self, lchg, rchg):
-        # cards with higher reps preserved, so that gid changes don't clobber
-        # older reviews
-        (toAdd, toRem) = self.findChanges(
-            lchg[0], lchg[1], rchg[0], rchg[1], 11, 5)
-        # add missing
+    def mergeCards(self, cards):
         self.deck.db.executemany(
             "insert or replace into cards values "
             "(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
-            toAdd)
-        # remove remotely deleted
-        self.deck.remCards(toRem)
+            self.newerRows(cards, "cards", 4))
+
+    def mergeFacts(self, facts):
+        rows = self.newerRows(facts, "facts", 4)
+        self.deck.db.executemany(
+            "insert or replace into facts values (?,?,?,?,?,?,?,?,?,?,?)",
+            rows)
+        self.deck.updateFieldCache([f[0] for f in rows])
 
     # Deck config
     ##########################################################################
@@ -261,47 +370,12 @@ class Syncer(object):
     def mergeConf(self, conf):
         self.deck.conf = conf
 
-    # Merging
-    ##########################################################################
-
-    def findChanges(self, localAdds, localRems, remoteAdds, remoteRems, key, usn):
-        local = {}
-        toAdd = []
-        toRem = []
-        # cache local side
-        for l in localAdds:
-            local[l[0]] = (True, l)
-        for l in localRems:
-            local[l] = (False, l)
-        # check remote adds
-        for r in remoteAdds:
-            if r[0] in local:
-                # added remotely; changed locally
-                (lAdded, l) = local[r[0]]
-                if lAdded:
-                    # added on both sides
-                    if r[key] > l[key]:
-                        # remote newer; update
-                        r[usn] = self.maxUsn
-                        toAdd.append(r)
-                    else:
-                        # local newer; remote will update
-                        pass
-                else:
-                    # local deleted; remote will delete
-                    pass
-            else:
-                # changed on server only
-                r[usn] = self.maxUsn
-                toAdd.append(r)
-        return toAdd, remoteRems
-
 class LocalServer(Syncer):
     # serialize/deserialize payload, so we don't end up sharing objects
     # between decks in testing
-    def changes(self, minUsn, lnewer, changes):
+    def applyChanges(self, minUsn, lnewer, changes):
         l = simplejson.loads; d = simplejson.dumps
-        return l(d(Syncer.changes(self, minUsn, lnewer, l(d(changes)))))
+        return l(d(Syncer.applyChanges(self, minUsn, lnewer, l(d(changes)))))
 
 # not yet ported
 class RemoteServer(Syncer):
diff --git a/anki/tags.py b/anki/tags.py
index d7f54bf69..9b05b27c9 100644
--- a/anki/tags.py
+++ b/anki/tags.py
@@ -33,13 +33,13 @@ class TagManager(object):
     # Registering and fetching tags
     #############################################################
 
-    def register(self, tags):
+    def register(self, tags, usn=None):
         "Given a list of tags, add any missing ones to tag registry."
         # case is stored as received, so user can create different case
         # versions of the same tag if they ignore the qt autocomplete.
         for t in tags:
             if t not in self.tags:
-                self.tags[t] = self.deck.usn()
+                self.tags[t] = self.deck.usn() if usn is None else usn
                 self.changed = True
 
     def all(self):
@@ -57,8 +57,11 @@ class TagManager(object):
         self.register(set(self.split(
             " ".join(self.deck.db.list("select distinct tags from facts"+lim)))))
 
-    def allSinceUSN(self, usn):
-        return [k for k,v in self.tags.items() if v >= usn]
+    def allItems(self):
+        return self.tags.items()
+
+    def save(self):
+        self.changed = True
 
     # Bulk addition/removal from facts
     #############################################################
diff --git a/tests/shared.py b/tests/shared.py
index 18b51bc9c..d91957a7f 100644
--- a/tests/shared.py
+++ b/tests/shared.py
@@ -9,9 +9,9 @@ def assertException(exception, func):
         found = True
     assert found
 
-def getEmptyDeck():
+def getEmptyDeck(**kwargs):
     (fd, nam) = tempfile.mkstemp(suffix=".anki")
     os.unlink(nam)
-    return Deck(nam)
+    return Deck(nam, **kwargs)
 
 testDir = os.path.dirname(__file__)
diff --git a/tests/test_sync.py b/tests/test_sync.py
index 209ff7050..42fc75413 100644
--- a/tests/test_sync.py
+++ b/tests/test_sync.py
@@ -21,28 +21,24 @@ server=None
 
 def setup_basic(loadDecks=None):
     global deck1, deck2, client, server
-    if loadDecks:
-        deck1 = Deck(loadDecks[0])
-        deck2 = Deck(loadDecks[1])
-    else:
-        deck1 = getEmptyDeck()
-        # add a fact to deck 1
-        f = deck1.newFact()
-        f['Front'] = u"foo"; f['Back'] = u"bar"; f.tags = [u"foo"]
-        deck1.addFact(f)
-        # answer it
-        deck1.reset(); deck1.sched.answerCard(deck1.sched.getCard(), 4)
-        # repeat for deck2
-        deck2 = getEmptyDeck()
-        f = deck2.newFact()
-        f['Front'] = u"bar"; f['Back'] = u"bar"; f.tags = [u"bar"]
-        deck2.addFact(f)
-        deck2.reset(); deck2.sched.answerCard(deck2.sched.getCard(), 4)
-        # start with same schema and sync time
-        deck1.scm = deck2.scm = 0
-        # and same mod time, so sync does nothing
-        t = intTime(1000)
-        deck1.save(mod=t); deck2.save(mod=t)
+    deck1 = getEmptyDeck()
+    # add a fact to deck 1
+    f = deck1.newFact()
+    f['Front'] = u"foo"; f['Back'] = u"bar"; f.tags = [u"foo"]
+    deck1.addFact(f)
+    # answer it
+    deck1.reset(); deck1.sched.answerCard(deck1.sched.getCard(), 4)
+    # repeat for deck2
+    deck2 = getEmptyDeck(server=True)
+    f = deck2.newFact()
+    f['Front'] = u"bar"; f['Back'] = u"bar"; f.tags = [u"bar"]
+    deck2.addFact(f)
+    deck2.reset(); deck2.sched.answerCard(deck2.sched.getCard(), 4)
+    # start with same schema and sync time
+    deck1.scm = deck2.scm = 0
+    # and same mod time, so sync does nothing
+    t = intTime(1000)
+    deck1.save(mod=t); deck2.save(mod=t)
     server = LocalServer(deck2)
     client = Syncer(deck1, server)
 
@@ -76,9 +72,9 @@ def test_sync():
     assert client.sync() == "success"
     # last sync times and mod times should agree
     assert deck1.mod == deck2.mod
-    assert deck1.usn() == deck2.usn()
+    assert deck1._usn == deck2._usn
     assert deck1.mod == deck1.ls
-    assert deck1.usn() != origUsn
+    assert deck1._usn != origUsn
     # because everything was created separately it will be merged in. in
     # actual use we use a full sync to ensure initial a common starting point.
     check(2)
@@ -142,6 +138,20 @@ def test_cards():
     assert deck2.getCard(card.id).reps == 1
     assert client.sync() == "success"
     assert deck2.getCard(card.id).reps == 2
+    # if it's modified on both sides, later mod time should win
+    for test in ((deck1, deck2), (deck2, deck1)):
+        time.sleep(1)
+        c = test[0].getCard(card.id)
+        c.reps = 5; c.flush()
+        test[0].save()
+        time.sleep(1)
+        c = test[1].getCard(card.id)
+        c.reps = 3; c.flush()
+        test[1].save()
+        assert client.sync() == "success"
+        assert test[1].getCard(card.id).reps == 3
+        assert test[0].getCard(card.id).reps == 3
+    # removals should work too
     deck1.remCards([card.id])
     deck1.save()
     assert deck2.db.scalar("select 1 from cards where id = ?", card.id)