rework usn handling in sync

This commit is contained in:
Damien Elmes 2020-05-30 20:21:44 +10:00
parent 5102255ff9
commit 529152aef6
15 changed files with 357 additions and 228 deletions

View file

@ -22,6 +22,7 @@ pub mod notes;
pub mod notetype; pub mod notetype;
mod preferences; mod preferences;
pub mod prelude; pub mod prelude;
pub mod revlog;
pub mod sched; pub mod sched;
pub mod search; pub mod search;
pub mod serde; pub mod serde;

View file

@ -9,6 +9,7 @@ pub use crate::{
err::{AnkiError, Result}, err::{AnkiError, Result},
notes::NoteID, notes::NoteID,
notetype::NoteTypeID, notetype::NoteTypeID,
revlog::RevlogID,
timestamp::{TimestampMillis, TimestampSecs}, timestamp::{TimestampMillis, TimestampSecs},
types::Usn, types::Usn,
}; };

6
rslib/src/revlog.rs Normal file
View file

@ -0,0 +1,6 @@
// Copyright: Ankitects Pty Ltd and contributors
// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
use crate::define_newtype;
define_newtype!(RevlogID, i64);

View file

@ -6,7 +6,6 @@ use crate::{
decks::DeckID, decks::DeckID,
err::Result, err::Result,
notes::NoteID, notes::NoteID,
sync::CardEntry,
timestamp::{TimestampMillis, TimestampSecs}, timestamp::{TimestampMillis, TimestampSecs},
types::Usn, types::Usn,
}; };
@ -220,31 +219,6 @@ impl super::SqliteStorage {
Ok(()) Ok(())
} }
pub(crate) fn take_cards_pending_sync(
&self,
new_usn: Usn,
limit: usize,
) -> Result<Vec<CardEntry>> {
let entries: Vec<CardEntry> = self
.db
.prepare_cached(concat!(
include_str!("get_card.sql"),
" where usn=-1 limit ?"
))?
.query_and_then(&[limit as u32], |r| {
row_to_card(r).map(Into::into).map_err(Into::into)
})?
.collect::<Result<_>>()?;
let mut stmt = self
.db
.prepare_cached("update cards set usn=? where id=?")?;
for entry in &entries {
stmt.execute(params![new_usn, entry.id])?;
}
Ok(entries)
}
pub(crate) fn have_at_least_one_card(&self) -> Result<bool> { pub(crate) fn have_at_least_one_card(&self) -> Result<bool> {
self.db self.db
.prepare_cached("select null from cards")? .prepare_cached("select null from cards")?

View file

@ -9,7 +9,6 @@ use crate::{
decks::{Deck, DeckCommon, DeckID, DeckKindProto, DeckSchema11, DueCounts}, decks::{Deck, DeckCommon, DeckID, DeckKindProto, DeckSchema11, DueCounts},
err::{AnkiError, DBErrorKind, Result}, err::{AnkiError, DBErrorKind, Result},
i18n::{I18n, TR}, i18n::{I18n, TR},
prelude::*,
timestamp::TimestampMillis, timestamp::TimestampMillis,
}; };
use prost::Message; use prost::Message;
@ -233,19 +232,6 @@ impl SqliteStorage {
Ok(()) Ok(())
} }
pub(crate) fn take_deck_ids_pending_sync(&self, new_usn: Usn) -> Result<Vec<DeckID>> {
let ids: Vec<DeckID> = self
.db
.prepare("select id from decks where usn=-1")?
.query_and_then(NO_PARAMS, |r| r.get(0))?
.collect::<std::result::Result<_, rusqlite::Error>>()?;
self.db
.prepare("update decks set usn=? where usn=-1")?
.execute(&[new_usn])?;
Ok(ids)
}
// Upgrading/downgrading/legacy // Upgrading/downgrading/legacy
pub(super) fn add_default_deck(&self, i18n: &I18n) -> Result<()> { pub(super) fn add_default_deck(&self, i18n: &I18n) -> Result<()> {

View file

@ -6,7 +6,6 @@ use crate::{
deckconf::{DeckConf, DeckConfID, DeckConfSchema11, DeckConfigInner}, deckconf::{DeckConf, DeckConfID, DeckConfSchema11, DeckConfigInner},
err::Result, err::Result,
i18n::{I18n, TR}, i18n::{I18n, TR},
prelude::*,
}; };
use prost::Message; use prost::Message;
use rusqlite::{params, Row, NO_PARAMS}; use rusqlite::{params, Row, NO_PARAMS};
@ -103,22 +102,6 @@ impl SqliteStorage {
Ok(()) Ok(())
} }
pub(crate) fn take_deck_config_ids_pending_sync(
&self,
new_usn: Usn,
) -> Result<Vec<DeckConfID>> {
let ids: Vec<DeckConfID> = self
.db
.prepare("select id from deck_config where usn=-1")?
.query_and_then(NO_PARAMS, |r| r.get(0))?
.collect::<std::result::Result<_, rusqlite::Error>>()?;
self.db
.prepare("update deck_config set usn=? where usn=-1")?
.execute(&[new_usn])?;
Ok(ids)
}
// Creating/upgrading/downgrading // Creating/upgrading/downgrading
pub(super) fn add_default_deck_config(&self, i18n: &I18n) -> Result<()> { pub(super) fn add_default_deck_config(&self, i18n: &I18n) -> Result<()> {

View file

@ -47,11 +47,12 @@ impl SqliteStorage {
self.add_grave(did.0, GraveKind::Deck, usn) self.add_grave(did.0, GraveKind::Deck, usn)
} }
pub(crate) fn take_pending_graves(&self, new_usn: Usn) -> Result<Graves> { pub(crate) fn pending_graves(&self, pending_usn: Usn) -> Result<Graves> {
let mut stmt = self let mut stmt = self.db.prepare(&format!(
.db "select oid, type from graves where {}",
.prepare("select oid, type from graves where usn=-1")?; pending_usn.pending_object_clause()
let mut rows = stmt.query(NO_PARAMS)?; ))?;
let mut rows = stmt.query(&[pending_usn])?;
let mut graves = Graves::default(); let mut graves = Graves::default();
while let Some(row) = rows.next()? { while let Some(row) = rows.next()? {
let oid: i64 = row.get(0)?; let oid: i64 = row.get(0)?;
@ -63,11 +64,14 @@ impl SqliteStorage {
GraveKind::Deck => graves.decks.push(DeckID(oid)), GraveKind::Deck => graves.decks.push(DeckID(oid)),
} }
} }
Ok(graves)
}
// fixme: graves is missing an index
pub(crate) fn update_pending_grave_usns(&self, new_usn: Usn) -> Result<()> {
self.db self.db
.prepare("update graves set usn=? where usn=-1")? .prepare("update graves set usn=? where usn=-1")?
.execute(&[new_usn])?; .execute(&[new_usn])?;
Ok(())
Ok(graves)
} }
} }

