From 312393e82560f845dfd361922a103f988b88cfbf Mon Sep 17 00:00:00 2001 From: Aleksa Sarai Date: Fri, 4 Sep 2020 16:20:39 +1000 Subject: [PATCH] rs: remove most &mut references in BackendService The previous implementation had some slightly questionable memory safety properties (older versions of PyO3 didn't uphold the Rust aliasing rules and would thus create multiple &mut references to #[pyclass] objects). This explains why Backend has internal Mutexs even though all of its methods took &mut self. The solution is to simply make all methods take &self, which luckily doesn't pose too make issues -- most of the code inside Backend already has sufficient locking. The only two things which needed to be explicitly handled where: 1. "self.runtime" which was fairly easy to handle. All usages of the Runtime only require an immutable reference to create a new Handle, so we could switch to OnceCell which provides lazy-initialisation semantics without needing a more heavy-handed Mutex. 2. "self.sync_abort" was simply wrapped in a Mutex<>, though some of the odd semantics of sync_abort (not being able to handle multiple processes synchronising at the same time) become pretty obvious with this change (for now we just log a warning in that case). In addition, switch to an RAII-style guard to make sure we don't forget to clear the abort_handle. As a result, we now no longer break Rust's aliasing rules and we can build with newer versions of PyO3 which have runtime checks for these things (and build on stable Rust). Signed-off-by: Aleksa Sarai --- rslib/Cargo.toml | 2 + rslib/build.rs | 4 +- rslib/src/backend/mod.rs | 311 +++++++++++++++++++-------------------- rspy/src/lib.rs | 8 +- 4 files changed, 163 insertions(+), 162 deletions(-) diff --git a/rslib/Cargo.toml b/rslib/Cargo.toml index fc3a3f437..4519dac2a 100644 --- a/rslib/Cargo.toml +++ b/rslib/Cargo.toml @@ -52,6 +52,8 @@ pin-project = "0.4.22" async-compression = { version = "0.3.5", features = ["stream", "gzip"] } askama = "0.10.1" hyper = "0.13.7" +once_cell = "1.4.1" +scopeguard = "1.1.0" [target.'cfg(target_vendor="apple")'.dependencies.rusqlite] version = "0.23.1" diff --git a/rslib/build.rs b/rslib/build.rs index 5c044fcc6..acc1ed09a 100644 --- a/rslib/build.rs +++ b/rslib/build.rs @@ -116,7 +116,7 @@ fn write_method_trait(buf: &mut String, service: &prost_build::Service) { use prost::Message; pub type BackendResult = std::result::Result; pub trait BackendService { - fn run_command_bytes2_inner(&mut self, method: u32, input: &[u8]) -> std::result::Result, crate::err::AnkiError> { + fn run_command_bytes2_inner(&self, method: u32, input: &[u8]) -> std::result::Result, crate::err::AnkiError> { match method { "#, ); @@ -146,7 +146,7 @@ pub trait BackendService { write!( buf, concat!( - " fn {method_name}(&mut self, input: {input_type}) -> ", + " fn {method_name}(&self, input: {input_type}) -> ", "BackendResult<{output_type}>;\n" ), method_name = method.name, diff --git a/rslib/src/backend/mod.rs b/rslib/src/backend/mod.rs index 3741bf93c..698d06135 100644 --- a/rslib/src/backend/mod.rs +++ b/rslib/src/backend/mod.rs @@ -44,11 +44,13 @@ use crate::{ types::Usn, }; use fluent::FluentValue; -use futures::future::{AbortHandle, Abortable}; +use futures::future::{AbortHandle, AbortRegistration, Abortable}; use log::error; +use once_cell::sync::OnceCell; use pb::{sync_status_out, BackendService}; use prost::Message; use serde_json::Value as JsonValue; +use slog::warn; use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; use std::{ @@ -85,13 +87,16 @@ struct ProgressState { last_progress: Option, } +// fixme: this should support multiple abort handles. +type AbortHandleSlot = Arc>>; + pub struct Backend { col: Arc>>, i18n: I18n, server: bool, - sync_abort: Option, + sync_abort: AbortHandleSlot, progress_state: Arc>, - runtime: Option, + runtime: OnceCell, state: Arc>, } @@ -267,12 +272,12 @@ impl From for DeckConfID { } impl BackendService for Backend { - fn latest_progress(&mut self, _input: Empty) -> BackendResult { + fn latest_progress(&self, _input: Empty) -> BackendResult { let progress = self.progress_state.lock().unwrap().last_progress; Ok(progress_to_proto(progress, &self.i18n)) } - fn set_wants_abort(&mut self, _input: Empty) -> BackendResult { + fn set_wants_abort(&self, _input: Empty) -> BackendResult { self.progress_state.lock().unwrap().want_abort = true; Ok(().into()) } @@ -280,7 +285,7 @@ impl BackendService for Backend { // card rendering fn render_existing_card( - &mut self, + &self, input: pb::RenderExistingCardIn, ) -> BackendResult { self.with_col(|col| { @@ -290,7 +295,7 @@ impl BackendService for Backend { } fn render_uncommitted_card( - &mut self, + &self, input: pb::RenderUncommittedCardIn, ) -> BackendResult { let schema11: CardTemplateSchema11 = serde_json::from_slice(&input.template)?; @@ -307,7 +312,7 @@ impl BackendService for Backend { }) } - fn get_empty_cards(&mut self, _input: pb::Empty) -> Result { + fn get_empty_cards(&self, _input: pb::Empty) -> Result { self.with_col(|col| { let mut empty = col.empty_cards()?; let report = col.empty_cards_report(&mut empty)?; @@ -327,16 +332,13 @@ impl BackendService for Backend { }) } - fn strip_av_tags(&mut self, input: pb::String) -> BackendResult { + fn strip_av_tags(&self, input: pb::String) -> BackendResult { Ok(pb::String { val: strip_av_tags(&input.val).into(), }) } - fn extract_av_tags( - &mut self, - input: pb::ExtractAvTagsIn, - ) -> BackendResult { + fn extract_av_tags(&self, input: pb::ExtractAvTagsIn) -> BackendResult { let (text, tags) = extract_av_tags(&input.text, input.question_side); let pt_tags = tags .into_iter() @@ -368,7 +370,7 @@ impl BackendService for Backend { }) } - fn extract_latex(&mut self, input: pb::ExtractLatexIn) -> BackendResult { + fn extract_latex(&self, input: pb::ExtractLatexIn) -> BackendResult { let func = if input.expand_clozes { extract_latex_expanding_clozes } else { @@ -391,7 +393,7 @@ impl BackendService for Backend { // searching //----------------------------------------------- - fn search_cards(&mut self, input: pb::SearchCardsIn) -> Result { + fn search_cards(&self, input: pb::SearchCardsIn) -> Result { self.with_col(|col| { let order = if let Some(order) = input.order { use pb::sort_order::Value as V; @@ -415,7 +417,7 @@ impl BackendService for Backend { }) } - fn search_notes(&mut self, input: pb::SearchNotesIn) -> Result { + fn search_notes(&self, input: pb::SearchNotesIn) -> Result { self.with_col(|col| { let nids = col.search_notes(&input.search)?; Ok(pb::SearchNotesOut { @@ -424,7 +426,7 @@ impl BackendService for Backend { }) } - fn find_and_replace(&mut self, input: pb::FindAndReplaceIn) -> BackendResult { + fn find_and_replace(&self, input: pb::FindAndReplaceIn) -> BackendResult { let mut search = if input.regex { input.search } else { @@ -451,7 +453,7 @@ impl BackendService for Backend { /// This behaves like _updateCutoff() in older code - it also unburies at the start of /// a new day. - fn sched_timing_today(&mut self, _input: pb::Empty) -> Result { + fn sched_timing_today(&self, _input: pb::Empty) -> Result { self.with_col(|col| { let timing = col.timing_today()?; col.unbury_if_day_rolled_over(timing)?; @@ -459,13 +461,13 @@ impl BackendService for Backend { }) } - fn local_minutes_west(&mut self, input: pb::Int64) -> BackendResult { + fn local_minutes_west(&self, input: pb::Int64) -> BackendResult { Ok(pb::Int32 { val: local_minutes_west_for_stamp(input.val), }) } - fn set_local_minutes_west(&mut self, input: pb::Int32) -> BackendResult { + fn set_local_minutes_west(&self, input: pb::Int32) -> BackendResult { self.with_col(|col| { col.transact(None, |col| { col.set_local_mins_west(input.val).map(Into::into) @@ -474,19 +476,16 @@ impl BackendService for Backend { } /// Fetch data from DB and return rendered string. - fn studied_today(&mut self, _input: pb::Empty) -> BackendResult { + fn studied_today(&self, _input: pb::Empty) -> BackendResult { self.with_col(|col| col.studied_today().map(Into::into)) } /// Message rendering only, for old graphs. - fn studied_today_message( - &mut self, - input: pb::StudiedTodayMessageIn, - ) -> BackendResult { + fn studied_today_message(&self, input: pb::StudiedTodayMessageIn) -> BackendResult { Ok(studied_today(input.cards, input.seconds as f32, &self.i18n).into()) } - fn update_stats(&mut self, input: pb::UpdateStatsIn) -> BackendResult { + fn update_stats(&self, input: pb::UpdateStatsIn) -> BackendResult { self.with_col(|col| { col.transact(None, |col| { let today = col.current_due_day(0)?; @@ -496,7 +495,7 @@ impl BackendService for Backend { }) } - fn extend_limits(&mut self, input: pb::ExtendLimitsIn) -> BackendResult { + fn extend_limits(&self, input: pb::ExtendLimitsIn) -> BackendResult { self.with_col(|col| { col.transact(None, |col| { let today = col.current_due_day(0)?; @@ -513,18 +512,15 @@ impl BackendService for Backend { }) } - fn counts_for_deck_today( - &mut self, - input: pb::DeckId, - ) -> BackendResult { + fn counts_for_deck_today(&self, input: pb::DeckId) -> BackendResult { self.with_col(|col| col.counts_for_deck_today(input.did.into())) } - fn congrats_info(&mut self, _input: Empty) -> BackendResult { + fn congrats_info(&self, _input: Empty) -> BackendResult { self.with_col(|col| col.congrats_info()) } - fn restore_buried_and_suspended_cards(&mut self, input: pb::CardIDs) -> BackendResult { + fn restore_buried_and_suspended_cards(&self, input: pb::CardIDs) -> BackendResult { self.with_col(|col| { col.unbury_or_unsuspend_cards(&input.into_native()) .map(Into::into) @@ -532,7 +528,7 @@ impl BackendService for Backend { } fn unbury_cards_in_current_deck( - &mut self, + &self, input: pb::UnburyCardsInCurrentDeckIn, ) -> BackendResult { self.with_col(|col| { @@ -541,7 +537,7 @@ impl BackendService for Backend { }) } - fn bury_or_suspend_cards(&mut self, input: pb::BuryOrSuspendCardsIn) -> BackendResult { + fn bury_or_suspend_cards(&self, input: pb::BuryOrSuspendCardsIn) -> BackendResult { self.with_col(|col| { let mode = input.mode(); let cids: Vec<_> = input.card_ids.into_iter().map(CardID).collect(); @@ -549,16 +545,16 @@ impl BackendService for Backend { }) } - fn empty_filtered_deck(&mut self, input: pb::DeckId) -> BackendResult { + fn empty_filtered_deck(&self, input: pb::DeckId) -> BackendResult { self.with_col(|col| col.empty_filtered_deck(input.did.into()).map(Into::into)) } - fn rebuild_filtered_deck(&mut self, input: pb::DeckId) -> BackendResult { + fn rebuild_filtered_deck(&self, input: pb::DeckId) -> BackendResult { self.with_col(|col| col.rebuild_filtered_deck(input.did.into()).map(Into::into)) } fn schedule_cards_as_reviews( - &mut self, + &self, input: pb::ScheduleCardsAsReviewsIn, ) -> BackendResult { let cids: Vec<_> = input.card_ids.into_iter().map(CardID).collect(); @@ -569,14 +565,14 @@ impl BackendService for Backend { }) } - fn schedule_cards_as_new(&mut self, input: pb::CardIDs) -> BackendResult { + fn schedule_cards_as_new(&self, input: pb::CardIDs) -> BackendResult { self.with_col(|col| { col.reschedule_cards_as_new(&input.into_native()) .map(Into::into) }) } - fn sort_cards(&mut self, input: pb::SortCardsIn) -> BackendResult { + fn sort_cards(&self, input: pb::SortCardsIn) -> BackendResult { let cids: Vec<_> = input.card_ids.into_iter().map(CardID).collect(); let (start, step, random, shift) = ( input.starting_from, @@ -590,7 +586,7 @@ impl BackendService for Backend { }) } - fn sort_deck(&mut self, input: pb::SortDeckIn) -> BackendResult { + fn sort_deck(&self, input: pb::SortDeckIn) -> BackendResult { self.with_col(|col| { col.sort_deck(input.deck_id.into(), input.randomize) .map(Into::into) @@ -600,19 +596,19 @@ impl BackendService for Backend { // statistics //----------------------------------------------- - fn card_stats(&mut self, input: pb::CardId) -> BackendResult { + fn card_stats(&self, input: pb::CardId) -> BackendResult { self.with_col(|col| col.card_stats(input.into())) .map(Into::into) } - fn graphs(&mut self, input: pb::GraphsIn) -> BackendResult { + fn graphs(&self, input: pb::GraphsIn) -> BackendResult { self.with_col(|col| col.graph_data_for_search(&input.search, input.days)) } // decks //----------------------------------------------- - fn deck_tree(&mut self, input: pb::DeckTreeIn) -> Result { + fn deck_tree(&self, input: pb::DeckTreeIn) -> Result { let lim = if input.top_deck_id > 0 { Some(DeckID(input.top_deck_id)) } else { @@ -628,7 +624,7 @@ impl BackendService for Backend { }) } - fn deck_tree_legacy(&mut self, _input: pb::Empty) -> BackendResult { + fn deck_tree_legacy(&self, _input: pb::Empty) -> BackendResult { self.with_col(|col| { let tree = col.legacy_deck_tree()?; serde_json::to_vec(&tree) @@ -637,7 +633,7 @@ impl BackendService for Backend { }) } - fn get_deck_legacy(&mut self, input: pb::DeckId) -> Result { + fn get_deck_legacy(&self, input: pb::DeckId) -> Result { self.with_col(|col| { let deck: DeckSchema11 = col .storage @@ -650,7 +646,7 @@ impl BackendService for Backend { }) } - fn get_deck_id_by_name(&mut self, input: pb::String) -> Result { + fn get_deck_id_by_name(&self, input: pb::String) -> Result { self.with_col(|col| { col.get_deck_id(&input.val).and_then(|d| { d.ok_or(AnkiError::NotFound) @@ -659,7 +655,7 @@ impl BackendService for Backend { }) } - fn get_all_decks_legacy(&mut self, _input: Empty) -> BackendResult { + fn get_all_decks_legacy(&self, _input: Empty) -> BackendResult { self.with_col(|col| { let decks = col.storage.get_all_decks_as_schema11()?; serde_json::to_vec(&decks).map_err(Into::into) @@ -667,7 +663,7 @@ impl BackendService for Backend { .map(Into::into) } - fn get_deck_names(&mut self, input: pb::GetDeckNamesIn) -> Result { + fn get_deck_names(&self, input: pb::GetDeckNamesIn) -> Result { self.with_col(|col| { let names = if input.include_filtered { col.get_all_deck_names(input.skip_empty_default)? @@ -683,10 +679,7 @@ impl BackendService for Backend { }) } - fn add_or_update_deck_legacy( - &mut self, - input: pb::AddOrUpdateDeckLegacyIn, - ) -> Result { + fn add_or_update_deck_legacy(&self, input: pb::AddOrUpdateDeckLegacyIn) -> Result { self.with_col(|col| { let schema11: DeckSchema11 = serde_json::from_slice(&input.deck)?; let mut deck: Deck = schema11.into(); @@ -702,7 +695,7 @@ impl BackendService for Backend { }) } - fn new_deck_legacy(&mut self, input: pb::Bool) -> BackendResult { + fn new_deck_legacy(&self, input: pb::Bool) -> BackendResult { let deck = if input.val { Deck::new_filtered() } else { @@ -714,7 +707,7 @@ impl BackendService for Backend { .map(Into::into) } - fn remove_deck(&mut self, input: pb::DeckId) -> BackendResult { + fn remove_deck(&self, input: pb::DeckId) -> BackendResult { self.with_col(|col| col.remove_deck_and_child_decks(input.into())) .map(Into::into) } @@ -723,7 +716,7 @@ impl BackendService for Backend { //---------------------------------------------------- fn add_or_update_deck_config_legacy( - &mut self, + &self, input: AddOrUpdateDeckConfigLegacyIn, ) -> BackendResult { let conf: DeckConfSchema11 = serde_json::from_slice(&input.config)?; @@ -737,7 +730,7 @@ impl BackendService for Backend { .map(Into::into) } - fn all_deck_config_legacy(&mut self, _input: Empty) -> BackendResult { + fn all_deck_config_legacy(&self, _input: Empty) -> BackendResult { self.with_col(|col| { let conf: Vec = col .storage @@ -750,18 +743,18 @@ impl BackendService for Backend { .map(Into::into) } - fn new_deck_config_legacy(&mut self, _input: Empty) -> BackendResult { + fn new_deck_config_legacy(&self, _input: Empty) -> BackendResult { serde_json::to_vec(&DeckConfSchema11::default()) .map_err(Into::into) .map(Into::into) } - fn remove_deck_config(&mut self, input: pb::DeckConfigId) -> BackendResult { + fn remove_deck_config(&self, input: pb::DeckConfigId) -> BackendResult { self.with_col(|col| col.transact(None, |col| col.remove_deck_config(input.into()))) .map(Into::into) } - fn get_deck_config_legacy(&mut self, input: pb::DeckConfigId) -> BackendResult { + fn get_deck_config_legacy(&self, input: pb::DeckConfigId) -> BackendResult { self.with_col(|col| { let conf = col.get_deck_config(input.into(), true)?.unwrap(); let conf: DeckConfSchema11 = conf.into(); @@ -773,7 +766,7 @@ impl BackendService for Backend { // cards //------------------------------------------------------------------- - fn get_card(&mut self, input: pb::CardId) -> BackendResult { + fn get_card(&self, input: pb::CardId) -> BackendResult { self.with_col(|col| { col.storage .get_card(input.into()) @@ -782,7 +775,7 @@ impl BackendService for Backend { }) } - fn update_card(&mut self, input: pb::Card) -> BackendResult { + fn update_card(&self, input: pb::Card) -> BackendResult { let mut card = pbcard_to_native(input)?; self.with_col(|col| { col.transact(None, |ctx| { @@ -796,13 +789,13 @@ impl BackendService for Backend { .map(Into::into) } - fn add_card(&mut self, input: pb::Card) -> BackendResult { + fn add_card(&self, input: pb::Card) -> BackendResult { let mut card = pbcard_to_native(input)?; self.with_col(|col| col.transact(None, |ctx| ctx.add_card(&mut card)))?; Ok(pb::CardId { cid: card.id.0 }) } - fn remove_cards(&mut self, input: pb::RemoveCardsIn) -> BackendResult { + fn remove_cards(&self, input: pb::RemoveCardsIn) -> BackendResult { self.with_col(|col| { col.transact(None, |col| { col.remove_cards_and_orphaned_notes( @@ -817,7 +810,7 @@ impl BackendService for Backend { }) } - fn set_deck(&mut self, input: pb::SetDeckIn) -> BackendResult { + fn set_deck(&self, input: pb::SetDeckIn) -> BackendResult { let cids: Vec<_> = input.card_ids.into_iter().map(CardID).collect(); let deck_id = input.deck_id.into(); self.with_col(|col| col.set_deck(&cids, deck_id).map(Into::into)) @@ -826,14 +819,14 @@ impl BackendService for Backend { // notes //------------------------------------------------------------------- - fn new_note(&mut self, input: pb::NoteTypeId) -> BackendResult { + fn new_note(&self, input: pb::NoteTypeId) -> BackendResult { self.with_col(|col| { let nt = col.get_notetype(input.into())?.ok_or(AnkiError::NotFound)?; Ok(nt.new_note().into()) }) } - fn add_note(&mut self, input: pb::AddNoteIn) -> BackendResult { + fn add_note(&self, input: pb::AddNoteIn) -> BackendResult { self.with_col(|col| { let mut note: Note = input.note.ok_or(AnkiError::NotFound)?.into(); col.add_note(&mut note, DeckID(input.deck_id)) @@ -841,7 +834,7 @@ impl BackendService for Backend { }) } - fn update_note(&mut self, input: pb::Note) -> BackendResult { + fn update_note(&self, input: pb::Note) -> BackendResult { self.with_col(|col| { let mut note: Note = input.into(); col.update_note(&mut note) @@ -849,7 +842,7 @@ impl BackendService for Backend { .map(Into::into) } - fn get_note(&mut self, input: pb::NoteId) -> BackendResult { + fn get_note(&self, input: pb::NoteId) -> BackendResult { self.with_col(|col| { col.storage .get_note(input.into())? @@ -858,7 +851,7 @@ impl BackendService for Backend { }) } - fn remove_notes(&mut self, input: pb::RemoveNotesIn) -> BackendResult { + fn remove_notes(&self, input: pb::RemoveNotesIn) -> BackendResult { self.with_col(|col| { if !input.note_ids.is_empty() { col.remove_notes( @@ -883,7 +876,7 @@ impl BackendService for Backend { }) } - fn add_note_tags(&mut self, input: pb::AddNoteTagsIn) -> BackendResult { + fn add_note_tags(&self, input: pb::AddNoteTagsIn) -> BackendResult { self.with_col(|col| { col.add_tags_for_notes(&to_nids(input.nids), &input.tags) .map(|n| n as u32) @@ -891,7 +884,7 @@ impl BackendService for Backend { .map(Into::into) } - fn update_note_tags(&mut self, input: pb::UpdateNoteTagsIn) -> BackendResult { + fn update_note_tags(&self, input: pb::UpdateNoteTagsIn) -> BackendResult { self.with_col(|col| { col.replace_tags_for_notes( &to_nids(input.nids), @@ -903,10 +896,7 @@ impl BackendService for Backend { }) } - fn cloze_numbers_in_note( - &mut self, - note: pb::Note, - ) -> BackendResult { + fn cloze_numbers_in_note(&self, note: pb::Note) -> BackendResult { let mut set = HashSet::with_capacity(4); for field in ¬e.fields { add_cloze_numbers_in_string(field, &mut set); @@ -917,7 +907,7 @@ impl BackendService for Backend { } fn field_names_for_notes( - &mut self, + &self, input: pb::FieldNamesForNotesIn, ) -> BackendResult { self.with_col(|col| { @@ -928,7 +918,7 @@ impl BackendService for Backend { }) } - fn after_note_updates(&mut self, input: pb::AfterNoteUpdatesIn) -> BackendResult { + fn after_note_updates(&self, input: pb::AfterNoteUpdatesIn) -> BackendResult { self.with_col(|col| { col.transact(None, |col| { col.after_note_updates( @@ -942,7 +932,7 @@ impl BackendService for Backend { } fn note_is_duplicate_or_empty( - &mut self, + &self, input: pb::Note, ) -> BackendResult { let note: Note = input.into(); @@ -952,7 +942,7 @@ impl BackendService for Backend { }) } - fn cards_of_note(&mut self, input: pb::NoteId) -> BackendResult { + fn cards_of_note(&self, input: pb::NoteId) -> BackendResult { self.with_col(|col| { col.storage .all_card_ids_of_note(NoteID(input.nid)) @@ -965,10 +955,7 @@ impl BackendService for Backend { // notetypes //------------------------------------------------------------------- - fn get_stock_notetype_legacy( - &mut self, - input: pb::GetStockNotetypeIn, - ) -> BackendResult { + fn get_stock_notetype_legacy(&self, input: pb::GetStockNotetypeIn) -> BackendResult { // fixme: use individual functions instead of full vec let mut all = all_stock_notetypes(&self.i18n); let idx = (input.kind as usize).min(all.len() - 1); @@ -979,7 +966,7 @@ impl BackendService for Backend { .map(Into::into) } - fn get_notetype_names(&mut self, _input: Empty) -> BackendResult { + fn get_notetype_names(&self, _input: Empty) -> BackendResult { self.with_col(|col| { let entries: Vec<_> = col .storage @@ -991,10 +978,7 @@ impl BackendService for Backend { }) } - fn get_notetype_names_and_counts( - &mut self, - _input: Empty, - ) -> BackendResult { + fn get_notetype_names_and_counts(&self, _input: Empty) -> BackendResult { self.with_col(|col| { let entries: Vec<_> = col .storage @@ -1010,7 +994,7 @@ impl BackendService for Backend { }) } - fn get_notetype_legacy(&mut self, input: pb::NoteTypeId) -> BackendResult { + fn get_notetype_legacy(&self, input: pb::NoteTypeId) -> BackendResult { self.with_col(|col| { let schema11: NoteTypeSchema11 = col .storage @@ -1021,7 +1005,7 @@ impl BackendService for Backend { }) } - fn get_notetype_id_by_name(&mut self, input: pb::String) -> BackendResult { + fn get_notetype_id_by_name(&self, input: pb::String) -> BackendResult { self.with_col(|col| { col.storage .get_notetype_id(&input.val) @@ -1031,7 +1015,7 @@ impl BackendService for Backend { } fn add_or_update_notetype( - &mut self, + &self, input: pb::AddOrUpdateNotetypeIn, ) -> BackendResult { self.with_col(|col| { @@ -1046,7 +1030,7 @@ impl BackendService for Backend { }) } - fn remove_notetype(&mut self, input: pb::NoteTypeId) -> BackendResult { + fn remove_notetype(&self, input: pb::NoteTypeId) -> BackendResult { self.with_col(|col| col.remove_notetype(input.into())) .map(Into::into) } @@ -1054,7 +1038,7 @@ impl BackendService for Backend { // media //------------------------------------------------------------------- - fn add_media_file(&mut self, input: pb::AddMediaFileIn) -> BackendResult { + fn add_media_file(&self, input: pb::AddMediaFileIn) -> BackendResult { self.with_col(|col| { let mgr = MediaManager::new(&col.media_folder, &col.media_db)?; let mut ctx = mgr.dbctx(); @@ -1065,7 +1049,7 @@ impl BackendService for Backend { }) } - fn empty_trash(&mut self, _input: Empty) -> BackendResult { + fn empty_trash(&self, _input: Empty) -> BackendResult { let mut handler = self.new_progress_handler(); let progress_fn = move |progress| handler.update(Progress::MediaCheck(progress as u32), true); @@ -1081,7 +1065,7 @@ impl BackendService for Backend { .map(Into::into) } - fn restore_trash(&mut self, _input: Empty) -> BackendResult { + fn restore_trash(&self, _input: Empty) -> BackendResult { let mut handler = self.new_progress_handler(); let progress_fn = move |progress| handler.update(Progress::MediaCheck(progress as u32), true); @@ -1097,7 +1081,7 @@ impl BackendService for Backend { .map(Into::into) } - fn trash_media_files(&mut self, input: pb::TrashMediaFilesIn) -> BackendResult { + fn trash_media_files(&self, input: pb::TrashMediaFilesIn) -> BackendResult { self.with_col(|col| { let mgr = MediaManager::new(&col.media_folder, &col.media_db)?; let mut ctx = mgr.dbctx(); @@ -1106,7 +1090,7 @@ impl BackendService for Backend { .map(Into::into) } - fn check_media(&mut self, _input: pb::Empty) -> Result { + fn check_media(&self, _input: pb::Empty) -> Result { let mut handler = self.new_progress_handler(); let progress_fn = move |progress| handler.update(Progress::MediaCheck(progress as u32), true); @@ -1131,7 +1115,7 @@ impl BackendService for Backend { // collection //------------------------------------------------------------------- - fn check_database(&mut self, _input: pb::Empty) -> BackendResult { + fn check_database(&self, _input: pb::Empty) -> BackendResult { let mut handler = self.new_progress_handler(); let progress_fn = move |progress, throttle| { handler.update(Progress::DatabaseCheck(progress), throttle); @@ -1144,7 +1128,7 @@ impl BackendService for Backend { }) } - fn open_collection(&mut self, input: pb::OpenCollectionIn) -> BackendResult { + fn open_collection(&self, input: pb::OpenCollectionIn) -> BackendResult { let mut col = self.col.lock().unwrap(); if col.is_some() { return Err(AnkiError::CollectionAlreadyOpen); @@ -1173,7 +1157,7 @@ impl BackendService for Backend { Ok(().into()) } - fn close_collection(&mut self, input: pb::CloseCollectionIn) -> BackendResult { + fn close_collection(&self, input: pb::CloseCollectionIn) -> BackendResult { self.abort_media_sync_and_wait(); let mut col = self.col.lock().unwrap(); @@ -1195,41 +1179,41 @@ impl BackendService for Backend { // sync //------------------------------------------------------------------- - fn sync_login(&mut self, input: pb::SyncLoginIn) -> BackendResult { + fn sync_login(&self, input: pb::SyncLoginIn) -> BackendResult { self.sync_login_inner(input) } - fn sync_status(&mut self, input: pb::SyncAuth) -> BackendResult { + fn sync_status(&self, input: pb::SyncAuth) -> BackendResult { self.sync_status_inner(input) } - fn sync_collection(&mut self, input: pb::SyncAuth) -> BackendResult { + fn sync_collection(&self, input: pb::SyncAuth) -> BackendResult { self.sync_collection_inner(input) } - fn full_upload(&mut self, input: pb::SyncAuth) -> BackendResult { + fn full_upload(&self, input: pb::SyncAuth) -> BackendResult { self.full_sync_inner(input, true)?; Ok(().into()) } - fn full_download(&mut self, input: pb::SyncAuth) -> BackendResult { + fn full_download(&self, input: pb::SyncAuth) -> BackendResult { self.full_sync_inner(input, false)?; Ok(().into()) } - fn sync_media(&mut self, input: pb::SyncAuth) -> BackendResult { + fn sync_media(&self, input: pb::SyncAuth) -> BackendResult { self.sync_media_inner(input).map(Into::into) } - fn abort_sync(&mut self, _input: Empty) -> BackendResult { - if let Some(handle) = self.sync_abort.take() { + fn abort_sync(&self, _input: Empty) -> BackendResult { + if let Some(handle) = self.sync_abort.lock().unwrap().take() { handle.abort(); } Ok(().into()) } /// Abort the media sync. Does not wait for completion. - fn abort_media_sync(&mut self, _input: Empty) -> BackendResult { + fn abort_media_sync(&self, _input: Empty) -> BackendResult { let guard = self.state.lock().unwrap(); if let Some(handle) = &guard.media_sync_abort { handle.abort(); @@ -1237,14 +1221,14 @@ impl BackendService for Backend { Ok(().into()) } - fn before_upload(&mut self, _input: Empty) -> BackendResult { + fn before_upload(&self, _input: Empty) -> BackendResult { self.with_col(|col| col.before_upload().map(Into::into)) } // i18n/messages //------------------------------------------------------------------- - fn translate_string(&mut self, input: pb::TranslateStringIn) -> BackendResult { + fn translate_string(&self, input: pb::TranslateStringIn) -> BackendResult { let key = match pb::FluentString::from_i32(input.key) { Some(key) => key, None => return Ok("invalid key".to_string().into()), @@ -1259,7 +1243,7 @@ impl BackendService for Backend { Ok(self.i18n.trn(key, map).into()) } - fn format_timespan(&mut self, input: pb::FormatTimespanIn) -> BackendResult { + fn format_timespan(&self, input: pb::FormatTimespanIn) -> BackendResult { let context = match pb::format_timespan_in::Context::from_i32(input.context) { Some(context) => context, None => return Ok("".to_string().into()), @@ -1276,7 +1260,7 @@ impl BackendService for Backend { .into()) } - fn i18n_resources(&mut self, _input: Empty) -> BackendResult { + fn i18n_resources(&self, _input: Empty) -> BackendResult { serde_json::to_vec(&self.i18n.resources_for_js()) .map(Into::into) .map_err(Into::into) @@ -1285,7 +1269,7 @@ impl BackendService for Backend { // tags //------------------------------------------------------------------- - fn all_tags(&mut self, _input: Empty) -> BackendResult { + fn all_tags(&self, _input: Empty) -> BackendResult { let tags = self.with_col(|col| col.storage.all_tags())?; let tags: Vec<_> = tags .into_iter() @@ -1294,7 +1278,7 @@ impl BackendService for Backend { Ok(pb::AllTagsOut { tags }) } - fn register_tags(&mut self, input: pb::RegisterTagsIn) -> BackendResult { + fn register_tags(&self, input: pb::RegisterTagsIn) -> BackendResult { self.with_col(|col| { col.transact(None, |col| { let usn = if input.preserve_usn { @@ -1311,7 +1295,7 @@ impl BackendService for Backend { // config/preferences //------------------------------------------------------------------- - fn get_config_json(&mut self, input: pb::String) -> BackendResult { + fn get_config_json(&self, input: pb::String) -> BackendResult { self.with_col(|col| { let val: Option = col.get_config_optional(input.val.as_str()); val.ok_or(AnkiError::NotFound) @@ -1320,7 +1304,7 @@ impl BackendService for Backend { }) } - fn set_config_json(&mut self, input: pb::SetConfigJsonIn) -> BackendResult { + fn set_config_json(&self, input: pb::SetConfigJsonIn) -> BackendResult { self.with_col(|col| { col.transact(None, |col| { // ensure it's a well-formed object @@ -1331,12 +1315,12 @@ impl BackendService for Backend { .map(Into::into) } - fn remove_config(&mut self, input: pb::String) -> BackendResult { + fn remove_config(&self, input: pb::String) -> BackendResult { self.with_col(|col| col.transact(None, |col| col.remove_config(input.val.as_str()))) .map(Into::into) } - fn set_all_config(&mut self, input: pb::Json) -> BackendResult { + fn set_all_config(&self, input: pb::Json) -> BackendResult { let val: HashMap = serde_json::from_slice(&input.json)?; self.with_col(|col| { col.transact(None, |col| { @@ -1347,7 +1331,7 @@ impl BackendService for Backend { .map(Into::into) } - fn get_all_config(&mut self, _input: Empty) -> BackendResult { + fn get_all_config(&self, _input: Empty) -> BackendResult { self.with_col(|col| { let conf = col.storage.get_all_config()?; serde_json::to_vec(&conf).map_err(Into::into) @@ -1355,11 +1339,11 @@ impl BackendService for Backend { .map(Into::into) } - fn get_preferences(&mut self, _input: Empty) -> BackendResult { + fn get_preferences(&self, _input: Empty) -> BackendResult { self.with_col(|col| col.get_preferences()) } - fn set_preferences(&mut self, input: pb::Preferences) -> BackendResult { + fn set_preferences(&self, input: pb::Preferences) -> BackendResult { self.with_col(|col| col.transact(None, |col| col.set_preferences(input))) .map(Into::into) } @@ -1371,12 +1355,12 @@ impl Backend { col: Arc::new(Mutex::new(None)), i18n, server, - sync_abort: None, + sync_abort: Arc::new(Mutex::new(None)), progress_state: Arc::new(Mutex::new(ProgressState { want_abort: false, last_progress: None, })), - runtime: None, + runtime: OnceCell::new(), state: Arc::new(Mutex::new(BackendState::default())), } } @@ -1385,11 +1369,7 @@ impl Backend { &self.i18n } - pub fn run_command_bytes( - &mut self, - method: u32, - input: &[u8], - ) -> result::Result, Vec> { + pub fn run_command_bytes(&self, method: u32, input: &[u8]) -> result::Result, Vec> { self.run_command_bytes2_inner(method, input).map_err(|err| { let backend_err = anki_error_to_proto_error(err, &self.i18n); let mut bytes = Vec::new(); @@ -1421,26 +1401,54 @@ impl Backend { guard.last_progress = None; } ThrottlingProgressHandler { - state: self.progress_state.clone(), + state: Arc::clone(&self.progress_state), last_update: coarsetime::Instant::now(), } } - fn runtime_handle(&mut self) -> runtime::Handle { - if self.runtime.is_none() { - self.runtime = Some( + fn runtime_handle(&self) -> runtime::Handle { + self.runtime + .get_or_init(|| { runtime::Builder::new() .threaded_scheduler() .core_threads(1) .enable_all() .build() - .unwrap(), - ) - } - self.runtime.as_ref().unwrap().handle().clone() + .unwrap() + }) + .handle() + .clone() } - fn sync_media_inner(&mut self, input: pb::SyncAuth) -> Result<()> { + fn sync_abort_handle( + &self, + ) -> BackendResult<( + scopeguard::ScopeGuard, + AbortRegistration, + )> { + let (abort_handle, abort_reg) = AbortHandle::new_pair(); + + // Register the new abort_handle. + let old_handle = self.sync_abort.lock().unwrap().replace(abort_handle); + if old_handle.is_some() { + // NOTE: In the future we would ideally be able to handle multiple + // abort handles by just iterating over them all in + // abort_sync). But for now, just log a warning if there was + // already one present -- but don't abort it either. + let log = self.with_col(|col| Ok(col.log.clone()))?; + warn!( + log, + "new sync_abort handle registered, but old one was still present (old sync job might not be cancelled on abort)" + ); + } + // Clear the abort handle after the caller is done and drops the guard. + let guard = scopeguard::guard(Arc::clone(&self.sync_abort), |sync_abort| { + sync_abort.lock().unwrap().take(); + }); + Ok((guard, abort_reg)) + } + + fn sync_media_inner(&self, input: pb::SyncAuth) -> Result<()> { // mark media sync as active let (abort_handle, abort_reg) = AbortHandle::new_pair(); { @@ -1485,7 +1493,7 @@ impl Backend { } /// Abort the media sync. Won't return until aborted. - fn abort_media_sync_and_wait(&mut self) { + fn abort_media_sync_and_wait(&self) { let guard = self.state.lock().unwrap(); if let Some(handle) = &guard.media_sync_abort { handle.abort(); @@ -1500,9 +1508,8 @@ impl Backend { } } - fn sync_login_inner(&mut self, input: pb::SyncLoginIn) -> BackendResult { - let (abort_handle, abort_reg) = AbortHandle::new_pair(); - self.sync_abort = Some(abort_handle); + fn sync_login_inner(&self, input: pb::SyncLoginIn) -> BackendResult { + let (_guard, abort_reg) = self.sync_abort_handle()?; let rt = self.runtime_handle(); let sync_fut = sync_login(&input.username, &input.password); @@ -1511,14 +1518,13 @@ impl Backend { Ok(sync_result) => sync_result, Err(_) => Err(AnkiError::Interrupted), }; - self.sync_abort = None; ret.map(|a| pb::SyncAuth { hkey: a.hkey, host_number: a.host_number, }) } - fn sync_status_inner(&mut self, input: pb::SyncAuth) -> BackendResult { + fn sync_status_inner(&self, input: pb::SyncAuth) -> BackendResult { // any local changes mean we can skip the network round-trip let req = self.with_col(|col| col.get_local_sync_status())?; if req != pb::sync_status_out::Required::NoChanges { @@ -1547,12 +1553,8 @@ impl Backend { Ok(response.into()) } - fn sync_collection_inner( - &mut self, - input: pb::SyncAuth, - ) -> BackendResult { - let (abort_handle, abort_reg) = AbortHandle::new_pair(); - self.sync_abort = Some(abort_handle); + fn sync_collection_inner(&self, input: pb::SyncAuth) -> BackendResult { + let (_guard, abort_reg) = self.sync_abort_handle()?; let rt = self.runtime_handle(); let input_copy = input.clone(); @@ -1580,7 +1582,6 @@ impl Backend { } } }); - self.sync_abort = None; let output: SyncOutput = ret?; self.state @@ -1591,7 +1592,7 @@ impl Backend { Ok(output.into()) } - fn full_sync_inner(&mut self, input: pb::SyncAuth, upload: bool) -> Result<()> { + fn full_sync_inner(&self, input: pb::SyncAuth, upload: bool) -> Result<()> { self.abort_media_sync_and_wait(); let rt = self.runtime_handle(); @@ -1603,8 +1604,7 @@ impl Backend { let col_inner = col.take().unwrap(); - let (abort_handle, abort_reg) = AbortHandle::new_pair(); - self.sync_abort = Some(abort_handle); + let (_guard, abort_reg) = self.sync_abort_handle()?; let col_path = col_inner.col_path.clone(); let media_folder_path = col_inner.media_folder.clone(); @@ -1625,7 +1625,6 @@ impl Backend { let abortable_sync = Abortable::new(sync_fut, abort_reg); rt.block_on(abortable_sync) }; - self.sync_abort = None; // ensure re-opened regardless of outcome col.replace(open_collection( diff --git a/rspy/src/lib.rs b/rspy/src/lib.rs index ca501182d..0986ead04 100644 --- a/rspy/src/lib.rs +++ b/rspy/src/lib.rs @@ -61,10 +61,10 @@ fn want_release_gil(method: u32) -> bool { #[pymethods] impl Backend { - fn command(&mut self, py: Python, method: u32, input: &PyBytes) -> PyResult { + fn command(&self, py: Python, method: u32, input: &PyBytes) -> PyResult { let in_bytes = input.as_bytes(); if want_release_gil(method) { - py.allow_threads(move || self.backend.run_command_bytes(method, in_bytes)) + py.allow_threads(|| self.backend.run_command_bytes(method, in_bytes)) } else { self.backend.run_command_bytes(method, in_bytes) } @@ -77,9 +77,9 @@ impl Backend { /// This takes and returns JSON, due to Python's slow protobuf /// encoding/decoding. - fn db_command(&mut self, py: Python, input: &PyBytes) -> PyResult { + fn db_command(&self, py: Python, input: &PyBytes) -> PyResult { let in_bytes = input.as_bytes(); - let out_res = py.allow_threads(move || { + let out_res = py.allow_threads(|| { self.backend .run_db_command_bytes(in_bytes) .map_err(BackendError::py_err)