# -*- coding: utf-8 -*-
# Copyright: Damien Elmes
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html

import urllib, os, sys, httplib2, gzip
from cStringIO import StringIO
from anki.db import DB
from anki.utils import ids2str, intTime, json, isWin, isMac
from anki.consts import *
from hooks import runHook

# syncing vars
HTTP_TIMEOUT = 30
HTTP_PROXY = None

try:
    # httplib2 >=0.7.7
    _proxy_info_from_environment = httplib2.proxy_info_from_environment
    _proxy_info_from_url = httplib2.proxy_info_from_url
except AttributeError:
    # httplib2 <0.7.7
    _proxy_info_from_environment = httplib2.ProxyInfo.from_environment
    _proxy_info_from_url = httplib2.ProxyInfo.from_url

# Httplib2 connection object
######################################################################

def httpCon():
    """Return a new httplib2.Http object configured for syncing.

    The certificate bundle "ankiweb.certs" is looked up next to this module
    first; if it is missing there, a platform-specific location relative to
    sys.argv[0] is used on Windows and Mac. On any other platform a missing
    bundle trips an assertion.
    """
    certs = os.path.join(os.path.dirname(__file__), "ankiweb.certs")
    if not os.path.exists(certs):
        if isWin:
            certs = os.path.join(
                os.path.dirname(os.path.abspath(sys.argv[0])),
                "ankiweb.certs")
        elif isMac:
            certs = os.path.join(
                os.path.dirname(os.path.abspath(sys.argv[0])),
                "../Resources/ankiweb.certs")
        else:
            assert 0, "Your distro has not packaged Anki correctly."
    # NOTE(review): certificate validation is switched off whenever a proxy
    # is configured; this mirrors the original behaviour and is presumably
    # deliberate - confirm before changing.
    return httplib2.Http(
        timeout=HTTP_TIMEOUT, ca_certs=certs,
        proxy_info=HTTP_PROXY,
        disable_ssl_certificate_validation=bool(HTTP_PROXY))

# Proxy handling
######################################################################

def _setupProxy():
    """Populate the module-level HTTP_PROXY.

    The environment is consulted first; if it yields nothing, the Windows
    registry or the Mac system configuration is queried, preferring an
    'https' entry over an 'http' one. HTTP_PROXY is left falsy when no proxy
    is found.
    """
    global HTTP_PROXY
    # set in env?
    p = _proxy_info_from_environment()
    if not p:
        # platform-specific fetch
        url = None
        if isWin:
            r = urllib.getproxies_registry()
            if 'https' in r:
                url = r['https']
            elif 'http' in r:
                url = r['http']
        elif isMac:
            r = urllib.getproxies_macosx_sysconf()
            if 'https' in r:
                url = r['https']
            elif 'http' in r:
                url = r['http']
        if url:
            p = _proxy_info_from_url(url, _proxyMethod(url))
    if p:
        p.proxy_rdns = True
    HTTP_PROXY = p

def _proxyMethod(url):
    "Return 'https' if url starts with 'https' (case-insensitive), else 'http'."
    if url.lower().startswith("https"):
        return "https"
    else:
        return "http"

_setupProxy()

# Incremental syncing
##########################################################################

