diff --git a/rslib/src/lib.rs b/rslib/src/lib.rs index 4a43ec4e4..8a9d4a319 100644 --- a/rslib/src/lib.rs +++ b/rslib/src/lib.rs @@ -22,6 +22,7 @@ pub mod notes; pub mod notetype; mod preferences; pub mod prelude; +pub mod revlog; pub mod sched; pub mod search; pub mod serde; diff --git a/rslib/src/prelude.rs b/rslib/src/prelude.rs index 352cde359..2cc165a95 100644 --- a/rslib/src/prelude.rs +++ b/rslib/src/prelude.rs @@ -9,6 +9,7 @@ pub use crate::{ err::{AnkiError, Result}, notes::NoteID, notetype::NoteTypeID, + revlog::RevlogID, timestamp::{TimestampMillis, TimestampSecs}, types::Usn, }; diff --git a/rslib/src/revlog.rs b/rslib/src/revlog.rs new file mode 100644 index 000000000..58ac235a4 --- /dev/null +++ b/rslib/src/revlog.rs @@ -0,0 +1,6 @@ +// Copyright: Ankitects Pty Ltd and contributors +// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html + +use crate::define_newtype; + +define_newtype!(RevlogID, i64); diff --git a/rslib/src/storage/card/mod.rs b/rslib/src/storage/card/mod.rs index 31d1cdbfd..134689578 100644 --- a/rslib/src/storage/card/mod.rs +++ b/rslib/src/storage/card/mod.rs @@ -6,7 +6,6 @@ use crate::{ decks::DeckID, err::Result, notes::NoteID, - sync::CardEntry, timestamp::{TimestampMillis, TimestampSecs}, types::Usn, }; @@ -220,31 +219,6 @@ impl super::SqliteStorage { Ok(()) } - pub(crate) fn take_cards_pending_sync( - &self, - new_usn: Usn, - limit: usize, - ) -> Result> { - let entries: Vec = self - .db - .prepare_cached(concat!( - include_str!("get_card.sql"), - " where usn=-1 limit ?" - ))? - .query_and_then(&[limit as u32], |r| { - row_to_card(r).map(Into::into).map_err(Into::into) - })? - .collect::>()?; - let mut stmt = self - .db - .prepare_cached("update cards set usn=? where id=?")?; - for entry in &entries { - stmt.execute(params![new_usn, entry.id])?; - } - - Ok(entries) - } - pub(crate) fn have_at_least_one_card(&self) -> Result { self.db .prepare_cached("select null from cards")? diff --git a/rslib/src/storage/deck/mod.rs b/rslib/src/storage/deck/mod.rs index a6badf265..ceedeb770 100644 --- a/rslib/src/storage/deck/mod.rs +++ b/rslib/src/storage/deck/mod.rs @@ -9,7 +9,6 @@ use crate::{ decks::{Deck, DeckCommon, DeckID, DeckKindProto, DeckSchema11, DueCounts}, err::{AnkiError, DBErrorKind, Result}, i18n::{I18n, TR}, - prelude::*, timestamp::TimestampMillis, }; use prost::Message; @@ -233,19 +232,6 @@ impl SqliteStorage { Ok(()) } - pub(crate) fn take_deck_ids_pending_sync(&self, new_usn: Usn) -> Result> { - let ids: Vec = self - .db - .prepare("select id from decks where usn=-1")? - .query_and_then(NO_PARAMS, |r| r.get(0))? - .collect::>()?; - self.db - .prepare("update decks set usn=? where usn=-1")? - .execute(&[new_usn])?; - - Ok(ids) - } - // Upgrading/downgrading/legacy pub(super) fn add_default_deck(&self, i18n: &I18n) -> Result<()> { diff --git a/rslib/src/storage/deckconf/mod.rs b/rslib/src/storage/deckconf/mod.rs index 440811b1a..11cde03ec 100644 --- a/rslib/src/storage/deckconf/mod.rs +++ b/rslib/src/storage/deckconf/mod.rs @@ -6,7 +6,6 @@ use crate::{ deckconf::{DeckConf, DeckConfID, DeckConfSchema11, DeckConfigInner}, err::Result, i18n::{I18n, TR}, - prelude::*, }; use prost::Message; use rusqlite::{params, Row, NO_PARAMS}; @@ -103,22 +102,6 @@ impl SqliteStorage { Ok(()) } - pub(crate) fn take_deck_config_ids_pending_sync( - &self, - new_usn: Usn, - ) -> Result> { - let ids: Vec = self - .db - .prepare("select id from deck_config where usn=-1")? - .query_and_then(NO_PARAMS, |r| r.get(0))? - .collect::>()?; - self.db - .prepare("update deck_config set usn=? where usn=-1")? - .execute(&[new_usn])?; - - Ok(ids) - } - // Creating/upgrading/downgrading pub(super) fn add_default_deck_config(&self, i18n: &I18n) -> Result<()> { diff --git a/rslib/src/storage/graves/mod.rs b/rslib/src/storage/graves/mod.rs index d25799e55..5a18602bd 100644 --- a/rslib/src/storage/graves/mod.rs +++ b/rslib/src/storage/graves/mod.rs @@ -47,11 +47,12 @@ impl SqliteStorage { self.add_grave(did.0, GraveKind::Deck, usn) } - pub(crate) fn take_pending_graves(&self, new_usn: Usn) -> Result { - let mut stmt = self - .db - .prepare("select oid, type from graves where usn=-1")?; - let mut rows = stmt.query(NO_PARAMS)?; + pub(crate) fn pending_graves(&self, pending_usn: Usn) -> Result { + let mut stmt = self.db.prepare(&format!( + "select oid, type from graves where {}", + pending_usn.pending_object_clause() + ))?; + let mut rows = stmt.query(&[pending_usn])?; let mut graves = Graves::default(); while let Some(row) = rows.next()? { let oid: i64 = row.get(0)?; @@ -63,11 +64,14 @@ impl SqliteStorage { GraveKind::Deck => graves.decks.push(DeckID(oid)), } } + Ok(graves) + } + // fixme: graves is missing an index + pub(crate) fn update_pending_grave_usns(&self, new_usn: Usn) -> Result<()> { self.db .prepare("update graves set usn=? where usn=-1")? .execute(&[new_usn])?; - - Ok(graves) + Ok(()) } } diff --git a/rslib/src/storage/mod.rs b/rslib/src/storage/mod.rs index 8560f777c..f9749989f 100644 --- a/rslib/src/storage/mod.rs +++ b/rslib/src/storage/mod.rs @@ -10,6 +10,7 @@ mod note; mod notetype; mod revlog; mod sqlite; +mod sync; mod sync_check; mod tag; mod upgrades; diff --git a/rslib/src/storage/note/mod.rs b/rslib/src/storage/note/mod.rs index 1fe0ee6e6..36d4f4b16 100644 --- a/rslib/src/storage/note/mod.rs +++ b/rslib/src/storage/note/mod.rs @@ -5,8 +5,6 @@ use crate::{ err::Result, notes::{Note, NoteID}, notetype::NoteTypeID, - prelude::*, - sync::NoteEntry, tags::{join_tags, split_tags}, timestamp::TimestampMillis, }; @@ -132,24 +130,4 @@ impl super::SqliteStorage { .query_and_then(params![csum, ntid, nid], |r| r.get(0).map_err(Into::into))? .collect() } - - pub(crate) fn take_notes_pending_sync( - &self, - new_usn: Usn, - limit: usize, - ) -> Result> { - let entries: Vec = self - .db - .prepare_cached(concat!(include_str!("get.sql"), " where usn=-1 limit ?"))? - .query_and_then(&[limit as u32], |r| row_to_note(r).map(Into::into))? - .collect::>()?; - let mut stmt = self - .db - .prepare_cached("update notes set usn=? where id=?")?; - for entry in &entries { - stmt.execute(params![new_usn, entry.id])?; - } - - Ok(entries) - } } diff --git a/rslib/src/storage/notetype/mod.rs b/rslib/src/storage/notetype/mod.rs index f1f3fda08..9cacd93b8 100644 --- a/rslib/src/storage/notetype/mod.rs +++ b/rslib/src/storage/notetype/mod.rs @@ -10,7 +10,6 @@ use crate::{ NoteTypeConfig, }, notetype::{NoteType, NoteTypeID, NoteTypeSchema11}, - prelude::*, timestamp::TimestampMillis, }; use prost::Message; @@ -329,19 +328,6 @@ and ord in ", Ok(()) } - pub(crate) fn take_notetype_ids_pending_sync(&self, new_usn: Usn) -> Result> { - let ids: Vec = self - .db - .prepare("select id from notetypes where usn=-1")? - .query_and_then(NO_PARAMS, |r| r.get(0))? - .collect::>()?; - self.db - .prepare("update notetypes set usn=? where usn=-1")? - .execute(&[new_usn])?; - - Ok(ids) - } - // Upgrading/downgrading/legacy pub(crate) fn get_all_notetypes_as_schema11( diff --git a/rslib/src/storage/revlog/mod.rs b/rslib/src/storage/revlog/mod.rs index 84af63b61..e3712a7b4 100644 --- a/rslib/src/storage/revlog/mod.rs +++ b/rslib/src/storage/revlog/mod.rs @@ -38,15 +38,10 @@ impl SqliteStorage { Ok(()) } - pub(crate) fn take_revlog_pending_sync( - &self, - new_usn: Usn, - limit: usize, - ) -> Result> { - let entries: Vec = self - .db - .prepare_cached(concat!(include_str!("get.sql"), " where usn=-1 limit ?"))? - .query_and_then(&[limit as u32], |row| { + pub(crate) fn get_revlog_entry(&self, id: RevlogID) -> Result> { + self.db + .prepare_cached(concat!(include_str!("get.sql"), " where id=?"))? + .query_and_then(&[id], |row| { Ok(ReviewLogEntry { id: row.get(0)?, cid: row.get(1)?, @@ -59,15 +54,7 @@ impl SqliteStorage { kind: row.get(8)?, }) })? - .collect::>()?; - - let mut stmt = self - .db - .prepare_cached("update revlog set usn=? where id=?")?; - for entry in &entries { - stmt.execute(params![new_usn, entry.id])?; - } - - Ok(entries) + .next() + .transpose() } } diff --git a/rslib/src/storage/sqlite.rs b/rslib/src/storage/sqlite.rs index 2323c0f94..90305a3a3 100644 --- a/rslib/src/storage/sqlite.rs +++ b/rslib/src/storage/sqlite.rs @@ -5,7 +5,7 @@ use crate::config::schema11_config_as_string; use crate::err::Result; use crate::err::{AnkiError, DBErrorKind}; use crate::timestamp::{TimestampMillis, TimestampSecs}; -use crate::{i18n::I18n, sched::cutoff::v1_creation_date, text::without_combining, types::Usn}; +use crate::{i18n::I18n, sched::cutoff::v1_creation_date, text::without_combining}; use regex::Regex; use rusqlite::{functions::FunctionFlags, params, Connection, NO_PARAMS}; use std::cmp::Ordering; @@ -277,31 +277,6 @@ impl SqliteStorage { .map_err(Into::into) } - pub(crate) fn usn(&self, server: bool) -> Result { - if server { - Ok(Usn(self - .db - .prepare_cached("select usn from col")? - .query_row(NO_PARAMS, |row| row.get(0))?)) - } else { - Ok(Usn(-1)) - } - } - - pub(crate) fn set_usn(&self, usn: Usn) -> Result<()> { - self.db - .prepare_cached("update col set usn = ?")? - .execute(&[usn])?; - Ok(()) - } - - pub(crate) fn increment_usn(&self) -> Result<()> { - self.db - .prepare_cached("update col set usn = usn + 1")? - .execute(NO_PARAMS)?; - Ok(()) - } - pub(crate) fn creation_stamp(&self) -> Result { self.db .prepare_cached("select crt from col")? diff --git a/rslib/src/storage/sync.rs b/rslib/src/storage/sync.rs new file mode 100644 index 000000000..6ec8fb19d --- /dev/null +++ b/rslib/src/storage/sync.rs @@ -0,0 +1,61 @@ +// Copyright: Ankitects Pty Ltd and contributors +// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html + +use super::*; +use crate::prelude::*; +use rusqlite::{params, types::FromSql, ToSql, NO_PARAMS}; + +impl SqliteStorage { + pub(crate) fn usn(&self, server: bool) -> Result { + if server { + Ok(Usn(self + .db + .prepare_cached("select usn from col")? + .query_row(NO_PARAMS, |row| row.get(0))?)) + } else { + Ok(Usn(-1)) + } + } + + pub(crate) fn set_usn(&self, usn: Usn) -> Result<()> { + self.db + .prepare_cached("update col set usn = ?")? + .execute(&[usn])?; + Ok(()) + } + + pub(crate) fn increment_usn(&self) -> Result<()> { + self.db + .prepare_cached("update col set usn = usn + 1")? + .execute(NO_PARAMS)?; + Ok(()) + } + + pub(crate) fn objects_pending_sync(&self, table: &str, usn: Usn) -> Result> { + self.db + .prepare_cached(&format!( + "select id from {} where {}", + table, + usn.pending_object_clause() + ))? + .query_and_then(&[usn], |r| r.get(0).map_err(Into::into))? + .collect() + } + + pub(crate) fn maybe_update_object_usns( + &self, + table: &str, + ids: &[I], + new_usn: Option, + ) -> Result<()> { + if let Some(new_usn) = new_usn { + let mut stmt = self + .db + .prepare_cached(&format!("update {} set usn=? where id=?", table))?; + for id in ids { + stmt.execute(params![new_usn, id])?; + } + } + Ok(()) + } +} diff --git a/rslib/src/storage/tag/mod.rs b/rslib/src/storage/tag/mod.rs index 3d462eb83..4d8da1984 100644 --- a/rslib/src/storage/tag/mod.rs +++ b/rslib/src/storage/tag/mod.rs @@ -45,15 +45,24 @@ impl SqliteStorage { // fixme: in the future we could just register tags as part of the sync // instead of sending the tag list separately - pub(crate) fn take_changed_tags(&self, usn: Usn) -> Result> { - let tags: Vec = self - .db - .prepare("select tag from tags where usn=-1")? - .query_map(NO_PARAMS, |row| row.get(0))? - .collect::>()?; + pub(crate) fn tags_pending_sync(&self, usn: Usn) -> Result> { self.db - .execute("update tags set usn=? where usn=-1", &[&usn])?; - Ok(tags) + .prepare_cached(&format!( + "select tag from tags where {}", + usn.pending_object_clause() + ))? + .query_and_then(&[usn], |r| r.get(0).map_err(Into::into))? + .collect() + } + + pub(crate) fn update_tag_usns(&self, tags: &[String], new_usn: Usn) -> Result<()> { + let mut stmt = self + .db + .prepare_cached("update tags set usn=? where tag=?")?; + for tag in tags { + stmt.execute(params![new_usn, tag])?; + } + Ok(()) } // Upgrading/downgrading diff --git a/rslib/src/sync/mod.rs b/rslib/src/sync/mod.rs index b00a98add..4c25e2f7e 100644 --- a/rslib/src/sync/mod.rs +++ b/rslib/src/sync/mod.rs @@ -88,6 +88,12 @@ pub struct Chunk { notes: Vec, } +struct ChunkableIDs { + revlog: Vec, + cards: Vec, + notes: Vec, +} + #[derive(Serialize_tuple, Deserialize, Debug)] pub struct ReviewLogEntry { pub id: TimestampMillis, @@ -195,8 +201,13 @@ pub enum SyncActionRequired { struct SyncState { required: SyncActionRequired, local_is_newer: bool, - local_usn: Usn, - remote_usn: Usn, + usn_at_last_sync: Usn, + // latest usn, used for adding new items + latest_usn: Usn, + // usn to use when locating pending objects + pending_usn: Usn, + // usn to replace pending items with - the same as latest_usn in the client case + new_usn: Option, server_message: String, host_number: u32, } @@ -217,6 +228,17 @@ struct NormalSyncer<'a> { remote: HTTPSyncClient, } +impl Usn { + /// Used when gathering pending objects during sync. + pub(crate) fn pending_object_clause(self) -> &'static str { + if self.0 == -1 { + "usn = ?" + } else { + "usn >= ?" + } + } +} + impl NormalSyncer<'_> { /// Create a new syncing instance. If host_number is unavailable, use 0. pub fn new(col: &mut Collection, auth: SyncAuth) -> NormalSyncer<'_> { @@ -296,8 +318,10 @@ impl NormalSyncer<'_> { Ok(SyncState { required, local_is_newer: local.modified > remote.modified, - local_usn: local.usn, - remote_usn: remote.usn, + usn_at_last_sync: local.usn, + latest_usn: remote.usn, + pending_usn: Usn(-1), + new_usn: Some(remote.usn), server_message: remote.server_message, host_number: remote.host_number, }) @@ -306,12 +330,17 @@ impl NormalSyncer<'_> { /// Sync. Caller must have created a transaction, and should call /// abort on failure. async fn normal_sync_inner(&mut self, mut state: SyncState) -> Result { + debug!(self.col.log, "start"); self.start_and_process_deletions(&state).await?; - self.process_unchunked_changes(state.remote_usn, state.local_is_newer) - .await?; + debug!(self.col.log, "unchunked changes"); + self.process_unchunked_changes(&state).await?; + debug!(self.col.log, "begin stream from server"); self.process_chunks_from_server().await?; - self.send_chunks_to_server(state.remote_usn).await?; + debug!(self.col.log, "begin stream to server"); + self.send_chunks_to_server(&state).await?; + debug!(self.col.log, "sanity check"); self.sanity_check().await?; + debug!(self.col.log, "finalize"); self.finalize(&state).await?; state.required = SyncActionRequired::NoChanges; Ok(state.into()) @@ -320,22 +349,37 @@ impl NormalSyncer<'_> { // The following operations assume a transaction has been set up. async fn start_and_process_deletions(&self, state: &SyncState) -> Result<()> { - let removed_on_remote = self + let removed_on_remote: Graves = self .remote .start( - state.local_usn, + state.usn_at_last_sync, self.col.get_local_mins_west(), state.local_is_newer, ) .await?; - let mut locally_removed = self.col.storage.take_pending_graves(state.remote_usn)?; + debug!(self.col.log, "removed on remote"; + "cards"=>removed_on_remote.cards.len(), + "notes"=>removed_on_remote.notes.len(), + "decks"=>removed_on_remote.decks.len()); + + let mut locally_removed = self.col.storage.pending_graves(state.pending_usn)?; + if let Some(new_usn) = state.new_usn { + self.col.storage.update_pending_grave_usns(new_usn)?; + } + + debug!(self.col.log, "locally removed "; + "cards"=>locally_removed.cards.len(), + "notes"=>locally_removed.notes.len(), + "decks"=>locally_removed.decks.len()); while let Some(chunk) = locally_removed.take_chunk() { + debug!(self.col.log, "sending graves chunk"); self.remote.apply_graves(chunk).await?; } - self.col.apply_graves(removed_on_remote, state.local_usn)?; + self.col.apply_graves(removed_on_remote, state.latest_usn)?; + debug!(self.col.log, "applied server graves"); Ok(()) } @@ -344,21 +388,42 @@ impl NormalSyncer<'_> { // the large deck trees and note types some users would create. They should be chunked // in the future, like other objects. Syncing tags explicitly is also probably of limited // usefulness. - async fn process_unchunked_changes( - &mut self, - remote_usn: Usn, - local_is_newer: bool, - ) -> Result<()> { - let local_changes = self - .col - .local_unchunked_changes(remote_usn, local_is_newer)?; + async fn process_unchunked_changes(&mut self, state: &SyncState) -> Result<()> { + debug!(self.col.log, "gathering local changes"); + let local_changes = self.col.local_unchunked_changes( + state.pending_usn, + state.new_usn, + state.local_is_newer, + )?; + debug!(self.col.log, "sending"; + "notetypes"=>local_changes.notetypes.len(), + "decks"=>local_changes.decks_and_config.decks.len(), + "deck config"=>local_changes.decks_and_config.config.len(), + "tags"=>local_changes.tags.len(), + ); + let remote_changes = self.remote.apply_changes(local_changes).await?; - self.col.apply_changes(remote_changes, remote_usn) + debug!(self.col.log, "received"; + "notetypes"=>remote_changes.notetypes.len(), + "decks"=>remote_changes.decks_and_config.decks.len(), + "deck config"=>remote_changes.decks_and_config.config.len(), + "tags"=>remote_changes.tags.len(), + ); + + self.col.apply_changes(remote_changes, state.latest_usn) } async fn process_chunks_from_server(&mut self) -> Result<()> { loop { let chunk: Chunk = self.remote.chunk().await?; + + debug!(self.col.log, "received"; + "done"=>chunk.done, + "cards"=>chunk.cards.len(), + "notes"=>chunk.notes.len(), + "revlog"=>chunk.revlog.len(), + ); + let done = chunk.done; self.col.apply_chunk(chunk)?; @@ -368,10 +433,20 @@ impl NormalSyncer<'_> { } } - async fn send_chunks_to_server(&self, server_usn: Usn) -> Result<()> { + async fn send_chunks_to_server(&self, state: &SyncState) -> Result<()> { + let mut ids = self.col.get_chunkable_ids(state.pending_usn)?; + loop { - let chunk: Chunk = self.col.get_chunk(server_usn)?; + let chunk: Chunk = self.col.get_chunk(&mut ids, state.new_usn)?; let done = chunk.done; + + debug!(self.col.log, "sending"; + "done"=>chunk.done, + "cards"=>chunk.cards.len(), + "notes"=>chunk.notes.len(), + "revlog"=>chunk.revlog.len(), + ); + self.remote.apply_chunk(chunk).await?; if done { @@ -383,11 +458,14 @@ impl NormalSyncer<'_> { /// Caller should force full sync after rolling back. async fn sanity_check(&mut self) -> Result<()> { let mut local_counts = self.col.storage.sanity_check_info()?; - debug!(self.col.log, "gathered local counts"); self.col.add_due_counts(&mut local_counts.counts)?; + debug!( + self.col.log, + "gathered local counts; waiting for server reply" + ); let out: SanityCheckOut = self.remote.sanity_check(local_counts).await?; - debug!(self.col.log, "get server reply"); + debug!(self.col.log, "got server reply"); if out.status != SanityCheckStatus::Ok { Err(AnkiError::SyncError { info: format!("local {:?}\nremote {:?}", out.client, out.server), @@ -506,18 +584,18 @@ impl Collection { }) } - fn apply_graves(&self, graves: Graves, local_usn: Usn) -> Result<()> { + fn apply_graves(&self, graves: Graves, latest_usn: Usn) -> Result<()> { for nid in graves.notes { self.storage.remove_note(nid)?; - self.storage.add_note_grave(nid, local_usn)?; + self.storage.add_note_grave(nid, latest_usn)?; } for cid in graves.cards { self.storage.remove_card(cid)?; - self.storage.add_card_grave(cid, local_usn)?; + self.storage.add_card_grave(cid, latest_usn)?; } for did in graves.decks { self.storage.remove_deck(did)?; - self.storage.add_deck_grave(did, local_usn)?; + self.storage.add_deck_grave(did, latest_usn)?; } Ok(()) } @@ -527,16 +605,17 @@ impl Collection { fn local_unchunked_changes( &self, - remote_usn: Usn, + pending_usn: Usn, + new_usn: Option, local_is_newer: bool, ) -> Result { let mut changes = UnchunkedChanges { - notetypes: self.changed_notetypes(remote_usn)?, + notetypes: self.changed_notetypes(pending_usn, new_usn)?, decks_and_config: DecksAndConfig { - decks: self.changed_decks(remote_usn)?, - config: self.changed_deck_config(remote_usn)?, + decks: self.changed_decks(pending_usn, new_usn)?, + config: self.changed_deck_config(pending_usn, new_usn)?, }, - tags: self.changed_tags(remote_usn)?, + tags: self.changed_tags(pending_usn, new_usn)?, ..Default::default() }; if local_is_newer { @@ -547,33 +626,69 @@ impl Collection { Ok(changes) } - fn changed_notetypes(&self, new_usn: Usn) -> Result> { - let ids = self.storage.take_notetype_ids_pending_sync(new_usn)?; - ids.into_iter() - .map(|id| self.storage.get_notetype(id).map(|opt| opt.unwrap().into())) - .collect() - } - - fn changed_decks(&self, new_usn: Usn) -> Result> { - let ids = self.storage.take_deck_ids_pending_sync(new_usn)?; - ids.into_iter() - .map(|id| self.storage.get_deck(id).map(|opt| opt.unwrap().into())) - .collect() - } - - fn changed_deck_config(&self, new_usn: Usn) -> Result> { - let ids = self.storage.take_deck_config_ids_pending_sync(new_usn)?; + fn changed_notetypes( + &self, + pending_usn: Usn, + new_usn: Option, + ) -> Result> { + let ids = self + .storage + .objects_pending_sync("notetypes", pending_usn)?; + self.storage + .maybe_update_object_usns("notetypes", &ids, new_usn)?; ids.into_iter() .map(|id| { - self.storage - .get_deck_config(id) - .map(|opt| opt.unwrap().into()) + self.storage.get_notetype(id).map(|opt| { + let mut nt: NoteTypeSchema11 = opt.unwrap().into(); + nt.usn = new_usn.unwrap_or(nt.usn); + nt + }) }) .collect() } - fn changed_tags(&self, new_usn: Usn) -> Result> { - self.storage.take_changed_tags(new_usn) + fn changed_decks(&self, pending_usn: Usn, new_usn: Option) -> Result> { + let ids = self.storage.objects_pending_sync("decks", pending_usn)?; + self.storage + .maybe_update_object_usns("decks", &ids, new_usn)?; + ids.into_iter() + .map(|id| { + self.storage.get_deck(id).map(|opt| { + let mut deck = opt.unwrap(); + deck.usn = new_usn.unwrap_or(deck.usn); + deck.into() + }) + }) + .collect() + } + + fn changed_deck_config( + &self, + pending_usn: Usn, + new_usn: Option, + ) -> Result> { + let ids = self + .storage + .objects_pending_sync("deck_config", pending_usn)?; + self.storage + .maybe_update_object_usns("deck_config", &ids, new_usn)?; + ids.into_iter() + .map(|id| { + self.storage.get_deck_config(id).map(|opt| { + let mut conf: DeckConfSchema11 = opt.unwrap().into(); + conf.usn = new_usn.unwrap_or(conf.usn); + conf + }) + }) + .collect() + } + + fn changed_tags(&self, pending_usn: Usn, new_usn: Option) -> Result> { + let changed = self.storage.tags_pending_sync(pending_usn)?; + if let Some(usn) = new_usn { + self.storage.update_tag_usns(&changed, usn)?; + } + Ok(changed) } /// Currently this is all config, as legacy clients overwrite the local items @@ -587,17 +702,17 @@ impl Collection { // Remote->local unchunked changes //---------------------------------------------------------------- - fn apply_changes(&mut self, remote: UnchunkedChanges, remote_usn: Usn) -> Result<()> { + fn apply_changes(&mut self, remote: UnchunkedChanges, latest_usn: Usn) -> Result<()> { self.merge_notetypes(remote.notetypes)?; self.merge_decks(remote.decks_and_config.decks)?; self.merge_deck_config(remote.decks_and_config.config)?; - self.merge_tags(remote.tags, remote_usn)?; + self.merge_tags(remote.tags, latest_usn)?; if let Some(crt) = remote.creation_stamp { self.storage.set_creation_stamp(crt)?; } if let Some(config) = remote.config { self.storage - .set_all_config(config, remote_usn, TimestampSecs::now())?; + .set_all_config(config, latest_usn, TimestampSecs::now())?; } Ok(()) @@ -662,9 +777,9 @@ impl Collection { Ok(()) } - fn merge_tags(&self, tags: Vec, new_usn: Usn) -> Result<()> { + fn merge_tags(&self, tags: Vec, latest_usn: Usn) -> Result<()> { for tag in tags { - self.register_tag(&tag, new_usn)?; + self.register_tag(&tag, latest_usn)?; } Ok(()) } @@ -732,25 +847,87 @@ impl Collection { // Local->remote chunks //---------------------------------------------------------------- - fn get_chunk(&self, server_usn: Usn) -> Result { - let mut chunk = Chunk::default(); - chunk.revlog = self - .storage - .take_revlog_pending_sync(server_usn, CHUNK_SIZE)?; - chunk.cards = self - .storage - .take_cards_pending_sync(server_usn, CHUNK_SIZE)?; - if !chunk.revlog.is_empty() || !chunk.cards.is_empty() { - return Ok(chunk); - } + fn get_chunkable_ids(&self, pending_usn: Usn) -> Result { + Ok(ChunkableIDs { + revlog: self.storage.objects_pending_sync("revlog", pending_usn)?, + cards: self.storage.objects_pending_sync("cards", pending_usn)?, + notes: self.storage.objects_pending_sync("notes", pending_usn)?, + }) + } - chunk.notes = self - .storage - .take_notes_pending_sync(server_usn, CHUNK_SIZE)?; - if chunk.notes.is_empty() { + /// Fetch a chunk of ids from `ids`, returning the referenced objects. + fn get_chunk(&self, ids: &mut ChunkableIDs, new_usn: Option) -> Result { + // get a bunch of IDs + let mut limit = CHUNK_SIZE as i32; + let mut revlog_ids = vec![]; + let mut card_ids = vec![]; + let mut note_ids = vec![]; + let mut chunk = Chunk::default(); + while limit > 0 { + let last_limit = limit; + if let Some(id) = ids.revlog.pop() { + revlog_ids.push(id); + limit -= 1; + } + if let Some(id) = ids.notes.pop() { + note_ids.push(id); + limit -= 1; + } + if let Some(id) = ids.cards.pop() { + card_ids.push(id); + limit -= 1; + } + if limit == last_limit { + // all empty + break; + } + } + if limit > 0 { chunk.done = true; } + // remove pending status + if !self.server { + self.storage + .maybe_update_object_usns("revlog", &revlog_ids, new_usn)?; + self.storage + .maybe_update_object_usns("cards", &card_ids, new_usn)?; + self.storage + .maybe_update_object_usns("notes", ¬e_ids, new_usn)?; + } + + // the fetch associated objects, and return + chunk.revlog = revlog_ids + .into_iter() + .map(|id| { + self.storage.get_revlog_entry(id).map(|e| { + let mut e = e.unwrap(); + e.usn = new_usn.unwrap_or(e.usn); + e + }) + }) + .collect::>()?; + chunk.cards = card_ids + .into_iter() + .map(|id| { + self.storage.get_card(id).map(|e| { + let mut e: CardEntry = e.unwrap().into(); + e.usn = new_usn.unwrap_or(e.usn); + e + }) + }) + .collect::>()?; + chunk.notes = note_ids + .into_iter() + .map(|id| { + self.storage.get_note(id).map(|e| { + let mut e: NoteEntry = e.unwrap().into(); + e.usn = new_usn.unwrap_or(e.usn); + e + }) + }) + .collect::>()?; + Ok(chunk) } @@ -768,7 +945,7 @@ impl Collection { fn finalize_sync(&self, state: &SyncState, new_server_mtime: TimestampMillis) -> Result<()> { self.storage.set_last_sync(new_server_mtime)?; - let mut usn = state.remote_usn; + let mut usn = state.latest_usn; usn.0 += 1; self.storage.set_usn(usn)?; self.storage.set_modified_time(new_server_mtime)