# -*- coding: utf-8 -*- # Copyright: Damien Elmes # License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html import zlib, re, urllib, urllib2, socket, simplejson, time, shutil import os, base64, sys, httplib, types from datetime import date import anki, anki.deck, anki.cards from anki.errors import * from anki.utils import ids2str, checksum, intTime from anki.consts import * from anki.lang import _ from hooks import runHook if simplejson.__version__ < "1.7.3": raise Exception("SimpleJSON must be 1.7.3 or later.") CHUNK_SIZE = 32768 MIME_BOUNDARY = "Anki-sync-boundary" SYNC_HOST = os.environ.get("SYNC_HOST") or "dev.ankiweb.net" SYNC_PORT = int(os.environ.get("SYNC_PORT") or 80) SYNC_URL = "http://%s:%d/sync/" % (SYNC_HOST, SYNC_PORT) KEYS = ("models", "facts", "cards", "media") # fixme: # - ensure all urllib references are converted to urllib2 for proxies # - revlog merged by inserting or ignoring. schema mod for when inserted into # the past; not an issues with subscriptions as they do not share revlog # - facts can use mergeChanges(); facts recreate fsums # - graves get copied by way of the local deletes # - models could use similar merge, but they have no graves. removing causes # schema mod. # - tags: add only if missing. only way to update cache is db check, which # causes schema mod # - cards: reps on one side, changing group on another will cause reps to be # lost. priority reps over mod time? # - ability to cancel ########################################################################## from anki.consts import * class Syncer(object): MAX_REVLOG = 5000 MAX_CARDS = 5000 MAX_FACTS = 2500 def __init__(self, deck, server=None): self.deck = deck self.server = server def status(self, type): "Override to trace sync progress." #print "sync:", type pass def sync(self): "Returns 'noChanges', 'fullSync', or 'success'." # get local and remote modified, schema and sync times self.lmod, lscm, lsyn = self.times() self.rmod, rscm, rsyn = self.server.times() if self.lmod == self.rmod: return "noChanges" elif lscm != rscm: return "fullSync" # find last sync time minus 10 mins for clock drift self.ls = min(lsyn, rsyn) - 600 self.lnewer = self.lmod > self.rmod # get local changes and switch to full sync if there were too many self.status("getLocal") lchg = self.changes() if lchg == "fullSync": return "fullSync" # send them to the server, and get the server's changes self.status("getServer") rchg = self.server.changes(ls=self.ls, lnewer=self.lnewer, changes=lchg) if rchg == "fullSync": return "fullSync" # otherwise, merge self.status("merge") self.merge(lchg, rchg) # then tell server to save, and save local self.status("finish") mod = self.server.finish() self.finish(mod) return "success" def times(self): return (self.deck.mod, self.deck.scm, self.deck.lastSync) def changes(self, ls=None, lnewer=None, changes=None): if ls: # we're the server; save info self.ls = ls self.lnewer = not lnewer self.rchg = changes try: d = dict(revlog=self.getRevlog(), facts=self.getFacts(), cards=self.getCards(), models=self.getModels(), groups=self.getGroups(), tags=self.getTags()) except SyncTooLarge: return "fullSync" # collection-level configuration from last modified side if self.lnewer: d['conf'] = self.getConf() if ls: # we're the server, we can merge our side before returning self.merge(d, self.rchg) return d def merge(self, lchg, rchg): # order is important here self.mergeModels(rchg['models']) self.mergeGroups(rchg['groups']) self.mergeRevlog(rchg['revlog']) self.mergeFacts(lchg['facts'], rchg['facts']) self.mergeCards(lchg['cards'], rchg['cards']) self.mergeTags(rchg['tags']) def finish(self, mod=None): # fixme: dynamic index? if not mod: # server side; we decide new mod time mod = intTime() self.deck.lastSync = mod self.deck.save(mod=mod) return mod # Models ########################################################################## def getModels(self): return [m for m in self.deck.models.all() if m['mod'] > self.ls] def mergeModels(self, rchg): # deletes result in schema mod, so we only have to worry about # added or changed for r in rchg: l = self.deck.models.get(r['id']) # if missing locally or server is newer, update if not l or r['mod'] > l['mod']: self.deck.models.update(r) # Groups ########################################################################## def getGroups(self): return [ [g for g in self.deck.groups.all() if g['mod'] > self.ls], [g for g in self.deck.groups.allConf() if g['mod'] > self.ls] ] def mergeGroups(self, rchg): # like models we rely on schema mod for deletes for r in rchg[0]: l = self.deck.groups.get(r['id']) # if missing locally or server is newer, update if not l or r['mod'] > l['mod']: self.deck.groups.update(r) for r in rchg[1]: l = self.deck.groups.conf(r['id']) # if missing locally or server is newer, update if not l or r['mod'] > l['mod']: self.deck.groups.updateConf(r) # Tags ########################################################################## def getTags(self): return self.deck.tags.allSince(self.ls) def mergeTags(self, tags): self.deck.tags.register(tags) # Revlog ########################################################################## def getRevlog(self): r = self.deck.db.all("select * from revlog where id > ? limit ?", self.ls*1000, self.MAX_REVLOG) if len(r) == self.MAX_REVLOG: raise SyncTooLarge return r def mergeRevlog(self, logs): self.deck.db.executemany( "insert or ignore into revlog values (?,?,?,?,?,?,?,?)", logs) # Facts ########################################################################## def getFacts(self): f = self.deck.db.all("select * from facts where mod > ? limit ?", self.ls, self.MAX_FACTS) if len(f) == self.MAX_FACTS: raise SyncTooLarge return [ f, self.deck.db.list( "select oid from graves where id > ? and type = ?", self.ls, REM_FACT) ] def mergeFacts(self, lchg, rchg): (toAdd, toRem) = self.findChanges( lchg[0], lchg[1], rchg[0], rchg[1], 3) # add missing self.deck.db.executemany( "insert or replace into facts values (?,?,?,?,?,?,?,?)", toAdd) # update fsums table - fixme: in future could skip sort cache self.deck.updateFieldCache([f[0] for f in toAdd]) # remove remotely deleted self.deck._remFacts(toRem) # Cards ########################################################################## def getCards(self): c = self.deck.db.all("select * from cards where mod > ? limit ?", self.ls, self.MAX_CARDS) if len(c) == self.MAX_CARDS: raise SyncTooLarge return [ c, self.deck.db.list( "select oid from graves where id > ? and type = ?", self.ls, REM_CARD) ] def mergeCards(self, lchg, rchg): # cards with higher reps preserved, so that gid changes don't clobber # older reviews (toAdd, toRem) = self.findChanges( lchg[0], lchg[1], rchg[0], rchg[1], 10) # add missing self.deck.db.executemany( "insert or replace into cards values " "(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", toAdd) # remove remotely deleted self.deck.remCards(toRem) # Deck config ########################################################################## def getConf(self): return self.deck.conf # fixme: merging conf # Merging ########################################################################## def findChanges(self, localAdds, localRems, remoteAdds, remoteRems, key): local = {} toAdd = [] toRem = [] # cache local side for l in localAdds: local[l[0]] = (True, l) for l in localRems: local[l] = (False, l) # check remote adds for r in remoteAdds: if r[0] in local: # added remotely; changed locally (lAdded, l) = local[r[0]] if lAdded: # added on both sides if r[key] > l[key]: # remote newer; update toAdd.append(r) else: # local newer; remote will update pass else: # local deleted; remote will delete pass else: # changed on server only toAdd.append(r) return toAdd, remoteRems class LocalServer(Syncer): # serialize/deserialize payload, so we don't end up sharing objects # between decks in testing def changes(self, ls, lnewer, changes): l = simplejson.loads; d = simplejson.dumps return l(d(Syncer.changes(self, ls, lnewer, l(d(changes))))) # not yet ported class RemoteServer(Syncer): pass # def unstuff(self, data): # return simplejson.loads(unicode(zlib.decompress(data), "utf8")) # def stuff(self, data): # return zlib.compress(simplejson.dumps(data)) # HTTP proxy: act as a server and direct requests to the real server ########################################################################## # not yet ported class HttpSyncServerProxy(object): def __init__(self, user, passwd): SyncServer.__init__(self) self.decks = None self.deckName = None self.username = user self.password = passwd self.protocolVersion = 5 self.sourcesToCheck = [] def connect(self, clientVersion=""): "Check auth, protocol & grab deck list." if not self.decks: import socket socket.setdefaulttimeout(30) d = self.runCmd("getDecks", libanki=anki.version, client=clientVersion, sources=simplejson.dumps(self.sourcesToCheck), pversion=self.protocolVersion) socket.setdefaulttimeout(None) if d['status'] != "OK": raise SyncError(type="authFailed", status=d['status']) self.decks = d['decks'] self.timestamp = d['timestamp'] self.timediff = abs(self.timestamp - time.time()) def hasDeck(self, deckName): self.connect() return deckName in self.decks.keys() def availableDecks(self): self.connect() return self.decks.keys() def createDeck(self, deckName): ret = self.runCmd("createDeck", name=deckName.encode("utf-8")) if not ret or ret['status'] != "OK": raise SyncError(type="createFailed") self.decks[deckName] = [0, 0] def summary(self, lastSync): return self.runCmd("summary", lastSync=self.stuff(lastSync)) def genOneWayPayload(self, lastSync): return self.runCmd("genOneWayPayload", lastSync=self.stuff(lastSync)) def modified(self): self.connect() return self.decks[self.deckName][0] def _lastSync(self): self.connect() return self.decks[self.deckName][1] def applyPayload(self, payload): return self.runCmd("applyPayload", payload=self.stuff(payload)) def finish(self): assert self.runCmd("finish") == "OK" def runCmd(self, action, **args): data = {"p": self.password, "u": self.username, "v": 2} if self.deckName: data['d'] = self.deckName.encode("utf-8") else: data['d'] = None data.update(args) data = urllib.urlencode(data) try: f = urllib2.urlopen(SYNC_URL + action, data) except (urllib2.URLError, socket.error, socket.timeout, httplib.BadStatusLine), e: raise SyncError(type="connectionError", exc=`e`) ret = f.read() if not ret: raise SyncError(type="noResponse") try: return self.unstuff(ret) except Exception, e: raise SyncError(type="connectionError", exc=`e`) # Full syncing ########################################################################## # not yet ported class FullSyncer(object): def __init__(self, deck): self.deck = deck def prepareFullSync(self): t = time.time() # ensure modified is not greater than server time self.deck.modified = min(self.deck.modified, self.server.timestamp) self.deck.db.commit() self.deck.close() fields = { "p": self.server.password, "u": self.server.username, "d": self.server.deckName.encode("utf-8"), } if self.localTime > self.remoteTime: return ("fromLocal", fields, self.deck.path) else: return ("fromServer", fields, self.deck.path) def fullSync(self): ret = self.prepareFullSync() if ret[0] == "fromLocal": self.fullSyncFromLocal(ret[1], ret[2]) else: self.fullSyncFromServer(ret[1], ret[2]) def fullSyncFromLocal(self, fields, path): global sendProgressHook try: # write into a temporary file, since POST needs content-length src = open(path, "rb") name = namedtmp("fullsync.anki") tmp = open(name, "wb") # post vars for (key, value) in fields.items(): tmp.write('--' + MIME_BOUNDARY + "\r\n") tmp.write('Content-Disposition: form-data; name="%s"\r\n' % key) tmp.write('\r\n') tmp.write(value) tmp.write('\r\n') # file header tmp.write('--' + MIME_BOUNDARY + "\r\n") tmp.write( 'Content-Disposition: form-data; name="deck"; filename="deck"\r\n') tmp.write('Content-Type: application/octet-stream\r\n') tmp.write('\r\n') # data comp = zlib.compressobj() while 1: data = src.read(CHUNK_SIZE) if not data: tmp.write(comp.flush()) break tmp.write(comp.compress(data)) src.close() tmp.write('\r\n--' + MIME_BOUNDARY + '--\r\n\r\n') size = tmp.tell() tmp.seek(0) # open http connection runHook("fullSyncStarted", size) headers = { 'Content-type': 'multipart/form-data; boundary=%s' % MIME_BOUNDARY, 'Content-length': str(size), 'Host': SYNC_HOST, } req = urllib2.Request(SYNC_URL + "fullup?v=2", tmp, headers) try: sendProgressHook = fullSyncProgressHook res = urllib2.urlopen(req).read() assert res.startswith("OK") # update lastSync c = sqlite.connect(path) c.execute("update decks set lastSync = ?", (res[3:],)) c.commit() c.close() finally: sendProgressHook = None tmp.close() finally: runHook("fullSyncFinished") def fullSyncFromServer(self, fields, path): try: runHook("fullSyncStarted", 0) fields = urllib.urlencode(fields) src = urllib.urlopen(SYNC_URL + "fulldown", fields) tmpname = namedtmp("fullsync.anki") tmp = open(tmpname, "wb") decomp = zlib.decompressobj() cnt = 0 while 1: data = src.read(CHUNK_SIZE) if not data: tmp.write(decomp.flush()) break tmp.write(decomp.decompress(data)) cnt += CHUNK_SIZE runHook("fullSyncProgress", "fromServer", cnt) src.close() tmp.close() os.close(fd) # if we were successful, overwrite old deck os.unlink(path) os.rename(tmpname, path) # reset the deck name c = sqlite.connect(path) c.execute("update decks set syncName = ?", [checksum(path.encode("utf-8"))]) c.commit() c.close() finally: runHook("fullSyncFinished") ########################################################################## # Monkey-patch httplib to incrementally send instead of chewing up large # amounts of memory, and track progress. sendProgressHook = None def incrementalSend(self, strOrFile): if self.sock is None: if self.auto_open: self.connect() else: raise NotConnected() if self.debuglevel > 0: print "send:", repr(str) try: if (isinstance(strOrFile, str) or isinstance(strOrFile, unicode)): self.sock.sendall(strOrFile) else: cnt = 0 t = time.time() while 1: if sendProgressHook and time.time() - t > 1: sendProgressHook(cnt) t = time.time() data = strOrFile.read(CHUNK_SIZE) cnt += len(data) if not data: break self.sock.sendall(data) except socket.error, v: if v[0] == 32: # Broken pipe self.close() raise httplib.HTTPConnection.send = incrementalSend def fullSyncProgressHook(cnt): runHook("fullSyncProgress", "fromLocal", cnt)