log media DB change registration progress, and allow aborting it

This commit is contained in:
Damien Elmes 2020-02-06 20:38:45 +10:00
parent e5f9ed5a5b
commit 7ae6244f6a
6 changed files with 230 additions and 147 deletions

View file

@ -104,7 +104,7 @@ message SyncError {
} }
message MediaSyncProgress { message MediaSyncProgress {
uint32 downloaded_meta = 1; uint32 checked = 1;
uint32 downloaded_files = 2; uint32 downloaded_files = 2;
uint32 downloaded_deletions = 3; uint32 downloaded_deletions = 3;
uint32 uploaded_files = 4; uint32 uploaded_files = 4;

View file

@ -248,13 +248,13 @@ class MediaSyncDialog(QDialog):
def _logentry_to_text(self, e: MediaSyncProgress) -> str: def _logentry_to_text(self, e: MediaSyncProgress) -> str:
return _( return _(
"Added: %(a_up)s, %(a_dwn)s↓, Removed: %(r_up)s, %(r_dwn)s↓, Checked: %(chk)s" "Added: %(a_up)s %(a_dwn)s↓, Removed: %(r_up)s %(r_dwn)s↓, Checked: %(chk)s"
) % dict( ) % dict(
a_up=e.uploaded_files, a_up=e.uploaded_files,
a_dwn=e.downloaded_files, a_dwn=e.downloaded_files,
r_up=e.uploaded_deletions, r_up=e.uploaded_deletions,
r_dwn=e.downloaded_deletions, r_dwn=e.downloaded_deletions,
chk=e.downloaded_meta, chk=e.checked,
) )
def _on_log_entry(self, entry: LogEntryWithTime): def _on_log_entry(self, entry: LogEntryWithTime):

View file

@ -28,6 +28,7 @@ zip = "0.5.4"
log = "0.4.8" log = "0.4.8"
serde_tuple = "0.4.0" serde_tuple = "0.4.0"
trash = "1.0.0" trash = "1.0.0"
coarsetime = "0.1.12"
[target.'cfg(target_vendor="apple")'.dependencies] [target.'cfg(target_vendor="apple")'.dependencies]
rusqlite = { version = "0.21.0", features = ["trace"] } rusqlite = { version = "0.21.0", features = ["trace"] }

View file

@ -361,7 +361,7 @@ fn progress_to_proto_bytes(progress: Progress) -> Vec<u8> {
let proto = pt::Progress { let proto = pt::Progress {
value: Some(match progress { value: Some(match progress {
Progress::MediaSync(p) => pt::progress::Value::MediaSync(pt::MediaSyncProgress { Progress::MediaSync(p) => pt::progress::Value::MediaSync(pt::MediaSyncProgress {
downloaded_meta: p.downloaded_meta as u32, checked: p.checked as u32,
downloaded_files: p.downloaded_files as u32, downloaded_files: p.downloaded_files as u32,
downloaded_deletions: p.downloaded_deletions as u32, downloaded_deletions: p.downloaded_deletions as u32,
uploaded_files: p.uploaded_files as u32, uploaded_files: p.uploaded_files as u32,

View file

@ -1,7 +1,7 @@
// Copyright: Ankitects Pty Ltd and contributors // Copyright: Ankitects Pty Ltd and contributors
// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html // License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
use crate::err::Result; use crate::err::{AnkiError, Result};
use crate::media::database::{MediaDatabaseContext, MediaEntry}; use crate::media::database::{MediaDatabaseContext, MediaEntry};
use crate::media::files::{ use crate::media::files::{
mtime_as_i64, sha1_of_file, MEDIA_SYNC_FILESIZE_LIMIT, NONSYNCABLE_FILENAME, mtime_as_i64, sha1_of_file, MEDIA_SYNC_FILESIZE_LIMIT, NONSYNCABLE_FILENAME,
@ -17,150 +17,205 @@ struct FilesystemEntry {
is_new: bool, is_new: bool,
} }
pub(super) fn register_changes(ctx: &mut MediaDatabaseContext, folder: &Path) -> Result<()> { pub(super) struct ChangeTracker<'a, F>
ctx.transact(|ctx| { where
// folder mtime unchanged? F: FnMut(usize) -> bool,
let dirmod = mtime_as_i64(folder)?; {
media_folder: &'a Path,
let mut meta = ctx.get_meta()?; progress_cb: F,
if dirmod == meta.folder_mtime { checked: usize,
return Ok(());
} else {
meta.folder_mtime = dirmod;
}
let mtimes = ctx.all_mtimes()?;
let (changed, removed) = media_folder_changes(folder, mtimes)?;
add_updated_entries(ctx, changed)?;
remove_deleted_files(ctx, removed)?;
ctx.set_meta(&meta)?;
Ok(())
})
} }
/// Scan through the media folder, finding changes. impl<F> ChangeTracker<'_, F>
/// Returns (added/changed files, removed files). where
/// F: FnMut(usize) -> bool,
/// Checks for invalid filenames and unicode normalization are deferred {
/// until syncing time, as we can't trust the entries previous Anki versions pub(super) fn new(media_folder: &Path, progress: F) -> ChangeTracker<F> {
/// wrote are correct. ChangeTracker {
fn media_folder_changes( media_folder,
media_folder: &Path, progress_cb: progress,
mut mtimes: HashMap<String, i64>, checked: 0,
) -> Result<(Vec<FilesystemEntry>, Vec<String>)> {
let mut added_or_changed = vec![];
// loop through on-disk files
for dentry in media_folder.read_dir()? {
let dentry = dentry?;
// skip folders
if dentry.file_type()?.is_dir() {
continue;
} }
}
// if the filename is not valid unicode, skip it fn fire_progress_cb(&mut self) -> Result<()> {
let fname_os = dentry.file_name(); if (self.progress_cb)(self.checked) {
let fname = match fname_os.to_str() { Ok(())
Some(s) => s, } else {
None => continue, Err(AnkiError::Interrupted)
};
// ignore blacklisted files
if NONSYNCABLE_FILENAME.is_match(fname) {
continue;
} }
}
// ignore large files pub(super) fn register_changes(&mut self, ctx: &mut MediaDatabaseContext) -> Result<()> {
let metadata = dentry.metadata()?; ctx.transact(|ctx| {
if metadata.len() > MEDIA_SYNC_FILESIZE_LIMIT as u64 { // folder mtime unchanged?
continue; let dirmod = mtime_as_i64(self.media_folder)?;
}
// remove from mtimes for later deletion tracking let mut meta = ctx.get_meta()?;
let previous_mtime = mtimes.remove(fname); if dirmod == meta.folder_mtime {
return Ok(());
} else {
meta.folder_mtime = dirmod;
}
// skip files that have not been modified let mtimes = ctx.all_mtimes()?;
let mtime = metadata self.checked += mtimes.len();
.modified()? self.fire_progress_cb()?;
.duration_since(time::UNIX_EPOCH)
.unwrap() let (changed, removed) = self.media_folder_changes(mtimes)?;
.as_secs() as i64;
if let Some(previous_mtime) = previous_mtime { self.add_updated_entries(ctx, changed)?;
if previous_mtime == mtime { self.remove_deleted_files(ctx, removed)?;
ctx.set_meta(&meta)?;
// unconditional fire at end of op for accurate counts
self.fire_progress_cb()?;
Ok(())
})
}
/// Scan through the media folder, finding changes.
/// Returns (added/changed files, removed files).
///
/// Checks for invalid filenames and unicode normalization are deferred
/// until syncing time, as we can't trust the entries previous Anki versions
/// wrote are correct.
fn media_folder_changes(
&mut self,
mut mtimes: HashMap<String, i64>,
) -> Result<(Vec<FilesystemEntry>, Vec<String>)> {
let mut added_or_changed = vec![];
// loop through on-disk files
for dentry in self.media_folder.read_dir()? {
let dentry = dentry?;
// skip folders
if dentry.file_type()?.is_dir() {
continue; continue;
} }
}
// add entry to the list // if the filename is not valid unicode, skip it
let sha1 = Some(sha1_of_file(&dentry.path())?); let fname_os = dentry.file_name();
added_or_changed.push(FilesystemEntry { let fname = match fname_os.to_str() {
fname: fname.to_string(), Some(s) => s,
sha1, None => continue,
mtime, };
is_new: previous_mtime.is_none(),
});
}
// any remaining entries from the database have been deleted // ignore blacklisted files
let removed: Vec<_> = mtimes.into_iter().map(|(k, _)| k).collect(); if NONSYNCABLE_FILENAME.is_match(fname) {
continue;
}
Ok((added_or_changed, removed)) // ignore large files
} let metadata = dentry.metadata()?;
if metadata.len() > MEDIA_SYNC_FILESIZE_LIMIT as u64 {
continue;
}
/// Add added/updated entries to the media DB. // remove from mtimes for later deletion tracking
/// let previous_mtime = mtimes.remove(fname);
/// Skip files where the mod time differed, but checksums are the same.
fn add_updated_entries( // skip files that have not been modified
ctx: &mut MediaDatabaseContext, let mtime = metadata
entries: Vec<FilesystemEntry>, .modified()?
) -> Result<()> { .duration_since(time::UNIX_EPOCH)
for fentry in entries { .unwrap()
let mut sync_required = true; .as_secs() as i64;
if !fentry.is_new { if let Some(previous_mtime) = previous_mtime {
if let Some(db_entry) = ctx.get_entry(&fentry.fname)? { if previous_mtime == mtime {
if db_entry.sha1 == fentry.sha1 { continue;
// mtime bumped but file contents are the same,
// so we can preserve the current updated flag.
// we still need to update the mtime however.
sync_required = db_entry.sync_required
} }
} }
};
ctx.set_entry(&MediaEntry { // add entry to the list
fname: fentry.fname, let sha1 = Some(sha1_of_file(&dentry.path())?);
sha1: fentry.sha1, added_or_changed.push(FilesystemEntry {
mtime: fentry.mtime, fname: fname.to_string(),
sync_required, sha1,
})?; mtime,
is_new: previous_mtime.is_none(),
});
self.checked += 1;
if self.checked % 10 == 0 {
self.fire_progress_cb()?;
}
}
// any remaining entries from the database have been deleted
let removed: Vec<_> = mtimes.into_iter().map(|(k, _)| k).collect();
Ok((added_or_changed, removed))
} }
Ok(()) /// Add added/updated entries to the media DB.
} ///
/// Skip files where the mod time differed, but checksums are the same.
fn add_updated_entries(
&mut self,
ctx: &mut MediaDatabaseContext,
entries: Vec<FilesystemEntry>,
) -> Result<()> {
for fentry in entries {
let mut sync_required = true;
if !fentry.is_new {
if let Some(db_entry) = ctx.get_entry(&fentry.fname)? {
if db_entry.sha1 == fentry.sha1 {
// mtime bumped but file contents are the same,
// so we can preserve the current updated flag.
// we still need to update the mtime however.
sync_required = db_entry.sync_required
}
}
};
/// Remove deleted files from the media DB. ctx.set_entry(&MediaEntry {
fn remove_deleted_files(ctx: &mut MediaDatabaseContext, removed: Vec<String>) -> Result<()> { fname: fentry.fname,
for fname in removed { sha1: fentry.sha1,
ctx.set_entry(&MediaEntry { mtime: fentry.mtime,
fname, sync_required,
sha1: None, })?;
mtime: 0,
sync_required: true, self.checked += 1;
})?; if self.checked % 10 == 0 {
self.fire_progress_cb()?;
}
}
Ok(())
} }
Ok(()) /// Remove deleted files from the media DB.
fn remove_deleted_files(
    &mut self,
    ctx: &mut MediaDatabaseContext,
    removed: Vec<String>,
) -> Result<()> {
    removed.into_iter().try_for_each(|fname| {
        // overwrite the DB row with an entry that has no checksum and a
        // zero mtime, and flag it as needing a sync
        let entry = MediaEntry {
            fname,
            sha1: None,
            mtime: 0,
            sync_required: true,
        };
        ctx.set_entry(&entry)?;

        // count the file, and report progress on every 10th item;
        // an error from the progress callback stops the iteration
        self.checked += 1;
        if self.checked % 10 != 0 {
            return Ok(());
        }
        self.fire_progress_cb()
    })
}
} }
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use crate::err::Result; use crate::err::Result;
use crate::media::changetracker::register_changes; use crate::media::changetracker::ChangeTracker;
use crate::media::database::MediaEntry; use crate::media::database::MediaEntry;
use crate::media::files::sha1_of_data; use crate::media::files::sha1_of_data;
use crate::media::MediaManager; use crate::media::MediaManager;
@ -197,7 +252,10 @@ mod test {
fs::write(&f1, "hello")?; fs::write(&f1, "hello")?;
change_mtime(&media_dir); change_mtime(&media_dir);
register_changes(&mut ctx, &mgr.media_folder)?;
let progress_cb = |_n| true;
ChangeTracker::new(&mgr.media_folder, progress_cb).register_changes(&mut ctx)?;
let mut entry = ctx.transact(|ctx| { let mut entry = ctx.transact(|ctx| {
assert_eq!(ctx.count()?, 1); assert_eq!(ctx.count()?, 1);
@ -232,7 +290,7 @@ mod test {
Ok(entry) Ok(entry)
})?; })?;
register_changes(&mut ctx, &mgr.media_folder)?; ChangeTracker::new(&mgr.media_folder, progress_cb).register_changes(&mut ctx)?;
ctx.transact(|ctx| { ctx.transact(|ctx| {
assert_eq!(ctx.count()?, 1); assert_eq!(ctx.count()?, 1);
@ -264,7 +322,8 @@ mod test {
fs::remove_file(&f1)?; fs::remove_file(&f1)?;
change_mtime(&media_dir); change_mtime(&media_dir);
register_changes(&mut ctx, &mgr.media_folder)?;
ChangeTracker::new(&mgr.media_folder, progress_cb).register_changes(&mut ctx)?;
assert_eq!(ctx.count()?, 0); assert_eq!(ctx.count()?, 0);
assert!(!ctx.get_pending_uploads(1)?.is_empty()); assert!(!ctx.get_pending_uploads(1)?.is_empty());

View file

@ -2,7 +2,7 @@
// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html // License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
use crate::err::{AnkiError, Result, SyncErrorKind}; use crate::err::{AnkiError, Result, SyncErrorKind};
use crate::media::changetracker::register_changes; use crate::media::changetracker::ChangeTracker;
use crate::media::database::{MediaDatabaseContext, MediaDatabaseMetadata, MediaEntry}; use crate::media::database::{MediaDatabaseContext, MediaDatabaseMetadata, MediaEntry};
use crate::media::files::{ use crate::media::files::{
add_file_from_ankiweb, data_for_file, mtime_as_i64, normalize_filename, remove_files, AddedFile, add_file_from_ankiweb, data_for_file, mtime_as_i64, normalize_filename, remove_files, AddedFile,
@ -10,6 +10,7 @@ use crate::media::files::{
use crate::media::MediaManager; use crate::media::MediaManager;
use crate::version; use crate::version;
use bytes::Bytes; use bytes::Bytes;
use coarsetime::Instant;
use log::debug; use log::debug;
use reqwest; use reqwest;
use reqwest::{multipart, Client, Response}; use reqwest::{multipart, Client, Response};
@ -25,10 +26,9 @@ static SYNC_MAX_FILES: usize = 25;
static SYNC_MAX_BYTES: usize = (2.5 * 1024.0 * 1024.0) as usize; static SYNC_MAX_BYTES: usize = (2.5 * 1024.0 * 1024.0) as usize;
static SYNC_SINGLE_FILE_MAX_BYTES: usize = 100 * 1024 * 1024; static SYNC_SINGLE_FILE_MAX_BYTES: usize = 100 * 1024 * 1024;
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct MediaSyncProgress { pub struct MediaSyncProgress {
pub downloaded_meta: usize, pub checked: usize,
pub downloaded_files: usize, pub downloaded_files: usize,
pub downloaded_deletions: usize, pub downloaded_deletions: usize,
pub uploaded_files: usize, pub uploaded_files: usize,
@ -45,7 +45,7 @@ where
client: Client, client: Client,
progress_cb: P, progress_cb: P,
progress: MediaSyncProgress, progress: MediaSyncProgress,
progress_updated: u64, progress_updated: Instant,
endpoint: &'a str, endpoint: &'a str,
} }
@ -150,7 +150,7 @@ where
client, client,
progress_cb, progress_cb,
progress: Default::default(), progress: Default::default(),
progress_updated: 0, progress_updated: Instant::now(),
endpoint, endpoint,
} }
} }
@ -161,8 +161,7 @@ where
#[allow(clippy::useless_let_if_seq)] #[allow(clippy::useless_let_if_seq)]
pub async fn sync(&mut self, hkey: &str) -> Result<()> { pub async fn sync(&mut self, hkey: &str) -> Result<()> {
// make sure media DB is up to date self.register_changes()?;
register_changes(&mut self.ctx, self.mgr.media_folder.as_path())?;
let meta = self.ctx.get_meta()?; let meta = self.ctx.get_meta()?;
let client_usn = meta.last_sync_usn; let client_usn = meta.last_sync_usn;
@ -192,11 +191,35 @@ where
self.finalize_sync().await?; self.finalize_sync().await?;
} }
self.fire_progress_cb()?;
debug!("media sync complete"); debug!("media sync complete");
Ok(()) Ok(())
} }
/// Register media folder changes in the media DB via `ChangeTracker`.
///
/// The running `checked` count reported by the tracker is stored in
/// `self.progress` on every call, but the user-supplied progress callback
/// is only invoked if at least one second has passed since the last
/// recorded update. When it is skipped, `true` is handed back to the tracker.
fn register_changes(&mut self) -> Result<()> {
    // borrow the fields one by one, so the closure below does not
    // need to capture all of `self` while `self.ctx` is borrowed mutably
    let cb = &self.progress_cb;
    let last_fired = &mut self.progress_updated;
    let stats = &mut self.progress;

    let on_checked = |count| {
        stats.checked = count;

        let now = Instant::now();
        let throttled = now.duration_since(*last_fired).as_secs() < 1;
        if throttled {
            return true;
        }

        *last_fired = now;
        (cb)(stats)
    };

    let mut tracker = ChangeTracker::new(self.mgr.media_folder.as_path(), on_checked);
    tracker.register_changes(&mut self.ctx)
}
async fn sync_begin(&self, hkey: &str) -> Result<(String, i32)> { async fn sync_begin(&self, hkey: &str) -> Result<(String, i32)> {
let url = format!("{}begin", self.endpoint); let url = format!("{}begin", self.endpoint);
@ -229,8 +252,8 @@ where
} }
last_usn = batch.last().unwrap().usn; last_usn = batch.last().unwrap().usn;
self.progress.downloaded_meta += batch.len(); self.progress.checked += batch.len();
self.notify_progress()?; self.maybe_fire_progress_cb()?;
let (to_download, to_delete, to_remove_pending) = let (to_download, to_delete, to_remove_pending) =
determine_required_changes(&mut self.ctx, &batch)?; determine_required_changes(&mut self.ctx, &batch)?;
@ -238,7 +261,7 @@ where
// file removal // file removal
remove_files(self.mgr.media_folder.as_path(), to_delete.as_slice())?; remove_files(self.mgr.media_folder.as_path(), to_delete.as_slice())?;
self.progress.downloaded_deletions += to_delete.len(); self.progress.downloaded_deletions += to_delete.len();
self.notify_progress()?; self.maybe_fire_progress_cb()?;
// file download // file download
let mut downloaded = vec![]; let mut downloaded = vec![];
@ -258,7 +281,7 @@ where
downloaded.extend(download_batch); downloaded.extend(download_batch);
self.progress.downloaded_files += len; self.progress.downloaded_files += len;
self.notify_progress()?; self.maybe_fire_progress_cb()?;
} }
// then update the DB // then update the DB
@ -296,7 +319,7 @@ where
self.progress.uploaded_files += processed_files.len(); self.progress.uploaded_files += processed_files.len();
self.progress.uploaded_deletions += processed_deletions.len(); self.progress.uploaded_deletions += processed_deletions.len();
self.notify_progress()?; self.maybe_fire_progress_cb()?;
let fnames: Vec<_> = processed_files let fnames: Vec<_> = processed_files
.iter() .iter()
@ -347,23 +370,23 @@ where
} }
} }
fn notify_progress(&mut self) -> Result<()> { fn fire_progress_cb(&self) -> Result<()> {
let now = time::SystemTime::now()
.duration_since(time::UNIX_EPOCH)
.unwrap()
.as_secs();
if now - self.progress_updated < 1 {
return Ok(());
}
if (self.progress_cb)(&self.progress) { if (self.progress_cb)(&self.progress) {
self.progress_updated = now;
Ok(()) Ok(())
} else { } else {
Err(AnkiError::Interrupted) Err(AnkiError::Interrupted)
} }
} }
/// Fire the progress callback, unless less than a second has passed
/// since `progress_updated` was last set, in which case do nothing.
fn maybe_fire_progress_cb(&mut self) -> Result<()> {
    let current = Instant::now();
    let secs_since_last = current.duration_since(self.progress_updated).as_secs();

    if secs_since_last >= 1 {
        // record the time before firing, as the original ordering does
        self.progress_updated = current;
        self.fire_progress_cb()
    } else {
        Ok(())
    }
}
async fn fetch_record_batch(&self, last_usn: i32) -> Result<Vec<ServerMediaRecord>> { async fn fetch_record_batch(&self, last_usn: i32) -> Result<Vec<ServerMediaRecord>> {
let url = format!("{}mediaChanges", self.endpoint); let url = format!("{}mediaChanges", self.endpoint);