diff --git a/proto/backend.proto b/proto/backend.proto index a51b1f3c2..6b4cddc6f 100644 --- a/proto/backend.proto +++ b/proto/backend.proto @@ -56,6 +56,8 @@ message BackendError { StringError network_error = 5; Empty ankiweb_auth_failed = 6; StringError ankiweb_misc_error = 7; + // user interrupted operation + Empty interrupted = 8; } } diff --git a/rslib/src/backend.rs b/rslib/src/backend.rs index c60b201f6..c4e929555 100644 --- a/rslib/src/backend.rs +++ b/rslib/src/backend.rs @@ -37,6 +37,7 @@ impl std::convert::From for pt::BackendError { AnkiError::NetworkError { info } => V::NetworkError(pt::StringError { info }), AnkiError::AnkiWebAuthenticationFailed => V::AnkiwebAuthFailed(Empty {}), AnkiError::AnkiWebMiscError { info } => V::AnkiwebMiscError(pt::StringError { info }), + AnkiError::Interrupted => V::Interrupted(Empty {}), }; pt::BackendError { value: Some(value) } diff --git a/rslib/src/err.rs b/rslib/src/err.rs index 68afb506a..a45dcf114 100644 --- a/rslib/src/err.rs +++ b/rslib/src/err.rs @@ -28,6 +28,9 @@ pub enum AnkiError { #[fail(display = "AnkiWeb error: {}", info)] AnkiWebMiscError { info: String }, + + #[fail(display = "The user interrupted the operation.")] + Interrupted, } // error helpers diff --git a/rslib/src/media/sync.rs b/rslib/src/media/sync.rs index 760d78f0d..37092be0c 100644 --- a/rslib/src/media/sync.rs +++ b/rslib/src/media/sync.rs @@ -19,8 +19,6 @@ use std::io::{Read, Write}; use std::path::Path; use std::{io, time}; -// fixme: callback using PyEval_SaveThread(); -// fixme: runCommand() could be releasing GIL, but perhaps overkill for all commands? // fixme: sync url // fixme: version string // fixme: shards @@ -32,15 +30,31 @@ static SYNC_URL: &str = "https://sync.ankiweb.net/msync/"; static SYNC_MAX_FILES: usize = 25; static SYNC_MAX_BYTES: usize = (2.5 * 1024.0 * 1024.0) as usize; -struct SyncContext<'a> { +/// The counts are not cumulative - the progress hook should accumulate them. +#[derive(Debug)] +pub enum Progress { + DownloadedChanges(usize), + DownloadedFiles(usize), + Uploaded { files: usize, deletions: usize }, + RemovedFiles(usize), +} + +struct SyncContext<'a, P> +where + P: Fn(Progress) -> bool, +{ mgr: &'a MediaManager, ctx: MediaDatabaseContext<'a>, skey: Option, client: Client, + progress_cb: P, } -impl SyncContext<'_> { - fn new(mgr: &MediaManager) -> SyncContext { +impl

SyncContext<'_, P> +where + P: Fn(Progress) -> bool, +{ + fn new(mgr: &MediaManager, progress_cb: P) -> SyncContext

{ let client = Client::builder() .connect_timeout(time::Duration::from_secs(30)) .build() @@ -52,6 +66,7 @@ impl SyncContext<'_> { ctx, skey: None, client, + progress_cb, } } @@ -84,6 +99,7 @@ impl SyncContext<'_> { let mut last_usn = client_usn; loop { debug!("fetching record batch starting from usn {}", last_usn); + let batch = fetch_record_batch(&self.client, self.skey(), last_usn).await?; if batch.is_empty() { debug!("empty batch, done"); @@ -91,18 +107,33 @@ impl SyncContext<'_> { } last_usn = batch.last().unwrap().usn; + self.progress(Progress::DownloadedChanges(batch.len()))?; + let (to_download, to_delete, to_remove_pending) = determine_required_changes(&mut self.ctx, &batch)?; - // do file removal and additions first + // file removal remove_files(self.mgr.media_folder.as_path(), to_delete.as_slice())?; - let downloaded = download_files( - self.mgr.media_folder.as_path(), - &self.client, - self.skey(), - to_download.as_slice(), - ) - .await?; + self.progress(Progress::RemovedFiles(to_delete.len()))?; + + // file download + let mut downloaded = vec![]; + let mut dl_fnames = to_download.as_slice(); + while !dl_fnames.is_empty() { + let batch: Vec<_> = dl_fnames + .iter() + .take(SYNC_MAX_FILES) + .map(ToOwned::to_owned) + .collect(); + let zip_data = fetch_zip(&self.client, self.skey(), batch.as_slice()).await?; + let download_batch = + extract_into_media_folder(self.mgr.media_folder.as_path(), zip_data)? + .into_iter(); + let len = download_batch.len(); + dl_fnames = &dl_fnames[len..]; + downloaded.extend(download_batch); + self.progress(Progress::DownloadedFiles(len))?; + } // then update the DB self.ctx.transact(|ctx| { @@ -122,9 +153,16 @@ impl SyncContext<'_> { break; } + let file_count = pending.iter().filter(|e| e.sha1.is_some()).count(); + let zip_data = zip_files(&self.mgr.media_folder, &pending)?; send_zip_data(&self.client, self.skey(), zip_data).await?; + self.progress(Progress::Uploaded { + files: file_count, + deletions: pending.len() - file_count, + })?; + let fnames: Vec<_> = pending.iter().map(|e| &e.fname).collect(); self.ctx .transact(|ctx| record_clean(ctx, fnames.as_slice()))?; @@ -156,15 +194,25 @@ impl SyncContext<'_> { }) } } + + fn progress(&self, progress: Progress) -> Result<()> { + if (self.progress_cb)(progress) { + Ok(()) + } else { + Err(AnkiError::Interrupted) + } + } } #[allow(clippy::useless_let_if_seq)] -pub async fn sync_media(mgr: &mut MediaManager, hkey: &str) -> Result<()> { - let mut sctx = SyncContext::new(mgr); +pub async fn sync_media(mgr: &mut MediaManager, hkey: &str, progress_cb: F) -> Result<()> +where + F: Fn(Progress) -> bool, +{ + let mut sctx = SyncContext::new(mgr, progress_cb); // make sure media DB is up to date register_changes(&mut sctx.ctx, mgr.media_folder.as_path())?; - //mgr.register_changes()?; let client_usn = sctx.ctx.get_meta()?.last_sync_usn; @@ -396,29 +444,6 @@ async fn fetch_record_batch( } } -async fn download_files( - media_folder: &Path, - client: &Client, - skey: &str, - mut fnames: &[&String], -) -> Result> { - let mut downloaded = vec![]; - while !fnames.is_empty() { - let batch: Vec<_> = fnames - .iter() - .take(SYNC_MAX_FILES) - .map(ToOwned::to_owned) - .collect(); - let zip_data = fetch_zip(client, skey, batch.as_slice()).await?; - let download_batch = extract_into_media_folder(media_folder, zip_data)?.into_iter(); - let len = download_batch.len(); - fnames = &fnames[len..]; - downloaded.extend(download_batch); - } - - Ok(downloaded) -} - #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] struct ZipRequest<'a> { @@ -618,9 +643,14 @@ mod test { std::fs::write(media_dir.join("test.file").as_path(), "hello")?; + let progress = |progress| { + println!("got progress: {:?}", progress); + true + }; + let mut mgr = MediaManager::new(&media_dir, &media_db)?; - sync_media(&mut mgr, hkey).await?; + sync_media(&mut mgr, hkey, progress).await?; Ok(()) }