rework media syncing so we can resume

This commit is contained in:
Damien Elmes 2011-12-06 22:18:41 +09:00
parent c0edcae238
commit be66c960a9
3 changed files with 116 additions and 115 deletions

View file

@ -180,24 +180,115 @@ If the same name exists, compare checksums."""
cnt += 1
return cnt
# Tracking changes (public)
# Media syncing - changes and removal
##########################################################################
def added(self):
    "Run findChanges(), then query the log for MEDIA_ADD entries."
    self.findChanges()
    query = "select * from log where type = ?"
    # hand back whatever db.execute() returns so callers can fetch lazily
    return self.db.execute(query, MEDIA_ADD)
def removed(self):
    "Run findChanges(), then list the log entries of type MEDIA_REM."
    self.findChanges()
    query = "select * from log where type = ?"
    return self.db.list(query, MEDIA_REM)
def clearLog(self):
    "Delete every row from the log table and commit."
    db = self.db
    db.execute("delete from log")
    db.commit()
def hasChanged(self):
    "Return a truthy value if the log table holds at least one row."
    anyEntry = self.db.scalar("select 1 from log limit 1")
    return anyEntry
def removed(self):
    "List the log entries of type MEDIA_REM (no rescan is performed)."
    query = "select * from log where type = ?"
    return self.db.list(query, MEDIA_REM)
def syncRemove(self, fnames):
    """Apply the deletions in fnames, then drop acked local deletions.

    Each name is unlinked from disk when it exists and is purged from both
    the log and media tables. Afterwards all MEDIA_REM log rows are deleted
    and the changes are committed.
    """
    db = self.db
    for name in fnames:
        # the file may already be missing locally; only the db rows matter then
        if os.path.exists(name):
            os.unlink(name)
        db.execute("delete from log where fname = ?", name)
        db.execute("delete from media where fname = ?", name)
    # the server has acknowledged our own logged deletions, so forget them
    db.execute("delete from log where type = ?", MEDIA_REM)
    db.commit()
# Media syncing - unbundling zip files from server
##########################################################################
def syncAdd(self, zipData):
    """Extract zip data; true if finished.

    The zip is expected to contain:
      _meta      JSON dict mapping archive member names to real filenames
      _usn       the usn to record once this zip has been applied
      _finished  optional marker; present only in the last zip of a set
    Every other member is written out under the name _meta gives for it
    (relative to the current directory), recorded in the media table and
    removed from the local log.

    Raises AssertionError if _meta is too large, if the uncompressed
    contents reach 100MB, or if a target filename contains a path
    separator. The checks are explicit raises rather than assert
    statements so they are not stripped when Python runs with -O.
    """
    f = StringIO(zipData)
    z = zipfile.ZipFile(f, "r")
    finished = False
    media = []
    sizecnt = 0
    try:
        # get meta info first
        if z.getinfo("_meta").file_size >= 100000:
            raise AssertionError("_meta entry in media zip is too large")
        meta = simplejson.loads(z.read("_meta"))
        nextUsn = int(z.read("_usn"))
        # then loop through all files
        for i in z.infolist():
            # check for zip bombs
            sizecnt += i.file_size
            if sizecnt >= 100*1024*1024:
                raise AssertionError("media zip contents are too large")
            if i.filename == "_meta" or i.filename == "_usn":
                # ignore previously-retrieved meta
                continue
            elif i.filename == "_finished":
                # last zip in set
                finished = True
            else:
                data = z.read(i)
                csum = checksum(data)
                name = meta[i.filename]
                # malicious chars?
                for c in '/\\':
                    if c in name:
                        raise AssertionError(
                            "path separator in media filename")
                # save file; close it explicitly so the data is on disk
                # before the mtime is read below
                out = open(name, "wb")
                try:
                    out.write(data)
                finally:
                    out.close()
                # update db
                media.append((name, csum, self._mtime(name)))
                # remove entries from local log
                self.db.execute("delete from log where fname = ?", name)
    finally:
        z.close()
    # update media db and note new starting usn
    if media:
        self.db.executemany(
            "insert or replace into media values (?,?,?)", media)
    self.setUsn(nextUsn) # commits
    # if we have finished adding, we need to record the new folder mtime
    # so that we don't trigger a needless scan
    if finished:
        self.syncMod()
    return finished
# Media syncing - bundling zip files to send to server
##########################################################################
# Because there's no standard filename encoding for zips, and because not
# all zip clients support retrieving mtime, we store the files as ascii
# and place a json file in the zip with the necessary information.
def zipAdded(self):
    """Add logged additions to a zip until over SYNC_ZIP_SIZE or 300 files.

    Returns a tuple (zip data, fnames). fnames is a list of one-element
    lists, one per file placed in the zip, ready to be handed to
    forgetAdded() once the receiver has acknowledged the zip.

    The "_finished" marker is written only when the log has no more
    MEDIA_ADD rows. The per-zip file cap is therefore enforced in the loop
    rather than with a SQL "limit": with a limit, running out of rows was
    indistinguishable from hitting the cap, so a zip holding the first 300
    of many small files was wrongly flagged as the last one.
    """
    f = StringIO()
    z = zipfile.ZipFile(f, "w")
    sz = 0
    cnt = 0
    files = {}
    cur = self.db.execute(
        "select fname from log where type = ?", MEDIA_ADD)
    fnames = []
    while 1:
        fname = cur.fetchone()
        if not fname:
            # genuinely nothing left to send
            z.writestr("_finished", "")
            break
        fname = fname[0]
        fnames.append([fname])
        # members are stored under ascii counters; _meta maps them back
        z.write(fname, str(cnt))
        files[str(cnt)] = fname
        sz += os.path.getsize(fname)
        cnt += 1
        # stop without marking the set as finished; the next call continues
        if sz > SYNC_ZIP_SIZE or cnt >= 300:
            break
    z.writestr("_meta", simplejson.dumps(files))
    z.close()
    return f.getvalue(), fnames
def forgetAdded(self, fnames):
    "Delete the given filenames from the log and commit; no-op if empty."
    if fnames:
        # fnames is passed straight to executemany, one parameter set per row
        self.db.executemany("delete from log where fname = ?", fnames)
        self.db.commit()
# Tracking changes (private)
##########################################################################
@ -290,90 +381,3 @@ create table log (fname text primary key, type int);
if not v[2]:
removed.append(k)
return added, removed
# Adding/removing files in media sync
##########################################################################
def syncRemove(self, fnames):
    """Delete each file in fnames and purge it from the log and media tables.

    Files that no longer exist on disk are skipped, but their database rows
    are still removed. Commits once after all names are processed.
    """
    db = self.db
    for name in fnames:
        if os.path.exists(name):
            os.unlink(name)
        db.execute("delete from log where fname = ?", name)
        db.execute("delete from media where fname = ?", name)
    db.commit()
def syncAdd(self, zipData):
    """Extract zip data; true if finished.

    The zip is expected to contain a "_meta" member (a JSON dict mapping
    archive member names to real filenames) and, in the last zip of a set,
    a "_finished" marker. Every other member is written out under the name
    _meta gives for it (relative to the current directory), recorded in
    the media table and removed from the local log.

    Raises AssertionError if _meta is too large, if the uncompressed
    contents reach 100MB, or if a target filename contains a path
    separator. The checks are explicit raises rather than assert
    statements so they are not stripped when Python runs with -O.
    """
    f = StringIO(zipData)
    z = zipfile.ZipFile(f, "r")
    finished = False
    media = []
    sizecnt = 0
    try:
        # get meta info first
        if z.getinfo("_meta").file_size >= 100000:
            raise AssertionError("_meta entry in media zip is too large")
        meta = simplejson.loads(z.read("_meta"))
        # then loop through all files
        for i in z.infolist():
            # check for zip bombs
            sizecnt += i.file_size
            if sizecnt >= 100*1024*1024:
                raise AssertionError("media zip contents are too large")
            if i.filename == "_meta":
                # ignore previously-retrieved meta
                continue
            elif i.filename == "_finished":
                # last zip in set
                finished = True
            else:
                data = z.read(i)
                csum = checksum(data)
                name = meta[i.filename]
                # malicious chars?
                for c in '/\\':
                    if c in name:
                        raise AssertionError(
                            "path separator in media filename")
                # save file; close it explicitly so the data is on disk
                # before the mtime is read below
                out = open(name, "wb")
                try:
                    out.write(data)
                finally:
                    out.close()
                # update db
                media.append((name, csum, self._mtime(name)))
                # remove entries from local log
                self.db.execute("delete from log where fname = ?", name)
    finally:
        z.close()
    # update media db
    if media:
        self.db.executemany(
            "insert or replace into media values (?,?,?)", media)
    self.db.commit()
    # if we have finished adding, we need to record the new folder mtime
    # so that we don't trigger a needless scan
    if finished:
        self.syncMod()
    # also need to clear log after sync finished
    return finished
# Streaming zips
##########################################################################
# Because there's no standard filename encoding for zips, and because not
# all zip clients support retrieving mtime, we store the files as ascii
# and place a json file in the zip with the necessary information.
def zipFromAdded(self, cur):
    """Zip files named by rows from cur until over SYNC_ZIP_SIZE.

    Rows are pulled one at a time with cur.fetchone(); the first column is
    the filename. Members are stored under ascii counters and a "_meta"
    JSON member maps each counter back to its filename. When the cursor is
    exhausted a "_finished" member is added. Returns the zip data.
    """
    buf = StringIO()
    archive = zipfile.ZipFile(buf, "w")
    total = 0
    index = {}
    while True:
        row = cur.fetchone()
        if not row:
            # nothing left to send: flag this zip as the last one
            archive.writestr("_finished", "")
            break
        path = row[0]
        # one entry per file so far, so the dict size is the next counter
        arcname = str(len(index))
        archive.write(path, arcname)
        index[arcname] = path
        total += os.path.getsize(path)
        if total > SYNC_ZIP_SIZE:
            break
    archive.writestr("_meta", simplejson.dumps(index))
    archive.close()
    return buf.getvalue()

View file

@ -561,23 +561,24 @@ class MediaSyncer(object):
runHook("sync", "server")
while 1:
runHook("sync", "streamMedia")
zip = self.server.files()
usn = self.col.media.usn()
zip = self.server.files(minUsn=usn)
if self.addFiles(zip=zip):
break
# step 4: stream files to the server
runHook("sync", "client")
while 1:
runHook("sync", "streamMedia")
zip = self.files()
zip, fnames = self.files()
usn = self.server.addFiles(zip=zip)
# after server has replied, safe to remove from log
self.col.media.forgetAdded(fnames)
# when server has run out of files, it returns bumped usn
if usn is not False:
# when server has run out of files, it returns bumped usn
break
# step 5: finalize
# update usn from addFiles() and cached mtime
self.col.media.setUsn(usn)
self.col.media.clearLog()
# clear cursor so successive calls work
self.added = None
#self.col.media.syncMod()
return "success"
def removed(self):
@ -587,13 +588,10 @@ class MediaSyncer(object):
self.col.media.syncRemove(fnames)
if minUsn is not None:
# we're the server
self.minUsn = minUsn
return self.col.media.removed()
def files(self):
if not self.added:
self.added = self.col.media.added()
return self.col.media.zipFromAdded(self.added)
return self.col.media.zipAdded()
def addFiles(self, zip):
"True if zip is the last in set. Server returns new usn instead."
@ -614,8 +612,9 @@ class RemoteMediaServer(MediaSyncer, HttpSyncer):
self.con, "remove", StringIO(simplejson.dumps(kw)),
self._vars()))
def files(self):
    "Post a 'files' command with no payload; return postData's result."
    params = self._vars()
    return self.postData(self.con, "files", None, params)
def files(self, **kw):
    "Post a 'files' command with kw JSON-encoded as the payload."
    payload = StringIO(simplejson.dumps(kw))
    return self.postData(self.con, "files", payload, self._vars())
def addFiles(self, zip):
return simplejson.loads(

View file

@ -162,8 +162,6 @@ def test_media():
ts.deck1.media.close()
os.unlink(ts.deck1.media.dir()+".db")
ts.deck1.media.connect()
changes = ts.deck1.media.added().fetchall()
assert len(changes) == 2
assert ts.client.sync(ts.server2.meta()[4]) == "success"
assert ts.client.sync(ts.server2.meta()[4]) == "noChanges"
assert len(os.listdir(ts.deck1.media.dir())) == 2