From 96f0a5cc3c1c876ea9e16f6b3439d690e55f65d4 Mon Sep 17 00:00:00 2001 From: Damien Elmes Date: Wed, 29 Jan 2020 21:08:02 +1000 Subject: [PATCH] port change tracking --- rslib/Cargo.toml | 3 + rslib/src/media/database.rs | 68 +++++---- rslib/src/media/files.rs | 293 +++++++++++++++++++++++++++++++++++- rslib/src/media/mod.rs | 24 +++ 4 files changed, 357 insertions(+), 31 deletions(-) diff --git a/rslib/Cargo.toml b/rslib/Cargo.toml index 9e3abfc9a..972d1dd3d 100644 --- a/rslib/Cargo.toml +++ b/rslib/Cargo.toml @@ -24,3 +24,6 @@ rusqlite = "0.21.0" [build-dependencies] prost-build = "0.5.0" +[dev-dependencies] +utime = "0.2.1" + diff --git a/rslib/src/media/database.rs b/rslib/src/media/database.rs index b127f305d..8b5677932 100644 --- a/rslib/src/media/database.rs +++ b/rslib/src/media/database.rs @@ -2,9 +2,12 @@ // License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html use crate::err::Result; +use crate::media::MediaManager; use rusqlite::{params, Connection, OptionalExtension, Statement, NO_PARAMS}; +use std::collections::HashMap; +use std::path::Path; -fn open_or_create(path: &str) -> Result { +pub(super) fn open_or_create>(path: P) -> Result { let mut db = Connection::open(path)?; db.pragma_update(None, "locking_mode", &"exclusive")?; @@ -41,7 +44,7 @@ pub struct MediaEntry { // Modification time; 0 if deleted pub mtime: i64, /// True if changed since last sync - pub dirty: bool, + pub sync_required: bool, } #[derive(Debug, PartialEq)] @@ -77,11 +80,11 @@ impl MediaDatabaseSession<'_> { self.db.execute_batch("commit").map_err(Into::into) } - fn rollback(&mut self) -> Result<()> { + pub(super) fn rollback(&mut self) -> Result<()> { self.db.execute_batch("rollback").map_err(Into::into) } - fn get_entry(&mut self, fname: &str) -> Result> { + pub(super) fn get_entry(&mut self, fname: &str) -> Result> { let stmt = cached_sql!( self.get_entry_stmt, self.db, @@ -106,14 +109,14 @@ select fname, csum, mtime, dirty from media where fname=?" fname: row.get(0)?, sha1: sha1_array, mtime: row.get(2)?, - dirty: row.get(3)?, + sync_required: row.get(3)?, }) }) .optional() .map_err(Into::into) } - fn set_entry(&mut self, entry: &MediaEntry) -> Result<()> { + pub(super) fn set_entry(&mut self, entry: &MediaEntry) -> Result<()> { let stmt = cached_sql!( self.update_entry_stmt, self.db, @@ -123,12 +126,17 @@ values (?, ?, ?, ?)" ); let sha1_str = entry.sha1.map(hex::encode); - stmt.execute(params![entry.fname, sha1_str, entry.mtime, entry.dirty])?; + stmt.execute(params![ + entry.fname, + sha1_str, + entry.mtime, + entry.sync_required + ])?; Ok(()) } - fn remove_entry(&mut self, fname: &str) -> Result<()> { + pub(super) fn remove_entry(&mut self, fname: &str) -> Result<()> { let stmt = cached_sql!( self.remove_entry_stmt, self.db, @@ -141,7 +149,7 @@ delete from media where fname=?" Ok(()) } - fn get_meta(&mut self) -> Result { + pub(super) fn get_meta(&mut self) -> Result { let mut stmt = self.db.prepare("select dirMod, lastUsn from meta")?; stmt.query_row(NO_PARAMS, |row| { @@ -153,20 +161,20 @@ delete from media where fname=?" .map_err(Into::into) } - fn set_meta(&mut self, meta: &MediaDatabaseMetadata) -> Result<()> { + pub(super) fn set_meta(&mut self, meta: &MediaDatabaseMetadata) -> Result<()> { let mut stmt = self.db.prepare("update meta set dirMod = ?, lastUsn = ?")?; stmt.execute(params![meta.folder_mtime, meta.last_sync_usn])?; Ok(()) } - fn clear(&mut self) -> Result<()> { + pub(super) fn clear(&mut self) -> Result<()> { self.db .execute_batch("delete from media; update meta set lastUsn = 0, dirMod = 0") .map_err(Into::into) } - fn changes_pending(&mut self) -> Result { + pub(super) fn changes_pending(&mut self) -> Result { self.db .query_row( "select count(*) from media where dirty=1", @@ -176,7 +184,7 @@ delete from media where fname=?" .map_err(Into::into) } - fn count(&mut self) -> Result { + pub(super) fn count(&mut self) -> Result { self.db .query_row( "select count(*) from media where csum is not null", @@ -186,7 +194,7 @@ delete from media where fname=?" .map_err(Into::into) } - fn get_pending_uploads(&mut self, max_entries: u32) -> Result> { + pub(super) fn get_pending_uploads(&mut self, max_entries: u32) -> Result> { let mut stmt = self .db .prepare("select fname from media where dirty=1 limit ?")?; @@ -199,18 +207,17 @@ delete from media where fname=?" results } -} -pub struct MediaDatabase { - db: Connection, -} - -impl MediaDatabase { - pub fn new(path: &str) -> Result { - let db = open_or_create(path)?; - Ok(MediaDatabase { db }) + pub(super) fn all_mtimes(&mut self) -> Result> { + let mut stmt = self.db.prepare("select fname, mtime from media")?; + let map: std::result::Result, rusqlite::Error> = stmt + .query_map(NO_PARAMS, |row| Ok((row.get(0)?, row.get(1)?)))? + .collect(); + Ok(map?) } +} +impl MediaManager { pub fn get_entry(&mut self, fname: &str) -> Result> { self.query(|ctx| ctx.get_entry(fname)) } @@ -252,7 +259,7 @@ impl MediaDatabase { /// /// This function should be used for read-only requests. To mutate /// the database, use transact() instead. - fn query(&self, func: F) -> Result + pub(super) fn query(&self, func: F) -> Result where F: FnOnce(&mut MediaDatabaseSession) -> Result, { @@ -268,7 +275,7 @@ impl MediaDatabase { /// Execute the provided closure in a transaction, rolling back if /// an error is returned. - fn transact(&self, func: F) -> Result + pub(super) fn transact(&self, func: F) -> Result where F: FnOnce(&mut MediaDatabaseSession) -> Result, { @@ -295,15 +302,16 @@ impl MediaDatabase { #[cfg(test)] mod test { use crate::err::Result; - use crate::media::database::{MediaDatabase, MediaEntry}; + use crate::media::database::MediaEntry; use crate::media::files::sha1_of_data; + use crate::media::MediaManager; use tempfile::NamedTempFile; #[test] fn test_database() -> Result<()> { let db_file = NamedTempFile::new()?; let db_file_path = db_file.path().to_str().unwrap(); - let mut db = MediaDatabase::new(db_file_path)?; + let mut db = MediaManager::new("/dummy", db_file_path)?; // no entry exists yet assert_eq!(db.get_entry("test.mp3")?, None); @@ -313,7 +321,7 @@ mod test { fname: "test.mp3".into(), sha1: None, mtime: 0, - dirty: false, + sync_required: false, }; db.set_entry(&entry)?; assert_eq!(db.get_entry("test.mp3")?.unwrap(), entry); @@ -321,7 +329,7 @@ mod test { // update it entry.sha1 = Some(sha1_of_data("hello".as_bytes())); entry.mtime = 123; - entry.dirty = true; + entry.sync_required = true; db.set_entry(&entry)?; assert_eq!(db.get_entry("test.mp3")?.unwrap(), entry); @@ -342,7 +350,7 @@ mod test { // reopen database, and ensure data was committed drop(db); - db = MediaDatabase::new(db_file_path)?; + db = MediaManager::new("/dummy", db_file_path)?; meta = db.get_meta()?; assert_eq!(meta.folder_mtime, 123); diff --git a/rslib/src/media/files.rs b/rslib/src/media/files.rs index bf2e39eb1..8ad12a431 100644 --- a/rslib/src/media/files.rs +++ b/rslib/src/media/files.rs @@ -1,13 +1,17 @@ // Copyright: Ankitects Pty Ltd and contributors // License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html +use crate::err::Result; +use crate::media::database::MediaEntry; +use crate::media::MediaManager; use lazy_static::lazy_static; use regex::Regex; use sha1::Sha1; use std::borrow::Cow; +use std::collections::HashMap; use std::io::Read; use std::path::Path; -use std::{fs, io}; +use std::{fs, io, time}; use unicode_normalization::{is_nfc_quick, IsNormalized, UnicodeNormalization}; /// The maximum length we allow a filename to be. When combined @@ -16,6 +20,9 @@ use unicode_normalization::{is_nfc_quick, IsNormalized, UnicodeNormalization}; /// the length of the filename. static MAX_FILENAME_LENGTH: usize = 120; +/// Media syncing does not support files over 100MiB. +static MEDIA_SYNC_FILESIZE_LIMIT: usize = 100 * 1024 * 1024; + lazy_static! { static ref WINDOWS_DEVICE_NAME: Regex = Regex::new( r#"(?xi) @@ -31,6 +38,16 @@ lazy_static! { "# ) .unwrap(); + static ref NONSYNCABLE_FILENAME: Regex = Regex::new( + r#"(?xi) + ^ + (:? + thumbs.db | .ds_store + ) + $ + "# + ) + .unwrap(); } /// True if character may cause problems on one or more platforms. @@ -209,14 +226,187 @@ pub(crate) fn sha1_of_data(data: &[u8]) -> [u8; 20] { hasher.digest().bytes() } +struct FilesystemEntry { + fname: String, + sha1: Option<[u8; 20]>, + mtime: i64, + is_new: bool, +} + +impl MediaManager { + /// Note any added/changed/deleted files. + /// + /// In the future, we could register files in the media DB as they + /// are added, meaning that for users who don't modify files externally, the + /// folder scan could be skipped. + pub fn register_changes(&mut self) -> Result<()> { + // folder mtime unchanged? + let media_dir_modified = self + .media_folder + .metadata()? + .modified()? + .duration_since(time::UNIX_EPOCH) + .unwrap() + .as_secs() as i64; + let mut meta = self.get_meta()?; + if media_dir_modified == meta.folder_mtime { + return Ok(()); + } else { + meta.folder_mtime = media_dir_modified; + } + + let mtimes = self.query(|ctx| ctx.all_mtimes())?; + + let (changed, removed) = self.media_folder_changes(mtimes)?; + + self.add_updated_entries(changed)?; + self.remove_deleted_files(removed)?; + + self.set_meta(&meta)?; + + Ok(()) + } + + /// Scan through the media folder, finding changes. + /// Returns (added/changed files, removed files). + /// + /// Checks for invalid filenames and unicode normalization are deferred + /// until syncing time, as we can't trust the entries previous Anki versions + /// wrote are correct. + fn media_folder_changes( + &self, + mut mtimes: HashMap, + ) -> Result<(Vec, Vec)> { + let mut added_or_changed = vec![]; + + // loop through on-disk files + for dentry in self.media_folder.read_dir()? { + let dentry = dentry?; + + // skip folders + if dentry.file_type()?.is_dir() { + continue; + } + + // if the filename is not valid unicode, skip it + let fname_os = dentry.file_name(); + let fname = match fname_os.to_str() { + Some(s) => s, + None => continue, + }; + + // ignore blacklisted files + if NONSYNCABLE_FILENAME.is_match(fname) { + continue; + } + + // ignore large files + let metadata = dentry.metadata()?; + if metadata.len() > MEDIA_SYNC_FILESIZE_LIMIT as u64 { + continue; + } + + // remove from mtimes for later deletion tracking + let previous_mtime = mtimes.remove(fname); + + // skip files that have not been modified + let mtime = metadata + .modified()? + .duration_since(time::UNIX_EPOCH) + .unwrap() + .as_secs() as i64; + if let Some(previous_mtime) = previous_mtime { + if previous_mtime == mtime { + continue; + } + } + + // add entry to the list + let sha1 = Some(sha1_of_file(&dentry.path())?); + added_or_changed.push(FilesystemEntry { + fname: fname.to_string(), + sha1, + mtime, + is_new: previous_mtime.is_none(), + }); + } + + // any remaining entries from the database have been deleted + let removed: Vec<_> = mtimes.into_iter().map(|(k, _)| k).collect(); + + Ok((added_or_changed, removed)) + } + + /// Add added/updated entries to the media DB. + /// + /// Skip files where the mod time differed, but checksums are the same. + fn add_updated_entries(&mut self, entries: Vec) -> Result<()> { + for chunk in entries.chunks(1_024) { + self.transact(|ctx| { + for fentry in chunk { + let mut sync_required = true; + if !fentry.is_new { + if let Some(db_entry) = ctx.get_entry(&fentry.fname)? { + if db_entry.sha1 == fentry.sha1 { + // mtime bumped but file contents are the same, + // so we can preserve the current updated flag. + // we still need to update the mtime however. + sync_required = db_entry.sync_required + } + } + }; + + ctx.set_entry(&MediaEntry { + fname: fentry.fname.clone(), + sha1: fentry.sha1, + mtime: fentry.mtime, + sync_required, + })?; + } + + Ok(()) + })?; + } + + Ok(()) + } + + /// Remove deleted files from the media DB. + fn remove_deleted_files(&mut self, removed: Vec) -> Result<()> { + for chunk in removed.chunks(4_096) { + self.transact(|ctx| { + for fname in chunk { + ctx.set_entry(&MediaEntry { + fname: fname.clone(), + sha1: None, + mtime: 0, + sync_required: true, + })?; + } + + Ok(()) + })?; + } + + Ok(()) + } +} + #[cfg(test)] mod test { + use crate::err::Result; + use crate::media::database::MediaEntry; use crate::media::files::{ add_data_to_folder_uniquely, add_hash_suffix_to_file_stem, normalize_filename, sha1_of_data, MAX_FILENAME_LENGTH, }; + use crate::media::MediaManager; use std::borrow::Cow; + use std::path::Path; + use std::time::Duration; + use std::{fs, time}; use tempfile::tempdir; + use utime; #[test] fn test_normalize() { @@ -278,4 +468,105 @@ mod test { ] ); } + + // helper + fn change_mtime(p: &Path) { + let mtime = p.metadata().unwrap().modified().unwrap(); + let new_mtime = mtime - Duration::from_secs(3); + let secs = new_mtime + .duration_since(time::UNIX_EPOCH) + .unwrap() + .as_secs(); + utime::set_file_times(p, secs, secs).unwrap(); + } + + #[test] + fn test_change_tracking() -> Result<()> { + let dir = tempdir()?; + let media_dir = dir.path().join("media"); + std::fs::create_dir(&media_dir)?; + let media_db = dir.path().join("media.db"); + + let mut mgr = MediaManager::new(&media_dir, media_db)?; + assert_eq!(mgr.count()?, 0); + + // add a file and check it's picked up + let f1 = media_dir.join("file.jpg"); + fs::write(&f1, "hello")?; + + change_mtime(&media_dir); + mgr.register_changes()?; + + assert_eq!(mgr.count()?, 1); + assert_eq!(mgr.changes_pending()?, 1); + let mut entry = mgr.get_entry("file.jpg")?.unwrap(); + assert_eq!( + entry, + MediaEntry { + fname: "file.jpg".into(), + sha1: Some(sha1_of_data("hello".as_bytes())), + mtime: f1 + .metadata()? + .modified()? + .duration_since(time::UNIX_EPOCH) + .unwrap() + .as_secs() as i64, + sync_required: true, + } + ); + + // mark it as unmodified + entry.sync_required = false; + mgr.set_entry(&entry)?; + assert_eq!(mgr.changes_pending()?, 0); + + // modify it + fs::write(&f1, "hello1")?; + change_mtime(&f1); + + change_mtime(&media_dir); + mgr.register_changes()?; + + assert_eq!(mgr.count()?, 1); + assert_eq!(mgr.changes_pending()?, 1); + assert_eq!( + mgr.get_entry("file.jpg")?.unwrap(), + MediaEntry { + fname: "file.jpg".into(), + sha1: Some(sha1_of_data("hello1".as_bytes())), + mtime: f1 + .metadata()? + .modified()? + .duration_since(time::UNIX_EPOCH) + .unwrap() + .as_secs() as i64, + sync_required: true, + } + ); + + // mark it as unmodified + entry.sync_required = false; + mgr.set_entry(&entry)?; + assert_eq!(mgr.changes_pending()?, 0); + + // delete it + fs::remove_file(&f1)?; + + change_mtime(&media_dir); + mgr.register_changes().unwrap(); + + assert_eq!(mgr.count()?, 0); + assert_eq!(mgr.changes_pending()?, 1); + assert_eq!( + mgr.get_entry("file.jpg")?.unwrap(), + MediaEntry { + fname: "file.jpg".into(), + sha1: None, + mtime: 0, + sync_required: true, + } + ); + + Ok(()) + } } diff --git a/rslib/src/media/mod.rs b/rslib/src/media/mod.rs index 3d1580545..440d260d6 100644 --- a/rslib/src/media/mod.rs +++ b/rslib/src/media/mod.rs @@ -1,2 +1,26 @@ +use crate::err::Result; +use crate::media::database::open_or_create; +use rusqlite::Connection; +use std::path::{Path, PathBuf}; + pub mod database; pub mod files; + +pub struct MediaManager { + db: Connection, + media_folder: PathBuf, +} + +impl MediaManager { + pub fn new(media_folder: P, media_db: P2) -> Result + where + P: Into, + P2: AsRef, + { + let db = open_or_create(media_db.as_ref())?; + Ok(MediaManager { + db, + media_folder: media_folder.into(), + }) + } +}