diff --git a/proto/backend.proto b/proto/backend.proto index 2974a4b7f..992c9cab4 100644 --- a/proto/backend.proto +++ b/proto/backend.proto @@ -104,7 +104,7 @@ message SyncError { } message MediaSyncProgress { - uint32 downloaded_meta = 1; + uint32 checked = 1; uint32 downloaded_files = 2; uint32 downloaded_deletions = 3; uint32 uploaded_files = 4; diff --git a/qt/aqt/mediasync.py b/qt/aqt/mediasync.py index 074fa3d80..82ab6513d 100644 --- a/qt/aqt/mediasync.py +++ b/qt/aqt/mediasync.py @@ -248,13 +248,13 @@ class MediaSyncDialog(QDialog): def _logentry_to_text(self, e: MediaSyncProgress) -> str: return _( - "Added: %(a_up)s↑, %(a_dwn)s↓, Removed: %(r_up)s↑, %(r_dwn)s↓, Checked: %(chk)s" + "Added: %(a_up)s↑ %(a_dwn)s↓, Removed: %(r_up)s↑ %(r_dwn)s↓, Checked: %(chk)s" ) % dict( a_up=e.uploaded_files, a_dwn=e.downloaded_files, r_up=e.uploaded_deletions, r_dwn=e.downloaded_deletions, - chk=e.downloaded_meta, + chk=e.checked, ) def _on_log_entry(self, entry: LogEntryWithTime): diff --git a/rslib/Cargo.toml b/rslib/Cargo.toml index 2b589a0d2..222105b58 100644 --- a/rslib/Cargo.toml +++ b/rslib/Cargo.toml @@ -28,6 +28,7 @@ zip = "0.5.4" log = "0.4.8" serde_tuple = "0.4.0" trash = "1.0.0" +coarsetime = "0.1.12" [target.'cfg(target_vendor="apple")'.dependencies] rusqlite = { version = "0.21.0", features = ["trace"] } diff --git a/rslib/src/backend.rs b/rslib/src/backend.rs index e4bb6df8d..e34e93f14 100644 --- a/rslib/src/backend.rs +++ b/rslib/src/backend.rs @@ -361,7 +361,7 @@ fn progress_to_proto_bytes(progress: Progress) -> Vec<u8> { let proto = pt::Progress { value: Some(match progress { Progress::MediaSync(p) => pt::progress::Value::MediaSync(pt::MediaSyncProgress { - downloaded_meta: p.downloaded_meta as u32, + checked: p.checked as u32, downloaded_files: p.downloaded_files as u32, downloaded_deletions: p.downloaded_deletions as u32, uploaded_files: p.uploaded_files as u32, diff --git a/rslib/src/media/changetracker.rs b/rslib/src/media/changetracker.rs index 
07ee6d47a..8949ca67d 100644 --- a/rslib/src/media/changetracker.rs +++ b/rslib/src/media/changetracker.rs @@ -1,7 +1,7 @@ // Copyright: Ankitects Pty Ltd and contributors // License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html -use crate::err::Result; +use crate::err::{AnkiError, Result}; use crate::media::database::{MediaDatabaseContext, MediaEntry}; use crate::media::files::{ mtime_as_i64, sha1_of_file, MEDIA_SYNC_FILESIZE_LIMIT, NONSYNCABLE_FILENAME, @@ -17,150 +17,205 @@ struct FilesystemEntry { is_new: bool, } -pub(super) fn register_changes(ctx: &mut MediaDatabaseContext, folder: &Path) -> Result<()> { - ctx.transact(|ctx| { - // folder mtime unchanged? - let dirmod = mtime_as_i64(folder)?; - - let mut meta = ctx.get_meta()?; - if dirmod == meta.folder_mtime { - return Ok(()); - } else { - meta.folder_mtime = dirmod; - } - - let mtimes = ctx.all_mtimes()?; - - let (changed, removed) = media_folder_changes(folder, mtimes)?; - - add_updated_entries(ctx, changed)?; - remove_deleted_files(ctx, removed)?; - - ctx.set_meta(&meta)?; - - Ok(()) - }) +pub(super) struct ChangeTracker<'a, F> +where + F: FnMut(usize) -> bool, +{ + media_folder: &'a Path, + progress_cb: F, + checked: usize, } -/// Scan through the media folder, finding changes. -/// Returns (added/changed files, removed files). -/// -/// Checks for invalid filenames and unicode normalization are deferred -/// until syncing time, as we can't trust the entries previous Anki versions -/// wrote are correct. -fn media_folder_changes( - media_folder: &Path, - mut mtimes: HashMap<String, i64>, -) -> Result<(Vec<FilesystemEntry>, Vec<String>)> { - let mut added_or_changed = vec![]; - - // loop through on-disk files - for dentry in media_folder.read_dir()? 
{ - let dentry = dentry?; - - // skip folders - if dentry.file_type()?.is_dir() { - continue; +impl<F> ChangeTracker<'_, F> +where + F: FnMut(usize) -> bool, +{ + pub(super) fn new(media_folder: &Path, progress: F) -> ChangeTracker<F> { + ChangeTracker { + media_folder, + progress_cb: progress, + checked: 0, } + } - // if the filename is not valid unicode, skip it - let fname_os = dentry.file_name(); - let fname = match fname_os.to_str() { - Some(s) => s, - None => continue, - }; - - // ignore blacklisted files - if NONSYNCABLE_FILENAME.is_match(fname) { - continue; + fn fire_progress_cb(&mut self) -> Result<()> { + if (self.progress_cb)(self.checked) { + Ok(()) + } else { + Err(AnkiError::Interrupted) } + } - // ignore large files - let metadata = dentry.metadata()?; - if metadata.len() > MEDIA_SYNC_FILESIZE_LIMIT as u64 { - continue; - } + pub(super) fn register_changes(&mut self, ctx: &mut MediaDatabaseContext) -> Result<()> { + ctx.transact(|ctx| { + // folder mtime unchanged? + let dirmod = mtime_as_i64(self.media_folder)?; - // remove from mtimes for later deletion tracking - let previous_mtime = mtimes.remove(fname); + let mut meta = ctx.get_meta()?; + if dirmod == meta.folder_mtime { + return Ok(()); + } else { + meta.folder_mtime = dirmod; + } - // skip files that have not been modified - let mtime = metadata - .modified()? - .duration_since(time::UNIX_EPOCH) - .unwrap() - .as_secs() as i64; - if let Some(previous_mtime) = previous_mtime { - if previous_mtime == mtime { + let mtimes = ctx.all_mtimes()?; + self.checked += mtimes.len(); + self.fire_progress_cb()?; + + let (changed, removed) = self.media_folder_changes(mtimes)?; + + self.add_updated_entries(ctx, changed)?; + self.remove_deleted_files(ctx, removed)?; + + ctx.set_meta(&meta)?; + + // unconditional fire at end of op for accurate counts + self.fire_progress_cb()?; + + Ok(()) + }) + } + + /// Scan through the media folder, finding changes. + /// Returns (added/changed files, removed files). 
+ /// + /// Checks for invalid filenames and unicode normalization are deferred + /// until syncing time, as we can't trust the entries previous Anki versions + /// wrote are correct. + fn media_folder_changes( + &mut self, + mut mtimes: HashMap<String, i64>, + ) -> Result<(Vec<FilesystemEntry>, Vec<String>)> { + let mut added_or_changed = vec![]; + + // loop through on-disk files + for dentry in self.media_folder.read_dir()? { + let dentry = dentry?; + + // skip folders + if dentry.file_type()?.is_dir() { continue; } - - // add entry to the list - let sha1 = Some(sha1_of_file(&dentry.path())?); - added_or_changed.push(FilesystemEntry { - fname: fname.to_string(), - sha1, - mtime, - is_new: previous_mtime.is_none(), - }); - } + // if the filename is not valid unicode, skip it + let fname_os = dentry.file_name(); + let fname = match fname_os.to_str() { + Some(s) => s, + None => continue, + }; - // any remaining entries from the database have been deleted - let removed: Vec<_> = mtimes.into_iter().map(|(k, _)| k).collect(); + // ignore blacklisted files + if NONSYNCABLE_FILENAME.is_match(fname) { + continue; + } - Ok((added_or_changed, removed)) -} + // ignore large files + let metadata = dentry.metadata()?; + if metadata.len() > MEDIA_SYNC_FILESIZE_LIMIT as u64 { + continue; + } -/// Add added/updated entries to the media DB. -/// -/// Skip files where the mod time differed, but checksums are the same. -fn add_updated_entries( - ctx: &mut MediaDatabaseContext, - entries: Vec<FilesystemEntry>, -) -> Result<()> { - for fentry in entries { - let mut sync_required = true; - if !fentry.is_new { - if let Some(db_entry) = ctx.get_entry(&fentry.fname)? { - if db_entry.sha1 == fentry.sha1 { - // mtime bumped but file contents are the same, - // so we can preserve the current updated flag. - // we still need to update the mtime however. 
- sync_required = db_entry.sync_required + // remove from mtimes for later deletion tracking + let previous_mtime = mtimes.remove(fname); + + // skip files that have not been modified + let mtime = metadata + .modified()? + .duration_since(time::UNIX_EPOCH) + .unwrap() + .as_secs() as i64; + if let Some(previous_mtime) = previous_mtime { + if previous_mtime == mtime { + continue; } } - }; - ctx.set_entry(&MediaEntry { - fname: fentry.fname, - sha1: fentry.sha1, - mtime: fentry.mtime, - sync_required, - })?; + // add entry to the list + let sha1 = Some(sha1_of_file(&dentry.path())?); + added_or_changed.push(FilesystemEntry { + fname: fname.to_string(), + sha1, + mtime, + is_new: previous_mtime.is_none(), + }); + + self.checked += 1; + if self.checked % 10 == 0 { + self.fire_progress_cb()?; + } + } + + // any remaining entries from the database have been deleted + let removed: Vec<_> = mtimes.into_iter().map(|(k, _)| k).collect(); + + Ok((added_or_changed, removed)) } - Ok(()) -} + /// Add added/updated entries to the media DB. + /// + /// Skip files where the mod time differed, but checksums are the same. + fn add_updated_entries( + &mut self, + ctx: &mut MediaDatabaseContext, + entries: Vec<FilesystemEntry>, + ) -> Result<()> { + for fentry in entries { + let mut sync_required = true; + if !fentry.is_new { + if let Some(db_entry) = ctx.get_entry(&fentry.fname)? { + if db_entry.sha1 == fentry.sha1 { + // mtime bumped but file contents are the same, + // so we can preserve the current updated flag. + // we still need to update the mtime however. + sync_required = db_entry.sync_required + } + } + }; -/// Remove deleted files from the media DB. 
-fn remove_deleted_files(ctx: &mut MediaDatabaseContext, removed: Vec<String>) -> Result<()> { - for fname in removed { - ctx.set_entry(&MediaEntry { - fname, - sha1: None, - mtime: 0, - sync_required: true, - })?; + ctx.set_entry(&MediaEntry { + fname: fentry.fname, + sha1: fentry.sha1, + mtime: fentry.mtime, + sync_required, + })?; + + self.checked += 1; + if self.checked % 10 == 0 { + self.fire_progress_cb()?; + } + } + + Ok(()) } - Ok(()) + /// Remove deleted files from the media DB. + fn remove_deleted_files( + &mut self, + ctx: &mut MediaDatabaseContext, + removed: Vec<String>, + ) -> Result<()> { + for fname in removed { + ctx.set_entry(&MediaEntry { + fname, + sha1: None, + mtime: 0, + sync_required: true, + })?; + + self.checked += 1; + if self.checked % 10 == 0 { + self.fire_progress_cb()?; + } + } + + Ok(()) + } } #[cfg(test)] mod test { use crate::err::Result; - use crate::media::changetracker::register_changes; + use crate::media::changetracker::ChangeTracker; use crate::media::database::MediaEntry; use crate::media::files::sha1_of_data; use crate::media::MediaManager; @@ -197,7 +252,10 @@ mod test { fs::write(&f1, "hello")?; change_mtime(&media_dir); - register_changes(&mut ctx, &mgr.media_folder)?; + + let progress_cb = |_n| true; + + ChangeTracker::new(&mgr.media_folder, progress_cb).register_changes(&mut ctx)?; let mut entry = ctx.transact(|ctx| { assert_eq!(ctx.count()?, 1); @@ -232,7 +290,7 @@ mod test { Ok(entry) })?; - register_changes(&mut ctx, &mgr.media_folder)?; + ChangeTracker::new(&mgr.media_folder, progress_cb).register_changes(&mut ctx)?; ctx.transact(|ctx| { assert_eq!(ctx.count()?, 1); @@ -264,7 +322,8 @@ mod test { fs::remove_file(&f1)?; change_mtime(&media_dir); - register_changes(&mut ctx, &mgr.media_folder)?; + + ChangeTracker::new(&mgr.media_folder, progress_cb).register_changes(&mut ctx)?; assert_eq!(ctx.count()?, 0); assert!(!ctx.get_pending_uploads(1)?.is_empty()); diff --git a/rslib/src/media/sync.rs b/rslib/src/media/sync.rs index 
1794fc896..7fc358f3f 100644 --- a/rslib/src/media/sync.rs +++ b/rslib/src/media/sync.rs @@ -2,7 +2,7 @@ // License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html use crate::err::{AnkiError, Result, SyncErrorKind}; -use crate::media::changetracker::register_changes; +use crate::media::changetracker::ChangeTracker; use crate::media::database::{MediaDatabaseContext, MediaDatabaseMetadata, MediaEntry}; use crate::media::files::{ add_file_from_ankiweb, data_for_file, mtime_as_i64, normalize_filename, remove_files, AddedFile, @@ -10,6 +10,7 @@ use crate::media::files::{ use crate::media::MediaManager; use crate::version; use bytes::Bytes; +use coarsetime::Instant; use log::debug; use reqwest; use reqwest::{multipart, Client, Response}; @@ -25,10 +26,9 @@ static SYNC_MAX_FILES: usize = 25; static SYNC_MAX_BYTES: usize = (2.5 * 1024.0 * 1024.0) as usize; static SYNC_SINGLE_FILE_MAX_BYTES: usize = 100 * 1024 * 1024; - #[derive(Debug, Default)] pub struct MediaSyncProgress { - pub downloaded_meta: usize, + pub checked: usize, pub downloaded_files: usize, pub downloaded_deletions: usize, pub uploaded_files: usize, @@ -45,7 +45,7 @@ where client: Client, progress_cb: P, progress: MediaSyncProgress, - progress_updated: u64, + progress_updated: Instant, endpoint: &'a str, } @@ -150,7 +150,7 @@ where client, progress_cb, progress: Default::default(), - progress_updated: 0, + progress_updated: Instant::now(), endpoint, } } @@ -161,8 +161,7 @@ where #[allow(clippy::useless_let_if_seq)] pub async fn sync(&mut self, hkey: &str) -> Result<()> { - // make sure media DB is up to date - register_changes(&mut self.ctx, self.mgr.media_folder.as_path())?; + self.register_changes()?; let meta = self.ctx.get_meta()?; let client_usn = meta.last_sync_usn; @@ -192,11 +191,35 @@ where self.finalize_sync().await?; } + self.fire_progress_cb()?; + debug!("media sync complete"); Ok(()) } + /// Make sure media DB is up to date. 
+ fn register_changes(&mut self) -> Result<()> { + // make borrow checker happy + let progress = &mut self.progress; + let updated = &mut self.progress_updated; + let progress_cb = &self.progress_cb; + + let progress = |checked| { + progress.checked = checked; + let now = Instant::now(); + if now.duration_since(*updated).as_secs() < 1 { + true + } else { + *updated = now; + (progress_cb)(progress) + } + }; + + ChangeTracker::new(self.mgr.media_folder.as_path(), progress) + .register_changes(&mut self.ctx) + } + async fn sync_begin(&self, hkey: &str) -> Result<(String, i32)> { let url = format!("{}begin", self.endpoint); @@ -229,8 +252,8 @@ where } last_usn = batch.last().unwrap().usn; - self.progress.downloaded_meta += batch.len(); - self.notify_progress()?; + self.progress.checked += batch.len(); + self.maybe_fire_progress_cb()?; let (to_download, to_delete, to_remove_pending) = determine_required_changes(&mut self.ctx, &batch)?; @@ -238,7 +261,7 @@ where // file removal remove_files(self.mgr.media_folder.as_path(), to_delete.as_slice())?; self.progress.downloaded_deletions += to_delete.len(); - self.notify_progress()?; + self.maybe_fire_progress_cb()?; // file download let mut downloaded = vec![]; @@ -258,7 +281,7 @@ where downloaded.extend(download_batch); self.progress.downloaded_files += len; - self.notify_progress()?; + self.maybe_fire_progress_cb()?; } // then update the DB @@ -296,7 +319,7 @@ where self.progress.uploaded_files += processed_files.len(); self.progress.uploaded_deletions += processed_deletions.len(); - self.notify_progress()?; + self.maybe_fire_progress_cb()?; let fnames: Vec<_> = processed_files .iter() @@ -347,23 +370,23 @@ where } } - fn notify_progress(&mut self) -> Result<()> { - let now = time::SystemTime::now() - .duration_since(time::UNIX_EPOCH) - .unwrap() - .as_secs(); - if now - self.progress_updated < 1 { - return Ok(()); - } - + fn fire_progress_cb(&self) -> Result<()> { if (self.progress_cb)(&self.progress) { - 
self.progress_updated = now; Ok(()) } else { Err(AnkiError::Interrupted) } } + fn maybe_fire_progress_cb(&mut self) -> Result<()> { + let now = Instant::now(); + if now.duration_since(self.progress_updated).as_secs() < 1 { + return Ok(()); + } + self.progress_updated = now; + self.fire_progress_cb() + } + async fn fetch_record_batch(&self, last_usn: i32) -> Result<Vec<ServerMediaRecord>> { let url = format!("{}mediaChanges", self.endpoint);