class Syncer(object):
    """Incremental sync of a collection against a server object.

    The same class is used for both ends of the conversation: `sync()` drives
    the exchange, while methods such as `start()`, `applyChanges()`,
    `chunk()`, `applyChunk()`, `sanityCheck2()` and `finish()` are the
    operations the other end invokes. `self.col.server` distinguishes the
    two roles.
    """

    def __init__(self, col, server=None):
        """Store the collection and the (optional) server to sync against."""
        self.col = col
        self.server = server

    def sync(self):
        """Run an incremental sync and return a status string.

        Returns one of 'badAuth', 'clockOff', 'noChanges', 'fullSync',
        'basicCheckFailed', 'sanityCheckFailed' or 'success'.
        """
        # if the deck has any pending changes, flush them first and bump mod
        # time
        self.col.save()
        # step 1: login & metadata
        runHook("sync", "login")
        ret = self.server.meta()
        if not ret:
            return "badAuth"
        self.rmod, rscm, self.maxUsn, rts, self.mediaUsn = ret
        self.lmod, lscm, self.minUsn, lts, dummy = self.meta()
        # refuse to sync if the two clocks differ by more than 300
        if abs(rts - lts) > 300:
            return "clockOff"
        if self.lmod == self.rmod:
            return "noChanges"
        elif lscm != rscm:
            return "fullSync"
        self.lnewer = self.lmod > self.rmod
        # step 1.5: check collection is valid
        if not self.col.basicCheck():
            return "basicCheckFailed"
        # step 2: deletions
        runHook("sync", "meta")
        lrem = self.removed()
        rrem = self.server.start(
            minUsn=self.minUsn, lnewer=self.lnewer, graves=lrem)
        self.remove(rrem)
        # ...and small objects
        lchg = self.changes()
        rchg = self.server.applyChanges(changes=lchg)
        self.mergeChanges(lchg, rchg)
        # step 3: stream large tables from server
        runHook("sync", "server")
        while 1:
            runHook("sync", "stream")
            chunk = self.server.chunk()
            self.applyChunk(chunk=chunk)
            if chunk['done']:
                break
        # step 4: stream to server
        runHook("sync", "client")
        while 1:
            runHook("sync", "stream")
            chunk = self.chunk()
            self.server.applyChunk(chunk=chunk)
            if chunk['done']:
                break
        # step 5: sanity check
        runHook("sync", "sanity")
        c = self.sanityCheck()
        ret = self.server.sanityCheck2(client=c)
        if ret['status'] != "ok":
            # roll back and force full sync
            self.col.rollback()
            self.col.modSchema()
            self.col.save()
            return "sanityCheckFailed"
        # finalize
        runHook("sync", "finalize")
        mod = self.server.finish()
        self.finish(mod)
        return "success"

    def meta(self):
        "Return (mod, scm, usn, current time, None) for the local collection."
        return (self.col.mod, self.col.scm, self.col._usn, intTime(), None)

    def changes(self):
        """Bundle up small objects.

        Returns a dict with 'models', 'decks' and 'tags'; 'conf' and 'crt'
        are included only when this side is the newer one.
        """
        d = dict(models=self.getModels(),
                 decks=self.getDecks(),
                 tags=self.getTags())
        if self.lnewer:
            d['conf'] = self.getConf()
            d['crt'] = self.col.crt
        return d

    def applyChanges(self, changes):
        "Merge the other side's small objects and return our own changes."
        self.rchg = changes
        lchg = self.changes()
        # merge our side before returning
        self.mergeChanges(lchg, self.rchg)
        return lchg

    def mergeChanges(self, lchg, rchg):
        "Merge the small objects in rchg, then get ready to stream chunks."
        # then the other objects
        self.mergeModels(rchg['models'])
        self.mergeDecks(rchg['decks'])
        self.mergeTags(rchg['tags'])
        if 'conf' in rchg:
            self.mergeConf(rchg['conf'])
        # this was left out of earlier betas
        if 'crt' in rchg:
            self.col.crt = rchg['crt']
        self.prepareToChunk()

    def sanityCheck(self):
        """Check the collection after merging and summarise it.

        Returns an error string as soon as a problem is found (failed basic
        check, or any object still carrying usn = -1); otherwise returns a
        list of counts that the two sides can compare.
        """
        if not self.col.basicCheck():
            return "failed basic check"
        for t in "cards", "notes", "revlog", "graves":
            if self.col.db.scalar(
                "select count() from %s where usn = -1" % t):
                return "%s had usn = -1" % t
        for g in self.col.decks.all():
            if g['usn'] == -1:
                return "deck had usn = -1"
        for t, usn in self.col.tags.allItems():
            if usn == -1:
                return "tag had usn = -1"
        found = False
        for m in self.col.models.all():
            if self.col.server:
                # the web upgrade was mistakenly setting usn
                if m['usn'] < 0:
                    m['usn'] = 0
                    found = True
            else:
                if m['usn'] == -1:
                    return "model had usn = -1"
        if found:
            self.col.models.save()
        self.col.sched.reset()
        # check for missing parent decks
        self.col.sched.deckDueList()
        # return summary of deck
        return [
            list(self.col.sched.counts()),
            self.col.db.scalar("select count() from cards"),
            self.col.db.scalar("select count() from notes"),
            self.col.db.scalar("select count() from revlog"),
            self.col.db.scalar("select count() from graves"),
            len(self.col.models.all()),
            len(self.col.decks.all()),
            len(self.col.decks.allConf()),
        ]

    def sanityCheck2(self, client):
        """Compare the client's sanity summary with our own.

        Returns dict(status="ok") on a match, otherwise a dict with
        status="bad" plus both summaries under 'c' and 's'.
        """
        server = self.sanityCheck()
        if client != server:
            return dict(status="bad", c=client, s=server)
        return dict(status="ok")

    def usnLim(self):
        "Return the SQL condition selecting rows that need to be sent."
        if self.col.server:
            return "usn >= %d" % self.minUsn
        else:
            return "usn = -1"

    def finish(self, mod=None):
        """Record the sync as complete and return the mod time used.

        When mod is not given a new one is generated from intTime(1000).
        """
        if not mod:
            # server side; we decide new mod time
            mod = intTime(1000)
        self.col.ls = mod
        self.col._usn = self.maxUsn + 1
        # ensure we save the mod time even if no changes made
        self.col.db.mod = True
        self.col.save(mod=mod)
        return mod

    # Chunked syncing
    ##########################################################################

    def prepareToChunk(self):
        "Reset the list of tables still to stream and the active cursor."
        self.tablesLeft = ["revlog", "cards", "notes"]
        self.cursor = None

    def cursorForTable(self, table):
        """Return a cursor over the rows of table that need sending.

        The usn column of every returned row is replaced by self.maxUsn.
        Any table name other than "revlog" or "cards" selects from notes.
        """
        lim = self.usnLim()
        x = self.col.db.execute
        d = (self.maxUsn, lim)
        if table == "revlog":
            return x("""
select id, cid, %d, ease, ivl, lastIvl, factor, time, type
from revlog where %s""" % d)
        elif table == "cards":
            return x("""
select id, nid, did, ord, mod, %d, type, queue, due, ivl, factor, reps,
lapses, left, odue, odid, flags, data from cards where %s""" % d)
        else:
            return x("""
select id, guid, mid, mod, %d, tags, flds, '', '', flags, data
from notes where %s""" % d)

    def chunk(self):
        """Return the next batch of rows to send, at most 2500 in total.

        The result maps table names to row lists and has 'done' set to True
        once every table in self.tablesLeft has been exhausted.
        """
        buf = dict(done=False)
        lim = 2500
        while self.tablesLeft and lim:
            curTable = self.tablesLeft[0]
            if not self.cursor:
                self.cursor = self.cursorForTable(curTable)
            rows = self.cursor.fetchmany(lim)
            fetched = len(rows)
            if fetched != lim:
                # table is empty
                self.tablesLeft.pop(0)
                self.cursor = None
                # if we're the client, mark the objects as having been sent
                if not self.col.server:
                    self.col.db.execute(
                        "update %s set usn=? where usn=-1"%curTable,
                        self.maxUsn)
            buf[curTable] = rows
            lim -= fetched
        if not self.tablesLeft:
            buf['done'] = True
        return buf

    def applyChunk(self, chunk):
        "Merge whichever of revlog/cards/notes are present in chunk."
        if "revlog" in chunk:
            self.mergeRevlog(chunk['revlog'])
        if "cards" in chunk:
            self.mergeCards(chunk['cards'])
        if "notes" in chunk:
            self.mergeNotes(chunk['notes'])

    # Deletions
    ##########################################################################

    def removed(self):
        """Return pending deletions as dict(cards=..., notes=..., decks=...).

        On the client the graves that were reported are then stamped with
        self.maxUsn so they are not sent again.
        """
        cards = []
        notes = []
        decks = []
        if self.col.server:
            curs = self.col.db.execute(
                "select oid, type from graves where usn >= ?", self.minUsn)
        else:
            curs = self.col.db.execute(
                "select oid, type from graves where usn = -1")
        for oid, kind in curs:
            if kind == REM_CARD:
                cards.append(oid)
            elif kind == REM_NOTE:
                notes.append(oid)
            else:
                decks.append(oid)
        if not self.col.server:
            self.col.db.execute("update graves set usn=? where usn=-1",
                                self.maxUsn)
        return dict(cards=cards, notes=notes, decks=decks)

    def start(self, minUsn, lnewer, graves):
        """Begin a sync requested by the other side.

        Records the usn range, takes the opposite of the caller's lnewer,
        applies the caller's deletions and returns our own.
        """
        self.maxUsn = self.col._usn
        self.minUsn = minUsn
        self.lnewer = not lnewer
        lgraves = self.removed()
        self.remove(graves)
        return lgraves

    def remove(self, graves):
        "Delete the notes, cards and decks listed in graves."
        # pretend to be the server so we don't set usn = -1
        wasServer = self.col.server
        self.col.server = True
        # notes first, so we don't end up with duplicate graves
        self.col._remNotes(graves['notes'])
        # then cards
        self.col.remCards(graves['cards'], notes=False)
        # and decks
        for oid in graves['decks']:
            self.col.decks.rem(oid, childrenToo=False)
        self.col.server = wasServer

    # Models
    ##########################################################################

    def getModels(self):
        """Return the models that need sending.

        On the client the returned models are stamped with self.maxUsn and
        saved.
        """
        if self.col.server:
            return [m for m in self.col.models.all() if m['usn'] >= self.minUsn]
        else:
            mods = [m for m in self.col.models.all() if m['usn'] == -1]
            for m in mods:
                m['usn'] = self.maxUsn
            self.col.models.save()
            return mods

    def mergeModels(self, rchg):
        "Apply each received model that is missing locally or newer."
        for r in rchg:
            l = self.col.models.get(r['id'])
            # if missing locally or server is newer, update
            if not l or r['mod'] > l['mod']:
                self.col.models.update(r)

    # Decks
    ##########################################################################

    def getDecks(self):
        """Return [decks, deck configs] that need sending.

        On the client the returned objects are stamped with self.maxUsn and
        saved.
        """
        if self.col.server:
            return [
                [g for g in self.col.decks.all() if g['usn'] >= self.minUsn],
                [g for g in self.col.decks.allConf() if g['usn'] >= self.minUsn]
            ]
        else:
            decks = [g for g in self.col.decks.all() if g['usn'] == -1]
            for g in decks:
                g['usn'] = self.maxUsn
            dconf = [g for g in self.col.decks.allConf() if g['usn'] == -1]
            for g in dconf:
                g['usn'] = self.maxUsn
            self.col.decks.save()
            return [decks, dconf]

    def mergeDecks(self, rchg):
        """Apply received decks (rchg[0]) and deck configs (rchg[1]).

        An entry is applied when it is missing locally or has a newer mod.
        """
        for r in rchg[0]:
            l = self.col.decks.get(r['id'], False)
            # if missing locally or server is newer, update
            if not l or r['mod'] > l['mod']:
                self.col.decks.update(r)
        for r in rchg[1]:
            try:
                l = self.col.decks.getConf(r['id'])
            except KeyError:
                l = None
            # if missing locally or server is newer, update
            if not l or r['mod'] > l['mod']:
                self.col.decks.updateConf(r)

    # Tags
    ##########################################################################

    def getTags(self):
        """Return the tags that need sending.

        On the client the returned tags are stamped with self.maxUsn and
        saved.
        """
        if self.col.server:
            return [t for t, usn in self.col.tags.allItems()
                    if usn >= self.minUsn]
        else:
            tags = []
            for t, usn in self.col.tags.allItems():
                if usn == -1:
                    self.col.tags.tags[t] = self.maxUsn
                    tags.append(t)
            self.col.tags.save()
            return tags

    def mergeTags(self, tags):
        "Register the received tags with usn set to self.maxUsn."
        self.col.tags.register(tags, usn=self.maxUsn)

    # Cards/notes/revlog
    ##########################################################################

    def mergeRevlog(self, logs):
        "Insert received revlog rows, ignoring ones that already exist."
        self.col.db.executemany(
            "insert or ignore into revlog values (?,?,?,?,?,?,?,?,?)",
            logs)

    def newerRows(self, data, table, modIdx):
        """Return the rows of data that should overwrite the local copy.

        data: received rows whose first column is the id.
        table: name of the local table to compare against.
        modIdx: index of the modification time within each received row.

        A received row is dropped only when the local row with the same id
        has pending changes (matches usnLim()) and a mod time that is not
        older than the received one.
        """
        ids = (r[0] for r in data)
        lmods = {}
        for rid, mod in self.col.db.execute(
            "select id, mod from %s where id in %s and %s" % (
                table, ids2str(ids), self.usnLim())):
            lmods[rid] = mod
        update = []
        for r in data:
            if r[0] not in lmods or lmods[r[0]] < r[modIdx]:
                update.append(r)
        return update

    def mergeCards(self, cards):
        "Store the received card rows that pass newerRows()."
        # card rows are (id, nid, did, ord, mod, usn, ...), so mod is index 4
        self.col.db.executemany(
            "insert or replace into cards values "
            "(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
            self.newerRows(cards, "cards", 4))

    def mergeNotes(self, notes):
        "Store the received note rows that pass newerRows()."
        # note rows are (id, guid, mid, mod, usn, ...): the mod time is at
        # index 3. Index 4 is the usn, which must not be used for the
        # "which side is newer" comparison.
        rows = self.newerRows(notes, "notes", 3)
        self.col.db.executemany(
            "insert or replace into notes values (?,?,?,?,?,?,?,?,?,?,?)",
            rows)
        self.col.updateFieldCache([f[0] for f in rows])

    # Col config
    ##########################################################################

    def getConf(self):
        "Return the collection's conf."
        return self.col.conf

    def mergeConf(self, conf):
        "Replace the collection's conf with the received one."
        self.col.conf = conf

