Populate media DB on import (#1977)

* Refactor MediaManager transactions

* Add media entries when importing

* Fix legacy apkg import not setting csum

Also test for this in the roundtrip test.

* Avoid reallocating MediaCopier's buffer

* Make sha1 optional (dae)
RumovZ 2022-07-22 09:50:15 +02:00 committed by GitHub
parent 070c8ac735
commit 09841c7c0f
6 changed files with 131 additions and 90 deletions

[File 1 of 6 — apkg import: media preparation and copying]

@@ -8,13 +8,13 @@ use zip::ZipArchive;
 use super::Context;

 use crate::{
     import_export::{
-        package::media::{extract_media_entries, SafeMediaEntry},
+        package::{
+            colpkg::export::MediaCopier,
+            media::{extract_media_entries, SafeMediaEntry},
+        },
         ImportProgress, IncrementableProgress,
     },
-    media::{
-        files::{add_hash_suffix_to_file_stem, sha1_of_reader},
-        MediaManager,
-    },
+    media::files::{add_hash_suffix_to_file_stem, sha1_of_reader},
     prelude::*,
 };
@@ -37,7 +37,9 @@ impl Context<'_> {
         }
         let db_progress_fn = self.progress.media_db_fn(ImportProgress::MediaCheck)?;
-        let existing_sha1s = self.target_col.all_existing_sha1s(db_progress_fn)?;
+        let existing_sha1s = self
+            .media_manager
+            .all_checksums(db_progress_fn, &self.target_col.log)?;
         prepare_media(
             media_entries,
@@ -49,21 +51,21 @@ impl Context<'_> {
     pub(super) fn copy_media(&mut self, media_map: &mut MediaUseMap) -> Result<()> {
         let mut incrementor = self.progress.incrementor(ImportProgress::Media);
-        for entry in media_map.used_entries() {
-            incrementor.increment()?;
-            entry.copy_from_archive(&mut self.archive, &self.target_col.media_folder)?;
-        }
-        Ok(())
-    }
-}
-
-impl Collection {
-    fn all_existing_sha1s(
-        &mut self,
-        progress_fn: impl FnMut(usize) -> bool,
-    ) -> Result<HashMap<String, Sha1Hash>> {
-        let mgr = MediaManager::new(&self.media_folder, &self.media_db)?;
-        mgr.all_checksums(progress_fn, &self.log)
+        let mut dbctx = self.media_manager.dbctx();
+        let mut copier = MediaCopier::new(false);
+        self.media_manager.transact(&mut dbctx, |dbctx| {
+            for entry in media_map.used_entries() {
+                incrementor.increment()?;
+                entry.copy_and_ensure_sha1_set(
+                    &mut self.archive,
+                    &self.target_col.media_folder,
+                    &mut copier,
+                )?;
+                self.media_manager
+                    .add_entry(dbctx, &entry.name, entry.sha1.unwrap())?;
+            }
+            Ok(())
+        })
     }
 }
@@ -84,8 +86,8 @@ fn prepare_media(
                 media_map.unchecked.push(entry);
             }
         } else if let Some(other_sha1) = existing_sha1s.get(&entry.name) {
-            entry.with_hash_from_archive(archive)?;
-            if entry.sha1 != *other_sha1 {
+            entry.ensure_sha1_set(archive)?;
+            if entry.sha1.unwrap() != *other_sha1 {
                 let original_name = entry.uniquify_name();
                 media_map.add_checked(original_name, entry);
             }
@@ -108,26 +110,26 @@ impl MediaUseMap {
         })
     }

-    pub(super) fn used_entries(&self) -> impl Iterator<Item = &SafeMediaEntry> {
+    pub(super) fn used_entries(&mut self) -> impl Iterator<Item = &mut SafeMediaEntry> {
         self.checked
-            .values()
+            .values_mut()
             .filter_map(|(used, entry)| used.then(|| entry))
-            .chain(self.unchecked.iter())
+            .chain(self.unchecked.iter_mut())
     }
 }

 impl SafeMediaEntry {
-    fn with_hash_from_archive(&mut self, archive: &mut ZipArchive<File>) -> Result<()> {
-        if self.sha1 == [0; 20] {
+    fn ensure_sha1_set(&mut self, archive: &mut ZipArchive<File>) -> Result<()> {
+        if self.sha1.is_none() {
             let mut reader = self.fetch_file(archive)?;
-            self.sha1 = sha1_of_reader(&mut reader)?;
+            self.sha1 = Some(sha1_of_reader(&mut reader)?);
         }
         Ok(())
     }

     /// Requires sha1 to be set. Returns old file name.
     fn uniquify_name(&mut self) -> String {
-        let new_name = add_hash_suffix_to_file_stem(&self.name, &self.sha1);
+        let new_name =
+            add_hash_suffix_to_file_stem(&self.name, &self.sha1.expect("sha1 not set"));
         mem::replace(&mut self.name, new_name)
     }
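A note on the Option change above: legacy .apkg packages list media files by name only, without checksums (see the `sha1: None` constructor later in this diff), so entries now start out unset and are hashed on first use, then cached. A minimal self-contained sketch of that pattern, using the sha1 crate directly rather than the crate's own sha1_of_reader:

    use sha1::{Digest, Sha1};

    // Lazy, cached hashing: compute once, on demand; later callers may
    // then rely on the value being present via unwrap()/expect().
    struct Entry {
        data: Vec<u8>,
        sha1: Option<[u8; 20]>,
    }

    impl Entry {
        fn ensure_sha1_set(&mut self) {
            if self.sha1.is_none() {
                self.sha1 = Some(Sha1::digest(&self.data).into());
            }
        }
    }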

[File 2 of 6 — apkg import: Context setup]

@@ -19,12 +19,14 @@ use crate::{
     import_export::{
         gather::ExchangeData, package::Meta, ImportProgress, IncrementableProgress, NoteLog,
     },
+    media::MediaManager,
     prelude::*,
     search::SearchNode,
 };

 struct Context<'a> {
     target_col: &'a mut Collection,
+    media_manager: MediaManager,
     archive: ZipArchive<File>,
     meta: Meta,
     data: ExchangeData,
@@ -56,6 +58,7 @@ impl<'a> Context<'a> {
     ) -> Result<Self> {
         let mut progress = IncrementableProgress::new(progress_fn);
         progress.call(ImportProgress::Extracting)?;
+        let media_manager = MediaManager::new(&target_col.media_folder, &target_col.media_db)?;
         let meta = Meta::from_archive(&mut archive)?;
         let data = ExchangeData::gather_from_archive(
             &mut archive,
@@ -67,6 +70,7 @@ impl<'a> Context<'a> {
         let usn = target_col.usn()?;
         Ok(Self {
             target_col,
+            media_manager,
             archive,
             meta,
             data,

[File 3 of 6 — import/export roundtrip tests]

@@ -6,7 +6,10 @@
 use std::{collections::HashSet, fs::File, io::Write};

 use crate::{
-    media::files::sha1_of_data, prelude::*, search::SearchNode, tests::open_fs_test_collection,
+    media::{files::sha1_of_data, MediaManager},
+    prelude::*,
+    search::SearchNode,
+    tests::open_fs_test_collection,
 };

 const SAMPLE_JPG: &str = "sample.jpg";
@@ -132,9 +135,13 @@ impl Collection {
     fn assert_note_and_media(&mut self, note: &Note) {
         let sha1 = sha1_of_data(MP3_DATA);
         let new_mp3_name = format!("sample-{}.mp3", hex::encode(&sha1));
+        let csums = MediaManager::new(&self.media_folder, &self.media_db)
+            .unwrap()
+            .all_checksums_as_is();
         for file in [SAMPLE_JPG, SAMPLE_JS, &new_mp3_name] {
-            assert!(self.media_folder.join(file).exists())
+            assert!(self.media_folder.join(file).exists());
+            assert!(*csums.get(file).unwrap() != [0; 20]);
         }
         let imported_note = self.storage.get_note(note.id).unwrap().unwrap();

[File 4 of 6 — colpkg export: MediaCopier]

@@ -282,7 +282,7 @@ fn write_media_files(
     media_entries: &mut Vec<MediaEntry>,
     progress: &mut IncrementableProgress<ExportProgress>,
 ) -> Result<()> {
-    let mut copier = MediaCopier::new(meta);
+    let mut copier = MediaCopier::new(meta.zstd_compressed());
     let mut incrementor = progress.incrementor(ExportProgress::Media);
     for (index, res) in media.0.enumerate() {
         incrementor.increment()?;
@@ -315,18 +315,20 @@ fn normalized_unicode_file_name(filename: &OsStr) -> Result<String> {
         .ok_or(AnkiError::MediaCheckRequired)
 }

-/// Copies and hashes while encoding according to the targeted version.
+/// Copies and hashes while optionally encoding.
 /// If compressing, the encoder is reused to optimize for repeated calls.
-struct MediaCopier {
+pub(crate) struct MediaCopier {
     encoding: bool,
     encoder: Option<RawEncoder<'static>>,
+    buf: [u8; 64 * 1024],
 }

 impl MediaCopier {
-    fn new(meta: &Meta) -> Self {
+    pub(crate) fn new(encoding: bool) -> Self {
         Self {
-            encoding: meta.zstd_compressed(),
+            encoding,
             encoder: None,
+            buf: [0; 64 * 1024],
         }
     }
@@ -339,25 +341,25 @@
     }

     /// Returns size and sha1 hash of the copied data.
-    fn copy(
+    pub(crate) fn copy(
         &mut self,
         reader: &mut impl Read,
         writer: &mut impl Write,
     ) -> Result<(usize, Sha1Hash)> {
         let mut size = 0;
         let mut hasher = Sha1::new();
-        let mut buf = [0; 64 * 1024];
+        self.buf = [0; 64 * 1024];
         let mut wrapped_writer = MaybeEncodedWriter::new(writer, self.encoder());

         loop {
-            let count = match reader.read(&mut buf) {
+            let count = match reader.read(&mut self.buf) {
                 Ok(0) => break,
                 Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                 result => result?,
             };
             size += count;
-            hasher.update(&buf[..count]);
-            wrapped_writer.write(&buf[..count])?;
+            hasher.update(&self.buf[..count]);
+            wrapped_writer.write(&self.buf[..count])?;
         }

         self.encoder = wrapped_writer.finish()?;
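The new buf field is the "avoid reallocating" bullet from the commit message: instead of materializing a fresh 64 KiB array on the stack for every copy() call, the buffer lives in the struct, and a single MediaCopier is now threaded through the whole loop over media files (see copy_media and write_media_files above). The usage shape, schematically — `jobs` and its fields are invented for illustration:

    // One copier — hence one buffer and one reusable zstd encoder — for the
    // entire batch of files, rather than per-file temporaries.
    let mut copier = MediaCopier::new(false);
    for job in &mut jobs {
        let (size, sha1) = copier.copy(&mut job.reader, &mut job.writer)?;
        // size/sha1 feed the corresponding media entry
    }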
@@ -410,7 +412,7 @@ mod test {
         let bytes_hash = sha1_of_data(b"foo");

         for meta in [Meta::new_legacy(), Meta::new()] {
-            let mut writer = MediaCopier::new(&meta);
+            let mut writer = MediaCopier::new(meta.zstd_compressed());
             let mut buf = Vec::new();
             let (size, hash) = writer.copy(&mut bytes.as_slice(), &mut buf).unwrap();

[File 5 of 6 — package media: SafeMediaEntry]

@@ -14,7 +14,7 @@ use tempfile::NamedTempFile;
 use zip::{read::ZipFile, ZipArchive};
 use zstd::stream::copy_decode;

-use super::{MediaEntries, MediaEntry, Meta};
+use super::{colpkg::export::MediaCopier, MediaEntries, MediaEntry, Meta};
 use crate::{
     error::ImportError,
     io::{atomic_rename, filename_is_safe},
@@ -26,7 +26,7 @@ use crate::{
 pub(super) struct SafeMediaEntry {
     pub(super) name: String,
     pub(super) size: u32,
-    pub(super) sha1: Sha1Hash,
+    pub(super) sha1: Option<Sha1Hash>,
     pub(super) index: usize,
 }
@@ -53,7 +53,7 @@ impl SafeMediaEntry {
             return Ok(Self {
                 name: entry.name,
                 size: entry.size,
-                sha1,
+                sha1: Some(sha1),
                 index,
             });
         }
@@ -70,7 +70,7 @@
         Ok(Self {
             name,
             size: 0,
-            sha1: [0; 20],
+            sha1: None,
             index: zip_filename,
         })
     }
@@ -89,21 +89,29 @@
         &self,
         get_checksum: &mut impl FnMut(&str) -> Result<Option<Sha1Hash>>,
     ) -> Result<bool> {
-        get_checksum(&self.name).map(|opt| opt.map_or(false, |sha1| sha1 == self.sha1))
+        get_checksum(&self.name)
+            .map(|opt| opt.map_or(false, |sha1| sha1 == self.sha1.expect("sha1 not set")))
     }

     pub(super) fn has_size_equal_to(&self, other_path: &Path) -> bool {
         fs::metadata(other_path).map_or(false, |metadata| metadata.len() == self.size as u64)
     }

-    pub(super) fn copy_from_archive(
-        &self,
+    /// Copy the archived file to the target folder, setting its hash if necessary.
+    pub(super) fn copy_and_ensure_sha1_set(
+        &mut self,
         archive: &mut ZipArchive<File>,
         target_folder: &Path,
+        copier: &mut MediaCopier,
     ) -> Result<()> {
         let mut file = self.fetch_file(archive)?;
         let mut tempfile = NamedTempFile::new_in(target_folder)?;
-        io::copy(&mut file, &mut tempfile)?;
+        if self.sha1.is_none() {
+            let (_, sha1) = copier.copy(&mut file, &mut tempfile)?;
+            self.sha1 = Some(sha1);
+        } else {
+            io::copy(&mut file, &mut tempfile)?;
+        }
         atomic_rename(tempfile, &self.file_path(target_folder), false)
     }
 }
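copy_and_ensure_sha1_set fuses the hash into the copy, so a legacy entry's file is read only once; entries that already carry a checksum keep the plain io::copy path. For the uncompressed case, the idea boils down to this single-pass loop — a generic sketch, not the crate's code:

    use sha1::{Digest, Sha1};
    use std::io::{self, Read, Write};

    // Copy reader -> writer while hashing, in one pass over the data.
    fn copy_and_hash(
        reader: &mut impl Read,
        writer: &mut impl Write,
    ) -> io::Result<(usize, [u8; 20])> {
        let mut hasher = Sha1::new();
        let mut buf = [0u8; 64 * 1024];
        let mut size = 0;
        loop {
            let count = match reader.read(&mut buf) {
                Ok(0) => break,
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                result => result?,
            };
            size += count;
            hasher.update(&buf[..count]);
            writer.write_all(&buf[..count])?;
        }
        Ok((size, hasher.finalize().into()))
    }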

[File 6 of 6 — MediaManager]

@@ -52,31 +52,23 @@ impl MediaManager {
     /// appended to the name.
     ///
     /// Also notes the file in the media database.
-    #[allow(clippy::match_like_matches_macro)]
     pub fn add_file<'a>(
         &self,
         ctx: &mut MediaDatabaseContext,
         desired_name: &'a str,
         data: &[u8],
     ) -> Result<Cow<'a, str>> {
-        let pre_add_folder_mtime = mtime_as_i64(&self.media_folder)?;
-
-        // add file to folder
         let data_hash = sha1_of_data(data);
-        let chosen_fname =
-            add_data_to_folder_uniquely(&self.media_folder, desired_name, data, data_hash)?;
-        let file_mtime = mtime_as_i64(self.media_folder.join(chosen_fname.as_ref()))?;
-        let post_add_folder_mtime = mtime_as_i64(&self.media_folder)?;

-        // add to the media DB
-        ctx.transact(|ctx| {
+        self.transact(ctx, |ctx| {
+            let chosen_fname =
+                add_data_to_folder_uniquely(&self.media_folder, desired_name, data, data_hash)?;
+            let file_mtime = mtime_as_i64(self.media_folder.join(chosen_fname.as_ref()))?;
+
             let existing_entry = ctx.get_entry(&chosen_fname)?;
             let new_sha1 = Some(data_hash);

-            let entry_update_required = match existing_entry {
-                Some(existing) if existing.sha1 == new_sha1 => false,
-                _ => true,
-            };
+            let entry_update_required = existing_entry.map(|e| e.sha1 != new_sha1).unwrap_or(true);

             if entry_update_required {
                 ctx.set_entry(&MediaEntry {
@@ -87,34 +79,16 @@ impl MediaManager {
                 })?;
             }

-            let mut meta = ctx.get_meta()?;
-            if meta.folder_mtime == pre_add_folder_mtime {
-                // if media db was in sync with folder prior to this add,
-                // we can keep it in sync
-                meta.folder_mtime = post_add_folder_mtime;
-                ctx.set_meta(&meta)?;
-            } else {
-                // otherwise, leave it alone so that other pending changes
-                // get picked up later
-            }
-
-            Ok(())
-        })?;
-
-        Ok(chosen_fname)
+            Ok(chosen_fname)
+        })
     }

     pub fn remove_files<S>(&self, ctx: &mut MediaDatabaseContext, filenames: &[S]) -> Result<()>
     where
         S: AsRef<str> + std::fmt::Debug,
     {
-        let pre_remove_folder_mtime = mtime_as_i64(&self.media_folder)?;
-
-        remove_files(&self.media_folder, filenames)?;
-        let post_remove_folder_mtime = mtime_as_i64(&self.media_folder)?;
-
-        ctx.transact(|ctx| {
+        self.transact(ctx, |ctx| {
+            remove_files(&self.media_folder, filenames)?;
             for fname in filenames {
                 if let Some(mut entry) = ctx.get_entry(fname.as_ref())? {
                     entry.sha1 = None;
@@ -123,19 +97,50 @@
                     ctx.set_entry(&entry)?;
                 }
             }
+            Ok(())
+        })
+    }
+
+    /// Opens a transaction and manages folder mtime, so user should perform not
+    /// only db ops, but also all file ops inside the closure.
+    pub(crate) fn transact<T>(
+        &self,
+        ctx: &mut MediaDatabaseContext,
+        func: impl FnOnce(&mut MediaDatabaseContext) -> Result<T>,
+    ) -> Result<T> {
+        let start_folder_mtime = mtime_as_i64(&self.media_folder)?;
+
+        ctx.transact(|ctx| {
+            let out = func(ctx)?;

             let mut meta = ctx.get_meta()?;
-            if meta.folder_mtime == pre_remove_folder_mtime {
+            if meta.folder_mtime == start_folder_mtime {
                 // if media db was in sync with folder prior to this add,
                 // we can keep it in sync
-                meta.folder_mtime = post_remove_folder_mtime;
+                meta.folder_mtime = mtime_as_i64(&self.media_folder)?;
                 ctx.set_meta(&meta)?;
             } else {
                 // otherwise, leave it alone so that other pending changes
                 // get picked up later
             }

-            Ok(())
+            Ok(out)
+        })
+    }
+
+    /// Set entry for a newly added file. Caller must ensure transaction.
+    pub(crate) fn add_entry(
+        &self,
+        ctx: &mut MediaDatabaseContext,
+        fname: impl Into<String>,
+        sha1: [u8; 20],
+    ) -> Result<()> {
+        let fname = fname.into();
+        let mtime = mtime_as_i64(self.media_folder.join(&fname))?;
+        ctx.set_entry(&MediaEntry {
+            fname,
+            mtime,
+            sha1: Some(sha1),
+            sync_required: true,
         })
     }
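transact centralizes the folder-mtime bookkeeping that add_file and remove_files previously duplicated: the folder mtime is sampled before the closure runs, and if the DB's recorded mtime still matches afterwards, it is bumped to the current value so DB and folder stay in sync. Both file ops and DB ops therefore belong inside the closure. A hypothetical caller, assuming the media folder path is at hand and the crate's usual io-error conversion applies:

    let mut dbctx = media_manager.dbctx();
    media_manager.transact(&mut dbctx, |ctx| {
        // file op and matching db op inside the same closure, so the
        // before/after mtime comparison brackets both
        std::fs::write(media_folder.join("note.png"), &data)?;
        media_manager.add_entry(ctx, "note.png", sha1_of_data(&data))?;
        Ok(())
    })?;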
@@ -185,3 +190,16 @@ impl MediaManager {
         ChangeTracker::new(&self.media_folder, progress, log).register_changes(&mut self.dbctx())
     }
 }
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    impl MediaManager {
+        /// All checksums without registering changes first.
+        pub(crate) fn all_checksums_as_is(&self) -> HashMap<String, [u8; 20]> {
+            let mut dbctx = self.dbctx();
+            dbctx.all_checksums().unwrap()
+        }
+    }
+}