diff --git a/proto/backend.proto b/proto/backend.proto index 00763dede..f9fcbd689 100644 --- a/proto/backend.proto +++ b/proto/backend.proto @@ -157,10 +157,13 @@ service BackendService { // sync rpc SyncMedia (SyncMediaIn) returns (Empty); - rpc AbortMediaSync (Empty) returns (Empty); + rpc AbortSync (Empty) returns (Empty); rpc BeforeUpload (Empty) returns (Empty); - rpc SyncLogin (SyncLoginIn) returns (SyncLoginOut); - rpc SyncCollection (SyncCollectionIn) returns (SyncCollectionOut); + rpc SyncLogin (SyncLoginIn) returns (SyncAuth); + rpc SyncStatus (SyncAuth) returns (SyncCollectionOut); + rpc SyncCollection (SyncAuth) returns (SyncCollectionOut); + rpc FullUpload (SyncAuth) returns (Empty); + rpc FullDownload (SyncAuth) returns (Empty); // translation/messages @@ -449,6 +452,7 @@ message Progress { oneof value { MediaSyncProgress media_sync = 1; string media_check = 2; + FullSyncProgress full_sync = 3; } } @@ -484,6 +488,11 @@ message MediaSyncProgress { string removed = 3; } +message FullSyncProgress { + uint32 transferred = 1; + uint32 total = 2; +} + message MediaSyncUploadProgress { uint32 files = 1; uint32 deletions = 2; @@ -884,31 +893,23 @@ message SyncLoginIn { string password = 2; } -message SyncLoginOut { - string hkey = 1; -} - -message SyncCollectionIn { - enum SyncAction { - CHECK_ONLY = 0; - NORMAL_SYNC = 1; - UPLOAD = 2; - DOWNLOAD = 3; - } - - string hkey = 1; - uint32 host_number = 2; - SyncAction action = 3; -} - message SyncCollectionOut { enum ChangesRequired { NO_CHANGES = 0; NORMAL_SYNC = 1; FULL_SYNC = 2; + // local collection has no cards; upload not an option + FULL_DOWNLOAD = 3; + // remote collection has no cards; download not an option + FULL_UPLOAD = 4; } - string host_number = 1; + uint32 host_number = 1; string server_message = 2; ChangesRequired required = 3; } + +message SyncAuth { + string hkey = 1; + uint32 host_number = 2; +} diff --git a/qt/aqt/mediasync.py b/qt/aqt/mediasync.py index 7a81b6923..d551fb45d 100644 --- a/qt/aqt/mediasync.py +++ b/qt/aqt/mediasync.py @@ -123,7 +123,7 @@ class MediaSyncer: return self._log_and_notify(tr(TR.SYNC_MEDIA_ABORTING)) self._want_stop = True - self.mw.col.backend.abort_media_sync() + self.mw.col.backend.abort_sync() def is_syncing(self) -> bool: return self._syncing diff --git a/rslib/src/backend/mod.rs b/rslib/src/backend/mod.rs index 3e67c20ea..02f606f82 100644 --- a/rslib/src/backend/mod.rs +++ b/rslib/src/backend/mod.rs @@ -33,6 +33,7 @@ use crate::{ sched::cutoff::local_minutes_west_for_stamp, sched::timespan::{answer_button_time, learning_congrats, studied_today, time_span}, search::SortMode, + sync::{sync_login, FullSyncProgress, SyncActionRequired, SyncAuth, SyncOutput}, template::RenderedNode, text::{extract_av_tags, strip_av_tags, AVTag}, timestamp::TimestampSecs, @@ -62,12 +63,13 @@ pub struct Backend { progress_callback: Option, i18n: I18n, server: bool, - media_sync_abort: Option, + sync_abort: Option, } enum Progress<'a> { MediaSync(&'a MediaSyncProgress), MediaCheck(u32), + FullSync(&'a FullSyncProgress), } /// Convert an Anki error to a protobuf error. @@ -924,15 +926,26 @@ impl BackendService for Backend { // sync //------------------------------------------------------------------- - fn sync_login(&mut self, input: pb::SyncLoginIn) -> BackendResult { - todo!() + fn sync_login(&mut self, input: pb::SyncLoginIn) -> BackendResult { + self.sync_login_inner(input) } - fn sync_collection( - &mut self, - input: pb::SyncCollectionIn, - ) -> BackendResult { - todo!() + fn sync_status(&mut self, input: pb::SyncAuth) -> BackendResult { + self.sync_collection_inner(input, true) + } + + fn sync_collection(&mut self, input: pb::SyncAuth) -> BackendResult { + self.sync_collection_inner(input, false) + } + + fn full_upload(&mut self, input: pb::SyncAuth) -> BackendResult { + self.full_sync_inner(input, true)?; + Ok(().into()) + } + + fn full_download(&mut self, input: pb::SyncAuth) -> BackendResult { + self.full_sync_inner(input, false)?; + Ok(().into()) } fn sync_media(&mut self, input: SyncMediaIn) -> BackendResult { @@ -954,8 +967,8 @@ impl BackendService for Backend { res.map(Into::into) } - fn abort_media_sync(&mut self, _input: Empty) -> BackendResult { - if let Some(handle) = self.media_sync_abort.take() { + fn abort_sync(&mut self, _input: Empty) -> BackendResult { + if let Some(handle) = self.sync_abort.take() { handle.abort(); } Ok(().into()) @@ -1090,7 +1103,7 @@ impl Backend { progress_callback: None, i18n, server, - media_sync_abort: None, + sync_abort: None, } } @@ -1148,7 +1161,7 @@ impl Backend { log: Logger, ) -> Result<()> { let (abort_handle, abort_reg) = AbortHandle::new_pair(); - self.media_sync_abort = Some(abort_handle); + self.sync_abort = Some(abort_handle); let callback = |progress: &MediaSyncProgress| { self.fire_progress_callback(Progress::MediaSync(progress)) @@ -1165,10 +1178,112 @@ impl Backend { Err(AnkiError::Interrupted) } }; - self.media_sync_abort = None; + self.sync_abort = None; ret } + fn sync_login_inner(&mut self, input: pb::SyncLoginIn) -> BackendResult { + let (abort_handle, abort_reg) = AbortHandle::new_pair(); + self.sync_abort = Some(abort_handle); + + let mut rt = Runtime::new().unwrap(); + let sync_fut = sync_login(&input.username, &input.password); + let abortable_sync = Abortable::new(sync_fut, abort_reg); + let ret = match rt.block_on(abortable_sync) { + Ok(sync_result) => sync_result, + Err(_) => Err(AnkiError::Interrupted), + }; + self.sync_abort = None; + ret.map(|a| pb::SyncAuth { + hkey: a.hkey, + host_number: a.host_number, + }) + } + + fn sync_collection_inner( + &mut self, + input: pb::SyncAuth, + check_only: bool, + ) -> BackendResult { + let (abort_handle, abort_reg) = AbortHandle::new_pair(); + self.sync_abort = Some(abort_handle); + + let mut rt = Runtime::new().unwrap(); + + let ret = self.with_col(|col| { + let result = if check_only { + let sync_fut = col.get_sync_status(input.into()); + let abortable_sync = Abortable::new(sync_fut, abort_reg); + rt.block_on(abortable_sync) + } else { + let sync_fut = col.normal_sync(input.into()); + let abortable_sync = Abortable::new(sync_fut, abort_reg); + rt.block_on(abortable_sync) + }; + match result { + Ok(sync_result) => sync_result, + Err(_) => Err(AnkiError::Interrupted), + } + }); + self.sync_abort = None; + let output: SyncOutput = ret?; + Ok(output.into()) + } + + fn full_sync_inner(&mut self, input: pb::SyncAuth, upload: bool) -> Result<()> { + let mut col = self.col.lock().unwrap(); + if col.is_none() { + return Err(AnkiError::CollectionNotOpen); + } + if !col.as_ref().unwrap().can_close() { + return Err(AnkiError::invalid_input("can't close yet")); + } + + let col_inner = col.take().unwrap(); + + let (abort_handle, abort_reg) = AbortHandle::new_pair(); + self.sync_abort = Some(abort_handle); + + let col_path = col_inner.col_path.clone(); + let media_folder_path = col_inner.media_folder.clone(); + let media_db_path = col_inner.media_db.clone(); + let logger = col_inner.log.clone(); + + // FIXME: throttle + let progress_fn = |progress: &FullSyncProgress| { + self.fire_progress_callback(Progress::FullSync(progress)); + }; + + let mut rt = Runtime::new().unwrap(); + + let result = if upload { + todo!() + // let sync_fut = col_inner.full_upload(input.into(), progress_fn); + // let abortable_sync = Abortable::new(sync_fut, abort_reg); + // rt.block_on(abortable_sync) + } else { + let sync_fut = col_inner.full_download(input.into(), progress_fn); + let abortable_sync = Abortable::new(sync_fut, abort_reg); + rt.block_on(abortable_sync) + }; + self.sync_abort = None; + + // ensure re-opened regardless of outcome + col.replace(open_collection( + col_path, + media_folder_path, + media_db_path, + self.server, + self.i18n.clone(), + logger, + )?); + + match result { + Ok(sync_result) => sync_result, + Err(_) => Err(AnkiError::Interrupted), + } + } + pub fn db_command(&self, input: &[u8]) -> Result { self.with_col(|col| db_command_bytes(&col.storage, input)) } @@ -1230,6 +1345,10 @@ fn progress_to_proto_bytes(progress: Progress, i18n: &I18n) -> Vec { let s = i18n.trn(TR::MediaCheckChecked, tr_args!["count"=>n]); pb::progress::Value::MediaCheck(s) } + Progress::FullSync(p) => pb::progress::Value::FullSync(pb::FullSyncProgress { + transferred: p.transferred_bytes as u32, + total: p.total_bytes as u32, + }), }), }; @@ -1332,3 +1451,32 @@ impl From for pb::SchedTimingTodayOut { } } } + +impl From for pb::SyncCollectionOut { + fn from(o: SyncOutput) -> Self { + pb::SyncCollectionOut { + host_number: o.host_number, + server_message: o.server_message, + required: match o.required { + SyncActionRequired::NoChanges => { + pb::sync_collection_out::ChangesRequired::NoChanges as i32 + } + SyncActionRequired::FullSyncRequired => { + pb::sync_collection_out::ChangesRequired::FullSync as i32 + } + SyncActionRequired::NormalSyncRequired => { + pb::sync_collection_out::ChangesRequired::NormalSync as i32 + } + }, + } + } +} + +impl From for SyncAuth { + fn from(a: pb::SyncAuth) -> Self { + SyncAuth { + hkey: a.hkey, + host_number: a.host_number, + } + } +} diff --git a/rslib/src/storage/card/mod.rs b/rslib/src/storage/card/mod.rs index dc5a48158..378f1212a 100644 --- a/rslib/src/storage/card/mod.rs +++ b/rslib/src/storage/card/mod.rs @@ -123,7 +123,6 @@ impl super::SqliteStorage { /// Add or update card, using the provided ID. Used when syncing. pub(crate) fn add_or_update_card(&self, card: &Card) -> Result<()> { - let now = TimestampMillis::now().0; let mut stmt = self.db.prepare_cached(include_str!("add_or_update.sql"))?; stmt.execute(params![ card.id, @@ -226,11 +225,6 @@ impl super::SqliteStorage { new_usn: Usn, limit: usize, ) -> Result> { - let mut out = vec![]; - if limit == 0 { - return Ok(out); - } - let entries: Vec = self .db .prepare_cached(concat!( @@ -241,11 +235,12 @@ impl super::SqliteStorage { row_to_card(r).map(Into::into).map_err(Into::into) })? .collect::>()?; - - let ids: Vec<_> = entries.iter().map(|e| e.id).collect(); - self.db - .prepare_cached("update cards set usn=? where usn=-1")? - .execute(&[new_usn])?; + let mut stmt = self + .db + .prepare_cached("update cards set usn=? where id=?")?; + for entry in &entries { + stmt.execute(params![new_usn, entry.id])?; + } Ok(entries) } diff --git a/rslib/src/storage/mod.rs b/rslib/src/storage/mod.rs index 2cd8da66e..8560f777c 100644 --- a/rslib/src/storage/mod.rs +++ b/rslib/src/storage/mod.rs @@ -10,6 +10,7 @@ mod note; mod notetype; mod revlog; mod sqlite; +mod sync_check; mod tag; mod upgrades; diff --git a/rslib/src/storage/note/mod.rs b/rslib/src/storage/note/mod.rs index 1ffbccf21..1fe0ee6e6 100644 --- a/rslib/src/storage/note/mod.rs +++ b/rslib/src/storage/note/mod.rs @@ -83,7 +83,6 @@ impl super::SqliteStorage { /// Add or update the provided note, preserving ID. Used by the syncing code. pub(crate) fn add_or_update_note(&self, note: &Note) -> Result<()> { - let now = TimestampMillis::now().0; let mut stmt = self.db.prepare_cached(include_str!("add_or_update.sql"))?; stmt.execute(params![ note.id, @@ -139,21 +138,17 @@ impl super::SqliteStorage { new_usn: Usn, limit: usize, ) -> Result> { - let mut out = vec![]; - if limit == 0 { - return Ok(out); - } - let entries: Vec = self .db .prepare_cached(concat!(include_str!("get.sql"), " where usn=-1 limit ?"))? .query_and_then(&[limit as u32], |r| row_to_note(r).map(Into::into))? .collect::>()?; - - let ids: Vec<_> = entries.iter().map(|e| e.id).collect(); - self.db - .prepare_cached("update notes set usn=? where usn=-1")? - .execute(&[new_usn])?; + let mut stmt = self + .db + .prepare_cached("update notes set usn=? where id=?")?; + for entry in &entries { + stmt.execute(params![new_usn, entry.id])?; + } Ok(entries) } diff --git a/rslib/src/storage/revlog/mod.rs b/rslib/src/storage/revlog/mod.rs index e1faa00c9..84af63b61 100644 --- a/rslib/src/storage/revlog/mod.rs +++ b/rslib/src/storage/revlog/mod.rs @@ -43,11 +43,6 @@ impl SqliteStorage { new_usn: Usn, limit: usize, ) -> Result> { - let mut out = vec![]; - if limit == 0 { - return Ok(out); - } - let entries: Vec = self .db .prepare_cached(concat!(include_str!("get.sql"), " where usn=-1 limit ?"))? @@ -66,10 +61,12 @@ impl SqliteStorage { })? .collect::>()?; - let ids: Vec<_> = entries.iter().map(|e| e.id).collect(); - self.db - .prepare_cached("update revlog set usn=? where usn=-1")? - .execute(&[new_usn])?; + let mut stmt = self + .db + .prepare_cached("update revlog set usn=? where id=?")?; + for entry in &entries { + stmt.execute(params![new_usn, entry.id])?; + } Ok(entries) } diff --git a/rslib/src/storage/sync_check.rs b/rslib/src/storage/sync_check.rs new file mode 100644 index 000000000..80afc3ec4 --- /dev/null +++ b/rslib/src/storage/sync_check.rs @@ -0,0 +1,60 @@ +// Copyright: Ankitects Pty Ltd and contributors +// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html + +use super::*; +use crate::{ + err::SyncErrorKind, + prelude::*, + sync::{SanityCheckCounts, SanityCheckDueCounts}, +}; +use rusqlite::NO_PARAMS; + +impl SqliteStorage { + fn table_has_usn(&self, table: &str) -> Result { + Ok(self + .db + .prepare(&format!("select null from {} where usn=-1", table))? + .query(NO_PARAMS)? + .next()? + .is_some()) + } + + fn table_count(&self, table: &str) -> Result { + self.db + .query_row(&format!("select count() from {}", table), NO_PARAMS, |r| { + r.get(0) + }) + .map_err(Into::into) + } + + pub(crate) fn sanity_check_info(&self) -> Result { + for table in &[ + "cards", + "notes", + "revlog", + "graves", + "decks", + "deck_config", + "tags", + "notetypes", + ] { + if self.table_has_usn(table)? { + return Err(AnkiError::SyncError { + info: format!("table had usn=-1: {}", table), + kind: SyncErrorKind::Other, + }); + } + } + + Ok(SanityCheckCounts { + counts: SanityCheckDueCounts::default(), + cards: self.table_count("cards")?, + notes: self.table_count("notes")?, + revlog: self.table_count("revlog")?, + graves: self.table_count("graves")?, + notetypes: self.table_count("notetypes")?, + decks: self.table_count("decks")?, + deck_config: self.table_count("deck_config")?, + }) + } +} diff --git a/rslib/src/sync/http_client.rs b/rslib/src/sync/http_client.rs index 089f61b8f..55ca3d672 100644 --- a/rslib/src/sync/http_client.rs +++ b/rslib/src/sync/http_client.rs @@ -69,20 +69,21 @@ struct ApplyChunkIn { #[derive(Serialize, Deserialize, Debug)] struct SanityCheckIn { client: SanityCheckCounts, + full: bool, } #[derive(Serialize)] struct Empty {} impl HTTPSyncClient { - pub fn new<'a>(hkey: Option, endpoint_suffix: &str) -> HTTPSyncClient { + pub fn new<'a>(hkey: Option, host_number: u32) -> HTTPSyncClient { let client = Client::builder() .connect_timeout(Duration::from_secs(30)) .timeout(Duration::from_secs(60)) .build() .unwrap(); let skey = guid(); - let endpoint = endpoint(&endpoint_suffix); + let endpoint = sync_endpoint(host_number); HTTPSyncClient { hkey, skey, @@ -204,7 +205,7 @@ impl HTTPSyncClient { } pub(crate) async fn sanity_check(&self, client: SanityCheckCounts) -> Result { - let input = SanityCheckIn { client }; + let input = SanityCheckIn { client, full: true }; self.json_request_deserialized("sanityCheck2", &input).await } @@ -319,10 +320,15 @@ where } } -fn endpoint(suffix: &str) -> String { +fn sync_endpoint(host_number: u32) -> String { if let Ok(endpoint) = std::env::var("SYNC_ENDPOINT") { endpoint } else { + let suffix = if host_number > 0 { + format!("{}", host_number) + } else { + "".to_string() + }; format!("https://sync{}.ankiweb.net/sync/", suffix) } } @@ -334,7 +340,7 @@ mod test { use tokio::runtime::Runtime; async fn http_client_inner(username: String, password: String) -> Result<()> { - let mut syncer = HTTPSyncClient::new(None, ""); + let mut syncer = HTTPSyncClient::new(None, 0); assert!(matches!( syncer.login("nosuchuser", "nosuchpass").await, @@ -346,7 +352,7 @@ mod test { assert!(syncer.login(&username, &password).await.is_ok()); - syncer.meta().await?; + let _meta = syncer.meta().await?; // aborting before a start is a conflict assert!(matches!( @@ -393,7 +399,8 @@ mod test { }) .await?; - syncer.finish().await?; + // failed sanity check will have cleaned up; can't finish + // syncer.finish().await?; use tempfile::tempdir; diff --git a/rslib/src/sync/mod.rs b/rslib/src/sync/mod.rs index e328d30ef..f2006e957 100644 --- a/rslib/src/sync/mod.rs +++ b/rslib/src/sync/mod.rs @@ -44,7 +44,9 @@ pub struct SyncMeta { #[serde(rename = "cont")] should_continue: bool, #[serde(rename = "hostNum")] - shard_number: u32, + host_number: u32, + #[serde(default)] + empty: bool, } #[derive(Serialize, Deserialize, Debug, Default)] @@ -159,68 +161,80 @@ enum SanityCheckStatus { #[derive(Serialize_tuple, Deserialize, Debug)] pub struct SanityCheckCounts { - counts: SanityCheckDueCounts, - cards: u32, - notes: u32, - revlog: u32, - graves: u32, + pub counts: SanityCheckDueCounts, + pub cards: u32, + pub notes: u32, + pub revlog: u32, + pub graves: u32, #[serde(rename = "models")] - notetypes: u32, - decks: u32, - deck_config: u32, + pub notetypes: u32, + pub decks: u32, + pub deck_config: u32, } -#[derive(Serialize_tuple, Deserialize, Debug)] +#[derive(Serialize_tuple, Deserialize, Debug, Default)] pub struct SanityCheckDueCounts { - new: u32, - learn: u32, - review: u32, + pub new: u32, + pub learn: u32, + pub review: u32, } #[derive(Debug, Default)] pub struct FullSyncProgress { - transferred_bytes: usize, - total_bytes: usize, + pub transferred_bytes: usize, + pub total_bytes: usize, } -pub enum SyncState { +#[derive(PartialEq)] +pub enum SyncActionRequired { NoChanges, FullSyncRequired, - NormalSyncRequired(NormalSyncMeta), + NormalSyncRequired, } -pub struct NormalSyncMeta { +struct SyncState { + required: SyncActionRequired, local_is_newer: bool, local_usn: Usn, remote_usn: Usn, server_message: String, - shard_number: u32, + host_number: u32, } -struct SyncDriver<'a> { +pub struct SyncOutput { + pub required: SyncActionRequired, + pub server_message: String, + pub host_number: u32, +} + +pub struct SyncAuth { + pub hkey: String, + pub host_number: u32, +} + +struct NormalSyncer<'a> { col: &'a mut Collection, remote: HTTPSyncClient, } -impl SyncDriver<'_> { - async fn from_login<'a>( - col: &'a mut Collection, - username: &str, - password: &str, - ) -> Result> { - let mut remote = HTTPSyncClient::new(None, ""); - remote.login(username, password).await?; - Ok(SyncDriver { col, remote }) +impl NormalSyncer<'_> { + /// Create a new syncing instance. If host_number is unavailable, use 0. + pub fn new<'a>(col: &'a mut Collection, auth: SyncAuth) -> NormalSyncer<'a> { + NormalSyncer { + col, + remote: HTTPSyncClient::new(Some(auth.hkey), auth.host_number), + } } - fn from_hkey<'a>( - col: &'a mut Collection, - hkey: String, - endpoint_suffix: &str, - ) -> SyncDriver<'a> { - SyncDriver { - col, - remote: HTTPSyncClient::new(Some(hkey), endpoint_suffix), + pub async fn sync(&mut self) -> Result { + let state: SyncState = self.get_sync_state().await?; + match state.required { + SyncActionRequired::NoChanges => Ok(state.into()), + SyncActionRequired::FullSyncRequired => Ok(state.into()), + SyncActionRequired::NormalSyncRequired => { + // fixme: transaction + self.normal_sync_inner(state).await + } } } @@ -242,61 +256,57 @@ impl SyncDriver<'_> { }); } - if remote.modified == local.modified { - return Ok(SyncState::NoChanges); - } + let required = if remote.modified == local.modified { + SyncActionRequired::NoChanges + } else if remote.schema != local.schema { + SyncActionRequired::FullSyncRequired + } else { + SyncActionRequired::NormalSyncRequired + }; - if remote.schema != local.schema { - return Ok(SyncState::FullSyncRequired); - } - - Ok(SyncState::NormalSyncRequired(NormalSyncMeta { + Ok(SyncState { + required, local_is_newer: local.modified > remote.modified, local_usn: local.usn, remote_usn: remote.usn, server_message: remote.server_message, - shard_number: remote.shard_number, - })) + host_number: remote.host_number, + }) } /// Sync. Caller must have created a transaction, and should call - /// abort on - pub(crate) async fn sync(&mut self, meta: NormalSyncMeta) -> Result<()> { - self.col.basic_check_for_sync()?; - self.start_and_process_deletions(&meta).await?; - self.process_unchunked_changes(meta.remote_usn, meta.local_is_newer) + /// abort on failure. + async fn normal_sync_inner(&mut self, mut state: SyncState) -> Result { + self.start_and_process_deletions(&state).await?; + self.process_unchunked_changes(state.remote_usn, state.local_is_newer) .await?; self.process_chunks_from_server().await?; - self.send_chunks_to_server(meta.remote_usn).await?; + self.send_chunks_to_server(state.remote_usn).await?; self.sanity_check().await?; - self.finalize(meta).await?; - Ok(()) - } - - /// Return the remote client for use in a full sync. - fn into_remote(self) -> HTTPSyncClient { - self.remote + self.finalize(&state).await?; + state.required = SyncActionRequired::NoChanges; + Ok(state.into()) } // The following operations assume a transaction has been set up. - async fn start_and_process_deletions(&self, meta: &NormalSyncMeta) -> Result<()> { + async fn start_and_process_deletions(&self, state: &SyncState) -> Result<()> { let removed_on_remote = self .remote .start( - meta.local_usn, + state.local_usn, self.col.get_local_mins_west(), - meta.local_is_newer, + state.local_is_newer, ) .await?; - let mut locally_removed = self.col.storage.take_pending_graves(meta.remote_usn)?; + let mut locally_removed = self.col.storage.take_pending_graves(state.remote_usn)?; while let Some(chunk) = locally_removed.take_chunk() { self.remote.apply_graves(chunk).await?; } - self.col.apply_graves(removed_on_remote, meta.local_usn)?; + self.col.apply_graves(removed_on_remote, state.local_usn)?; Ok(()) } @@ -343,7 +353,7 @@ impl SyncDriver<'_> { /// Caller should force full sync after rolling back. async fn sanity_check(&self) -> Result<()> { - let local_counts = self.col.sanity_check_info()?; + let local_counts = self.col.storage.sanity_check_info()?; let out: SanityCheckOut = self.remote.sanity_check(local_counts).await?; if out.status != SanityCheckStatus::Ok { Err(AnkiError::SyncError { @@ -355,9 +365,9 @@ impl SyncDriver<'_> { } } - async fn finalize(&self, meta: NormalSyncMeta) -> Result<()> { + async fn finalize(&self, state: &SyncState) -> Result<()> { let new_server_mtime = self.remote.finish().await?; - self.col.finalize_sync(meta, new_server_mtime) + self.col.finalize_sync(state, new_server_mtime) } } @@ -387,7 +397,70 @@ impl Graves { } } +pub async fn sync_login(username: &str, password: &str) -> Result { + let mut remote = HTTPSyncClient::new(None, 0); + remote.login(username, password).await?; + Ok(SyncAuth { + hkey: remote.hkey().to_string(), + host_number: 0, + }) +} + impl Collection { + // fixme: upload only, download only case + pub async fn get_sync_status(&mut self, auth: SyncAuth) -> Result { + NormalSyncer::new(self, auth) + .get_sync_state() + .await + .map(Into::into) + } + + pub async fn normal_sync(&mut self, auth: SyncAuth) -> Result { + // fixme: server abort on failure + NormalSyncer::new(self, auth).sync().await + } + + /// Upload collection to AnkiWeb. Caller must re-open afterwards. + pub async fn full_upload(mut self, auth: SyncAuth, progress_fn: F) -> Result<()> + where + F: Fn(&FullSyncProgress) + Send + Sync + 'static, + { + self.before_upload()?; + let col_path = self.col_path.clone(); + self.close(true)?; + let mut remote = HTTPSyncClient::new(Some(auth.hkey), auth.host_number); + remote.upload(&col_path, progress_fn).await?; + Ok(()) + } + + /// Download collection from AnkiWeb. Caller must re-open afterwards. + pub async fn full_download(self, auth: SyncAuth, progress_fn: F) -> Result<()> + where + F: Fn(&FullSyncProgress), + { + let col_path = self.col_path.clone(); + let folder = col_path.parent().unwrap(); + self.close(false)?; + let remote = HTTPSyncClient::new(Some(auth.hkey), auth.host_number); + let out_file = remote.download(folder, progress_fn).await?; + // check file ok + let db = rusqlite::Connection::open(out_file.path())?; + let check_result: String = db.pragma_query_value(None, "integrity_check", |r| r.get(0))?; + if check_result != "ok" { + return Err(AnkiError::SyncError { + info: "download corrupt".into(), + kind: SyncErrorKind::Other, + }); + } + // overwrite existing collection atomically + out_file + .persist(&col_path) + .map_err(|e| AnkiError::IOError { + info: format!("download save failed: {}", e), + })?; + Ok(()) + } + fn sync_meta(&self) -> Result { Ok(SyncMeta { modified: self.storage.get_modified_time()?, @@ -396,14 +469,11 @@ impl Collection { current_time: TimestampSecs::now(), server_message: "".into(), should_continue: true, - shard_number: 0, + host_number: 0, + empty: false, }) } - fn basic_check_for_sync(&self) -> Result<()> { - todo!(); - } - fn apply_graves(&self, graves: Graves, local_usn: Usn) -> Result<()> { for nid in graves.notes { self.storage.remove_note(nid)?; @@ -438,7 +508,7 @@ impl Collection { ..Default::default() }; if local_is_newer { - changes.config = Some(todo!()); + changes.config = Some(self.changed_config()?); changes.creation_stamp = Some(self.storage.creation_stamp()?); } @@ -476,7 +546,7 @@ impl Collection { /// Currently this is all config, as legacy clients overwrite the local items /// with the provided value. - fn changed_config(&self, new_usn: Usn) -> Result> { + fn changed_config(&self) -> Result> { let conf = self.storage.get_all_config()?; self.storage.clear_config_usns()?; Ok(conf) @@ -502,7 +572,7 @@ impl Collection { } fn merge_notetypes(&mut self, notetypes: Vec) -> Result<()> { - for mut nt in notetypes { + for nt in notetypes { let nt: NoteType = nt.into(); let proceed = if let Some(existing_nt) = self.storage.get_notetype(nt.id)? { if existing_nt.mtime_secs < nt.mtime_secs { @@ -530,7 +600,7 @@ impl Collection { } fn merge_decks(&mut self, decks: Vec) -> Result<()> { - for mut deck in decks { + for deck in decks { let proceed = if let Some(existing_deck) = self.storage.get_deck(deck.id())? { existing_deck.mtime_secs < deck.common().mtime } else { @@ -546,7 +616,7 @@ impl Collection { } fn merge_deck_config(&self, dconf: Vec) -> Result<()> { - for mut conf in dconf { + for conf in dconf { let proceed = if let Some(existing_conf) = self.storage.get_deck_config(conf.id)? { existing_conf.mtime_secs < conf.mtime } else { @@ -655,14 +725,9 @@ impl Collection { // Final steps //---------------------------------------------------------------- - fn sanity_check_info(&self) -> Result { - self.basic_check_for_sync()?; - todo!(); - } - - fn finalize_sync(&self, meta: NormalSyncMeta, new_server_mtime: TimestampMillis) -> Result<()> { + fn finalize_sync(&self, state: &SyncState, new_server_mtime: TimestampMillis) -> Result<()> { self.storage.set_last_sync(new_server_mtime)?; - let mut usn = meta.remote_usn; + let mut usn = state.remote_usn; usn.0 += 1; self.storage.set_usn(usn)?; self.storage.set_modified_time(new_server_mtime) @@ -752,3 +817,13 @@ impl From for NoteEntry { } } } + +impl From for SyncOutput { + fn from(s: SyncState) -> Self { + SyncOutput { + required: s.required, + server_message: s.server_message, + host_number: s.host_number, + } + } +} diff --git a/rspy/src/lib.rs b/rspy/src/lib.rs index 970db7bef..c423b27b0 100644 --- a/rspy/src/lib.rs +++ b/rspy/src/lib.rs @@ -91,7 +91,7 @@ fn want_release_gil(method: u32) -> bool { BackendMethod::RestoreTrash => true, BackendMethod::OpenCollection => true, BackendMethod::CloseCollection => true, - BackendMethod::AbortMediaSync => true, + BackendMethod::AbortSync => true, BackendMethod::BeforeUpload => true, BackendMethod::TranslateString => false, BackendMethod::FormatTimespan => false,