From 5e5906f183aa9a1ee1979fd0bd6865bb40f28282 Mon Sep 17 00:00:00 2001 From: Damien Elmes Date: Sat, 1 Feb 2020 21:21:19 +1000 Subject: [PATCH] store sync state in a struct, and reuse ctx across methods --- rslib/src/media/database.rs | 49 ++---- rslib/src/media/mod.rs | 130 +++++++------- rslib/src/media/sync.rs | 326 +++++++++++++++++++----------------- 3 files changed, 257 insertions(+), 248 deletions(-) diff --git a/rslib/src/media/database.rs b/rslib/src/media/database.rs index 78f39380e..bf160d50c 100644 --- a/rslib/src/media/database.rs +++ b/rslib/src/media/database.rs @@ -88,41 +88,27 @@ impl MediaDatabaseContext<'_> { } } - /// Call the provided closure with a session that exists for - /// the duration of the call. - /// - /// This function should be used for read-only requests. To mutate - /// the database, use transact() instead. - pub(super) fn query(db: &Connection, func: F) -> Result - where - F: FnOnce(&mut MediaDatabaseContext) -> Result, - { - func(&mut Self::new(db)) - } - /// Execute the provided closure in a transaction, rolling back if /// an error is returned. - pub(super) fn transact(db: &Connection, func: F) -> Result + pub(super) fn transact(&mut self, func: F) -> Result where F: FnOnce(&mut MediaDatabaseContext) -> Result, { - MediaDatabaseContext::query(db, |ctx| { - ctx.begin()?; + self.begin()?; - let mut res = func(ctx); + let mut res = func(self); - if res.is_ok() { - if let Err(e) = ctx.commit() { - res = Err(e); - } + if res.is_ok() { + if let Err(e) = self.commit() { + res = Err(e); } + } - if res.is_err() { - ctx.rollback()?; - } + if res.is_err() { + self.rollback()?; + } - res - }) + res } fn begin(&mut self) -> Result<()> { @@ -273,8 +259,9 @@ mod test { let db_file = NamedTempFile::new()?; let db_file_path = db_file.path().to_str().unwrap(); let mut mgr = MediaManager::new("/dummy", db_file_path)?; + let mut ctx = mgr.dbctx(); - mgr.transact(|ctx| { + ctx.transact(|ctx| { // no entry exists yet assert_eq!(ctx.get_entry("test.mp3")?, None); @@ -314,13 +301,13 @@ mod test { })?; // reopen database and ensure data was committed + drop(ctx); drop(mgr); mgr = MediaManager::new("/dummy", db_file_path)?; - mgr.query(|ctx| { - let meta = ctx.get_meta()?; - assert_eq!(meta.folder_mtime, 123); + let mut ctx = mgr.dbctx(); + let meta = ctx.get_meta()?; + assert_eq!(meta.folder_mtime, 123); - Ok(()) - }) + Ok(()) } } diff --git a/rslib/src/media/mod.rs b/rslib/src/media/mod.rs index 1f64ebc09..6674427ff 100644 --- a/rslib/src/media/mod.rs +++ b/rslib/src/media/mod.rs @@ -59,7 +59,7 @@ impl MediaManager { let post_add_folder_mtime = mtime_as_i64(&self.media_folder)?; // add to the media DB - self.transact(|ctx| { + self.dbctx().transact(|ctx| { let existing_entry = ctx.get_entry(&chosen_fname)?; let new_sha1 = Some(data_hash); @@ -94,52 +94,55 @@ impl MediaManager { Ok(chosen_fname) } - /// Note any added/changed/deleted files. - fn register_changes(&mut self) -> Result<()> { - self.transact(|ctx| { - // folder mtime unchanged? - let dirmod = mtime_as_i64(&self.media_folder)?; - - let mut meta = ctx.get_meta()?; - if dirmod == meta.folder_mtime { - return Ok(()); - } else { - meta.folder_mtime = dirmod; - } - - let mtimes = ctx.all_mtimes()?; - - let (changed, removed) = media_folder_changes(&self.media_folder, mtimes)?; - - add_updated_entries(ctx, changed)?; - remove_deleted_files(ctx, removed)?; - - ctx.set_meta(&meta)?; - - Ok(()) - }) - } - // forceResync pub fn clear(&mut self) -> Result<()> { - self.transact(|ctx| ctx.clear()) + self.dbctx().transact(|ctx| ctx.clear()) + } + + fn dbctx(&self) -> MediaDatabaseContext { + MediaDatabaseContext::new(&self.db) } // db helpers - pub(super) fn query(&self, func: F) -> Result - where - F: FnOnce(&mut MediaDatabaseContext) -> Result, - { - MediaDatabaseContext::query(&self.db, func) - } + // pub(super) fn query(&self, func: F) -> Result + // where + // F: FnOnce(&mut MediaDatabaseContext) -> Result, + // { + // MediaDatabaseContext::query(&self.db, func) + // } - pub(super) fn transact(&self, func: F) -> Result - where - F: FnOnce(&mut MediaDatabaseContext) -> Result, - { - MediaDatabaseContext::transact(&self.db, func) - } + // pub(super) fn transact(&self, func: F) -> Result + // where + // F: FnOnce(&mut MediaDatabaseContext) -> Result, + // { + // MediaDatabaseContext::transact(&self.db, func) + // } +} + +fn register_changes(ctx: &mut MediaDatabaseContext, folder: &Path) -> Result<()> { + ctx.transact(|ctx| { + // folder mtime unchanged? + let dirmod = mtime_as_i64(folder)?; + + let mut meta = ctx.get_meta()?; + if dirmod == meta.folder_mtime { + return Ok(()); + } else { + meta.folder_mtime = dirmod; + } + + let mtimes = ctx.all_mtimes()?; + + let (changed, removed) = media_folder_changes(folder, mtimes)?; + + add_updated_entries(ctx, changed)?; + remove_deleted_files(ctx, removed)?; + + ctx.set_meta(&meta)?; + + Ok(()) + }) } /// Scan through the media folder, finding changes. @@ -262,7 +265,7 @@ mod test { use crate::err::Result; use crate::media::database::MediaEntry; use crate::media::files::sha1_of_data; - use crate::media::MediaManager; + use crate::media::{register_changes, MediaManager}; use std::path::Path; use std::time::Duration; use std::{fs, time}; @@ -286,20 +289,19 @@ mod test { std::fs::create_dir(&media_dir)?; let media_db = dir.path().join("media.db"); - let mut mgr = MediaManager::new(&media_dir, media_db)?; - mgr.query(|ctx| { - assert_eq!(ctx.count()?, 0); - Ok(()) - })?; + let mgr = MediaManager::new(&media_dir, media_db)?; + let mut ctx = mgr.dbctx(); + + assert_eq!(ctx.count()?, 0); // add a file and check it's picked up let f1 = media_dir.join("file.jpg"); fs::write(&f1, "hello")?; change_mtime(&media_dir); - mgr.register_changes()?; + register_changes(&mut ctx, &mgr.media_folder)?; - let mut entry = mgr.transact(|ctx| { + let mut entry = ctx.transact(|ctx| { assert_eq!(ctx.count()?, 1); assert!(!ctx.get_pending_uploads(1)?.is_empty()); let mut entry = ctx.get_entry("file.jpg")?.unwrap(); @@ -332,9 +334,9 @@ mod test { Ok(entry) })?; - mgr.register_changes()?; + register_changes(&mut ctx, &mgr.media_folder)?; - mgr.transact(|ctx| { + ctx.transact(|ctx| { assert_eq!(ctx.count()?, 1); assert!(!ctx.get_pending_uploads(1)?.is_empty()); assert_eq!( @@ -364,22 +366,20 @@ mod test { fs::remove_file(&f1)?; change_mtime(&media_dir); - mgr.register_changes().unwrap(); + register_changes(&mut ctx, &mgr.media_folder)?; - mgr.query(|ctx| { - assert_eq!(ctx.count()?, 0); - assert!(!ctx.get_pending_uploads(1)?.is_empty()); - assert_eq!( - ctx.get_entry("file.jpg")?.unwrap(), - MediaEntry { - fname: "file.jpg".into(), - sha1: None, - mtime: 0, - sync_required: true, - } - ); + assert_eq!(ctx.count()?, 0); + assert!(!ctx.get_pending_uploads(1)?.is_empty()); + assert_eq!( + ctx.get_entry("file.jpg")?.unwrap(), + MediaEntry { + fname: "file.jpg".into(), + sha1: None, + mtime: 0, + sync_required: true, + } + ); - Ok(()) - }) + Ok(()) } } diff --git a/rslib/src/media/sync.rs b/rslib/src/media/sync.rs index 3e2df7316..760d78f0d 100644 --- a/rslib/src/media/sync.rs +++ b/rslib/src/media/sync.rs @@ -6,7 +6,7 @@ use crate::media::database::{MediaDatabaseContext, MediaEntry}; use crate::media::files::{ add_file_from_ankiweb, data_for_file, normalize_filename, remove_files, AddedFile, }; -use crate::media::MediaManager; +use crate::media::{register_changes, MediaManager}; use bytes::Bytes; use log::debug; use reqwest; @@ -32,19 +32,145 @@ static SYNC_URL: &str = "https://sync.ankiweb.net/msync/"; static SYNC_MAX_FILES: usize = 25; static SYNC_MAX_BYTES: usize = (2.5 * 1024.0 * 1024.0) as usize; +struct SyncContext<'a> { + mgr: &'a MediaManager, + ctx: MediaDatabaseContext<'a>, + skey: Option, + client: Client, +} + +impl SyncContext<'_> { + fn new(mgr: &MediaManager) -> SyncContext { + let client = Client::builder() + .connect_timeout(time::Duration::from_secs(30)) + .build() + .unwrap(); + let ctx = mgr.dbctx(); + + SyncContext { + mgr, + ctx, + skey: None, + client, + } + } + + fn skey(&self) -> &str { + self.skey.as_ref().unwrap() + } + + async fn sync_begin(&self, hkey: &str) -> Result<(String, i32)> { + let url = format!("{}/begin", SYNC_URL); + + let resp = self + .client + .get(&url) + .query(&[("k", hkey), ("v", "ankidesktop,2.1.19,mac")]) + .send() + .await? + .error_for_status() + .map_err(rewrite_forbidden)?; + + let reply: SyncBeginResult = resp.json().await?; + + if let Some(data) = reply.data { + Ok((data.sync_key, data.usn)) + } else { + Err(AnkiError::AnkiWebMiscError { info: reply.err }) + } + } + + async fn fetch_changes(&mut self, client_usn: i32) -> Result<()> { + let mut last_usn = client_usn; + loop { + debug!("fetching record batch starting from usn {}", last_usn); + let batch = fetch_record_batch(&self.client, self.skey(), last_usn).await?; + if batch.is_empty() { + debug!("empty batch, done"); + break; + } + last_usn = batch.last().unwrap().usn; + + let (to_download, to_delete, to_remove_pending) = + determine_required_changes(&mut self.ctx, &batch)?; + + // do file removal and additions first + remove_files(self.mgr.media_folder.as_path(), to_delete.as_slice())?; + let downloaded = download_files( + self.mgr.media_folder.as_path(), + &self.client, + self.skey(), + to_download.as_slice(), + ) + .await?; + + // then update the DB + self.ctx.transact(|ctx| { + record_removals(ctx, &to_delete)?; + record_additions(ctx, downloaded)?; + record_clean(ctx, &to_remove_pending)?; + Ok(()) + })?; + } + Ok(()) + } + + async fn send_changes(&mut self) -> Result<()> { + loop { + let pending: Vec = self.ctx.get_pending_uploads(SYNC_MAX_FILES as u32)?; + if pending.is_empty() { + break; + } + + let zip_data = zip_files(&self.mgr.media_folder, &pending)?; + send_zip_data(&self.client, self.skey(), zip_data).await?; + + let fnames: Vec<_> = pending.iter().map(|e| &e.fname).collect(); + self.ctx + .transact(|ctx| record_clean(ctx, fnames.as_slice()))?; + } + + Ok(()) + } + + async fn finalize_sync(&mut self) -> Result<()> { + let url = format!("{}/mediaSanity", SYNC_URL); + let local = self.ctx.count()?; + + let obj = FinalizeRequest { local }; + let resp = ankiweb_json_request(&self.client, &url, &obj, self.skey()).await?; + let resp: FinalizeResponse = resp.json().await?; + + if let Some(data) = resp.data { + if data == "OK" { + Ok(()) + } else { + // fixme: force resync + Err(AnkiError::AnkiWebMiscError { + info: "resync required ".into(), + }) + } + } else { + Err(AnkiError::AnkiWebMiscError { + info: format!("finalize failed: {}", resp.err), + }) + } + } +} + #[allow(clippy::useless_let_if_seq)] pub async fn sync_media(mgr: &mut MediaManager, hkey: &str) -> Result<()> { + let mut sctx = SyncContext::new(mgr); + // make sure media DB is up to date - mgr.register_changes()?; + register_changes(&mut sctx.ctx, mgr.media_folder.as_path())?; + //mgr.register_changes()?; - let client_usn = mgr.query(|ctx| Ok(ctx.get_meta()?.last_sync_usn))?; - - let client = Client::builder() - .connect_timeout(time::Duration::from_secs(30)) - .build()?; + let client_usn = sctx.ctx.get_meta()?.last_sync_usn; debug!("beginning media sync"); - let (sync_key, server_usn) = sync_begin(&client, hkey).await?; + let (sync_key, server_usn) = sctx.sync_begin(hkey).await?; + sctx.skey = Some(sync_key); debug!("server usn was {}", server_usn); let mut actions_performed = false; @@ -52,19 +178,19 @@ pub async fn sync_media(mgr: &mut MediaManager, hkey: &str) -> Result<()> { // need to fetch changes from server? if client_usn != server_usn { debug!("differs from local usn {}, fetching changes", client_usn); - fetch_changes(mgr, &client, &sync_key, client_usn).await?; + sctx.fetch_changes(client_usn).await?; actions_performed = true; } // need to send changes to server? - let changes_pending = mgr.query(|ctx| Ok(!ctx.get_pending_uploads(1)?.is_empty()))?; + let changes_pending = !sctx.ctx.get_pending_uploads(1)?.is_empty(); if changes_pending { - send_changes(mgr, &client, &sync_key).await?; + sctx.send_changes().await?; actions_performed = true; } if actions_performed { - finalize_sync(mgr, &client, &sync_key).await?; + sctx.finalize_sync().await?; } debug!("media sync complete"); @@ -93,65 +219,6 @@ fn rewrite_forbidden(err: reqwest::Error) -> AnkiError { } } -async fn sync_begin(client: &Client, hkey: &str) -> Result<(String, i32)> { - let url = format!("{}/begin", SYNC_URL); - - let resp = client - .get(&url) - .query(&[("k", hkey), ("v", "ankidesktop,2.1.19,mac")]) - .send() - .await? - .error_for_status() - .map_err(rewrite_forbidden)?; - - let reply: SyncBeginResult = resp.json().await?; - - if let Some(data) = reply.data { - Ok((data.sync_key, data.usn)) - } else { - Err(AnkiError::AnkiWebMiscError { info: reply.err }) - } -} - -async fn fetch_changes( - mgr: &mut MediaManager, - client: &Client, - skey: &str, - client_usn: i32, -) -> Result<()> { - let mut last_usn = client_usn; - loop { - debug!("fetching record batch starting from usn {}", last_usn); - let batch = fetch_record_batch(client, skey, last_usn).await?; - if batch.is_empty() { - debug!("empty batch, done"); - break; - } - last_usn = batch.last().unwrap().usn; - - let (to_download, to_delete, to_remove_pending) = determine_required_changes(mgr, &batch)?; - - // do file removal and additions first - remove_files(mgr.media_folder.as_path(), to_delete.as_slice())?; - let downloaded = download_files( - mgr.media_folder.as_path(), - client, - skey, - to_download.as_slice(), - ) - .await?; - - // then update the DB - mgr.transact(|ctx| { - record_removals(ctx, &to_delete)?; - record_additions(ctx, downloaded)?; - record_clean(ctx, &to_remove_pending)?; - Ok(()) - })?; - } - Ok(()) -} - #[derive(Debug, Clone, Copy)] enum LocalState { NotInDB, @@ -204,49 +271,47 @@ fn determine_required_change( /// Get a list of server filenames and the actions required on them. /// Returns filenames in (to_download, to_delete). fn determine_required_changes<'a>( - mgr: &mut MediaManager, + ctx: &mut MediaDatabaseContext, records: &'a [ServerMediaRecord], ) -> Result<(Vec<&'a String>, Vec<&'a String>, Vec<&'a String>)> { - mgr.query(|ctx| { - let mut to_download = vec![]; - let mut to_delete = vec![]; - let mut to_remove_pending = vec![]; + let mut to_download = vec![]; + let mut to_delete = vec![]; + let mut to_remove_pending = vec![]; - for remote in records { - let (local_sha1, local_state) = match ctx.get_entry(&remote.fname)? { - Some(entry) => ( - match entry.sha1 { - Some(arr) => hex::encode(arr), - None => "".to_string(), - }, - if entry.sync_required { - LocalState::InDBAndPending - } else { - LocalState::InDBNotPending - }, - ), - None => ("".to_string(), LocalState::NotInDB), - }; + for remote in records { + let (local_sha1, local_state) = match ctx.get_entry(&remote.fname)? { + Some(entry) => ( + match entry.sha1 { + Some(arr) => hex::encode(arr), + None => "".to_string(), + }, + if entry.sync_required { + LocalState::InDBAndPending + } else { + LocalState::InDBNotPending + }, + ), + None => ("".to_string(), LocalState::NotInDB), + }; - let req_change = determine_required_change(&local_sha1, &remote.sha1, local_state); - debug!( - "for {}, lsha={} rsha={} lstate={:?} -> {:?}", - remote.fname, - local_sha1.chars().take(8).collect::(), - remote.sha1.chars().take(8).collect::(), - local_state, - req_change - ); - match req_change { - RequiredChange::Download => to_download.push(&remote.fname), - RequiredChange::Delete => to_delete.push(&remote.fname), - RequiredChange::RemovePending => to_remove_pending.push(&remote.fname), - RequiredChange::None => (), - }; - } + let req_change = determine_required_change(&local_sha1, &remote.sha1, local_state); + debug!( + "for {}, lsha={} rsha={} lstate={:?} -> {:?}", + remote.fname, + local_sha1.chars().take(8).collect::(), + remote.sha1.chars().take(8).collect::(), + local_state, + req_change + ); + match req_change { + RequiredChange::Download => to_download.push(&remote.fname), + RequiredChange::Delete => to_delete.push(&remote.fname), + RequiredChange::RemovePending => to_remove_pending.push(&remote.fname), + RequiredChange::None => (), + }; + } - Ok((to_download, to_delete, to_remove_pending)) - }) + Ok((to_download, to_delete, to_remove_pending)) } #[derive(Debug, Serialize)] @@ -444,25 +509,6 @@ fn record_clean(ctx: &mut MediaDatabaseContext, clean: &[&String]) -> Result<()> Ok(()) } -async fn send_changes(mgr: &mut MediaManager, client: &Client, skey: &str) -> Result<()> { - loop { - let pending: Vec = mgr.query(|ctx: &mut MediaDatabaseContext| { - ctx.get_pending_uploads(SYNC_MAX_FILES as u32) - })?; - if pending.is_empty() { - break; - } - - let zip_data = zip_files(&mgr.media_folder, &pending)?; - send_zip_data(client, skey, zip_data).await?; - - let fnames: Vec<_> = pending.iter().map(|e| &e.fname).collect(); - mgr.transact(|ctx| record_clean(ctx, fnames.as_slice()))?; - } - - Ok(()) -} - #[derive(Serialize_tuple)] struct UploadEntry<'a> { fname: &'a str, @@ -556,30 +602,6 @@ struct FinalizeResponse { err: String, } -async fn finalize_sync(mgr: &mut MediaManager, client: &Client, skey: &str) -> Result<()> { - let url = format!("{}/mediaSanity", SYNC_URL); - let local = mgr.query(|ctx| ctx.count())?; - - let obj = FinalizeRequest { local }; - let resp = ankiweb_json_request(client, &url, &obj, skey).await?; - let resp: FinalizeResponse = resp.json().await?; - - if let Some(data) = resp.data { - if data == "OK" { - Ok(()) - } else { - // fixme: force resync - Err(AnkiError::AnkiWebMiscError { - info: "resync required ".into(), - }) - } - } else { - Err(AnkiError::AnkiWebMiscError { - info: format!("finalize failed: {}", resp.err), - }) - } -} - #[cfg(test)] mod test { use crate::err::Result;