report normal sync progress

Also:
- provide a way for the progress handler to skip the throttling so that
we can ensure progress is updated at the end of a stage
- show 'checking' at the end of full sync
This commit is contained in:
Damien Elmes 2020-05-31 14:43:27 +10:00
parent 6bf9f5bc6c
commit c6f0710ce7
8 changed files with 219 additions and 58 deletions

View file

@ -486,6 +486,7 @@ message Progress {
MediaSyncProgress media_sync = 2;
string media_check = 3;
FullSyncProgress full_sync = 4;
NormalSyncProgress normal_sync = 5;
}
}
@ -505,6 +506,12 @@ message MediaSyncUploadProgress {
uint32 deletions = 2;
}
message NormalSyncProgress {
string stage = 1;
string added = 2;
string removed = 3;
}
// Messages
///////////////////////////////////////////////////////////

View file

@ -150,6 +150,7 @@ def proto_exception_to_native(err: pb.BackendError) -> Exception:
MediaSyncProgress = pb.MediaSyncProgress
FullSyncProgress = pb.FullSyncProgress
NormalSyncProgress = pb.NormalSyncProgress
FormatTimeSpanContext = pb.FormatTimespanIn.Context
@ -159,6 +160,7 @@ class ProgressKind(enum.Enum):
MediaSync = 1
MediaCheck = 2
FullSync = 3
NormalSync = 4
@dataclass
@ -175,6 +177,8 @@ class Progress:
return Progress(kind=ProgressKind.MediaCheck, val=proto.media_check)
elif kind == "full_sync":
return Progress(kind=ProgressKind.FullSync, val=proto.full_sync)
elif kind == "normal_sync":
return Progress(kind=ProgressKind.NormalSync, val=proto.normal_sync)
else:
return Progress(kind=ProgressKind.NoProgress, val="")

View file

@ -185,6 +185,11 @@ class ProgressManager:
else:
return False
def set_title(self, title: str) -> None:
win = self._win
if win:
win.setWindowTitle(title)
class ProgressDialog(QDialog):
def __init__(self, parent):

View file

