sync tweaks

This commit is contained in:
Damien Elmes 2011-12-03 21:06:27 +09:00
parent d148a6cf1b
commit 6965a9c102
2 changed files with 45 additions and 38 deletions

View file

@ -35,16 +35,14 @@ class Syncer(object):
self.col = col self.col = col
self.server = server self.server = server
def status(self, type):
"Override to trace sync progress."
#print "sync:", type
pass
def sync(self): def sync(self):
"Returns 'noChanges', 'fullSync', or 'success'." "Returns 'noChanges', 'fullSync', or 'success'."
# step 1: login & metadata # step 1: login & metadata
self.status("login") runHook("sync", "login")
self.rmod, rscm, self.maxUsn, rts, self.mediaUsn = self.server.meta() ret = self.server.meta()
if not ret:
return "badAuth"
self.rmod, rscm, self.maxUsn, rts, self.mediaUsn = ret
self.lmod, lscm, self.minUsn, lts, dummy = self.meta() self.lmod, lscm, self.minUsn, lts, dummy = self.meta()
if abs(rts - lts) > 300: if abs(rts - lts) > 300:
return "clockOff" return "clockOff"
@ -54,34 +52,34 @@ class Syncer(object):
return "fullSync" return "fullSync"
self.lnewer = self.lmod > self.rmod self.lnewer = self.lmod > self.rmod
# step 2: deletions and small objects # step 2: deletions and small objects
self.status("meta") runHook("sync", "meta")
lchg = self.changes() lchg = self.changes()
rchg = self.server.applyChanges( rchg = self.server.applyChanges(
minUsn=self.minUsn, lnewer=self.lnewer, changes=lchg) minUsn=self.minUsn, lnewer=self.lnewer, changes=lchg)
self.mergeChanges(lchg, rchg) self.mergeChanges(lchg, rchg)
# step 3: stream large tables from server # step 3: stream large tables from server
self.status("server") runHook("sync", "server")
while 1: while 1:
self.status("stream") runHook("sync", "stream")
chunk = self.server.chunk() chunk = self.server.chunk()
self.applyChunk(chunk=chunk) self.applyChunk(chunk=chunk)
if chunk['done']: if chunk['done']:
break break
# step 4: stream to server # step 4: stream to server
self.status("client") runHook("sync", "client")
while 1: while 1:
self.status("stream") runHook("sync", "stream")
chunk = self.chunk() chunk = self.chunk()
self.server.applyChunk(chunk=chunk) self.server.applyChunk(chunk=chunk)
if chunk['done']: if chunk['done']:
break break
# step 5: sanity check during beta testing # step 5: sanity check during beta testing
self.status("sanity") runHook("sync", "sanity")
c = self.sanityCheck() c = self.sanityCheck()
s = self.server.sanityCheck() s = self.server.sanityCheck()
assert c == s assert c == s
# finalize # finalize
self.status("finalize") runHook("sync", "finalize")
mod = self.server.finish() mod = self.server.finish()
self.finish(mod) self.finish(mod)
return "success" return "success"
@ -381,16 +379,6 @@ class LocalServer(Syncer):
class HttpSyncer(object): class HttpSyncer(object):
# retrieving a host key for future operations
def hostKey(self, pw):
h = httpCon()
resp, cont = h.request(
SYNC_URL+"hostKey?" + urllib.urlencode(dict(u=self.user,p=pw)))
if resp['status'] != '200':
raise Exception("Invalid response code: %s" % resp['status'])
self.hkey = simplejson.loads(cont)['key']
return self.hkey
def _vars(self): def _vars(self):
return dict(k=self.hkey) return dict(k=self.hkey)
@ -449,23 +437,38 @@ Content-Type: application/octet-stream\r\n\r\n""")
class RemoteServer(Syncer, HttpSyncer): class RemoteServer(Syncer, HttpSyncer):
def __init__(self, user, hkey): def __init__(self, hkey):
self.user = user
self.hkey = hkey self.hkey = hkey
self.con = httpCon() self.con = httpCon()
def hostKey(self, user, pw):
"Returns hkey or none if user/pw incorrect."
user = user.encode("utf-8")
pw = pw.encode("utf-8")
resp, cont = self.con.request(
SYNC_URL+"hostKey?" + urllib.urlencode(dict(u=user,p=pw)))
if resp['status'] == '200':
self.hkey = simplejson.loads(cont)['key']
return self.hkey
elif resp['status'] == '403':
# invalid auth
return
else:
raise Exception("Unknown response code: %s" % resp['status'])
return
def meta(self): def meta(self):
resp, cont = self.con.request( resp, cont = self.con.request(
SYNC_URL+"meta?" + urllib.urlencode(dict(u=self.user,v=SYNC_VER))) SYNC_URL+"meta?" + urllib.urlencode(dict(k=self.hkey,v=SYNC_VER)))
# fixme: convert these into easily-catchable errors if resp['status'] == '403':
if resp['status'] in ('503', '504'): # auth failure
return
elif resp['status'] in ('503', '504'):
raise Exception("Server is too busy; please try again later.") raise Exception("Server is too busy; please try again later.")
elif resp['status'] == '501': elif resp['status'] == '501':
raise Exception("Your client is out of date; please upgrade.") raise Exception("Your client is out of date; please upgrade.")
elif resp['status'] == '403':
raise Exception("Invalid key; please authenticate.")
elif resp['status'] != '200': elif resp['status'] != '200':
raise Exception("Invalid response code: %s" % resp['status']) raise Exception("Unknown response code: %s" % resp['status'])
return simplejson.loads(cont) return simplejson.loads(cont)
def applyChanges(self, **kw): def applyChanges(self, **kw):

View file

@ -34,19 +34,23 @@ def setup_remote():
setup_basic() setup_basic()
# mark deck1 as changed # mark deck1 as changed
ts.deck1.save() ts.deck1.save()
ts.server = RemoteServer(TEST_USER, TEST_HKEY) ts.server = RemoteServer(TEST_HKEY)
ts.client.server = ts.server ts.client.server = ts.server
@nose.with_setup(setup_remote) @nose.with_setup(setup_remote)
def test_meta(): def test_meta():
global TEST_REMOTE global TEST_REMOTE
try: try:
(mod, scm, usn, tstamp, dummy) = ts.server.meta() # if the key is wrong, meta returns nothing
ts.server.hkey = "abc"
assert not ts.server.meta()
except Exception, e: except Exception, e:
if e.errno == 61: if e.errno == 61:
TEST_REMOTE = False TEST_REMOTE = False
print "aborting; server offline" print "aborting; server offline"
return return
ts.server.hkey = TEST_HKEY
(mod, scm, usn, tstamp, mediaUSN) = ts.server.meta()
assert mod assert mod
assert scm assert scm
assert mod != ts.client.col.mod assert mod != ts.client.col.mod
@ -56,9 +60,9 @@ def test_meta():
def test_hkey(): def test_hkey():
if not TEST_REMOTE: if not TEST_REMOTE:
return return
assertException(Exception, lambda: ts.server.hostKey("wrongpass")) assert not ts.server.hostKey(TEST_USER, "wrongpass")
ts.server.hkey = "abc" ts.server.hkey = "willchange"
k = ts.server.hostKey(TEST_PASS) k = ts.server.hostKey(TEST_USER, TEST_PASS)
assert k == ts.server.hkey == TEST_HKEY assert k == ts.server.hkey == TEST_HKEY
@nose.with_setup(setup_remote) @nose.with_setup(setup_remote)
@ -103,7 +107,7 @@ def setup_remoteMedia():
setup_basic() setup_basic()
con = httpCon() con = httpCon()
ts.server = RemoteMediaServer(TEST_HKEY, con) ts.server = RemoteMediaServer(TEST_HKEY, con)
ts.server2 = RemoteServer(TEST_USER, TEST_HKEY) ts.server2 = RemoteServer(TEST_HKEY)
ts.client = MediaSyncer(ts.deck1, ts.server) ts.client = MediaSyncer(ts.deck1, ts.server)
@nose.with_setup(setup_remoteMedia) @nose.with_setup(setup_remoteMedia)