more refactoring

This commit is contained in:
Damien Elmes 2020-01-30 13:48:29 +10:00
parent 10f64d54b8
commit ec8a91b493
2 changed files with 333 additions and 301 deletions

View file

@ -2,7 +2,6 @@
// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html // License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
use crate::err::Result; use crate::err::Result;
use crate::media::MediaManager;
use rusqlite::{params, Connection, OptionalExtension, Statement, NO_PARAMS}; use rusqlite::{params, Connection, OptionalExtension, Statement, NO_PARAMS};
use std::collections::HashMap; use std::collections::HashMap;
use std::path::Path; use std::path::Path;
@ -62,7 +61,7 @@ macro_rules! cached_sql {
}}; }};
} }
pub struct MediaDatabaseSession<'a> { pub struct MediaDatabaseContext<'a> {
db: &'a Connection, db: &'a Connection,
get_entry_stmt: Option<Statement<'a>>, get_entry_stmt: Option<Statement<'a>>,
@ -70,7 +69,53 @@ pub struct MediaDatabaseSession<'a> {
remove_entry_stmt: Option<Statement<'a>>, remove_entry_stmt: Option<Statement<'a>>,
} }
impl MediaDatabaseSession<'_> { impl MediaDatabaseContext<'_> {
pub(super) fn new(db: &Connection) -> MediaDatabaseContext {
MediaDatabaseContext {
db,
get_entry_stmt: None,
update_entry_stmt: None,
remove_entry_stmt: None,
}
}
/// Call the provided closure with a session that exists for
/// the duration of the call.
///
/// This function should be used for read-only requests. To mutate
/// the database, use transact() instead.
pub(super) fn query<F, R>(db: &Connection, func: F) -> Result<R>
where
F: FnOnce(&mut MediaDatabaseContext) -> Result<R>,
{
func(&mut Self::new(db))
}
/// Execute the provided closure in a transaction, rolling back if
/// an error is returned.
pub(super) fn transact<F, R>(db: &Connection, func: F) -> Result<R>
where
F: FnOnce(&mut MediaDatabaseContext) -> Result<R>,
{
MediaDatabaseContext::query(db, |ctx| {
ctx.begin()?;
let mut res = func(ctx);
if res.is_ok() {
if let Err(e) = ctx.commit() {
res = Err(e);
}
}
if res.is_err() {
ctx.rollback()?;
}
res
})
}
fn begin(&mut self) -> Result<()> { fn begin(&mut self) -> Result<()> {
self.db.execute_batch("begin").map_err(Into::into) self.db.execute_batch("begin").map_err(Into::into)
} }
@ -79,7 +124,7 @@ impl MediaDatabaseSession<'_> {
self.db.execute_batch("commit").map_err(Into::into) self.db.execute_batch("commit").map_err(Into::into)
} }
pub(super) fn rollback(&mut self) -> Result<()> { fn rollback(&mut self) -> Result<()> {
self.db.execute_batch("rollback").map_err(Into::into) self.db.execute_batch("rollback").map_err(Into::into)
} }
@ -216,88 +261,6 @@ delete from media where fname=?"
} }
} }
impl MediaManager {
pub fn get_entry(&mut self, fname: &str) -> Result<Option<MediaEntry>> {
self.query(|ctx| ctx.get_entry(fname))
}
pub fn set_entry(&mut self, entry: &MediaEntry) -> Result<()> {
self.transact(|ctx| ctx.set_entry(entry))
}
pub fn remove_entry(&mut self, fname: &str) -> Result<()> {
self.transact(|ctx| ctx.remove_entry(fname))
}
pub fn get_meta(&mut self) -> Result<MediaDatabaseMetadata> {
self.query(|ctx| ctx.get_meta())
}
pub fn set_meta(&mut self, meta: &MediaDatabaseMetadata) -> Result<()> {
self.transact(|ctx| ctx.set_meta(meta))
}
pub fn clear(&mut self) -> Result<()> {
self.transact(|ctx| ctx.clear())
}
pub fn changes_pending(&mut self) -> Result<u32> {
self.query(|ctx| ctx.changes_pending())
}
pub fn count(&mut self) -> Result<u32> {
self.query(|ctx| ctx.count())
}
pub fn get_pending_uploads(&mut self, max_entries: u32) -> Result<Vec<MediaEntry>> {
self.query(|ctx| ctx.get_pending_uploads(max_entries))
}
/// Call the provided closure with a session that exists for
/// the duration of the call.
///
/// This function should be used for read-only requests. To mutate
/// the database, use transact() instead.
pub(super) fn query<F, R>(&self, func: F) -> Result<R>
where
F: FnOnce(&mut MediaDatabaseSession) -> Result<R>,
{
let mut session = MediaDatabaseSession {
db: &self.db,
get_entry_stmt: None,
update_entry_stmt: None,
remove_entry_stmt: None,
};
func(&mut session)
}
/// Execute the provided closure in a transaction, rolling back if
/// an error is returned.
pub(super) fn transact<F, R>(&self, func: F) -> Result<R>
where
F: FnOnce(&mut MediaDatabaseSession) -> Result<R>,
{
self.query(|ctx| {
ctx.begin()?;
let mut res = func(ctx);
if res.is_ok() {
if let Err(e) = ctx.commit() {
res = Err(e);
}
}
if res.is_err() {
ctx.rollback()?;
}
res
})
}
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use crate::err::Result; use crate::err::Result;
@ -310,10 +273,11 @@ mod test {
fn test_database() -> Result<()> { fn test_database() -> Result<()> {
let db_file = NamedTempFile::new()?; let db_file = NamedTempFile::new()?;
let db_file_path = db_file.path().to_str().unwrap(); let db_file_path = db_file.path().to_str().unwrap();
let mut db = MediaManager::new("/dummy", db_file_path)?; let mut mgr = MediaManager::new("/dummy", db_file_path)?;
mgr.transact(|ctx| {
// no entry exists yet // no entry exists yet
assert_eq!(db.get_entry("test.mp3")?, None); assert_eq!(ctx.get_entry("test.mp3")?, None);
// add one // add one
let mut entry = MediaEntry { let mut entry = MediaEntry {
@ -322,37 +286,42 @@ mod test {
mtime: 0, mtime: 0,
sync_required: false, sync_required: false,
}; };
db.set_entry(&entry)?; ctx.set_entry(&entry)?;
assert_eq!(db.get_entry("test.mp3")?.unwrap(), entry); assert_eq!(ctx.get_entry("test.mp3")?.unwrap(), entry);
// update it // update it
entry.sha1 = Some(sha1_of_data("hello".as_bytes())); entry.sha1 = Some(sha1_of_data("hello".as_bytes()));
entry.mtime = 123; entry.mtime = 123;
entry.sync_required = true; entry.sync_required = true;
db.set_entry(&entry)?; ctx.set_entry(&entry)?;
assert_eq!(db.get_entry("test.mp3")?.unwrap(), entry); assert_eq!(ctx.get_entry("test.mp3")?.unwrap(), entry);
assert_eq!(db.get_pending_uploads(25)?, vec![entry]); assert_eq!(ctx.get_pending_uploads(25)?, vec![entry]);
let mut meta = db.get_meta()?; let mut meta = ctx.get_meta()?;
assert_eq!(meta.folder_mtime, 0); assert_eq!(meta.folder_mtime, 0);
assert_eq!(meta.last_sync_usn, 0); assert_eq!(meta.last_sync_usn, 0);
meta.folder_mtime = 123; meta.folder_mtime = 123;
meta.last_sync_usn = 321; meta.last_sync_usn = 321;
db.set_meta(&meta)?; ctx.set_meta(&meta)?;
meta = db.get_meta()?; meta = ctx.get_meta()?;
assert_eq!(meta.folder_mtime, 123); assert_eq!(meta.folder_mtime, 123);
assert_eq!(meta.last_sync_usn, 321); assert_eq!(meta.last_sync_usn, 321);
// reopen database, and ensure data was committed Ok(())
drop(db); })?;
db = MediaManager::new("/dummy", db_file_path)?;
meta = db.get_meta()?; // reopen database and ensure data was committed
drop(mgr);
mgr = MediaManager::new("/dummy", db_file_path)?;
mgr.query(|ctx| {
let meta = ctx.get_meta()?;
assert_eq!(meta.folder_mtime, 123); assert_eq!(meta.folder_mtime, 123);
Ok(()) Ok(())
})
} }
} }

View file

@ -2,7 +2,7 @@
// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html // License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
use crate::err::Result; use crate::err::Result;
use crate::media::database::{open_or_create, MediaEntry}; use crate::media::database::{open_or_create, MediaDatabaseContext, MediaEntry};
use crate::media::files::{ use crate::media::files::{
add_data_to_folder_uniquely, mtime_as_i64, sha1_of_data, sha1_of_file, add_data_to_folder_uniquely, mtime_as_i64, sha1_of_data, sha1_of_file,
MEDIA_SYNC_FILESIZE_LIMIT, NONSYNCABLE_FILENAME, MEDIA_SYNC_FILESIZE_LIMIT, NONSYNCABLE_FILENAME,
@ -95,25 +95,84 @@ impl MediaManager {
/// Note any added/changed/deleted files. /// Note any added/changed/deleted files.
pub fn register_changes(&mut self) -> Result<()> { pub fn register_changes(&mut self) -> Result<()> {
self.transact(|ctx| {
// folder mtime unchanged? // folder mtime unchanged?
let dirmod = mtime_as_i64(&self.media_folder)?; let dirmod = mtime_as_i64(&self.media_folder)?;
let mut meta = self.get_meta()?;
let mut meta = ctx.get_meta()?;
if dirmod == meta.folder_mtime { if dirmod == meta.folder_mtime {
return Ok(()); return Ok(());
} else { } else {
meta.folder_mtime = dirmod; meta.folder_mtime = dirmod;
} }
let mtimes = self.query(|ctx| ctx.all_mtimes())?; let mtimes = ctx.all_mtimes()?;
let (changed, removed) = self.media_folder_changes(mtimes)?; let (changed, removed) = media_folder_changes(&self.media_folder, mtimes)?;
self.add_updated_entries(changed)?; add_updated_entries(ctx, changed)?;
self.remove_deleted_files(removed)?; remove_deleted_files(ctx, removed)?;
self.set_meta(&meta)?; ctx.set_meta(&meta)?;
Ok(()) Ok(())
})
}
// syncDelete
pub fn remove_entry(&mut self, fname: &str) -> Result<()> {
self.transact(|ctx| ctx.remove_entry(fname))
}
// forceResync
pub fn clear(&mut self) -> Result<()> {
self.transact(|ctx| ctx.clear())
}
// lastUsn
pub fn get_last_usn(&mut self) -> Result<i32> {
self.query(|ctx| Ok(ctx.get_meta()?.last_sync_usn))
}
// setLastUsn
pub fn set_last_usn(&mut self, usn: i32) -> Result<()> {
self.transact(|ctx| {
let mut meta = ctx.get_meta()?;
meta.last_sync_usn = usn;
ctx.set_meta(&meta)
})
}
// dirtyCount
pub fn changes_pending(&mut self) -> Result<u32> {
self.query(|ctx| ctx.changes_pending())
}
// mediaCount
pub fn count(&mut self) -> Result<u32> {
self.query(|ctx| ctx.count())
}
// mediaChangesZip
pub fn get_pending_uploads(&mut self, max_entries: u32) -> Result<Vec<MediaEntry>> {
self.query(|ctx| ctx.get_pending_uploads(max_entries))
}
// db helpers
pub(super) fn query<F, R>(&self, func: F) -> Result<R>
where
F: FnOnce(&mut MediaDatabaseContext) -> Result<R>,
{
MediaDatabaseContext::query(&self.db, func)
}
pub(super) fn transact<F, R>(&self, func: F) -> Result<R>
where
F: FnOnce(&mut MediaDatabaseContext) -> Result<R>,
{
MediaDatabaseContext::transact(&self.db, func)
}
} }
/// Scan through the media folder, finding changes. /// Scan through the media folder, finding changes.
@ -123,13 +182,13 @@ impl MediaManager {
/// until syncing time, as we can't trust the entries previous Anki versions /// until syncing time, as we can't trust the entries previous Anki versions
/// wrote are correct. /// wrote are correct.
fn media_folder_changes( fn media_folder_changes(
&self, media_folder: &Path,
mut mtimes: HashMap<String, i64>, mut mtimes: HashMap<String, i64>,
) -> Result<(Vec<FilesystemEntry>, Vec<String>)> { ) -> Result<(Vec<FilesystemEntry>, Vec<String>)> {
let mut added_or_changed = vec![]; let mut added_or_changed = vec![];
// loop through on-disk files // loop through on-disk files
for dentry in self.media_folder.read_dir()? { for dentry in media_folder.read_dir()? {
let dentry = dentry?; let dentry = dentry?;
// skip folders // skip folders
@ -189,10 +248,11 @@ impl MediaManager {
/// Add added/updated entries to the media DB. /// Add added/updated entries to the media DB.
/// ///
/// Skip files where the mod time differed, but checksums are the same. /// Skip files where the mod time differed, but checksums are the same.
fn add_updated_entries(&mut self, entries: Vec<FilesystemEntry>) -> Result<()> { fn add_updated_entries(
for chunk in entries.chunks(1_024) { ctx: &mut MediaDatabaseContext,
self.transact(|ctx| { entries: Vec<FilesystemEntry>,
for fentry in chunk { ) -> Result<()> {
for fentry in entries {
let mut sync_required = true; let mut sync_required = true;
if !fentry.is_new { if !fentry.is_new {
if let Some(db_entry) = ctx.get_entry(&fentry.fname)? { if let Some(db_entry) = ctx.get_entry(&fentry.fname)? {
@ -206,27 +266,21 @@ impl MediaManager {
}; };
ctx.set_entry(&MediaEntry { ctx.set_entry(&MediaEntry {
fname: fentry.fname.clone(), fname: fentry.fname,
sha1: fentry.sha1, sha1: fentry.sha1,
mtime: fentry.mtime, mtime: fentry.mtime,
sync_required, sync_required,
})?; })?;
} }
Ok(())
})?;
}
Ok(()) Ok(())
} }
/// Remove deleted files from the media DB. /// Remove deleted files from the media DB.
fn remove_deleted_files(&mut self, removed: Vec<String>) -> Result<()> { fn remove_deleted_files(ctx: &mut MediaDatabaseContext, removed: Vec<String>) -> Result<()> {
for chunk in removed.chunks(4_096) { for fname in removed {
self.transact(|ctx| {
for fname in chunk {
ctx.set_entry(&MediaEntry { ctx.set_entry(&MediaEntry {
fname: fname.clone(), fname,
sha1: None, sha1: None,
mtime: 0, mtime: 0,
sync_required: true, sync_required: true,
@ -234,11 +288,6 @@ impl MediaManager {
} }
Ok(()) Ok(())
})?;
}
Ok(())
}
} }
#[cfg(test)] #[cfg(test)]
@ -271,7 +320,10 @@ mod test {
let media_db = dir.path().join("media.db"); let media_db = dir.path().join("media.db");
let mut mgr = MediaManager::new(&media_dir, media_db)?; let mut mgr = MediaManager::new(&media_dir, media_db)?;
assert_eq!(mgr.count()?, 0); mgr.query(|ctx| {
assert_eq!(ctx.count()?, 0);
Ok(())
})?;
// add a file and check it's picked up // add a file and check it's picked up
let f1 = media_dir.join("file.jpg"); let f1 = media_dir.join("file.jpg");
@ -280,9 +332,10 @@ mod test {
change_mtime(&media_dir); change_mtime(&media_dir);
mgr.register_changes()?; mgr.register_changes()?;
assert_eq!(mgr.count()?, 1); let mut entry = mgr.transact(|ctx| {
assert_eq!(mgr.changes_pending()?, 1); assert_eq!(ctx.count()?, 1);
let mut entry = mgr.get_entry("file.jpg")?.unwrap(); assert_eq!(ctx.changes_pending()?, 1);
let mut entry = ctx.get_entry("file.jpg")?.unwrap();
assert_eq!( assert_eq!(
entry, entry,
MediaEntry { MediaEntry {
@ -300,20 +353,25 @@ mod test {
// mark it as unmodified // mark it as unmodified
entry.sync_required = false; entry.sync_required = false;
mgr.set_entry(&entry)?; ctx.set_entry(&entry)?;
assert_eq!(mgr.changes_pending()?, 0); assert_eq!(ctx.changes_pending()?, 0);
// modify it // modify it
fs::write(&f1, "hello1")?; fs::write(&f1, "hello1")?;
change_mtime(&f1); change_mtime(&f1);
change_mtime(&media_dir); change_mtime(&media_dir);
Ok(entry)
})?;
mgr.register_changes()?; mgr.register_changes()?;
assert_eq!(mgr.count()?, 1); mgr.transact(|ctx| {
assert_eq!(mgr.changes_pending()?, 1); assert_eq!(ctx.count()?, 1);
assert_eq!(ctx.changes_pending()?, 1);
assert_eq!( assert_eq!(
mgr.get_entry("file.jpg")?.unwrap(), ctx.get_entry("file.jpg")?.unwrap(),
MediaEntry { MediaEntry {
fname: "file.jpg".into(), fname: "file.jpg".into(),
sha1: Some(sha1_of_data("hello1".as_bytes())), sha1: Some(sha1_of_data("hello1".as_bytes())),
@ -329,8 +387,11 @@ mod test {
// mark it as unmodified // mark it as unmodified
entry.sync_required = false; entry.sync_required = false;
mgr.set_entry(&entry)?; ctx.set_entry(&entry)?;
assert_eq!(mgr.changes_pending()?, 0); assert_eq!(ctx.changes_pending()?, 0);
Ok(())
})?;
// delete it // delete it
fs::remove_file(&f1)?; fs::remove_file(&f1)?;
@ -338,10 +399,11 @@ mod test {
change_mtime(&media_dir); change_mtime(&media_dir);
mgr.register_changes().unwrap(); mgr.register_changes().unwrap();
assert_eq!(mgr.count()?, 0); mgr.query(|ctx| {
assert_eq!(mgr.changes_pending()?, 1); assert_eq!(ctx.count()?, 0);
assert_eq!(ctx.changes_pending()?, 1);
assert_eq!( assert_eq!(
mgr.get_entry("file.jpg")?.unwrap(), ctx.get_entry("file.jpg")?.unwrap(),
MediaEntry { MediaEntry {
fname: "file.jpg".into(), fname: "file.jpg".into(),
sha1: None, sha1: None,
@ -351,5 +413,6 @@ mod test {
); );
Ok(()) Ok(())
})
} }
} }