View file

@ -10,6 +10,7 @@ mod note;
mod notetype; mod notetype;
mod revlog; mod revlog;
mod sqlite; mod sqlite;
mod sync;
mod sync_check; mod sync_check;
mod tag; mod tag;
mod upgrades; mod upgrades;

View file

@ -5,8 +5,6 @@ use crate::{
err::Result, err::Result,
notes::{Note, NoteID}, notes::{Note, NoteID},
notetype::NoteTypeID, notetype::NoteTypeID,
prelude::*,
sync::NoteEntry,
tags::{join_tags, split_tags}, tags::{join_tags, split_tags},
timestamp::TimestampMillis, timestamp::TimestampMillis,
}; };
@ -132,24 +130,4 @@ impl super::SqliteStorage {
.query_and_then(params![csum, ntid, nid], |r| r.get(0).map_err(Into::into))? .query_and_then(params![csum, ntid, nid], |r| r.get(0).map_err(Into::into))?
.collect() .collect()
} }
pub(crate) fn take_notes_pending_sync(
&self,
new_usn: Usn,
limit: usize,
) -> Result<Vec<NoteEntry>> {
let entries: Vec<NoteEntry> = self
.db
.prepare_cached(concat!(include_str!("get.sql"), " where usn=-1 limit ?"))?
.query_and_then(&[limit as u32], |r| row_to_note(r).map(Into::into))?
.collect::<Result<_>>()?;
let mut stmt = self
.db
.prepare_cached("update notes set usn=? where id=?")?;
for entry in &entries {
stmt.execute(params![new_usn, entry.id])?;
}
Ok(entries)
}
} }

View file