# Local syncing for unit tests
##########################################################################

class LocalServer(Syncer):
    "Syncer used as an in-process server."

    # serialize/deserialize payload, so we don't end up sharing objects
    # between cols
    def applyChanges(self, changes):
        "Like Syncer.applyChanges, but round-trips both payloads via JSON."
        loads = json.loads
        dumps = json.dumps
        return loads(dumps(Syncer.applyChanges(self, loads(dumps(changes)))))

# HTTP syncing tools
##########################################################################

# Calling code should catch the following codes:
# - 501: client needs upgrade
# - 502: ankiweb down
# - 503/504: server too busy

class HttpSyncer(object):
    "Base class for objects that talk to the sync server over HTTP."

    def __init__(self, hkey=None, con=None):
        "Store the host key; use con if given, else a new httpCon()."
        self.hkey = hkey
        self.con = con or httpCon()

    def assertOk(self, resp):
        "Raise an Exception unless the response status is '200'."
        if resp['status'] != '200':
            raise Exception("Unknown response code: %s" % resp['status'])

    # Posting data as a file
    ######################################################################
    # We don't want to post the payload as a form var, as the percent-encoding is
    # costly. We could send it as a raw post, but more HTTP clients seem to
    # support file uploading, so this is the more compatible choice.

    def req(self, method, fobj=None, comp=6,
            badAuthRaises=True, hkey=True):
        """POST a multipart/form-data request to SYNC_URL+method.

        method: name appended to SYNC_URL.
        fobj: optional file-like object whose contents form the payload.
        comp: gzip compression level for the payload; falsy disables
            compression.
        badAuthRaises: when false, a '403' response returns False instead
            of raising.
        hkey: when true, self.hkey is sent as the 'k' form variable.

        Returns the response content. Raises Exception (via assertOk) for
        any other non-'200' status.
        """
        BOUNDARY="Anki-sync-boundary"
        bdry = "--"+BOUNDARY
        buf = StringIO()
        # compression flag and session key as post vars
        fields = {}
        fields['c'] = 1 if comp else 0
        if hkey:
            fields['k'] = self.hkey
        for (key, value) in fields.items():
            buf.write(bdry + "\r\n")
            buf.write(
                'Content-Disposition: form-data; name="%s"\r\n\r\n%s\r\n' %
                (key, value))
        # payload as raw data or json
        if fobj:
            # header
            buf.write(bdry + "\r\n")
            buf.write(
                'Content-Disposition: form-data; name="data"; '
                'filename="data"\r\n'
                'Content-Type: application/octet-stream\r\n\r\n')
            # write file into buffer, optionally compressing
            if comp:
                tgt = gzip.GzipFile(mode="wb", fileobj=buf, compresslevel=comp)
            else:
                tgt = buf
            while 1:
                data = fobj.read(65536)
                if not data:
                    if comp:
                        # closing the gzip wrapper flushes its trailer into
                        # buf; buf itself stays open
                        tgt.close()
                    break
                tgt.write(data)
            # NOTE(review): the closing boundary is only written when there
            # is a payload, matching the original behaviour - confirm the
            # server relies on this before moving it out of the branch.
            buf.write('\r\n' + bdry + '--\r\n')
        size = buf.tell()
        # connection headers
        headers = {
            'Content-Type': 'multipart/form-data; boundary=%s' % BOUNDARY,
            'Content-Length': str(size),
        }
        body = buf.getvalue()
        buf.close()
        resp, cont = self.con.request(
            SYNC_URL+method, "POST", headers=headers, body=body)
        if not badAuthRaises:
            # return false if bad auth instead of raising
            if resp['status'] == '403':
                return False
        self.assertOk(resp)
        return cont

