store sync state in a struct, and reuse ctx across methods

This commit is contained in:
Damien Elmes 2020-02-01 21:21:19 +10:00
parent f20b5b8db6
commit 5e5906f183
3 changed files with 257 additions and 248 deletions

View file

@ -88,41 +88,27 @@ impl MediaDatabaseContext<'_> {
} }
} }
/// Call the provided closure with a session that exists for
/// the duration of the call.
///
/// This function should be used for read-only requests. To mutate
/// the database, use transact() instead.
pub(super) fn query<F, R>(db: &Connection, func: F) -> Result<R>
where
F: FnOnce(&mut MediaDatabaseContext) -> Result<R>,
{
func(&mut Self::new(db))
}
/// Execute the provided closure in a transaction, rolling back if /// Execute the provided closure in a transaction, rolling back if
/// an error is returned. /// an error is returned.
pub(super) fn transact<F, R>(db: &Connection, func: F) -> Result<R> pub(super) fn transact<F, R>(&mut self, func: F) -> Result<R>
where where
F: FnOnce(&mut MediaDatabaseContext) -> Result<R>, F: FnOnce(&mut MediaDatabaseContext) -> Result<R>,
{ {
MediaDatabaseContext::query(db, |ctx| { self.begin()?;
ctx.begin()?;
let mut res = func(ctx); let mut res = func(self);
if res.is_ok() { if res.is_ok() {
if let Err(e) = ctx.commit() { if let Err(e) = self.commit() {
res = Err(e); res = Err(e);
}
} }
}
if res.is_err() { if res.is_err() {
ctx.rollback()?; self.rollback()?;
} }
res res
})
} }
fn begin(&mut self) -> Result<()> { fn begin(&mut self) -> Result<()> {
@ -273,8 +259,9 @@ mod test {
let db_file = NamedTempFile::new()?; let db_file = NamedTempFile::new()?;
let db_file_path = db_file.path().to_str().unwrap(); let db_file_path = db_file.path().to_str().unwrap();
let mut mgr = MediaManager::new("/dummy", db_file_path)?; let mut mgr = MediaManager::new("/dummy", db_file_path)?;
let mut ctx = mgr.dbctx();
mgr.transact(|ctx| { ctx.transact(|ctx| {
// no entry exists yet // no entry exists yet
assert_eq!(ctx.get_entry("test.mp3")?, None); assert_eq!(ctx.get_entry("test.mp3")?, None);
@ -314,13 +301,13 @@ mod test {
})?; })?;
// reopen database and ensure data was committed // reopen database and ensure data was committed
drop(ctx);
drop(mgr); drop(mgr);
mgr = MediaManager::new("/dummy", db_file_path)?; mgr = MediaManager::new("/dummy", db_file_path)?;
mgr.query(|ctx| { let mut ctx = mgr.dbctx();
let meta = ctx.get_meta()?; let meta = ctx.get_meta()?;
assert_eq!(meta.folder_mtime, 123); assert_eq!(meta.folder_mtime, 123);
Ok(()) Ok(())
})
} }
} }

View file

