Anki/qt/aqt/import_export/importing.py
2022-04-25 08:32:38 +02:00

124 lines
3.8 KiB
Python

# Copyright: Ankitects Pty Ltd and contributors
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
from __future__ import annotations
from concurrent.futures import Future
import aqt.main
from anki.errors import Interrupted
from aqt.operations import (
ClosedCollectionOpWithBackendProgress,
CollectionOpWithBackendProgress,
)
from aqt.qt import *
from aqt.utils import askUser, getFile, showInfo, showText, showWarning, tooltip, tr
def import_file(mw: aqt.main.AnkiQt) -> None:
if not (path := get_file_path(mw)):
return
filename = os.path.basename(path).lower()
if filename.endswith(".anki"):
showInfo(tr.importing_anki_files_are_from_a_very())
elif filename.endswith(".anki2"):
showInfo(tr.importing_anki2_files_are_not_directly_importable())
elif is_collection_package(filename):
maybe_import_collection_package(mw, path)
elif filename.endswith(".apkg"):
import_anki_package(mw, path)
else:
raise NotImplementedError
def get_file_path(mw: aqt.main.AnkiQt) -> str | None:
if file := getFile(
mw,
tr.actions_import(),
None,
key="import",
filter=tr.importing_packaged_anki_deckcollection_apkg_colpkg_zip(),
):
return str(file)
return None
def is_collection_package(filename: str) -> bool:
return (
filename == "collection.apkg"
or (filename.startswith("backup-") and filename.endswith(".apkg"))
or filename.endswith(".colpkg")
)
def maybe_import_collection_package(mw: aqt.main.AnkiQt, path: str) -> None:
if askUser(
tr.importing_this_will_delete_your_existing_collection(),
msgfunc=QMessageBox.warning,
defaultno=True,
):
import_collection_package(mw, path)
def import_collection_package(mw: aqt.main.AnkiQt, file: str) -> None:
def on_success(_future: Future) -> None:
mw.loadCollection()
tooltip(tr.importing_importing_complete())
def on_failure(err: Exception) -> None:
mw.loadCollection()
if not isinstance(err, Interrupted):
showWarning(str(err))
import_collection_package_op(mw, file).success(on_success).failure(
on_failure
).run_in_background()
def import_collection_package_op(
mw: aqt.main.AnkiQt, path: str
) -> ClosedCollectionOpWithBackendProgress:
def op() -> None:
col_path = mw.pm.collectionPath()
media_folder = os.path.join(mw.pm.profileFolder(), "collection.media")
mw.backend.import_collection_package(
col_path=col_path, backup_path=path, media_folder=media_folder
)
return ClosedCollectionOpWithBackendProgress(
parent=mw,
op=op,
key="importing",
)
def import_anki_package(mw: aqt.main.AnkiQt, path: str) -> None:
CollectionOpWithBackendProgress(
parent=mw,
op=lambda col: col.import_anki_package(path),
key="importing",
).success(show_import_log).run_in_background()
def show_import_log(future: Future) -> None:
log = future.log # type: ignore
total = len(log.conflicting) + len(log.updated) + len(log.new) + len(log.duplicate)
text = f"""{tr.importing_notes_found_in_file(val=total)}
{tr.importing_notes_that_could_not_be_imported(val=len(log.conflicting))}
{tr.importing_notes_updated_as_file_had_newer(val=len(log.updated))}
{tr.importing_notes_added_from_file(val=len(log.new))}
{tr.importing_notes_skipped_as_theyre_already_in(val=len(log.duplicate))}
{log_rows(log.conflicting, tr.importing_skipped())}
{log_rows(log.updated, tr.importing_updated())}
{log_rows(log.new, tr.adding_added())}
{log_rows(log.duplicate, tr.importing_identical())}
"""
showText(text, plain_text_edit=True)
def log_rows(rows: list, action: str) -> str:
return "\n".join(f"[{action}] {', '.join(note.fields)}" for note in rows)