diff --git a/rslib/src/media/database.rs b/rslib/src/media/database.rs index 666c65f6d..ae808dac0 100644 --- a/rslib/src/media/database.rs +++ b/rslib/src/media/database.rs @@ -2,7 +2,6 @@ // License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html use crate::err::Result; -use crate::media::MediaManager; use rusqlite::{params, Connection, OptionalExtension, Statement, NO_PARAMS}; use std::collections::HashMap; use std::path::Path; @@ -62,7 +61,7 @@ macro_rules! cached_sql { }}; } -pub struct MediaDatabaseSession<'a> { +pub struct MediaDatabaseContext<'a> { db: &'a Connection, get_entry_stmt: Option>, @@ -70,7 +69,53 @@ pub struct MediaDatabaseSession<'a> { remove_entry_stmt: Option>, } -impl MediaDatabaseSession<'_> { +impl MediaDatabaseContext<'_> { + pub(super) fn new(db: &Connection) -> MediaDatabaseContext { + MediaDatabaseContext { + db, + get_entry_stmt: None, + update_entry_stmt: None, + remove_entry_stmt: None, + } + } + + /// Call the provided closure with a session that exists for + /// the duration of the call. + /// + /// This function should be used for read-only requests. To mutate + /// the database, use transact() instead. + pub(super) fn query(db: &Connection, func: F) -> Result + where + F: FnOnce(&mut MediaDatabaseContext) -> Result, + { + func(&mut Self::new(db)) + } + + /// Execute the provided closure in a transaction, rolling back if + /// an error is returned. + pub(super) fn transact(db: &Connection, func: F) -> Result + where + F: FnOnce(&mut MediaDatabaseContext) -> Result, + { + MediaDatabaseContext::query(db, |ctx| { + ctx.begin()?; + + let mut res = func(ctx); + + if res.is_ok() { + if let Err(e) = ctx.commit() { + res = Err(e); + } + } + + if res.is_err() { + ctx.rollback()?; + } + + res + }) + } + fn begin(&mut self) -> Result<()> { self.db.execute_batch("begin").map_err(Into::into) } @@ -79,7 +124,7 @@ impl MediaDatabaseSession<'_> { self.db.execute_batch("commit").map_err(Into::into) } - pub(super) fn rollback(&mut self) -> Result<()> { + fn rollback(&mut self) -> Result<()> { self.db.execute_batch("rollback").map_err(Into::into) } @@ -216,88 +261,6 @@ delete from media where fname=?" } } -impl MediaManager { - pub fn get_entry(&mut self, fname: &str) -> Result> { - self.query(|ctx| ctx.get_entry(fname)) - } - - pub fn set_entry(&mut self, entry: &MediaEntry) -> Result<()> { - self.transact(|ctx| ctx.set_entry(entry)) - } - - pub fn remove_entry(&mut self, fname: &str) -> Result<()> { - self.transact(|ctx| ctx.remove_entry(fname)) - } - - pub fn get_meta(&mut self) -> Result { - self.query(|ctx| ctx.get_meta()) - } - - pub fn set_meta(&mut self, meta: &MediaDatabaseMetadata) -> Result<()> { - self.transact(|ctx| ctx.set_meta(meta)) - } - - pub fn clear(&mut self) -> Result<()> { - self.transact(|ctx| ctx.clear()) - } - - pub fn changes_pending(&mut self) -> Result { - self.query(|ctx| ctx.changes_pending()) - } - - pub fn count(&mut self) -> Result { - self.query(|ctx| ctx.count()) - } - - pub fn get_pending_uploads(&mut self, max_entries: u32) -> Result> { - self.query(|ctx| ctx.get_pending_uploads(max_entries)) - } - - /// Call the provided closure with a session that exists for - /// the duration of the call. - /// - /// This function should be used for read-only requests. To mutate - /// the database, use transact() instead. - pub(super) fn query(&self, func: F) -> Result - where - F: FnOnce(&mut MediaDatabaseSession) -> Result, - { - let mut session = MediaDatabaseSession { - db: &self.db, - get_entry_stmt: None, - update_entry_stmt: None, - remove_entry_stmt: None, - }; - - func(&mut session) - } - - /// Execute the provided closure in a transaction, rolling back if - /// an error is returned. - pub(super) fn transact(&self, func: F) -> Result - where - F: FnOnce(&mut MediaDatabaseSession) -> Result, - { - self.query(|ctx| { - ctx.begin()?; - - let mut res = func(ctx); - - if res.is_ok() { - if let Err(e) = ctx.commit() { - res = Err(e); - } - } - - if res.is_err() { - ctx.rollback()?; - } - - res - }) - } -} - #[cfg(test)] mod test { use crate::err::Result; @@ -310,49 +273,55 @@ mod test { fn test_database() -> Result<()> { let db_file = NamedTempFile::new()?; let db_file_path = db_file.path().to_str().unwrap(); - let mut db = MediaManager::new("/dummy", db_file_path)?; + let mut mgr = MediaManager::new("/dummy", db_file_path)?; - // no entry exists yet - assert_eq!(db.get_entry("test.mp3")?, None); + mgr.transact(|ctx| { + // no entry exists yet + assert_eq!(ctx.get_entry("test.mp3")?, None); - // add one - let mut entry = MediaEntry { - fname: "test.mp3".into(), - sha1: None, - mtime: 0, - sync_required: false, - }; - db.set_entry(&entry)?; - assert_eq!(db.get_entry("test.mp3")?.unwrap(), entry); + // add one + let mut entry = MediaEntry { + fname: "test.mp3".into(), + sha1: None, + mtime: 0, + sync_required: false, + }; + ctx.set_entry(&entry)?; + assert_eq!(ctx.get_entry("test.mp3")?.unwrap(), entry); - // update it - entry.sha1 = Some(sha1_of_data("hello".as_bytes())); - entry.mtime = 123; - entry.sync_required = true; - db.set_entry(&entry)?; - assert_eq!(db.get_entry("test.mp3")?.unwrap(), entry); + // update it + entry.sha1 = Some(sha1_of_data("hello".as_bytes())); + entry.mtime = 123; + entry.sync_required = true; + ctx.set_entry(&entry)?; + assert_eq!(ctx.get_entry("test.mp3")?.unwrap(), entry); - assert_eq!(db.get_pending_uploads(25)?, vec![entry]); + assert_eq!(ctx.get_pending_uploads(25)?, vec![entry]); - let mut meta = db.get_meta()?; - assert_eq!(meta.folder_mtime, 0); - assert_eq!(meta.last_sync_usn, 0); + let mut meta = ctx.get_meta()?; + assert_eq!(meta.folder_mtime, 0); + assert_eq!(meta.last_sync_usn, 0); - meta.folder_mtime = 123; - meta.last_sync_usn = 321; + meta.folder_mtime = 123; + meta.last_sync_usn = 321; - db.set_meta(&meta)?; + ctx.set_meta(&meta)?; - meta = db.get_meta()?; - assert_eq!(meta.folder_mtime, 123); - assert_eq!(meta.last_sync_usn, 321); + meta = ctx.get_meta()?; + assert_eq!(meta.folder_mtime, 123); + assert_eq!(meta.last_sync_usn, 321); - // reopen database, and ensure data was committed - drop(db); - db = MediaManager::new("/dummy", db_file_path)?; - meta = db.get_meta()?; - assert_eq!(meta.folder_mtime, 123); + Ok(()) + })?; - Ok(()) + // reopen database and ensure data was committed + drop(mgr); + mgr = MediaManager::new("/dummy", db_file_path)?; + mgr.query(|ctx| { + let meta = ctx.get_meta()?; + assert_eq!(meta.folder_mtime, 123); + + Ok(()) + }) } } diff --git a/rslib/src/media/mod.rs b/rslib/src/media/mod.rs index 3f92a17b7..8ac46a616 100644 --- a/rslib/src/media/mod.rs +++ b/rslib/src/media/mod.rs @@ -2,7 +2,7 @@ // License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html use crate::err::Result; -use crate::media::database::{open_or_create, MediaEntry}; +use crate::media::database::{open_or_create, MediaDatabaseContext, MediaEntry}; use crate::media::files::{ add_data_to_folder_uniquely, mtime_as_i64, sha1_of_data, sha1_of_file, MEDIA_SYNC_FILESIZE_LIMIT, NONSYNCABLE_FILENAME, @@ -95,150 +95,199 @@ impl MediaManager { /// Note any added/changed/deleted files. pub fn register_changes(&mut self) -> Result<()> { - // folder mtime unchanged? - let dirmod = mtime_as_i64(&self.media_folder)?; - let mut meta = self.get_meta()?; - if dirmod == meta.folder_mtime { - return Ok(()); - } else { - meta.folder_mtime = dirmod; - } + self.transact(|ctx| { + // folder mtime unchanged? + let dirmod = mtime_as_i64(&self.media_folder)?; - let mtimes = self.query(|ctx| ctx.all_mtimes())?; + let mut meta = ctx.get_meta()?; + if dirmod == meta.folder_mtime { + return Ok(()); + } else { + meta.folder_mtime = dirmod; + } - let (changed, removed) = self.media_folder_changes(mtimes)?; + let mtimes = ctx.all_mtimes()?; - self.add_updated_entries(changed)?; - self.remove_deleted_files(removed)?; + let (changed, removed) = media_folder_changes(&self.media_folder, mtimes)?; - self.set_meta(&meta)?; + add_updated_entries(ctx, changed)?; + remove_deleted_files(ctx, removed)?; - Ok(()) + ctx.set_meta(&meta)?; + + Ok(()) + }) } - /// Scan through the media folder, finding changes. - /// Returns (added/changed files, removed files). - /// - /// Checks for invalid filenames and unicode normalization are deferred - /// until syncing time, as we can't trust the entries previous Anki versions - /// wrote are correct. - fn media_folder_changes( - &self, - mut mtimes: HashMap, - ) -> Result<(Vec, Vec)> { - let mut added_or_changed = vec![]; + // syncDelete + pub fn remove_entry(&mut self, fname: &str) -> Result<()> { + self.transact(|ctx| ctx.remove_entry(fname)) + } - // loop through on-disk files - for dentry in self.media_folder.read_dir()? { - let dentry = dentry?; + // forceResync + pub fn clear(&mut self) -> Result<()> { + self.transact(|ctx| ctx.clear()) + } - // skip folders - if dentry.file_type()?.is_dir() { + // lastUsn + pub fn get_last_usn(&mut self) -> Result { + self.query(|ctx| Ok(ctx.get_meta()?.last_sync_usn)) + } + + // setLastUsn + pub fn set_last_usn(&mut self, usn: i32) -> Result<()> { + self.transact(|ctx| { + let mut meta = ctx.get_meta()?; + meta.last_sync_usn = usn; + ctx.set_meta(&meta) + }) + } + + // dirtyCount + pub fn changes_pending(&mut self) -> Result { + self.query(|ctx| ctx.changes_pending()) + } + + // mediaCount + pub fn count(&mut self) -> Result { + self.query(|ctx| ctx.count()) + } + + // mediaChangesZip + pub fn get_pending_uploads(&mut self, max_entries: u32) -> Result> { + self.query(|ctx| ctx.get_pending_uploads(max_entries)) + } + + // db helpers + + pub(super) fn query(&self, func: F) -> Result + where + F: FnOnce(&mut MediaDatabaseContext) -> Result, + { + MediaDatabaseContext::query(&self.db, func) + } + + pub(super) fn transact(&self, func: F) -> Result + where + F: FnOnce(&mut MediaDatabaseContext) -> Result, + { + MediaDatabaseContext::transact(&self.db, func) + } +} + +/// Scan through the media folder, finding changes. +/// Returns (added/changed files, removed files). +/// +/// Checks for invalid filenames and unicode normalization are deferred +/// until syncing time, as we can't trust the entries previous Anki versions +/// wrote are correct. +fn media_folder_changes( + media_folder: &Path, + mut mtimes: HashMap, +) -> Result<(Vec, Vec)> { + let mut added_or_changed = vec![]; + + // loop through on-disk files + for dentry in media_folder.read_dir()? { + let dentry = dentry?; + + // skip folders + if dentry.file_type()?.is_dir() { + continue; + } + + // if the filename is not valid unicode, skip it + let fname_os = dentry.file_name(); + let fname = match fname_os.to_str() { + Some(s) => s, + None => continue, + }; + + // ignore blacklisted files + if NONSYNCABLE_FILENAME.is_match(fname) { + continue; + } + + // ignore large files + let metadata = dentry.metadata()?; + if metadata.len() > MEDIA_SYNC_FILESIZE_LIMIT as u64 { + continue; + } + + // remove from mtimes for later deletion tracking + let previous_mtime = mtimes.remove(fname); + + // skip files that have not been modified + let mtime = metadata + .modified()? + .duration_since(time::UNIX_EPOCH) + .unwrap() + .as_secs() as i64; + if let Some(previous_mtime) = previous_mtime { + if previous_mtime == mtime { continue; } - - // if the filename is not valid unicode, skip it - let fname_os = dentry.file_name(); - let fname = match fname_os.to_str() { - Some(s) => s, - None => continue, - }; - - // ignore blacklisted files - if NONSYNCABLE_FILENAME.is_match(fname) { - continue; - } - - // ignore large files - let metadata = dentry.metadata()?; - if metadata.len() > MEDIA_SYNC_FILESIZE_LIMIT as u64 { - continue; - } - - // remove from mtimes for later deletion tracking - let previous_mtime = mtimes.remove(fname); - - // skip files that have not been modified - let mtime = metadata - .modified()? - .duration_since(time::UNIX_EPOCH) - .unwrap() - .as_secs() as i64; - if let Some(previous_mtime) = previous_mtime { - if previous_mtime == mtime { - continue; - } - } - - // add entry to the list - let sha1 = Some(sha1_of_file(&dentry.path())?); - added_or_changed.push(FilesystemEntry { - fname: fname.to_string(), - sha1, - mtime, - is_new: previous_mtime.is_none(), - }); } - // any remaining entries from the database have been deleted - let removed: Vec<_> = mtimes.into_iter().map(|(k, _)| k).collect(); - - Ok((added_or_changed, removed)) + // add entry to the list + let sha1 = Some(sha1_of_file(&dentry.path())?); + added_or_changed.push(FilesystemEntry { + fname: fname.to_string(), + sha1, + mtime, + is_new: previous_mtime.is_none(), + }); } - /// Add added/updated entries to the media DB. - /// - /// Skip files where the mod time differed, but checksums are the same. - fn add_updated_entries(&mut self, entries: Vec) -> Result<()> { - for chunk in entries.chunks(1_024) { - self.transact(|ctx| { - for fentry in chunk { - let mut sync_required = true; - if !fentry.is_new { - if let Some(db_entry) = ctx.get_entry(&fentry.fname)? { - if db_entry.sha1 == fentry.sha1 { - // mtime bumped but file contents are the same, - // so we can preserve the current updated flag. - // we still need to update the mtime however. - sync_required = db_entry.sync_required - } - } - }; + // any remaining entries from the database have been deleted + let removed: Vec<_> = mtimes.into_iter().map(|(k, _)| k).collect(); - ctx.set_entry(&MediaEntry { - fname: fentry.fname.clone(), - sha1: fentry.sha1, - mtime: fentry.mtime, - sync_required, - })?; + Ok((added_or_changed, removed)) +} + +/// Add added/updated entries to the media DB. +/// +/// Skip files where the mod time differed, but checksums are the same. +fn add_updated_entries( + ctx: &mut MediaDatabaseContext, + entries: Vec, +) -> Result<()> { + for fentry in entries { + let mut sync_required = true; + if !fentry.is_new { + if let Some(db_entry) = ctx.get_entry(&fentry.fname)? { + if db_entry.sha1 == fentry.sha1 { + // mtime bumped but file contents are the same, + // so we can preserve the current updated flag. + // we still need to update the mtime however. + sync_required = db_entry.sync_required } + } + }; - Ok(()) - })?; - } - - Ok(()) + ctx.set_entry(&MediaEntry { + fname: fentry.fname, + sha1: fentry.sha1, + mtime: fentry.mtime, + sync_required, + })?; } - /// Remove deleted files from the media DB. - fn remove_deleted_files(&mut self, removed: Vec) -> Result<()> { - for chunk in removed.chunks(4_096) { - self.transact(|ctx| { - for fname in chunk { - ctx.set_entry(&MediaEntry { - fname: fname.clone(), - sha1: None, - mtime: 0, - sync_required: true, - })?; - } + Ok(()) +} - Ok(()) - })?; - } - - Ok(()) +/// Remove deleted files from the media DB. +fn remove_deleted_files(ctx: &mut MediaDatabaseContext, removed: Vec) -> Result<()> { + for fname in removed { + ctx.set_entry(&MediaEntry { + fname, + sha1: None, + mtime: 0, + sync_required: true, + })?; } + + Ok(()) } #[cfg(test)] @@ -271,7 +320,10 @@ mod test { let media_db = dir.path().join("media.db"); let mut mgr = MediaManager::new(&media_dir, media_db)?; - assert_eq!(mgr.count()?, 0); + mgr.query(|ctx| { + assert_eq!(ctx.count()?, 0); + Ok(()) + })?; // add a file and check it's picked up let f1 = media_dir.join("file.jpg"); @@ -280,57 +332,66 @@ mod test { change_mtime(&media_dir); mgr.register_changes()?; - assert_eq!(mgr.count()?, 1); - assert_eq!(mgr.changes_pending()?, 1); - let mut entry = mgr.get_entry("file.jpg")?.unwrap(); - assert_eq!( - entry, - MediaEntry { - fname: "file.jpg".into(), - sha1: Some(sha1_of_data("hello".as_bytes())), - mtime: f1 - .metadata()? - .modified()? - .duration_since(time::UNIX_EPOCH) - .unwrap() - .as_secs() as i64, - sync_required: true, - } - ); + let mut entry = mgr.transact(|ctx| { + assert_eq!(ctx.count()?, 1); + assert_eq!(ctx.changes_pending()?, 1); + let mut entry = ctx.get_entry("file.jpg")?.unwrap(); + assert_eq!( + entry, + MediaEntry { + fname: "file.jpg".into(), + sha1: Some(sha1_of_data("hello".as_bytes())), + mtime: f1 + .metadata()? + .modified()? + .duration_since(time::UNIX_EPOCH) + .unwrap() + .as_secs() as i64, + sync_required: true, + } + ); - // mark it as unmodified - entry.sync_required = false; - mgr.set_entry(&entry)?; - assert_eq!(mgr.changes_pending()?, 0); + // mark it as unmodified + entry.sync_required = false; + ctx.set_entry(&entry)?; + assert_eq!(ctx.changes_pending()?, 0); - // modify it - fs::write(&f1, "hello1")?; - change_mtime(&f1); + // modify it + fs::write(&f1, "hello1")?; + change_mtime(&f1); + + change_mtime(&media_dir); + + Ok(entry) + })?; - change_mtime(&media_dir); mgr.register_changes()?; - assert_eq!(mgr.count()?, 1); - assert_eq!(mgr.changes_pending()?, 1); - assert_eq!( - mgr.get_entry("file.jpg")?.unwrap(), - MediaEntry { - fname: "file.jpg".into(), - sha1: Some(sha1_of_data("hello1".as_bytes())), - mtime: f1 - .metadata()? - .modified()? - .duration_since(time::UNIX_EPOCH) - .unwrap() - .as_secs() as i64, - sync_required: true, - } - ); + mgr.transact(|ctx| { + assert_eq!(ctx.count()?, 1); + assert_eq!(ctx.changes_pending()?, 1); + assert_eq!( + ctx.get_entry("file.jpg")?.unwrap(), + MediaEntry { + fname: "file.jpg".into(), + sha1: Some(sha1_of_data("hello1".as_bytes())), + mtime: f1 + .metadata()? + .modified()? + .duration_since(time::UNIX_EPOCH) + .unwrap() + .as_secs() as i64, + sync_required: true, + } + ); - // mark it as unmodified - entry.sync_required = false; - mgr.set_entry(&entry)?; - assert_eq!(mgr.changes_pending()?, 0); + // mark it as unmodified + entry.sync_required = false; + ctx.set_entry(&entry)?; + assert_eq!(ctx.changes_pending()?, 0); + + Ok(()) + })?; // delete it fs::remove_file(&f1)?; @@ -338,18 +399,20 @@ mod test { change_mtime(&media_dir); mgr.register_changes().unwrap(); - assert_eq!(mgr.count()?, 0); - assert_eq!(mgr.changes_pending()?, 1); - assert_eq!( - mgr.get_entry("file.jpg")?.unwrap(), - MediaEntry { - fname: "file.jpg".into(), - sha1: None, - mtime: 0, - sync_required: true, - } - ); + mgr.query(|ctx| { + assert_eq!(ctx.count()?, 0); + assert_eq!(ctx.changes_pending()?, 1); + assert_eq!( + ctx.get_entry("file.jpg")?.unwrap(), + MediaEntry { + fname: "file.jpg".into(), + sha1: None, + mtime: 0, + sync_required: true, + } + ); - Ok(()) + Ok(()) + }) } }