Anki/rslib/src/sync/collection/tests.rs
Damien Elmes 45af4e3d9e Refactor progress handling (#2549)
Previously it was Backend's responsibility to store the last progress,
and when calling routines in Collection, one had to construct and pass
in a Fn, which wasn't the most ergonomic. This PR adds the last progress
state to the collection, so that the routines no longer need a separate
progress arg, and makes some other tweaks to improve ergonomics.

ThrottlingProgressHandler has been tweaked so that it now stores the
current state, so that callers don't need to store it separately. When
a long-running routine starts, it calls col.new_progress_handler(),
which automatically initializes the data to defaults, and updates the
shared UI state, so we no longer need to manually update the state at
the start of an operation.

The backend shares the Arc<Mutex<>> with the collection, so it can get
at the current state, and so we can update the state when importing a
backup.

Other tweaks:

- The current Incrementor was awkward to use in the media check, which
uses a single incrementing value across multiple method calls, so I've
added a simpler alternative for such cases. The old incrementor method
has been kept, but implemented directly on ThrottlingProgressHandler.
- The full sync code was passing the progress handler in a complicated
way that may once have been required, but no longer is.
- On the Qt side, timers are now stopped before deletion, or they keep
running for a few seconds.
- I left the ChangeTracker using a closure, as it's used for both importing
and syncing.
2023-06-19 13:48:32 +10:00

748 lines
22 KiB
Rust

// Copyright: Ankitects Pty Ltd and contributors
// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
#![cfg(test)]
use std::future::Future;
use axum::http::StatusCode;
use once_cell::sync::Lazy;
use reqwest::Url;
use serde_json::json;
use tempfile::tempdir;
use tempfile::TempDir;
use tokio::sync::Mutex;
use tokio::sync::MutexGuard;
use tracing::Instrument;
use tracing::Span;
use wiremock::matchers::method;
use wiremock::matchers::path;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use crate::card::CardQueue;
use crate::collection::CollectionBuilder;
use crate::deckconfig::DeckConfig;
use crate::decks::DeckKind;
use crate::error::SyncError;
use crate::error::SyncErrorKind;
use crate::log::set_global_logger;
use crate::notetype::all_stock_notetypes;
use crate::prelude::*;
use crate::revlog::RevlogEntry;
use crate::search::SortMode;
use crate::sync::collection::graves::ApplyGravesRequest;
use crate::sync::collection::meta::MetaRequest;
use crate::sync::collection::normal::NormalSyncer;
use crate::sync::collection::normal::SyncActionRequired;
use crate::sync::collection::normal::SyncOutput;
use crate::sync::collection::protocol::EmptyInput;
use crate::sync::collection::protocol::SyncProtocol;
use crate::sync::collection::start::StartRequest;
use crate::sync::collection::upload::UploadResponse;
use crate::sync::collection::upload::CORRUPT_MESSAGE;
use crate::sync::http_client::HttpSyncClient;
use crate::sync::http_server::SimpleServer;
use crate::sync::login::HostKeyRequest;
use crate::sync::login::SyncAuth;
use crate::sync::request::IntoSyncRequest;
struct TestAuth {
username: String,
password: String,
host_key: String,
}
static AUTH: Lazy<TestAuth> = Lazy::new(|| {
if let Ok(auth) = std::env::var("TEST_AUTH") {
let mut auth = auth.split(':');
TestAuth {
username: auth.next().unwrap().into(),
password: auth.next().unwrap().into(),
host_key: auth.next().unwrap().into(),
}
} else {
TestAuth {
username: "user".to_string(),
password: "pass".to_string(),
host_key: "b2619aa1529dfdc4248e6edbf3c1b2a2b014cf6d".to_string(),
}
}
});
pub(in crate::sync) async fn with_active_server<F, O>(op: F) -> Result<()>
where
F: FnOnce(HttpSyncClient) -> O,
O: Future<Output = Result<()>>,
{
let _ = set_global_logger(None);
// start server
let base_folder = tempdir()?;
std::env::set_var("SYNC_USER1", "user:pass");
let (addr, server_fut) = SimpleServer::make_server(None, base_folder.path()).unwrap();
tokio::spawn(server_fut.instrument(Span::current()));
// when not using ephemeral servers, tests need to be serialized
static LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
let _lock: MutexGuard<()>;
// setup client to connect to it
let endpoint = if let Ok(endpoint) = std::env::var("TEST_ENDPOINT") {
_lock = LOCK.lock().await;
endpoint
} else {
format!("http://{}/", addr)
};
let endpoint = Url::try_from(endpoint.as_str()).unwrap();
let auth = SyncAuth {
hkey: AUTH.host_key.clone(),
endpoint: Some(endpoint),
io_timeout_secs: None,
};
let client = HttpSyncClient::new(auth);
op(client).await
}
fn unwrap_sync_err_kind(err: AnkiError) -> SyncErrorKind {
let AnkiError::SyncError { source: SyncError { kind, .. } } = err else {
panic!("not sync err: {err:?}");
};
kind
}
#[tokio::test]
async fn host_key() -> Result<()> {
with_active_server(|mut client| async move {
let err = client
.host_key(
HostKeyRequest {
username: "bad".to_string(),
password: "bad".to_string(),
}
.try_into_sync_request()?,
)
.await
.unwrap_err();
assert_eq!(err.code, StatusCode::FORBIDDEN);
assert_eq!(
unwrap_sync_err_kind(AnkiError::from(err)),
SyncErrorKind::AuthFailed
);
// hkey should be automatically set after successful login
client.sync_key = String::new();
let resp = client
.host_key(
HostKeyRequest {
username: AUTH.username.clone(),
password: AUTH.password.clone(),
}
.try_into_sync_request()?,
)
.await?
.json()?;
assert_eq!(resp.key, *AUTH.host_key);
Ok(())
})
.await
}
#[tokio::test]
async fn meta() -> Result<()> {
with_active_server(|client| async move {
// unsupported sync version
assert_eq!(
SyncProtocol::meta(
&client,
MetaRequest {
sync_version: 0,
client_version: "".to_string(),
}
.try_into_sync_request()?,
)
.await
.unwrap_err()
.code,
StatusCode::NOT_IMPLEMENTED
);
Ok(())
})
.await
}
#[tokio::test]
async fn aborting_is_idempotent() -> Result<()> {
with_active_server(|mut client| async move {
// abort is a no-op if no sync in progress
client.abort(EmptyInput::request()).await?;
// start a sync
let _graves = client
.start(
StartRequest {
client_usn: Default::default(),
local_is_newer: false,
deprecated_client_graves: None,
}
.try_into_sync_request()?,
)
.await?;
// an abort request with the wrong key is ignored
let orig_key = client.skey().to_string();
client.set_skey("aabbccdd".into());
client.abort(EmptyInput::request()).await?;
// it should succeed with the correct key
client.set_skey(orig_key);
client.abort(EmptyInput::request()).await?;
Ok(())
})
.await
}
#[tokio::test]
async fn new_syncs_cancel_old_ones() -> Result<()> {
with_active_server(|mut client| async move {
let ctx = SyncTestContext::new(client.clone());
// start a sync
let req = StartRequest {
client_usn: Default::default(),
local_is_newer: false,
deprecated_client_graves: None,
}
.try_into_sync_request()?;
let _ = client.start(req.clone()).await?;
// a new sync aborts the previous one
let orig_key = client.skey().to_string();
client.set_skey("1".into());
let _ = client.start(req.clone()).await?;
// old sync can no longer proceed
client.set_skey(orig_key);
let graves_req = ApplyGravesRequest::default().try_into_sync_request()?;
assert_eq!(
client
.apply_graves(graves_req.clone())
.await
.unwrap_err()
.code,
StatusCode::CONFLICT
);
// with the correct key, it can continue
client.set_skey("1".into());
client.apply_graves(graves_req.clone()).await?;
// but a full upload will break the lock
ctx.full_upload(ctx.col1()).await;
assert_eq!(
client
.apply_graves(graves_req.clone())
.await
.unwrap_err()
.code,
StatusCode::CONFLICT
);
// likewise with download
let _ = client.start(req.clone()).await?;
ctx.full_download(ctx.col1()).await;
assert_eq!(
client
.apply_graves(graves_req.clone())
.await
.unwrap_err()
.code,
StatusCode::CONFLICT
);
Ok(())
})
.await
}
#[tokio::test]
async fn sync_roundtrip() -> Result<()> {
with_active_server(|client| async move {
let ctx = SyncTestContext::new(client);
upload_download(&ctx).await?;
regular_sync(&ctx).await?;
Ok(())
})
.await
}
#[tokio::test]
async fn sanity_check_should_roll_back_and_force_full_sync() -> Result<()> {
with_active_server(|client| async move {
let ctx = SyncTestContext::new(client);
upload_download(&ctx).await?;
let mut col1 = ctx.col1();
// add a deck but don't mark it as requiring a sync, which will trigger the
// sanity check to fail
let mut deck = col1.get_or_create_normal_deck("unsynced deck")?;
col1.add_or_update_deck(&mut deck)?;
col1.storage
.db
.execute("update decks set usn=0 where id=?", [deck.id])?;
// the sync should fail
let err = NormalSyncer::new(&mut col1, ctx.cloned_client())
.sync()
.await
.unwrap_err();
assert!(matches!(
err,
AnkiError::SyncError {
source: SyncError {
kind: SyncErrorKind::SanityCheckFailed { .. },
..
}
}
));
// the server should have rolled back
let mut col2 = ctx.col2();
let out = ctx.normal_sync(&mut col2).await;
assert_eq!(out.required, SyncActionRequired::NoChanges);
// and the client should have forced a one-way sync
let out = ctx.normal_sync(&mut col1).await;
assert_eq!(
out.required,
SyncActionRequired::FullSyncRequired {
upload_ok: true,
download_ok: true,
}
);
Ok(())
})
.await
}
#[tokio::test]
async fn sync_errors_should_prompt_db_check() -> Result<()> {
with_active_server(|client| async move {
let ctx = SyncTestContext::new(client);
upload_download(&ctx).await?;
let mut col1 = ctx.col1();
// Add a a new notetype, and a note that uses it, but don't mark the notetype as
// requiring a sync, which will cause the sync to fail as the note is added.
let mut nt = all_stock_notetypes(&col1.tr).remove(0);
nt.name = "new".into();
col1.add_notetype(&mut nt, false)?;
let mut note = nt.new_note();
note.set_field(0, "test")?;
col1.add_note(&mut note, DeckId(1))?;
col1.storage.db.execute("update notetypes set usn=0", [])?;
// the sync should fail
let err = NormalSyncer::new(&mut col1, ctx.cloned_client())
.sync()
.await
.unwrap_err();
let AnkiError::SyncError { source: SyncError { info: _, kind } } = err else { panic!() };
assert_eq!(kind, SyncErrorKind::DatabaseCheckRequired);
// the server should have rolled back
let mut col2 = ctx.col2();
let out = ctx.normal_sync(&mut col2).await;
assert_eq!(out.required, SyncActionRequired::NoChanges);
// and the client should be able to sync again without a forced one-way sync
let err = NormalSyncer::new(&mut col1, ctx.cloned_client())
.sync()
.await
.unwrap_err();
let AnkiError::SyncError { source: SyncError { info: _, kind } } = err else { panic!() };
assert_eq!(kind, SyncErrorKind::DatabaseCheckRequired);
Ok(())
})
.await
}
/// Old AnkiMobile versions sent grave ids as strings
#[tokio::test]
async fn string_grave_ids_are_handled() -> Result<()> {
with_active_server(|client| async move {
let req = json!({
"minUsn": 0,
"lnewer": false,
"graves": {
"cards": vec!["1"],
"decks": vec!["2", "3"],
"notes": vec!["4"],
}
});
let req = serde_json::to_vec(&req)
.unwrap()
.try_into_sync_request()
.unwrap();
// should not return err 400
client.start(req.into_output_type()).await.unwrap();
client.abort(EmptyInput::request()).await?;
Ok(())
})
.await?;
// a missing value should be handled
with_active_server(|client| async move {
let req = json!({
"minUsn": 0,
"lnewer": false,
});
let req = serde_json::to_vec(&req)
.unwrap()
.try_into_sync_request()
.unwrap();
client.start(req.into_output_type()).await.unwrap();
client.abort(EmptyInput::request()).await?;
Ok(())
})
.await
}
#[tokio::test]
async fn invalid_uploads_should_be_handled() -> Result<()> {
with_active_server(|client| async move {
let ctx = SyncTestContext::new(client);
let res = ctx
.client
.upload(b"fake data".to_vec().try_into_sync_request()?)
.await?;
assert_eq!(
res.upload_response(),
UploadResponse::Err(CORRUPT_MESSAGE.into())
);
Ok(())
})
.await
}
#[tokio::test]
async fn meta_redirect_is_handled() -> Result<()> {
with_active_server(|client| async move {
let mock_server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/sync/meta"))
.respond_with(
ResponseTemplate::new(308).insert_header("location", client.endpoint.as_str()),
)
.mount(&mock_server)
.await;
// starting from in-sync state
let mut ctx = SyncTestContext::new(client);
upload_download(&ctx).await?;
// add another note to trigger a normal sync
let mut col1 = ctx.col1();
col1_setup(&mut col1);
// switch to bad endpoint
let orig_url = ctx.client.endpoint.to_string();
ctx.client.endpoint = Url::try_from(mock_server.uri().as_str()).unwrap();
// sync should succeed
let out = ctx.normal_sync(&mut col1).await;
// client should have received new endpoint
assert_eq!(out.new_endpoint, Some(orig_url));
// client should not have tried the old endpoint more than once
assert_eq!(mock_server.received_requests().await.unwrap().len(), 1);
Ok(())
})
.await
}
pub(in crate::sync) struct SyncTestContext {
pub folder: TempDir,
pub client: HttpSyncClient,
}
impl SyncTestContext {
pub fn new(client: HttpSyncClient) -> Self {
Self {
folder: tempdir().expect("create temp dir"),
client,
}
}
pub fn col1(&self) -> Collection {
let base = self.folder.path();
CollectionBuilder::new(base.join("col1.anki2"))
.with_desktop_media_paths()
.build()
.unwrap()
}
pub fn col2(&self) -> Collection {
let base = self.folder.path();
CollectionBuilder::new(base.join("col2.anki2"))
.with_desktop_media_paths()
.build()
.unwrap()
}
async fn normal_sync(&self, col: &mut Collection) -> SyncOutput {
NormalSyncer::new(col, self.cloned_client())
.sync()
.await
.unwrap()
}
async fn full_upload(&self, col: Collection) {
col.full_upload_with_server(self.cloned_client())
.await
.unwrap()
}
async fn full_download(&self, col: Collection) {
col.full_download_with_server(self.cloned_client())
.await
.unwrap()
}
fn cloned_client(&self) -> HttpSyncClient {
self.client.clone()
}
}
// Setup + full syncs
/////////////////////
fn col1_setup(col: &mut Collection) {
let nt = col.get_notetype_by_name("Basic").unwrap().unwrap();
let mut note = nt.new_note();
note.set_field(0, "1").unwrap();
col.add_note(&mut note, DeckId(1)).unwrap();
}
async fn upload_download(ctx: &SyncTestContext) -> Result<()> {
let mut col1 = ctx.col1();
col1_setup(&mut col1);
let out = ctx.normal_sync(&mut col1).await;
assert!(matches!(
out.required,
SyncActionRequired::FullSyncRequired { .. }
));
ctx.full_upload(col1).await;
// another collection
let mut col2 = ctx.col2();
// won't allow ankiweb clobber
let out = ctx.normal_sync(&mut col2).await;
assert_eq!(
out.required,
SyncActionRequired::FullSyncRequired {
upload_ok: false,
download_ok: true,
}
);
// fetch so we're in sync
ctx.full_download(col2).await;
Ok(())
}
// Regular syncs
/////////////////////
async fn regular_sync(ctx: &SyncTestContext) -> Result<()> {
// add a deck
let mut col1 = ctx.col1();
let mut col2 = ctx.col2();
let mut deck = col1.get_or_create_normal_deck("new deck")?;
// give it a new option group
let mut dconf = DeckConfig {
name: "new dconf".into(),
..Default::default()
};
col1.add_or_update_deck_config(&mut dconf)?;
if let DeckKind::Normal(deck) = &mut deck.kind {
deck.config_id = dconf.id.0;
}
col1.add_or_update_deck(&mut deck)?;
// and a new notetype
let mut nt = all_stock_notetypes(&col1.tr).remove(0);
nt.name = "new".into();
col1.add_notetype(&mut nt, false)?;
// add another note+card+tag
let mut note = nt.new_note();
note.set_field(0, "2")?;
note.tags.push("tag".into());
col1.add_note(&mut note, deck.id)?;
// mock revlog entry
col1.storage.add_revlog_entry(
&RevlogEntry {
id: RevlogId(123),
cid: CardId(456),
usn: Usn(-1),
interval: 10,
..Default::default()
},
true,
)?;
// config + creation
col1.set_config("test", &"test1")?;
// bumping this will affect 'last studied at' on decks at the moment
// col1.storage.set_creation_stamp(TimestampSecs(12345))?;
// and sync our changes
let remote_meta = ctx
.client
.meta(MetaRequest::request())
.await
.unwrap()
.json()
.unwrap();
let out = col1.sync_meta()?.compared_to_remote(remote_meta, None);
assert_eq!(out.required, SyncActionRequired::NormalSyncRequired);
let out = ctx.normal_sync(&mut col1).await;
assert_eq!(out.required, SyncActionRequired::NoChanges);
// sync the other collection
let out = ctx.normal_sync(&mut col2).await;
assert_eq!(out.required, SyncActionRequired::NoChanges);
let ntid = nt.id;
let deckid = deck.id;
let dconfid = dconf.id;
let noteid = note.id;
let cardid = col1.search_cards(note.id, SortMode::NoOrder)?[0];
let revlogid = RevlogId(123);
let compare_sides = |col1: &mut Collection, col2: &mut Collection| -> Result<()> {
assert_eq!(
col1.get_notetype(ntid)?.unwrap(),
col2.get_notetype(ntid)?.unwrap()
);
assert_eq!(
col1.get_deck(deckid)?.unwrap(),
col2.get_deck(deckid)?.unwrap()
);
assert_eq!(
col1.get_deck_config(dconfid, false)?.unwrap(),
col2.get_deck_config(dconfid, false)?.unwrap()
);
assert_eq!(
col1.storage.get_note(noteid)?.unwrap(),
col2.storage.get_note(noteid)?.unwrap()
);
assert_eq!(
col1.storage.get_card(cardid)?.unwrap(),
col2.storage.get_card(cardid)?.unwrap()
);
assert_eq!(
col1.storage.get_revlog_entry(revlogid)?,
col2.storage.get_revlog_entry(revlogid)?,
);
assert_eq!(
col1.storage.get_all_config()?,
col2.storage.get_all_config()?
);
assert_eq!(
col1.storage.creation_stamp()?,
col2.storage.creation_stamp()?
);
// server doesn't send tag usns, so we can only compare tags, not usns,
// as the usns may not match
assert_eq!(
col1.storage
.all_tags()?
.into_iter()
.map(|t| t.name)
.collect::<Vec<_>>(),
col2.storage
.all_tags()?
.into_iter()
.map(|t| t.name)
.collect::<Vec<_>>()
);
std::thread::sleep(std::time::Duration::from_millis(1));
Ok(())
};
// make sure everything has been transferred across
compare_sides(&mut col1, &mut col2)?;
// make some modifications
let mut note = col2.storage.get_note(note.id)?.unwrap();
note.set_field(1, "new")?;
note.tags.push("tag2".into());
col2.update_note(&mut note)?;
col2.get_and_update_card(cardid, |card| {
card.queue = CardQueue::Review;
Ok(())
})?;
let mut deck = col2.storage.get_deck(deck.id)?.unwrap();
deck.name = NativeDeckName::from_native_str("newer");
col2.add_or_update_deck(&mut deck)?;
let mut nt = col2.storage.get_notetype(nt.id)?.unwrap();
nt.name = "newer".into();
col2.update_notetype(&mut nt, false)?;
// sync the changes back
let out = ctx.normal_sync(&mut col2).await;
assert_eq!(out.required, SyncActionRequired::NoChanges);
let out = ctx.normal_sync(&mut col1).await;
assert_eq!(out.required, SyncActionRequired::NoChanges);
// should still match
compare_sides(&mut col1, &mut col2)?;
// deletions should sync too
for table in &["cards", "notes", "decks"] {
assert_eq!(
col1.storage
.db_scalar::<u8>(&format!("select count() from {}", table))?,
2
);
}
// fixme: inconsistent usn arg
std::thread::sleep(std::time::Duration::from_millis(1));
col1.remove_cards_and_orphaned_notes(&[cardid])?;
let usn = col1.usn()?;
col1.remove_note_only_undoable(noteid, usn)?;
col1.remove_decks_and_child_decks(&[deckid])?;
let out = ctx.normal_sync(&mut col1).await;
assert_eq!(out.required, SyncActionRequired::NoChanges);
let out = ctx.normal_sync(&mut col2).await;
assert_eq!(out.required, SyncActionRequired::NoChanges);
for table in &["cards", "notes", "decks"] {
assert_eq!(
col2.storage
.db_scalar::<u8>(&format!("select count() from {}", table))?,
1
);
}
// removing things like a notetype forces a full sync
std::thread::sleep(std::time::Duration::from_millis(1));
col2.remove_notetype(ntid)?;
let out = ctx.normal_sync(&mut col2).await;
assert!(matches!(
out.required,
SyncActionRequired::FullSyncRequired { .. }
));
Ok(())
}