From ec9abf1ce56e3fb769ac555ec25fdc15a2d68a7a Mon Sep 17 00:00:00 2001
From: Damien Elmes <gpg@ankiweb.net>
Date: Tue, 4 Feb 2020 12:46:57 +1000
Subject: [PATCH] pass in endpoint

---
 qt/aqt/mediasync.py     | 21 +++++----
 qt/aqt/profiles.py      |  3 ++
 rslib/src/backend.rs    |  2 +-
 rslib/src/media/sync.rs | 99 ++++++++++++++++++++---------------------
 4 files changed, 62 insertions(+), 63 deletions(-)

diff --git a/qt/aqt/mediasync.py b/qt/aqt/mediasync.py
index b0af95d0d..1a57589ac 100644
--- a/qt/aqt/mediasync.py
+++ b/qt/aqt/mediasync.py
@@ -42,7 +42,6 @@ class MediaSyncState:
 
 # fixme: abort when closing collection/app
-# fixme: shards
 # fixme: concurrent modifications during upload step
 # fixme: mediaSanity
 # fixme: corruptMediaDB
 
@@ -110,25 +109,25 @@ class MediaSyncer:
             self._log_and_notify(_("Media syncing disabled."))
             return
 
-        shard = None
-
         self._log_and_notify(_("Media sync starting..."))
         self._sync_state = MediaSyncState()
         self._want_stop = False
         self._on_start_stop()
 
+        (media_folder, media_db) = media_paths_from_col_path(self.mw.col.path)
+
+        def run() -> None:
+            self.mw.col.backend.sync_media(hkey, media_folder, media_db, self._endpoint())
+
+        self.mw.taskman.run_in_background(run, self._on_finished)
+
+    def _endpoint(self) -> str:
+        shard = self.mw.pm.sync_shard()
         if shard is not None:
             shard_str = str(shard)
         else:
             shard_str = ""
-        endpoint = f"https://sync{shard_str}ankiweb.net"
-
-        (media_folder, media_db) = media_paths_from_col_path(self.mw.col.path)
-
-        def run() -> None:
-            self.mw.col.backend.sync_media(hkey, media_folder, media_db, endpoint)
-
-        self.mw.taskman.run_in_background(run, self._on_finished)
+        return f"https://sync{shard_str}.ankiweb.net/msync/"
 
     def _log_and_notify(self, entry: LogEntry) -> None:
         entry_with_time = LogEntryWithTime(time=intTime(), entry=entry)

diff --git a/qt/aqt/profiles.py b/qt/aqt/profiles.py
index a0bd15d74..11b8878d1 100644
--- a/qt/aqt/profiles.py
+++ b/qt/aqt/profiles.py
@@ -521,6 +521,9 @@ please see:
     def media_syncing_enabled(self) -> bool:
         return self.profile["syncMedia"]
 
+    def sync_shard(self) -> Optional[int]:
+        return self.profile.get("hostNum")
+
     ######################################################################
 
     def apply_profile_options(self) -> None:

diff --git a/rslib/src/backend.rs b/rslib/src/backend.rs
index b5139b6fe..fed46faae 100644
--- a/rslib/src/backend.rs
+++ b/rslib/src/backend.rs
@@ -299,7 +299,7 @@ impl Backend {
         };
 
         let mut rt = Runtime::new().unwrap();
-        rt.block_on(sync_media(&mut mgr, &input.hkey, callback))
+        rt.block_on(sync_media(&mut mgr, &input.hkey, callback, &input.endpoint))
     }
 }
 
diff --git a/rslib/src/media/sync.rs b/rslib/src/media/sync.rs
index 563df2952..973bb0bbe 100644
--- a/rslib/src/media/sync.rs
+++ b/rslib/src/media/sync.rs
@@ -19,13 +19,7 @@ use std::io::{Read, Write};
 use std::path::Path;
 use std::{io, time};
 
-// fixme: sync url
 // fixme: version string
-// fixme: shards
-
-// fixme: refactor into a struct
-
-static SYNC_URL: &str = "https://sync.ankiweb.net/msync/";
 
 static SYNC_MAX_FILES: usize = 25;
 static SYNC_MAX_BYTES: usize = (2.5 * 1024.0 * 1024.0) as usize;
@@ -48,13 +42,14 @@ where
     skey: Option<String>,
    client: Client,
    progress_cb: P,
