diff --git a/anki/consts.py b/anki/consts.py index aeae40ef9..c3ada3dac 100644 --- a/anki/consts.py +++ b/anki/consts.py @@ -2,6 +2,8 @@ # Copyright: Damien Elmes # License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html +import os + # whether new cards should be mixed with reviews, or shown first or last NEW_CARDS_DISTRIBUTE = 0 NEW_CARDS_LAST = 1 @@ -33,6 +35,15 @@ COUNT_REMAINING = 1 MEDIA_ADD = 0 MEDIA_REM = 1 +# syncing vars +SYNC_ZIP_SIZE = 10*1024*1024 +CHUNK_SIZE = 65536 +MIME_BOUNDARY = "Anki-sync-boundary" +SYNC_HOST = os.environ.get("SYNC_HOST") or "dev.ankiweb.net" +SYNC_PORT = int(os.environ.get("SYNC_PORT") or 80) +SYNC_URL = "http://%s:%d/sync/" % (SYNC_HOST, SYNC_PORT) +SYNC_VER = 0 + # Labels ########################################################################## diff --git a/anki/db.py b/anki/db.py index 4c0a315d4..142696db4 100644 --- a/anki/db.py +++ b/anki/db.py @@ -68,3 +68,13 @@ class DB(object): def set_progress_handler(self, *args): self._db.set_progress_handler(*args) + + def __enter__(self): + self._db.execute("begin") + return self + + def __exit__(self, exc_type, *args): + if not exc_type: + # no exception, so commit + self._db.commit() + self._db.close() diff --git a/anki/media.py b/anki/media.py index 93d0fdb9e..37bee9d4f 100644 --- a/anki/media.py +++ b/anki/media.py @@ -3,7 +3,8 @@ # License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html import os, shutil, re, urllib, urllib2, time, unicodedata, \ - urllib, sys, shutil + urllib, sys, shutil, simplejson, zipfile +from cStringIO import StringIO from anki.utils import checksum, intTime, namedtmp, isWin from anki.lang import _ from anki.db import DB @@ -169,7 +170,7 @@ If the same name exists, compare checksums.""" def removed(self): self.findChanges() - return self.db.execute("select * from log where type = ?", MEDIA_REM) + return self.db.list("select * from log where type = ?", MEDIA_REM) def clearLog(self): self.db.execute("delete from log") @@ -181,7 +182,7 @@ If the same name exists, compare checksums.""" # in the log, a mod time of zero indicates a delete self.db.executescript(""" create table media (fname text primary key, csum text, mod int); -create table meta (dirMod int); insert into meta values (0); +create table meta (dirMod int, usn int); insert into meta values (0, 0); create table log (fname text primary key, type int); """) @@ -191,6 +192,15 @@ create table log (fname text primary key, type int); def _checksum(self, path): return checksum(open(path, "rb").read()) + def usn(self): + return self.db.scalar("select usn from meta") + + def setUsn(self, usn): + self.db.execute("update meta set usn = ?", usn) + + def syncMod(self): + self.db.execute("update meta set dirMod = ?", self._mtime(self.dir())) + def _changed(self): "Return dir mtime if it has changed since the last findChanges()" # doesn't track edits, but user can add or remove a file to update @@ -256,3 +266,95 @@ create table log (fname text primary key, type int); if not v[2]: removed.append(k) return added, removed + + # Adding/removing files in media sync + ########################################################################## + + def syncRemove(self, fnames): + for f in fnames: + if os.path.exists(f): + os.unlink(f) + self.db.execute("delete from log where fname = ?", f) + self.db.execute("delete from media where fname = ?", f) + + def syncAdd(self, zipData): + "Extra zip data; true if finished." 
+        f = StringIO(zipData)
+        z = zipfile.ZipFile(f, "r")
+        finished = False
+        meta = None
+        media = []
+        sizecnt = 0
+        # get meta info first
+        assert z.getinfo("_meta").file_size < 100000
+        meta = simplejson.loads(z.read("_meta"))
+        # then loop through all files
+        for i in z.infolist():
+            # check for zip bombs
+            sizecnt += i.file_size
+            assert sizecnt < 100*1024*1024
+            if i.filename == "_meta":
+                # ignore previously-retrieved meta
+                continue
+            elif i.filename == "_finished":
+                # last zip in set
+                finished = True
+            else:
+                # prepare sql
+                data = z.read(i)
+                csum = checksum(data)
+                mod = meta[i.filename]['mod']
+                name = meta[i.filename]['name']
+                # malicious chars?
+                for c in '/\\':
+                    assert c not in name
+                media.append((name, csum, mod))
+                # remove entries from local log
+                self.db.execute("delete from log where fname = ?", name)
+                # save file
+                open(name, "wb").write(data)
+                # set mod time if possible; may fail on some filesystems
+                try:
+                    os.utime(name, (mod, mod))
+                except:
+                    print "failed to set utime"
+        # update media db
+        if media:
+            self.db.executemany(
+                "insert or replace into media values (?,?,?)", media)
+        # if we have finished adding, we need to record the new folder mtime
+        # so that we don't trigger a needless scan
+        if finished:
+            self.syncMod()
+        # also need to clear log after sync finished
+
+        return finished
+
+    # Streaming zips
+    ##########################################################################
+    # Because there's no standard filename encoding for zips, and because not
+    # all zip clients support retrieving mtime, we store the files as ascii
+    # and place a json file in the zip with the necessary information.
+
+    def zipFromAdded(self, cur):
+        "Add files to a zip until over SYNC_ZIP_SIZE. Return zip data."
+        f = StringIO()
+        z = zipfile.ZipFile(f, "w")
+        sz = 0
+        cnt = 0
+        files = {}
+        while 1:
+            fname = cur.fetchone()
+            if not fname:
+                z.writestr("_finished", "")
+                break
+            z.write(fname, str(cnt))
+            files[str(cnt)] = dict(
+                name=fname, mod=self._mtime(fname))
+            sz += os.path.getsize(fname)
+            if sz > SYNC_ZIP_SIZE:
+                break
+            cnt += 1
+        z.writestr("_meta", simplejson.dumps(files))
+        z.close()
+        return f.getvalue()
diff --git a/anki/sound.py b/anki/sound.py
index d0523de82..f39c804a1 100644
--- a/anki/sound.py
+++ b/anki/sound.py
@@ -78,6 +78,8 @@ mplayerReader = None
 mplayerEvt = threading.Event()
 mplayerClear = False
 
+# fixme from robert: can we do away with this with stderr=file(os.devnull,
+# 'w') in the popen call?
 class MplayerReader(threading.Thread):
     "Read any debugging info to prevent mplayer from blocking."
 
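
The streaming-zip format used by zipFromAdded()/syncAdd() above (ascii member names "0", "1", ..., a "_meta" JSON member mapping those names to the real filename and mtime, and an empty "_finished" member marking the last chunk in the set) can be exercised standalone. A minimal sketch, not the Anki implementation: the function names here are made up, and it uses the stdlib json/io modules in place of simplejson/cStringIO.

    import io, json, os, zipfile

    def build_chunk(paths, last=False):
        # Mirrors zipFromAdded(): store each file under an ascii index name
        # and describe it in "_meta"; "_finished" marks the final chunk.
        buf = io.BytesIO()
        z = zipfile.ZipFile(buf, "w")
        meta = {}
        for cnt, path in enumerate(paths):
            z.write(path, str(cnt))
            meta[str(cnt)] = dict(name=os.path.basename(path),
                                  mod=int(os.path.getmtime(path)))
        if last:
            z.writestr("_finished", "")
        z.writestr("_meta", json.dumps(meta))
        z.close()
        return buf.getvalue()

    def read_chunk(data):
        # Mirrors syncAdd(): read "_meta" first, restore each member under
        # its original name and mtime, and report whether this was the
        # last chunk in the set.
        z = zipfile.ZipFile(io.BytesIO(data), "r")
        meta = json.loads(z.read("_meta"))
        finished = False
        for info in z.infolist():
            if info.filename == "_meta":
                continue
            if info.filename == "_finished":
                finished = True
                continue
            entry = meta[info.filename]
            with open(entry["name"], "wb") as f:
                f.write(z.read(info))
            os.utime(entry["name"], (entry["mod"], entry["mod"]))
        return finished

    # e.g. read_chunk(build_chunk(["foo.jpg"], last=True)) -> True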
diff --git a/anki/sync.py b/anki/sync.py index 45750376d..3b14fa79a 100644 --- a/anki/sync.py +++ b/anki/sync.py @@ -2,7 +2,7 @@ # Copyright: Damien Elmes # License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html -import urllib, simplejson, os, sys, httplib2, zipfile, gzip +import urllib, simplejson, os, sys, httplib2, gzip from cStringIO import StringIO from datetime import date from anki.db import DB @@ -15,13 +15,6 @@ from hooks import runHook if simplejson.__version__ < "1.7.3": raise Exception("SimpleJSON must be 1.7.3 or later.") -CHUNK_SIZE = 65536 -MIME_BOUNDARY = "Anki-sync-boundary" -SYNC_HOST = os.environ.get("SYNC_HOST") or "dev.ankiweb.net" -SYNC_PORT = int(os.environ.get("SYNC_PORT") or 80) -SYNC_URL = "http://%s:%d/sync/" % (SYNC_HOST, SYNC_PORT) -SYNC_VER = 0 - # - 64 bit guid will be munged in js; need to escape or rethink # - make sure /sync/download is compressed @@ -34,16 +27,13 @@ SYNC_VER = 0 # changed, since by default closing the deck bumps the mod time # - ensure the user doesn't add foreign chars to passsword +# Incremental syncing ########################################################################## from anki.consts import * class Syncer(object): - MAX_REVLOG = 5000 - MAX_CARDS = 5000 - MAX_FACTS = 2500 - def __init__(self, deck, server=None): self.deck = deck self.server = server @@ -383,6 +373,7 @@ from facts where %s""" % d) ########################################################################## class LocalServer(Syncer): + # serialize/deserialize payload, so we don't end up sharing objects # between decks def applyChanges(self, minUsn, lnewer, changes): @@ -404,6 +395,9 @@ class HttpSyncer(object): self.hkey = cont return cont + def _vars(self): + return dict(k=self.hkey) + # Posting data as a file ###################################################################### # We don't want to post the payload as a form var, as the percent-encoding is @@ -422,23 +416,24 @@ class HttpSyncer(object): 'Content-Disposition: form-data; name="%s"\r\n\r\n%s\r\n' % (key, value)) # file header - buf.write(bdry + "\r\n") - buf.write("""\ + if fobj: + buf.write(bdry + "\r\n") + buf.write("""\ Content-Disposition: form-data; name="data"; filename="data"\r\n\ Content-Type: application/octet-stream\r\n\r\n""") - # write file into buffer, optionally compressing - if comp: - tgt = gzip.GzipFile(mode="wb", fileobj=buf, compresslevel=comp) - else: - tgt = buf - while 1: - data = fobj.read(CHUNK_SIZE) - if not data: - if comp: - tgt.close() - break - tgt.write(data) - buf.write('\r\n' + bdry + '--\r\n') + # write file into buffer, optionally compressing + if comp: + tgt = gzip.GzipFile(mode="wb", fileobj=buf, compresslevel=comp) + else: + tgt = buf + while 1: + data = fobj.read(CHUNK_SIZE) + if not data: + if comp: + tgt.close() + break + tgt.write(data) + buf.write('\r\n' + bdry + '--\r\n') size = buf.tell() # connection headers headers = { @@ -453,7 +448,11 @@ Content-Type: application/octet-stream\r\n\r\n""") raise Exception("Invalid response code: %s" % resp['status']) return cont +# Incremental sync over HTTP +###################################################################### + class RemoteServer(Syncer, HttpSyncer): + def __init__(self, user, hkey): self.user = user self.hkey = hkey @@ -490,9 +489,6 @@ class RemoteServer(Syncer, HttpSyncer): def finish(self, **kw): return self._run("finish", kw) - def _vars(self): - return dict(k=self.hkey) - def _run(self, cmd, data): return simplejson.loads( self.postData(self.con, cmd, 
StringIO(simplejson.dumps(data)), @@ -507,9 +503,6 @@ class FullSyncer(HttpSyncer): self.deck = deck self.hkey = hkey - def _vars(self): - return dict(k=self.hkey) - def _con(self): return httplib2.Http(timeout=60) @@ -535,7 +528,78 @@ class FullSyncer(HttpSyncer): # Media syncing ########################################################################## -class MediaSyncer(HttpSyncer): - pass +class MediaSyncer(object): + def __init__(self, deck, server=None): + self.deck = deck + self.server = server + self.added = None + def sync(self): + # step 1: send/recv deletions + runHook("mediaSync", "remove") + usn = self.deck.media.usn() + lrem = self.removed() + rrem = self.server.remove(fnames=lrem, minUsn=usn) + self.remove(rrem) + # step 2: stream files from server + runHook("mediaSync", "server") + while 1: + runHook("mediaSync", "stream") + zip = self.server.files() + if self.addFiles(zip=zip): + break + # step 3: stream files to the server + runHook("mediaSync", "client") + while 1: + runHook("mediaSync", "stream") + zip = self.files() + usn = self.server.addFiles(zip=zip) + if usn: + # when server has run out of files, it returns bumped usn + break + self.deck.media.setUsn(usn) + self.deck.media.clearLog() + + # fixme: need to commit to media db? or is already in that commit mode? + + def removed(self): + return self.deck.media.removed() + + def remove(self, fnames, minUsn=None): + self.deck.media.syncRemove(fnames) + if minUsn is not None: + # we're the server + self.minUsn = minUsn + return self.deck.media.removed() + + def files(self): + if not self.added: + self.added = self.deck.media.added() + return self.deck.media.zipFromAdded(self.added) + + def addFiles(self, zip): + "True if zip is the last in set. Server returns new usn instead." + return self.deck.media.syncAdd(zip) + +# Remote media syncing +########################################################################## + +class RemoteMediaServer(MediaSyncer, HttpSyncer): + + def __init__(self, hkey): + self.hkey = hkey + self.con = httplib2.Http(timeout=60) + + def remove(self, **kw): + return simplejson.loads( + self.postData( + self.con, "remove", StringIO(simplejson.dumps(kw)), + self._vars())) + + def files(self): + return self.postData(self.con, "files", None, self._vars()) + + def addFiles(self, zip): + return self.postData(self.con, "files", StringIO(zip), + self._vars(), comp=0) diff --git a/tests/test_sync.py b/tests/test_sync.py index 9dc20c98e..ea003e042 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -6,7 +6,8 @@ from tests.shared import assertException from anki.errors import * from anki import Deck from anki.utils import intTime -from anki.sync import Syncer, FullSyncer, LocalServer, RemoteServer +from anki.sync import Syncer, FullSyncer, LocalServer, RemoteServer, \ + MediaSyncer, RemoteMediaServer from anki.facts import Fact from anki.cards import Card from tests.shared import getEmptyDeck @@ -322,3 +323,41 @@ def test_remoteSync(): f.download() d = Deck(client.deck.path) assert d.mod == lmod + +# Media tests +########################################################################## +# We can't run many tests for local media, because the desktop code assumes +# the current directory is the media folder + +def setup_media(): + global client, server + setup_basic() + server = MediaSyncer(deck2) + client = MediaSyncer(deck1, server) + +@nose.with_setup(setup_media) +def test_mediaNothing(): + client.sync() + +# Remote media tests 
+########################################################################## + +def setup_remoteMedia(): + global client, server + setup_basic() + server = RemoteMediaServer(TEST_HKEY) + client = MediaSyncer(deck1, server) + +@nose.with_setup(setup_remoteMedia) +def test_remoteMediaNothing(): + client.sync() + +# @nose.with_setup(setup_media) +# def test_mediaAdd(): +# open(os.path.join(deck1.media.dir(), "foo.jpg"), "wb").write("foo") +# assert len(os.listdir(deck1.media.dir())) == 1 +# assert len(os.listdir(deck2.media.dir())) == 0 +# client.sync() +# assert len(os.listdir(deck1.media.dir())) == 1 +# assert len(os.listdir(deck2.media.dir())) == 1 +
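
The __enter__/__exit__ pair added to DB in anki/db.py above gives a one-shot transactional scope: "begin" is issued on entry, the commit happens only if no exception escaped the block, and the connection is closed in both cases. A small usage sketch; the file and table are hypothetical, and note that the handle is unusable once the block ends because __exit__ closes it.

    from anki.db import DB

    db = DB("/tmp/example.db")  # hypothetical path and schema
    with db:  # __enter__ runs "begin"
        db.execute("create table if not exists meta (usn int)")
        db.execute("insert into meta values (?)", 0)
    # clean exit -> committed; on an exception the pending changes are
    # discarded; either way the underlying connection is now closed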
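
HttpSyncer.postData() above frames the payload as a multipart/form-data file part rather than a percent-encoded form var, so large data can be streamed and optionally gzipped. A rough standalone sketch of the same framing under those assumptions; the function name is invented, httplib2 and the constants from anki/consts.py are assumed, and the URL and host key in the trailing comment are only placeholders.

    import gzip, io
    import httplib2

    MIME_BOUNDARY = "Anki-sync-boundary"
    CHUNK_SIZE = 65536

    def post_payload(url, fobj, form_vars, comp=6):
        bdry = "--" + MIME_BOUNDARY
        buf = io.BytesIO()
        # one part per form var
        for key, value in form_vars.items():
            buf.write((bdry + "\r\n").encode())
            buf.write(('Content-Disposition: form-data; name="%s"\r\n\r\n%s\r\n'
                       % (key, value)).encode())
        # optional "data" file part, gzip-compressed when comp is non-zero
        if fobj:
            buf.write((bdry + "\r\n").encode())
            buf.write(b'Content-Disposition: form-data; name="data"; '
                      b'filename="data"\r\nContent-Type: '
                      b'application/octet-stream\r\n\r\n')
            tgt = gzip.GzipFile(mode="wb", fileobj=buf,
                                compresslevel=comp) if comp else buf
            while True:
                data = fobj.read(CHUNK_SIZE)
                if not data:
                    if comp:
                        tgt.close()
                    break
                tgt.write(data)
        buf.write(("\r\n" + bdry + "--\r\n").encode())
        headers = {
            'Content-Type': 'multipart/form-data; boundary=%s' % MIME_BOUNDARY,
            'Content-Length': str(buf.tell()),
        }
        resp, cont = httplib2.Http(timeout=60).request(
            url, "POST", headers=headers, body=buf.getvalue())
        if resp['status'] != '200':
            raise Exception("Invalid response code: %s" % resp['status'])
        return cont

    # e.g. post_payload("http://dev.ankiweb.net:80/sync/meta",
    #                   io.BytesIO(b"{}"), dict(k="hostkey"))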