diff --git a/proto/backend.proto b/proto/backend.proto index d45dd55e1..331fa5d83 100644 --- a/proto/backend.proto +++ b/proto/backend.proto @@ -486,6 +486,7 @@ message Progress { MediaSyncProgress media_sync = 2; string media_check = 3; FullSyncProgress full_sync = 4; + NormalSyncProgress normal_sync = 5; } } @@ -505,6 +506,12 @@ message MediaSyncUploadProgress { uint32 deletions = 2; } +message NormalSyncProgress { + string stage = 1; + string added = 2; + string removed = 3; +} + // Messages /////////////////////////////////////////////////////////// diff --git a/pylib/anki/rsbackend.py b/pylib/anki/rsbackend.py index eed465e00..3b62e4330 100644 --- a/pylib/anki/rsbackend.py +++ b/pylib/anki/rsbackend.py @@ -150,6 +150,7 @@ def proto_exception_to_native(err: pb.BackendError) -> Exception: MediaSyncProgress = pb.MediaSyncProgress FullSyncProgress = pb.FullSyncProgress +NormalSyncProgress = pb.NormalSyncProgress FormatTimeSpanContext = pb.FormatTimespanIn.Context @@ -159,6 +160,7 @@ class ProgressKind(enum.Enum): MediaSync = 1 MediaCheck = 2 FullSync = 3 + NormalSync = 4 @dataclass @@ -175,6 +177,8 @@ class Progress: return Progress(kind=ProgressKind.MediaCheck, val=proto.media_check) elif kind == "full_sync": return Progress(kind=ProgressKind.FullSync, val=proto.full_sync) + elif kind == "normal_sync": + return Progress(kind=ProgressKind.NormalSync, val=proto.normal_sync) else: return Progress(kind=ProgressKind.NoProgress, val="") diff --git a/qt/aqt/progress.py b/qt/aqt/progress.py index 0bf80ee91..e96b72cf0 100644 --- a/qt/aqt/progress.py +++ b/qt/aqt/progress.py @@ -185,6 +185,11 @@ class ProgressManager: else: return False + def set_title(self, title: str) -> None: + win = self._win + if win: + win.setWindowTitle(title) + class ProgressDialog(QDialog): def __init__(self, parent): diff --git a/qt/aqt/sync.py b/qt/aqt/sync.py index 48fcb63f5..2e7d58b72 100644 --- a/qt/aqt/sync.py +++ b/qt/aqt/sync.py @@ -11,6 +11,7 @@ from anki.rsbackend import ( TR, FullSyncProgress, Interrupted, + NormalSyncProgress, ProgressKind, SyncError, SyncErrorKind, @@ -29,10 +30,6 @@ from aqt.qt import ( ) from aqt.utils import askUser, askUserDialog, showText, showWarning, tr -# fixme: catch auth error in other routines, clear sync auth -# fixme: sync progress -# fixme: curDeck marking collection modified - class FullSyncChoice(enum.Enum): CANCEL = 0 @@ -63,12 +60,35 @@ def handle_sync_error(mw: aqt.main.AnkiQt, err: Exception): showWarning(str(err)) +def on_normal_sync_timer(mw: aqt.main.AnkiQt) -> None: + progress = mw.col.latest_progress() + if progress.kind != ProgressKind.NormalSync: + return + + assert isinstance(progress.val, NormalSyncProgress) + mw.progress.update( + label=f"{progress.val.added}\n{progress.val.removed}", process=False, + ) + mw.progress.set_title(progress.val.stage) + + if mw.progress.want_cancel(): + mw.col.backend.abort_sync() + + def sync_collection(mw: aqt.main.AnkiQt, on_done: Callable[[], None]) -> None: auth = mw.pm.sync_auth() assert auth + def on_timer(): + on_normal_sync_timer(mw) + + timer = QTimer(mw) + qconnect(timer.timeout, on_timer) + timer.start(150) + def on_future_done(fut): mw.col.db.begin() + timer.stop() try: out: SyncOutput = fut.result() except Exception as err: @@ -129,8 +149,15 @@ def on_full_sync_timer(mw: aqt.main.AnkiQt) -> None: return assert isinstance(progress.val, FullSyncProgress) + if progress.val.transferred == progress.val.total: + label = tr(TR.SYNC_CHECKING) + else: + label = None mw.progress.update( - value=progress.val.transferred, max=progress.val.total, process=False + value=progress.val.transferred, + max=progress.val.total, + process=False, + label=label, ) if mw.progress.want_cancel(): diff --git a/rslib/ftl/sync.ftl b/rslib/ftl/sync.ftl index aa6cc649b..3fc0a4bfe 100644 --- a/rslib/ftl/sync.ftl +++ b/rslib/ftl/sync.ftl @@ -1,6 +1,5 @@ ### Messages shown when synchronizing with AnkiWeb. - ## Media synchronization sync-media-added-count = Added: { $up }↑ { $down }↓ @@ -48,9 +47,11 @@ sync-download-from-ankiweb = Download from AnkiWeb sync-upload-to-ankiweb = Upload to AnkiWeb sync-cancel-button = Cancel -## Progress +## Normal sync progress sync-downloading-from-ankiweb = Downloading from AnkiWeb... sync-uploading-to-ankiweb = Uploading to AnkiWeb... sync-syncing = Syncing... sync-checking = Checking... +sync-connecting = Connecting... +sync-added-updated-count = Added/modified: { $up }↑ { $down }↓ diff --git a/rslib/src/backend/mod.rs b/rslib/src/backend/mod.rs index aa3e94e05..da0b07d1a 100644 --- a/rslib/src/backend/mod.rs +++ b/rslib/src/backend/mod.rs @@ -32,7 +32,10 @@ use crate::{ sched::cutoff::local_minutes_west_for_stamp, sched::timespan::{answer_button_time, learning_congrats, studied_today, time_span}, search::SortMode, - sync::{sync_login, FullSyncProgress, SyncActionRequired, SyncAuth, SyncOutput}, + sync::{ + sync_login, FullSyncProgress, NormalSyncProgress, SyncActionRequired, SyncAuth, SyncOutput, + SyncStage, + }, template::RenderedNode, text::{extract_av_tags, strip_av_tags, AVTag}, timestamp::TimestampSecs, @@ -62,9 +65,9 @@ struct ThrottlingProgressHandler { impl ThrottlingProgressHandler { /// Returns true if should continue. - fn update(&mut self, progress: impl Into) -> bool { + fn update(&mut self, progress: impl Into, throttle: bool) -> bool { let now = coarsetime::Instant::now(); - if now.duration_since(self.last_update).as_f64() < 0.1 { + if throttle && now.duration_since(self.last_update).as_f64() < 0.1 { return true; } self.last_update = now; @@ -94,6 +97,7 @@ enum Progress { MediaSync(MediaSyncProgress), MediaCheck(u32), FullSync(FullSyncProgress), + NormalSync(NormalSyncProgress), } /// Convert an Anki error to a protobuf error. @@ -836,7 +840,8 @@ impl BackendService for Backend { fn empty_trash(&mut self, _input: Empty) -> BackendResult { let mut handler = self.new_progress_handler(); - let progress_fn = move |progress| handler.update(Progress::MediaCheck(progress as u32)); + let progress_fn = + move |progress| handler.update(Progress::MediaCheck(progress as u32), true); self.with_col(|col| { let mgr = MediaManager::new(&col.media_folder, &col.media_db)?; @@ -851,7 +856,8 @@ impl BackendService for Backend { fn restore_trash(&mut self, _input: Empty) -> BackendResult { let mut handler = self.new_progress_handler(); - let progress_fn = move |progress| handler.update(Progress::MediaCheck(progress as u32)); + let progress_fn = + move |progress| handler.update(Progress::MediaCheck(progress as u32), true); self.with_col(|col| { let mgr = MediaManager::new(&col.media_folder, &col.media_db)?; @@ -875,7 +881,8 @@ impl BackendService for Backend { fn check_media(&mut self, _input: pb::Empty) -> Result { let mut handler = self.new_progress_handler(); - let progress_fn = move |progress| handler.update(Progress::MediaCheck(progress as u32)); + let progress_fn = + move |progress| handler.update(Progress::MediaCheck(progress as u32), true); self.with_col(|col| { let mgr = MediaManager::new(&col.media_folder, &col.media_db)?; col.transact(None, |ctx| { @@ -1198,7 +1205,7 @@ impl Backend { self.sync_abort = Some(abort_handle); let mut handler = self.new_progress_handler(); - let progress_fn = move |progress| handler.update(progress); + let progress_fn = move |progress| handler.update(progress, true); let mgr = MediaManager::new(&folder, &db)?; let mut rt = Runtime::new().unwrap(); @@ -1249,7 +1256,12 @@ impl Backend { let abortable_sync = Abortable::new(sync_fut, abort_reg); rt.block_on(abortable_sync) } else { - let sync_fut = col.normal_sync(input.into()); + let mut handler = self.new_progress_handler(); + let progress_fn = move |progress: NormalSyncProgress, throttle: bool| { + handler.update(progress, throttle); + }; + + let sync_fut = col.normal_sync(input.into(), progress_fn); let abortable_sync = Abortable::new(sync_fut, abort_reg); rt.block_on(abortable_sync) }; @@ -1283,8 +1295,8 @@ impl Backend { let logger = col_inner.log.clone(); let mut handler = self.new_progress_handler(); - let progress_fn = move |progress: FullSyncProgress| { - handler.update(progress); + let progress_fn = move |progress: FullSyncProgress, throttle: bool| { + handler.update(progress, throttle); }; let mut rt = Runtime::new().unwrap(); @@ -1381,6 +1393,29 @@ fn progress_to_proto(progress: Option, i18n: &I18n) -> pb::Progress { transferred: p.transferred_bytes as u32, total: p.total_bytes as u32, }), + Progress::NormalSync(p) => { + let stage = match p.stage { + SyncStage::Connecting => i18n.tr(TR::SyncSyncing), + SyncStage::Syncing => i18n.tr(TR::SyncSyncing), + SyncStage::Finalizing => i18n.tr(TR::SyncChecking), + } + .to_string(); + let added = i18n.trn( + TR::SyncAddedUpdatedCount, + tr_args![ + "up"=>p.local_update, "down"=>p.remote_update], + ); + let removed = i18n.trn( + TR::SyncMediaRemovedCount, + tr_args![ + "up"=>p.local_remove, "down"=>p.remote_remove], + ); + pb::progress::Value::NormalSync(pb::NormalSyncProgress { + stage, + added, + removed, + }) + } } } else { pb::progress::Value::None(pb::Empty {}) @@ -1534,3 +1569,9 @@ impl From for Progress { Progress::MediaSync(p) } } + +impl From for Progress { + fn from(p: NormalSyncProgress) -> Self { + Progress::NormalSync(p) + } +} diff --git a/rslib/src/sync/http_client.rs b/rslib/src/sync/http_client.rs index b91933c97..a730b9017 100644 --- a/rslib/src/sync/http_client.rs +++ b/rslib/src/sync/http_client.rs @@ -238,7 +238,7 @@ impl HTTPSyncClient { mut progress_fn: P, ) -> Result where - P: FnMut(FullSyncProgress), + P: FnMut(FullSyncProgress, bool), { let mut temp_file = NamedTempFile::new_in(folder)?; let (size, mut stream) = self.download_inner().await?; @@ -250,8 +250,9 @@ impl HTTPSyncClient { let chunk = chunk?; temp_file.write_all(&chunk)?; progress.transferred_bytes += chunk.len(); - progress_fn(progress); + progress_fn(progress, true); } + progress_fn(progress, false); Ok(temp_file) } @@ -265,7 +266,7 @@ impl HTTPSyncClient { pub(crate) async fn upload