@ -10,7 +10,6 @@ use crate::{
NoteTypeConfig, NoteTypeConfig,
}, },
notetype::{NoteType, NoteTypeID, NoteTypeSchema11}, notetype::{NoteType, NoteTypeID, NoteTypeSchema11},
prelude::*,
timestamp::TimestampMillis, timestamp::TimestampMillis,
}; };
use prost::Message; use prost::Message;
@ -329,19 +328,6 @@ and ord in ",
Ok(()) Ok(())
} }
pub(crate) fn take_notetype_ids_pending_sync(&self, new_usn: Usn) -> Result<Vec<NoteTypeID>> {
let ids: Vec<NoteTypeID> = self
.db
.prepare("select id from notetypes where usn=-1")?
.query_and_then(NO_PARAMS, |r| r.get(0))?
.collect::<std::result::Result<_, rusqlite::Error>>()?;
self.db
.prepare("update notetypes set usn=? where usn=-1")?
.execute(&[new_usn])?;
Ok(ids)
}
// Upgrading/downgrading/legacy // Upgrading/downgrading/legacy
pub(crate) fn get_all_notetypes_as_schema11( pub(crate) fn get_all_notetypes_as_schema11(

View file

@ -38,15 +38,10 @@ impl SqliteStorage {
Ok(()) Ok(())
} }
pub(crate) fn take_revlog_pending_sync( pub(crate) fn get_revlog_entry(&self, id: RevlogID) -> Result<Option<ReviewLogEntry>> {
&self, self.db
new_usn: Usn, .prepare_cached(concat!(include_str!("get.sql"), " where id=?"))?
limit: usize, .query_and_then(&[id], |row| {
) -> Result<Vec<ReviewLogEntry>> {
let entries: Vec<ReviewLogEntry> = self
.db
.prepare_cached(concat!(include_str!("get.sql"), " where usn=-1 limit ?"))?
.query_and_then(&[limit as u32], |row| {
Ok(ReviewLogEntry { Ok(ReviewLogEntry {
id: row.get(0)?, id: row.get(0)?,
cid: row.get(1)?, cid: row.get(1)?,
@ -59,15 +54,7 @@ impl SqliteStorage {
kind: row.get(8)?, kind: row.get(8)?,
}) })
})? })?
.collect::<Result<_>>()?; .next()
.transpose()
let mut stmt = self
.db
.prepare_cached("update revlog set usn=? where id=?")?;
for entry in &entries {
stmt.execute(params![new_usn, entry.id])?;
}
Ok(entries)
} }
} }

View file