@ -11,6 +11,7 @@ from anki.rsbackend import (
TR,
FullSyncProgress,
Interrupted,
NormalSyncProgress,
ProgressKind,
SyncError,
SyncErrorKind,
@ -29,10 +30,6 @@ from aqt.qt import (
)
from aqt.utils import askUser, askUserDialog, showText, showWarning, tr
# fixme: catch auth error in other routines, clear sync auth
# fixme: sync progress
# fixme: curDeck marking collection modified
class FullSyncChoice(enum.Enum):
CANCEL = 0
@ -63,12 +60,35 @@ def handle_sync_error(mw: aqt.main.AnkiQt, err: Exception):
showWarning(str(err))
def on_normal_sync_timer(mw: aqt.main.AnkiQt) -> None:
progress = mw.col.latest_progress()
if progress.kind != ProgressKind.NormalSync:
return
assert isinstance(progress.val, NormalSyncProgress)
mw.progress.update(
label=f"{progress.val.added}\n{progress.val.removed}", process=False,
)
mw.progress.set_title(progress.val.stage)
if mw.progress.want_cancel():
mw.col.backend.abort_sync()
def sync_collection(mw: aqt.main.AnkiQt, on_done: Callable[[], None]) -> None:
auth = mw.pm.sync_auth()
assert auth
def on_timer():
on_normal_sync_timer(mw)
timer = QTimer(mw)
qconnect(timer.timeout, on_timer)
timer.start(150)
def on_future_done(fut):
mw.col.db.begin()
timer.stop()
try:
out: SyncOutput = fut.result()
except Exception as err:
@ -129,8 +149,15 @@ def on_full_sync_timer(mw: aqt.main.AnkiQt) -> None:
return
assert isinstance(progress.val, FullSyncProgress)
if progress.val.transferred == progress.val.total:
label = tr(TR.SYNC_CHECKING)
else:
label = None
mw.progress.update(
value=progress.val.transferred, max=progress.val.total, process=False
value=progress.val.transferred,
max=progress.val.total,
process=False,
label=label,
)
if mw.progress.want_cancel():

View file

@ -1,6 +1,5 @@
### Messages shown when synchronizing with AnkiWeb.
## Media synchronization
sync-media-added-count = Added: { $up }↑ { $down }↓
@ -48,9 +47,11 @@ sync-download-from-ankiweb = Download from AnkiWeb
sync-upload-to-ankiweb = Upload to AnkiWeb
sync-cancel-button = Cancel
## Progress
## Normal sync progress
sync-downloading-from-ankiweb = Downloading from AnkiWeb...
sync-uploading-to-ankiweb = Uploading to AnkiWeb...
sync-syncing = Syncing...
sync-checking = Checking...
sync-connecting = Connecting...
sync-added-updated-count = Added/modified: { $up }↑ { $down }↓

View file

@ -32,7 +32,10 @@ use crate::{
sched::cutoff::local_minutes_west_for_stamp,
sched::timespan::{answer_button_time, learning_congrats, studied_today, time_span},
search::SortMode,
sync::{sync_login, FullSyncProgress, SyncActionRequired, SyncAuth, SyncOutput},
sync::{
sync_login, FullSyncProgress, NormalSyncProgress, SyncActionRequired, SyncAuth, SyncOutput,
SyncStage,
},
template::RenderedNode,
text::{extract_av_tags, strip_av_tags, AVTag},
timestamp::TimestampSecs,
@ -62,9 +65,9 @@ struct ThrottlingProgressHandler {
impl ThrottlingProgressHandler {
/// Returns true if should continue.
fn update(&mut self, progress: impl Into<Progress>) -> bool {
fn update(&mut self, progress: impl Into<Progress>, throttle: bool) -> bool {
let now = coarsetime::Instant::now();
if now.duration_since(self.last_update).as_f64() < 0.1 {
if throttle && now.duration_since(self.last_update).as_f64() < 0.1 {
return true;
}
self.last_update = now;
@ -94,6 +97,7 @@ enum Progress {
MediaSync(MediaSyncProgress),
MediaCheck(u32),
FullSync(FullSyncProgress),
NormalSync(NormalSyncProgress),
}
/// Convert an Anki error to a protobuf error.
@ -836,7 +840,8 @@ impl BackendService for Backend {
fn empty_trash(&mut self, _input: Empty) -> BackendResult<Empty> {
let mut handler = self.new_progress_handler();
let progress_fn = move |progress| handler.update(Progress::MediaCheck(progress as u32));
let progress_fn =
move |progress| handler.update(Progress::MediaCheck(progress as u32), true);
self.with_col(|col| {
let mgr = MediaManager::new(&col.media_folder, &col.media_db)?;
@ -851,7 +856,8 @@ impl BackendService for Backend {
fn restore_trash(&mut self, _input: Empty) -> BackendResult<Empty> {
let mut handler = self.new_progress_handler();
let progress_fn = move |progress| handler.update(Progress::MediaCheck(progress as u32));
let progress_fn =
move |progress| handler.update(Progress::MediaCheck(progress as u32), true);
self.with_col(|col| {
let mgr = MediaManager::new(&col.media_folder, &col.media_db)?;
@ -875,7 +881,8 @@ impl BackendService for Backend {
fn check_media(&mut self, _input: pb::Empty) -> Result<pb::CheckMediaOut> {
let mut handler = self.new_progress_handler();
let progress_fn = move |progress| handler.update(Progress::MediaCheck(progress as u32));
let progress_fn =
move |progress| handler.update(Progress::MediaCheck(progress as u32), true);
self.with_col(|col| {
let mgr = MediaManager::new(&col.media_folder, &col.media_db)?;
col.transact(None, |ctx| {
@ -1198,7 +1205,7 @@ impl Backend {
self.sync_abort = Some(abort_handle);
let mut handler = self.new_progress_handler();
let progress_fn = move |progress| handler.update(progress);
let progress_fn = move |progress| handler.update(progress, true);
let mgr = MediaManager::new(&folder, &db)?;
let mut rt = Runtime::new().unwrap();
@ -1249,7 +1256,12 @@ impl Backend {
let abortable_sync = Abortable::new(sync_fut, abort_reg);
rt.block_on(abortable_sync)
} else {
let sync_fut = col.normal_sync(input.into());
let mut handler = self.new_progress_handler();
let progress_fn = move |progress: NormalSyncProgress, throttle: bool| {
handler.update(progress, throttle);
};
let sync_fut = col.normal_sync(input.into(), progress_fn);
let abortable_sync = Abortable::new(sync_fut, abort_reg);
rt.block_on(abortable_sync)
};
@ -1283,8 +1295,8 @@ impl Backend {
let logger = col_inner.log.clone();
let mut handler = self.new_progress_handler();
let progress_fn = move |progress: FullSyncProgress| {
handler.update(progress);
let progress_fn = move |progress: FullSyncProgress, throttle: bool| {
handler.update(progress, throttle);
};
let mut rt = Runtime::new().unwrap();
@ -1381,6 +1393,29 @@ fn progress_to_proto(progress: Option<Progress>, i18n: &I18n) -> pb::Progress {
transferred: p.transferred_bytes as u32,
total: p.total_bytes as u32,
}),
Progress::NormalSync(p) => {
let stage = match p.stage {
SyncStage::Connecting => i18n.tr(TR::SyncSyncing),
SyncStage::Syncing => i18n.tr(TR::SyncSyncing),
SyncStage::Finalizing => i18n.tr(TR::SyncChecking),
}
.to_string();
let added = i18n.trn(
TR::SyncAddedUpdatedCount,
tr_args![
"up"=>p.local_update, "down"=>p.remote_update],
);
let removed = i18n.trn(
TR::SyncMediaRemovedCount,
tr_args![
"up"=>p.local_remove, "down"=>p.remote_remove],
);
pb::progress::Value::NormalSync(pb::NormalSyncProgress {
stage,
added,
removed,
})
}
}
} else {
pb::progress::Value::None(pb::Empty {})
@ -1534,3 +1569,9 @@ impl From<MediaSyncProgress> for Progress {
Progress::MediaSync(p)
}
}
impl From<NormalSyncProgress> for Progress {
fn from(p: NormalSyncProgress) -> Self {
Progress::NormalSync(p)
}
}

View file

@ -238,7 +238,7 @@ impl HTTPSyncClient {
mut progress_fn: P,
) -> Result<NamedTempFile>
where
P: FnMut(FullSyncProgress),
P: FnMut(FullSyncProgress, bool),
{
let mut temp_file = NamedTempFile::new_in(folder)?;
let (size, mut stream) = self.download_inner().await?;
@ -250,8 +250,9 @@ impl HTTPSyncClient {
let chunk = chunk?;
temp_file.write_all(&chunk)?;
progress.transferred_bytes += chunk.len();
progress_fn(progress);
progress_fn(progress, true);
}
progress_fn(progress, false);
Ok(temp_file)
}
@ -265,7 +266,7 @@ impl HTTPSyncClient {
pub(crate) async fn upload<P>(&mut self, col_path: &Path, progress_fn: P) -> Result<()>
where
P: FnMut(FullSyncProgress) + Send + Sync + 'static,
P: FnMut(FullSyncProgress, bool) + Send + Sync + 'static,
{
let file = tokio::fs::File::open(col_path).await?;
let total_bytes = file.metadata().await?.len() as usize;
@ -304,7 +305,7 @@ struct ProgressWrapper<S, P> {
impl<S, P> Stream for ProgressWrapper<S, P>
where
S: AsyncRead,
P: FnMut(FullSyncProgress),
P: FnMut(FullSyncProgress, bool),
{
type Item = std::result::Result<Bytes, std::io::Error>;
@ -312,11 +313,14 @@ where
let mut buf = vec![0; 16 * 1024];
let this = self.project();
match ready!(this.reader.poll_read(cx, &mut buf)) {
Ok(0) => Poll::Ready(None),
Ok(0) => {
(this.progress_fn)(*this.progress, false);
Poll::Ready(None)
}
Ok(size) => {
buf.resize(size, 0);
this.progress.transferred_bytes += size;
(this.progress_fn)(*this.progress);
(this.progress_fn)(*this.progress, true);
Poll::Ready(Some(Ok(Bytes::from(buf))))
}
Err(e) => Poll::Ready(Some(Err(e))),
@ -410,13 +414,13 @@ mod test {
let dir = tempdir()?;
let out_path = syncer
.download(&dir.path(), |progress| {
.download(&dir.path(), |progress, _throttle| {
println!("progress: {:?}", progress);
})
.await?;
syncer
.upload(&out_path.path(), |progress| {
.upload(&out_path.path(), |progress, _throttle| {
println!("progress {:?}", progress);
})
.await?;

View file

@ -27,8 +27,27 @@ use std::io::prelude::*;
use std::{collections::HashMap, path::Path, time::Duration};
use tempfile::NamedTempFile;
#[derive(Default, Debug)]
pub struct SyncProgress {}
#[derive(Default, Debug, Clone, Copy)]
pub struct NormalSyncProgress {
pub stage: SyncStage,
pub local_update: usize,
pub local_remove: usize,
pub remote_update: usize,
pub remote_remove: usize,
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum SyncStage {
Connecting,
Syncing,
Finalizing,
}
impl Default for SyncStage {
fn default() -> Self {
SyncStage::Connecting
}
}
#[derive(Serialize, Deserialize, Debug)]
pub struct SyncMeta {
@ -223,9 +242,11 @@ pub struct SyncAuth {
pub host_number: u32,
}
struct NormalSyncer<'a> {
struct NormalSyncer<'a, F> {
col: &'a mut Collection,
remote: HTTPSyncClient,
progress: NormalSyncProgress,
progress_fn: F,
}
impl Usn {
@ -239,17 +260,30 @@ impl Usn {
}
}
impl NormalSyncer<'_> {
impl<F> NormalSyncer<'_, F>
where
F: FnMut(NormalSyncProgress, bool),
{
/// Create a new syncing instance. If host_number is unavailable, use 0.
pub fn new(col: &mut Collection, auth: SyncAuth) -> NormalSyncer<'_> {
pub fn new(col: &mut Collection, auth: SyncAuth, progress_fn: F) -> NormalSyncer<'_, F>
where
F: FnMut(NormalSyncProgress, bool),
{
NormalSyncer {
col,
remote: HTTPSyncClient::new(Some(auth.hkey), auth.host_number),
progress: NormalSyncProgress::default(),
progress_fn,
}
}
fn fire_progress_cb(&mut self, throttle: bool) {
(self.progress_fn)(self.progress, throttle)
}
pub async fn sync(&mut self) -> Result<SyncOutput> {
debug!(self.col.log, "fetching meta...");
self.fire_progress_cb(false);
let state: SyncState = self.get_sync_state().await?;
debug!(self.col.log, "fetched"; "state"=>?&state);
match state.required {
@ -330,6 +364,9 @@ impl NormalSyncer<'_> {
/// Sync. Caller must have created a transaction, and should call
/// abort on failure.
async fn normal_sync_inner(&mut self, mut state: SyncState) -> Result<SyncOutput> {
self.progress.stage = SyncStage::Syncing;
self.fire_progress_cb(false);
debug!(self.col.log, "start");
self.start_and_process_deletions(&state).await?;
debug!(self.col.log, "unchunked changes");
@ -338,6 +375,10 @@ impl NormalSyncer<'_> {
self.process_chunks_from_server().await?;
debug!(self.col.log, "begin stream to server");
self.send_chunks_to_server(&state).await?;
self.progress.stage = SyncStage::Finalizing;
self.fire_progress_cb(false);
debug!(self.col.log, "sanity check");
self.sanity_check().await?;
debug!(self.col.log, "finalize");
@ -348,8 +389,8 @@ impl NormalSyncer<'_> {
// The following operations assume a transaction has been set up.
async fn start_and_process_deletions(&self, state: &SyncState) -> Result<()> {
let removed_on_remote: Graves = self
async fn start_and_process_deletions(&mut self, state: &SyncState) -> Result<()> {
let remote: Graves = self
.remote
.start(
state.usn_at_last_sync,
@ -359,26 +400,30 @@ impl NormalSyncer<'_> {
.await?;
debug!(self.col.log, "removed on remote";
"cards"=>removed_on_remote.cards.len(),
"notes"=>removed_on_remote.notes.len(),
"decks"=>removed_on_remote.decks.len());
"cards"=>remote.cards.len(),
"notes"=>remote.notes.len(),
"decks"=>remote.decks.len());
let mut locally_removed = self.col.storage.pending_graves(state.pending_usn)?;
let mut local = self.col.storage.pending_graves(state.pending_usn)?;
if let Some(new_usn) = state.new_usn {
self.col.storage.update_pending_grave_usns(new_usn)?;
}
debug!(self.col.log, "locally removed ";
"cards"=>locally_removed.cards.len(),
"notes"=>locally_removed.notes.len(),
"decks"=>locally_removed.decks.len());
"cards"=>local.cards.len(),
"notes"=>local.notes.len(),
"decks"=>local.decks.len());
while let Some(chunk) = locally_removed.take_chunk() {
while let Some(chunk) = local.take_chunk() {
debug!(self.col.log, "sending graves chunk");
self.progress.local_remove += chunk.cards.len() + chunk.notes.len() + chunk.decks.len();
self.remote.apply_graves(chunk).await?;
self.fire_progress_cb(true);
}
self.col.apply_graves(removed_on_remote, state.latest_usn)?;
self.progress.remote_remove = remote.cards.len() + remote.notes.len() + remote.decks.len();
self.col.apply_graves(remote, state.latest_usn)?;
self.fire_progress_cb(true);
debug!(self.col.log, "applied server graves");
Ok(())
@ -390,27 +435,41 @@ impl NormalSyncer<'_> {
// usefulness.
async fn process_unchunked_changes(&mut self, state: &SyncState) -> Result<()> {
debug!(self.col.log, "gathering local changes");
let local_changes = self.col.local_unchunked_changes(
let local = self.col.local_unchunked_changes(
state.pending_usn,
state.new_usn,
state.local_is_newer,
)?;
debug!(self.col.log, "sending";
"notetypes"=>local_changes.notetypes.len(),
"decks"=>local_changes.decks_and_config.decks.len(),
"deck config"=>local_changes.decks_and_config.config.len(),
"tags"=>local_changes.tags.len(),
"notetypes"=>local.notetypes.len(),
"decks"=>local.decks_and_config.decks.len(),
"deck config"=>local.decks_and_config.config.len(),
"tags"=>local.tags.len(),
);
let remote_changes = self.remote.apply_changes(local_changes).await?;
self.progress.local_update += local.notetypes.len()
+ local.decks_and_config.decks.len()
+ local.decks_and_config.config.len()
+ local.tags.len();
let remote = self.remote.apply_changes(local).await?;
self.fire_progress_cb(true);
debug!(self.col.log, "received";
"notetypes"=>remote_changes.notetypes.len(),
"decks"=>remote_changes.decks_and_config.decks.len(),
"deck config"=>remote_changes.decks_and_config.config.len(),
"tags"=>remote_changes.tags.len(),
"notetypes"=>remote.notetypes.len(),
"decks"=>remote.decks_and_config.decks.len(),
"deck config"=>remote.decks_and_config.config.len(),
"tags"=>remote.tags.len(),
);
self.col.apply_changes(remote_changes, state.latest_usn)
self.progress.remote_update += remote.notetypes.len()
+ remote.decks_and_config.decks.len()
+ remote.decks_and_config.config.len()
+ remote.tags.len();
self.col.apply_changes(remote, state.latest_usn)?;
self.fire_progress_cb(true);
Ok(())
}
async fn process_chunks_from_server(&mut self) -> Result<()> {
@ -424,16 +483,21 @@ impl NormalSyncer<'_> {
"revlog"=>chunk.revlog.len(),
);
self.progress.remote_update +=
chunk.cards.len() + chunk.notes.len() + chunk.revlog.len();
let done = chunk.done;
self.col.apply_chunk(chunk)?;
self.fire_progress_cb(true);
if done {
return Ok(());
}
}
}
async fn send_chunks_to_server(&self, state: &SyncState) -> Result<()> {
async fn send_chunks_to_server(&mut self, state: &SyncState) -> Result<()> {
let mut ids = self.col.get_chunkable_ids(state.pending_usn)?;
loop {
@ -447,8 +511,13 @@ impl NormalSyncer<'_> {
"revlog"=>chunk.revlog.len(),
);
self.progress.local_update +=
chunk.cards.len() + chunk.notes.len() + chunk.revlog.len();
self.remote.apply_chunk(chunk).await?;
self.fire_progress_cb(true);
if done {
return Ok(());
}
@ -519,21 +588,24 @@ pub async fn sync_login(username: &str, password: &str) -> Result<SyncAuth> {
impl Collection {
pub async fn get_sync_status(&mut self, auth: SyncAuth) -> Result<SyncOutput> {
NormalSyncer::new(self, auth)
NormalSyncer::new(self, auth, |_p, _t| ())
.get_sync_state()
.await
.map(Into::into)
}
pub async fn normal_sync(&mut self, auth: SyncAuth) -> Result<SyncOutput> {
pub async fn normal_sync<F>(&mut self, auth: SyncAuth, progress_fn: F) -> Result<SyncOutput>
where
F: FnMut(NormalSyncProgress, bool),
{
// fixme: server abort on failure
NormalSyncer::new(self, auth).sync().await
NormalSyncer::new(self, auth, progress_fn).sync().await
}
/// Upload collection to AnkiWeb. Caller must re-open afterwards.
pub async fn full_upload<F>(mut self, auth: SyncAuth, progress_fn: F) -> Result<()>
where
F: FnMut(FullSyncProgress) + Send + Sync + 'static,
F: FnMut(FullSyncProgress, bool) + Send + Sync + 'static,
{
self.before_upload()?;
let col_path = self.col_path.clone();
@ -546,7 +618,7 @@ impl Collection {
/// Download collection from AnkiWeb. Caller must re-open afterwards.
pub async fn full_download<F>(self, auth: SyncAuth, progress_fn: F) -> Result<()>
where
F: FnMut(FullSyncProgress),
F: FnMut(FullSyncProgress, bool),
{
let col_path = self.col_path.clone();
let folder = col_path.parent().unwrap();