start on HTTP client for syncing

This commit is contained in:
Damien Elmes 2020-05-26 17:18:50 +10:00
parent 6204a86879
commit 4fcb10bfa9
8 changed files with 506 additions and 12 deletions

View file

@ -46,6 +46,7 @@ futures = "0.3.4"
rand = "0.7.3" rand = "0.7.3"
num-integer = "0.1.42" num-integer = "0.1.42"
itertools = "0.9.0" itertools = "0.9.0"
flate2 = "1.0.14"
[target.'cfg(target_vendor="apple")'.dependencies.rusqlite] [target.'cfg(target_vendor="apple")'.dependencies.rusqlite]
version = "0.23.1" version = "0.23.1"

View file

@ -3,13 +3,8 @@
#![deny(unused_must_use)] #![deny(unused_must_use)]
mod backend_proto;
pub fn version() -> &'static str {
include_str!("../../meta/version").trim()
}
pub mod backend; pub mod backend;
mod backend_proto;
pub mod card; pub mod card;
pub mod cloze; pub mod cloze;
pub mod collection; pub mod collection;
@ -26,10 +21,12 @@ pub mod media;
pub mod notes; pub mod notes;
pub mod notetype; pub mod notetype;
mod preferences; mod preferences;
pub mod prelude;
pub mod sched; pub mod sched;
pub mod search; pub mod search;
pub mod serde; pub mod serde;
pub mod storage; pub mod storage;
mod sync;
pub mod tags; pub mod tags;
pub mod template; pub mod template;
pub mod template_filters; pub mod template_filters;
@ -37,3 +34,4 @@ pub mod text;
pub mod timestamp; pub mod timestamp;
pub mod types; pub mod types;
pub mod undo; pub mod undo;
pub mod version;

View file

@ -21,6 +21,7 @@ use std::io::{Read, Write};
use std::path::Path; use std::path::Path;
use std::{io, time}; use std::{io, time};
use time::Duration; use time::Duration;
use version::sync_client_version;
static SYNC_MAX_FILES: usize = 25; static SYNC_MAX_FILES: usize = 25;
static SYNC_MAX_BYTES: usize = (2.5 * 1024.0 * 1024.0) as usize; static SYNC_MAX_BYTES: usize = (2.5 * 1024.0 * 1024.0) as usize;
@ -244,7 +245,7 @@ where
let resp = self let resp = self
.client .client
.get(&url) .get(&url)
.query(&[("k", hkey), ("v", &version_string())]) .query(&[("k", hkey), ("v", &sync_client_version())])
.send() .send()
.await? .await?
.error_for_status()?; .error_for_status()?;
@ -809,10 +810,6 @@ fn zip_files<'a>(
Ok(Some(w.into_inner())) Ok(Some(w.into_inner()))
} }
fn version_string() -> String {
format!("anki,{},{}", version(), std::env::consts::OS)
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use crate::err::Result; use crate::err::Result;

View file

@ -209,7 +209,7 @@ pub(crate) fn field_checksum(text: &str) -> u32 {
u32::from_be_bytes(digest[..4].try_into().unwrap()) u32::from_be_bytes(digest[..4].try_into().unwrap())
} }
fn guid() -> String { pub(crate) fn guid() -> String {
anki_base91(rand::random()) anki_base91(rand::random())
} }

15
rslib/src/prelude.rs Normal file
View file

@ -0,0 +1,15 @@
// Copyright: Ankitects Pty Ltd and contributors
// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
pub use crate::{
card::CardID,
collection::Collection,
deckconf::DeckConfID,
decks::DeckID,
err::{AnkiError, Result},
notes::NoteID,
notetype::NoteTypeID,
timestamp::{TimestampMillis, TimestampSecs},
types::Usn,
};
pub use slog::{debug, Logger};

View file