# Incremental sync over HTTP
######################################################################

class RemoteServer(HttpSyncer):
    "Remote counterpart of Syncer; each call is a JSON request over HTTP."

    def __init__(self, hkey):
        HttpSyncer.__init__(self, hkey)

    def hostKey(self, user, pw):
        "Returns hkey or none if user/pw incorrect."
        ret = self.req(
            "hostKey", StringIO(json.dumps(dict(u=user, p=pw))),
            badAuthRaises=False, hkey=False)
        if not ret:
            # invalid auth
            return
        self.hkey = json.loads(ret)['key']
        return self.hkey

    def meta(self):
        "Return the server's decoded meta reply, or None on invalid auth."
        ret = self.req(
            "meta", StringIO(json.dumps(dict(v=SYNC_VER))),
            badAuthRaises=False)
        if not ret:
            # invalid auth
            return
        return json.loads(ret)

    def applyChanges(self, **kw):
        return self._run("applyChanges", kw)

    def start(self, **kw):
        return self._run("start", kw)

    def chunk(self, **kw):
        return self._run("chunk", kw)

    def applyChunk(self, **kw):
        return self._run("applyChunk", kw)

    def sanityCheck2(self, **kw):
        return self._run("sanityCheck2", kw)

    def finish(self, **kw):
        return self._run("finish", kw)

    def _run(self, cmd, data):
        "Send data as JSON to cmd and return the decoded JSON reply."
        return json.loads(
            self.req(cmd, StringIO(json.dumps(data))))

