Anki/pylib/anki/sync.py
Damien Elmes 5876866565 tweaking the folder names again
hopefully that's the last of it
2020-01-03 07:48:38 +10:00

950 lines
30 KiB
Python

# Copyright: Ankitects Pty Ltd and contributors
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
import gzip
import io
import json
import os
import random
import sqlite3
from typing import Any, Dict, List, Optional, Tuple, Union
import requests
import anki
from anki.consts import *
from anki.db import DB, DBError
from anki.utils import checksum, devMode, ids2str, intTime, platDesc, versionWithBuild
from .hooks import runHook
from .lang import ngettext
# syncing vars
HTTP_TIMEOUT = 90
HTTP_BUF_SIZE = 64 * 1024
class UnexpectedSchemaChange(Exception):
pass
# Incremental syncing
##########################################################################
class Syncer:
cursor: Optional[sqlite3.Cursor]
def __init__(self, col: "anki.storage._Collection", server=None) -> None:
self.col = col
self.server = server
# these are set later; provide dummy values for type checking
self.lnewer = False
self.maxUsn = 0
self.tablesLeft: List[str] = []
def sync(self) -> str:
"Returns 'noChanges', 'fullSync', 'success', etc"
self.syncMsg = ""
self.uname = ""
# if the deck has any pending changes, flush them first and bump mod
# time
self.col.save()
# step 1: login & metadata
runHook("sync", "login")
meta = self.server.meta()
self.col.log("rmeta", meta)
if not meta:
return "badAuth"
# server requested abort?
self.syncMsg = meta["msg"]
if not meta["cont"]:
return "serverAbort"
else:
# don't abort, but if 'msg' is not blank, gui should show 'msg'
# after sync finishes and wait for confirmation before hiding
pass
rscm = meta["scm"]
rts = meta["ts"]
self.rmod = meta["mod"]
self.maxUsn = meta["usn"]
self.uname = meta.get("uname", "")
self.hostNum = meta.get("hostNum")
meta = self.meta()
self.col.log("lmeta", meta)
self.lmod = meta["mod"]
self.minUsn = meta["usn"]
lscm = meta["scm"]
lts = meta["ts"]
if abs(rts - lts) > 300:
self.col.log("clock off")
return "clockOff"
if self.lmod == self.rmod:
self.col.log("no changes")
return "noChanges"
elif lscm != rscm:
self.col.log("schema diff")
return "fullSync"
self.lnewer = self.lmod > self.rmod
# step 1.5: check collection is valid
if not self.col.basicCheck():
self.col.log("basic check")
return "basicCheckFailed"
# step 2: startup and deletions
runHook("sync", "meta")
rrem = self.server.start(
minUsn=self.minUsn, lnewer=self.lnewer, offset=self.col.localOffset()
)
# apply deletions to server
lgraves = self.removed()
while lgraves:
gchunk, lgraves = self._gravesChunk(lgraves)
self.server.applyGraves(chunk=gchunk)
# then apply server deletions here
self.remove(rrem)
# ...and small objects
lchg = self.changes()
rchg = self.server.applyChanges(changes=lchg)
try:
self.mergeChanges(lchg, rchg)
except UnexpectedSchemaChange:
self.server.abort()
return self._forceFullSync()
# step 3: stream large tables from server
runHook("sync", "server")
while 1:
runHook("sync", "stream")
chunk = self.server.chunk()
self.col.log("server chunk", chunk)
self.applyChunk(chunk=chunk)
if chunk["done"]:
break
# step 4: stream to server
runHook("sync", "client")
while 1:
runHook("sync", "stream")
chunk = self.chunk()
self.col.log("client chunk", chunk)
self.server.applyChunk(chunk=chunk)
if chunk["done"]:
break
# step 5: sanity check
runHook("sync", "sanity")
c = self.sanityCheck()
ret = self.server.sanityCheck2(client=c)
if ret["status"] != "ok":
return self._forceFullSync()
# finalize
runHook("sync", "finalize")
mod = self.server.finish()
self.finish(mod)
return "success"
def _forceFullSync(self) -> str:
# roll back and force full sync
self.col.rollback()
self.col.modSchema(False)
self.col.save()
return "sanityCheckFailed"
def _gravesChunk(self, graves: Dict) -> Tuple[Dict, Optional[Dict]]:
lim = 250
chunk: Dict[str, Any] = dict(notes=[], cards=[], decks=[])
for cat in "notes", "cards", "decks":
if lim and graves[cat]:
chunk[cat] = graves[cat][:lim]
graves[cat] = graves[cat][lim:]
lim -= len(chunk[cat])
# anything remaining?
if graves["notes"] or graves["cards"] or graves["decks"]:
return chunk, graves
return chunk, None
def meta(self) -> dict:
return dict(
mod=self.col.mod,
scm=self.col.scm,
usn=self.col._usn,
ts=intTime(),
musn=0,
msg="",
cont=True,
)
def changes(self) -> dict:
"Bundle up small objects."
d: Dict[str, Any] = dict(
models=self.getModels(), decks=self.getDecks(), tags=self.getTags()
)
if self.lnewer:
d["conf"] = self.getConf()
d["crt"] = self.col.crt
return d
def mergeChanges(self, lchg, rchg) -> None:
# then the other objects
self.mergeModels(rchg["models"])
self.mergeDecks(rchg["decks"])
self.mergeTags(rchg["tags"])
if "conf" in rchg:
self.mergeConf(rchg["conf"])
# this was left out of earlier betas
if "crt" in rchg:
self.col.crt = rchg["crt"]
self.prepareToChunk()
def sanityCheck(self) -> Union[list, str]:
if not self.col.basicCheck():
return "failed basic check"
for t in "cards", "notes", "revlog", "graves":
if self.col.db.scalar("select count() from %s where usn = -1" % t):
return "%s had usn = -1" % t
for g in self.col.decks.all():
if g["usn"] == -1:
return "deck had usn = -1"
for t, usn in self.col.tags.allItems():
if usn == -1:
return "tag had usn = -1"
found = False
for m in self.col.models.all():
if m["usn"] == -1:
return "model had usn = -1"
if found:
self.col.models.save()
self.col.sched.reset()
# check for missing parent decks
self.col.sched.deckDueList()
# return summary of deck
return [
list(self.col.sched.counts()),
self.col.db.scalar("select count() from cards"),
self.col.db.scalar("select count() from notes"),
self.col.db.scalar("select count() from revlog"),
self.col.db.scalar("select count() from graves"),
len(self.col.models.all()),
len(self.col.decks.all()),
len(self.col.decks.allConf()),
]
def usnLim(self) -> str:
return "usn = -1"
def finish(self, mod: int) -> int:
self.col.ls = mod
self.col._usn = self.maxUsn + 1
# ensure we save the mod time even if no changes made
self.col.db.mod = True
self.col.save(mod=mod)
return mod
# Chunked syncing
##########################################################################
def prepareToChunk(self) -> None:
self.tablesLeft = ["revlog", "cards", "notes"]
self.cursor = None
def cursorForTable(self, table) -> sqlite3.Cursor:
lim = self.usnLim()
x = self.col.db.execute
d = (self.maxUsn, lim)
if table == "revlog":
return x(
"""
select id, cid, %d, ease, ivl, lastIvl, factor, time, type
from revlog where %s"""
% d
)
elif table == "cards":
return x(
"""
select id, nid, did, ord, mod, %d, type, queue, due, ivl, factor, reps,
lapses, left, odue, odid, flags, data from cards where %s"""
% d
)
else:
return x(
"""
select id, guid, mid, mod, %d, tags, flds, '', '', flags, data
from notes where %s"""
% d
)
def chunk(self) -> dict:
buf: Dict[str, Any] = dict(done=False)
lim = 250
while self.tablesLeft and lim:
curTable = self.tablesLeft[0]
if not self.cursor:
self.cursor = self.cursorForTable(curTable)
rows = self.cursor.fetchmany(lim)
fetched = len(rows)
if fetched != lim:
# table is empty
self.tablesLeft.pop(0)
self.cursor = None
# mark the objects as having been sent
self.col.db.execute(
"update %s set usn=? where usn=-1" % curTable, self.maxUsn
)
buf[curTable] = rows
lim -= fetched
if not self.tablesLeft:
buf["done"] = True
return buf
def applyChunk(self, chunk) -> None:
if "revlog" in chunk:
self.mergeRevlog(chunk["revlog"])
if "cards" in chunk:
self.mergeCards(chunk["cards"])
if "notes" in chunk:
self.mergeNotes(chunk["notes"])
# Deletions
##########################################################################
def removed(self) -> dict:
cards = []
notes = []
decks = []
curs = self.col.db.execute("select oid, type from graves where usn = -1")
for oid, type in curs:
if type == REM_CARD:
cards.append(oid)
elif type == REM_NOTE:
notes.append(oid)
else:
decks.append(oid)
self.col.db.execute("update graves set usn=? where usn=-1", self.maxUsn)
return dict(cards=cards, notes=notes, decks=decks)
def remove(self, graves) -> None:
# pretend to be the server so we don't set usn = -1
self.col.server = True # type: ignore
# notes first, so we don't end up with duplicate graves
self.col._remNotes(graves["notes"])
# then cards
self.col.remCards(graves["cards"], notes=False)
# and decks
for oid in graves["decks"]:
self.col.decks.rem(oid, childrenToo=False)
self.col.server = False # type: ignore
# Models
##########################################################################
def getModels(self) -> List:
mods = [m for m in self.col.models.all() if m["usn"] == -1]
for m in mods:
m["usn"] = self.maxUsn
self.col.models.save()
return mods
def mergeModels(self, rchg) -> None:
for r in rchg:
l = self.col.models.get(r["id"])
# if missing locally or server is newer, update
if not l or r["mod"] > l["mod"]:
# This is a hack to detect when the note type has been altered
# in an import without a full sync being forced. A future
# syncing algorithm should handle this in a better way.
if l:
if len(l["flds"]) != len(r["flds"]):
raise UnexpectedSchemaChange()
if len(l["tmpls"]) != len(r["tmpls"]):
raise UnexpectedSchemaChange()
self.col.models.update(r)
# Decks
##########################################################################
def getDecks(self) -> List[list]:
decks = [g for g in self.col.decks.all() if g["usn"] == -1]
for g in decks:
g["usn"] = self.maxUsn
dconf = [g for g in self.col.decks.allConf() if g["usn"] == -1]
for g in dconf:
g["usn"] = self.maxUsn
self.col.decks.save()
return [decks, dconf]
def mergeDecks(self, rchg) -> None:
for r in rchg[0]:
l = self.col.decks.get(r["id"], False)
# work around mod time being stored as string
if l and not isinstance(l["mod"], int):
l["mod"] = int(l["mod"])
# if missing locally or server is newer, update
if not l or r["mod"] > l["mod"]:
self.col.decks.update(r)
for r in rchg[1]:
try:
l = self.col.decks.getConf(r["id"])
except KeyError:
l = None
# if missing locally or server is newer, update
if not l or r["mod"] > l["mod"]:
self.col.decks.updateConf(r)
# Tags
##########################################################################
def getTags(self) -> List:
tags = []
for t, usn in self.col.tags.allItems():
if usn == -1:
self.col.tags.tags[t] = self.maxUsn
tags.append(t)
self.col.tags.save()
return tags
def mergeTags(self, tags) -> None:
self.col.tags.register(tags, usn=self.maxUsn)
# Cards/notes/revlog
##########################################################################
def mergeRevlog(self, logs) -> None:
self.col.db.executemany(
"insert or ignore into revlog values (?,?,?,?,?,?,?,?,?)", logs
)
def newerRows(self, data, table, modIdx) -> List:
ids = (r[0] for r in data)
lmods = {}
for id, mod in self.col.db.execute(
"select id, mod from %s where id in %s and %s"
% (table, ids2str(ids), self.usnLim())
):
lmods[id] = mod
update = []
for r in data:
if r[0] not in lmods or lmods[r[0]] < r[modIdx]:
update.append(r)
self.col.log(table, data)
return update
def mergeCards(self, cards) -> None:
self.col.db.executemany(
"insert or replace into cards values "
"(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
self.newerRows(cards, "cards", 4),
)
def mergeNotes(self, notes) -> None:
rows = self.newerRows(notes, "notes", 3)
self.col.db.executemany(
"insert or replace into notes values (?,?,?,?,?,?,?,?,?,?,?)", rows
)
self.col.updateFieldCache([f[0] for f in rows])
# Col config
##########################################################################
def getConf(self) -> Any:
return self.col.conf
def mergeConf(self, conf) -> None:
self.col.conf = conf
# Wrapper for requests that tracks upload/download progress
##########################################################################
class AnkiRequestsClient:
verify = True
timeout = 60
def __init__(self) -> None:
self.session = requests.Session()
def post(self, url, data, headers) -> Any:
data = _MonitoringFile(data) # pytype: disable=wrong-arg-types
headers["User-Agent"] = self._agentName()
return self.session.post(
url,
data=data,
headers=headers,
stream=True,
timeout=self.timeout,
verify=self.verify,
) # pytype: disable=wrong-arg-types
def get(self, url, headers=None) -> requests.models.Response:
if headers is None:
headers = {}
headers["User-Agent"] = self._agentName()
return self.session.get(
url, stream=True, headers=headers, timeout=self.timeout, verify=self.verify
)
def streamContent(self, resp) -> bytes:
resp.raise_for_status()
buf = io.BytesIO()
for chunk in resp.iter_content(chunk_size=HTTP_BUF_SIZE):
runHook("httpRecv", len(chunk))
buf.write(chunk)
return buf.getvalue()
def _agentName(self) -> str:
from anki import version
return "Anki {}".format(version)
# allow user to accept invalid certs in work/school settings
if os.environ.get("ANKI_NOVERIFYSSL"):
AnkiRequestsClient.verify = False
import warnings
warnings.filterwarnings("ignore")
class _MonitoringFile(io.BufferedReader):
def read(self, size=-1) -> bytes:
data = io.BufferedReader.read(self, HTTP_BUF_SIZE)
runHook("httpSend", len(data))
return data
# HTTP syncing tools
##########################################################################
class HttpSyncer:
def __init__(self, hkey=None, client=None, hostNum=None) -> None:
self.hkey = hkey
self.skey = checksum(str(random.random()))[:8]
self.client = client or AnkiRequestsClient()
self.postVars: Dict[str, str] = {}
self.hostNum = hostNum
self.prefix = "sync/"
def syncURL(self) -> str:
if devMode:
url = "https://l1sync.ankiweb.net/"
else:
url = SYNC_BASE % (self.hostNum or "")
return url + self.prefix
def assertOk(self, resp) -> None:
# not using raise_for_status() as aqt expects this error msg
if resp.status_code != 200:
raise Exception("Unknown response code: %s" % resp.status_code)
# Posting data as a file
######################################################################
# We don't want to post the payload as a form var, as the percent-encoding is
# costly. We could send it as a raw post, but more HTTP clients seem to
# support file uploading, so this is the more compatible choice.
def _buildPostData(self, fobj, comp) -> Tuple[Dict[str, str], io.BytesIO]:
BOUNDARY = b"Anki-sync-boundary"
bdry = b"--" + BOUNDARY
buf = io.BytesIO()
# post vars
self.postVars["c"] = "1" if comp else "0"
for (key, value) in list(self.postVars.items()):
buf.write(bdry + b"\r\n")
buf.write(
(
'Content-Disposition: form-data; name="%s"\r\n\r\n%s\r\n'
% (key, value)
).encode("utf8")
)
# payload as raw data or json
rawSize = 0
if fobj:
# header
buf.write(bdry + b"\r\n")
buf.write(
b"""\
Content-Disposition: form-data; name="data"; filename="data"\r\n\
Content-Type: application/octet-stream\r\n\r\n"""
)
# write file into buffer, optionally compressing
if comp:
tgt = gzip.GzipFile(mode="wb", fileobj=buf, compresslevel=comp)
else:
tgt = buf # type: ignore
while 1:
data = fobj.read(65536)
if not data:
if comp:
tgt.close()
break
rawSize += len(data)
tgt.write(data)
buf.write(b"\r\n")
buf.write(bdry + b"--\r\n")
size = buf.tell()
# connection headers
headers = {
"Content-Type": "multipart/form-data; boundary=%s"
% BOUNDARY.decode("utf8"),
"Content-Length": str(size),
}
buf.seek(0)
if size >= 100 * 1024 * 1024 or rawSize >= 250 * 1024 * 1024:
raise Exception("Collection too large to upload to AnkiWeb.")
return headers, buf
def req(self, method, fobj=None, comp=6, badAuthRaises=True) -> Any:
headers, body = self._buildPostData(fobj, comp)
r = self.client.post(self.syncURL() + method, data=body, headers=headers)
if not badAuthRaises and r.status_code == 403:
return False
self.assertOk(r)
buf = self.client.streamContent(r)
return buf
# Incremental sync over HTTP
######################################################################
class RemoteServer(HttpSyncer):
def __init__(self, hkey, hostNum) -> None:
HttpSyncer.__init__(self, hkey, hostNum=hostNum)
def hostKey(self, user, pw) -> Any:
"Returns hkey or none if user/pw incorrect."
self.postVars = dict()
ret = self.req(
"hostKey",
io.BytesIO(json.dumps(dict(u=user, p=pw)).encode("utf8")),
badAuthRaises=False,
)
if not ret:
# invalid auth
return
self.hkey = json.loads(ret.decode("utf8"))["key"]
return self.hkey
def meta(self) -> Any:
self.postVars = dict(k=self.hkey, s=self.skey,)
ret = self.req(
"meta",
io.BytesIO(
json.dumps(
dict(
v=SYNC_VER,
cv="ankidesktop,%s,%s" % (versionWithBuild(), platDesc()),
)
).encode("utf8")
),
badAuthRaises=False,
)
if not ret:
# invalid auth
return
return json.loads(ret.decode("utf8"))
def applyGraves(self, **kw) -> Any:
return self._run("applyGraves", kw)
def applyChanges(self, **kw) -> Any:
return self._run("applyChanges", kw)
def start(self, **kw) -> Any:
return self._run("start", kw)
def chunk(self, **kw) -> Any:
return self._run("chunk", kw)
def applyChunk(self, **kw) -> Any:
return self._run("applyChunk", kw)
def sanityCheck2(self, **kw) -> Any:
return self._run("sanityCheck2", kw)
def finish(self, **kw) -> Any:
return self._run("finish", kw)
def abort(self, **kw) -> Any:
return self._run("abort", kw)
def _run(self, cmd, data) -> Any:
return json.loads(
self.req(cmd, io.BytesIO(json.dumps(data).encode("utf8"))).decode("utf8")
)
# Full syncing
##########################################################################
class FullSyncer(HttpSyncer):
def __init__(self, col, hkey, client, hostNum) -> None:
HttpSyncer.__init__(self, hkey, client, hostNum=hostNum)
self.postVars = dict(
k=self.hkey, v="ankidesktop,%s,%s" % (anki.version, platDesc()),
)
self.col = col
def download(self) -> Optional[str]:
runHook("sync", "download")
localNotEmpty = self.col.db.scalar("select 1 from cards")
self.col.close()
cont = self.req("download")
tpath = self.col.path + ".tmp"
if cont == "upgradeRequired":
runHook("sync", "upgradeRequired")
return None
open(tpath, "wb").write(cont)
# check the received file is ok
d = DB(tpath)
assert d.scalar("pragma integrity_check") == "ok"
remoteEmpty = not d.scalar("select 1 from cards")
d.close()
# accidental clobber?
if localNotEmpty and remoteEmpty:
os.unlink(tpath)
return "downloadClobber"
# overwrite existing collection
os.unlink(self.col.path)
os.rename(tpath, self.col.path)
self.col = None
return None
def upload(self) -> bool:
"True if upload successful."
runHook("sync", "upload")
# make sure it's ok before we try to upload
if self.col.db.scalar("pragma integrity_check") != "ok":
return False
if not self.col.basicCheck():
return False
# apply some adjustments, then upload
self.col.beforeUpload()
if self.req("upload", open(self.col.path, "rb")) != b"OK":
return False
return True
# Media syncing
##########################################################################
#
# About conflicts:
# - to minimize data loss, if both sides are marked for sending and one
# side has been deleted, favour the add
# - if added/changed on both sides, favour the server version on the
# assumption other syncers are in sync with the server
#
class MediaSyncer:
def __init__(self, col, server=None) -> None:
self.col = col
self.server = server
self.downloadCount = 0
def sync(self) -> Any:
# check if there have been any changes
runHook("sync", "findMedia")
self.col.log("findChanges")
try:
self.col.media.findChanges()
except DBError:
return "corruptMediaDB"
# begin session and check if in sync
lastUsn = self.col.media.lastUsn()
ret = self.server.begin()
srvUsn = ret["usn"]
if lastUsn == srvUsn and not self.col.media.haveDirty():
return "noChanges"
# loop through and process changes from server
self.col.log("last local usn is %s" % lastUsn)
while True:
data = self.server.mediaChanges(lastUsn=lastUsn)
self.col.log("mediaChanges resp count %d" % len(data))
if not data:
break
need = []
lastUsn = data[-1][1]
for fname, rusn, rsum in data:
lsum, ldirty = self.col.media.syncInfo(fname)
self.col.log(
"check: lsum=%s rsum=%s ldirty=%d rusn=%d fname=%s"
% ((lsum and lsum[0:4]), (rsum and rsum[0:4]), ldirty, rusn, fname)
)
if rsum:
# added/changed remotely
if not lsum or lsum != rsum:
self.col.log("will fetch")
need.append(fname)
else:
self.col.log("have same already")
if ldirty:
self.col.media.markClean([fname])
elif lsum:
# deleted remotely
if not ldirty:
self.col.log("delete local")
self.col.media.syncDelete(fname)
else:
# conflict; local add overrides remote delete
self.col.log("conflict; will send")
else:
# deleted both sides
self.col.log("both sides deleted")
if ldirty:
self.col.media.markClean([fname])
self._downloadFiles(need)
self.col.log("update last usn to %d" % lastUsn)
self.col.media.setLastUsn(lastUsn) # commits
# at this point we're all up to date with the server's changes,
# and we need to send our own
updateConflict = False
toSend = self.col.media.dirtyCount()
while True:
zip, fnames = self.col.media.mediaChangesZip()
if not fnames:
break
runHook(
"syncMsg",
ngettext(
"%d media change to upload", "%d media changes to upload", toSend
)
% toSend,
)
processedCnt, serverLastUsn = self.server.uploadChanges(zip)
self.col.media.markClean(fnames[0:processedCnt])
self.col.log(
"processed %d, serverUsn %d, clientUsn %d"
% (processedCnt, serverLastUsn, lastUsn)
)
if serverLastUsn - processedCnt == lastUsn:
self.col.log("lastUsn in sync, updating local")
lastUsn = serverLastUsn
self.col.media.setLastUsn(serverLastUsn) # commits
else:
self.col.log("concurrent update, skipping usn update")
# commit for markClean
self.col.media.db.commit()
updateConflict = True
toSend -= processedCnt
if updateConflict:
self.col.log("restart sync due to concurrent update")
return self.sync()
lcnt = self.col.media.mediaCount()
ret = self.server.mediaSanity(local=lcnt)
if ret == "OK":
return "OK"
else:
self.col.media.forceResync()
return ret
def _downloadFiles(self, fnames) -> None:
self.col.log("%d files to fetch" % len(fnames))
while fnames:
top = fnames[0:SYNC_ZIP_COUNT]
self.col.log("fetch %s" % top)
zipData = self.server.downloadFiles(files=top)
cnt = self.col.media.addFilesFromZip(zipData)
self.downloadCount += cnt
self.col.log("received %d files" % cnt)
fnames = fnames[cnt:]
n = self.downloadCount
runHook(
"syncMsg",
ngettext("%d media file downloaded", "%d media files downloaded", n)
% n,
)
# Remote media syncing
##########################################################################
class RemoteMediaServer(HttpSyncer):
def __init__(self, col, hkey, client, hostNum) -> None:
self.col = col
HttpSyncer.__init__(self, hkey, client, hostNum=hostNum)
self.prefix = "msync/"
def begin(self) -> Any:
self.postVars = dict(
k=self.hkey, v="ankidesktop,%s,%s" % (anki.version, platDesc())
)
ret = self._dataOnly(
self.req("begin", io.BytesIO(json.dumps(dict()).encode("utf8")))
)
self.skey = ret["sk"]
return ret
# args: lastUsn
def mediaChanges(self, **kw) -> Any:
self.postVars = dict(sk=self.skey,)
return self._dataOnly(
self.req("mediaChanges", io.BytesIO(json.dumps(kw).encode("utf8")))
)
# args: files
def downloadFiles(self, **kw) -> Any:
return self.req("downloadFiles", io.BytesIO(json.dumps(kw).encode("utf8")))
def uploadChanges(self, zip) -> Any:
# no compression, as we compress the zip file instead
return self._dataOnly(self.req("uploadChanges", io.BytesIO(zip), comp=0))
# args: local
def mediaSanity(self, **kw) -> Any:
return self._dataOnly(
self.req("mediaSanity", io.BytesIO(json.dumps(kw).encode("utf8")))
)
def _dataOnly(self, resp) -> Any:
resp = json.loads(resp.decode("utf8"))
if resp["err"]:
self.col.log("error returned:%s" % resp["err"])
raise Exception("SyncError:%s" % resp["err"])
return resp["data"]
# only for unit tests
def mediatest(self, cmd) -> Any:
self.postVars = dict(k=self.hkey,)
return self._dataOnly(
self.req(
"newMediaTest", io.BytesIO(json.dumps(dict(cmd=cmd)).encode("utf8"))
)
)