move sync JSON into separate file; add enum wrapper

This commit is contained in:
Damien Elmes 2021-01-10 16:20:02 +10:00
parent fc216401e5
commit 31f941267c
3 changed files with 227 additions and 186 deletions

88
rslib/src/sync/http.rs Normal file
View file

@@ -0,0 +1,88 @@
use super::{Chunk, Graves, SanityCheckCounts, UnchunkedChanges};
use crate::prelude::*;
use serde::{Deserialize, Serialize};
/// One request in the sync protocol, tagged with the wire method name.
///
/// Serialized using serde's default externally-tagged enum representation;
/// `rename_all = "camelCase"` makes each JSON tag match the server endpoint
/// name (e.g. `applyGraves`). These tags must stay in sync with the strings
/// returned by `to_method_and_json()`.
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub enum SyncRequest {
    /// Exchange credentials for a host key.
    HostKey(HostKeyIn),
    /// Fetch server metadata (versions, modification times).
    Meta(MetaIn),
    /// Begin a normal (chunked) sync.
    Start(StartIn),
    ApplyGraves(ApplyGravesIn),
    ApplyChanges(ApplyChangesIn),
    /// Request the next chunk from the server; carries no payload.
    Chunk,
    ApplyChunk(ApplyChunkIn),
    // The wire protocol calls this method "sanityCheck2", so the tag is
    // renamed explicitly rather than relying on camelCase conversion.
    #[serde(rename = "sanityCheck2")]
    SanityCheck(SanityCheckIn),
    /// Finish a successful sync; carries no payload.
    Finish,
    /// Abort an in-progress sync; carries no payload.
    Abort,
}
impl SyncRequest {
/// Return method name and payload bytes.
pub(crate) fn to_method_and_json(&self) -> Result<(&'static str, Vec<u8>)> {
use serde_json::to_vec;
Ok(match self {
SyncRequest::HostKey(v) => ("hostKey", to_vec(&v)?),
SyncRequest::Meta(v) => ("meta", to_vec(&v)?),
SyncRequest::Start(v) => ("start", to_vec(&v)?),
SyncRequest::ApplyGraves(v) => ("applyGraves", to_vec(&v)?),
SyncRequest::ApplyChanges(v) => ("applyChanges", to_vec(&v)?),
SyncRequest::Chunk => ("chunk", b"{}".to_vec()),
SyncRequest::ApplyChunk(v) => ("applyChunk", to_vec(&v)?),
SyncRequest::SanityCheck(v) => ("sanityCheck2", to_vec(&v)?),
SyncRequest::Finish => ("finish", b"{}".to_vec()),
SyncRequest::Abort => ("abort", b"{}".to_vec()),
})
}
}
/// Credentials sent to the `hostKey` method to obtain a host key.
#[derive(Serialize, Deserialize, Debug)]
pub struct HostKeyIn {
    // Serialized under the short wire name "u".
    #[serde(rename = "u")]
    pub username: String,
    // Serialized under the short wire name "p".
    #[serde(rename = "p")]
    pub password: String,
}
/// Response from the `hostKey` method.
#[derive(Serialize, Deserialize, Debug)]
pub struct HostKeyOut {
    // Host key used to authenticate subsequent sync requests.
    pub key: String,
}
/// Arguments for the `meta` method.
#[derive(Serialize, Deserialize, Debug)]
pub struct MetaIn {
    // Sync protocol version; wire name "v".
    #[serde(rename = "v")]
    pub sync_version: u8,
    // Client version string; wire name "cv".
    #[serde(rename = "cv")]
    pub client_version: String,
}
/// Arguments for the `start` method, which begins a normal sync.
#[derive(Serialize, Deserialize, Debug)]
pub struct StartIn {
    // Client's USN; wire name "minUsn".
    #[serde(rename = "minUsn")]
    pub client_usn: Usn,
    // UTC offset in minutes; wire name "offset". `default` makes it
    // optional when deserializing, falling back to None if absent.
    #[serde(rename = "offset", default)]
    pub minutes_west: Option<i32>,
    // Whether the local collection is newer than the remote one;
    // wire name "lnewer".
    #[serde(rename = "lnewer")]
    pub local_is_newer: bool,
}
/// Arguments for the `applyGraves` method: a chunk of deletions to apply.
#[derive(Serialize, Deserialize, Debug)]
pub struct ApplyGravesIn {
    pub chunk: Graves,
}
/// Arguments for the `applyChanges` method: non-chunked changes to apply.
#[derive(Serialize, Deserialize, Debug)]
pub struct ApplyChangesIn {
    pub changes: UnchunkedChanges,
}
/// Arguments for the `applyChunk` method: a chunk of changes to apply.
#[derive(Serialize, Deserialize, Debug)]
pub struct ApplyChunkIn {
    pub chunk: Chunk,
}
/// Arguments for the `sanityCheck2` method.
#[derive(Serialize, Deserialize, Debug)]
pub struct SanityCheckIn {
    // Client-side counts for the server to compare against its own.
    pub client: SanityCheckCounts,
    pub full: bool,
}

