mirror of
https://github.com/ankitects/anki.git
synced 2025-09-18 22:12:21 -04:00

The progress messages are only really intended to be consumed by Anki. If consumption by add-ons was expected, we'd be better off keeping the wrapper, as the API for oneofs in Python is quite awkward to use.
217 lines
6.8 KiB
Python
217 lines
6.8 KiB
Python
# Copyright: Ankitects Pty Ltd and contributors
|
|
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
|
|
|
|
from __future__ import annotations
|
|
|
|
import itertools
|
|
import time
|
|
from concurrent.futures import Future
|
|
from typing import Iterable, List, Optional, Sequence, TypeVar
|
|
|
|
import aqt
|
|
from anki.collection import SearchTerm
|
|
from anki.errors import Interrupted
|
|
from anki.lang import TR
|
|
from anki.media import CheckMediaOut
|
|
from aqt.qt import *
|
|
from aqt.utils import (
|
|
askUser,
|
|
disable_help_button,
|
|
restoreGeom,
|
|
saveGeom,
|
|
showText,
|
|
tooltip,
|
|
tr,
|
|
)
|
|
|
|
T = TypeVar("T")
|
|
|
|
|
|
def chunked_list(l: Iterable[T], n: int) -> Iterable[List[T]]:
|
|
l = iter(l)
|
|
while True:
|
|
res = list(itertools.islice(l, n))
|
|
if not res:
|
|
return
|
|
yield res
|
|
|
|
|
|
def check_media_db(mw: aqt.AnkiQt) -> None:
|
|
c = MediaChecker(mw)
|
|
c.check()
|
|
|
|
|
|
class MediaChecker:
|
|
progress_dialog: Optional[aqt.progress.ProgressDialog]
|
|
|
|
def __init__(self, mw: aqt.AnkiQt) -> None:
|
|
self.mw = mw
|
|
self._progress_timer: Optional[QTimer] = None
|
|
|
|
def check(self) -> None:
|
|
self.progress_dialog = self.mw.progress.start()
|
|
self._set_progress_enabled(True)
|
|
self.mw.taskman.run_in_background(self._check, self._on_finished)
|
|
|
|
def _set_progress_enabled(self, enabled: bool) -> None:
|
|
if self._progress_timer:
|
|
self._progress_timer.stop()
|
|
self._progress_timer = None
|
|
if enabled:
|
|
self._progress_timer = self.mw.progress.timer(100, self._on_progress, True)
|
|
|
|
def _on_progress(self) -> None:
|
|
progress = self.mw.col.latest_progress()
|
|
if not progress.HasField("media_check"):
|
|
return
|
|
label = progress.media_check
|
|
|
|
try:
|
|
if self.progress_dialog.wantCancel:
|
|
self.mw.col.set_wants_abort()
|
|
except AttributeError:
|
|
# dialog may not be active
|
|
pass
|
|
|
|
self.mw.taskman.run_on_main(lambda: self.mw.progress.update(label=label))
|
|
|
|
def _check(self) -> CheckMediaOut:
|
|
"Run the check on a background thread."
|
|
return self.mw.col.media.check()
|
|
|
|
def _on_finished(self, future: Future) -> None:
|
|
self._set_progress_enabled(False)
|
|
self.mw.progress.finish()
|
|
self.progress_dialog = None
|
|
|
|
exc = future.exception()
|
|
if isinstance(exc, Interrupted):
|
|
return
|
|
|
|
output: CheckMediaOut = future.result()
|
|
report = output.report
|
|
|
|
# show report and offer to delete
|
|
diag = QDialog(self.mw)
|
|
diag.setWindowTitle(tr(TR.MEDIA_CHECK_WINDOW_TITLE))
|
|
disable_help_button(diag)
|
|
layout = QVBoxLayout(diag)
|
|
diag.setLayout(layout)
|
|
text = QTextEdit()
|
|
text.setReadOnly(True)
|
|
text.setPlainText(report)
|
|
layout.addWidget(text)
|
|
box = QDialogButtonBox(QDialogButtonBox.Close)
|
|
layout.addWidget(box)
|
|
|
|
if output.unused:
|
|
b = QPushButton(tr(TR.MEDIA_CHECK_DELETE_UNUSED))
|
|
b.setAutoDefault(False)
|
|
box.addButton(b, QDialogButtonBox.RejectRole)
|
|
qconnect(b.clicked, lambda c: self._on_trash_files(output.unused))
|
|
|
|
if output.missing:
|
|
if any(map(lambda x: x.startswith("latex-"), output.missing)):
|
|
b = QPushButton(tr(TR.MEDIA_CHECK_RENDER_LATEX))
|
|
b.setAutoDefault(False)
|
|
box.addButton(b, QDialogButtonBox.RejectRole)
|
|
qconnect(b.clicked, self._on_render_latex)
|
|
|
|
if output.have_trash:
|
|
b = QPushButton(tr(TR.MEDIA_CHECK_EMPTY_TRASH))
|
|
b.setAutoDefault(False)
|
|
box.addButton(b, QDialogButtonBox.RejectRole)
|
|
qconnect(b.clicked, lambda c: self._on_empty_trash())
|
|
|
|
b = QPushButton(tr(TR.MEDIA_CHECK_RESTORE_TRASH))
|
|
b.setAutoDefault(False)
|
|
box.addButton(b, QDialogButtonBox.RejectRole)
|
|
qconnect(b.clicked, lambda c: self._on_restore_trash())
|
|
|
|
qconnect(box.rejected, diag.reject)
|
|
diag.setMinimumHeight(400)
|
|
diag.setMinimumWidth(500)
|
|
restoreGeom(diag, "checkmediadb")
|
|
diag.exec_()
|
|
saveGeom(diag, "checkmediadb")
|
|
|
|
def _on_render_latex(self) -> None:
|
|
self.progress_dialog = self.mw.progress.start()
|
|
try:
|
|
out = self.mw.col.media.render_all_latex(self._on_render_latex_progress)
|
|
if self.progress_dialog.wantCancel:
|
|
return
|
|
finally:
|
|
self.mw.progress.finish()
|
|
self.progress_dialog = None
|
|
|
|
if out is not None:
|
|
nid, err = out
|
|
aqt.dialogs.open("Browser", self.mw, search=(SearchTerm(nid=nid),))
|
|
showText(err, type="html")
|
|
else:
|
|
tooltip(tr(TR.MEDIA_CHECK_ALL_LATEX_RENDERED))
|
|
|
|
def _on_render_latex_progress(self, count: int) -> bool:
|
|
if self.progress_dialog.wantCancel:
|
|
return False
|
|
|
|
self.mw.progress.update(tr(TR.MEDIA_CHECK_CHECKED, count=count))
|
|
return True
|
|
|
|
def _on_trash_files(self, fnames: Sequence[str]) -> None:
|
|
if not askUser(tr(TR.MEDIA_CHECK_DELETE_UNUSED_CONFIRM)):
|
|
return
|
|
|
|
self.progress_dialog = self.mw.progress.start()
|
|
|
|
last_progress = time.time()
|
|
remaining = len(fnames)
|
|
total = len(fnames)
|
|
try:
|
|
for chunk in chunked_list(fnames, 25):
|
|
self.mw.col.media.trash_files(chunk)
|
|
remaining -= len(chunk)
|
|
if time.time() - last_progress >= 0.3:
|
|
self.mw.progress.update(
|
|
tr(TR.MEDIA_CHECK_FILES_REMAINING, count=remaining)
|
|
)
|
|
finally:
|
|
self.mw.progress.finish()
|
|
self.progress_dialog = None
|
|
|
|
tooltip(tr(TR.MEDIA_CHECK_DELETE_UNUSED_COMPLETE, count=total))
|
|
|
|
def _on_empty_trash(self) -> None:
|
|
self.progress_dialog = self.mw.progress.start()
|
|
self._set_progress_enabled(True)
|
|
|
|
def empty_trash() -> None:
|
|
self.mw.col.media.empty_trash()
|
|
|
|
def on_done(fut: Future) -> None:
|
|
self.mw.progress.finish()
|
|
self._set_progress_enabled(False)
|
|
# check for errors
|
|
fut.result()
|
|
|
|
tooltip(tr(TR.MEDIA_CHECK_TRASH_EMPTIED))
|
|
|
|
self.mw.taskman.run_in_background(empty_trash, on_done)
|
|
|
|
def _on_restore_trash(self) -> None:
|
|
self.progress_dialog = self.mw.progress.start()
|
|
self._set_progress_enabled(True)
|
|
|
|
def restore_trash() -> None:
|
|
self.mw.col.media.restore_trash()
|
|
|
|
def on_done(fut: Future) -> None:
|
|
self.mw.progress.finish()
|
|
self._set_progress_enabled(False)
|
|
# check for errors
|
|
fut.result()
|
|
|
|
tooltip(tr(TR.MEDIA_CHECK_TRASH_RESTORED))
|
|
|
|
self.mw.taskman.run_in_background(restore_trash, on_done)
|