diff --git a/rslib/src/backend/err.rs b/rslib/src/backend/err.rs
new file mode 100644
index 000000000..3cf310956
--- /dev/null
+++ b/rslib/src/backend/err.rs
@@ -0,0 +1,70 @@
+// Copyright: Ankitects Pty Ltd and contributors
+// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
+
+use crate::{
+    backend_proto as pb,
+    err::{AnkiError, NetworkErrorKind, SyncErrorKind},
+    prelude::*,
+};
+
+/// Convert an Anki error to a protobuf error.
+pub(super) fn anki_error_to_proto_error(err: AnkiError, i18n: &I18n) -> pb::BackendError {
+    use pb::backend_error::Value as V;
+    let localized = err.localized_description(i18n);
+    let value = match err {
+        AnkiError::InvalidInput { .. } => V::InvalidInput(pb::Empty {}),
+        AnkiError::TemplateError { .. } => V::TemplateParse(pb::Empty {}),
+        AnkiError::IOError { .. } => V::IoError(pb::Empty {}),
+        AnkiError::DBError { .. } => V::DbError(pb::Empty {}),
+        AnkiError::NetworkError { kind, .. } => {
+            V::NetworkError(pb::NetworkError { kind: kind.into() })
+        }
+        AnkiError::SyncError { kind, .. } => V::SyncError(pb::SyncError { kind: kind.into() }),
+        AnkiError::Interrupted => V::Interrupted(pb::Empty {}),
+        AnkiError::CollectionNotOpen => V::InvalidInput(pb::Empty {}),
+        AnkiError::CollectionAlreadyOpen => V::InvalidInput(pb::Empty {}),
+        AnkiError::JSONError { info } => V::JsonError(info),
+        AnkiError::ProtoError { info } => V::ProtoError(info),
+        AnkiError::NotFound => V::NotFoundError(pb::Empty {}),
+        AnkiError::Existing => V::Exists(pb::Empty {}),
+        AnkiError::DeckIsFiltered => V::DeckIsFiltered(pb::Empty {}),
+        AnkiError::SearchError(_) => V::InvalidInput(pb::Empty {}),
+        AnkiError::TemplateSaveError { .. } => V::TemplateParse(pb::Empty {}),
+        AnkiError::ParseNumError => V::InvalidInput(pb::Empty {}),
+    };
+
+    pb::BackendError {
+        value: Some(value),
+        localized,
+    }
+}
+
+impl std::convert::From<NetworkErrorKind> for i32 {
+    fn from(e: NetworkErrorKind) -> Self {
+        use pb::network_error::NetworkErrorKind as V;
+        (match e {
+            NetworkErrorKind::Offline => V::Offline,
+            NetworkErrorKind::Timeout => V::Timeout,
+            NetworkErrorKind::ProxyAuth => V::ProxyAuth,
+            NetworkErrorKind::Other => V::Other,
+        }) as i32
+    }
+}
+
+impl std::convert::From<SyncErrorKind> for i32 {
+    fn from(e: SyncErrorKind) -> Self {
+        use pb::sync_error::SyncErrorKind as V;
+        (match e {
+            SyncErrorKind::Conflict => V::Conflict,
+            SyncErrorKind::ServerError => V::ServerError,
+            SyncErrorKind::ClientTooOld => V::ClientTooOld,
+            SyncErrorKind::AuthFailed => V::AuthFailed,
+            SyncErrorKind::ServerMessage => V::ServerMessage,
+            SyncErrorKind::ResyncRequired => V::ResyncRequired,
+            SyncErrorKind::DatabaseCheckRequired => V::DatabaseCheckRequired,
+            SyncErrorKind::Other => V::Other,
+            SyncErrorKind::ClockIncorrect => V::ClockIncorrect,
+            SyncErrorKind::SyncNotStarted => V::SyncNotStarted,
+        }) as i32
+    }
+}
diff --git a/rslib/src/backend/mod.rs b/rslib/src/backend/mod.rs
index ac3d669e8..c8ee24a57 100644
--- a/rslib/src/backend/mod.rs
+++ b/rslib/src/backend/mod.rs
@@ -5,11 +5,13 @@ mod adding;
 mod card;
 mod config;
 mod dbproxy;