@ -5,7 +5,7 @@ use crate::config::schema11_config_as_string;
use crate::err::Result; use crate::err::Result;
use crate::err::{AnkiError, DBErrorKind}; use crate::err::{AnkiError, DBErrorKind};
use crate::timestamp::{TimestampMillis, TimestampSecs}; use crate::timestamp::{TimestampMillis, TimestampSecs};
use crate::{i18n::I18n, sched::cutoff::v1_creation_date, text::without_combining, types::Usn}; use crate::{i18n::I18n, sched::cutoff::v1_creation_date, text::without_combining};
use regex::Regex; use regex::Regex;
use rusqlite::{functions::FunctionFlags, params, Connection, NO_PARAMS}; use rusqlite::{functions::FunctionFlags, params, Connection, NO_PARAMS};
use std::cmp::Ordering; use std::cmp::Ordering;
@ -277,31 +277,6 @@ impl SqliteStorage {
.map_err(Into::into) .map_err(Into::into)
} }
pub(crate) fn usn(&self, server: bool) -> Result<Usn> {
if server {
Ok(Usn(self
.db
.prepare_cached("select usn from col")?
.query_row(NO_PARAMS, |row| row.get(0))?))
} else {
Ok(Usn(-1))
}
}
pub(crate) fn set_usn(&self, usn: Usn) -> Result<()> {
self.db
.prepare_cached("update col set usn = ?")?
.execute(&[usn])?;
Ok(())
}
pub(crate) fn increment_usn(&self) -> Result<()> {
self.db
.prepare_cached("update col set usn = usn + 1")?
.execute(NO_PARAMS)?;
Ok(())
}
pub(crate) fn creation_stamp(&self) -> Result<TimestampSecs> { pub(crate) fn creation_stamp(&self) -> Result<TimestampSecs> {
self.db self.db
.prepare_cached("select crt from col")? .prepare_cached("select crt from col")?

61
rslib/src/storage/sync.rs Normal file
View file

@ -0,0 +1,61 @@
// Copyright: Ankitects Pty Ltd and contributors
// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
use super::*;
use crate::prelude::*;
use rusqlite::{params, types::FromSql, ToSql, NO_PARAMS};
impl SqliteStorage {
pub(crate) fn usn(&self, server: bool) -> Result<Usn> {
if server {
Ok(Usn(self
.db
.prepare_cached("select usn from col")?
.query_row(NO_PARAMS, |row| row.get(0))?))
} else {
Ok(Usn(-1))
}
}
pub(crate) fn set_usn(&self, usn: Usn) -> Result<()> {
self.db
.prepare_cached("update col set usn = ?")?
.execute(&[usn])?;
Ok(())
}
pub(crate) fn increment_usn(&self) -> Result<()> {
self.db
.prepare_cached("update col set usn = usn + 1")?
.execute(NO_PARAMS)?;
Ok(())
}
pub(crate) fn objects_pending_sync<T: FromSql>(&self, table: &str, usn: Usn) -> Result<Vec<T>> {
self.db
.prepare_cached(&format!(
"select id from {} where {}",
table,
usn.pending_object_clause()
))?
.query_and_then(&[usn], |r| r.get(0).map_err(Into::into))?
.collect()
}
pub(crate) fn maybe_update_object_usns<I: ToSql>(
&self,
table: &str,
ids: &[I],
new_usn: Option<Usn>,
) -> Result<()> {
if let Some(new_usn) = new_usn {
let mut stmt = self
.db
.prepare_cached(&format!("update {} set usn=? where id=?", table))?;
for id in ids {
stmt.execute(params![new_usn, id])?;
}
}
Ok(())
}
}

View file

@ -45,15 +45,24 @@ impl SqliteStorage {
// fixme: in the future we could just register tags as part of the sync // fixme: in the future we could just register tags as part of the sync
// instead of sending the tag list separately // instead of sending the tag list separately
pub(crate) fn take_changed_tags(&self, usn: Usn) -> Result<Vec<String>> { pub(crate) fn tags_pending_sync(&self, usn: Usn) -> Result<Vec<String>> {
let tags: Vec<String> = self
.db
.prepare("select tag from tags where usn=-1")?
.query_map(NO_PARAMS, |row| row.get(0))?
.collect::<std::result::Result<_, rusqlite::Error>>()?;
self.db self.db
.execute("update tags set usn=? where usn=-1", &[&usn])?; .prepare_cached(&format!(
Ok(tags) "select tag from tags where {}",
usn.pending_object_clause()
))?
.query_and_then(&[usn], |r| r.get(0).map_err(Into::into))?
.collect()
}
pub(crate) fn update_tag_usns(&self, tags: &[String], new_usn: Usn) -> Result<()> {
let mut stmt = self
.db
.prepare_cached("update tags set usn=? where tag=?")?;
for tag in tags {
stmt.execute(params![new_usn, tag])?;
}
Ok(())
} }
// Upgrading/downgrading // Upgrading/downgrading

View file

@ -88,6 +88,12 @@ pub struct Chunk {
notes: Vec<NoteEntry>, notes: Vec<NoteEntry>,
} }
struct ChunkableIDs {
revlog: Vec<RevlogID>,
cards: Vec<CardID>,
notes: Vec<NoteID>,
}
#[derive(Serialize_tuple, Deserialize, Debug)] #[derive(Serialize_tuple, Deserialize, Debug)]
pub struct ReviewLogEntry { pub struct ReviewLogEntry {
pub id: TimestampMillis, pub id: TimestampMillis,
@ -195,8 +201,13 @@ pub enum SyncActionRequired {
struct SyncState { struct SyncState {
required: SyncActionRequired, required: SyncActionRequired,
local_is_newer: bool, local_is_newer: bool,
local_usn: Usn, usn_at_last_sync: Usn,
remote_usn: Usn, // latest usn, used for adding new items
latest_usn: Usn,
// usn to use when locating pending objects
pending_usn: Usn,
// usn to replace pending items with - the same as latest_usn in the client case
new_usn: Option<Usn>,
server_message: String, server_message: String,
host_number: u32, host_number: u32,
} }
@ -217,6 +228,17 @@ struct NormalSyncer<'a> {
remote: HTTPSyncClient, remote: HTTPSyncClient,
} }
impl Usn {
/// Used when gathering pending objects during sync.
pub(crate) fn pending_object_clause(self) -> &'static str {
if self.0 == -1 {
"usn = ?"
} else {
"usn >= ?"
}
}
}
impl NormalSyncer<'_> { impl NormalSyncer<'_> {
/// Create a new syncing instance. If host_number is unavailable, use 0. /// Create a new syncing instance. If host_number is unavailable, use 0.
pub fn new(col: &mut Collection, auth: SyncAuth) -> NormalSyncer<'_> { pub fn new(col: &mut Collection, auth: SyncAuth) -> NormalSyncer<'_> {
@ -296,8 +318,10 @@ impl NormalSyncer<'_> {
Ok(SyncState { Ok(SyncState {
required, required,
local_is_newer: local.modified > remote.modified, local_is_newer: local.modified > remote.modified,
local_usn: local.usn, usn_at_last_sync: local.usn,
remote_usn: remote.usn, latest_usn: remote.usn,
pending_usn: Usn(-1),
new_usn: Some(remote.usn),
server_message: remote.server_message, server_message: remote.server_message,
host_number: remote.host_number, host_number: remote.host_number,
}) })
@ -306,12 +330,17 @@ impl NormalSyncer<'_> {
/// Sync. Caller must have created a transaction, and should call /// Sync. Caller must have created a transaction, and should call
/// abort on failure. /// abort on failure.
async fn normal_sync_inner(&mut self, mut state: SyncState) -> Result<SyncOutput> { async fn normal_sync_inner(&mut self, mut state: SyncState) -> Result<SyncOutput> {
debug!(self.col.log, "start");
self.start_and_process_deletions(&state).await?; self.start_and_process_deletions(&state).await?;
self.process_unchunked_changes(state.remote_usn, state.local_is_newer) debug!(self.col.log, "unchunked changes");
.await?; self.process_unchunked_changes(&state).await?;
debug!(self.col.log, "begin stream from server");
self.process_chunks_from_server().await?; self.process_chunks_from_server().await?;
self.send_chunks_to_server(state.remote_usn).await?; debug!(self.col.log, "begin stream to server");
self.send_chunks_to_server(&state).await?;
debug!(self.col.log, "sanity check");
self.sanity_check().await?; self.sanity_check().await?;
debug!(self.col.log, "finalize");
self.finalize(&state).await?; self.finalize(&state).await?;
state.required = SyncActionRequired::NoChanges; state.required = SyncActionRequired::NoChanges;
Ok(state.into()) Ok(state.into())
@ -320,22 +349,37 @@ impl NormalSyncer<'_> {
// The following operations assume a transaction has been set up. // The following operations assume a transaction has been set up.
async fn start_and_process_deletions(&self, state: &SyncState) -> Result<()> { async fn start_and_process_deletions(&self, state: &SyncState) -> Result<()> {
let removed_on_remote = self let removed_on_remote: Graves = self
.remote .remote
.start( .start(
state.local_usn, state.usn_at_last_sync,
self.col.get_local_mins_west(), self.col.get_local_mins_west(),
state.local_is_newer, state.local_is_newer,
) )
.await?; .await?;
let mut locally_removed = self.col.storage.take_pending_graves(state.remote_usn)?; debug!(self.col.log, "removed on remote";
"cards"=>removed_on_remote.cards.len(),
"notes"=>removed_on_remote.notes.len(),
"decks"=>removed_on_remote.decks.len());
let mut locally_removed = self.col.storage.pending_graves(state.pending_usn)?;
if let Some(new_usn) = state.new_usn {
self.col.storage.update_pending_grave_usns(new_usn)?;
}
debug!(self.col.log, "locally removed ";
"cards"=>locally_removed.cards.len(),
"notes"=>locally_removed.notes.len(),
"decks"=>locally_removed.decks.len());
while let Some(chunk) = locally_removed.take_chunk() { while let Some(chunk) = locally_removed.take_chunk() {
debug!(self.col.log, "sending graves chunk");
self.remote.apply_graves(chunk).await?; self.remote.apply_graves(chunk).await?;
} }
self.col.apply_graves(removed_on_remote, state.local_usn)?; self.col.apply_graves(removed_on_remote, state.latest_usn)?;
debug!(self.col.log, "applied server graves");
Ok(()) Ok(())
} }
@ -344,21 +388,42 @@ impl NormalSyncer<'_> {
// the large deck trees and note types some users would create. They should be chunked // the large deck trees and note types some users would create. They should be chunked
// in the future, like other objects. Syncing tags explicitly is also probably of limited // in the future, like other objects. Syncing tags explicitly is also probably of limited
// usefulness. // usefulness.
async fn process_unchunked_changes( async fn process_unchunked_changes(&mut self, state: &SyncState) -> Result<()> {
&mut self, debug!(self.col.log, "gathering local changes");
remote_usn: Usn, let local_changes = self.col.local_unchunked_changes(
local_is_newer: bool, state.pending_usn,
) -> Result<()> { state.new_usn,
let local_changes = self state.local_is_newer,
.col )?;
.local_unchunked_changes(remote_usn, local_is_newer)?; debug!(self.col.log, "sending";
"notetypes"=>local_changes.notetypes.len(),
"decks"=>local_changes.decks_and_config.decks.len(),
"deck config"=>local_changes.decks_and_config.config.len(),
"tags"=>local_changes.tags.len(),
);
let remote_changes = self.remote.apply_changes(local_changes).await?; let remote_changes = self.remote.apply_changes(local_changes).await?;
self.col.apply_changes(remote_changes, remote_usn) debug!(self.col.log, "received";
"notetypes"=>remote_changes.notetypes.len(),
"decks"=>remote_changes.decks_and_config.decks.len(),
"deck config"=>remote_changes.decks_and_config.config.len(),
"tags"=>remote_changes.tags.len(),
);
self.col.apply_changes(remote_changes, state.latest_usn)
} }
async fn process_chunks_from_server(&mut self) -> Result<()> { async fn process_chunks_from_server(&mut self) -> Result<()> {
loop { loop {
let chunk: Chunk = self.remote.chunk().await?; let chunk: Chunk = self.remote.chunk().await?;
debug!(self.col.log, "received";
"done"=>chunk.done,
"cards"=>chunk.cards.len(),
"notes"=>chunk.notes.len(),
"revlog"=>chunk.revlog.len(),
);
let done = chunk.done; let done = chunk.done;
self.col.apply_chunk(chunk)?; self.col.apply_chunk(chunk)?;
@ -368,10 +433,20 @@ impl NormalSyncer<'_> {
} }
} }
async fn send_chunks_to_server(&self, server_usn: Usn) -> Result<()> { async fn send_chunks_to_server(&self, state: &SyncState) -> Result<()> {
let mut ids = self.col.get_chunkable_ids(state.pending_usn)?;
loop { loop {
let chunk: Chunk = self.col.get_chunk(server_usn)?; let chunk: Chunk = self.col.get_chunk(&mut ids, state.new_usn)?;
let done = chunk.done; let done = chunk.done;
debug!(self.col.log, "sending";
"done"=>chunk.done,
"cards"=>chunk.cards.len(),
"notes"=>chunk.notes.len(),
"revlog"=>chunk.revlog.len(),
);
self.remote.apply_chunk(chunk).await?; self.remote.apply_chunk(chunk).await?;
if done { if done {
@ -383,11 +458,14 @@ impl NormalSyncer<'_> {
/// Caller should force full sync after rolling back. /// Caller should force full sync after rolling back.
async fn sanity_check(&mut self) -> Result<()> { async fn sanity_check(&mut self) -> Result<()> {
let mut local_counts = self.col.storage.sanity_check_info()?; let mut local_counts = self.col.storage.sanity_check_info()?;
debug!(self.col.log, "gathered local counts");
self.col.add_due_counts(&mut local_counts.counts)?; self.col.add_due_counts(&mut local_counts.counts)?;
debug!(
self.col.log,
"gathered local counts; waiting for server reply"
);
let out: SanityCheckOut = self.remote.sanity_check(local_counts).await?; let out: SanityCheckOut = self.remote.sanity_check(local_counts).await?;
debug!(self.col.log, "get server reply"); debug!(self.col.log, "got server reply");
if out.status != SanityCheckStatus::Ok { if out.status != SanityCheckStatus::Ok {
Err(AnkiError::SyncError { Err(AnkiError::SyncError {
info: format!("local {:?}\nremote {:?}", out.client, out.server), info: format!("local {:?}\nremote {:?}", out.client, out.server),
@ -506,18 +584,18 @@ impl Collection {
}) })
} }
fn apply_graves(&self, graves: Graves, local_usn: Usn) -> Result<()> { fn apply_graves(&self, graves: Graves, latest_usn: Usn) -> Result<()> {
for nid in graves.notes { for nid in graves.notes {
self.storage.remove_note(nid)?; self.storage.remove_note(nid)?;
self.storage.add_note_grave(nid, local_usn)?; self.storage.add_note_grave(nid, latest_usn)?;
} }
for cid in graves.cards { for cid in graves.cards {
self.storage.remove_card(cid)?; self.storage.remove_card(cid)?;
self.storage.add_card_grave(cid, local_usn)?; self.storage.add_card_grave(cid, latest_usn)?;
} }
for did in graves.decks { for did in graves.decks {
self.storage.remove_deck(did)?; self.storage.remove_deck(did)?;
self.storage.add_deck_grave(did, local_usn)?; self.storage.add_deck_grave(did, latest_usn)?;
} }
Ok(()) Ok(())
} }
@ -527,16 +605,17 @@ impl Collection {
fn local_unchunked_changes( fn local_unchunked_changes(
&self, &self,
remote_usn: Usn, pending_usn: Usn,
new_usn: Option<Usn>,
local_is_newer: bool, local_is_newer: bool,
) -> Result<UnchunkedChanges> { ) -> Result<UnchunkedChanges> {
let mut changes = UnchunkedChanges { let mut changes = UnchunkedChanges {
notetypes: self.changed_notetypes(remote_usn)?, notetypes: self.changed_notetypes(pending_usn, new_usn)?,
decks_and_config: DecksAndConfig { decks_and_config: DecksAndConfig {
decks: self.changed_decks(remote_usn)?, decks: self.changed_decks(pending_usn, new_usn)?,
config: self.changed_deck_config(remote_usn)?, config: self.changed_deck_config(pending_usn, new_usn)?,
}, },
tags: self.changed_tags(remote_usn)?, tags: self.changed_tags(pending_usn, new_usn)?,
..Default::default() ..Default::default()
}; };
if local_is_newer { if local_is_newer {
@ -547,33 +626,69 @@ impl Collection {
Ok(changes) Ok(changes)
} }
fn changed_notetypes(&self, new_usn: Usn) -> Result<Vec<NoteTypeSchema11>> { fn changed_notetypes(
let ids = self.storage.take_notetype_ids_pending_sync(new_usn)?; &self,
ids.into_iter() pending_usn: Usn,
.map(|id| self.storage.get_notetype(id).map(|opt| opt.unwrap().into())) new_usn: Option<Usn>,
.collect() ) -> Result<Vec<NoteTypeSchema11>> {
} let ids = self
.storage
fn changed_decks(&self, new_usn: Usn) -> Result<Vec<DeckSchema11>> { .objects_pending_sync("notetypes", pending_usn)?;
let ids = self.storage.take_deck_ids_pending_sync(new_usn)?; self.storage
ids.into_iter() .maybe_update_object_usns("notetypes", &ids, new_usn)?;
.map(|id| self.storage.get_deck(id).map(|opt| opt.unwrap().into()))
.collect()
}
fn changed_deck_config(&self, new_usn: Usn) -> Result<Vec<DeckConfSchema11>> {
let ids = self.storage.take_deck_config_ids_pending_sync(new_usn)?;
ids.into_iter() ids.into_iter()
.map(|id| { .map(|id| {
self.storage self.storage.get_notetype(id).map(|opt| {
.get_deck_config(id) let mut nt: NoteTypeSchema11 = opt.unwrap().into();
.map(|opt| opt.unwrap().into()) nt.usn = new_usn.unwrap_or(nt.usn);
nt
})
}) })
.collect() .collect()
} }
fn changed_tags(&self, new_usn: Usn) -> Result<Vec<String>> { fn changed_decks(&self, pending_usn: Usn, new_usn: Option<Usn>) -> Result<Vec<DeckSchema11>> {
self.storage.take_changed_tags(new_usn) let ids = self.storage.objects_pending_sync("decks", pending_usn)?;
self.storage
.maybe_update_object_usns("decks", &ids, new_usn)?;
ids.into_iter()
.map(|id| {
self.storage.get_deck(id).map(|opt| {
let mut deck = opt.unwrap();
deck.usn = new_usn.unwrap_or(deck.usn);
deck.into()
})
})
.collect()
}
fn changed_deck_config(
&self,
pending_usn: Usn,
new_usn: Option<Usn>,
) -> Result<Vec<DeckConfSchema11>> {
let ids = self
.storage
.objects_pending_sync("deck_config", pending_usn)?;
self.storage
.maybe_update_object_usns("deck_config", &ids, new_usn)?;
ids.into_iter()
.map(|id| {
self.storage.get_deck_config(id).map(|opt| {
let mut conf: DeckConfSchema11 = opt.unwrap().into();
conf.usn = new_usn.unwrap_or(conf.usn);
conf
})
})
.collect()
}
fn changed_tags(&self, pending_usn: Usn, new_usn: Option<Usn>) -> Result<Vec<String>> {
let changed = self.storage.tags_pending_sync(pending_usn)?;
if let Some(usn) = new_usn {
self.storage.update_tag_usns(&changed, usn)?;
}
Ok(changed)
} }
/// Currently this is all config, as legacy clients overwrite the local items /// Currently this is all config, as legacy clients overwrite the local items
@ -587,17 +702,17 @@ impl Collection {
// Remote->local unchunked changes // Remote->local unchunked changes
//---------------------------------------------------------------- //----------------------------------------------------------------
fn apply_changes(&mut self, remote: UnchunkedChanges, remote_usn: Usn) -> Result<()> { fn apply_changes(&mut self, remote: UnchunkedChanges, latest_usn: Usn) -> Result<()> {
self.merge_notetypes(remote.notetypes)?; self.merge_notetypes(remote.notetypes)?;
self.merge_decks(remote.decks_and_config.decks)?; self.merge_decks(remote.decks_and_config.decks)?;
self.merge_deck_config(remote.decks_and_config.config)?; self.merge_deck_config(remote.decks_and_config.config)?;
self.merge_tags(remote.tags, remote_usn)?; self.merge_tags(remote.tags, latest_usn)?;
if let Some(crt) = remote.creation_stamp { if let Some(crt) = remote.creation_stamp {
self.storage.set_creation_stamp(crt)?; self.storage.set_creation_stamp(crt)?;
} }
if let Some(config) = remote.config { if let Some(config) = remote.config {
self.storage self.storage
.set_all_config(config, remote_usn, TimestampSecs::now())?; .set_all_config(config, latest_usn, TimestampSecs::now())?;
} }
Ok(()) Ok(())
@ -662,9 +777,9 @@ impl Collection {
Ok(()) Ok(())
} }
fn merge_tags(&self, tags: Vec<String>, new_usn: Usn) -> Result<()> { fn merge_tags(&self, tags: Vec<String>, latest_usn: Usn) -> Result<()> {
for tag in tags { for tag in tags {
self.register_tag(&tag, new_usn)?; self.register_tag(&tag, latest_usn)?;
} }
Ok(()) Ok(())
} }
@ -732,25 +847,87 @@ impl Collection {
// Local->remote chunks // Local->remote chunks
//---------------------------------------------------------------- //----------------------------------------------------------------
fn get_chunk(&self, server_usn: Usn) -> Result<Chunk> { fn get_chunkable_ids(&self, pending_usn: Usn) -> Result<ChunkableIDs> {
let mut chunk = Chunk::default(); Ok(ChunkableIDs {
chunk.revlog = self revlog: self.storage.objects_pending_sync("revlog", pending_usn)?,
.storage cards: self.storage.objects_pending_sync("cards", pending_usn)?,
.take_revlog_pending_sync(server_usn, CHUNK_SIZE)?; notes: self.storage.objects_pending_sync("notes", pending_usn)?,
chunk.cards = self })
.storage }
.take_cards_pending_sync(server_usn, CHUNK_SIZE)?;
if !chunk.revlog.is_empty() || !chunk.cards.is_empty() {
return Ok(chunk);
}
chunk.notes = self /// Fetch a chunk of ids from `ids`, returning the referenced objects.
.storage fn get_chunk(&self, ids: &mut ChunkableIDs, new_usn: Option<Usn>) -> Result<Chunk> {
.take_notes_pending_sync(server_usn, CHUNK_SIZE)?; // get a bunch of IDs
if chunk.notes.is_empty() { let mut limit = CHUNK_SIZE as i32;
let mut revlog_ids = vec![];
let mut card_ids = vec![];
let mut note_ids = vec![];
let mut chunk = Chunk::default();
while limit > 0 {
let last_limit = limit;
if let Some(id) = ids.revlog.pop() {
revlog_ids.push(id);
limit -= 1;
}
if let Some(id) = ids.notes.pop() {
note_ids.push(id);
limit -= 1;
}
if let Some(id) = ids.cards.pop() {
card_ids.push(id);
limit -= 1;
}
if limit == last_limit {
// all empty
break;
}
}
if limit > 0 {
chunk.done = true; chunk.done = true;
} }
// remove pending status
if !self.server {
self.storage
.maybe_update_object_usns("revlog", &revlog_ids, new_usn)?;
self.storage
.maybe_update_object_usns("cards", &card_ids, new_usn)?;
self.storage
.maybe_update_object_usns("notes", &note_ids, new_usn)?;
}
// the fetch associated objects, and return
chunk.revlog = revlog_ids
.into_iter()
.map(|id| {
self.storage.get_revlog_entry(id).map(|e| {
let mut e = e.unwrap();
e.usn = new_usn.unwrap_or(e.usn);
e
})
})
.collect::<Result<_>>()?;
chunk.cards = card_ids
.into_iter()
.map(|id| {
self.storage.get_card(id).map(|e| {
let mut e: CardEntry = e.unwrap().into();
e.usn = new_usn.unwrap_or(e.usn);
e
})
})
.collect::<Result<_>>()?;
chunk.notes = note_ids
.into_iter()
.map(|id| {
self.storage.get_note(id).map(|e| {
let mut e: NoteEntry = e.unwrap().into();
e.usn = new_usn.unwrap_or(e.usn);
e
})
})
.collect::<Result<_>>()?;
Ok(chunk) Ok(chunk)
} }
@ -768,7 +945,7 @@ impl Collection {
fn finalize_sync(&self, state: &SyncState, new_server_mtime: TimestampMillis) -> Result<()> { fn finalize_sync(&self, state: &SyncState, new_server_mtime: TimestampMillis) -> Result<()> {
self.storage.set_last_sync(new_server_mtime)?; self.storage.set_last_sync(new_server_mtime)?;
let mut usn = state.remote_usn; let mut usn = state.latest_usn;
usn.0 += 1; usn.0 += 1;
self.storage.set_usn(usn)?; self.storage.set_usn(usn)?;
self.storage.set_modified_time(new_server_mtime) self.storage.set_modified_time(new_server_mtime)