@ -59,7 +59,7 @@ impl MediaManager {
let post_add_folder_mtime = mtime_as_i64(&self.media_folder)?; let post_add_folder_mtime = mtime_as_i64(&self.media_folder)?;
// add to the media DB // add to the media DB
self.transact(|ctx| { self.dbctx().transact(|ctx| {
let existing_entry = ctx.get_entry(&chosen_fname)?; let existing_entry = ctx.get_entry(&chosen_fname)?;
let new_sha1 = Some(data_hash); let new_sha1 = Some(data_hash);
@ -94,52 +94,55 @@ impl MediaManager {
Ok(chosen_fname) Ok(chosen_fname)
} }
/// Note any added/changed/deleted files.
fn register_changes(&mut self) -> Result<()> {
self.transact(|ctx| {
// folder mtime unchanged?
let dirmod = mtime_as_i64(&self.media_folder)?;
let mut meta = ctx.get_meta()?;
if dirmod == meta.folder_mtime {
return Ok(());
} else {
meta.folder_mtime = dirmod;
}
let mtimes = ctx.all_mtimes()?;
let (changed, removed) = media_folder_changes(&self.media_folder, mtimes)?;
add_updated_entries(ctx, changed)?;
remove_deleted_files(ctx, removed)?;
ctx.set_meta(&meta)?;
Ok(())
})
}
// forceResync // forceResync
pub fn clear(&mut self) -> Result<()> { pub fn clear(&mut self) -> Result<()> {
self.transact(|ctx| ctx.clear()) self.dbctx().transact(|ctx| ctx.clear())
}
fn dbctx(&self) -> MediaDatabaseContext {
MediaDatabaseContext::new(&self.db)
} }
// db helpers // db helpers
pub(super) fn query<F, R>(&self, func: F) -> Result<R> // pub(super) fn query<F, R>(&self, func: F) -> Result<R>
where // where
F: FnOnce(&mut MediaDatabaseContext) -> Result<R>, // F: FnOnce(&mut MediaDatabaseContext) -> Result<R>,
{ // {
MediaDatabaseContext::query(&self.db, func) // MediaDatabaseContext::query(&self.db, func)
} // }
pub(super) fn transact<F, R>(&self, func: F) -> Result<R> // pub(super) fn transact<F, R>(&self, func: F) -> Result<R>
where // where
F: FnOnce(&mut MediaDatabaseContext) -> Result<R>, // F: FnOnce(&mut MediaDatabaseContext) -> Result<R>,
{ // {
MediaDatabaseContext::transact(&self.db, func) // MediaDatabaseContext::transact(&self.db, func)
} // }
}
fn register_changes(ctx: &mut MediaDatabaseContext, folder: &Path) -> Result<()> {
ctx.transact(|ctx| {
// folder mtime unchanged?
let dirmod = mtime_as_i64(folder)?;
let mut meta = ctx.get_meta()?;
if dirmod == meta.folder_mtime {
return Ok(());
} else {
meta.folder_mtime = dirmod;
}
let mtimes = ctx.all_mtimes()?;
let (changed, removed) = media_folder_changes(folder, mtimes)?;
add_updated_entries(ctx, changed)?;
remove_deleted_files(ctx, removed)?;
ctx.set_meta(&meta)?;
Ok(())
})
} }
/// Scan through the media folder, finding changes. /// Scan through the media folder, finding changes.
@ -262,7 +265,7 @@ mod test {
use crate::err::Result; use crate::err::Result;
use crate::media::database::MediaEntry; use crate::media::database::MediaEntry;
use crate::media::files::sha1_of_data; use crate::media::files::sha1_of_data;
use crate::media::MediaManager; use crate::media::{register_changes, MediaManager};
use std::path::Path; use std::path::Path;
use std::time::Duration; use std::time::Duration;
use std::{fs, time}; use std::{fs, time};
@ -286,20 +289,19 @@ mod test {
std::fs::create_dir(&media_dir)?; std::fs::create_dir(&media_dir)?;
let media_db = dir.path().join("media.db"); let media_db = dir.path().join("media.db");
let mut mgr = MediaManager::new(&media_dir, media_db)?; let mgr = MediaManager::new(&media_dir, media_db)?;
mgr.query(|ctx| { let mut ctx = mgr.dbctx();
assert_eq!(ctx.count()?, 0);
Ok(()) assert_eq!(ctx.count()?, 0);
})?;
// add a file and check it's picked up // add a file and check it's picked up
let f1 = media_dir.join("file.jpg"); let f1 = media_dir.join("file.jpg");
fs::write(&f1, "hello")?; fs::write(&f1, "hello")?;
change_mtime(&media_dir); change_mtime(&media_dir);
mgr.register_changes()?; register_changes(&mut ctx, &mgr.media_folder)?;
let mut entry = mgr.transact(|ctx| { let mut entry = ctx.transact(|ctx| {
assert_eq!(ctx.count()?, 1); assert_eq!(ctx.count()?, 1);
assert!(!ctx.get_pending_uploads(1)?.is_empty()); assert!(!ctx.get_pending_uploads(1)?.is_empty());
let mut entry = ctx.get_entry("file.jpg")?.unwrap(); let mut entry = ctx.get_entry("file.jpg")?.unwrap();
@ -332,9 +334,9 @@ mod test {
Ok(entry) Ok(entry)
})?; })?;
mgr.register_changes()?; register_changes(&mut ctx, &mgr.media_folder)?;
mgr.transact(|ctx| { ctx.transact(|ctx| {
assert_eq!(ctx.count()?, 1); assert_eq!(ctx.count()?, 1);
assert!(!ctx.get_pending_uploads(1)?.is_empty()); assert!(!ctx.get_pending_uploads(1)?.is_empty());
assert_eq!( assert_eq!(
@ -364,22 +366,20 @@ mod test {
fs::remove_file(&f1)?; fs::remove_file(&f1)?;
change_mtime(&media_dir); change_mtime(&media_dir);
mgr.register_changes().unwrap(); register_changes(&mut ctx, &mgr.media_folder)?;
mgr.query(|ctx| { assert_eq!(ctx.count()?, 0);
assert_eq!(ctx.count()?, 0); assert!(!ctx.get_pending_uploads(1)?.is_empty());
assert!(!ctx.get_pending_uploads(1)?.is_empty()); assert_eq!(
assert_eq!( ctx.get_entry("file.jpg")?.unwrap(),
ctx.get_entry("file.jpg")?.unwrap(), MediaEntry {
MediaEntry { fname: "file.jpg".into(),
fname: "file.jpg".into(), sha1: None,
sha1: None, mtime: 0,
mtime: 0, sync_required: true,
sync_required: true, }
} );
);
Ok(()) Ok(())
})
} }
} }

View file

@ -6,7 +6,7 @@ use crate::media::database::{MediaDatabaseContext, MediaEntry};
use crate::media::files::{ use crate::media::files::{
add_file_from_ankiweb, data_for_file, normalize_filename, remove_files, AddedFile, add_file_from_ankiweb, data_for_file, normalize_filename, remove_files, AddedFile,
}; };
use crate::media::MediaManager; use crate::media::{register_changes, MediaManager};
use bytes::Bytes; use bytes::Bytes;
use log::debug; use log::debug;
use reqwest; use reqwest;
@ -32,19 +32,145 @@ static SYNC_URL: &str = "https://sync.ankiweb.net/msync/";
static SYNC_MAX_FILES: usize = 25; static SYNC_MAX_FILES: usize = 25;
static SYNC_MAX_BYTES: usize = (2.5 * 1024.0 * 1024.0) as usize; static SYNC_MAX_BYTES: usize = (2.5 * 1024.0 * 1024.0) as usize;
struct SyncContext<'a> {
mgr: &'a MediaManager,
ctx: MediaDatabaseContext<'a>,
skey: Option<String>,
client: Client,
}
impl SyncContext<'_> {
fn new(mgr: &MediaManager) -> SyncContext {
let client = Client::builder()
.connect_timeout(time::Duration::from_secs(30))
.build()
.unwrap();
let ctx = mgr.dbctx();
SyncContext {
mgr,
ctx,
skey: None,
client,
}
}
fn skey(&self) -> &str {
self.skey.as_ref().unwrap()
}
async fn sync_begin(&self, hkey: &str) -> Result<(String, i32)> {
let url = format!("{}/begin", SYNC_URL);
let resp = self
.client
.get(&url)
.query(&[("k", hkey), ("v", "ankidesktop,2.1.19,mac")])
.send()
.await?
.error_for_status()
.map_err(rewrite_forbidden)?;
let reply: SyncBeginResult = resp.json().await?;
if let Some(data) = reply.data {
Ok((data.sync_key, data.usn))
} else {
Err(AnkiError::AnkiWebMiscError { info: reply.err })
}
}
async fn fetch_changes(&mut self, client_usn: i32) -> Result<()> {
let mut last_usn = client_usn;
loop {
debug!("fetching record batch starting from usn {}", last_usn);
let batch = fetch_record_batch(&self.client, self.skey(), last_usn).await?;
if batch.is_empty() {
debug!("empty batch, done");
break;
}
last_usn = batch.last().unwrap().usn;
let (to_download, to_delete, to_remove_pending) =
determine_required_changes(&mut self.ctx, &batch)?;
// do file removal and additions first
remove_files(self.mgr.media_folder.as_path(), to_delete.as_slice())?;
let downloaded = download_files(
self.mgr.media_folder.as_path(),
&self.client,
self.skey(),
to_download.as_slice(),
)
.await?;
// then update the DB
self.ctx.transact(|ctx| {
record_removals(ctx, &to_delete)?;
record_additions(ctx, downloaded)?;
record_clean(ctx, &to_remove_pending)?;
Ok(())
})?;
}
Ok(())
}
async fn send_changes(&mut self) -> Result<()> {
loop {
let pending: Vec<MediaEntry> = self.ctx.get_pending_uploads(SYNC_MAX_FILES as u32)?;
if pending.is_empty() {
break;
}
let zip_data = zip_files(&self.mgr.media_folder, &pending)?;
send_zip_data(&self.client, self.skey(), zip_data).await?;
let fnames: Vec<_> = pending.iter().map(|e| &e.fname).collect();
self.ctx
.transact(|ctx| record_clean(ctx, fnames.as_slice()))?;
}
Ok(())
}
async fn finalize_sync(&mut self) -> Result<()> {
let url = format!("{}/mediaSanity", SYNC_URL);
let local = self.ctx.count()?;
let obj = FinalizeRequest { local };
let resp = ankiweb_json_request(&self.client, &url, &obj, self.skey()).await?;
let resp: FinalizeResponse = resp.json().await?;
if let Some(data) = resp.data {
if data == "OK" {
Ok(())
} else {
// fixme: force resync
Err(AnkiError::AnkiWebMiscError {
info: "resync required ".into(),
})
}
} else {
Err(AnkiError::AnkiWebMiscError {
info: format!("finalize failed: {}", resp.err),
})
}
}
}
#[allow(clippy::useless_let_if_seq)] #[allow(clippy::useless_let_if_seq)]
pub async fn sync_media(mgr: &mut MediaManager, hkey: &str) -> Result<()> { pub async fn sync_media(mgr: &mut MediaManager, hkey: &str) -> Result<()> {
let mut sctx = SyncContext::new(mgr);
// make sure media DB is up to date // make sure media DB is up to date
mgr.register_changes()?; register_changes(&mut sctx.ctx, mgr.media_folder.as_path())?;
//mgr.register_changes()?;
let client_usn = mgr.query(|ctx| Ok(ctx.get_meta()?.last_sync_usn))?; let client_usn = sctx.ctx.get_meta()?.last_sync_usn;
let client = Client::builder()
.connect_timeout(time::Duration::from_secs(30))
.build()?;
debug!("beginning media sync"); debug!("beginning media sync");
let (sync_key, server_usn) = sync_begin(&client, hkey).await?; let (sync_key, server_usn) = sctx.sync_begin(hkey).await?;
sctx.skey = Some(sync_key);
debug!("server usn was {}", server_usn); debug!("server usn was {}", server_usn);
let mut actions_performed = false; let mut actions_performed = false;
@ -52,19 +178,19 @@ pub async fn sync_media(mgr: &mut MediaManager, hkey: &str) -> Result<()> {
// need to fetch changes from server? // need to fetch changes from server?
if client_usn != server_usn { if client_usn != server_usn {
debug!("differs from local usn {}, fetching changes", client_usn); debug!("differs from local usn {}, fetching changes", client_usn);
fetch_changes(mgr, &client, &sync_key, client_usn).await?; sctx.fetch_changes(client_usn).await?;
actions_performed = true; actions_performed = true;
} }
// need to send changes to server? // need to send changes to server?
let changes_pending = mgr.query(|ctx| Ok(!ctx.get_pending_uploads(1)?.is_empty()))?; let changes_pending = !sctx.ctx.get_pending_uploads(1)?.is_empty();
if changes_pending { if changes_pending {
send_changes(mgr, &client, &sync_key).await?; sctx.send_changes().await?;
actions_performed = true; actions_performed = true;
} }
if actions_performed { if actions_performed {
finalize_sync(mgr, &client, &sync_key).await?; sctx.finalize_sync().await?;
} }
debug!("media sync complete"); debug!("media sync complete");
@ -93,65 +219,6 @@ fn rewrite_forbidden(err: reqwest::Error) -> AnkiError {
} }
} }
async fn sync_begin(client: &Client, hkey: &str) -> Result<(String, i32)> {
let url = format!("{}/begin", SYNC_URL);
let resp = client
.get(&url)
.query(&[("k", hkey), ("v", "ankidesktop,2.1.19,mac")])
.send()
.await?
.error_for_status()
.map_err(rewrite_forbidden)?;
let reply: SyncBeginResult = resp.json().await?;
if let Some(data) = reply.data {
Ok((data.sync_key, data.usn))
} else {
Err(AnkiError::AnkiWebMiscError { info: reply.err })
}
}
async fn fetch_changes(
mgr: &mut MediaManager,
client: &Client,
skey: &str,
client_usn: i32,
) -> Result<()> {
let mut last_usn = client_usn;
loop {
debug!("fetching record batch starting from usn {}", last_usn);
let batch = fetch_record_batch(client, skey, last_usn).await?;
if batch.is_empty() {
debug!("empty batch, done");
break;
}
last_usn = batch.last().unwrap().usn;
let (to_download, to_delete, to_remove_pending) = determine_required_changes(mgr, &batch)?;
// do file removal and additions first
remove_files(mgr.media_folder.as_path(), to_delete.as_slice())?;
let downloaded = download_files(
mgr.media_folder.as_path(),
client,
skey,
to_download.as_slice(),
)
.await?;
// then update the DB
mgr.transact(|ctx| {
record_removals(ctx, &to_delete)?;
record_additions(ctx, downloaded)?;
record_clean(ctx, &to_remove_pending)?;
Ok(())
})?;
}
Ok(())
}
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
enum LocalState { enum LocalState {
NotInDB, NotInDB,
@ -204,49 +271,47 @@ fn determine_required_change(
/// Get a list of server filenames and the actions required on them. /// Get a list of server filenames and the actions required on them.
/// Returns filenames in (to_download, to_delete). /// Returns filenames in (to_download, to_delete).
fn determine_required_changes<'a>( fn determine_required_changes<'a>(
mgr: &mut MediaManager, ctx: &mut MediaDatabaseContext,
records: &'a [ServerMediaRecord], records: &'a [ServerMediaRecord],
) -> Result<(Vec<&'a String>, Vec<&'a String>, Vec<&'a String>)> { ) -> Result<(Vec<&'a String>, Vec<&'a String>, Vec<&'a String>)> {
mgr.query(|ctx| { let mut to_download = vec![];
let mut to_download = vec![]; let mut to_delete = vec![];
let mut to_delete = vec![]; let mut to_remove_pending = vec![];
let mut to_remove_pending = vec![];
for remote in records { for remote in records {
let (local_sha1, local_state) = match ctx.get_entry(&remote.fname)? { let (local_sha1, local_state) = match ctx.get_entry(&remote.fname)? {
Some(entry) => ( Some(entry) => (
match entry.sha1 { match entry.sha1 {
Some(arr) => hex::encode(arr), Some(arr) => hex::encode(arr),
None => "".to_string(), None => "".to_string(),
}, },
if entry.sync_required { if entry.sync_required {
LocalState::InDBAndPending LocalState::InDBAndPending
} else { } else {
LocalState::InDBNotPending LocalState::InDBNotPending
}, },
), ),
None => ("".to_string(), LocalState::NotInDB), None => ("".to_string(), LocalState::NotInDB),
}; };
let req_change = determine_required_change(&local_sha1, &remote.sha1, local_state); let req_change = determine_required_change(&local_sha1, &remote.sha1, local_state);
debug!( debug!(
"for {}, lsha={} rsha={} lstate={:?} -> {:?}", "for {}, lsha={} rsha={} lstate={:?} -> {:?}",
remote.fname, remote.fname,
local_sha1.chars().take(8).collect::<String>(), local_sha1.chars().take(8).collect::<String>(),
remote.sha1.chars().take(8).collect::<String>(), remote.sha1.chars().take(8).collect::<String>(),
local_state, local_state,
req_change req_change
); );
match req_change { match req_change {
RequiredChange::Download => to_download.push(&remote.fname), RequiredChange::Download => to_download.push(&remote.fname),
RequiredChange::Delete => to_delete.push(&remote.fname), RequiredChange::Delete => to_delete.push(&remote.fname),
RequiredChange::RemovePending => to_remove_pending.push(&remote.fname), RequiredChange::RemovePending => to_remove_pending.push(&remote.fname),
RequiredChange::None => (), RequiredChange::None => (),
}; };
} }
Ok((to_download, to_delete, to_remove_pending)) Ok((to_download, to_delete, to_remove_pending))
})
} }
#[derive(Debug, Serialize)] #[derive(Debug, Serialize)]
@ -444,25 +509,6 @@ fn record_clean(ctx: &mut MediaDatabaseContext, clean: &[&String]) -> Result<()>
Ok(()) Ok(())
} }
async fn send_changes(mgr: &mut MediaManager, client: &Client, skey: &str) -> Result<()> {
loop {
let pending: Vec<MediaEntry> = mgr.query(|ctx: &mut MediaDatabaseContext| {
ctx.get_pending_uploads(SYNC_MAX_FILES as u32)
})?;
if pending.is_empty() {
break;
}
let zip_data = zip_files(&mgr.media_folder, &pending)?;
send_zip_data(client, skey, zip_data).await?;
let fnames: Vec<_> = pending.iter().map(|e| &e.fname).collect();
mgr.transact(|ctx| record_clean(ctx, fnames.as_slice()))?;
}
Ok(())
}
#[derive(Serialize_tuple)] #[derive(Serialize_tuple)]
struct UploadEntry<'a> { struct UploadEntry<'a> {
fname: &'a str, fname: &'a str,
@ -556,30 +602,6 @@ struct FinalizeResponse {
err: String, err: String,
} }
async fn finalize_sync(mgr: &mut MediaManager, client: &Client, skey: &str) -> Result<()> {
let url = format!("{}/mediaSanity", SYNC_URL);
let local = mgr.query(|ctx| ctx.count())?;
let obj = FinalizeRequest { local };
let resp = ankiweb_json_request(client, &url, &obj, skey).await?;
let resp: FinalizeResponse = resp.json().await?;
if let Some(data) = resp.data {
if data == "OK" {
Ok(())
} else {
// fixme: force resync
Err(AnkiError::AnkiWebMiscError {
info: "resync required ".into(),
})
}
} else {
Err(AnkiError::AnkiWebMiscError {
info: format!("finalize failed: {}", resp.err),
})
}
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use crate::err::Result; use crate::err::Result;