diff --git a/proto/anki/collection.proto b/proto/anki/collection.proto index 1a760035e..6d78cd35b 100644 --- a/proto/anki/collection.proto +++ b/proto/anki/collection.proto @@ -8,6 +8,7 @@ option java_multiple_files = true; package anki.collection; import "anki/generic.proto"; +import "anki/sync.proto"; service CollectionService { rpc CheckDatabase(generic.Empty) returns (CheckDatabaseResponse); @@ -100,12 +101,6 @@ message OpChangesAfterUndo { } message Progress { - message MediaSync { - string checked = 1; - string added = 2; - string removed = 3; - } - message FullSync { uint32 transferred = 1; uint32 total = 2; @@ -136,7 +131,7 @@ message Progress { oneof value { generic.Empty none = 1; - MediaSync media_sync = 2; + sync.MediaSyncProgress media_sync = 2; string media_check = 3; FullSync full_sync = 4; NormalSync normal_sync = 5; diff --git a/proto/anki/sync.proto b/proto/anki/sync.proto index 368287195..bc65359c8 100644 --- a/proto/anki/sync.proto +++ b/proto/anki/sync.proto @@ -9,17 +9,19 @@ package anki.sync; import "anki/generic.proto"; -/// Syncing methods are only available with a Backend handle. +// Syncing methods are only available with a Backend handle. service SyncService {} service BackendSyncService { rpc SyncMedia(SyncAuth) returns (generic.Empty); rpc AbortMediaSync(generic.Empty) returns (generic.Empty); + // Can be used by the frontend to detect an active sync. If the sync aborted + // with an error, the next call to this method will return the error. + rpc MediaSyncStatus(generic.Empty) returns (MediaSyncStatusResponse); rpc SyncLogin(SyncLoginRequest) returns (SyncAuth); rpc SyncStatus(SyncAuth) returns (SyncStatusResponse); - rpc SyncCollection(SyncAuth) returns (SyncCollectionResponse); - rpc FullUpload(SyncAuth) returns (generic.Empty); - rpc FullDownload(SyncAuth) returns (generic.Empty); + rpc SyncCollection(SyncCollectionRequest) returns (SyncCollectionResponse); + rpc FullUploadOrDownload(FullUploadOrDownloadRequest) returns (generic.Empty); rpc AbortSync(generic.Empty) returns (generic.Empty); } @@ -45,6 +47,11 @@ message SyncStatusResponse { optional string new_endpoint = 4; } +message SyncCollectionRequest { + SyncAuth auth = 1; + bool sync_media = 2; +} + message SyncCollectionResponse { enum ChangesRequired { NO_CHANGES = 0; @@ -60,4 +67,23 @@ message SyncCollectionResponse { string server_message = 2; ChangesRequired required = 3; optional string new_endpoint = 4; + int32 server_media_usn = 5; +} + +message MediaSyncStatusResponse { + bool active = 1; + MediaSyncProgress progress = 2; +} + +message MediaSyncProgress { + string checked = 1; + string added = 2; + string removed = 3; +} + +message FullUploadOrDownloadRequest { + SyncAuth auth = 1; + bool upload = 2; + // if not provided, media syncing will be skipped + optional int32 server_usn = 3; } diff --git a/pylib/anki/collection.py b/pylib/anki/collection.py index 8073334c6..a17664e6a 100644 --- a/pylib/anki/collection.py +++ b/pylib/anki/collection.py @@ -16,6 +16,7 @@ from anki import ( links_pb2, search_pb2, stats_pb2, + sync_pb2, ) from anki._legacy import DeprecatedNamesMixin, deprecated from anki.sync_pb2 import SyncLoginRequest @@ -49,6 +50,7 @@ AddImageOcclusionNoteRequest = image_occlusion_pb2.AddImageOcclusionNoteRequest GetImageOcclusionNoteResponse = image_occlusion_pb2.GetImageOcclusionNoteResponse AddonInfo = ankiweb_pb2.AddonInfo CheckForUpdateResponse = ankiweb_pb2.CheckForUpdateResponse +MediaSyncStatus = sync_pb2.MediaSyncStatusResponse import copy import os @@ -1246,11 +1248,14 @@ class Collection(DeprecatedNamesMixin): def abort_sync(self) -> None: self._backend.abort_sync() - def full_upload(self, auth: SyncAuth) -> None: - self._backend.full_upload(auth) - - def full_download(self, auth: SyncAuth) -> None: - self._backend.full_download(auth) + def full_upload_or_download( + self, *, auth: SyncAuth, server_usn: int | None, upload: bool + ) -> None: + self._backend.full_upload_or_download( + sync_pb2.FullUploadOrDownloadRequest( + auth=auth, server_usn=server_usn, upload=upload + ) + ) def sync_login( self, username: str, password: str, endpoint: str | None @@ -1259,8 +1264,8 @@ class Collection(DeprecatedNamesMixin): SyncLoginRequest(username=username, password=password, endpoint=endpoint) ) - def sync_collection(self, auth: SyncAuth) -> SyncOutput: - return self._backend.sync_collection(auth) + def sync_collection(self, auth: SyncAuth, sync_media: bool) -> SyncOutput: + return self._backend.sync_collection(auth=auth, sync_media=sync_media) def sync_media(self, auth: SyncAuth) -> None: self._backend.sync_media(auth) @@ -1268,6 +1273,10 @@ class Collection(DeprecatedNamesMixin): def sync_status(self, auth: SyncAuth) -> SyncStatus: return self._backend.sync_status(auth) + def media_sync_status(self) -> MediaSyncStatus: + "This will throw if the sync failed with an error." + return self._backend.media_sync_status() + def get_preferences(self) -> Preferences: return self._backend.get_preferences() diff --git a/qt/aqt/forms/synclog.ui b/qt/aqt/forms/synclog.ui index e0d93b35a..c48416bc9 100644 --- a/qt/aqt/forms/synclog.ui +++ b/qt/aqt/forms/synclog.ui @@ -6,8 +6,8 @@ 0 0 - 557 - 295 + 482 + 90 @@ -15,12 +15,15 @@ - - - true + + + TextLabel - - + + Qt::PlainText + + + Qt::AlignCenter diff --git a/qt/aqt/main.py b/qt/aqt/main.py index f707ffc02..df25ac058 100644 --- a/qt/aqt/main.py +++ b/qt/aqt/main.py @@ -1019,9 +1019,6 @@ title="{}" {}>{}""".format( def _sync_collection_and_media(self, after_sync: Callable[[], None]) -> None: "Caller should ensure auth available." - # start media sync if not already running - if not self.media_syncer.is_syncing(): - self.media_syncer.start() def on_collection_sync_finished() -> None: self.col.clear_python_undo() diff --git a/qt/aqt/mediasync.py b/qt/aqt/mediasync.py index a5617c0b6..f0312102e 100644 --- a/qt/aqt/mediasync.py +++ b/qt/aqt/mediasync.py @@ -5,110 +5,93 @@ from __future__ import annotations import time from concurrent.futures import Future -from dataclasses import dataclass -from typing import Any, Callable, Union +from datetime import datetime +from typing import Any, Callable import aqt import aqt.forms import aqt.main -from anki.collection import Progress +from anki.collection import Collection from anki.errors import Interrupted -from anki.types import assert_exhaustive from anki.utils import int_time from aqt import gui_hooks -from aqt.qt import QDialog, QDialogButtonBox, QPushButton, QTextCursor, QTimer, qconnect -from aqt.utils import disable_help_button, tr - -LogEntry = Union[Progress.MediaSync, str] - - -@dataclass -class LogEntryWithTime: - time: int - entry: LogEntry +from aqt.operations import QueryOp +from aqt.qt import QDialog, QDialogButtonBox, QPushButton, Qt, QTimer, qconnect +from aqt.utils import disable_help_button, show_info, tr class MediaSyncer: def __init__(self, mw: aqt.main.AnkiQt) -> None: self.mw = mw self._syncing: bool = False - self._log: list[LogEntryWithTime] = [] - self._progress_timer: QTimer | None = None + self.last_progress = "" + self._last_progress_at = 0 gui_hooks.media_sync_did_start_or_stop.append(self._on_start_stop) - def _on_progress(self) -> None: - progress = self.mw.col.latest_progress() - if not progress.HasField("media_sync"): - return - sync_progress = progress.media_sync - self._log_and_notify(sync_progress) - def start(self) -> None: "Start media syncing in the background, if it's not already running." + if not self.mw.pm.media_syncing_enabled() or not ( + auth := self.mw.pm.sync_auth() + ): + return + + def run(col: Collection) -> None: + col.sync_media(auth) + + # this will exit after the thread is spawned, but may block if there's an existing + # backend lock + QueryOp(parent=aqt.mw, op=run, success=lambda _: 1).run_in_background() + + self.start_monitoring() + + def start_monitoring(self) -> None: if self._syncing: return - - if not self.mw.pm.media_syncing_enabled(): - self._log_and_notify(tr.sync_media_disabled()) - return - - auth = self.mw.pm.sync_auth() - if auth is None: - return - - self._log_and_notify(tr.sync_media_starting()) self._syncing = True - self._progress_timer = self.mw.progress.timer( - 1000, self._on_progress, True, True, parent=self.mw - ) gui_hooks.media_sync_did_start_or_stop(True) + self._update_progress(tr.sync_media_starting()) - def run() -> None: - self.mw.col.sync_media(auth) + def monitor() -> None: + while True: + resp = self.mw.col.media_sync_status() + if not resp.active: + return + if p := resp.progress: + self._update_progress(f"{p.added}, {p.removed}, {p.checked}") - self.mw.taskman.run_in_background(run, self._on_finished) + time.sleep(0.25) - def _log_and_notify(self, entry: LogEntry) -> None: - entry_with_time = LogEntryWithTime(time=int_time(), entry=entry) - self._log.append(entry_with_time) - self.mw.taskman.run_on_main( - lambda: gui_hooks.media_sync_did_progress(entry_with_time) - ) + self.mw.taskman.run_in_background(monitor, self._on_finished) + + def _update_progress(self, progress: str) -> None: + self.last_progress = progress + self.mw.taskman.run_on_main(lambda: gui_hooks.media_sync_did_progress(progress)) def _on_finished(self, future: Future) -> None: self._syncing = False - if self._progress_timer: - self._progress_timer.stop() - self._progress_timer.deleteLater() - self._progress_timer = None + self._last_progress_at = int_time() gui_hooks.media_sync_did_start_or_stop(False) exc = future.exception() if exc is not None: self._handle_sync_error(exc) else: - self._log_and_notify(tr.sync_media_complete()) + self._update_progress(tr.sync_media_complete()) def _handle_sync_error(self, exc: BaseException) -> None: if isinstance(exc, Interrupted): - self._log_and_notify(tr.sync_media_aborted()) + self._update_progress(tr.sync_media_aborted()) return else: - # Avoid popups for errors; they can cause a deadlock if - # a modal window happens to be active, or a duplicate auth - # failed message if the password is changed. - self._log_and_notify(str(exc)) + show_info(str(exc), modality=Qt.WindowModality.NonModal) return - def entries(self) -> list[LogEntryWithTime]: - return self._log - def abort(self) -> None: if not self.is_syncing(): return - self._log_and_notify(tr.sync_media_aborting()) self.mw.col.set_wants_abort() self.mw.col.abort_media_sync() + self._update_progress(tr.sync_media_aborting()) def is_syncing(self) -> bool: return self._syncing @@ -140,11 +123,7 @@ class MediaSyncer: if self.is_syncing(): return 0 - if self._log: - last = self._log[-1].time - else: - last = 0 - return int_time() - last + return int_time() - self._last_progress_at class MediaSyncDialog(QDialog): @@ -172,10 +151,7 @@ class MediaSyncDialog(QDialog): gui_hooks.media_sync_did_progress.append(self._on_log_entry) gui_hooks.media_sync_did_start_or_stop.append(self._on_start_stop) - self.form.plainTextEdit.setPlainText( - "\n".join(self._entry_to_text(x) for x in syncer.entries()) - ) - self.form.plainTextEdit.moveCursor(QTextCursor.MoveOperation.End) + self._on_log_entry(syncer.last_progress) self.show() def reject(self) -> None: @@ -197,24 +173,11 @@ class MediaSyncDialog(QDialog): self._syncer.abort() self.abort_button.setHidden(True) - def _time_and_text(self, stamp: int, text: str) -> str: - asctime = time.asctime(time.localtime(stamp)) - return f"{asctime}: {text}" - - def _entry_to_text(self, entry: LogEntryWithTime) -> str: - if isinstance(entry.entry, str): - txt = entry.entry - elif isinstance(entry.entry, Progress.MediaSync): - txt = self._logentry_to_text(entry.entry) - else: - assert_exhaustive(entry.entry) - return self._time_and_text(entry.time, txt) - - def _logentry_to_text(self, e: Progress.MediaSync) -> str: - return f"{e.added}, {e.removed}, {e.checked}" - - def _on_log_entry(self, entry: LogEntryWithTime) -> None: - self.form.plainTextEdit.appendPlainText(self._entry_to_text(entry)) + def _on_log_entry(self, entry: str) -> None: + dt = datetime.fromtimestamp(int_time()) + time = dt.strftime("%H:%M:%S") + text = f"{time}: {entry}" + self.form.log_label.setText(text) if not self._syncer.is_syncing(): self.abort_button.setHidden(True) diff --git a/qt/aqt/sync.py b/qt/aqt/sync.py index 34f7a2dc1..f44a49b1f 100644 --- a/qt/aqt/sync.py +++ b/qt/aqt/sync.py @@ -113,14 +113,15 @@ def sync_collection(mw: aqt.main.AnkiQt, on_done: Callable[[], None]) -> None: if out.server_message: showText(out.server_message) if out.required == out.NO_CHANGES: - # all done + # all done; track media progress + mw.media_syncer.start_monitoring() return on_done() else: full_sync(mw, out, on_done) mw.col.save(trx=False) mw.taskman.with_progress( - lambda: mw.col.sync_collection(auth), + lambda: mw.col.sync_collection(auth, mw.pm.media_syncing_enabled()), on_future_done, label=tr.sync_checking(), immediate=True, @@ -130,10 +131,11 @@ def sync_collection(mw: aqt.main.AnkiQt, on_done: Callable[[], None]) -> None: def full_sync( mw: aqt.main.AnkiQt, out: SyncOutput, on_done: Callable[[], None] ) -> None: + server_usn = out.server_media_usn if mw.pm.media_syncing_enabled() else None if out.required == out.FULL_DOWNLOAD: - confirm_full_download(mw, on_done) + confirm_full_download(mw, server_usn, on_done) elif out.required == out.FULL_UPLOAD: - full_upload(mw, on_done) + full_upload(mw, server_usn, on_done) else: button_labels: list[str] = [ tr.sync_upload_to_ankiweb(), @@ -143,9 +145,9 @@ def full_sync( def callback(choice: int) -> None: if choice == 0: - full_upload(mw, on_done) + full_upload(mw, server_usn, on_done) elif choice == 1: - full_download(mw, on_done) + full_download(mw, server_usn, on_done) else: on_done() @@ -157,13 +159,15 @@ def full_sync( ) -def confirm_full_download(mw: aqt.main.AnkiQt, on_done: Callable[[], None]) -> None: +def confirm_full_download( + mw: aqt.main.AnkiQt, server_usn: int, on_done: Callable[[], None] +) -> None: # confirmation step required, as some users customize their notetypes # in an empty collection, then want to upload them if not askUser(tr.sync_confirm_empty_download()): return on_done() else: - mw.closeAllWindows(lambda: full_download(mw, on_done)) + mw.closeAllWindows(lambda: full_download(mw, server_usn, on_done)) def on_full_sync_timer(mw: aqt.main.AnkiQt, label: str) -> None: @@ -185,7 +189,9 @@ def on_full_sync_timer(mw: aqt.main.AnkiQt, label: str) -> None: mw.col.abort_sync() -def full_download(mw: aqt.main.AnkiQt, on_done: Callable[[], None]) -> None: +def full_download( + mw: aqt.main.AnkiQt, server_usn: int, on_done: Callable[[], None] +) -> None: label = tr.sync_downloading_from_ankiweb() def on_timer() -> None: @@ -201,7 +207,9 @@ def full_download(mw: aqt.main.AnkiQt, on_done: Callable[[], None]) -> None: def download() -> None: mw.create_backup_now() mw.col.close_for_full_sync() - mw.col.full_download(mw.pm.sync_auth()) + mw.col.full_upload_or_download( + auth=mw.pm.sync_auth(), server_usn=server_usn, upload=False + ) def on_future_done(fut: Future) -> None: timer.stop() @@ -211,7 +219,7 @@ def full_download(mw: aqt.main.AnkiQt, on_done: Callable[[], None]) -> None: fut.result() except Exception as err: handle_sync_error(mw, err) - mw.media_syncer.start() + mw.media_syncer.start_monitoring() return on_done() mw.taskman.with_progress( @@ -220,7 +228,9 @@ def full_download(mw: aqt.main.AnkiQt, on_done: Callable[[], None]) -> None: ) -def full_upload(mw: aqt.main.AnkiQt, on_done: Callable[[], None]) -> None: +def full_upload( + mw: aqt.main.AnkiQt, server_usn: int | None, on_done: Callable[[], None] +) -> None: gui_hooks.collection_will_temporarily_close(mw.col) mw.col.close_for_full_sync() @@ -242,11 +252,13 @@ def full_upload(mw: aqt.main.AnkiQt, on_done: Callable[[], None]) -> None: except Exception as err: handle_sync_error(mw, err) return on_done() - mw.media_syncer.start() + mw.media_syncer.start_monitoring() return on_done() mw.taskman.with_progress( - lambda: mw.col.full_upload(mw.pm.sync_auth()), + lambda: mw.col.full_upload_or_download( + auth=mw.pm.sync_auth(), server_usn=server_usn, upload=True + ), on_future_done, ) diff --git a/qt/aqt/utils.py b/qt/aqt/utils.py index 32b9215b1..8b0f833d9 100644 --- a/qt/aqt/utils.py +++ b/qt/aqt/utils.py @@ -141,12 +141,13 @@ class MessageBox(QMessageBox): buttons: Sequence[str | QMessageBox.StandardButton] | None = None, default_button: int = 0, textFormat: Qt.TextFormat = Qt.TextFormat.PlainText, + modality: Qt.WindowModality = Qt.WindowModality.WindowModal, ) -> None: parent = parent or aqt.mw.app.activeWindow() or aqt.mw super().__init__(parent) self.setText(text) self.setWindowTitle(title) - self.setWindowModality(Qt.WindowModality.WindowModal) + self.setWindowModality(modality) self.setIcon(icon) if icon == QMessageBox.Icon.Question and theme_manager.night_mode: img = self.iconPixmap().toImage() diff --git a/qt/tools/genhooks_gui.py b/qt/tools/genhooks_gui.py index 5145925f5..6a4cb0567 100644 --- a/qt/tools/genhooks_gui.py +++ b/qt/tools/genhooks_gui.py @@ -881,7 +881,7 @@ gui_hooks.webview_did_inject_style_into_page.append(mytest) ), Hook( name="media_sync_did_progress", - args=["entry: aqt.mediasync.LogEntryWithTime"], + args=["entry: str"], ), Hook(name="media_sync_did_start_or_stop", args=["running: bool"]), Hook( diff --git a/rslib/process/src/lib.rs b/rslib/process/src/lib.rs index aebcf9fb3..fcf91ae2c 100644 --- a/rslib/process/src/lib.rs +++ b/rslib/process/src/lib.rs @@ -17,7 +17,7 @@ pub enum Error { cmdline: String, source: std::io::Error, }, - #[snafu(display("Fail with code {code:?}: {cmdline}"))] + #[snafu(display("Failed with code {code:?}: {cmdline}"))] ReturnedError { cmdline: String, code: Option }, #[snafu(display("Couldn't decode stdout/stderr as utf8"))] InvalidUtf8 { diff --git a/rslib/src/backend/mod.rs b/rslib/src/backend/mod.rs index eb611dafd..4d4bd95eb 100644 --- a/rslib/src/backend/mod.rs +++ b/rslib/src/backend/mod.rs @@ -55,6 +55,7 @@ pub struct BackendInner { runtime: OnceCell, state: Mutex, backup_task: Mutex>>>, + media_sync_task: Mutex>>>, web_client: OnceCell, } @@ -89,6 +90,7 @@ impl Backend { runtime: OnceCell::new(), state: Mutex::new(BackendState::default()), backup_task: Mutex::new(None), + media_sync_task: Mutex::new(None), web_client: OnceCell::new(), })) } diff --git a/rslib/src/backend/sync.rs b/rslib/src/backend/sync.rs index 0e3a345cc..5aa84014c 100644 --- a/rslib/src/backend/sync.rs +++ b/rslib/src/backend/sync.rs @@ -2,15 +2,16 @@ // License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html use anki_proto::sync::sync_status_response::Required; +use anki_proto::sync::MediaSyncStatusResponse; use anki_proto::sync::SyncStatusResponse; use futures::future::AbortHandle; use futures::future::AbortRegistration; use futures::future::Abortable; use reqwest::Url; -use tracing::warn; use super::Backend; use crate::prelude::*; +use crate::services::BackendCollectionService; use crate::sync::collection::normal::ClientSyncState; use crate::sync::collection::normal::SyncActionRequired; use crate::sync::collection::normal::SyncOutput; @@ -67,6 +68,7 @@ impl From for anki_proto::sync::SyncCollectionResponse { anki_proto::sync::sync_collection_response::ChangesRequired::NormalSync as i32 } }, + server_media_usn: o.server_media_usn.0, } } } @@ -96,7 +98,12 @@ impl TryFrom for SyncAuth { impl crate::services::BackendSyncService for Backend { fn sync_media(&self, input: anki_proto::sync::SyncAuth) -> Result<()> { - self.sync_media_inner(input).map(Into::into) + let auth = input.try_into()?; + self.sync_media_in_background(auth, None).map(Into::into) + } + + fn media_sync_status(&self) -> Result { + self.get_media_sync_status() } fn abort_sync(&self) -> Result<()> { @@ -131,23 +138,26 @@ impl crate::services::BackendSyncService for Backend { fn sync_collection( &self, - input: anki_proto::sync::SyncAuth, + input: anki_proto::sync::SyncCollectionRequest, ) -> Result { self.sync_collection_inner(input) } - fn full_upload(&self, input: anki_proto::sync::SyncAuth) -> Result<()> { - self.full_sync_inner(input, true)?; - Ok(()) - } - - fn full_download(&self, input: anki_proto::sync::SyncAuth) -> Result<()> { - self.full_sync_inner(input, false)?; + fn full_upload_or_download( + &self, + input: anki_proto::sync::FullUploadOrDownloadRequest, + ) -> Result<()> { + self.full_sync_inner( + input.auth.or_invalid("missing auth")?, + input.server_usn.map(Usn), + input.upload, + )?; Ok(()) } } impl Backend { + /// Return a handle for regular (non-media) syncing. fn sync_abort_handle( &self, ) -> Result<( @@ -155,18 +165,8 @@ impl Backend { AbortRegistration, )> { let (abort_handle, abort_reg) = AbortHandle::new_pair(); - // Register the new abort_handle. - let old_handle = self.sync_abort.lock().unwrap().replace(abort_handle); - if old_handle.is_some() { - // NOTE: In the future we would ideally be able to handle multiple - // abort handles by just iterating over them all in - // abort_sync). But for now, just log a warning if there was - // already one present -- but don't abort it either. - warn!( - "new sync_abort handle registered, but old one was still present (old sync job might not be cancelled on abort)" - ); - } + self.sync_abort.lock().unwrap().replace(abort_handle); // Clear the abort handle after the caller is done and drops the guard. let guard = scopeguard::guard(self.clone(), |backend| { backend.sync_abort.lock().unwrap().take(); @@ -174,19 +174,63 @@ impl Backend { Ok((guard, abort_reg)) } - pub(super) fn sync_media_inner(&self, auth: anki_proto::sync::SyncAuth) -> Result<()> { - let auth = auth.try_into()?; - // mark media sync as active - let (abort_handle, abort_reg) = AbortHandle::new_pair(); - { - let mut guard = self.state.lock().unwrap(); - if guard.sync.media_sync_abort.is_some() { - // media sync is already active + pub(super) fn sync_media_in_background( + &self, + auth: SyncAuth, + server_usn: Option, + ) -> Result<()> { + let mut task = self.media_sync_task.lock().unwrap(); + if let Some(handle) = &*task { + if !handle.is_finished() { + // already running return Ok(()); } else { - guard.sync.media_sync_abort = Some(abort_handle); + // clean up + task.take(); } } + let backend = self.clone(); + *task = Some(std::thread::spawn(move || { + backend.sync_media_blocking(auth, server_usn) + })); + Ok(()) + } + + /// True if active. Will throw if terminated with error. + fn get_media_sync_status(&self) -> Result { + let mut task = self.media_sync_task.lock().unwrap(); + let active = if let Some(handle) = &*task { + if !handle.is_finished() { + true + } else { + match task.take().unwrap().join() { + Ok(inner_result) => inner_result?, + Err(panic) => invalid_input!("{:?}", panic), + }; + false + } + } else { + false + }; + let progress = self.latest_progress()?; + let progress = if let Some(anki_proto::collection::progress::Value::MediaSync(progress)) = + progress.value + { + Some(progress) + } else { + None + }; + Ok(MediaSyncStatusResponse { active, progress }) + } + + pub(super) fn sync_media_blocking( + &self, + auth: SyncAuth, + server_usn: Option, + ) -> Result<()> { + // abort handle + let (abort_handle, abort_reg) = AbortHandle::new_pair(); + self.state.lock().unwrap().sync.media_sync_abort = Some(abort_handle); // start the sync let (mgr, progress) = { @@ -195,11 +239,11 @@ impl Backend { (col.media()?, col.new_progress_handler()) }; let rt = self.runtime_handle(); - let sync_fut = mgr.sync_media(progress, auth, self.web_client().clone()); + let sync_fut = mgr.sync_media(progress, auth, self.web_client().clone(), server_usn); let abortable_sync = Abortable::new(sync_fut, abort_reg); let result = rt.block_on(abortable_sync); - // mark inactive + // clean up the handle self.state.lock().unwrap().sync.media_sync_abort.take(); // return result @@ -222,6 +266,7 @@ impl Backend { drop(guard); // block until it aborts + while self.state.lock().unwrap().sync.media_sync_abort.is_some() { std::thread::sleep(std::time::Duration::from_millis(100)); self.progress_state.lock().unwrap().want_abort = true; @@ -297,13 +342,14 @@ impl Backend { pub(super) fn sync_collection_inner( &self, - input: anki_proto::sync::SyncAuth, + input: anki_proto::sync::SyncCollectionRequest, ) -> Result { - let auth: SyncAuth = input.try_into()?; + let auth: SyncAuth = input.auth.or_invalid("missing auth")?.try_into()?; let (_guard, abort_reg) = self.sync_abort_handle()?; let rt = self.runtime_handle(); let client = self.web_client().clone(); + let auth2 = auth.clone(); let ret = self.with_col(|col| { let sync_fut = col.normal_sync(auth.clone(), client.clone()); @@ -325,6 +371,13 @@ impl Backend { }); let output: SyncOutput = ret?; + + if input.sync_media + && !matches!(output.required, SyncActionRequired::FullSyncRequired { .. }) + { + self.sync_media_in_background(auth2, Some(output.server_media_usn))?; + } + self.state .lock() .unwrap() @@ -337,9 +390,11 @@ impl Backend { pub(super) fn full_sync_inner( &self, input: anki_proto::sync::SyncAuth, + server_usn: Option, upload: bool, ) -> Result<()> { - let auth = input.try_into()?; + let auth: SyncAuth = input.try_into()?; + let auth2 = auth.clone(); self.abort_media_sync_and_wait(); let rt = self.runtime_handle(); @@ -368,7 +423,7 @@ impl Backend { // ensure re-opened regardless of outcome col.replace(builder.build()?); - match result { + let result = match result { Ok(sync_result) => { if sync_result.is_ok() { self.state @@ -381,7 +436,13 @@ impl Backend { sync_result } Err(_) => Err(AnkiError::Interrupted), + }; + + if result.is_ok() && server_usn.is_some() { + self.sync_media_in_background(auth2, server_usn)?; } + + result } } diff --git a/rslib/src/media/mod.rs b/rslib/src/media/mod.rs index f216706ca..259dd52f8 100644 --- a/rslib/src/media/mod.rs +++ b/rslib/src/media/mod.rs @@ -147,10 +147,11 @@ impl MediaManager { progress: ThrottlingProgressHandler, auth: SyncAuth, client: Client, + server_usn: Option, ) -> Result<()> { let client = HttpSyncClient::new(auth, client); let mut syncer = MediaSyncer::new(self, progress, client)?; - syncer.sync().await + syncer.sync(server_usn).await } pub fn all_checksums_after_checking( diff --git a/rslib/src/progress.rs b/rslib/src/progress.rs index 644a4d29f..243afdc2e 100644 --- a/rslib/src/progress.rs +++ b/rslib/src/progress.rs @@ -240,11 +240,8 @@ pub(crate) fn progress_to_proto( } } -fn media_sync_progress( - p: MediaSyncProgress, - tr: &I18n, -) -> anki_proto::collection::progress::MediaSync { - anki_proto::collection::progress::MediaSync { +fn media_sync_progress(p: MediaSyncProgress, tr: &I18n) -> anki_proto::sync::MediaSyncProgress { + anki_proto::sync::MediaSyncProgress { checked: tr.sync_media_checked_count(p.checked).into(), added: tr .sync_media_added_count(p.uploaded_files, p.downloaded_files) diff --git a/rslib/src/sync/collection/meta.rs b/rslib/src/sync/collection/meta.rs index 4bfd61c22..e6a521f22 100644 --- a/rslib/src/sync/collection/meta.rs +++ b/rslib/src/sync/collection/meta.rs @@ -42,6 +42,9 @@ pub struct SyncMeta { pub host_number: u32, #[serde(default)] pub empty: bool, + /// This field is not set by col.sync_meta(), and must be filled in + /// separately. + pub media_usn: Usn, #[serde(skip)] pub v2_scheduler_or_later: bool, #[serde(skip)] @@ -77,6 +80,7 @@ impl SyncMeta { server_message: remote.server_message, host_number: remote.host_number, new_endpoint, + server_media_usn: remote.media_usn, } } } @@ -132,6 +136,8 @@ impl Collection { empty: !self.storage.have_at_least_one_card()?, v2_scheduler_or_later: self.scheduler_version() == SchedulerVersion::V2, v2_timezone: self.get_creation_utc_offset().is_some(), + // must be filled in by calling code + media_usn: Usn(0), }) } } diff --git a/rslib/src/sync/collection/normal.rs b/rslib/src/sync/collection/normal.rs index 981f9a76d..ff6ef1ab4 100644 --- a/rslib/src/sync/collection/normal.rs +++ b/rslib/src/sync/collection/normal.rs @@ -53,6 +53,7 @@ pub struct ClientSyncState { pub(in crate::sync) server_usn: Usn, // -1 in client case; used to locate pending entries pub(in crate::sync) pending_usn: Usn, + pub(in crate::sync) server_media_usn: Usn, } impl NormalSyncer<'_> { @@ -139,6 +140,8 @@ pub struct SyncOutput { pub server_message: String, pub host_number: u32, pub new_endpoint: Option, + #[allow(unused)] + pub(crate) server_media_usn: Usn, } impl From for SyncOutput { @@ -148,6 +151,7 @@ impl From for SyncOutput { server_message: s.server_message, host_number: s.host_number, new_endpoint: s.new_endpoint, + server_media_usn: s.server_media_usn, } } } diff --git a/rslib/src/sync/http_server/handlers.rs b/rslib/src/sync/http_server/handlers.rs index 3fe7dcdfd..441501b62 100644 --- a/rslib/src/sync/http_server/handlers.rs +++ b/rslib/src/sync/http_server/handlers.rs @@ -61,7 +61,9 @@ impl SyncProtocol for Arc { async fn meta(&self, req: SyncRequest) -> HttpResult> { self.with_authenticated_user(req, |user, req| { let req = req.json()?; - user.with_col(|col| server_meta(req, col)) + let mut meta = user.with_col(|col| server_meta(req, col))?; + meta.media_usn = user.media.last_usn()?; + Ok(meta) }) .await .and_then(SyncResponse::try_from_obj) diff --git a/rslib/src/sync/media/syncer.rs b/rslib/src/sync/media/syncer.rs index b37c9e7b5..2732565de 100644 --- a/rslib/src/sync/media/syncer.rs +++ b/rslib/src/sync/media/syncer.rs @@ -50,20 +50,24 @@ impl MediaSyncer { }) } - pub async fn sync(&mut self) -> Result<()> { - self.sync_inner().await.map_err(|e| { + pub async fn sync(&mut self, server_usn: Option) -> Result<()> { + self.sync_inner(server_usn).await.map_err(|e| { debug!("sync error: {:?}", e); e }) } #[allow(clippy::useless_let_if_seq)] - async fn sync_inner(&mut self) -> Result<()> { + async fn sync_inner(&mut self, server_usn: Option) -> Result<()> { self.register_changes()?; let meta = self.mgr.db.get_meta()?; let client_usn = meta.last_sync_usn; - let server_usn = self.begin_sync().await?; + let server_usn = if let Some(usn) = server_usn { + usn + } else { + self.begin_sync().await? + }; let mut actions_performed = false; diff --git a/rslib/src/sync/media/tests.rs b/rslib/src/sync/media/tests.rs index c5a9f8acd..245b75e48 100644 --- a/rslib/src/sync/media/tests.rs +++ b/rslib/src/sync/media/tests.rs @@ -151,13 +151,13 @@ impl SyncTestContext { async fn sync_media1(&self) -> Result<()> { let mut syncer = MediaSyncer::new(self.media1(), ignore_progress(), self.client.clone()).unwrap(); - syncer.sync().await + syncer.sync(None).await } async fn sync_media2(&self) -> Result<()> { let mut syncer = MediaSyncer::new(self.media2(), ignore_progress(), self.client.clone()).unwrap(); - syncer.sync().await + syncer.sync(None).await } /// As local change detection depends on a millisecond timestamp,