@ -0,0 +1,287 @@
// Copyright: Ankitects Pty Ltd and contributors
// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
use super::*;
static SYNC_VERSION: u8 = 10;
pub struct HTTPSyncClient<'a> {
hkey: Option<String>,
skey: String,
client: Client,
endpoint: &'a str,
}
#[derive(Serialize)]
struct HostKeyIn<'a> {
#[serde(rename = "u")]
username: &'a str,
#[serde(rename = "p")]
password: &'a str,
}
#[derive(Deserialize)]
struct HostKeyOut {
key: String,
}
#[derive(Serialize)]
struct MetaIn<'a> {
#[serde(rename = "v")]
sync_version: u8,
#[serde(rename = "cv")]
client_version: &'a str,
}
#[derive(Serialize, Deserialize, Debug)]
struct StartIn {
#[serde(rename = "minUsn")]
minimum_usn: Usn,
#[serde(rename = "offset")]
minutes_west: i32,
// only used to modify behaviour of changes()
#[serde(rename = "lnewer")]
client_is_newer: bool,
// used by 2.0 clients
#[serde(skip_serializing_if = "Option::is_none")]
client_graves: Option<Graves>,
}
#[derive(Serialize, Deserialize, Debug)]
struct ApplyGravesIn {
chunk: Graves,
}
#[derive(Serialize, Deserialize, Debug)]
struct ApplyChangesIn {
changes: Changes,
}
#[derive(Serialize, Deserialize, Debug)]
struct ApplyChunkIn {
chunk: Chunk,
}
#[derive(Serialize, Deserialize, Debug)]
struct SanityCheckIn {
client: SanityCheckCounts,
}
#[derive(Serialize)]
struct Empty {}
impl HTTPSyncClient<'_> {
pub fn new<'a>(endpoint: &'a str) -> HTTPSyncClient<'a> {
let client = Client::builder()
.connect_timeout(Duration::from_secs(30))
.timeout(Duration::from_secs(60))
.build()
.unwrap();
let skey = guid();
HTTPSyncClient {
hkey: None,
skey,
client,
endpoint,
}
}
async fn json_request<T>(&self, method: &str, json: &T) -> Result<Response>
where
T: serde::Serialize,
{
let req_json = serde_json::to_vec(json)?;
let mut gz = GzEncoder::new(Vec::new(), Compression::fast());
gz.write_all(&req_json)?;
let part = multipart::Part::bytes(gz.finish()?);
self.request(method, part).await
}
async fn json_request_deserialized<T, T2>(&self, method: &str, json: &T) -> Result<T2>
where
T: Serialize,
T2: DeserializeOwned,
{
self.json_request(method, json)
.await?
.json()
.await
.map_err(Into::into)
}
async fn request(&self, method: &str, data_part: multipart::Part) -> Result<Response> {
let data_part = data_part.file_name("data");
let mut form = multipart::Form::new()
.part("data", data_part)
.text("c", "1");
if let Some(hkey) = &self.hkey {
form = form.text("k", hkey.clone()).text("s", self.skey.clone());
}
let url = format!("{}{}", self.endpoint, method);
let req = self.client.post(&url).multipart(form);
req.send().await?.error_for_status().map_err(Into::into)
}
async fn login(&mut self, username: &str, password: &str) -> Result<()> {
let resp: HostKeyOut = self
.json_request_deserialized("hostKey", &HostKeyIn { username, password })
.await?;
self.hkey = Some(resp.key);
Ok(())
}
pub(crate) fn hkey(&self) -> &str {
self.hkey.as_ref().unwrap()
}
async fn meta(&mut self) -> Result<ServerMeta> {
let meta_in = MetaIn {
sync_version: SYNC_VERSION,
client_version: sync_client_version(),
};
self.json_request_deserialized("meta", &meta_in).await
}
async fn start(&mut self, input: &StartIn) -> Result<Graves> {
self.json_request_deserialized("start", input).await
}
async fn apply_graves(&mut self, chunk: Graves) -> Result<()> {
let input = ApplyGravesIn { chunk };
let resp = self.json_request("applyGraves", &input).await?;
resp.error_for_status()?;
Ok(())
}
async fn apply_changes(&mut self, changes: Changes) -> Result<Changes> {
let input = ApplyChangesIn { changes };
self.json_request_deserialized("applyChanges", &input).await
}
async fn chunk(&mut self) -> Result<Chunk> {
self.json_request_deserialized("chunk", &Empty {}).await
}
async fn apply_chunk(&mut self, chunk: Chunk) -> Result<()> {
let input = ApplyChunkIn { chunk };
let resp = self.json_request("applyChunk", &input).await?;
resp.error_for_status()?;
Ok(())
}
async fn sanity_check(&mut self, client: SanityCheckCounts) -> Result<SanityCheckOut> {
let input = SanityCheckIn { client };
self.json_request_deserialized("sanityCheck2", &input).await
}
async fn finish(&mut self) -> Result<()> {
let resp = self.json_request("finish", &Empty {}).await?;
resp.error_for_status()?;
Ok(())
}
async fn abort(&mut self) -> Result<()> {
let resp = self.json_request("abort", &Empty {}).await?;
resp.error_for_status()?;
Ok(())
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::err::SyncErrorKind;
use tokio::runtime::Runtime;
static ENDPOINT: &'static str = "https://sync.ankiweb.net/sync/";
async fn http_client_inner(username: String, password: String) -> Result<()> {
let mut syncer = HTTPSyncClient::new(ENDPOINT);
assert!(matches!(
syncer.login("nosuchuser", "nosuchpass").await,
Err(AnkiError::SyncError {
kind: SyncErrorKind::AuthFailed,
..
})
));
assert!(syncer.login(&username, &password).await.is_ok());
syncer.meta().await?;
// aborting before a start is a conflict
assert!(matches!(
syncer.abort().await,
Err(AnkiError::SyncError {
kind: SyncErrorKind::Conflict,
..
})
));
let input = StartIn {
minimum_usn: Usn(0),
minutes_west: 0,
client_is_newer: true,
client_graves: None,
};
let _graves = syncer.start(&input).await?;
// aborting should now work
syncer.abort().await?;
// start again, and continue
let _graves = syncer.start(&input).await?;
syncer.apply_graves(Graves::default()).await?;
let _changes = syncer.apply_changes(Changes::default()).await?;
let _chunk = syncer.chunk().await?;
syncer
.apply_chunk(Chunk {
done: true,
..Default::default()
})
.await?;
let _out = syncer
.sanity_check(SanityCheckCounts {
counts: SanityCheckDueCounts {
new: 0,
learn: 0,
review: 0,
},
cards: 0,
notes: 0,
revlog: 0,
graves: 0,
notetypes: 0,
decks: 0,
deck_config: 0,
})
.await?;
syncer.finish().await?;
Ok(())
}
#[test]
fn http_client() -> Result<()> {
let user = match std::env::var("TEST_SYNC_USER") {
Ok(s) => s,
Err(_) => {
return Ok(());
}
};
let pass = std::env::var("TEST_SYNC_PASS").unwrap();
let mut rt = Runtime::new().unwrap();
rt.block_on(http_client_inner(user, pass))
}
}

