From 9067bf98bdbbdc0474dda5845ebecac3e3fc9f4c Mon Sep 17 00:00:00 2001 From: Damien Elmes Date: Wed, 5 Feb 2020 20:23:29 +1000 Subject: [PATCH] handle concurrent modifications and ankiweb terminating early --- rslib/src/media/sync.rs | 62 ++++++++++++++++++++++++++++++++--------- 1 file changed, 49 insertions(+), 13 deletions(-) diff --git a/rslib/src/media/sync.rs b/rslib/src/media/sync.rs index 2a7017a92..911a23ab5 100644 --- a/rslib/src/media/sync.rs +++ b/rslib/src/media/sync.rs @@ -24,8 +24,6 @@ static SYNC_MAX_FILES: usize = 25; static SYNC_MAX_BYTES: usize = (2.5 * 1024.0 * 1024.0) as usize; static SYNC_SINGLE_FILE_MAX_BYTES: usize = 100 * 1024 * 1024; -// fixme: concurrent modifications during upload step - /// The counts are not cumulative - the progress hook should accumulate them. #[derive(Debug)] pub enum Progress { @@ -157,19 +155,40 @@ where break; } - let file_count = pending.iter().filter(|e| e.sha1.is_some()).count(); - let zip_data = zip_files(&self.mgr.media_folder, &pending)?; - self.send_zip_data(zip_data).await?; + let reply = self.send_zip_data(zip_data).await?; + + let (processed_files, processed_deletions): (Vec<_>, Vec<_>) = pending + .iter() + .take(reply.processed) + .partition(|e| e.sha1.is_some()); self.progress(Progress::Uploaded { - files: file_count, - deletions: pending.len() - file_count, + files: processed_files.len(), + deletions: processed_deletions.len(), })?; - let fnames: Vec<_> = pending.iter().map(|e| &e.fname).collect(); - self.ctx - .transact(|ctx| record_clean(ctx, fnames.as_slice()))?; + let fnames: Vec<_> = processed_files + .iter() + .chain(processed_deletions.iter()) + .map(|e| &e.fname) + .collect(); + let fname_cnt = fnames.len() as i32; + self.ctx.transact(|ctx| { + record_clean(ctx, fnames.as_slice())?; + let mut meta = ctx.get_meta()?; + if meta.last_sync_usn + fname_cnt == reply.current_usn { + meta.last_sync_usn = reply.current_usn; + ctx.set_meta(&meta)?; + } else { + debug!( + "server usn {} is not {}, skipping usn update", + reply.current_usn, + meta.last_sync_usn + fname_cnt + ); + } + Ok(()) + })?; } Ok(()) @@ -230,12 +249,17 @@ where resp.bytes().await.map_err(Into::into) } - async fn send_zip_data(&self, data: Vec) -> Result<()> { + async fn send_zip_data(&self, data: Vec) -> Result { let url = format!("{}uploadChanges", self.endpoint); - ankiweb_bytes_request(&self.client, &url, data, self.skey()).await?; + let resp = ankiweb_bytes_request(&self.client, &url, data, self.skey()).await?; + let res: UploadResult = resp.json().await?; - Ok(()) + if let Some(reply) = res.data { + Ok(reply) + } else { + Err(AnkiError::server_message(res.err)) + } } } @@ -568,6 +592,18 @@ struct UploadEntry<'a> { in_zip_name: Option, } +#[derive(Deserialize, Debug)] +struct UploadResult { + data: Option, + err: String, +} + +#[derive(Deserialize, Debug)] +struct UploadReply { + processed: usize, + current_usn: i32, +} + fn zip_files(media_folder: &Path, files: &[MediaEntry]) -> Result> { let buf = vec![];