View file

@@ -2,11 +2,29 @@
// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html // License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
use super::server::SyncServer; use super::server::SyncServer;
use super::*; use super::{
Chunk, FullSyncProgress, Graves, SanityCheckCounts, SanityCheckOut, SyncMeta, UnchunkedChanges,
};
use crate::prelude::*;
use crate::{err::SyncErrorKind, notes::guid, version::sync_client_version};
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
use flate2::write::GzEncoder;
use flate2::Compression;
use futures::Stream; use futures::Stream;
use futures::StreamExt;
use reqwest::Body; use reqwest::Body;
use reqwest::{multipart, Client, Response};
use serde::de::DeserializeOwned;
use super::http::{
ApplyChangesIn, ApplyChunkIn, ApplyGravesIn, HostKeyIn, HostKeyOut, MetaIn, SanityCheckIn,
StartIn, SyncRequest,
};
use std::io::prelude::*;
use std::path::Path;
use std::time::Duration;
use tempfile::NamedTempFile;
// fixme: 100mb limit // fixme: 100mb limit
@@ -22,61 +40,6 @@ pub struct HTTPSyncClient
full_sync_progress_fn: Option<FullSyncProgressFn>, full_sync_progress_fn: Option<FullSyncProgressFn>,
} }
#[derive(Serialize)]
struct HostKeyIn<'a> {
#[serde(rename = "u")]
username: &'a str,
#[serde(rename = "p")]
password: &'a str,
}
#[derive(Deserialize)]
struct HostKeyOut {
key: String,
}
#[derive(Serialize)]
struct MetaIn<'a> {
#[serde(rename = "v")]
sync_version: u8,
#[serde(rename = "cv")]
client_version: &'a str,
}
#[derive(Serialize, Deserialize, Debug)]
struct StartIn {
#[serde(rename = "minUsn")]
local_usn: Usn,
#[serde(rename = "offset")]
minutes_west: Option<i32>,
// only used to modify behaviour of changes()
#[serde(rename = "lnewer")]
local_is_newer: bool,
// used by 2.0 clients
#[serde(skip_serializing_if = "Option::is_none")]
local_graves: Option<Graves>,
}
#[derive(Serialize, Deserialize, Debug)]
struct ApplyGravesIn {
chunk: Graves,
}
#[derive(Serialize, Deserialize, Debug)]
struct ApplyChangesIn {
changes: UnchunkedChanges,
}
#[derive(Serialize, Deserialize, Debug)]
struct ApplyChunkIn {
chunk: Chunk,
}
#[derive(Serialize, Deserialize, Debug)]
struct SanityCheckIn {
client: SanityCheckCounts,
full: bool,
}
pub struct Timeouts { pub struct Timeouts {
pub connect_secs: u64, pub connect_secs: u64,
pub request_secs: u64, pub request_secs: u64,
@@ -99,158 +62,64 @@ impl Timeouts
} }
} }
} }
#[derive(Serialize)]
struct Empty {}
impl HTTPSyncClient {
pub fn new(hkey: Option<String>, host_number: u32) -> HTTPSyncClient {
let timeouts = Timeouts::new();
let client = Client::builder()
.connect_timeout(Duration::from_secs(timeouts.connect_secs))
.timeout(Duration::from_secs(timeouts.request_secs))
.io_timeout(Duration::from_secs(timeouts.io_secs))
.build()
.unwrap();
let skey = guid();
let endpoint = sync_endpoint(host_number);
HTTPSyncClient {
hkey,
skey,
client,
endpoint,
full_sync_progress_fn: None,
}
}
pub fn set_full_sync_progress_fn(&mut self, func: Option<FullSyncProgressFn>) {
self.full_sync_progress_fn = func;
}
async fn json_request<T>(&self, method: &str, json: &T, timeout_long: bool) -> Result<Response>
where
T: serde::Serialize,
{
let req_json = serde_json::to_vec(json)?;
let mut gz = GzEncoder::new(Vec::new(), Compression::fast());
gz.write_all(&req_json)?;
let part = multipart::Part::bytes(gz.finish()?);
self.request(method, part, timeout_long).await
}
async fn json_request_deserialized<T, T2>(&self, method: &str, json: &T) -> Result<T2>
where
T: Serialize,
T2: DeserializeOwned,
{
self.json_request(method, json, false)
.await?
.json()
.await
.map_err(Into::into)
}
async fn request(
&self,
method: &str,
data_part: multipart::Part,
timeout_long: bool,
) -> Result<Response> {
let data_part = data_part.file_name("data");
let mut form = multipart::Form::new()
.part("data", data_part)
.text("c", "1");
if let Some(hkey) = &self.hkey {
form = form.text("k", hkey.clone()).text("s", self.skey.clone());
}
let url = format!("{}{}", self.endpoint, method);
let mut req = self.client.post(&url).multipart(form);
if timeout_long {
req = req.timeout(Duration::from_secs(60 * 60));
}
req.send().await?.error_for_status().map_err(Into::into)
}
pub(crate) async fn login(&mut self, username: &str, password: &str) -> Result<()> {
let resp: HostKeyOut = self
.json_request_deserialized("hostKey", &HostKeyIn { username, password })
.await?;
self.hkey = Some(resp.key);
Ok(())
}
pub(crate) fn hkey(&self) -> &str {
self.hkey.as_ref().unwrap()
}
}
#[async_trait(?Send)] #[async_trait(?Send)]
impl SyncServer for HTTPSyncClient { impl SyncServer for HTTPSyncClient {
async fn meta(&self) -> Result<SyncMeta> { async fn meta(&self) -> Result<SyncMeta> {
let meta_in = MetaIn { let input = SyncRequest::Meta(MetaIn {
sync_version: SYNC_VERSION, sync_version: SYNC_VERSION,
client_version: sync_client_version(), client_version: sync_client_version().to_string(),
}; });
self.json_request_deserialized("meta", &meta_in).await self.json_request(&input).await
} }
async fn start( async fn start(
&mut self, &mut self,
local_usn: Usn, client_usn: Usn,
minutes_west: Option<i32>, minutes_west: Option<i32>,
local_is_newer: bool, local_is_newer: bool,
) -> Result<Graves> { ) -> Result<Graves> {
let input = StartIn { let input = SyncRequest::Start(StartIn {
local_usn, client_usn,
minutes_west, minutes_west,
local_is_newer, local_is_newer,
local_graves: None, });
}; self.json_request(&input).await
self.json_request_deserialized("start", &input).await
} }
async fn apply_graves(&mut self, chunk: Graves) -> Result<()> { async fn apply_graves(&mut self, chunk: Graves) -> Result<()> {
let input = ApplyGravesIn { chunk }; let input = SyncRequest::ApplyGraves(ApplyGravesIn { chunk });
let resp = self.json_request("applyGraves", &input, false).await?; self.json_request(&input).await
resp.error_for_status()?;
Ok(())
} }
async fn apply_changes(&mut self, changes: UnchunkedChanges) -> Result<UnchunkedChanges> { async fn apply_changes(&mut self, changes: UnchunkedChanges) -> Result<UnchunkedChanges> {
let input = ApplyChangesIn { changes }; let input = SyncRequest::ApplyChanges(ApplyChangesIn { changes });
self.json_request_deserialized("applyChanges", &input).await self.json_request(&input).await
} }
async fn chunk(&mut self) -> Result<Chunk> { async fn chunk(&mut self) -> Result<Chunk> {
self.json_request_deserialized("chunk", &Empty {}).await let input = SyncRequest::Chunk;
self.json_request(&input).await
} }
async fn apply_chunk(&mut self, chunk: Chunk) -> Result<()> { async fn apply_chunk(&mut self, chunk: Chunk) -> Result<()> {
let input = ApplyChunkIn { chunk }; let input = SyncRequest::ApplyChunk(ApplyChunkIn { chunk });
let resp = self.json_request("applyChunk", &input, false).await?; self.json_request(&input).await
resp.error_for_status()?;
Ok(())
} }
async fn sanity_check(&mut self, client: SanityCheckCounts) -> Result<SanityCheckOut> { async fn sanity_check(&mut self, client: SanityCheckCounts) -> Result<SanityCheckOut> {
let input = SanityCheckIn { client, full: true }; let input = SyncRequest::SanityCheck(SanityCheckIn { client, full: true });
self.json_request_deserialized("sanityCheck2", &input).await self.json_request(&input).await
} }
async fn finish(&mut self) -> Result<TimestampMillis> { async fn finish(&mut self) -> Result<TimestampMillis> {
Ok(self.json_request_deserialized("finish", &Empty {}).await?) let input = SyncRequest::Finish;
self.json_request(&input).await
} }
async fn abort(&mut self) -> Result<()> { async fn abort(&mut self) -> Result<()> {
let resp = self.json_request("abort", &Empty {}, false).await?; let input = SyncRequest::Abort;
resp.error_for_status()?; self.json_request(&input).await
Ok(())
} }
async fn full_upload(mut self: Box<Self>, col_path: &Path, _can_consume: bool) -> Result<()> { async fn full_upload(mut self: Box<Self>, col_path: &Path, _can_consume: bool) -> Result<()> {
@@ -301,13 +170,101 @@ impl SyncServer for HTTPSyncClient
} }
impl HTTPSyncClient { impl HTTPSyncClient {
pub fn new(hkey: Option<String>, host_number: u32) -> HTTPSyncClient {
let timeouts = Timeouts::new();
let client = Client::builder()
.connect_timeout(Duration::from_secs(timeouts.connect_secs))
.timeout(Duration::from_secs(timeouts.request_secs))
.io_timeout(Duration::from_secs(timeouts.io_secs))
.build()
.unwrap();
let skey = guid();
let endpoint = sync_endpoint(host_number);
HTTPSyncClient {
hkey,
skey,
client,
endpoint,
full_sync_progress_fn: None,
}
}
pub fn set_full_sync_progress_fn(&mut self, func: Option<FullSyncProgressFn>) {
self.full_sync_progress_fn = func;
}
async fn json_request<T>(&self, req: &SyncRequest) -> Result<T>
where
T: DeserializeOwned,
{
let (method, req_json) = req.to_method_and_json()?;
self.request_bytes(method, &req_json, false)
.await?
.json()
.await
.map_err(Into::into)
}
async fn request_bytes(
&self,
method: &str,
req: &[u8],
timeout_long: bool,
) -> Result<Response> {
let mut gz = GzEncoder::new(Vec::new(), Compression::fast());
gz.write_all(req)?;
let part = multipart::Part::bytes(gz.finish()?);
let resp = self.request(method, part, timeout_long).await?;
resp.error_for_status().map_err(Into::into)
}
async fn request(
&self,
method: &str,
data_part: multipart::Part,
timeout_long: bool,
) -> Result<Response> {
let data_part = data_part.file_name("data");
let mut form = multipart::Form::new()
.part("data", data_part)
.text("c", "1");
if let Some(hkey) = &self.hkey {
form = form.text("k", hkey.clone()).text("s", self.skey.clone());
}
let url = format!("{}{}", self.endpoint, method);
let mut req = self.client.post(&url).multipart(form);
if timeout_long {
req = req.timeout(Duration::from_secs(60 * 60));
}
req.send().await?.error_for_status().map_err(Into::into)
}
pub(crate) async fn login<S: Into<String>>(&mut self, username: S, password: S) -> Result<()> {
let input = SyncRequest::HostKey(HostKeyIn {
username: username.into(),
password: password.into(),
});
let output: HostKeyOut = self.json_request(&input).await?;
self.hkey = Some(output.key);
Ok(())
}
pub(crate) fn hkey(&self) -> &str {
self.hkey.as_ref().unwrap()
}
async fn download_inner( async fn download_inner(
&self, &self,
) -> Result<( ) -> Result<(
usize, usize,
impl Stream<Item = std::result::Result<Bytes, reqwest::Error>>, impl Stream<Item = std::result::Result<Bytes, reqwest::Error>>,
)> { )> {
let resp: reqwest::Response = self.json_request("download", &Empty {}, true).await?; let resp: reqwest::Response = self.request_bytes("download", b"{}", true).await?;
let len = resp.content_length().unwrap_or_default(); let len = resp.content_length().unwrap_or_default();
Ok((len as usize, resp.bytes_stream())) Ok((len as usize, resp.bytes_stream()))
} }
@@ -386,7 +343,7 @@ fn sync_endpoint(host_number: u32) -> String
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::*; use super::*;
use crate::err::SyncErrorKind; use crate::{err::SyncErrorKind, sync::SanityCheckDueCounts};
use tokio::runtime::Runtime; use tokio::runtime::Runtime;
async fn http_client_inner(username: String, password: String) -> Result<()> { async fn http_client_inner(username: String, password: String) -> Result<()> {

View file

@@ -1,6 +1,7 @@
// Copyright: Ankitects Pty Ltd and contributors // Copyright: Ankitects Pty Ltd and contributors
// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html // License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
pub mod http;
mod http_client; mod http_client;
mod server; mod server;
@@ -10,30 +11,23 @@ use crate::{
deckconf::DeckConfSchema11, deckconf::DeckConfSchema11,
decks::DeckSchema11, decks::DeckSchema11,
err::SyncErrorKind, err::SyncErrorKind,
notes::{guid, Note}, notes::Note,
notetype::{NoteType, NoteTypeSchema11}, notetype::{NoteType, NoteTypeSchema11},
prelude::*, prelude::*,
revlog::RevlogEntry, revlog::RevlogEntry,
serde::{default_on_invalid, deserialize_int_from_number}, serde::{default_on_invalid, deserialize_int_from_number},
storage::open_and_check_sqlite_file, storage::open_and_check_sqlite_file,
tags::{join_tags, split_tags}, tags::{join_tags, split_tags},
version::sync_client_version,
}; };
use flate2::write::GzEncoder;
use flate2::Compression;
use futures::StreamExt;
pub use http_client::FullSyncProgressFn; pub use http_client::FullSyncProgressFn;
use http_client::HTTPSyncClient; use http_client::HTTPSyncClient;
pub use http_client::Timeouts; pub use http_client::Timeouts;
use itertools::Itertools; use itertools::Itertools;
use reqwest::{multipart, Client, Response}; use serde::{Deserialize, Serialize};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
use serde_tuple::Serialize_tuple; use serde_tuple::Serialize_tuple;
use server::SyncServer; pub(crate) use server::SyncServer;
use std::io::prelude::*; use std::collections::HashMap;
use std::{collections::HashMap, path::Path, time::Duration};
use tempfile::NamedTempFile;
#[derive(Default, Debug, Clone, Copy)] #[derive(Default, Debug, Clone, Copy)]
pub struct NormalSyncProgress { pub struct NormalSyncProgress {
@@ -1194,6 +1188,8 @@ impl From<SyncActionRequired> for sync_status_out::Required
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use std::path::Path;
use async_trait::async_trait; use async_trait::async_trait;
use lazy_static::lazy_static; use lazy_static::lazy_static;