172
rslib/src/sync/mod.rs Normal file
View file

@ -0,0 +1,172 @@
// Copyright: Ankitects Pty Ltd and contributors
// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
mod http_client;
use crate::{
card::{CardQueue, CardType},
deckconf::DeckConfSchema11,
decks::DeckSchema11,
notes::guid,
notetype::NoteTypeSchema11,
prelude::*,
version::sync_client_version,
};
use flate2::write::GzEncoder;
use flate2::Compression;
use reqwest::{multipart, Client, Response};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::Value;
use serde_tuple::Serialize_tuple;
use std::io::prelude::*;
use std::{collections::HashMap, time::Duration};
#[derive(Default, Debug)]
pub struct SyncProgress {}
#[derive(Serialize, Deserialize, Debug)]
struct ServerMeta {
#[serde(rename = "mod")]
modified: TimestampMillis,
#[serde(rename = "scm")]
schema: TimestampMillis,
usn: Usn,
#[serde(rename = "ts")]
current_time: TimestampSecs,
#[serde(rename = "msg")]
server_message: String,
#[serde(rename = "cont")]
should_continue: bool,
#[serde(rename = "hostNum")]
shard_number: u32,
}
#[derive(Serialize, Deserialize, Debug, Default)]
struct Graves {
cards: Vec<CardID>,
decks: Vec<DeckID>,
notes: Vec<NoteID>,
}
#[derive(Serialize_tuple, Deserialize, Debug, Default)]
struct DecksAndConfig {
decks: Vec<DeckSchema11>,
config: Vec<DeckConfSchema11>,
}
#[derive(Serialize, Deserialize, Debug, Default)]
struct Changes {
#[serde(rename = "models")]
notetypes: Vec<NoteTypeSchema11>,
#[serde(rename = "decks")]
decks_and_config: DecksAndConfig,
tags: Vec<String>,
// the following are only sent if local is newer
#[serde(skip_serializing_if = "Option::is_none", rename = "conf")]
config: Option<HashMap<String, Value>>,
#[serde(skip_serializing_if = "Option::is_none", rename = "crt")]
creation_stamp: Option<TimestampSecs>,
}
#[derive(Serialize, Deserialize, Debug, Default)]
struct Chunk {
done: bool,
#[serde(skip_serializing_if = "Vec::is_empty")]
revlog: Vec<ReviewLogEntry>,
#[serde(skip_serializing_if = "Vec::is_empty")]
cards: Vec<CardEntry>,
#[serde(skip_serializing_if = "Vec::is_empty")]
notes: Vec<NoteEntry>,
}
#[derive(Serialize_tuple, Deserialize, Debug)]
struct ReviewLogEntry {
id: TimestampMillis,
cid: CardID,
usn: Usn,
ease: u8,
#[serde(rename = "ivl")]
interval: i32,
#[serde(rename = "lastIvl")]
last_interval: i32,
factor: u32,
time: u32,
#[serde(rename = "type")]
kind: u8,
}
#[derive(Serialize_tuple, Deserialize, Debug)]
struct NoteEntry {
id: NoteID,
guid: String,
#[serde(rename = "mid")]
ntid: NoteTypeID,
#[serde(rename = "mod")]
mtime: TimestampSecs,
usn: Usn,
tags: String,
fields: String,
sfld: String, // always empty
csum: String, // always empty
flags: u32,
data: String,
}
#[derive(Serialize_tuple, Deserialize, Debug)]
struct CardEntry {
id: CardID,
nid: NoteID,
did: DeckID,
ord: u16,
mtime: TimestampSecs,
usn: Usn,
ctype: CardType,
queue: CardQueue,
due: i32,
ivl: u32,
factor: u16,
reps: u32,
lapses: u32,
left: u32,
odue: i32,
odid: DeckID,
flags: u8,
data: String,
}
#[derive(Serialize, Deserialize, Debug)]
struct SanityCheckOut {
status: SanityCheckStatus,
#[serde(rename = "c")]
client: Option<SanityCheckCounts>,
#[serde(rename = "s")]
server: Option<SanityCheckCounts>,
}
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(rename_all = "lowercase")]
enum SanityCheckStatus {
Ok,
Bad,
}
#[derive(Serialize_tuple, Deserialize, Debug)]
struct SanityCheckCounts {
counts: SanityCheckDueCounts,
cards: u32,
notes: u32,
revlog: u32,
graves: u32,
#[serde(rename = "models")]
notetypes: u32,
decks: u32,
deck_config: u32,
}
#[derive(Serialize_tuple, Deserialize, Debug)]
struct SanityCheckDueCounts {
new: u32,
learn: u32,
review: u32,
}

24
rslib/src/version.rs Normal file
View file

@ -0,0 +1,24 @@
// Copyright: Ankitects Pty Ltd and contributors
// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
use lazy_static::lazy_static;
pub fn version() -> &'static str {
include_str!("../../meta/version").trim()
}
pub fn buildhash() -> &'static str {
include_str!("../../meta/buildhash")
}
pub(crate) fn sync_client_version() -> &'static str {
lazy_static! {
static ref VER: String = format!(
"anki,{} ({}),{}",
version(),
buildhash(),
std::env::consts::OS
);
}
&VER
}