Anki/anki/sync.py
Damien Elmes 362ae3eee2 initial work on sync refactor
Ported the sync code to the latest libanki structure. Key points:

No summary:

The old style got each side to fetch ids+mod times and required the client to
diff them and then request or bundle up the appropriate objects. Instead, we now
get each side to send all changed objects, and it's the responsibility of the
other side to decide what needs to be merged and what needs to be discarded.
This lets us skip a separate summary step, which saves scanning the tables
twice, and reduces server requests from 4 to 3.
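
Roughly, the new exchange looks like this (a sketch; the names match the
code below):

    1. times()    - compare modified/schema/last-sync times; bail out
                    early with "noChanges" or "fullSync"
    2. changes()  - client sends its changed objects; the server merges
                    them and replies with its own changes for the client
                    to merge
    3. finish()   - both sides save, with the client adopting the
                    server's mod time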

Schema changes:

Certain operations that are difficult to merge (such as changing the number of
fields in a model, or deleting models or groups) result in a full sync. The
user is warned about it in the GUI before such schema-changing operations
execute.
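
In the ported code this is a plain comparison of schema times at the start
of Syncer.sync():

    if self.lmod == self.rmod:
        return "noChanges"
    elif lscm != rscm:
        return "fullSync"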

Sync size:

For now, we don't try to deal with large incremental syncs. The cards, facts
and revlog tables can take hundreds of megabytes in memory in some cases, so
they would have to be chunked for the benefit of devices with limited memory.

Currently findChanges() uses the full fact/card objects which we're planning to
send to the server. It could be rewritten to fetch a summary (just the id, mod
& rep columns) which would save some memory, and then compare against blocks
of a few hundred remote objects at a time. However, it's a bit more
complicated than that:

- If the local summary is huge it could exceed memory limits. Without a local
  summary we'd have to query the db for each record, which could be a lot
  slower.

- We currently accumulate a list of remote records we need to add locally.
  This list also has the potential to get too big. We would need to
  periodically commit the changes as we accumulate them.

- Merging a large number of changes is also potentially slow on mobile
  devices.

Given that certain schema-changing operations require a full sync anyway, it's
probably best to concentrate on a chunked full sync for now. Provided the user
syncs periodically, they are unlikely to hit the incremental sync limits except
after bulk editing operations.
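
If chunked partial syncing is attempted later, the comparison could look
something like this sketch (hypothetical: a toy two-column cards table, and
none of these helpers exist in the code below):

    def mergeRemoteChunks(db, remoteChunks):
        # local summary of id -> mod, rather than full rows
        local = dict(db.all("select id, mod from cards"))
        for chunk in remoteChunks:
            # keep rows that are new, or newer than our copy
            toAdd = [r for r in chunk
                     if r[0] not in local or r[1] > local[r[0]]]
            db.executemany(
                "insert or replace into cards values (?,?)", toAdd)
            # commit as we go so the accumulated list stays small
            db.commit()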

Chunked partial syncing should be possible to add in the future without any
changes to the deck format.

Still to do:
- deck conf merging
- full syncing
- new http proxy
2011-09-08 12:50:42 +09:00


# -*- coding: utf-8 -*-
# Copyright: Damien Elmes <anki@ichi2.net>
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
import zlib, re, urllib, urllib2, socket, simplejson, time, shutil
import os, base64, sys, httplib, types
from datetime import date
# assumed: the stdlib sqlite binding, needed by the unported full sync
# code below
from sqlite3 import dbapi2 as sqlite
import anki, anki.deck, anki.cards
from anki.errors import *
# namedtmp is assumed to be available in anki.utils, as in later versions;
# the unported full sync code below relies on it
from anki.utils import ids2str, checksum, intTime, namedtmp
from anki.consts import *
from anki.lang import _
from hooks import runHook

if simplejson.__version__ < "1.7.3":
    raise Exception("SimpleJSON must be 1.7.3 or later.")
CHUNK_SIZE = 32768
MIME_BOUNDARY = "Anki-sync-boundary"
SYNC_HOST = os.environ.get("SYNC_HOST") or "dev.ankiweb.net"
SYNC_PORT = int(os.environ.get("SYNC_PORT") or 80)
SYNC_URL = "http://%s:%d/sync/" % (SYNC_HOST, SYNC_PORT)
KEYS = ("models", "facts", "cards", "media")
# fixme:
# - ensure all urllib references are converted to urllib2 for proxies
# - revlog merged by inserting or ignoring. schema mod for when inserted into
#   the past; not an issue with subscriptions as they do not share revlog
# - facts can use mergeChanges(); facts recreate fsums
# - graves get copied by way of the local deletes
# - models could use a similar merge, but they have no graves. removing causes
#   a schema mod.
# - tags: add only if missing. only way to update the cache is a db check,
#   which causes a schema mod
# - cards: reps on one side, changing group on another will cause reps to be
#   lost. prioritize reps over mod time?
# - ability to cancel
##########################################################################

class Syncer(object):

    MAX_REVLOG = 5000
    MAX_CARDS = 5000
    MAX_FACTS = 2500

    def __init__(self, deck, server=None):
        self.deck = deck
        self.server = server

    def status(self, type):
        "Override to trace sync progress."
        #print "sync:", type
        pass

    def sync(self):
        "Returns 'noChanges', 'fullSync', or 'success'."
        # get local and remote modified, schema and sync times
        self.lmod, lscm, lsyn = self.times()
        self.rmod, rscm, rsyn = self.server.times()
        if self.lmod == self.rmod:
            return "noChanges"
        elif lscm != rscm:
            return "fullSync"
        # find last sync time minus 10 mins for clock drift
        self.ls = min(lsyn, rsyn) - 600
        self.lnewer = self.lmod > self.rmod
        # get local changes and switch to full sync if there were too many
        self.status("getLocal")
        lchg = self.changes()
        if lchg == "fullSync":
            return "fullSync"
        # send them to the server, and get the server's changes
        self.status("getServer")
        rchg = self.server.changes(ls=self.ls, lnewer=self.lnewer, changes=lchg)
        if rchg == "fullSync":
            return "fullSync"
        # otherwise, merge
        self.status("merge")
        self.merge(lchg, rchg)
        # then tell server to save, and save local
        self.status("finish")
        mod = self.server.finish()
        self.finish(mod)
        return "success"
    def times(self):
        return (self.deck.mod, self.deck.scm, self.deck.lastSync)

    def changes(self, ls=None, lnewer=None, changes=None):
        if ls:
            # we're the server; save info
            self.ls = ls
            self.lnewer = not lnewer
            self.rchg = changes
        try:
            d = dict(revlog=self.getRevlog(),
                     facts=self.getFacts(),
                     cards=self.getCards(),
                     models=self.getModels(),
                     groups=self.getGroups(),
                     tags=self.getTags())
        except SyncTooLarge:
            return "fullSync"
        # collection-level configuration from last modified side
        if self.lnewer:
            d['conf'] = self.getConf()
        if ls:
            # we're the server, we can merge our side before returning
            self.merge(d, self.rchg)
        return d

    def merge(self, lchg, rchg):
        # order is important here
        self.mergeModels(rchg['models'])
        self.mergeGroups(rchg['groups'])
        self.mergeRevlog(rchg['revlog'])
        self.mergeFacts(lchg['facts'], rchg['facts'])
        self.mergeCards(lchg['cards'], rchg['cards'])
        self.mergeTags(rchg['tags'])

    def finish(self, mod=None):
        # fixme: dynamic index?
        if not mod:
            # server side; we decide new mod time
            mod = intTime()
        self.deck.lastSync = mod
        self.deck.save(mod=mod)
        return mod

    # Models
    ##########################################################################

    def getModels(self):
        return [m for m in self.deck.models.all() if m['mod'] > self.ls]

    def mergeModels(self, rchg):
        # deletes result in schema mod, so we only have to worry about
        # added or changed
        for r in rchg:
            l = self.deck.models.get(r['id'])
            # if missing locally or server is newer, update
            if not l or r['mod'] > l['mod']:
                self.deck.models.update(r)

    # Groups
    ##########################################################################

    def getGroups(self):
        return [
            [g for g in self.deck.groups.all() if g['mod'] > self.ls],
            [g for g in self.deck.groups.allConf() if g['mod'] > self.ls]
        ]

    def mergeGroups(self, rchg):
        # like models we rely on schema mod for deletes
        for r in rchg[0]:
            l = self.deck.groups.get(r['id'])
            # if missing locally or server is newer, update
            if not l or r['mod'] > l['mod']:
                self.deck.groups.update(r)
        for r in rchg[1]:
            l = self.deck.groups.conf(r['id'])
            # if missing locally or server is newer, update
            if not l or r['mod'] > l['mod']:
                self.deck.groups.updateConf(r)

    # Tags
    ##########################################################################

    def getTags(self):
        return self.deck.tags.allSince(self.ls)

    def mergeTags(self, tags):
        self.deck.tags.register(tags)

    # Revlog
    ##########################################################################

    def getRevlog(self):
        r = self.deck.db.all("select * from revlog where id > ? limit ?",
                             self.ls, self.MAX_REVLOG)
        if len(r) == self.MAX_REVLOG:
            raise SyncTooLarge
        return r

    def mergeRevlog(self, logs):
        self.deck.db.executemany(
            "insert or ignore into revlog values (?,?,?,?,?,?,?,?)",
            logs)

    # Facts
    ##########################################################################

    def getFacts(self):
        f = self.deck.db.all("select * from facts where id > ? limit ?",
                             self.ls, self.MAX_FACTS)
        if len(f) == self.MAX_FACTS:
            raise SyncTooLarge
        return [
            f,
            self.deck.db.list(
                "select oid from graves where id > ? and type = ?",
                self.ls, REM_FACT)
        ]

    def mergeFacts(self, lchg, rchg):
        (toAdd, toRem) = self.findChanges(
            lchg[0], lchg[1], rchg[0], rchg[1], 3)
        # add missing
        self.deck.db.executemany(
            "insert or replace into facts values (?,?,?,?,?,?,?,?)",
            toAdd)
        # update fsums table - fixme: in future could skip sort cache
        self.deck.updateFieldCache([f[0] for f in toAdd])
        # remove remotely deleted
        self.deck._remFacts(toRem)

    # Cards
    ##########################################################################

    def getCards(self):
        c = self.deck.db.all("select * from cards where id > ? limit ?",
                             self.ls, self.MAX_CARDS)
        if len(c) == self.MAX_CARDS:
            raise SyncTooLarge
        return [
            c,
            self.deck.db.list(
                "select oid from graves where id > ? and type = ?",
                self.ls, REM_CARD)
        ]

    def mergeCards(self, lchg, rchg):
        # cards with higher reps are preserved, so that gid changes don't
        # clobber older reviews
        (toAdd, toRem) = self.findChanges(
            lchg[0], lchg[1], rchg[0], rchg[1], 10)
        # add missing
        self.deck.db.executemany(
            "insert or replace into cards values "
            "(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
            toAdd)
        # remove remotely deleted
        self.deck.remCards(toRem)

    # Deck config
    ##########################################################################

    def getConf(self):
        return self.deck.conf

    # fixme: merging conf

    # Merging
    ##########################################################################

    def findChanges(self, localAdds, localRems, remoteAdds, remoteRems, key):
        local = {}
        toAdd = []
        toRem = []
        # cache local side
        for l in localAdds:
            local[l[0]] = (True, l)
        for l in localRems:
            local[l] = (False, l)
        # check remote adds
        for r in remoteAdds:
            if r[0] in local:
                # added remotely; changed locally
                (lAdded, l) = local[r[0]]
                if lAdded:
                    # added on both sides
                    if r[key] > l[key]:
                        # remote newer; update
                        toAdd.append(r)
                    else:
                        # local newer; remote will update
                        pass
                else:
                    # local deleted; remote will delete
                    pass
            else:
                # changed on server only
                toAdd.append(r)
        return toAdd, remoteRems
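
    # Worked example of findChanges() with key=3 (mod in a toy 4-column row):
    #   localAdds  = [(1, "a", "b", 100)]; localRems = [3]
    #   remoteAdds = [(1, "a", "b", 150), (3, "x", "y", 50), (4, "q", "r", 10)]
    #   remoteRems = [2]
    # toAdd is [row 1, row 4]: row 1 because the remote mod (150) is newer,
    # row 4 because it only exists remotely. Row 3 is skipped since it was
    # deleted locally (the remote side will delete it when it merges), and
    # remoteRems ([2]) is handed back for local deletion.
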
class LocalServer(Syncer):

    # serialize/deserialize payload, so we don't end up sharing objects
    # between decks in testing
    def changes(self, ls, lnewer, changes):
        l = simplejson.loads; d = simplejson.dumps
        return l(d(Syncer.changes(self, ls, lnewer, l(d(changes)))))

# not yet ported
class RemoteServer(Syncer):
    pass

#     def unstuff(self, data):
#         return simplejson.loads(unicode(zlib.decompress(data), "utf8"))
#     def stuff(self, data):
#         return zlib.compress(simplejson.dumps(data))

# HTTP proxy: act as a server and direct requests to the real server
##########################################################################

# not yet ported
class HttpSyncServerProxy(object):

    def __init__(self, user, passwd):
        # fixme: the old code called SyncServer.__init__() here, but that
        # base class hasn't been ported
        self.decks = None
        self.deckName = None
        self.username = user
        self.password = passwd
        self.protocolVersion = 5
        self.sourcesToCheck = []

    def connect(self, clientVersion=""):
        "Check auth, protocol & grab deck list."
        if not self.decks:
            import socket
            socket.setdefaulttimeout(30)
            d = self.runCmd("getDecks",
                            libanki=anki.version,
                            client=clientVersion,
                            sources=simplejson.dumps(self.sourcesToCheck),
                            pversion=self.protocolVersion)
            socket.setdefaulttimeout(None)
            if d['status'] != "OK":
                raise SyncError(type="authFailed", status=d['status'])
            self.decks = d['decks']
            self.timestamp = d['timestamp']
            self.timediff = abs(self.timestamp - time.time())

    def hasDeck(self, deckName):
        self.connect()
        return deckName in self.decks

    def availableDecks(self):
        self.connect()
        return self.decks.keys()

    def createDeck(self, deckName):
        ret = self.runCmd("createDeck", name=deckName.encode("utf-8"))
        if not ret or ret['status'] != "OK":
            raise SyncError(type="createFailed")
        self.decks[deckName] = [0, 0]

    def summary(self, lastSync):
        return self.runCmd("summary",
                           lastSync=self.stuff(lastSync))

    def genOneWayPayload(self, lastSync):
        return self.runCmd("genOneWayPayload",
                           lastSync=self.stuff(lastSync))

    def modified(self):
        self.connect()
        return self.decks[self.deckName][0]

    def _lastSync(self):
        self.connect()
        return self.decks[self.deckName][1]

    def applyPayload(self, payload):
        return self.runCmd("applyPayload",
                           payload=self.stuff(payload))

    def finish(self):
        assert self.runCmd("finish") == "OK"

    def runCmd(self, action, **args):
        data = {"p": self.password,
                "u": self.username,
                "v": 2}
        if self.deckName:
            data['d'] = self.deckName.encode("utf-8")
        else:
            data['d'] = None
        data.update(args)
        data = urllib.urlencode(data)
        try:
            f = urllib2.urlopen(SYNC_URL + action, data)
        except (urllib2.URLError, socket.error, socket.timeout,
                httplib.BadStatusLine), e:
            raise SyncError(type="connectionError",
                            exc=repr(e))
        ret = f.read()
        if not ret:
            raise SyncError(type="noResponse")
        try:
            return self.unstuff(ret)
        except Exception, e:
            raise SyncError(type="connectionError",
                            exc=repr(e))

# Full syncing
##########################################################################

# not yet ported
class FullSyncer(object):

    def __init__(self, deck, server=None):
        # fixme: the methods below expect self.server, localTime and
        # remoteTime to be set up; that wiring hasn't been ported yet
        self.deck = deck
        self.server = server

    def prepareFullSync(self):
        t = time.time()
        # ensure modified is not greater than server time
        self.deck.modified = min(self.deck.modified, self.server.timestamp)
        self.deck.db.commit()
        self.deck.close()
        fields = {
            "p": self.server.password,
            "u": self.server.username,
            "d": self.server.deckName.encode("utf-8"),
        }
        if self.localTime > self.remoteTime:
            return ("fromLocal", fields, self.deck.path)
        else:
            return ("fromServer", fields, self.deck.path)

    def fullSync(self):
        ret = self.prepareFullSync()
        if ret[0] == "fromLocal":
            self.fullSyncFromLocal(ret[1], ret[2])
        else:
            self.fullSyncFromServer(ret[1], ret[2])

    def fullSyncFromLocal(self, fields, path):
        global sendProgressHook
        try:
            # write into a temporary file, since POST needs content-length
            src = open(path, "rb")
            name = namedtmp("fullsync.anki")
            # w+b so the file can be read back when posting it
            tmp = open(name, "w+b")
            # post vars
            for (key, value) in fields.items():
                tmp.write('--' + MIME_BOUNDARY + "\r\n")
                tmp.write('Content-Disposition: form-data; name="%s"\r\n' % key)
                tmp.write('\r\n')
                tmp.write(value)
                tmp.write('\r\n')
            # file header
            tmp.write('--' + MIME_BOUNDARY + "\r\n")
            tmp.write(
                'Content-Disposition: form-data; name="deck"; filename="deck"\r\n')
            tmp.write('Content-Type: application/octet-stream\r\n')
            tmp.write('\r\n')
            # data
            comp = zlib.compressobj()
            while 1:
                data = src.read(CHUNK_SIZE)
                if not data:
                    tmp.write(comp.flush())
                    break
                tmp.write(comp.compress(data))
            src.close()
            tmp.write('\r\n--' + MIME_BOUNDARY + '--\r\n\r\n')
            size = tmp.tell()
            tmp.seek(0)
            # open http connection
            runHook("fullSyncStarted", size)
            headers = {
                'Content-type': 'multipart/form-data; boundary=%s' %
                MIME_BOUNDARY,
                'Content-length': str(size),
                'Host': SYNC_HOST,
            }
            req = urllib2.Request(SYNC_URL + "fullup?v=2", tmp, headers)
            try:
                sendProgressHook = fullSyncProgressHook
                res = urllib2.urlopen(req).read()
                assert res.startswith("OK")
                # update lastSync
                c = sqlite.connect(path)
                c.execute("update decks set lastSync = ?",
                          (res[3:],))
                c.commit()
                c.close()
            finally:
                sendProgressHook = None
                tmp.close()
        finally:
            runHook("fullSyncFinished")

    def fullSyncFromServer(self, fields, path):
        try:
            runHook("fullSyncStarted", 0)
            fields = urllib.urlencode(fields)
            src = urllib.urlopen(SYNC_URL + "fulldown", fields)
            tmpname = namedtmp("fullsync.anki")
            tmp = open(tmpname, "wb")
            decomp = zlib.decompressobj()
            cnt = 0
            while 1:
                data = src.read(CHUNK_SIZE)
                if not data:
                    tmp.write(decomp.flush())
                    break
                tmp.write(decomp.decompress(data))
                cnt += CHUNK_SIZE
                runHook("fullSyncProgress", "fromServer", cnt)
            src.close()
            tmp.close()
            # if we were successful, overwrite old deck
            os.unlink(path)
            os.rename(tmpname, path)
            # reset the deck name
            c = sqlite.connect(path)
            c.execute("update decks set syncName = ?",
                      [checksum(path.encode("utf-8"))])
            c.commit()
            c.close()
        finally:
            runHook("fullSyncFinished")

##########################################################################
# Monkey-patch httplib to incrementally send instead of chewing up large
# amounts of memory, and track progress.

sendProgressHook = None

def incrementalSend(self, strOrFile):
    if self.sock is None:
        if self.auto_open:
            self.connect()
        else:
            raise httplib.NotConnected()
    if self.debuglevel > 0:
        print "send:", repr(strOrFile)
    try:
        if (isinstance(strOrFile, str) or
            isinstance(strOrFile, unicode)):
            self.sock.sendall(strOrFile)
        else:
            cnt = 0
            t = time.time()
            while 1:
                if sendProgressHook and time.time() - t > 1:
                    sendProgressHook(cnt)
                    t = time.time()
                data = strOrFile.read(CHUNK_SIZE)
                cnt += len(data)
                if not data:
                    break
                self.sock.sendall(data)
    except socket.error, v:
        if v[0] == 32:  # Broken pipe
            self.close()
        raise

httplib.HTTPConnection.send = incrementalSend

def fullSyncProgressHook(cnt):
    runHook("fullSyncProgress", "fromLocal", cnt)