# Full syncing
##########################################################################

class FullSyncer(HttpSyncer):
    "Transfers the whole collection file to or from the server."

    def __init__(self, col, hkey, con):
        HttpSyncer.__init__(self, hkey, con)
        self.col = col

    def download(self):
        """Replace the local collection file with the server's copy.

        The collection is closed first. If the server answers
        "upgradeRequired", the "upgradeRequired" hook is run and nothing is
        written. Otherwise the data is written to a temporary file, checked
        with "pragma integrity_check", and moved over the collection path;
        self.col is then set to None.
        """
        runHook("sync", "download")
        self.col.close()
        cont = self.req("download")
        tpath = self.col.path + ".tmp"
        if cont == "upgradeRequired":
            runHook("sync", "upgradeRequired")
            return
        # close the file explicitly so the data is fully on disk before the
        # DB layer opens it
        f = open(tpath, "wb")
        try:
            f.write(cont)
        finally:
            f.close()
        # check the received file is ok
        d = DB(tpath)
        try:
            ok = d.scalar("pragma integrity_check") == "ok"
        finally:
            # release the handle even when the check itself raises
            d.close()
        assert ok
        # overwrite existing collection
        os.unlink(self.col.path)
        os.rename(tpath, self.col.path)
        self.col = None

    def upload(self):
        "True if upload successful."
        runHook("sync", "upload")
        # make sure it's ok before we try to upload
        if self.col.db.scalar("pragma integrity_check") != "ok":
            return False
        if not self.col.basicCheck():
            return False
        # apply some adjustments, then upload
        self.col.beforeUpload()
        f = open(self.col.path, "rb")
        try:
            return self.req("upload", f) == "OK"
        finally:
            # don't leave the collection file open after the request
            f.close()