(&mut self, col_path: &Path, progress_fn: P) -> Result<()> where - P: FnMut(FullSyncProgress) + Send + Sync + 'static, + P: FnMut(FullSyncProgress, bool) + Send + Sync + 'static, { let file = tokio::fs::File::open(col_path).await?; let total_bytes = file.metadata().await?.len() as usize; @@ -304,7 +305,7 @@ struct ProgressWrapper { impl Stream for ProgressWrapper where S: AsyncRead, - P: FnMut(FullSyncProgress), + P: FnMut(FullSyncProgress, bool), { type Item = std::result::Result; @@ -312,11 +313,14 @@ where let mut buf = vec![0; 16 * 1024]; let this = self.project(); match ready!(this.reader.poll_read(cx, &mut buf)) { - Ok(0) => Poll::Ready(None), + Ok(0) => { + (this.progress_fn)(*this.progress, false); + Poll::Ready(None) + } Ok(size) => { buf.resize(size, 0); this.progress.transferred_bytes += size; - (this.progress_fn)(*this.progress); + (this.progress_fn)(*this.progress, true); Poll::Ready(Some(Ok(Bytes::from(buf)))) } Err(e) => Poll::Ready(Some(Err(e))), @@ -410,13 +414,13 @@ mod test { let dir = tempdir()?; let out_path = syncer - .download(&dir.path(), |progress| { + .download(&dir.path(), |progress, _throttle| { println!("progress: {:?}", progress); }) .await?; syncer - .upload(&out_path.path(), |progress| { + .upload(&out_path.path(), |progress, _throttle| { println!("progress {:?}", progress); }) .await?; diff --git a/rslib/src/sync/mod.rs b/rslib/src/sync/mod.rs index 4c25e2f7e..3259ab2d5 100644 --- a/rslib/src/sync/mod.rs +++ b/rslib/src/sync/mod.rs @@ -27,8 +27,27 @@ use std::io::prelude::*; use std::{collections::HashMap, path::Path, time::Duration}; use tempfile::NamedTempFile; -#[derive(Default, Debug)] -pub struct SyncProgress {} +#[derive(Default, Debug, Clone, Copy)] +pub struct NormalSyncProgress { + pub stage: SyncStage, + pub local_update: usize, + pub local_remove: usize, + pub remote_update: usize, + pub remote_remove: usize, +} + +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum SyncStage { + Connecting, + Syncing, + Finalizing, +} + +impl Default for SyncStage { + fn default() -> Self { + SyncStage::Connecting + } +} #[derive(Serialize, Deserialize, Debug)] pub struct SyncMeta { @@ -223,9 +242,11 @@ pub struct SyncAuth { pub host_number: u32, } -struct NormalSyncer<'a> { +struct NormalSyncer<'a, F> { col: &'a mut Collection, remote: HTTPSyncClient, + progress: NormalSyncProgress, + progress_fn: F, } impl Usn { @@ -239,17 +260,30 @@ impl Usn { } } -impl NormalSyncer<'_> { +impl NormalSyncer<'_, F> +where + F: FnMut(NormalSyncProgress, bool), +{ /// Create a new syncing instance. If host_number is unavailable, use 0. - pub fn new(col: &mut Collection, auth: SyncAuth) -> NormalSyncer<'_> { + pub fn new(col: &mut Collection, auth: SyncAuth, progress_fn: F) -> NormalSyncer<'_, F> + where + F: FnMut(NormalSyncProgress, bool), + { NormalSyncer { col, remote: HTTPSyncClient::new(Some(auth.hkey), auth.host_number), + progress: NormalSyncProgress::default(), + progress_fn, } } + fn fire_progress_cb(&mut self, throttle: bool) { + (self.progress_fn)(self.progress, throttle) + } + pub async fn sync(&mut self) -> Result { debug!(self.col.log, "fetching meta..."); + self.fire_progress_cb(false); let state: SyncState = self.get_sync_state().await?; debug!(self.col.log, "fetched"; "state"=>?&state); match state.required { @@ -330,6 +364,9 @@ impl NormalSyncer<'_> { /// Sync. Caller must have created a transaction, and should call /// abort on failure. async fn normal_sync_inner(&mut self, mut state: SyncState) -> Result { + self.progress.stage = SyncStage::Syncing; + self.fire_progress_cb(false); + debug!(self.col.log, "start"); self.start_and_process_deletions(&state).await?; debug!(self.col.log, "unchunked changes"); @@ -338,6 +375,10 @@ impl NormalSyncer<'_> { self.process_chunks_from_server().await?; debug!(self.col.log, "begin stream to server"); self.send_chunks_to_server(&state).await?; + + self.progress.stage = SyncStage::Finalizing; + self.fire_progress_cb(false); + debug!(self.col.log, "sanity check"); self.sanity_check().await?; debug!(self.col.log, "finalize"); @@ -348,8 +389,8 @@ impl NormalSyncer<'_> { // The following operations assume a transaction has been set up. - async fn start_and_process_deletions(&self, state: &SyncState) -> Result<()> { - let removed_on_remote: Graves = self + async fn start_and_process_deletions(&mut self, state: &SyncState) -> Result<()> { + let remote: Graves = self .remote .start( state.usn_at_last_sync, @@ -359,26 +400,30 @@ impl NormalSyncer<'_> { .await?; debug!(self.col.log, "removed on remote"; - "cards"=>removed_on_remote.cards.len(), - "notes"=>removed_on_remote.notes.len(), - "decks"=>removed_on_remote.decks.len()); + "cards"=>remote.cards.len(), + "notes"=>remote.notes.len(), + "decks"=>remote.decks.len()); - let mut locally_removed = self.col.storage.pending_graves(state.pending_usn)?; + let mut local = self.col.storage.pending_graves(state.pending_usn)?; if let Some(new_usn) = state.new_usn { self.col.storage.update_pending_grave_usns(new_usn)?; } debug!(self.col.log, "locally removed "; - "cards"=>locally_removed.cards.len(), - "notes"=>locally_removed.notes.len(), - "decks"=>locally_removed.decks.len()); + "cards"=>local.cards.len(), + "notes"=>local.notes.len(), + "decks"=>local.decks.len()); - while let Some(chunk) = locally_removed.take_chunk() { + while let Some(chunk) = local.take_chunk() { debug!(self.col.log, "sending graves chunk"); + self.progress.local_remove += chunk.cards.len() + chunk.notes.len() + chunk.decks.len(); self.remote.apply_graves(chunk).await?; + self.fire_progress_cb(true); } - self.col.apply_graves(removed_on_remote, state.latest_usn)?; + self.progress.remote_remove = remote.cards.len() + remote.notes.len() + remote.decks.len(); + self.col.apply_graves(remote, state.latest_usn)?; + self.fire_progress_cb(true); debug!(self.col.log, "applied server graves"); Ok(()) @@ -390,27 +435,41 @@ impl NormalSyncer<'_> { // usefulness. async fn process_unchunked_changes(&mut self, state: &SyncState) -> Result<()> { debug!(self.col.log, "gathering local changes"); - let local_changes = self.col.local_unchunked_changes( + let local = self.col.local_unchunked_changes( state.pending_usn, state.new_usn, state.local_is_newer, )?; + debug!(self.col.log, "sending"; - "notetypes"=>local_changes.notetypes.len(), - "decks"=>local_changes.decks_and_config.decks.len(), - "deck config"=>local_changes.decks_and_config.config.len(), - "tags"=>local_changes.tags.len(), + "notetypes"=>local.notetypes.len(), + "decks"=>local.decks_and_config.decks.len(), + "deck config"=>local.decks_and_config.config.len(), + "tags"=>local.tags.len(), ); - let remote_changes = self.remote.apply_changes(local_changes).await?; + self.progress.local_update += local.notetypes.len() + + local.decks_and_config.decks.len() + + local.decks_and_config.config.len() + + local.tags.len(); + let remote = self.remote.apply_changes(local).await?; + self.fire_progress_cb(true); + debug!(self.col.log, "received"; - "notetypes"=>remote_changes.notetypes.len(), - "decks"=>remote_changes.decks_and_config.decks.len(), - "deck config"=>remote_changes.decks_and_config.config.len(), - "tags"=>remote_changes.tags.len(), + "notetypes"=>remote.notetypes.len(), + "decks"=>remote.decks_and_config.decks.len(), + "deck config"=>remote.decks_and_config.config.len(), + "tags"=>remote.tags.len(), ); - self.col.apply_changes(remote_changes, state.latest_usn) + self.progress.remote_update += remote.notetypes.len() + + remote.decks_and_config.decks.len() + + remote.decks_and_config.config.len() + + remote.tags.len(); + + self.col.apply_changes(remote, state.latest_usn)?; + self.fire_progress_cb(true); + Ok(()) } async fn process_chunks_from_server(&mut self) -> Result<()> { @@ -424,16 +483,21 @@ impl NormalSyncer<'_> { "revlog"=>chunk.revlog.len(), ); + self.progress.remote_update += + chunk.cards.len() + chunk.notes.len() + chunk.revlog.len(); + let done = chunk.done; self.col.apply_chunk(chunk)?; + self.fire_progress_cb(true); + if done { return Ok(()); } } } - async fn send_chunks_to_server(&self, state: &SyncState) -> Result<()> { + async fn send_chunks_to_server(&mut self, state: &SyncState) -> Result<()> { let mut ids = self.col.get_chunkable_ids(state.pending_usn)?; loop { @@ -447,8 +511,13 @@ impl NormalSyncer<'_> { "revlog"=>chunk.revlog.len(), ); + self.progress.local_update += + chunk.cards.len() + chunk.notes.len() + chunk.revlog.len(); + self.remote.apply_chunk(chunk).await?; + self.fire_progress_cb(true); + if done { return Ok(()); } @@ -519,21 +588,24 @@ pub async fn sync_login(username: &str, password: &str) -> Result { impl Collection { pub async fn get_sync_status(&mut self, auth: SyncAuth) -> Result { - NormalSyncer::new(self, auth) + NormalSyncer::new(self, auth, |_p, _t| ()) .get_sync_state() .await .map(Into::into) } - pub async fn normal_sync(&mut self, auth: SyncAuth) -> Result { + pub async fn normal_sync(&mut self, auth: SyncAuth, progress_fn: F) -> Result + where + F: FnMut(NormalSyncProgress, bool), + { // fixme: server abort on failure - NormalSyncer::new(self, auth).sync().await + NormalSyncer::new(self, auth, progress_fn).sync().await } /// Upload collection to AnkiWeb. Caller must re-open afterwards. pub async fn full_upload(mut self, auth: SyncAuth, progress_fn: F) -> Result<()> where - F: FnMut(FullSyncProgress) + Send + Sync + 'static, + F: FnMut(FullSyncProgress, bool) + Send + Sync + 'static, { self.before_upload()?; let col_path = self.col_path.clone(); @@ -546,7 +618,7 @@ impl Collection { /// Download collection from AnkiWeb. Caller must re-open afterwards. pub async fn full_download(self, auth: SyncAuth, progress_fn: F) -> Result<()> where - F: FnMut(FullSyncProgress), + F: FnMut(FullSyncProgress, bool), { let col_path = self.col_path.clone(); let folder = col_path.parent().unwrap();