+    endpoint: &'a str,
 }
 
 impl<P> SyncContext<'_, P>
 where
     P: Fn(Progress) -> bool,
 {
-    fn new(mgr: &MediaManager, progress_cb: P) -> SyncContext<P> {
+    fn new<'a>(mgr: &'a MediaManager, progress_cb: P, endpoint: &'a str) -> SyncContext<'a, P> {
         let client = Client::builder()
             .connect_timeout(time::Duration::from_secs(30))
             .build()
@@ -67,6 +62,7 @@ where
             skey: None,
             client,
             progress_cb,
+            endpoint,
         }
     }
 
@@ -75,7 +71,7 @@ where
     }
 
     async fn sync_begin(&self, hkey: &str) -> Result<(String, i32)> {
-        let url = format!("{}/begin", SYNC_URL);
+        let url = format!("{}begin", self.endpoint);
 
         let resp = self
             .client
@@ -100,7 +96,7 @@ where
         loop {
             debug!("fetching record batch starting from usn {}", last_usn);
 
-            let batch = fetch_record_batch(&self.client, self.skey(), last_usn).await?;
+            let batch = self.fetch_record_batch(last_usn).await?;
             if batch.is_empty() {
                 debug!("empty batch, done");
                 break;
@@ -125,7 +121,7 @@ where
                 .take(SYNC_MAX_FILES)
                 .map(ToOwned::to_owned)
                 .collect();
-            let zip_data = fetch_zip(&self.client, self.skey(), batch.as_slice()).await?;
+            let zip_data = self.fetch_zip(batch.as_slice()).await?;
             let download_batch =
                 extract_into_media_folder(self.mgr.media_folder.as_path(), zip_data)?
                     .into_iter();
@@ -161,7 +157,7 @@ where
         let file_count = pending.iter().filter(|e| e.sha1.is_some()).count();
 
         let zip_data = zip_files(&self.mgr.media_folder, &pending)?;
-        send_zip_data(&self.client, self.skey(), zip_data).await?;
+        self.send_zip_data(zip_data).await?;
 
         self.progress(Progress::Uploaded {
             files: file_count,
@@ -177,7 +173,7 @@ where
     }
 
     async fn finalize_sync(&mut self) -> Result<()> {
-        let url = format!("{}/mediaSanity", SYNC_URL);
+        let url = format!("{}mediaSanity", self.endpoint);
         let local = self.ctx.count()?;
 
         let obj = FinalizeRequest { local };
@@ -207,14 +203,51 @@ where
             Err(AnkiError::Interrupted)
         }
     }
+
+    async fn fetch_record_batch(&self, last_usn: i32) -> Result<Vec<ServerMediaRecord>> {
+        let url = format!("{}mediaChanges", self.endpoint);
+
+        let req = RecordBatchRequest { last_usn };
+        let resp = ankiweb_json_request(&self.client, &url, &req, self.skey()).await?;
+        let res: RecordBatchResult = resp.json().await?;
+
+        if let Some(batch) = res.data {
+            Ok(batch)
+        } else {
+            Err(AnkiError::AnkiWebMiscError { info: res.err })
+        }
+    }
+
+    async fn fetch_zip(&self, files: &[&String]) -> Result<Bytes> {
+        let url = format!("{}downloadFiles", self.endpoint);
+
+        debug!("requesting files: {:?}", files);
+
+        let req = ZipRequest { files };
+        let resp = ankiweb_json_request(&self.client, &url, &req, self.skey()).await?;
+        resp.bytes().await.map_err(Into::into)
+    }
+
+    async fn send_zip_data(&self, data: Vec<u8>) -> Result<()> {
+        let url = format!("{}uploadChanges", self.endpoint);
+
+        ankiweb_bytes_request(&self.client, &url, data, self.skey()).await?;
+
+        Ok(())
+    }
 }
 
 #[allow(clippy::useless_let_if_seq)]
-pub async fn sync_media<F>(mgr: &mut MediaManager, hkey: &str, progress_cb: F) -> Result<()>
+pub async fn sync_media<F>(
+    mgr: &mut MediaManager,
+    hkey: &str,
+    progress_cb: F,
+    endpoint: &str,
+) -> Result<()>
 where
     F: Fn(Progress) -> bool,
 {
-    let mut sctx = SyncContext::new(mgr, progress_cb);
+    let mut sctx = SyncContext::new(mgr, progress_cb, endpoint);
 
     // make sure media DB is up to date
     register_changes(&mut sctx.ctx, mgr.media_folder.as_path())?;
@@ -432,40 +465,12 @@ async fn ankiweb_request(
         .map_err(rewrite_forbidden)
 }
 
-async fn fetch_record_batch(
-    client: &Client,
-    skey: &str,
-    last_usn: i32,
-) -> Result<Vec<ServerMediaRecord>> {
-    let url = format!("{}/mediaChanges", SYNC_URL);
-
-    let req = RecordBatchRequest { last_usn };
-    let resp = ankiweb_json_request(client, &url, &req, skey).await?;
-    let res: RecordBatchResult = resp.json().await?;
-
-    if let Some(batch) = res.data {
-        Ok(batch)
-    } else {
-        Err(AnkiError::AnkiWebMiscError { info: res.err })
-    }
-}
-
 #[derive(Debug, Serialize)]
 #[serde(rename_all = "camelCase")]
 struct ZipRequest<'a> {
     files: &'a [&'a String],
 }
 
-async fn fetch_zip(client: &Client, skey: &str, files: &[&String]) -> Result<Bytes> {
-    let url = format!("{}/downloadFiles", SYNC_URL);
-
-    debug!("requesting files: {:?}", files);
-
-    let req = ZipRequest { files };
-    let resp = ankiweb_json_request(client, &url, &req, skey).await?;
-    resp.bytes().await.map_err(Into::into)
-}
-
 fn extract_into_media_folder(media_folder: &Path, zip: Bytes) -> Result<Vec<AddedFile>> {
     let reader = io::Cursor::new(zip);
     let mut zip = zip::ZipArchive::new(reader)?;
@@ -614,14 +619,6 @@ fn zip_files(media_folder: &Path, files: &[MediaEntry]) -> Result<Vec<u8>> {
     Ok(w.into_inner())
 }
 
-async fn send_zip_data(client: &Client, skey: &str, data: Vec<u8>) -> Result<()> {
-    let url = format!("{}/uploadChanges", SYNC_URL);
-
-    ankiweb_bytes_request(client, &url, data, skey).await?;
-
-    Ok(())
-}
-
 #[derive(Serialize)]
 struct FinalizeRequest {
     local: u32,
@@ -656,7 +653,7 @@ mod test {
 
         let mut mgr = MediaManager::new(&media_dir, &media_db)?;
 
-        sync_media(&mut mgr, hkey, progress).await?;
+        sync_media(&mut mgr, hkey, progress, "https://sync.ankiweb.net/msync/").await?;
 
         Ok(())
     }