# Media syncing
##########################################################################

class MediaSyncer(object):
    "Syncs the collection's media with a media server object."

    def __init__(self, col, server=None):
        self.col = col
        self.server = server
        self.added = None

    def sync(self, mediaUsn):
        """Sync media and return 'noChanges' or 'success'.

        mediaUsn: the usn to compare against the local media usn.

        Raises Exception if the local and server media sanity checks
        disagree.
        """
        # step 1: check if there have been any changes
        runHook("sync", "findMedia")
        self.col.media.findChanges()
        lusn = self.col.media.usn()
        if lusn == mediaUsn and not self.col.media.hasChanged():
            return "noChanges"
        # step 2: send/recv deletions
        runHook("sync", "removeMedia")
        lrem = self.removed()
        rrem = self.server.remove(fnames=lrem, minUsn=lusn)
        self.remove(rrem)
        # step 3: stream files from server
        runHook("sync", "server")
        while 1:
            runHook("sync", "streamMedia")
            usn = self.col.media.usn()
            zip = self.server.files(minUsn=usn)
            if self.addFiles(zip=zip):
                break
        # step 4: stream files to the server
        runHook("sync", "client")
        while 1:
            runHook("sync", "streamMedia")
            zip, fnames = self.files()
            if not fnames:
                # finished
                break
            usn = self.server.addFiles(zip=zip)
            # after server has replied, safe to remove from log
            self.col.media.forgetAdded(fnames)
            self.col.media.setUsn(usn)
        # step 5: sanity check during beta testing
        # NOTE: when removing this, need to move server tidyup
        # back from sanity check to addFiles
        s = self.server.mediaSanity()
        c = self.mediaSanity()
        if c != s:
            raise Exception(
                "Media sanity check failed. "
                "Please copy and paste the text below:\n%s\n%s" % (c, s))
        return "success"

    def removed(self):
        "Return the media layer's list of removed files."
        return self.col.media.removed()

    def remove(self, fnames, minUsn=None):
        """Apply the given removals.

        When minUsn is given, our own removals are returned; otherwise the
        result is None.
        """
        self.col.media.syncRemove(fnames)
        if minUsn is not None:
            # we're the server
            return self.col.media.removed()

    def files(self):
        "Return the media layer's zipAdded() result."
        return self.col.media.zipAdded()

    def addFiles(self, zip):
        "True if zip is the last in set. Server returns new usn instead."
        return self.col.media.syncAdd(zip)

    def mediaSanity(self):
        "Return the media layer's sanity check result."
        return self.col.media.sanityCheck()

# Remote media syncing
##########################################################################

class RemoteMediaServer(HttpSyncer):
    "Remote counterpart of MediaSyncer, reached over HTTP."

    def __init__(self, hkey, con):
        HttpSyncer.__init__(self, hkey, con)

    def remove(self, **kw):
        return json.loads(
            self.req("remove", StringIO(json.dumps(kw))))

    def files(self, **kw):
        "Return the raw (undecoded) reply to a 'files' request."
        return self.req("files", StringIO(json.dumps(kw)))

    def addFiles(self, zip):
        # no compression, as we compress the zip file instead
        return json.loads(
            self.req("addFiles", StringIO(zip), comp=0))

    def mediaSanity(self):
        return json.loads(
            self.req("mediaSanity"))

    # only for unit tests
    def mediatest(self, n):
        return json.loads(
            self.req("mediatest", StringIO(
                json.dumps(dict(n=n)))))