+mod err;
 mod generic;
 mod http_sync_server;
 mod progress;
 mod scheduler;
 mod search;
+mod sync;
 
 pub use crate::backend_proto::BackendMethod;
 use crate::{
@@ -23,7 +25,7 @@ use crate::{
     collection::{open_collection, Collection},
     deckconf::{DeckConf, DeckConfSchema11},
     decks::{Deck, DeckID, DeckSchema11},
-    err::{AnkiError, NetworkErrorKind, Result, SyncErrorKind},
+    err::{AnkiError, Result},
     i18n::I18n,
     latex::{extract_latex, extract_latex_expanding_clozes, ExtractedLatex},
     log,
@@ -43,24 +45,20 @@ use crate::{
     },
     search::{concatenate_searches, replace_search_node, write_nodes, Node},
     stats::studied_today,
-    sync::{
-        get_remote_sync_meta, http::SyncRequest, sync_abort, sync_login, FullSyncProgress,
-        LocalServer, NormalSyncProgress, SyncActionRequired, SyncAuth, SyncMeta, SyncOutput,
-    },
+    sync::{http::SyncRequest, LocalServer},
     template::RenderedNode,
     text::{extract_av_tags, sanitize_html_no_images, strip_av_tags, AVTag},
     timestamp::TimestampSecs,
     undo::UndoableOpKind,
 };
 use fluent::FluentValue;
-use futures::future::{AbortHandle, AbortRegistration, Abortable};
+use futures::future::AbortHandle;
 use log::error;
 use once_cell::sync::OnceCell;
-use pb::{sync_status_out, BackendService};
+use pb::BackendService;
 use progress::{AbortHandleSlot, Progress};
 use prost::Message;
 use serde_json::Value as JsonValue;
-use slog::warn;
 use std::{collections::HashSet, convert::TryInto};
 use std::{
     result,
@@ -68,7 +66,11 @@ use std::{
 };
 use tokio::runtime::{self, Runtime};
 
-use self::progress::{progress_to_proto, ProgressState};
+use self::{
+    err::anki_error_to_proto_error,
+    progress::{progress_to_proto, ProgressState},
+    sync::RemoteSyncStatus,
+};
 
 pub struct Backend {
     col: Arc<Mutex<Option<Collection>>>,
@@ -89,81 +91,6 @@ struct BackendState {
     http_sync_server: Option<LocalServer>,
 }
 
-#[derive(Default, Debug)]
-pub(crate) struct RemoteSyncStatus {
-    last_check: TimestampSecs,
-    last_response: sync_status_out::Required,
-}
-
-impl RemoteSyncStatus {
-    fn update(&mut self, required: sync_status_out::Required) {
-        self.last_check = TimestampSecs::now();
-        self.last_response = required
-    }
-}
-
-/// Convert an Anki error to a protobuf error.
-fn anki_error_to_proto_error(err: AnkiError, i18n: &I18n) -> pb::BackendError {
-    use pb::backend_error::Value as V;
-    let localized = err.localized_description(i18n);
-    let value = match err {
-        AnkiError::InvalidInput { .. } => V::InvalidInput(pb::Empty {}),
-        AnkiError::TemplateError { .. } => V::TemplateParse(pb::Empty {}),
-        AnkiError::IOError { .. } => V::IoError(pb::Empty {}),
-        AnkiError::DBError { .. } => V::DbError(pb::Empty {}),
-        AnkiError::NetworkError { kind, .. } => {
-            V::NetworkError(pb::NetworkError { kind: kind.into() })
-        }
-        AnkiError::SyncError { kind, .. } => V::SyncError(pb::SyncError { kind: kind.into() }),
-        AnkiError::Interrupted => V::Interrupted(Empty {}),
-        AnkiError::CollectionNotOpen => V::InvalidInput(pb::Empty {}),
-        AnkiError::CollectionAlreadyOpen => V::InvalidInput(pb::Empty {}),
-        AnkiError::JSONError { info } => V::JsonError(info),
-        AnkiError::ProtoError { info } => V::ProtoError(info),
-        AnkiError::NotFound => V::NotFoundError(Empty {}),
-        AnkiError::Existing => V::Exists(Empty {}),
-        AnkiError::DeckIsFiltered => V::DeckIsFiltered(Empty {}),
-        AnkiError::SearchError(_) => V::InvalidInput(pb::Empty {}),
-        AnkiError::TemplateSaveError { .. } => V::TemplateParse(pb::Empty {}),
-        AnkiError::ParseNumError => V::InvalidInput(pb::Empty {}),
-    };
-
-    pb::BackendError {
-        value: Some(value),
-        localized,
-    }
-}
-
-impl std::convert::From<NetworkErrorKind> for i32 {
-    fn from(e: NetworkErrorKind) -> Self {
-        use pb::network_error::NetworkErrorKind as V;
-        (match e {
-            NetworkErrorKind::Offline => V::Offline,
-            NetworkErrorKind::Timeout => V::Timeout,
-            NetworkErrorKind::ProxyAuth => V::ProxyAuth,
-            NetworkErrorKind::Other => V::Other,
-        }) as i32
-    }
-}
-
-impl std::convert::From<SyncErrorKind> for i32 {
-    fn from(e: SyncErrorKind) -> Self {
-        use pb::sync_error::SyncErrorKind as V;
-        (match e {
-            SyncErrorKind::Conflict => V::Conflict,
-            SyncErrorKind::ServerError => V::ServerError,
-            SyncErrorKind::ClientTooOld => V::ClientTooOld,
-            SyncErrorKind::AuthFailed => V::AuthFailed,
-            SyncErrorKind::ServerMessage => V::ServerMessage,
-            SyncErrorKind::ResyncRequired => V::ResyncRequired,
-            SyncErrorKind::DatabaseCheckRequired => V::DatabaseCheckRequired,
-            SyncErrorKind::Other => V::Other,
-            SyncErrorKind::ClockIncorrect => V::ClockIncorrect,
-            SyncErrorKind::SyncNotStarted => V::SyncNotStarted,
-        }) as i32
-    }
-}
-
 pub fn init_backend(init_msg: &[u8]) -> std::result::Result<Backend, String> {
     let input: pb::BackendInit = match pb::BackendInit::decode(init_msg) {
         Ok(req) => req,
@@ -1456,243 +1383,6 @@ impl Backend {
             .clone()
     }
 
-    fn sync_abort_handle(
-        &self,
-    ) -> BackendResult<(
-        scopeguard::ScopeGuard<AbortHandleSlot, impl FnOnce(AbortHandleSlot)>,
-        AbortRegistration,
-    )> {
-        let (abort_handle, abort_reg) = AbortHandle::new_pair();
-
-        // Register the new abort_handle.
-        let old_handle = self.sync_abort.lock().unwrap().replace(abort_handle);
-        if old_handle.is_some() {
-            // NOTE: In the future we would ideally be able to handle multiple
-            // abort handles by just iterating over them all in
-            // abort_sync). But for now, just log a warning if there was
-            // already one present -- but don't abort it either.
-            let log = self.with_col(|col| Ok(col.log.clone()))?;
-            warn!(
-                log,
-                "new sync_abort handle registered, but old one was still present (old sync job might not be cancelled on abort)"
-            );
-        }
-        // Clear the abort handle after the caller is done and drops the guard.
-        let guard = scopeguard::guard(Arc::clone(&self.sync_abort), |sync_abort| {
-            sync_abort.lock().unwrap().take();
-        });
-        Ok((guard, abort_reg))
-    }
-
-    fn sync_media_inner(&self, input: pb::SyncAuth) -> Result<()> {
-        // mark media sync as active
-        let (abort_handle, abort_reg) = AbortHandle::new_pair();
-        {
-            let mut guard = self.state.lock().unwrap();
-            if guard.media_sync_abort.is_some() {
-                // media sync is already active
-                return Ok(());
-            } else {
-                guard.media_sync_abort = Some(abort_handle);
-            }
-        }
-
-        // get required info from collection
-        let mut guard = self.col.lock().unwrap();
-        let col = guard.as_mut().unwrap();
-        let folder = col.media_folder.clone();
-        let db = col.media_db.clone();
-        let log = col.log.clone();
-        drop(guard);
-
-        // start the sync
-        let mut handler = self.new_progress_handler();
-        let progress_fn = move |progress| handler.update(progress, true);
-
-        let mgr = MediaManager::new(&folder, &db)?;
-        let rt = self.runtime_handle();
-        let sync_fut = mgr.sync_media(progress_fn, input.host_number, &input.hkey, log);
-        let abortable_sync = Abortable::new(sync_fut, abort_reg);
-        let result = rt.block_on(abortable_sync);
-
-        // mark inactive
-        self.state.lock().unwrap().media_sync_abort.take();
-
-        // return result
-        match result {
-            Ok(sync_result) => sync_result,
-            Err(_) => {
-                // aborted sync
-                Err(AnkiError::Interrupted)
-            }
-        }
-    }
-
-    /// Abort the media sync. Won't return until aborted.
-    fn abort_media_sync_and_wait(&self) {
-        let guard = self.state.lock().unwrap();
-        if let Some(handle) = &guard.media_sync_abort {
-            handle.abort();
-            self.progress_state.lock().unwrap().want_abort = true;
-        }
-        drop(guard);
-
-        // block until it aborts
-        while self.state.lock().unwrap().media_sync_abort.is_some() {
-            std::thread::sleep(std::time::Duration::from_millis(100));
-            self.progress_state.lock().unwrap().want_abort = true;
-        }
-    }
-
-    fn sync_login_inner(&self, input: pb::SyncLoginIn) -> BackendResult<pb::SyncAuth> {
-        let (_guard, abort_reg) = self.sync_abort_handle()?;
-
-        let rt = self.runtime_handle();
-        let sync_fut = sync_login(&input.username, &input.password);
-        let abortable_sync = Abortable::new(sync_fut, abort_reg);
-        let ret = match rt.block_on(abortable_sync) {
-            Ok(sync_result) => sync_result,
-            Err(_) => Err(AnkiError::Interrupted),
-        };
-        ret.map(|a| pb::SyncAuth {
-            hkey: a.hkey,
-            host_number: a.host_number,
-        })
-    }
-
-    fn sync_status_inner(&self, input: pb::SyncAuth) -> BackendResult<pb::SyncStatusOut> {
-        // any local changes mean we can skip the network round-trip
-        let req = self.with_col(|col| col.get_local_sync_status())?;
-        if req != pb::sync_status_out::Required::NoChanges {
-            return Ok(req.into());
-        }
-
-        // return cached server response if only a short time has elapsed
-        {
-            let guard = self.state.lock().unwrap();
-            if guard.remote_sync_status.last_check.elapsed_secs() < 300 {
-                return Ok(guard.remote_sync_status.last_response.into());
-            }
-        }
-
-        // fetch and cache result
-        let rt = self.runtime_handle();
-        let time_at_check_begin = TimestampSecs::now();
-        let remote: SyncMeta = rt.block_on(get_remote_sync_meta(input.into()))?;
-        let response = self.with_col(|col| col.get_sync_status(remote).map(Into::into))?;
-
-        {
-            let mut guard = self.state.lock().unwrap();
-            // On startup, the sync status check will block on network access, and then automatic syncing begins,
-            // taking hold of the mutex. By the time we reach here, our network status may be out of date,
-            // so we discard it if stale.
-            if guard.remote_sync_status.last_check < time_at_check_begin {
-                guard.remote_sync_status.last_check = time_at_check_begin;
-                guard.remote_sync_status.last_response = response;
-            }
-        }
-
-        Ok(response.into())
-    }
-
-    fn sync_collection_inner(&self, input: pb::SyncAuth) -> BackendResult<pb::SyncCollectionOut> {
-        let (_guard, abort_reg) = self.sync_abort_handle()?;
-
-        let rt = self.runtime_handle();
-        let input_copy = input.clone();
-
-        let ret = self.with_col(|col| {
-            let mut handler = self.new_progress_handler();
-            let progress_fn = move |progress: NormalSyncProgress, throttle: bool| {
-                handler.update(progress, throttle);
-            };
-
-            let sync_fut = col.normal_sync(input.into(), progress_fn);
-            let abortable_sync = Abortable::new(sync_fut, abort_reg);
-
-            match rt.block_on(abortable_sync) {
-                Ok(sync_result) => sync_result,
-                Err(_) => {
-                    // if the user aborted, we'll need to clean up the transaction
-                    col.storage.rollback_trx()?;
-                    // and tell AnkiWeb to clean up
-                    let _handle = std::thread::spawn(move || {
-                        let _ = rt.block_on(sync_abort(input_copy.hkey, input_copy.host_number));
-                    });
-
-                    Err(AnkiError::Interrupted)
-                }
-            }
-        });
-
-        let output: SyncOutput = ret?;
-        self.state
-            .lock()
-            .unwrap()
-            .remote_sync_status
-            .update(output.required.into());
-        Ok(output.into())
-    }
-
-    fn full_sync_inner(&self, input: pb::SyncAuth, upload: bool) -> Result<()> {
-        self.abort_media_sync_and_wait();
-
-        let rt = self.runtime_handle();
-
-        let mut col = self.col.lock().unwrap();
-        if col.is_none() {
-            return Err(AnkiError::CollectionNotOpen);
-        }
-
-        let col_inner = col.take().unwrap();
-
-        let (_guard, abort_reg) = self.sync_abort_handle()?;
-
-        let col_path = col_inner.col_path.clone();
-        let media_folder_path = col_inner.media_folder.clone();
-        let media_db_path = col_inner.media_db.clone();
-        let logger = col_inner.log.clone();
-
-        let mut handler = self.new_progress_handler();
-        let progress_fn = move |progress: FullSyncProgress, throttle: bool| {
-            handler.update(progress, throttle);
-        };
-
-        let result = if upload {
-            let sync_fut = col_inner.full_upload(input.into(), Box::new(progress_fn));
-            let abortable_sync = Abortable::new(sync_fut, abort_reg);
-            rt.block_on(abortable_sync)
-        } else {
-            let sync_fut = col_inner.full_download(input.into(), Box::new(progress_fn));
-            let abortable_sync = Abortable::new(sync_fut, abort_reg);
-            rt.block_on(abortable_sync)
-        };
-
-        // ensure re-opened regardless of outcome
-        col.replace(open_collection(
-            col_path,
-            media_folder_path,
-            media_db_path,
-            self.server,
-            self.i18n.clone(),
-            logger,
-        )?);
-
-        match result {
-            Ok(sync_result) => {
-                if sync_result.is_ok() {
-                    self.state
-                        .lock()
-                        .unwrap()
-                        .remote_sync_status
-                        .update(sync_status_out::Required::NoChanges);
-                }
-                sync_result
-            }
-            Err(_) => Err(AnkiError::Interrupted),
-        }
-    }
-
     pub fn db_command(&self, input: &[u8]) -> Result<Vec<u8>> {
         self.with_col(|col| db_command_bytes(col, input))
     }
@@ -1788,41 +1478,3 @@ impl From<crate::sched::cutoff::SchedTimingToday> for pb::SchedTimingTodayOu
         }
     }
 }
-
-impl From<SyncOutput> for pb::SyncCollectionOut {
-    fn from(o: SyncOutput) -> Self {
-        pb::SyncCollectionOut {
-            host_number: o.host_number,
-            server_message: o.server_message,
-            required: match o.required {
-                SyncActionRequired::NoChanges => {
-                    pb::sync_collection_out::ChangesRequired::NoChanges as i32
-                }
-                SyncActionRequired::FullSyncRequired {
-                    upload_ok,
-                    download_ok,
-                } => {
-                    if !upload_ok {
-                        pb::sync_collection_out::ChangesRequired::FullDownload as i32
-                    } else if !download_ok {
-                        pb::sync_collection_out::ChangesRequired::FullUpload as i32
-                    } else {
-                        pb::sync_collection_out::ChangesRequired::FullSync as i32
-                    }
-                }
-                SyncActionRequired::NormalSyncRequired => {
-                    pb::sync_collection_out::ChangesRequired::NormalSync as i32
-                }
-            },
-        }
-    }
-}
-
-impl From<pb::SyncAuth> for SyncAuth {
-    fn from(a: pb::SyncAuth) -> Self {
-        SyncAuth {
-            hkey: a.hkey,
-            host_number: a.host_number,
-        }
-    }
-}
diff --git a/rslib/src/backend/sync.rs b/rslib/src/backend/sync.rs
new file mode 100644
index 000000000..17c1fd5b0
--- /dev/null
+++ b/rslib/src/backend/sync.rs
@@ -0,0 +1,313 @@
+// Copyright: Ankitects Pty Ltd and contributors
+// License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
+
+use std::sync::Arc;
+
+use futures::future::{AbortHandle, AbortRegistration, Abortable};
+use slog::warn;
+
+use crate::{
+    backend_proto as pb,
+    collection::open_collection,
+    media::MediaManager,
+    prelude::*,
+    sync::{
+        get_remote_sync_meta, sync_abort, sync_login, FullSyncProgress, NormalSyncProgress,
+        SyncActionRequired, SyncAuth, SyncMeta, SyncOutput,
+    },
+};
+
+use super::{progress::AbortHandleSlot, Backend};
+
+#[derive(Default, Debug)]
+pub(super) struct RemoteSyncStatus {
+    pub last_check: TimestampSecs,
+    pub last_response: pb::sync_status_out::Required,
+}
+
+impl RemoteSyncStatus {
+    pub(super) fn update(&mut self, required: pb::sync_status_out::Required) {
+        self.last_check = TimestampSecs::now();
+        self.last_response = required
+    }
+}
+
+impl From<SyncOutput> for pb::SyncCollectionOut {
+    fn from(o: SyncOutput) -> Self {
+        pb::SyncCollectionOut {
+            host_number: o.host_number,
+            server_message: o.server_message,
+            required: match o.required {
+                SyncActionRequired::NoChanges => {
+                    pb::sync_collection_out::ChangesRequired::NoChanges as i32
+                }
+                SyncActionRequired::FullSyncRequired {
+                    upload_ok,
+                    download_ok,
+                } => {
+                    if !upload_ok {
+                        pb::sync_collection_out::ChangesRequired::FullDownload as i32
+                    } else if !download_ok {
+                        pb::sync_collection_out::ChangesRequired::FullUpload as i32
+                    } else {
+                        pb::sync_collection_out::ChangesRequired::FullSync as i32
+                    }
+                }
+                SyncActionRequired::NormalSyncRequired => {
+                    pb::sync_collection_out::ChangesRequired::NormalSync as i32
+                }
+            },
+        }
+    }
+}
+
+impl From<pb::SyncAuth> for SyncAuth {
+    fn from(a: pb::SyncAuth) -> Self {
+        SyncAuth {
+            hkey: a.hkey,
+            host_number: a.host_number,
+        }
+    }
+}
+
+impl Backend {
+    fn sync_abort_handle(
+        &self,
+    ) -> Result<(
+        scopeguard::ScopeGuard<AbortHandleSlot, impl FnOnce(AbortHandleSlot)>,
+        AbortRegistration,
+    )> {
+        let (abort_handle, abort_reg) = AbortHandle::new_pair();
+
+        // Register the new abort_handle.
+        let old_handle = self.sync_abort.lock().unwrap().replace(abort_handle);
+        if old_handle.is_some() {
+            // NOTE: In the future we would ideally be able to handle multiple
+            // abort handles by just iterating over them all in
+            // abort_sync). But for now, just log a warning if there was
+            // already one present -- but don't abort it either.
+            let log = self.with_col(|col| Ok(col.log.clone()))?;
+            warn!(
+                log,
+                "new sync_abort handle registered, but old one was still present (old sync job might not be cancelled on abort)"
+            );
+        }
+        // Clear the abort handle after the caller is done and drops the guard.
+        let guard = scopeguard::guard(Arc::clone(&self.sync_abort), |sync_abort| {
+            sync_abort.lock().unwrap().take();
+        });
+        Ok((guard, abort_reg))
+    }
+
+    pub(super) fn sync_media_inner(&self, input: pb::SyncAuth) -> Result<()> {
+        // mark media sync as active
+        let (abort_handle, abort_reg) = AbortHandle::new_pair();
+        {
+            let mut guard = self.state.lock().unwrap();
+            if guard.media_sync_abort.is_some() {
+                // media sync is already active
+                return Ok(());
+            } else {
+                guard.media_sync_abort = Some(abort_handle);
+            }
+        }
+
+        // get required info from collection
+        let mut guard = self.col.lock().unwrap();
+        let col = guard.as_mut().unwrap();
+        let folder = col.media_folder.clone();
+        let db = col.media_db.clone();
+        let log = col.log.clone();
+        drop(guard);
+
+        // start the sync
+        let mut handler = self.new_progress_handler();
+        let progress_fn = move |progress| handler.update(progress, true);
+
+        let mgr = MediaManager::new(&folder, &db)?;
+        let rt = self.runtime_handle();
+        let sync_fut = mgr.sync_media(progress_fn, input.host_number, &input.hkey, log);
+        let abortable_sync = Abortable::new(sync_fut, abort_reg);
+        let result = rt.block_on(abortable_sync);
+
+        // mark inactive
+        self.state.lock().unwrap().media_sync_abort.take();
+
+        // return result
+        match result {
+            Ok(sync_result) => sync_result,
+            Err(_) => {
+                // aborted sync
+                Err(AnkiError::Interrupted)
+            }
+        }
+    }
+
+    /// Abort the media sync. Won't return until aborted.
+    pub(super) fn abort_media_sync_and_wait(&self) {
+        let guard = self.state.lock().unwrap();
+        if let Some(handle) = &guard.media_sync_abort {
+            handle.abort();
+            self.progress_state.lock().unwrap().want_abort = true;
+        }
+        drop(guard);
+
+        // block until it aborts
+        while self.state.lock().unwrap().media_sync_abort.is_some() {
+            std::thread::sleep(std::time::Duration::from_millis(100));
+            self.progress_state.lock().unwrap().want_abort = true;
+        }
+    }
+
+    pub(super) fn sync_login_inner(&self, input: pb::SyncLoginIn) -> Result<pb::SyncAuth> {
+        let (_guard, abort_reg) = self.sync_abort_handle()?;
+
+        let rt = self.runtime_handle();
+        let sync_fut = sync_login(&input.username, &input.password);
+        let abortable_sync = Abortable::new(sync_fut, abort_reg);
+        let ret = match rt.block_on(abortable_sync) {
+            Ok(sync_result) => sync_result,
+            Err(_) => Err(AnkiError::Interrupted),
+        };
+        ret.map(|a| pb::SyncAuth {
+            hkey: a.hkey,
+            host_number: a.host_number,
+        })
+    }
+
+    pub(super) fn sync_status_inner(&self, input: pb::SyncAuth) -> Result<pb::SyncStatusOut> {
+        // any local changes mean we can skip the network round-trip
+        let req = self.with_col(|col| col.get_local_sync_status())?;
+        if req != pb::sync_status_out::Required::NoChanges {
+            return Ok(req.into());
+        }
+
+        // return cached server response if only a short time has elapsed
+        {
+            let guard = self.state.lock().unwrap();
+            if guard.remote_sync_status.last_check.elapsed_secs() < 300 {
+                return Ok(guard.remote_sync_status.last_response.into());
+            }
+        }
+
+        // fetch and cache result
+        let rt = self.runtime_handle();
+        let time_at_check_begin = TimestampSecs::now();
+        let remote: SyncMeta = rt.block_on(get_remote_sync_meta(input.into()))?;
+        let response = self.with_col(|col| col.get_sync_status(remote).map(Into::into))?;
+
+        {
+            let mut guard = self.state.lock().unwrap();
+            // On startup, the sync status check will block on network access, and then automatic syncing begins,
+            // taking hold of the mutex. By the time we reach here, our network status may be out of date,
+            // so we discard it if stale.
+            if guard.remote_sync_status.last_check < time_at_check_begin {
+                guard.remote_sync_status.last_check = time_at_check_begin;
+                guard.remote_sync_status.last_response = response;
+            }
+        }
+
+        Ok(response.into())
+    }
+
+    pub(super) fn sync_collection_inner(
+        &self,
+        input: pb::SyncAuth,
+    ) -> Result<pb::SyncCollectionOut> {
+        let (_guard, abort_reg) = self.sync_abort_handle()?;
+
+        let rt = self.runtime_handle();
+        let input_copy = input.clone();
+
+        let ret = self.with_col(|col| {
+            let mut handler = self.new_progress_handler();
+            let progress_fn = move |progress: NormalSyncProgress, throttle: bool| {
+                handler.update(progress, throttle);
+            };
+
+            let sync_fut = col.normal_sync(input.into(), progress_fn);
+            let abortable_sync = Abortable::new(sync_fut, abort_reg);
+
+            match rt.block_on(abortable_sync) {
+                Ok(sync_result) => sync_result,
+                Err(_) => {
+                    // if the user aborted, we'll need to clean up the transaction
+                    col.storage.rollback_trx()?;
+                    // and tell AnkiWeb to clean up
+                    let _handle = std::thread::spawn(move || {
+                        let _ = rt.block_on(sync_abort(input_copy.hkey, input_copy.host_number));
+                    });
+
+                    Err(AnkiError::Interrupted)
+                }
+            }
+        });
+
+        let output: SyncOutput = ret?;
+        self.state
+            .lock()
+            .unwrap()
+            .remote_sync_status
+            .update(output.required.into());
+        Ok(output.into())
+    }
+
+    pub(super) fn full_sync_inner(&self, input: pb::SyncAuth, upload: bool) -> Result<()> {
+        self.abort_media_sync_and_wait();
+
+        let rt = self.runtime_handle();
+
+        let mut col = self.col.lock().unwrap();
+        if col.is_none() {
+            return Err(AnkiError::CollectionNotOpen);
+        }
+
+        let col_inner = col.take().unwrap();
+
+        let (_guard, abort_reg) = self.sync_abort_handle()?;
+
+        let col_path = col_inner.col_path.clone();
+        let media_folder_path = col_inner.media_folder.clone();
+        let media_db_path = col_inner.media_db.clone();
+        let logger = col_inner.log.clone();
+
+        let mut handler = self.new_progress_handler();
+        let progress_fn = move |progress: FullSyncProgress, throttle: bool| {
+            handler.update(progress, throttle);
+        };
+
+        let result = if upload {
+            let sync_fut = col_inner.full_upload(input.into(), Box::new(progress_fn));
+            let abortable_sync = Abortable::new(sync_fut, abort_reg);
+            rt.block_on(abortable_sync)
+        } else {
+            let sync_fut = col_inner.full_download(input.into(), Box::new(progress_fn));
+            let abortable_sync = Abortable::new(sync_fut, abort_reg);
+            rt.block_on(abortable_sync)
+        };
+
+        // ensure re-opened regardless of outcome
+        col.replace(open_collection(
+            col_path,
+            media_folder_path,
+            media_db_path,
+            self.server,
+            self.i18n.clone(),
+            logger,
+        )?);
+
+        match result {
+            Ok(sync_result) => {
+                if sync_result.is_ok() {
+                    self.state
+                        .lock()
+                        .unwrap()
+                        .remote_sync_status
+                        .update(pb::sync_status_out::Required::NoChanges);
+                }
+                sync_result
+            }
+            Err(_) => Err(AnkiError::Interrupted),
+        }
+    }
+}
diff --git a/rslib/src/prelude.rs b/rslib/src/prelude.rs
index c44c7e73a..2e0d589ac 100644
--- a/rslib/src/prelude.rs
+++ b/rslib/src/prelude.rs
@@ -8,7 +8,7 @@ pub use crate::{
     deckconf::{DeckConf, DeckConfID},
     decks::{Deck, DeckID, DeckKind},
     err::{AnkiError, Result},
-    i18n::{tr_args, tr_strs, TR},
+    i18n::{tr_args, tr_strs, I18n, TR},
     notes::{Note, NoteID},
     notetype::{NoteType, NoteTypeID},
     revlog::RevlogID,