login/full up/full down plugged in

This commit is contained in:
Damien Elmes 2020-05-30 12:28:22 +10:00
parent ee6d7f82e7
commit 0e5b7da62a
19 changed files with 435 additions and 1240 deletions

View file

@ -159,7 +159,7 @@ service BackendService {
// sync // sync
rpc SyncMedia (SyncMediaIn) returns (Empty); rpc SyncMedia (SyncAuth) returns (Empty);
rpc AbortSync (Empty) returns (Empty); rpc AbortSync (Empty) returns (Empty);
rpc BeforeUpload (Empty) returns (Empty); rpc BeforeUpload (Empty) returns (Empty);
rpc SyncLogin (SyncLoginIn) returns (SyncAuth); rpc SyncLogin (SyncLoginIn) returns (SyncAuth);
@ -610,11 +610,6 @@ message AddMediaFileIn {
bytes data = 2; bytes data = 2;
} }
message SyncMediaIn {
string hkey = 1;
string endpoint = 2;
}
message CheckMediaOut { message CheckMediaOut {
repeated string unused = 1; repeated string unused = 1;
repeated string missing = 2; repeated string missing = 2;

View file

@ -239,11 +239,20 @@ class Collection:
self.media.close() self.media.close()
self._closeLog() self._closeLog()
def close_for_full_sync(self) -> None:
# save and cleanup, but backend will take care of collection close
if self.db:
self.save(trx=False)
self.models._clear_cache()
self.db = None
self.media.close()
self._closeLog()
def rollback(self) -> None: def rollback(self) -> None:
self.db.rollback() self.db.rollback()
self.db.begin() self.db.begin()
def reopen(self) -> None: def reopen(self, after_full_sync=False) -> None:
assert not self.db assert not self.db
assert self.path.endswith(".anki2") assert self.path.endswith(".anki2")
@ -255,6 +264,7 @@ class Collection:
log_path = self.path.replace(".anki2", "2.log") log_path = self.path.replace(".anki2", "2.log")
# connect # connect
if not after_full_sync:
self.backend.open_collection( self.backend.open_collection(
collection_path=self.path, collection_path=self.path,
media_folder_path=media_dir, media_folder_path=media_dir,

View file

@ -66,13 +66,6 @@ MODEL_CLOZE = 1
STARTING_FACTOR = 2500 STARTING_FACTOR = 2500
# deck schema & syncing vars
SCHEMA_VERSION = 11
SYNC_ZIP_SIZE = int(2.5 * 1024 * 1024)
SYNC_ZIP_COUNT = 25
SYNC_BASE = "https://sync%s.ankiweb.net/"
SYNC_VER = 10
HELP_SITE = "https://apps.ankiweb.net/docs/manual.html" HELP_SITE = "https://apps.ankiweb.net/docs/manual.html"
# Leech actions # Leech actions

View file

@ -466,6 +466,8 @@ schema_will_change = _SchemaWillChangeFilter()
class _SyncProgressDidChangeHook: class _SyncProgressDidChangeHook:
"""Obsolete, do not use."""
_hooks: List[Callable[[str], None]] = [] _hooks: List[Callable[[str], None]] = []
def append(self, cb: Callable[[str], None]) -> None: def append(self, cb: Callable[[str], None]) -> None:
@ -484,14 +486,14 @@ class _SyncProgressDidChangeHook:
# if the hook fails, remove it # if the hook fails, remove it
self._hooks.remove(hook) self._hooks.remove(hook)
raise raise
# legacy support
runHook("syncMsg", msg)
sync_progress_did_change = _SyncProgressDidChangeHook() sync_progress_did_change = _SyncProgressDidChangeHook()
class _SyncStageDidChangeHook: class _SyncStageDidChangeHook:
"""Obsolete, do not use."""
_hooks: List[Callable[[str], None]] = [] _hooks: List[Callable[[str], None]] = []
def append(self, cb: Callable[[str], None]) -> None: def append(self, cb: Callable[[str], None]) -> None:
@ -510,8 +512,6 @@ class _SyncStageDidChangeHook:
# if the hook fails, remove it # if the hook fails, remove it
self._hooks.remove(hook) self._hooks.remove(hook)
raise raise
# legacy support
runHook("sync", stage)
sync_stage_did_change = _SyncStageDidChangeHook() sync_stage_did_change = _SyncStageDidChangeHook()

View file

@ -52,6 +52,8 @@ TagUsnTuple = pb.TagUsnTuple
NoteType = pb.NoteType NoteType = pb.NoteType
DeckTreeNode = pb.DeckTreeNode DeckTreeNode = pb.DeckTreeNode
StockNoteType = pb.StockNoteType StockNoteType = pb.StockNoteType
SyncAuth = pb.SyncAuth
SyncOutput = pb.SyncCollectionOut
try: try:
import orjson import orjson
@ -147,6 +149,7 @@ def proto_exception_to_native(err: pb.BackendError) -> Exception:
MediaSyncProgress = pb.MediaSyncProgress MediaSyncProgress = pb.MediaSyncProgress
FullSyncProgress = pb.FullSyncProgress
FormatTimeSpanContext = pb.FormatTimespanIn.Context FormatTimeSpanContext = pb.FormatTimespanIn.Context
@ -254,4 +257,6 @@ def translate_string_in(
# temporarily force logging of media handling # temporarily force logging of media handling
if "RUST_LOG" not in os.environ: if "RUST_LOG" not in os.environ:
os.environ["RUST_LOG"] = "warn,anki::media=debug,anki::dbcheck=debug" os.environ[
"RUST_LOG"
] = "warn,anki::media=debug,anki::sync=debug,anki::dbcheck=debug"

View file

@ -1,674 +1,11 @@
# Copyright: Ankitects Pty Ltd and contributors # Copyright: Ankitects Pty Ltd and contributors
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html # License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
from __future__ import annotations
import gzip
import io
import json
import os
import random
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union
import anki
from anki.consts import *
from anki.db import DB
from anki.utils import checksum, ids2str, intTime, platDesc, versionWithBuild
from . import hooks
from .httpclient import HttpClient from .httpclient import HttpClient
# add-on compat
from .rsbackend import from_json_bytes, to_json_bytes
AnkiRequestsClient = HttpClient AnkiRequestsClient = HttpClient
class UnexpectedSchemaChange(Exception):
pass
# Incremental syncing
##########################################################################
class Syncer: class Syncer:
chunkRows: Optional[List[Sequence]]
def __init__(self, col: anki.collection.Collection, server=None) -> None:
self.col = col.weakref()
self.server = server
# these are set later; provide dummy values for type checking
self.lnewer = False
self.maxUsn = 0
self.tablesLeft: List[str] = []
def sync(self) -> str: def sync(self) -> str:
"Returns 'noChanges', 'fullSync', 'success', etc"
self.syncMsg = ""
self.uname = ""
# if the deck has any pending changes, flush them first and bump mod
# time
self.col.save()
# step 1: login & metadata
hooks.sync_stage_did_change("login")
meta = self.server.meta()
self.col.log("rmeta", meta)
if not meta:
return "badAuth"
# server requested abort?
self.syncMsg = meta["msg"]
if not meta["cont"]:
return "serverAbort"
else:
# don't abort, but if 'msg' is not blank, gui should show 'msg'
# after sync finishes and wait for confirmation before hiding
pass pass
rscm = meta["scm"]
rts = meta["ts"]
self.rmod = meta["mod"]
self.maxUsn = meta["usn"]
self.uname = meta.get("uname", "")
self.hostNum = meta.get("hostNum")
meta = self.meta()
self.col.log("lmeta", meta)
self.lmod = meta["mod"]
self.minUsn = meta["usn"]
lscm = meta["scm"]
lts = meta["ts"]
if abs(rts - lts) > 300:
self.col.log("clock off")
return "clockOff"
if self.lmod == self.rmod:
self.col.log("no changes")
return "noChanges"
elif lscm != rscm:
self.col.log("schema diff")
return "fullSync"
self.lnewer = self.lmod > self.rmod
# step 1.5: check collection is valid
if not self.col.basicCheck():
self.col.log("basic check")
return "basicCheckFailed"
# step 2: startup and deletions
hooks.sync_stage_did_change("meta")
rrem = self.server.start(
minUsn=self.minUsn, lnewer=self.lnewer, offset=self.col.localOffset()
)
# apply deletions to server
lgraves = self.removed()
while lgraves:
gchunk, lgraves = self._gravesChunk(lgraves)
self.server.applyGraves(chunk=gchunk)
# then apply server deletions here
self.remove(rrem)
# ...and small objects
lchg = self.changes()
rchg = self.server.applyChanges(changes=lchg)
try:
self.mergeChanges(lchg, rchg)
except UnexpectedSchemaChange:
self.server.abort()
return self._forceFullSync()
# step 3: stream large tables from server
hooks.sync_stage_did_change("server")
while 1:
hooks.sync_stage_did_change("stream")
chunk = self.server.chunk()
self.col.log("server chunk", chunk)
self.applyChunk(chunk=chunk)
if chunk["done"]:
break
# step 4: stream to server
hooks.sync_stage_did_change("client")
while 1:
hooks.sync_stage_did_change("stream")
chunk = self.chunk()
self.col.log("client chunk", chunk)
self.server.applyChunk(chunk=chunk)
if chunk["done"]:
break
# step 5: sanity check
hooks.sync_stage_did_change("sanity")
c = self.sanityCheck()
ret = self.server.sanityCheck2(client=c)
if ret["status"] != "ok":
return self._forceFullSync()
# finalize
hooks.sync_stage_did_change("finalize")
mod = self.server.finish()
self.finish(mod)
return "success"
def _forceFullSync(self) -> str:
# roll back and force full sync
self.col.rollback()
self.col.modSchema(False)
self.col.save()
return "sanityCheckFailed"
def _gravesChunk(self, graves: Dict) -> Tuple[Dict, Optional[Dict]]:
lim = 250
chunk: Dict[str, Any] = dict(notes=[], cards=[], decks=[])
for cat in "notes", "cards", "decks":
if lim and graves[cat]:
chunk[cat] = graves[cat][:lim]
graves[cat] = graves[cat][lim:]
lim -= len(chunk[cat])
# anything remaining?
if graves["notes"] or graves["cards"] or graves["decks"]:
return chunk, graves
return chunk, None
def meta(self) -> dict:
return dict(
mod=self.col.mod,
scm=self.col.scm,
usn=self.col._usn,
ts=intTime(),
musn=0,
msg="",
cont=True,
)
def changes(self) -> dict:
"Bundle up small objects."
d: Dict[str, Any] = dict(
models=self.getModels(), decks=self.getDecks(), tags=self.getTags()
)
if self.lnewer:
d["conf"] = self.getConf()
d["crt"] = self.col.crt
return d
def mergeChanges(self, lchg, rchg) -> None:
# then the other objects
self.mergeModels(rchg["models"])
self.mergeDecks(rchg["decks"])
self.mergeTags(rchg["tags"])
if "conf" in rchg:
self.mergeConf(rchg["conf"])
# this was left out of earlier betas
if "crt" in rchg:
self.col.crt = rchg["crt"]
self.prepareToChunk()
def sanityCheck(self) -> Union[list, str]:
if not self.col.basicCheck():
return "failed basic check"
for t in "cards", "notes", "revlog", "graves":
if self.col.db.scalar("select count() from %s where usn = -1" % t):
return "%s had usn = -1" % t
for g in self.col.decks.all():
if g["usn"] == -1:
return "deck had usn = -1"
for tup in self.col.backend.all_tags():
if tup.usn == -1:
return "tag had usn = -1"
found = False
for m in self.col.models.all():
if m["usn"] == -1:
return "model had usn = -1"
if found:
self.col.models.save()
self.col.sched.reset()
# return summary of deck
return [
list(self.col.sched.counts()),
self.col.db.scalar("select count() from cards"),
self.col.db.scalar("select count() from notes"),
self.col.db.scalar("select count() from revlog"),
self.col.db.scalar("select count() from graves"),
len(self.col.models.all()),
len(self.col.decks.all()),
len(self.col.decks.allConf()),
]
def usnLim(self) -> str:
return "usn = -1"
def finish(self, mod: int) -> int:
self.col.ls = mod
self.col._usn = self.maxUsn + 1
# ensure we save the mod time even if no changes made
self.col.db.mod = True
self.col.save(mod=mod)
return mod
# Chunked syncing
##########################################################################
def prepareToChunk(self) -> None:
self.tablesLeft = ["revlog", "cards", "notes"]
self.chunkRows = None
def getChunkRows(self, table) -> List[Sequence]:
lim = self.usnLim()
x = self.col.db.all
d = (self.maxUsn, lim)
if table == "revlog":
return x(
"""
select id, cid, %d, ease, ivl, lastIvl, factor, time, type
from revlog where %s"""
% d
)
elif table == "cards":
return x(
"""
select id, nid, did, ord, mod, %d, type, queue, due, ivl, factor, reps,
lapses, left, odue, odid, flags, data from cards where %s"""
% d
)
else:
return x(
"""
select id, guid, mid, mod, %d, tags, flds, '', '', flags, data
from notes where %s"""
% d
)
def chunk(self) -> dict:
buf: Dict[str, Any] = dict(done=False)
lim = 250
while self.tablesLeft and lim:
curTable = self.tablesLeft[0]
if self.chunkRows is None:
self.chunkRows = self.getChunkRows(curTable)
rows = self.chunkRows[:lim]
self.chunkRows = self.chunkRows[lim:]
fetched = len(rows)
if fetched != lim:
# table is empty
self.tablesLeft.pop(0)
self.chunkRows = None
# mark the objects as having been sent
self.col.db.execute(
"update %s set usn=? where usn=-1" % curTable, self.maxUsn
)
buf[curTable] = rows
lim -= fetched
if not self.tablesLeft:
buf["done"] = True
return buf
def applyChunk(self, chunk) -> None:
if "revlog" in chunk:
self.mergeRevlog(chunk["revlog"])
if "cards" in chunk:
self.mergeCards(chunk["cards"])
if "notes" in chunk:
self.mergeNotes(chunk["notes"])
# Deletions
##########################################################################
def removed(self) -> dict:
cards = []
notes = []
decks = []
curs = self.col.db.execute("select oid, type from graves where usn = -1")
for oid, type in curs:
if type == REM_CARD:
cards.append(oid)
elif type == REM_NOTE:
notes.append(oid)
else:
decks.append(oid)
self.col.db.execute("update graves set usn=? where usn=-1", self.maxUsn)
return dict(cards=cards, notes=notes, decks=decks)
def remove(self, graves) -> None:
# pretend to be the server so we don't set usn = -1
self.col.server = True
# notes first, so we don't end up with duplicate graves
self.col._remNotes(graves["notes"])
# then cards
self.col.remCards(graves["cards"], notes=False)
# and decks
for oid in graves["decks"]:
self.col.decks.rem(oid, childrenToo=False)
self.col.server = False
# Models
##########################################################################
def getModels(self) -> List:
mods = [m for m in self.col.models.all() if m["usn"] == -1]
for m in mods:
m["usn"] = self.maxUsn
self.col.models.update(m, preserve_usn=True)
return mods
def mergeModels(self, rchg) -> None:
for r in rchg:
l = self.col.models.get(r["id"])
# if missing locally or server is newer, update
if not l or r["mod"] > l["mod"]:
# This is a hack to detect when the note type has been altered
# in an import without a full sync being forced. A future
# syncing algorithm should handle this in a better way.
if l:
if len(l["flds"]) != len(r["flds"]):
raise UnexpectedSchemaChange()
if len(l["tmpls"]) != len(r["tmpls"]):
raise UnexpectedSchemaChange()
self.col.models.update(r, preserve_usn=True)
# Decks
##########################################################################
def getDecks(self) -> List[list]:
decks = [g for g in self.col.decks.all() if g["usn"] == -1]
for g in decks:
g["usn"] = self.maxUsn
self.col.decks.update(g, preserve_usn=True)
dconf = [g for g in self.col.decks.all_config() if g["usn"] == -1]
for g in dconf:
g["usn"] = self.maxUsn
self.col.decks.update_config(g, preserve_usn=True)
return [decks, dconf]
def mergeDecks(self, rchg) -> None:
for r in rchg[0]:
l = self.col.decks.get(r["id"], False)
# work around mod time being stored as string
if l and not isinstance(l["mod"], int):
l["mod"] = int(l["mod"])
# if missing locally or server is newer, update
if not l or r["mod"] > l["mod"]:
self.col.decks.update(r, preserve_usn=True)
for r in rchg[1]:
try:
l = self.col.decks.get_config(r["id"])
except KeyError:
l = None
# if missing locally or server is newer, update
if not l or r["mod"] > l["mod"]:
self.col.decks.update_config(r, preserve_usn=True)
# Tags
##########################################################################
def getTags(self) -> List:
return list(self.col.backend.get_changed_tags(self.maxUsn))
def mergeTags(self, tags) -> None:
self.col.tags.register(tags, usn=self.maxUsn)
# Cards/notes/revlog
##########################################################################
def mergeRevlog(self, logs) -> None:
self.col.db.executemany(
"insert or ignore into revlog values (?,?,?,?,?,?,?,?,?)", logs
)
def newerRows(self, data, table, modIdx) -> List:
ids = (r[0] for r in data)
lmods = {}
for id, mod in self.col.db.execute(
"select id, mod from %s where id in %s and %s"
% (table, ids2str(ids), self.usnLim())
):
lmods[id] = mod
update = []
for r in data:
if r[0] not in lmods or lmods[r[0]] < r[modIdx]:
update.append(r)
self.col.log(table, data)
return update
def mergeCards(self, cards) -> None:
self.col.db.executemany(
"insert or replace into cards values "
"(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
self.newerRows(cards, "cards", 4),
)
def mergeNotes(self, notes) -> None:
rows = self.newerRows(notes, "notes", 3)
self.col.db.executemany(
"insert or replace into notes values (?,?,?,?,?,?,?,?,?,?,?)", rows
)
self.col.updateFieldCache([f[0] for f in rows])
# Col config
##########################################################################
def getConf(self) -> Dict[str, Any]:
return from_json_bytes(self.col.backend.get_all_config())
def mergeConf(self, conf: Dict[str, Any]) -> None:
self.col.backend.set_all_config(to_json_bytes(conf))
# HTTP syncing tools
##########################################################################
class HttpSyncer:
def __init__(self, hkey=None, client=None, hostNum=None) -> None:
self.hkey = hkey
self.skey = checksum(str(random.random()))[:8]
self.client = client or HttpClient()
self.postVars: Dict[str, str] = {}
self.hostNum = hostNum
self.prefix = "sync/"
def syncURL(self) -> str:
url = SYNC_BASE % (self.hostNum or "")
return url + self.prefix
def assertOk(self, resp) -> None:
# not using raise_for_status() as aqt expects this error msg
if resp.status_code != 200:
raise Exception("Unknown response code: %s" % resp.status_code)
# Posting data as a file
######################################################################
# We don't want to post the payload as a form var, as the percent-encoding is
# costly. We could send it as a raw post, but more HTTP clients seem to
# support file uploading, so this is the more compatible choice.
def _buildPostData(self, fobj, comp) -> Tuple[Dict[str, str], io.BytesIO]:
BOUNDARY = b"Anki-sync-boundary"
bdry = b"--" + BOUNDARY
buf = io.BytesIO()
# post vars
self.postVars["c"] = "1" if comp else "0"
for (key, value) in list(self.postVars.items()):
buf.write(bdry + b"\r\n")
buf.write(
(
'Content-Disposition: form-data; name="%s"\r\n\r\n%s\r\n'
% (key, value)
).encode("utf8")
)
# payload as raw data or json
rawSize = 0
if fobj:
# header
buf.write(bdry + b"\r\n")
buf.write(
b"""\
Content-Disposition: form-data; name="data"; filename="data"\r\n\
Content-Type: application/octet-stream\r\n\r\n"""
)
# write file into buffer, optionally compressing
if comp:
tgt = gzip.GzipFile(mode="wb", fileobj=buf, compresslevel=comp)
else:
tgt = buf # type: ignore
while 1:
data = fobj.read(65536)
if not data:
if comp:
tgt.close()
break
rawSize += len(data)
tgt.write(data)
buf.write(b"\r\n")
buf.write(bdry + b"--\r\n")
size = buf.tell()
# connection headers
headers = {
"Content-Type": "multipart/form-data; boundary=%s"
% BOUNDARY.decode("utf8"),
"Content-Length": str(size),
}
buf.seek(0)
if size >= 100 * 1024 * 1024 or rawSize >= 250 * 1024 * 1024:
raise Exception("Collection too large to upload to AnkiWeb.")
return headers, buf
def req(self, method, fobj=None, comp=6, badAuthRaises=True) -> Any:
headers, body = self._buildPostData(fobj, comp)
r = self.client.post(self.syncURL() + method, data=body, headers=headers)
if not badAuthRaises and r.status_code == 403:
return False
self.assertOk(r)
buf = self.client.streamContent(r)
return buf
# Incremental sync over HTTP
######################################################################
class RemoteServer(HttpSyncer):
def __init__(self, hkey, hostNum) -> None:
HttpSyncer.__init__(self, hkey, hostNum=hostNum)
def hostKey(self, user, pw) -> Any:
"Returns hkey or none if user/pw incorrect."
self.postVars = dict()
ret = self.req(
"hostKey",
io.BytesIO(json.dumps(dict(u=user, p=pw)).encode("utf8")),
badAuthRaises=False,
)
if not ret:
# invalid auth
return
self.hkey = json.loads(ret.decode("utf8"))["key"]
return self.hkey
def meta(self) -> Any:
self.postVars = dict(k=self.hkey, s=self.skey,)
ret = self.req(
"meta",
io.BytesIO(
json.dumps(
dict(
v=SYNC_VER,
cv="ankidesktop,%s,%s" % (versionWithBuild(), platDesc()),
)
).encode("utf8")
),
badAuthRaises=False,
)
if not ret:
# invalid auth
return
return json.loads(ret.decode("utf8"))
def applyGraves(self, **kw) -> Any:
return self._run("applyGraves", kw)
def applyChanges(self, **kw) -> Any:
return self._run("applyChanges", kw)
def start(self, **kw) -> Any:
return self._run("start", kw)
def chunk(self, **kw) -> Any:
return self._run("chunk", kw)
def applyChunk(self, **kw) -> Any:
return self._run("applyChunk", kw)
def sanityCheck2(self, **kw) -> Any:
return self._run("sanityCheck2", kw)
def finish(self, **kw) -> Any:
return self._run("finish", kw)
def abort(self, **kw) -> Any:
return self._run("abort", kw)
def _run(self, cmd: str, data: Any) -> Any:
return json.loads(
self.req(cmd, io.BytesIO(json.dumps(data).encode("utf8"))).decode("utf8")
)
# Full syncing
##########################################################################
class FullSyncer(HttpSyncer):
def __init__(self, col, hkey, client, hostNum) -> None:
HttpSyncer.__init__(self, hkey, client, hostNum=hostNum)
self.postVars = dict(
k=self.hkey, v="ankidesktop,%s,%s" % (anki.version, platDesc()),
)
self.col = col.weakref()
def download(self) -> Optional[str]:
hooks.sync_stage_did_change("download")
localNotEmpty = self.col.db.scalar("select 1 from cards")
self.col.close(downgrade=False)
cont = self.req("download")
tpath = self.col.path + ".tmp"
if cont == "upgradeRequired":
hooks.sync_stage_did_change("upgradeRequired")
return None
with open(tpath, "wb") as file:
file.write(cont)
# check the received file is ok
d = DB(tpath)
assert d.scalar("pragma integrity_check") == "ok"
remoteEmpty = not d.scalar("select 1 from cards")
d.close()
# accidental clobber?
if localNotEmpty and remoteEmpty:
os.unlink(tpath)
return "downloadClobber"
# overwrite existing collection
os.unlink(self.col.path)
os.rename(tpath, self.col.path)
self.col = None
return None
def upload(self) -> bool:
"True if upload successful."
hooks.sync_stage_did_change("upload")
# make sure it's ok before we try to upload
if self.col.db.scalar("pragma integrity_check") != "ok":
return False
if not self.col.basicCheck():
return False
# apply some adjustments, then upload
self.col.beforeUpload()
with open(self.col.path, "rb") as file:
if self.req("upload", file) != b"OK":
return False
return True

View file

@ -33,8 +33,6 @@ hooks = [
args=["exporters: List[Tuple[str, Any]]"], args=["exporters: List[Tuple[str, Any]]"],
legacy_hook="exportersList", legacy_hook="exportersList",
), ),
Hook(name="sync_stage_did_change", args=["stage: str"], legacy_hook="sync"),
Hook(name="sync_progress_did_change", args=["msg: str"], legacy_hook="syncMsg"),
Hook( Hook(
name="field_filter", name="field_filter",
args=[ args=[
@ -92,6 +90,12 @@ hooks = [
args=["notetype: Dict[str, Any]"], args=["notetype: Dict[str, Any]"],
doc="Obsolete, do not use.", doc="Obsolete, do not use.",
), ),
Hook(
name="sync_stage_did_change", args=["stage: str"], doc="Obsolete, do not use.",
),
Hook(
name="sync_progress_did_change", args=["msg: str"], doc="Obsolete, do not use.",
),
] ]
if __name__ == "__main__": if __name__ == "__main__":

View file

@ -875,7 +875,8 @@ title="%s" %s>%s</button>""" % (
if self.media_syncer.is_syncing(): if self.media_syncer.is_syncing():
self.media_syncer.show_sync_log() self.media_syncer.show_sync_log()
else: else:
self.unloadCollection(self._onSync) self.temp_sync()
# self.unloadCollection(self._onSync)
def _onSync(self): def _onSync(self):
self._sync() self._sync()
@ -910,6 +911,11 @@ title="%s" %s>%s</button>""" % (
self.syncer.sync() self.syncer.sync()
self.app.setQuitOnLastWindowClosed(True) self.app.setQuitOnLastWindowClosed(True)
def temp_sync(self):
from aqt.sync import sync
sync(self)
# Tools # Tools
########################################################################## ##########################################################################

View file

@ -9,7 +9,6 @@ from dataclasses import dataclass
from typing import List, Optional, Union from typing import List, Optional, Union
import aqt import aqt
from anki.consts import SYNC_BASE
from anki.rsbackend import ( from anki.rsbackend import (
TR, TR,
Interrupted, Interrupted,
@ -45,8 +44,6 @@ class MediaSyncer:
if progress.kind != ProgressKind.MediaSync: if progress.kind != ProgressKind.MediaSync:
return return
print(progress.val)
assert isinstance(progress.val, MediaSyncProgress) assert isinstance(progress.val, MediaSyncProgress)
self._log_and_notify(progress.val) self._log_and_notify(progress.val)
@ -55,32 +52,24 @@ class MediaSyncer:
if self._syncing: if self._syncing:
return return
hkey = self.mw.pm.sync_key()
if hkey is None:
return
if not self.mw.pm.media_syncing_enabled(): if not self.mw.pm.media_syncing_enabled():
self._log_and_notify(tr(TR.SYNC_MEDIA_DISABLED)) self._log_and_notify(tr(TR.SYNC_MEDIA_DISABLED))
return return
auth = self.mw.pm.sync_auth()
if auth is None:
return
self._log_and_notify(tr(TR.SYNC_MEDIA_STARTING)) self._log_and_notify(tr(TR.SYNC_MEDIA_STARTING))
self._syncing = True self._syncing = True
self._progress_timer = self.mw.progress.timer(1000, self._on_progress, True) self._progress_timer = self.mw.progress.timer(1000, self._on_progress, True)
gui_hooks.media_sync_did_start_or_stop(True) gui_hooks.media_sync_did_start_or_stop(True)
def run() -> None: def run() -> None:
self.mw.col.backend.sync_media(hkey=hkey, endpoint=self._endpoint()) self.mw.col.backend.sync_media(auth)
self.mw.taskman.run_in_background(run, self._on_finished) self.mw.taskman.run_in_background(run, self._on_finished)
def _endpoint(self) -> str:
shard = self.mw.pm.sync_shard()
if shard is not None:
shard_str = str(shard)
else:
shard_str = ""
return f"{SYNC_BASE % shard_str}msync/"
def _log_and_notify(self, entry: LogEntry) -> None: def _log_and_notify(self, entry: LogEntry) -> None:
entry_with_time = LogEntryWithTime(time=intTime(), entry=entry) entry_with_time = LogEntryWithTime(time=intTime(), entry=entry)
self._log.append(entry_with_time) self._log.append(entry_with_time)

View file

@ -21,6 +21,7 @@ import aqt.sound
from anki import Collection from anki import Collection
from anki.db import DB from anki.db import DB
from anki.lang import _, without_unicode_isolation from anki.lang import _, without_unicode_isolation
from anki.rsbackend import SyncAuth
from anki.utils import intTime, isMac, isWin from anki.utils import intTime, isMac, isWin
from aqt import appHelpSite from aqt import appHelpSite
from aqt.qt import * from aqt.qt import *
@ -605,17 +606,23 @@ create table if not exists profiles
self.profile["interrupt_audio"] = val self.profile["interrupt_audio"] = val
aqt.sound.av_player.interrupt_current_audio = val aqt.sound.av_player.interrupt_current_audio = val
def sync_key(self) -> Optional[str]:
return self.profile.get("syncKey")
def set_sync_key(self, val: Optional[str]) -> None: def set_sync_key(self, val: Optional[str]) -> None:
self.profile["syncKey"] = val self.profile["syncKey"] = val
def set_sync_username(self, val: Optional[str]) -> None:
self.profile["syncUser"] = val
def set_host_number(self, val: Optional[int]) -> None:
self.profile["hostNum"] = val or 0
def media_syncing_enabled(self) -> bool: def media_syncing_enabled(self) -> bool:
return self.profile["syncMedia"] return self.profile["syncMedia"]
def sync_shard(self) -> Optional[int]: def sync_auth(self) -> Optional[SyncAuth]:
return self.profile.get("hostNum") hkey = self.profile.get("syncKey")
if not hkey:
return None
return SyncAuth(hkey=hkey, host_number=self.profile.get("hostNum", 0))
###################################################################### ######################################################################

View file

@ -87,7 +87,14 @@ class ProgressManager:
qconnect(self._show_timer.timeout, self._on_show_timer) qconnect(self._show_timer.timeout, self._on_show_timer)
return self._win return self._win
def update(self, label=None, value=None, process=True, maybeShow=True) -> None: def update(
self,
label=None,
value=None,
process=True,
maybeShow=True,
max: Optional[int] = None,
) -> None:
# print self._min, self._counter, self._max, label, time.time() - self._lastTime # print self._min, self._counter, self._max, label, time.time() - self._lastTime
if not self.mw.inMainThread(): if not self.mw.inMainThread():
print("progress.update() called on wrong thread") print("progress.update() called on wrong thread")
@ -101,7 +108,9 @@ class ProgressManager:
elapsed = time.time() - self._lastUpdate elapsed = time.time() - self._lastUpdate
if label: if label:
self._win.form.label.setText(label) self._win.form.label.setText(label)
self._max = max
if self._max: if self._max:
self._win.form.progressBar.setMaximum(max)
self._counter = value or (self._counter + 1) self._counter = value or (self._counter + 1)
self._win.form.progressBar.setValue(self._counter) self._win.form.progressBar.setValue(self._counter)
if process and elapsed >= 0.2: if process and elapsed >= 0.2:
@ -170,6 +179,13 @@ class ProgressManager:
self._show_timer = None self._show_timer = None
self._showWin() self._showWin()
def want_cancel(self) -> bool:
win = self._win
if win:
return win.wantCancel
else:
return False
class ProgressDialog(QDialog): class ProgressDialog(QDialog):
def __init__(self, parent): def __init__(self, parent):

View file

@ -1,496 +1,253 @@
# Copyright: Ankitects Pty Ltd and contributors # Copyright: Ankitects Pty Ltd and contributors
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html # License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
import gc from __future__ import annotations
import time
from anki import hooks import enum
from anki.lang import _ from typing import Callable, Tuple
from anki.storage import Collection
from anki.sync import FullSyncer, RemoteServer, Syncer
from aqt.qt import *
from aqt.utils import askUserDialog, showInfo, showText, showWarning, tooltip
# Sync manager import aqt
###################################################################### from anki.rsbackend import (
TR,
FullSyncProgress,
ProgressKind,
SyncError,
SyncErrorKind,
SyncOutput,
)
from aqt.qt import (
QDialog,
QDialogButtonBox,
QGridLayout,
QLabel,
QLineEdit,
Qt,
QTimer,
QVBoxLayout,
qconnect,
)
from aqt.utils import askUser, askUserDialog, showWarning, tr
class SyncManager(QObject): class FullSyncChoice(enum.Enum):
def __init__(self, mw, pm): CANCEL = 0
QObject.__init__(self, mw) UPLOAD = 1
self.mw = mw DOWNLOAD = 2
self.pm = pm
def sync(self):
if not self.pm.profile["syncKey"]: def get_sync_status(mw: aqt.main.AnkiQt, callback: Callable[[SyncOutput], None]):
auth = self._getUserPass() auth = mw.pm.sync_auth()
if not auth: if not auth:
return return
self.pm.profile["syncUser"] = auth[0]
self._sync(auth) def on_done(fut):
callback(fut.result())
mw.taskman.run_in_background(lambda: mw.col.backend.sync_status(auth), on_done)
def sync(mw: aqt.main.AnkiQt) -> None:
auth = mw.pm.sync_auth()
if not auth:
login(mw, on_success=lambda: sync(mw))
return
def on_done(fut):
mw.col.db.begin()
out: SyncOutput = fut.result()
mw.pm.set_host_number(out.host_number)
if out.required == out.NO_CHANGES:
# all done
return
else: else:
self._sync() full_sync(mw, out)
def _sync(self, auth=None): if not mw.col.basicCheck():
# to avoid gui widgets being garbage collected in the worker thread, showWarning("Please use Tools>Check Database")
# run gc in advance return
self._didFullUp = False
self._didError = False
gc.collect()
# create the thread, setup signals and start running
t = self.thread = SyncThread(
self.pm.collectionPath(),
self.pm.profile["syncKey"],
auth=auth,
hostNum=self.pm.profile.get("hostNum"),
)
qconnect(t._event, self.onEvent)
qconnect(t.progress_event, self.on_progress)
self.label = _("Connecting...")
prog = self.mw.progress.start(immediate=True, label=self.label)
self.sentBytes = self.recvBytes = 0
self._updateLabel()
self.thread.start()
while not self.thread.isFinished():
if prog.wantCancel:
self.thread.flagAbort()
# make sure we don't display 'upload success' msg
self._didFullUp = False
# abort may take a while
self.mw.progress.update(_("Stopping..."))
self.mw.app.processEvents()
self.thread.wait(100)
self.mw.progress.finish()
if self.thread.syncMsg:
showText(self.thread.syncMsg)
if self.thread.uname:
self.pm.profile["syncUser"] = self.thread.uname
self.pm.profile["hostNum"] = self.thread.hostNum
def delayedInfo(): mw.col.save(trx=False)
if self._didFullUp and not self._didError: mw.taskman.with_progress(
showInfo( lambda: mw.col.backend.sync_collection(auth),
_( on_done,
"""\ label=tr(TR.SYNC_CHECKING),
Your collection was successfully uploaded to AnkiWeb.
If you use any other devices, please sync them now, and choose \
to download the collection you have just uploaded from this computer. \
After doing so, future reviews and added cards will be merged \
automatically."""
)
) )
self.mw.progress.timer(1000, delayedInfo, False, requiresCollection=False)
def _updateLabel(self): def full_sync(mw: aqt.main.AnkiQt, out: SyncOutput) -> None:
self.mw.progress.update( if out.required == out.FULL_DOWNLOAD:
label="%s\n%s" confirm_full_download(mw)
% ( elif out.required == out.FULL_UPLOAD:
self.label, full_upload(mw)
_("%(a)0.1fkB up, %(b)0.1fkB down") else:
% dict(a=self.sentBytes / 1024, b=self.recvBytes / 1024), choice = ask_user_to_decide_direction()
) if choice == FullSyncChoice.UPLOAD:
full_upload(mw)
elif choice == FullSyncChoice.DOWNLOAD:
full_download(mw)
def confirm_full_download(mw: aqt.main.AnkiQt) -> None:
# confirmation step required, as some users customize their notetypes
# in an empty collection, then want to upload them
if not askUser(tr(TR.SYNC_CONFIRM_EMPTY_DOWNLOAD)):
return
else:
mw.closeAllWindows(lambda: full_download(mw))
def on_full_sync_timer(mw: aqt.main.AnkiQt) -> None:
progress = mw.col.latest_progress()
if progress.kind != ProgressKind.FullSync:
return
assert isinstance(progress, FullSyncProgress)
mw.progress.update(value=progress.val.transferred, max=progress.val.total)
if mw.progress.want_cancel():
mw.col.backend.abort_sync()
def full_download(mw: aqt.main.AnkiQt) -> None:
mw.col.close_for_full_sync()
def on_timer():
on_full_sync_timer(mw)
timer = QTimer(mw)
qconnect(timer.timeout, on_timer)
timer.start(150)
def on_done(fut):
timer.stop()
mw.col.reopen(after_full_sync=True)
mw.reset()
try:
fut.result()
except Exception as e:
showWarning(str(e))
return
mw.taskman.with_progress(
lambda: mw.col.backend.full_download(mw.pm.sync_auth()),
on_done,
label=tr(TR.SYNC_DOWNLOADING_FROM_ANKIWEB),
) )
def on_progress(self, upload: int, download: int) -> None:
# posted events not guaranteed to arrive in order; don't go backwards
self.sentBytes = max(self.sentBytes, upload)
self.recvBytes = max(self.recvBytes, download)
self._updateLabel()
def onEvent(self, evt, *args): def full_upload(mw: aqt.main.AnkiQt) -> None:
pu = self.mw.progress.update mw.col.close_for_full_sync()
if evt == "badAuth":
tooltip( def on_timer():
_("AnkiWeb ID or password was incorrect; please try again."), on_full_sync_timer(mw)
parent=self.mw,
) timer = QTimer(mw)
# blank the key so we prompt user again qconnect(timer.timeout, on_timer)
self.pm.profile["syncKey"] = None timer.start(150)
self.pm.save()
elif evt == "corrupt": def on_done(fut):
pass timer.stop()
elif evt == "newKey": mw.col.reopen(after_full_sync=True)
self.pm.profile["syncKey"] = args[0] mw.reset()
self.pm.save() try:
elif evt == "offline": fut.result()
tooltip(_("Syncing failed; internet offline.")) except Exception as e:
elif evt == "upbad": showWarning(str(e))
self._didFullUp = False return
self._checkFailed()
elif evt == "sync": mw.taskman.with_progress(
m = None lambda: mw.col.backend.full_upload(mw.pm.sync_auth()),
t = args[0] on_done,
if t == "login": label=tr(TR.SYNC_UPLOADING_TO_ANKIWEB),
m = _("Syncing...")
elif t == "upload":
self._didFullUp = True
m = _("Uploading to AnkiWeb...")
elif t == "download":
m = _("Downloading from AnkiWeb...")
elif t == "sanity":
m = _("Checking...")
elif t == "upgradeRequired":
showText(
_(
"""\
Please visit AnkiWeb, upgrade your deck, then try again."""
)
)
if m:
self.label = m
self._updateLabel()
elif evt == "syncMsg":
self.label = args[0]
self._updateLabel()
elif evt == "error":
self._didError = True
showText(_("Syncing failed:\n%s") % self._rewriteError(args[0]))
elif evt == "clockOff":
self._clockOff()
elif evt == "checkFailed":
self._checkFailed()
elif evt == "noChanges":
pass
elif evt == "fullSync":
self._confirmFullSync()
elif evt == "downloadClobber":
showInfo(
_(
"Your AnkiWeb collection does not contain any cards. Please sync again and choose 'Upload' instead."
)
) )
def _rewriteError(self, err):
if "Errno 61" in err:
return _(
"""\
Couldn't connect to AnkiWeb. Please check your network connection \
and try again."""
)
elif "timed out" in err or "10060" in err:
return _(
"""\
The connection to AnkiWeb timed out. Please check your network \
connection and try again."""
)
elif "code: 500" in err:
return _(
"""\
AnkiWeb encountered an error. Please try again in a few minutes, and if \
the problem persists, please file a bug report."""
)
elif "code: 501" in err:
return _(
"""\
Please upgrade to the latest version of Anki."""
)
# 502 is technically due to the server restarting, but we reuse the
# error message
elif "code: 502" in err:
return _("AnkiWeb is under maintenance. Please try again in a few minutes.")
elif "code: 503" in err:
return _(
"""\
AnkiWeb is too busy at the moment. Please try again in a few minutes."""
)
elif "code: 504" in err:
return _(
"504 gateway timeout error received. Please try temporarily disabling your antivirus."
)
elif "code: 409" in err:
return _(
"Only one client can access AnkiWeb at a time. If a previous sync failed, please try again in a few minutes."
)
elif "10061" in err or "10013" in err or "10053" in err:
return _(
"Antivirus or firewall software is preventing Anki from connecting to the internet."
)
elif "10054" in err or "Broken pipe" in err:
return _(
"Connection timed out. Either your internet connection is experiencing problems, or you have a very large file in your media folder."
)
elif "Unable to find the server" in err or "socket.gaierror" in err:
return _(
"Server not found. Either your connection is down, or antivirus/firewall "
"software is blocking Anki from connecting to the internet."
)
elif "code: 407" in err:
return _("Proxy authentication required.")
elif "code: 413" in err:
return _("Your collection or a media file is too large to sync.")
elif "EOF occurred in violation of protocol" in err:
return (
_(
"Error establishing a secure connection. This is usually caused by antivirus, firewall or VPN software, or problems with your ISP."
)
+ " (eof)"
)
elif "certificate verify failed" in err:
return (
_(
"Error establishing a secure connection. This is usually caused by antivirus, firewall or VPN software, or problems with your ISP."
)
+ " (invalid cert)"
)
return err
def _getUserPass(self): def login(
d = QDialog(self.mw) mw: aqt.main.AnkiQt, on_success: Callable[[], None], username="", password=""
d.setWindowTitle("Anki") ) -> None:
d.setWindowModality(Qt.WindowModal) while True:
(username, password) = get_id_and_pass_from_user(mw, username, password)
if not username and not password:
return
if username and password:
break
def on_done(fut):
try:
auth = fut.result()
except SyncError as e:
if e.kind() == SyncErrorKind.AUTH_FAILED:
showWarning(str(e))
login(mw, on_success, username, password)
return
except Exception as e:
showWarning(str(e))
return
mw.pm.set_host_number(auth.host_number)
mw.pm.set_sync_key(auth.hkey)
mw.pm.set_sync_username(username)
on_success()
mw.taskman.with_progress(
lambda: mw.col.backend.sync_login(username=username, password=password), on_done
)
def ask_user_to_decide_direction() -> FullSyncChoice:
button_labels = [
tr(TR.SYNC_UPLOAD_TO_ANKIWEB),
tr(TR.SYNC_DOWNLOAD_FROM_ANKIWEB),
tr(TR.SYNC_CANCEL_BUTTON),
]
diag = askUserDialog(tr(TR.SYNC_CONFLICT_EXPLANATION), button_labels)
diag.setDefault(2)
ret = diag.run()
if ret == button_labels[0]:
return FullSyncChoice.UPLOAD
elif ret == button_labels[1]:
return FullSyncChoice.DOWNLOAD
else:
return FullSyncChoice.CANCEL
def get_id_and_pass_from_user(
mw: aqt.main.AnkiQt, username="", password=""
) -> Tuple[str, str]:
diag = QDialog(mw)
diag.setWindowTitle("Anki")
diag.setWindowModality(Qt.WindowModal)
vbox = QVBoxLayout() vbox = QVBoxLayout()
l = QLabel( info_label = QLabel(
_( tr(TR.SYNC_ACCOUNT_REQUIRED, link="https://ankiweb.net/account/login")
"""\
<h1>Account Required</h1>
A free account is required to keep your collection synchronized. Please \
<a href="%s">sign up</a> for an account, then \
enter your details below."""
) )
% "https://ankiweb.net/account/login" info_label.setOpenExternalLinks(True)
) info_label.setWordWrap(True)
l.setOpenExternalLinks(True) vbox.addWidget(info_label)
l.setWordWrap(True)
vbox.addWidget(l)
vbox.addSpacing(20) vbox.addSpacing(20)
g = QGridLayout() g = QGridLayout()
l1 = QLabel(_("AnkiWeb ID:")) l1 = QLabel(tr(TR.SYNC_ANKIWEB_ID_LABEL))
g.addWidget(l1, 0, 0) g.addWidget(l1, 0, 0)
user = QLineEdit() user = QLineEdit()
user.setText(username)
g.addWidget(user, 0, 1) g.addWidget(user, 0, 1)
l2 = QLabel(_("Password:")) l2 = QLabel(tr(TR.SYNC_PASSWORD_LABEL))
g.addWidget(l2, 1, 0) g.addWidget(l2, 1, 0)
passwd = QLineEdit() passwd = QLineEdit()
passwd.setText(password)
passwd.setEchoMode(QLineEdit.Password) passwd.setEchoMode(QLineEdit.Password)
g.addWidget(passwd, 1, 1) g.addWidget(passwd, 1, 1)
vbox.addLayout(g) vbox.addLayout(g)
bb = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel) bb = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel) # type: ignore
bb.button(QDialogButtonBox.Ok).setAutoDefault(True) bb.button(QDialogButtonBox.Ok).setAutoDefault(True)
qconnect(bb.accepted, d.accept) qconnect(bb.accepted, diag.accept)
qconnect(bb.rejected, d.reject) qconnect(bb.rejected, diag.reject)
vbox.addWidget(bb) vbox.addWidget(bb)
d.setLayout(vbox) diag.setLayout(vbox)
d.show() diag.show()
accepted = d.exec_()
u = user.text()
p = passwd.text()
if not accepted or not u or not p:
return
return (u, p)
def _confirmFullSync(self): accepted = diag.exec_()
self.mw.progress.finish() if not accepted:
if self.thread.localIsEmpty: return ("", "")
diag = askUserDialog( return (user.text().strip(), passwd.text())
_("Local collection has no cards. Download from AnkiWeb?"),
[_("Download from AnkiWeb"), _("Cancel")],
)
diag.setDefault(1)
else:
diag = askUserDialog(
_(
"""\
Your decks here and on AnkiWeb differ in such a way that they can't \
be merged together, so it's necessary to overwrite the decks on one \
side with the decks from the other.
If you choose download, Anki will download the collection from AnkiWeb, \
and any changes you have made on your computer since the last sync will \
be lost.
If you choose upload, Anki will upload your collection to AnkiWeb, and \
any changes you have made on AnkiWeb or your other devices since the \
last sync to this device will be lost.
After all devices are in sync, future reviews and added cards can be merged \
automatically."""
),
[_("Upload to AnkiWeb"), _("Download from AnkiWeb"), _("Cancel")],
)
diag.setDefault(2)
ret = diag.run()
if ret == _("Upload to AnkiWeb"):
self.thread.fullSyncChoice = "upload"
elif ret == _("Download from AnkiWeb"):
self.thread.fullSyncChoice = "download"
else:
self.thread.fullSyncChoice = "cancel"
self.mw.progress.start(immediate=True)
def _clockOff(self):
showWarning(
_(
"""\
Syncing requires the clock on your computer to be set correctly. Please \
fix the clock and try again."""
)
)
def _checkFailed(self):
showWarning(
_(
"""\
Your collection is in an inconsistent state. Please run Tools>\
Check Database, then sync again."""
)
)
# Sync thread
######################################################################
class SyncThread(QThread):
_event = pyqtSignal(str, str)
progress_event = pyqtSignal(int, int)
def __init__(self, path, hkey, auth=None, hostNum=None):
QThread.__init__(self)
self.path = path
self.hkey = hkey
self.auth = auth
self.hostNum = hostNum
self._abort = 0 # 1=flagged, 2=aborting
def flagAbort(self):
self._abort = 1
def run(self):
# init this first so an early crash doesn't cause an error
# in the main thread
self.syncMsg = ""
self.uname = ""
try:
self.col = Collection(self.path)
except:
self.fireEvent("corrupt")
return
self.server = RemoteServer(self.hkey, hostNum=self.hostNum)
self.client = Syncer(self.col, self.server)
self.sentTotal = 0
self.recvTotal = 0
def syncEvent(type):
self.fireEvent("sync", type)
def syncMsg(msg):
self.fireEvent("syncMsg", msg)
def http_progress(upload: int, download: int) -> None:
if not self._abort:
self.sentTotal += upload
self.recvTotal += download
self.progress_event.emit(self.sentTotal, self.recvTotal) # type: ignore
elif self._abort == 1:
self._abort = 2
raise Exception("sync cancelled")
self.server.client.progress_hook = http_progress
hooks.sync_stage_did_change.append(syncEvent)
hooks.sync_progress_did_change.append(syncMsg)
# run sync and catch any errors
try:
self._sync()
except:
err = traceback.format_exc()
self.fireEvent("error", err)
finally:
# don't bump mod time unless we explicitly save
self.col.close(save=False, downgrade=False)
hooks.sync_stage_did_change.remove(syncEvent)
hooks.sync_progress_did_change.remove(syncMsg)
def _abortingSync(self):
try:
return self.client.sync()
except Exception as e:
if "sync cancelled" in str(e):
self.server.abort()
raise
else:
raise
def _sync(self):
if self.auth:
# need to authenticate and obtain host key
self.hkey = self.server.hostKey(*self.auth)
if not self.hkey:
# provided details were invalid
return self.fireEvent("badAuth")
else:
# write new details and tell calling thread to save
self.fireEvent("newKey", self.hkey)
# run sync and check state
try:
ret = self._abortingSync()
except Exception as e:
log = traceback.format_exc()
err = repr(str(e))
if (
"Unable to find the server" in err
or "Errno 2" in err
or "getaddrinfo" in err
):
self.fireEvent("offline")
elif "sync cancelled" in err:
pass
else:
self.fireEvent("error", log)
return
if ret == "badAuth":
return self.fireEvent("badAuth")
elif ret == "clockOff":
return self.fireEvent("clockOff")
elif ret == "basicCheckFailed" or ret == "sanityCheckFailed":
return self.fireEvent("checkFailed")
# full sync?
if ret == "fullSync":
return self._fullSync()
# save and note success state
if ret == "noChanges":
self.fireEvent("noChanges")
elif ret == "success":
self.fireEvent("success")
elif ret == "serverAbort":
self.syncMsg = self.client.syncMsg
return
else:
self.fireEvent("error", "Unknown sync return code.")
self.syncMsg = self.client.syncMsg
self.uname = self.client.uname
self.hostNum = self.client.hostNum
def _fullSync(self):
# tell the calling thread we need a decision on sync direction, and
# wait for a reply
self.fullSyncChoice = False
self.localIsEmpty = self.col.isEmpty()
self.fireEvent("fullSync")
while not self.fullSyncChoice:
time.sleep(0.1)
f = self.fullSyncChoice
if f == "cancel":
return
self.client = FullSyncer(
self.col, self.hkey, self.server.client, hostNum=self.hostNum
)
try:
if f == "upload":
if not self.client.upload():
self.fireEvent("upbad")
else:
ret = self.client.download()
if ret == "downloadClobber":
self.fireEvent(ret)
return
except Exception as e:
if "sync cancelled" in str(e):
return
raise
def fireEvent(self, cmd, arg=""):
self._event.emit(cmd, arg)

View file

@ -66,8 +66,9 @@ class TaskManager(QObject):
task: Callable, task: Callable,
on_done: Optional[Callable[[Future], None]] = None, on_done: Optional[Callable[[Future], None]] = None,
parent: Optional[QWidget] = None, parent: Optional[QWidget] = None,
label: Optional[str] = None,
): ):
self.mw.progress.start(parent=parent) self.mw.progress.start(parent=parent, label=label)
def wrapped_done(fut): def wrapped_done(fut):
self.mw.progress.finish() self.mw.progress.finish()

View file

@ -1,34 +1,56 @@
### Messages shown when synchronizing with AnkiWeb. ### Messages shown when synchronizing with AnkiWeb.
## Media synchronization ## Media synchronization
sync-media-added-count = Added: { $up }↑ { $down }↓ sync-media-added-count = Added: { $up }↑ { $down }↓
sync-media-removed-count = Removed: { $up }↑ { $down }↓ sync-media-removed-count = Removed: { $up }↑ { $down }↓
sync-media-checked-count = Checked: { $count } sync-media-checked-count = Checked: { $count }
sync-media-starting = Media sync starting... sync-media-starting = Media sync starting...
sync-media-complete = Media sync complete. sync-media-complete = Media sync complete.
sync-media-failed = Media sync failed. sync-media-failed = Media sync failed.
sync-media-aborting = Media sync aborting... sync-media-aborting = Media sync aborting...
sync-media-aborted = Media sync aborted. sync-media-aborted = Media sync aborted.
# Shown in the sync log to indicate media syncing will not be done, because it # Shown in the sync log to indicate media syncing will not be done, because it
# was previously disabled by the user in the preferences screen. # was previously disabled by the user in the preferences screen.
sync-media-disabled = Media sync disabled. sync-media-disabled = Media sync disabled.
sync-abort-button = Abort
sync-media-log-button = Media Log
# Title of the screen that shows syncing progress history # Title of the screen that shows syncing progress history
sync-media-log-title = Media Sync Log sync-media-log-title = Media Sync Log
## Error messages ## Error messages / dialogs
sync-conflict = Only one copy of Anki can sync to your account at once. Please wait a few minutes, then try again. sync-conflict = Only one copy of Anki can sync to your account at once. Please wait a few minutes, then try again.
sync-server-error = AnkiWeb encountered a problem. Please try again in a few minutes. sync-server-error = AnkiWeb encountered a problem. Please try again in a few minutes.
sync-client-too-old = sync-client-too-old = Your Anki version is too old. Please update to the latest version to continue syncing.
Your Anki version is too old. Please update to the latest version to continue syncing.
sync-wrong-pass = AnkiWeb ID or password was incorrect; please try again. sync-wrong-pass = AnkiWeb ID or password was incorrect; please try again.
sync-resync-required = sync-resync-required = Please sync again. If this message keeps appearing, please post on the support site.
Please sync again. If this message keeps appearing, please post on the support site.
sync-must-wait-for-end = Anki is currently syncing. Please wait for the sync to complete, then try again. sync-must-wait-for-end = Anki is currently syncing. Please wait for the sync to complete, then try again.
sync-confirm-empty-download = Local collection has no cards. Download from AnkiWeb?
sync-conflict-explanation =
Your decks here and on AnkiWeb differ in such a way that they can't be merged together, so it's necessary to overwrite the decks on one side with the decks from the other.
If you choose download, Anki will download the collection from AnkiWeb, and any changes you have made on your computer since the last sync will be lost.
If you choose upload, Anki will upload your collection to AnkiWeb, and any changes you have made on AnkiWeb or your other devices since the last sync to this device will be lost.
After all devices are in sync, future reviews and added cards can be merged automatically.
sync-ankiweb-id-label = AnkiWeb ID:
sync-password-label = Password:
sync-account-required =
<h1>Account Required</h1>
A free account is required to keep your collection synchronized. Please <a href="{ $link }">sign up</a> for an account, then enter your details below.
## Buttons
sync-media-log-button = Media Log
sync-abort-button = Abort
sync-download-from-ankiweb = Download from AnkiWeb
sync-upload-to-ankiweb = Upload to AnkiWeb
sync-cancel-button = Cancel
## Progress
sync-downloading-from-ankiweb = Downloading from AnkiWeb...
sync-uploading-to-ankiweb = Uploading to AnkiWeb...
sync-syncing = Syncing...
sync-checking = Checking...

View file

@ -8,7 +8,6 @@ use crate::{
backend_proto::builtin_search_order::BuiltinSortKind, backend_proto::builtin_search_order::BuiltinSortKind,
backend_proto::{ backend_proto::{
AddOrUpdateDeckConfigLegacyIn, BackendResult, Empty, RenderedTemplateReplacement, AddOrUpdateDeckConfigLegacyIn, BackendResult, Empty, RenderedTemplateReplacement,
SyncMediaIn,
}, },
card::{Card, CardID}, card::{Card, CardID},
card::{CardQueue, CardType}, card::{CardQueue, CardType},
@ -66,7 +65,7 @@ impl ThrottlingProgressHandler {
fn update(&mut self, progress: impl Into<Progress>) -> bool { fn update(&mut self, progress: impl Into<Progress>) -> bool {
let now = coarsetime::Instant::now(); let now = coarsetime::Instant::now();
if now.duration_since(self.last_update).as_f64() < 0.1 { if now.duration_since(self.last_update).as_f64() < 0.1 {
return false; return true;
} }
self.last_update = now; self.last_update = now;
let mut guard = self.state.lock().unwrap(); let mut guard = self.state.lock().unwrap();
@ -981,7 +980,7 @@ impl BackendService for Backend {
Ok(().into()) Ok(().into())
} }
fn sync_media(&mut self, input: SyncMediaIn) -> BackendResult<Empty> { fn sync_media(&mut self, input: pb::SyncAuth) -> BackendResult<Empty> {
let mut guard = self.col.lock().unwrap(); let mut guard = self.col.lock().unwrap();
let col = guard.as_mut().unwrap(); let col = guard.as_mut().unwrap();
@ -1177,7 +1176,11 @@ impl Backend {
} }
fn new_progress_handler(&self) -> ThrottlingProgressHandler { fn new_progress_handler(&self) -> ThrottlingProgressHandler {
self.progress_state.lock().unwrap().want_abort = false; {
let mut guard = self.progress_state.lock().unwrap();
guard.want_abort = false;
guard.last_progress = None;
}
ThrottlingProgressHandler { ThrottlingProgressHandler {
state: self.progress_state.clone(), state: self.progress_state.clone(),
last_update: coarsetime::Instant::now(), last_update: coarsetime::Instant::now(),
@ -1186,7 +1189,7 @@ impl Backend {
fn sync_media_inner( fn sync_media_inner(
&mut self, &mut self,
input: pb::SyncMediaIn, input: pb::SyncAuth,
folder: PathBuf, folder: PathBuf,
db: PathBuf, db: PathBuf,
log: Logger, log: Logger,
@ -1199,7 +1202,7 @@ impl Backend {
let mgr = MediaManager::new(&folder, &db)?; let mgr = MediaManager::new(&folder, &db)?;
let mut rt = Runtime::new().unwrap(); let mut rt = Runtime::new().unwrap();
let sync_fut = mgr.sync_media(progress_fn, &input.endpoint, &input.hkey, log); let sync_fut = mgr.sync_media(progress_fn, input.host_number, &input.hkey, log);
let abortable_sync = Abortable::new(sync_fut, abort_reg); let abortable_sync = Abortable::new(sync_fut, abort_reg);
let ret = match rt.block_on(abortable_sync) { let ret = match rt.block_on(abortable_sync) {
Ok(sync_result) => sync_result, Ok(sync_result) => sync_result,
@ -1491,9 +1494,18 @@ impl From<SyncOutput> for pb::SyncCollectionOut {
SyncActionRequired::NoChanges => { SyncActionRequired::NoChanges => {
pb::sync_collection_out::ChangesRequired::NoChanges as i32 pb::sync_collection_out::ChangesRequired::NoChanges as i32
} }
SyncActionRequired::FullSyncRequired => { SyncActionRequired::FullSyncRequired {
upload_ok,
download_ok,
} => {
if !upload_ok {
pb::sync_collection_out::ChangesRequired::FullDownload as i32
} else if !download_ok {
pb::sync_collection_out::ChangesRequired::FullUpload as i32
} else {
pb::sync_collection_out::ChangesRequired::FullSync as i32 pb::sync_collection_out::ChangesRequired::FullSync as i32
} }
}
SyncActionRequired::NormalSyncRequired => { SyncActionRequired::NormalSyncRequired => {
pb::sync_collection_out::ChangesRequired::NormalSync as i32 pb::sync_collection_out::ChangesRequired::NormalSync as i32
} }

View file

@ -130,14 +130,14 @@ impl MediaManager {
pub async fn sync_media<'a, F>( pub async fn sync_media<'a, F>(
&'a self, &'a self,
progress: F, progress: F,
endpoint: &'a str, host_number: u32,
hkey: &'a str, hkey: &'a str,
log: Logger, log: Logger,
) -> Result<()> ) -> Result<()>
where where
F: FnMut(MediaSyncProgress) -> bool, F: FnMut(MediaSyncProgress) -> bool,
{ {
let mut syncer = MediaSyncer::new(self, progress, endpoint, log); let mut syncer = MediaSyncer::new(self, progress, host_number, log);
syncer.sync(hkey).await syncer.sync(hkey).await
} }

View file

@ -45,7 +45,7 @@ where
client: Client, client: Client,
progress_cb: P, progress_cb: P,
progress: MediaSyncProgress, progress: MediaSyncProgress,
endpoint: &'a str, endpoint: String,
log: Logger, log: Logger,
} }
@ -132,23 +132,36 @@ struct FinalizeResponse {
err: String, err: String,
} }
fn media_sync_endpoint(host_number: u32) -> String {
if let Ok(endpoint) = std::env::var("SYNC_ENDPOINT_MEDIA") {
endpoint
} else {
let suffix = if host_number > 0 {
format!("{}", host_number)
} else {
"".to_string()
};
format!("https://sync{}.ankiweb.net/msync/", suffix)
}
}
impl<P> MediaSyncer<'_, P> impl<P> MediaSyncer<'_, P>
where where
P: FnMut(MediaSyncProgress) -> bool, P: FnMut(MediaSyncProgress) -> bool,
{ {
pub fn new<'a>( pub fn new(
mgr: &'a MediaManager, mgr: &MediaManager,
progress_cb: P, progress_cb: P,
endpoint: &'a str, host_number: u32,
log: Logger, log: Logger,
) -> MediaSyncer<'a, P> { ) -> MediaSyncer<'_, P> {
let client = Client::builder() let client = Client::builder()
.connect_timeout(Duration::from_secs(30)) .connect_timeout(Duration::from_secs(30))
.timeout(Duration::from_secs(60)) .timeout(Duration::from_secs(60))
.build() .build()
.unwrap(); .unwrap();
let endpoint = media_sync_endpoint(host_number);
let ctx = mgr.dbctx(); let ctx = mgr.dbctx();
MediaSyncer { MediaSyncer {
mgr, mgr,
ctx, ctx,
@ -817,8 +830,7 @@ mod test {
let log = crate::log::terminal(); let log = crate::log::terminal();
let mgr = MediaManager::new(&media_dir, &media_db)?; let mgr = MediaManager::new(&media_dir, &media_db)?;
mgr.sync_media(progress, "https://sync.ankiweb.net/msync/", hkey, log) mgr.sync_media(progress, 0, hkey, log).await?;
.await?;
Ok(()) Ok(())
} }

View file

@ -244,6 +244,15 @@ impl super::SqliteStorage {
Ok(entries) Ok(entries)
} }
pub(crate) fn have_at_least_one_card(&self) -> Result<bool> {
self.db
.prepare_cached("select null from cards")?
.query(NO_PARAMS)?
.next()
.map(|o| o.is_none())
.map_err(Into::into)
}
} }
#[cfg(test)] #[cfg(test)]

View file

@ -185,13 +185,13 @@ pub struct FullSyncProgress {
pub total_bytes: usize, pub total_bytes: usize,
} }
#[derive(PartialEq)] #[derive(PartialEq, Debug)]
pub enum SyncActionRequired { pub enum SyncActionRequired {
NoChanges, NoChanges,
FullSyncRequired, FullSyncRequired { upload_ok: bool, download_ok: bool },
NormalSyncRequired, NormalSyncRequired,
} }
#[derive(Debug)]
struct SyncState { struct SyncState {
required: SyncActionRequired, required: SyncActionRequired,
local_is_newer: bool, local_is_newer: bool,
@ -227,13 +227,26 @@ impl NormalSyncer<'_> {
} }
pub async fn sync(&mut self) -> Result<SyncOutput> { pub async fn sync(&mut self) -> Result<SyncOutput> {
debug!(self.col.log, "fetching meta...");
let state: SyncState = self.get_sync_state().await?; let state: SyncState = self.get_sync_state().await?;
debug!(self.col.log, "fetched"; "state"=>?&state);
match state.required { match state.required {
SyncActionRequired::NoChanges => Ok(state.into()), SyncActionRequired::NoChanges => Ok(state.into()),
SyncActionRequired::FullSyncRequired => Ok(state.into()), SyncActionRequired::FullSyncRequired { .. } => Ok(state.into()),
SyncActionRequired::NormalSyncRequired => { SyncActionRequired::NormalSyncRequired => {
// fixme: transaction self.col.storage.begin_trx()?;
self.normal_sync_inner(state).await match self.normal_sync_inner(state).await {
Ok(success) => {
self.col.storage.commit_trx()?;
Ok(success)
}
Err(e) => {
// fixme: full sync on sanity check failure, etc
self.col.storage.rollback_trx()?;
let _ = self.remote.abort().await;
Err(e)
}
}
} }
} }
} }
@ -241,6 +254,7 @@ impl NormalSyncer<'_> {
async fn get_sync_state(&self) -> Result<SyncState> { async fn get_sync_state(&self) -> Result<SyncState> {
let remote: SyncMeta = self.remote.meta().await?; let remote: SyncMeta = self.remote.meta().await?;
if !remote.should_continue { if !remote.should_continue {
debug!(self.col.log, "server says abort"; "message"=>&remote.server_message);
return Err(AnkiError::SyncError { return Err(AnkiError::SyncError {
info: remote.server_message, info: remote.server_message,
kind: SyncErrorKind::ServerMessage, kind: SyncErrorKind::ServerMessage,
@ -248,7 +262,9 @@ impl NormalSyncer<'_> {
} }
let local = self.col.sync_meta()?; let local = self.col.sync_meta()?;
if (remote.current_time.0 - local.current_time.0).abs() > 300 { let delta = remote.current_time.0 - local.current_time.0;
if delta.abs() > 300 {
debug!(self.col.log, "clock off"; "delta"=>delta);
return Err(AnkiError::SyncError { return Err(AnkiError::SyncError {
// fixme: need to rethink error handling; defer translation and pass in time difference // fixme: need to rethink error handling; defer translation and pass in time difference
info: "".into(), info: "".into(),
@ -259,7 +275,12 @@ impl NormalSyncer<'_> {
let required = if remote.modified == local.modified { let required = if remote.modified == local.modified {
SyncActionRequired::NoChanges SyncActionRequired::NoChanges
} else if remote.schema != local.schema { } else if remote.schema != local.schema {
SyncActionRequired::FullSyncRequired let upload_ok = !local.empty;
let download_ok = !remote.empty;
SyncActionRequired::FullSyncRequired {
upload_ok,
download_ok,
}
} else { } else {
SyncActionRequired::NormalSyncRequired SyncActionRequired::NormalSyncRequired
}; };
@ -407,7 +428,6 @@ pub async fn sync_login(username: &str, password: &str) -> Result<SyncAuth> {
} }
impl Collection { impl Collection {
// fixme: upload only, download only case
pub async fn get_sync_status(&mut self, auth: SyncAuth) -> Result<SyncOutput> { pub async fn get_sync_status(&mut self, auth: SyncAuth) -> Result<SyncOutput> {
NormalSyncer::new(self, auth) NormalSyncer::new(self, auth)
.get_sync_state() .get_sync_state()
@ -470,7 +490,7 @@ impl Collection {
server_message: "".into(), server_message: "".into(),
should_continue: true, should_continue: true,
host_number: 0, host_number: 0,
empty: false, empty: self.storage.have_at_least_one_card()?,
}) })
} }