diff --git a/rslib/BUILD.bazel b/rslib/BUILD.bazel index 91284f4c1..c2ae6fb9f 100644 --- a/rslib/BUILD.bazel +++ b/rslib/BUILD.bazel @@ -65,6 +65,7 @@ rust_library( proc_macro_deps = [ "//rslib/cargo:serde_derive", "//rslib/cargo:serde_repr", + "//rslib/cargo:async_trait", ], rustc_env = _anki_rustc_env, visibility = ["//visibility:public"], diff --git a/rslib/src/backend/mod.rs b/rslib/src/backend/mod.rs index f67bb77b8..fa99c7f36 100644 --- a/rslib/src/backend/mod.rs +++ b/rslib/src/backend/mod.rs @@ -1694,11 +1694,11 @@ impl Backend { }; let result = if upload { - let sync_fut = col_inner.full_upload(input.into(), progress_fn); + let sync_fut = col_inner.full_upload(input.into(), Box::new(progress_fn)); let abortable_sync = Abortable::new(sync_fut, abort_reg); rt.block_on(abortable_sync) } else { - let sync_fut = col_inner.full_download(input.into(), progress_fn); + let sync_fut = col_inner.full_download(input.into(), Box::new(progress_fn)); let abortable_sync = Abortable::new(sync_fut, abort_reg); rt.block_on(abortable_sync) }; diff --git a/rslib/src/collection.rs b/rslib/src/collection.rs index c893d4e68..dad3d3b5b 100644 --- a/rslib/src/collection.rs +++ b/rslib/src/collection.rs @@ -38,11 +38,18 @@ pub fn open_collection>( Ok(col) } +// We need to make a Builder for Collection in the future. + #[cfg(test)] pub fn open_test_collection() -> Collection { + open_test_collection_with_server(false) +} + +#[cfg(test)] +pub fn open_test_collection_with_server(server: bool) -> Collection { use crate::log; let i18n = I18n::new(&[""], "", log::terminal()); - open_collection(":memory:", "", "", false, i18n, log::terminal()).unwrap() + open_collection(":memory:", "", "", server, i18n, log::terminal()).unwrap() } #[derive(Debug, Default)] diff --git a/rslib/src/storage/card/mod.rs b/rslib/src/storage/card/mod.rs index abbf2f286..eef2f81b0 100644 --- a/rslib/src/storage/card/mod.rs +++ b/rslib/src/storage/card/mod.rs @@ -229,7 +229,7 @@ impl super::SqliteStorage { .prepare_cached("select null from cards")? .query(NO_PARAMS)? .next() - .map(|o| o.is_none()) + .map(|o| o.is_some()) .map_err(Into::into) } diff --git a/rslib/src/storage/mod.rs b/rslib/src/storage/mod.rs index f9749989f..6ef205599 100644 --- a/rslib/src/storage/mod.rs +++ b/rslib/src/storage/mod.rs @@ -16,6 +16,7 @@ mod tag; mod upgrades; pub(crate) use sqlite::SqliteStorage; +pub(crate) use sync::open_and_check_sqlite_file; use std::fmt::Write; diff --git a/rslib/src/storage/sqlite.rs b/rslib/src/storage/sqlite.rs index 31b142888..6695538de 100644 --- a/rslib/src/storage/sqlite.rs +++ b/rslib/src/storage/sqlite.rs @@ -170,7 +170,7 @@ impl SqliteStorage { "update col set crt=?, scm=?, ver=?, conf=?", params![ crt, - crt * 1000, + TimestampMillis::now(), SCHEMA_STARTING_VERSION, &schema11_config_as_string() ], diff --git a/rslib/src/storage/sync.rs b/rslib/src/storage/sync.rs index 6ec8fb19d..976df1622 100644 --- a/rslib/src/storage/sync.rs +++ b/rslib/src/storage/sync.rs @@ -1,9 +1,11 @@ // Copyright: Ankitects Pty Ltd and contributors // License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html +use std::path::Path; + use super::*; use crate::prelude::*; -use rusqlite::{params, types::FromSql, ToSql, NO_PARAMS}; +use rusqlite::{params, types::FromSql, Connection, ToSql, NO_PARAMS}; impl SqliteStorage { pub(crate) fn usn(&self, server: bool) -> Result { @@ -59,3 +61,28 @@ impl SqliteStorage { Ok(()) } } + +/// Return error if file is unreadable, fails the sqlite +/// integrity check, or is not in the 'delete' journal mode. +/// On success, returns the opened DB. +pub(crate) fn open_and_check_sqlite_file(path: &Path) -> Result { + let db = Connection::open(path)?; + match db.pragma_query_value(None, "integrity_check", |row| row.get::<_, String>(0)) { + Ok(s) => { + if s != "ok" { + return Err(AnkiError::invalid_input(format!("corrupt: {}", s))); + } + } + Err(e) => return Err(e.into()), + }; + match db.pragma_query_value(None, "journal_mode", |row| row.get::<_, String>(0)) { + Ok(s) => { + if s == "delete" { + Ok(db) + } else { + Err(AnkiError::invalid_input(format!("corrupt: {}", s))) + } + } + Err(e) => Err(e.into()), + } +} diff --git a/rslib/src/sync/http_client.rs b/rslib/src/sync/http_client.rs index 9884ea307..f72d68011 100644 --- a/rslib/src/sync/http_client.rs +++ b/rslib/src/sync/http_client.rs @@ -1,7 +1,9 @@ // Copyright: Ankitects Pty Ltd and contributors // License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html +use super::server::SyncServer; use super::*; +use async_trait::async_trait; use bytes::Bytes; use futures::Stream; use reqwest::Body; @@ -10,11 +12,14 @@ use reqwest::Body; static SYNC_VERSION: u8 = 10; +pub type FullSyncProgressFn = Box; + pub struct HTTPSyncClient { hkey: Option, skey: String, client: Client, endpoint: String, + full_sync_progress_fn: Option, } #[derive(Serialize)] @@ -113,9 +118,14 @@ impl HTTPSyncClient { skey, client, endpoint, + full_sync_progress_fn: None, } } + pub fn set_full_sync_progress_fn(&mut self, func: Option) { + self.full_sync_progress_fn = func; + } + async fn json_request(&self, method: &str, json: &T, timeout_long: bool) -> Result where T: serde::Serialize, @@ -178,8 +188,11 @@ impl HTTPSyncClient { pub(crate) fn hkey(&self) -> &str { self.hkey.as_ref().unwrap() } +} - pub(crate) async fn meta(&self) -> Result { +#[async_trait(?Send)] +impl SyncServer for HTTPSyncClient { + async fn meta(&self) -> Result { let meta_in = MetaIn { sync_version: SYNC_VERSION, client_version: sync_client_version(), @@ -187,8 +200,8 @@ impl HTTPSyncClient { self.json_request_deserialized("meta", &meta_in).await } - pub(crate) async fn start( - &self, + async fn start( + &mut self, local_usn: Usn, minutes_west: Option, local_is_newer: bool, @@ -202,47 +215,92 @@ impl HTTPSyncClient { self.json_request_deserialized("start", &input).await } - pub(crate) async fn apply_graves(&self, chunk: Graves) -> Result<()> { + async fn apply_graves(&mut self, chunk: Graves) -> Result<()> { let input = ApplyGravesIn { chunk }; let resp = self.json_request("applyGraves", &input, false).await?; resp.error_for_status()?; Ok(()) } - pub(crate) async fn apply_changes( - &self, - changes: UnchunkedChanges, - ) -> Result { + async fn apply_changes(&mut self, changes: UnchunkedChanges) -> Result { let input = ApplyChangesIn { changes }; self.json_request_deserialized("applyChanges", &input).await } - pub(crate) async fn chunk(&self) -> Result { + async fn chunk(&mut self) -> Result { self.json_request_deserialized("chunk", &Empty {}).await } - pub(crate) async fn apply_chunk(&self, chunk: Chunk) -> Result<()> { + async fn apply_chunk(&mut self, chunk: Chunk) -> Result<()> { let input = ApplyChunkIn { chunk }; let resp = self.json_request("applyChunk", &input, false).await?; resp.error_for_status()?; Ok(()) } - pub(crate) async fn sanity_check(&self, client: SanityCheckCounts) -> Result { + async fn sanity_check(&mut self, client: SanityCheckCounts) -> Result { let input = SanityCheckIn { client, full: true }; self.json_request_deserialized("sanityCheck2", &input).await } - pub(crate) async fn finish(&self) -> Result { + async fn finish(&mut self) -> Result { Ok(self.json_request_deserialized("finish", &Empty {}).await?) } - pub(crate) async fn abort(&self) -> Result<()> { + async fn abort(&mut self) -> Result<()> { let resp = self.json_request("abort", &Empty {}, false).await?; resp.error_for_status()?; Ok(()) } + async fn full_upload(mut self: Box, col_path: &Path, _can_consume: bool) -> Result<()> { + let file = tokio::fs::File::open(col_path).await?; + let total_bytes = file.metadata().await?.len() as usize; + let progress_fn = self + .full_sync_progress_fn + .take() + .expect("progress func was not set"); + let wrap1 = ProgressWrapper { + reader: file, + progress_fn, + progress: FullSyncProgress { + transferred_bytes: 0, + total_bytes, + }, + }; + let wrap2 = async_compression::stream::GzipEncoder::new(wrap1); + let body = Body::wrap_stream(wrap2); + self.upload_inner(body).await?; + + Ok(()) + } + + /// Download collection into a temporary file, returning it. + /// Caller should persist the file in the correct path after checking it. + /// Progress func must be set first. + async fn full_download(mut self: Box, folder: &Path) -> Result { + let mut temp_file = NamedTempFile::new_in(folder)?; + let (size, mut stream) = self.download_inner().await?; + let mut progress = FullSyncProgress { + transferred_bytes: 0, + total_bytes: size, + }; + let mut progress_fn = self + .full_sync_progress_fn + .take() + .expect("progress func was not set"); + while let Some(chunk) = stream.next().await { + let chunk = chunk?; + temp_file.write_all(&chunk)?; + progress.transferred_bytes += chunk.len(); + progress_fn(progress, true); + } + progress_fn(progress, false); + Ok(temp_file) + } +} + +impl HTTPSyncClient { async fn download_inner( &self, ) -> Result<( @@ -254,33 +312,6 @@ impl HTTPSyncClient { Ok((len as usize, resp.bytes_stream())) } - /// Download collection into a temporary file, returning it. - /// Caller should persist the file in the correct path after checking it. - pub(crate) async fn download

( - &self, - folder: &Path, - mut progress_fn: P, - ) -> Result - where - P: FnMut(FullSyncProgress, bool), - { - let mut temp_file = NamedTempFile::new_in(folder)?; - let (size, mut stream) = self.download_inner().await?; - let mut progress = FullSyncProgress { - transferred_bytes: 0, - total_bytes: size, - }; - while let Some(chunk) = stream.next().await { - let chunk = chunk?; - temp_file.write_all(&chunk)?; - progress.transferred_bytes += chunk.len(); - progress_fn(progress, true); - } - progress_fn(progress, false); - - Ok(temp_file) - } - async fn upload_inner(&self, body: Body) -> Result<()> { let data_part = multipart::Part::stream(body); let resp = self.request("upload", data_part, true).await?; @@ -295,27 +326,6 @@ impl HTTPSyncClient { Ok(()) } } - - pub(crate) async fn upload

(&mut self, col_path: &Path, progress_fn: P) -> Result<()> - where - P: FnMut(FullSyncProgress, bool) + Send + Sync + 'static, - { - let file = tokio::fs::File::open(col_path).await?; - let total_bytes = file.metadata().await?.len() as usize; - let wrap1 = ProgressWrapper { - reader: file, - progress_fn, - progress: FullSyncProgress { - transferred_bytes: 0, - total_bytes, - }, - }; - let wrap2 = async_compression::stream::GzipEncoder::new(wrap1); - let body = Body::wrap_stream(wrap2); - self.upload_inner(body).await?; - - Ok(()) - } } use futures::{ @@ -380,7 +390,7 @@ mod test { use tokio::runtime::Runtime; async fn http_client_inner(username: String, password: String) -> Result<()> { - let mut syncer = HTTPSyncClient::new(None, 0); + let mut syncer = Box::new(HTTPSyncClient::new(None, 0)); assert!(matches!( syncer.login("nosuchuser", "nosuchpass").await, @@ -445,17 +455,16 @@ mod test { use tempfile::tempdir; let dir = tempdir()?; - let out_path = syncer - .download(&dir.path(), |progress, _throttle| { - println!("progress: {:?}", progress); - }) - .await?; + syncer.set_full_sync_progress_fn(Some(Box::new(|progress, _throttle| { + println!("progress: {:?}", progress); + }))); + let out_path = syncer.full_download(&dir.path()).await?; - syncer - .upload(&out_path.path(), |progress, _throttle| { - println!("progress {:?}", progress); - }) - .await?; + let mut syncer = Box::new(HTTPSyncClient::new(None, 0)); + syncer.set_full_sync_progress_fn(Some(Box::new(|progress, _throttle| { + println!("progress {:?}", progress); + }))); + syncer.full_upload(&out_path.path(), false).await?; Ok(()) } diff --git a/rslib/src/sync/mod.rs b/rslib/src/sync/mod.rs index d469b273d..6f861a489 100644 --- a/rslib/src/sync/mod.rs +++ b/rslib/src/sync/mod.rs @@ -2,6 +2,7 @@ // License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html mod http_client; +mod server; use crate::{ backend_proto::{sync_status_out, SyncStatusOut}, @@ -14,12 +15,14 @@ use crate::{ prelude::*, revlog::RevlogEntry, serde::{default_on_invalid, deserialize_int_from_number}, + storage::open_and_check_sqlite_file, tags::{join_tags, split_tags}, version::sync_client_version, }; use flate2::write::GzEncoder; use flate2::Compression; use futures::StreamExt; +pub use http_client::FullSyncProgressFn; use http_client::HTTPSyncClient; pub use http_client::Timeouts; use itertools::Itertools; @@ -27,6 +30,7 @@ use reqwest::{multipart, Client, Response}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde_json::Value; use serde_tuple::Serialize_tuple; +use server::SyncServer; use std::io::prelude::*; use std::{collections::HashMap, path::Path, time::Duration}; use tempfile::NamedTempFile; @@ -174,7 +178,7 @@ enum SanityCheckStatus { Bad, } -#[derive(Serialize_tuple, Deserialize, Debug)] +#[derive(Serialize_tuple, Deserialize, Debug, PartialEq)] pub struct SanityCheckCounts { pub counts: SanityCheckDueCounts, pub cards: u32, @@ -187,7 +191,7 @@ pub struct SanityCheckCounts { pub deck_config: u32, } -#[derive(Serialize_tuple, Deserialize, Debug, Default)] +#[derive(Serialize_tuple, Deserialize, Debug, Default, PartialEq)] pub struct SanityCheckDueCounts { pub new: u32, pub learn: u32, @@ -221,6 +225,7 @@ struct SyncState { host_number: u32, } +#[derive(Debug)] pub struct SyncOutput { pub required: SyncActionRequired, pub server_message: String, @@ -235,7 +240,7 @@ pub struct SyncAuth { struct NormalSyncer<'a, F> { col: &'a mut Collection, - remote: HTTPSyncClient, + remote: Box, progress: NormalSyncProgress, progress_fn: F, } @@ -292,14 +297,17 @@ impl NormalSyncer<'_, F> where F: FnMut(NormalSyncProgress, bool), { - /// Create a new syncing instance. If host_number is unavailable, use 0. - pub fn new(col: &mut Collection, auth: SyncAuth, progress_fn: F) -> NormalSyncer<'_, F> + pub fn new( + col: &mut Collection, + server: Box, + progress_fn: F, + ) -> NormalSyncer<'_, F> where F: FnMut(NormalSyncProgress, bool), { NormalSyncer { col, - remote: HTTPSyncClient::new(Some(auth.hkey), auth.host_number), + remote: server, progress: NormalSyncProgress::default(), progress_fn, } @@ -347,6 +355,7 @@ where async fn get_sync_state(&self) -> Result { let remote: SyncMeta = self.remote.meta().await?; + debug!(self.col.log, "remote {:?}", &remote); if !remote.should_continue { debug!(self.col.log, "server says abort"; "message"=>&remote.server_message); return Err(AnkiError::SyncError { @@ -356,6 +365,7 @@ where } let local = self.col.sync_meta()?; + debug!(self.col.log, "local {:?}", &local); let delta = remote.current_time.0 - local.current_time.0; if delta.abs() > 300 { debug!(self.col.log, "clock off"; "delta"=>delta); @@ -553,7 +563,7 @@ where } } - async fn finalize(&self, state: &SyncState) -> Result<()> { + async fn finalize(&mut self, state: &SyncState) -> Result<()> { let new_server_mtime = self.remote.finish().await?; self.col.finalize_sync(state, new_server_mtime) } @@ -595,7 +605,7 @@ pub async fn sync_login(username: &str, password: &str) -> Result { } pub async fn sync_abort(hkey: String, host_number: u32) -> Result<()> { - let remote = HTTPSyncClient::new(Some(hkey), host_number); + let mut remote = HTTPSyncClient::new(Some(hkey), host_number); remote.abort().await } @@ -624,45 +634,53 @@ impl Collection { Ok(self.sync_meta()?.compared_to_remote(remote).required.into()) } + /// Create a new syncing instance. If host_number is unavailable, use 0. pub async fn normal_sync(&mut self, auth: SyncAuth, progress_fn: F) -> Result where F: FnMut(NormalSyncProgress, bool), { - NormalSyncer::new(self, auth, progress_fn).sync().await + NormalSyncer::new( + self, + Box::new(HTTPSyncClient::new(Some(auth.hkey), auth.host_number)), + progress_fn, + ) + .sync() + .await } /// Upload collection to AnkiWeb. Caller must re-open afterwards. - pub async fn full_upload(mut self, auth: SyncAuth, progress_fn: F) -> Result<()> - where - F: FnMut(FullSyncProgress, bool) + Send + Sync + 'static, - { + pub async fn full_upload(self, auth: SyncAuth, progress_fn: FullSyncProgressFn) -> Result<()> { + let mut server = HTTPSyncClient::new(Some(auth.hkey), auth.host_number); + server.set_full_sync_progress_fn(Some(progress_fn)); + self.full_upload_inner(Box::new(server)).await + // remote.upload(&col_path, progress_fn).await?; + } + + pub(crate) async fn full_upload_inner(mut self, server: Box) -> Result<()> { self.before_upload()?; let col_path = self.col_path.clone(); self.close(true)?; - let mut remote = HTTPSyncClient::new(Some(auth.hkey), auth.host_number); - remote.upload(&col_path, progress_fn).await?; - Ok(()) + server.full_upload(&col_path, false).await } /// Download collection from AnkiWeb. Caller must re-open afterwards. - pub async fn full_download(self, auth: SyncAuth, progress_fn: F) -> Result<()> - where - F: FnMut(FullSyncProgress, bool), - { + pub async fn full_download( + self, + auth: SyncAuth, + progress_fn: FullSyncProgressFn, + ) -> Result<()> { + let mut server = HTTPSyncClient::new(Some(auth.hkey), auth.host_number); + server.set_full_sync_progress_fn(Some(progress_fn)); + self.full_download_inner(Box::new(server)).await + } + + pub(crate) async fn full_download_inner(self, server: Box) -> Result<()> { let col_path = self.col_path.clone(); let folder = col_path.parent().unwrap(); self.close(false)?; - let remote = HTTPSyncClient::new(Some(auth.hkey), auth.host_number); - let out_file = remote.download(folder, progress_fn).await?; + let out_file = server.full_download(folder).await?; // check file ok - let db = rusqlite::Connection::open(out_file.path())?; - let check_result: String = db.pragma_query_value(None, "integrity_check", |r| r.get(0))?; - if check_result != "ok" { - return Err(AnkiError::SyncError { - info: "download corrupt".into(), - kind: SyncErrorKind::Other, - }); - } + let db = open_and_check_sqlite_file(out_file.path())?; db.execute_batch("update col set ls=mod")?; drop(db); // overwrite existing collection atomically @@ -683,11 +701,11 @@ impl Collection { server_message: "".into(), should_continue: true, host_number: 0, - empty: self.storage.have_at_least_one_card()?, + empty: !self.storage.have_at_least_one_card()?, }) } - fn apply_graves(&self, graves: Graves, latest_usn: Usn) -> Result<()> { + pub fn apply_graves(&self, graves: Graves, latest_usn: Usn) -> Result<()> { for nid in graves.notes { self.storage.remove_note(nid)?; self.storage.add_note_grave(nid, latest_usn)?; @@ -896,6 +914,9 @@ impl Collection { // Remote->local chunks //---------------------------------------------------------------- + /// pending_usn is used to decide whether the local objects are newer. + /// If the provided objects are not modified locally, the USN inside + /// the individual objects is used. fn apply_chunk(&mut self, chunk: Chunk, pending_usn: Usn) -> Result<()> { self.merge_revlog(chunk.revlog)?; self.merge_cards(chunk.cards, pending_usn)?; @@ -1173,6 +1194,10 @@ impl From for sync_status_out::Required { #[cfg(test)] mod test { + use async_trait::async_trait; + use lazy_static::lazy_static; + + use super::server::LocalServer; use super::*; use crate::log; use crate::{ @@ -1186,40 +1211,145 @@ mod test { fn full_progress(_: FullSyncProgress, _: bool) {} - struct TestContext { - dir: TempDir, - auth: SyncAuth, - col1: Option, - col2: Option, + #[test] + /// Run remote tests if hkey provided in environment; otherwise local. + fn syncing() -> Result<()> { + let ctx: Box = if let Ok(hkey) = std::env::var("TEST_HKEY") { + Box::new(RemoteTestContext { + auth: SyncAuth { + hkey, + host_number: 0, + }, + }) + } else { + Box::new(LocalTestContext {}) + }; + let mut rt = Runtime::new().unwrap(); + rt.block_on(upload_download(&ctx))?; + rt.block_on(regular_sync(&ctx)) } - fn open_col(ctx: &TestContext, fname: &str) -> Result { - let path = ctx.dir.path().join(fname); + fn open_col(dir: &Path, server: bool, fname: &str) -> Result { + let path = dir.join(fname); let i18n = I18n::new(&[""], "", log::terminal()); - open_collection(path, "".into(), "".into(), false, i18n, log::terminal()) + open_collection(path, "".into(), "".into(), server, i18n, log::terminal()) } - async fn upload_download(ctx: &mut TestContext) -> Result<()> { - // add a card - let mut col1 = open_col(ctx, "col1.anki2")?; - let nt = col1.get_notetype_by_name("Basic")?.unwrap(); + #[async_trait(?Send)] + trait TestContext { + fn server(&self) -> Box; + + fn col1(&self) -> Collection { + open_col(self.dir(), false, "col1.anki2").unwrap() + } + + fn col2(&self) -> Collection { + open_col(self.dir(), false, "col2.anki2").unwrap() + } + + fn dir(&self) -> &Path { + lazy_static! { + static ref DIR: TempDir = tempdir().unwrap(); + } + DIR.path() + } + + async fn normal_sync(&self, col: &mut Collection) -> SyncOutput { + NormalSyncer::new(col, self.server(), norm_progress) + .sync() + .await + .unwrap() + } + + async fn full_upload(&self, col: Collection) { + col.full_upload_inner(self.server()).await.unwrap() + } + + async fn full_download(&self, col: Collection) { + col.full_download_inner(self.server()).await.unwrap() + } + } + + // Local specifics + ///////////////////// + + struct LocalTestContext {} + + #[async_trait(?Send)] + impl TestContext for LocalTestContext { + fn server(&self) -> Box { + let col = open_col(self.dir(), true, "server.anki2").unwrap(); + Box::new(LocalServer::new(col)) + } + } + + // Remote specifics + ///////////////////// + + struct RemoteTestContext { + auth: SyncAuth, + } + + impl RemoteTestContext { + fn server_inner(&self) -> HTTPSyncClient { + let auth = self.auth.clone(); + HTTPSyncClient::new(Some(auth.hkey), auth.host_number) + } + } + + #[async_trait(?Send)] + impl TestContext for RemoteTestContext { + fn server(&self) -> Box { + Box::new(self.server_inner()) + } + + async fn full_upload(&self, col: Collection) { + let mut server = self.server_inner(); + server.set_full_sync_progress_fn(Some(Box::new(full_progress))); + col.full_upload_inner(Box::new(server)).await.unwrap() + } + + async fn full_download(&self, col: Collection) { + let mut server = self.server_inner(); + server.set_full_sync_progress_fn(Some(Box::new(full_progress))); + col.full_download_inner(Box::new(server)).await.unwrap() + } + } + + // Setup + full syncs + ///////////////////// + + fn col1_setup(col: &mut Collection) { + let nt = col.get_notetype_by_name("Basic").unwrap().unwrap(); let mut note = nt.new_note(); note.fields[0] = "1".into(); - col1.add_note(&mut note, DeckID(1))?; + col.add_note(&mut note, DeckID(1)).unwrap(); - let out: SyncOutput = col1.normal_sync(ctx.auth.clone(), norm_progress).await?; + // // set our schema time back, so when initial server + // // col is created, it's not identical + // col.storage + // .db + // .execute_batch("update col set scm = 123") + // .unwrap() + } + + async fn upload_download(ctx: &Box) -> Result<()> { + let mut col1 = ctx.col1(); + col1_setup(&mut col1); + + let out = ctx.normal_sync(&mut col1).await; assert!(matches!( out.required, SyncActionRequired::FullSyncRequired { .. } )); - col1.full_upload(ctx.auth.clone(), full_progress).await?; + ctx.full_upload(col1).await; // another collection - let mut col2 = open_col(ctx, "col2.anki2")?; + let mut col2 = ctx.col2(); // won't allow ankiweb clobber - let out: SyncOutput = col2.normal_sync(ctx.auth.clone(), norm_progress).await?; + let out = ctx.normal_sync(&mut col2).await; assert_eq!( out.required, SyncActionRequired::FullSyncRequired { @@ -1229,20 +1359,19 @@ mod test { ); // fetch so we're in sync - col2.full_download(ctx.auth.clone(), full_progress).await?; - - // reopen the two collections - ctx.col1 = Some(open_col(ctx, "col1.anki2")?); - ctx.col2 = Some(open_col(ctx, "col2.anki2")?); + ctx.full_download(col2).await; Ok(()) } - async fn regular_sync(ctx: &mut TestContext) -> Result<()> { - let col1 = ctx.col1.as_mut().unwrap(); - let col2 = ctx.col2.as_mut().unwrap(); + // Regular syncs + ///////////////////// + async fn regular_sync(ctx: &Box) -> Result<()> { // add a deck + let mut col1 = ctx.col1(); + let mut col2 = ctx.col2(); + let mut deck = col1.get_or_create_normal_deck("new deck")?; // give it a new option group @@ -1280,15 +1409,15 @@ mod test { // col1.storage.set_creation_stamp(TimestampSecs(12345))?; // and sync our changes - let remote = get_remote_sync_meta(ctx.auth.clone()).await?; - let out = col1.get_sync_status(remote)?; + let remote_meta = ctx.server().meta().await.unwrap(); + let out = col1.get_sync_status(remote_meta)?; assert_eq!(out, sync_status_out::Required::NormalSync); - let out: SyncOutput = col1.normal_sync(ctx.auth.clone(), norm_progress).await?; + let out = ctx.normal_sync(&mut col1).await; assert_eq!(out.required, SyncActionRequired::NoChanges); // sync the other collection - let out: SyncOutput = col2.normal_sync(ctx.auth.clone(), norm_progress).await?; + let out = ctx.normal_sync(&mut col2).await; assert_eq!(out.required, SyncActionRequired::NoChanges); let ntid = nt.id; @@ -1329,7 +1458,7 @@ mod test { ); assert_eq!( col1.storage.creation_stamp()?, - col1.storage.creation_stamp()? + col2.storage.creation_stamp()? ); // server doesn't send tag usns, so we can only compare tags, not usns, @@ -1351,7 +1480,7 @@ mod test { }; // make sure everything has been transferred across - compare_sides(col1, col2)?; + compare_sides(&mut col1, &mut col2)?; // make some modifications let mut note = col2.storage.get_note(note.id)?.unwrap(); @@ -1373,13 +1502,13 @@ mod test { col2.update_notetype(&mut nt, false)?; // sync the changes back - let out: SyncOutput = col2.normal_sync(ctx.auth.clone(), norm_progress).await?; + let out = ctx.normal_sync(&mut col2).await; assert_eq!(out.required, SyncActionRequired::NoChanges); - let out: SyncOutput = col1.normal_sync(ctx.auth.clone(), norm_progress).await?; + let out = ctx.normal_sync(&mut col1).await; assert_eq!(out.required, SyncActionRequired::NoChanges); // should still match - compare_sides(col1, col2)?; + compare_sides(&mut col1, &mut col2)?; // deletions should sync too for table in &["cards", "notes", "decks"] { @@ -1392,12 +1521,13 @@ mod test { // fixme: inconsistent usn arg col1.remove_cards_and_orphaned_notes(&[cardid])?; - col1.remove_note_only(noteid, col1.usn()?)?; + let usn = col1.usn()?; + col1.remove_note_only(noteid, usn)?; col1.remove_deck_and_child_decks(deckid)?; - let out: SyncOutput = col1.normal_sync(ctx.auth.clone(), norm_progress).await?; + let out = ctx.normal_sync(&mut col1).await; assert_eq!(out.required, SyncActionRequired::NoChanges); - let out: SyncOutput = col2.normal_sync(ctx.auth.clone(), norm_progress).await?; + let out = ctx.normal_sync(&mut col2).await; assert_eq!(out.required, SyncActionRequired::NoChanges); for table in &["cards", "notes", "decks"] { @@ -1410,32 +1540,8 @@ mod test { // removing things like a notetype forces a full sync col2.remove_notetype(ntid)?; - let out: SyncOutput = col2.normal_sync(ctx.auth.clone(), norm_progress).await?; + let out = ctx.normal_sync(&mut col2).await; assert!(matches!(out.required, SyncActionRequired::FullSyncRequired { .. })); Ok(()) } - - #[test] - fn collection_sync() -> Result<()> { - let hkey = match std::env::var("TEST_HKEY") { - Ok(s) => s, - Err(_) => { - return Ok(()); - } - }; - - let mut ctx = TestContext { - dir: tempdir()?, - auth: SyncAuth { - hkey, - host_number: 0, - }, - col1: None, - col2: None, - }; - - let mut rt = Runtime::new().unwrap(); - rt.block_on(upload_download(&mut ctx))?; - rt.block_on(regular_sync(&mut ctx)) - } } diff --git a/rslib/src/sync/server.rs b/rslib/src/sync/server.rs new file mode 100644 index 000000000..0fc18428b --- /dev/null +++ b/rslib/src/sync/server.rs @@ -0,0 +1,196 @@ +use std::{fs, path::Path}; + +use crate::{ + prelude::*, + storage::open_and_check_sqlite_file, + sync::{ + Chunk, Graves, SanityCheckCounts, SanityCheckOut, SanityCheckStatus, SyncMeta, + UnchunkedChanges, Usn, + }, +}; +use async_trait::async_trait; +use tempfile::NamedTempFile; + +use super::ChunkableIDs; +#[async_trait(?Send)] +pub trait SyncServer { + async fn meta(&self) -> Result; + async fn start( + &mut self, + client_usn: Usn, + minutes_west: Option, + local_is_newer: bool, + ) -> Result; + async fn apply_graves(&mut self, client_chunk: Graves) -> Result<()>; + async fn apply_changes(&mut self, client_changes: UnchunkedChanges) + -> Result; + async fn chunk(&mut self) -> Result; + async fn apply_chunk(&mut self, client_chunk: Chunk) -> Result<()>; + async fn sanity_check(&mut self, client: SanityCheckCounts) -> Result; + async fn finish(&mut self) -> Result; + async fn abort(&mut self) -> Result<()>; + + /// If `can_consume` is true, the local server will move or remove the file, instead + /// creating a copy. The remote server ignores this argument. + async fn full_upload(self: Box, col_path: &Path, can_consume: bool) -> Result<()>; + async fn full_download(self: Box, folder: &Path) -> Result; +} + +pub struct LocalServer { + col: Collection, + + // The current sync protocol is stateful, so unfortunately we need to + // retain a bunch of information across requests. These are set either + // on start, or on subsequent methods. + server_usn: Usn, + client_usn: Usn, + /// Only used to determine whether we should send our + /// config to client. + client_is_newer: bool, + /// Set on the first call to chunk() + server_chunk_ids: Option, +} + +impl LocalServer { + #[allow(dead_code)] + pub fn new(col: Collection) -> LocalServer { + assert!(col.server); + LocalServer { + col, + server_usn: Usn(0), + client_usn: Usn(0), + client_is_newer: false, + server_chunk_ids: None, + } + } +} + +#[async_trait(?Send)] +impl SyncServer for LocalServer { + async fn meta(&self) -> Result { + Ok(SyncMeta { + modified: self.col.storage.get_modified_time()?, + schema: self.col.storage.get_schema_mtime()?, + usn: self.col.storage.usn(true)?, + current_time: TimestampSecs::now(), + server_message: String::new(), + should_continue: true, + host_number: 0, + empty: !self.col.storage.have_at_least_one_card()?, + }) + } + + async fn start( + &mut self, + client_usn: Usn, + minutes_west: Option, + client_is_newer: bool, + ) -> Result { + self.server_usn = self.col.usn()?; + self.client_usn = client_usn; + self.client_is_newer = client_is_newer; + + self.col.storage.begin_rust_trx()?; + if let Some(mins) = minutes_west { + self.col.set_local_mins_west(mins)?; + } + self.col.storage.pending_graves(client_usn) + } + + async fn apply_graves(&mut self, client_chunk: Graves) -> Result<()> { + self.col.apply_graves(client_chunk, self.server_usn) + } + + async fn apply_changes( + &mut self, + client_changes: UnchunkedChanges, + ) -> Result { + let server_changes = + self.col + .local_unchunked_changes(self.client_usn, None, !self.client_is_newer)?; + self.col.apply_changes(client_changes, self.server_usn)?; + Ok(server_changes) + } + + async fn chunk(&mut self) -> Result { + if self.server_chunk_ids.is_none() { + self.server_chunk_ids = Some(self.col.get_chunkable_ids(self.client_usn)?); + } + + self.col + .get_chunk(self.server_chunk_ids.as_mut().unwrap(), None) + } + + async fn apply_chunk(&mut self, client_chunk: Chunk) -> Result<()> { + self.col.apply_chunk(client_chunk, self.client_usn) + } + + async fn sanity_check(&mut self, mut client: SanityCheckCounts) -> Result { + client.counts = Default::default(); + let server = self.col.storage.sanity_check_info()?; + Ok(SanityCheckOut { + status: if client == server { + SanityCheckStatus::Ok + } else { + SanityCheckStatus::Bad + }, + client: Some(client), + server: Some(server), + }) + } + + async fn finish(&mut self) -> Result { + let now = TimestampMillis::now(); + self.col.storage.set_modified_time(now)?; + self.col.storage.set_last_sync(now)?; + self.col.storage.increment_usn()?; + self.col.storage.commit_rust_trx()?; + Ok(now) + } + + async fn abort(&mut self) -> Result<()> { + self.col.storage.rollback_rust_trx() + } + + /// `col_path` should point to the uploaded file, and the caller is + /// responsible for imposing limits on its size if it wishes. + /// If `can_consume` is true, the provided file will be moved into place, + /// or removed on failure. If false, the original will be left alone. + async fn full_upload( + mut self: Box, + mut col_path: &Path, + can_consume: bool, + ) -> Result<()> { + // create a copy if necessary + let new_file: NamedTempFile; + if !can_consume { + new_file = NamedTempFile::new()?; + fs::copy(col_path, &new_file.path())?; + col_path = new_file.path(); + } + + open_and_check_sqlite_file(col_path).map_err(|check_err| { + match fs::remove_file(col_path) { + Ok(_) => check_err, + Err(remove_err) => remove_err.into(), + } + })?; + + let target_col_path = self.col.col_path.clone(); + self.col.close(false)?; + fs::rename(col_path, &target_col_path).map_err(Into::into) + } + + async fn full_download(mut self: Box, output_folder: &Path) -> Result { + // bump usn/mod & close + self.col.transact(None, |col| col.storage.increment_usn())?; + let col_path = self.col.col_path.clone(); + self.col.close(true)?; + + // copy file and return path + let temp_file = NamedTempFile::new_in(output_folder)?; + fs::copy(&col_path, temp_file.path())?; + + Ok(temp_file) + } +}