update the rest of the anki/ hooks/filters

This commit is contained in:
Damien Elmes 2020-01-13 17:59:52 +10:00
parent f379167648
commit b86ae31907
13 changed files with 305 additions and 75 deletions

View file

@ -22,7 +22,7 @@ from anki.consts import *
from anki.db import DB
from anki.decks import DeckManager
from anki.errors import AnkiError
from anki.hooks import runFilter, runHook
from anki.hooks import runFilter
from anki.lang import _, ngettext
from anki.media import MediaManager
from anki.models import ModelManager
@ -372,7 +372,7 @@ crt=?, mod=?, scm=?, dty=?, usn=?, ls=?, conf=?""",
strids = ids2str(ids)
# we need to log these independently of cards, as one side may have
# more card templates
runHook("remNotes", self, ids)
hooks.run_remove_notes_hook(self, ids)
self._logRem(ids, REM_NOTE)
self.db.execute("delete from notes where id in %s" % strids)
@ -664,7 +664,9 @@ where c.nid = n.id and c.id in %s group by nid"""
fields["CardFlag"] = self._flagNameFromCardFlags(flag)
fields["c%d" % (card_ord + 1)] = "1"
fields = runFilter("mungeFields", fields, model, data, self)
# allow add-ons to modify the available fields
hooks.run_modify_fields_for_rendering_hook(fields, model, data)
fields = runFilter("mungeFields", fields, model, data, self) # legacy
# render fields
qatext = render_card(self, qfmt, afmt, fields, card_ord)
@ -672,7 +674,9 @@ where c.nid = n.id and c.id in %s group by nid"""
# allow add-ons to modify the generated result
for type in "q", "a":
ret[type] = runFilter("mungeQA", ret[type], type, fields, model, data, self)
ret[type] = hooks.run_rendered_card_template_filter(
ret[type], type, fields, model, data, self
)
# empty cloze?
if type == "q" and model["type"] == MODEL_CLOZE:

View file

@ -10,6 +10,7 @@ import unicodedata
from typing import Any, Dict, List, Optional, Set, Tuple, Union
import anki # pylint: disable=unused-import
from anki import hooks
from anki.consts import *
from anki.errors import DeckRenameError
from anki.hooks import runHook
@ -165,6 +166,8 @@ class DeckManager:
self.decks[str(id)] = g
self.save(g)
self.maybeAddToActive()
hooks.run_deck_created_hook(g)
# legacy hook did not pass deck
runHook("newDeck")
return int(id)

View file

@ -12,8 +12,8 @@ from io import BufferedWriter
from typing import Any, Dict, List, Tuple
from zipfile import ZipFile
from anki import hooks
from anki.collection import _Collection
from anki.hooks import runHook
from anki.lang import _
from anki.storage import Collection
from anki.utils import ids2str, namedtmp, splitFields, stripHTML
@ -347,7 +347,7 @@ class AnkiPackageExporter(AnkiExporter):
else:
z.write(mpath, cStr, zipfile.ZIP_STORED)
media[cStr] = unicodedata.normalize("NFC", file)
runHook("exportedMediaFiles", c)
hooks.run_exported_media_files_hook(c)
return media
@ -417,5 +417,5 @@ def exporters() -> List[Tuple[str, Any]]:
id(TextNoteExporter),
id(TextCardExporter),
]
runHook("exportersList", exps)
hooks.run_create_exporters_list_hook(exps)
return exps

View file

@ -6,6 +6,7 @@ import sre_constants
import unicodedata
from typing import Any, List, Optional, Set, Tuple
from anki import hooks
from anki.consts import *
from anki.hooks import *
from anki.utils import (
@ -39,7 +40,7 @@ class Finder:
flag=self._findFlag,
)
self.search["is"] = self._findCardState
runHook("search", self.search)
hooks.run_prepare_searches_hook(self.search)
def findCards(self, query, order=False) -> Any:
"Return a list of card ids for QUERY."

View file

@ -12,11 +12,13 @@ modifying it.
from __future__ import annotations
from typing import Any, Callable, Dict, List
from typing import Any, Callable, Dict, List, Tuple
import decorator
import anki
from anki.cards import Card
from anki.types import QAData
# New hook/filter handling
##############################################################################
@ -31,9 +33,94 @@ from anki.cards import Card
#
# @@AUTOGEN@@
create_exporters_list_hook: List[Callable[[List[Tuple[str, Any]]], None]] = []
deck_created_hook: List[Callable[[Dict[str, Any]], None]] = []
exported_media_files_hook: List[Callable[[int], None]] = []
field_replacement_filter: List[Callable[[str, str, str, Dict[str, str]], str]] = []
http_data_received_hook: List[Callable[[int], None]] = []
http_data_sent_hook: List[Callable[[int], None]] = []
leech_hook: List[Callable[[Card], None]] = []
mod_schema_filter: List[Callable[[bool], bool]] = []
modify_fields_for_rendering_hook: List[
Callable[[Dict[str, str], Dict[str, Any], QAData], None]
] = []
note_type_created_hook: List[Callable[[Dict[str, Any]], None]] = []
odue_invalid_hook: List[Callable[[], None]] = []
prepare_searches_hook: List[Callable[[Dict[str, Callable]], None]] = []
remove_notes_hook: List[Callable[[anki.storage._Collection, List[int]], None]] = []
rendered_card_template_filter: List[
Callable[
[str, str, Dict[str, str], Dict[str, Any], QAData, anki.storage._Collection],
str,
]
] = []
sync_stage_hook: List[Callable[[str], None]] = []
tag_created_hook: List[Callable[[str], None]] = []
def run_create_exporters_list_hook(exporters: List[Tuple[str, Any]]) -> None:
for hook in create_exporters_list_hook:
try:
hook(exporters)
except:
# if the hook fails, remove it
create_exporters_list_hook.remove(hook)
raise
# legacy support
runHook("exportersList", exporters)
def run_deck_created_hook(deck: Dict[str, Any]) -> None:
for hook in deck_created_hook:
try:
hook(deck)
except:
# if the hook fails, remove it
deck_created_hook.remove(hook)
raise
def run_exported_media_files_hook(count: int) -> None:
for hook in exported_media_files_hook:
try:
hook(count)
except:
# if the hook fails, remove it
exported_media_files_hook.remove(hook)
raise
def run_field_replacement_filter(
field_text: str, field_name: str, filter_name: str, fields: Dict[str, str]
) -> str:
for filter in field_replacement_filter:
try:
field_text = filter(field_text, field_name, filter_name, fields)
except:
# if the hook fails, remove it
field_replacement_filter.remove(filter)
raise
return field_text
def run_http_data_received_hook(bytes: int) -> None:
for hook in http_data_received_hook:
try:
hook(bytes)
except:
# if the hook fails, remove it
http_data_received_hook.remove(hook)
raise
def run_http_data_sent_hook(bytes: int) -> None:
for hook in http_data_sent_hook:
try:
hook(bytes)
except:
# if the hook fails, remove it
http_data_sent_hook.remove(hook)
raise
def run_leech_hook(card: Card) -> None:
@ -59,6 +146,28 @@ def run_mod_schema_filter(proceed: bool) -> bool:
return proceed
def run_modify_fields_for_rendering_hook(
fields: Dict[str, str], notetype: Dict[str, Any], data: QAData
) -> None:
for hook in modify_fields_for_rendering_hook:
try:
hook(fields, notetype, data)
except:
# if the hook fails, remove it
modify_fields_for_rendering_hook.remove(hook)
raise
def run_note_type_created_hook(notetype: Dict[str, Any]) -> None:
for hook in note_type_created_hook:
try:
hook(notetype)
except:
# if the hook fails, remove it
note_type_created_hook.remove(hook)
raise
def run_odue_invalid_hook() -> None:
for hook in odue_invalid_hook:
try:
@ -69,6 +178,70 @@ def run_odue_invalid_hook() -> None:
raise
def run_prepare_searches_hook(searches: Dict[str, Callable]) -> None:
for hook in prepare_searches_hook:
try:
hook(searches)
except:
# if the hook fails, remove it
prepare_searches_hook.remove(hook)
raise
# legacy support
runHook("search", searches)
def run_remove_notes_hook(col: anki.storage._Collection, ids: List[int]) -> None:
for hook in remove_notes_hook:
try:
hook(col, ids)
except:
# if the hook fails, remove it
remove_notes_hook.remove(hook)
raise
# legacy support
runHook("remNotes", col, ids)
def run_rendered_card_template_filter(
text: str,
side: str,
fields: Dict[str, str],
notetype: Dict[str, Any],
data: QAData,
col: anki.storage._Collection,
) -> str:
for filter in rendered_card_template_filter:
try:
text = filter(text, side, fields, notetype, data, col)
except:
# if the hook fails, remove it
rendered_card_template_filter.remove(filter)
raise
# legacy support
runFilter("mungeQA", text, side, fields, notetype, data, col)
return text
def run_sync_stage_hook(stage: str) -> None:
for hook in sync_stage_hook:
try:
hook(stage)
except:
# if the hook fails, remove it
sync_stage_hook.remove(hook)
raise
def run_tag_created_hook(tag: str) -> None:
for hook in tag_created_hook:
try:
hook(tag)
except:
# if the hook fails, remove it
tag_created_hook.remove(hook)
raise
# @@AUTOGEN@@
# Legacy hook handling

View file

@ -16,8 +16,8 @@ class Hook:
# the name of the hook. _filter or _hook is appending automatically.
name: str
# string of the typed arguments passed to the callback, eg
# "kind: str, val: int"
cb_args: str = ""
# ["kind: str", "val: int"]
args: List[str] = None
# string of the return type. if set, hook is a filter.
return_type: Optional[str] = None
# if add-ons may be relying on the legacy hook name, add it here
@ -26,9 +26,7 @@ class Hook:
def callable(self) -> str:
"Convert args into a Callable."
types = []
for arg in self.cb_args.split(","):
if not arg:
continue
for arg in self.args or []:
(name, type) = arg.split(":")
types.append(type.strip())
types_str = ", ".join(types)
@ -36,7 +34,7 @@ class Hook:
def arg_names(self) -> List[str]:
names = []
for arg in self.cb_args.split(","):
for arg in self.args or []:
if not arg:
continue
(name, type) = arg.split(":")
@ -68,7 +66,7 @@ class Hook:
def hook_fire_code(self) -> str:
arg_names = self.arg_names()
out = f"""\
def run_{self.full_name()}({self.cb_args}) -> None:
def run_{self.full_name()}({", ".join(self.args or [])}) -> None:
for hook in {self.full_name()}:
try:
hook({", ".join(arg_names)})
@ -88,7 +86,7 @@ def run_{self.full_name()}({self.cb_args}) -> None:
def filter_fire_code(self) -> str:
arg_names = self.arg_names()
out = f"""\
def run_{self.full_name()}({self.cb_args}) -> {self.return_type}:
def run_{self.full_name()}({", ".join(self.args or [])}) -> {self.return_type}:
for filter in {self.full_name()}:
try:
{arg_names[0]} = filter({", ".join(arg_names)})

View file

@ -1,15 +1,18 @@
# Copyright: Ankitects Pty Ltd and contributors
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
from __future__ import annotations
import html
import os
import re
import shutil
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, Optional
from anki.hooks import addHook
import anki
from anki import hooks
from anki.lang import _
from anki.types import NoteType
from anki.types import NoteType, QAData
from anki.utils import call, checksum, isMac, namedtmp, stripHTML, tmpdir
pngCommands = [
@ -44,14 +47,15 @@ def stripLatex(text) -> Any:
return text
# media code and some add-ons depend on the current name
def mungeQA(
html: str,
type: Optional[str],
fields: Optional[Dict[str, str]],
type: str,
fields: Dict[str, str],
model: NoteType,
data: Optional[List[Union[int, str]]],
col,
) -> Any:
data: QAData,
col: anki.storage._Collection,
) -> str:
"Convert TEXT with embedded latex tags to image links."
for match in regexps["standard"].finditer(html):
html = html.replace(match.group(), _imgLink(col, match.group(1), model))
@ -180,4 +184,4 @@ def _errMsg(type: str, texpath: str) -> Any:
# setup q/a filter
addHook("mungeQA", mungeQA)
hooks.rendered_card_template_filter.append(mungeQA)

View file

@ -10,6 +10,7 @@ import time
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
import anki # pylint: disable=unused-import
from anki import hooks
from anki.consts import *
from anki.hooks import runHook
from anki.lang import _
@ -107,6 +108,8 @@ class ModelManager:
if templates:
self._syncTemplates(m)
self.changed = True
hooks.run_note_type_created_hook(m)
# legacy hook did not pass note type
runHook("newModel")
def flush(self) -> None:

View file

@ -18,6 +18,7 @@ from anki.consts import *
from anki.db import DB, DBError
from anki.utils import checksum, devMode, ids2str, intTime, platDesc, versionWithBuild
from . import hooks
from .hooks import runHook
from .lang import ngettext
@ -55,7 +56,7 @@ class Syncer:
self.col.save()
# step 1: login & metadata
runHook("sync", "login")
hooks.run_sync_stage_hook("login")
meta = self.server.meta()
self.col.log("rmeta", meta)
if not meta:
@ -95,7 +96,7 @@ class Syncer:
self.col.log("basic check")
return "basicCheckFailed"
# step 2: startup and deletions
runHook("sync", "meta")
hooks.run_sync_stage_hook("meta")
rrem = self.server.start(
minUsn=self.minUsn, lnewer=self.lnewer, offset=self.col.localOffset()
)
@ -118,31 +119,31 @@ class Syncer:
self.server.abort()
return self._forceFullSync()
# step 3: stream large tables from server
runHook("sync", "server")
hooks.run_sync_stage_hook("server")
while 1:
runHook("sync", "stream")
hooks.run_sync_stage_hook("stream")
chunk = self.server.chunk()
self.col.log("server chunk", chunk)
self.applyChunk(chunk=chunk)
if chunk["done"]:
break
# step 4: stream to server
runHook("sync", "client")
hooks.run_sync_stage_hook("client")
while 1:
runHook("sync", "stream")
hooks.run_sync_stage_hook("stream")
chunk = self.chunk()
self.col.log("client chunk", chunk)
self.server.applyChunk(chunk=chunk)
if chunk["done"]:
break
# step 5: sanity check
runHook("sync", "sanity")
hooks.run_sync_stage_hook("sanity")
c = self.sanityCheck()
ret = self.server.sanityCheck2(client=c)
if ret["status"] != "ok":
return self._forceFullSync()
# finalize
runHook("sync", "finalize")
hooks.run_sync_stage_hook("finalize")
mod = self.server.finish()
self.finish(mod)
return "success"
@ -501,7 +502,7 @@ class AnkiRequestsClient:
buf = io.BytesIO()
for chunk in resp.iter_content(chunk_size=HTTP_BUF_SIZE):
runHook("httpRecv", len(chunk))
hooks.run_http_data_received_hook(len(chunk))
buf.write(chunk)
return buf.getvalue()
@ -523,7 +524,7 @@ if os.environ.get("ANKI_NOVERIFYSSL"):
class _MonitoringFile(io.BufferedReader):
def read(self, size=-1) -> bytes:
data = io.BufferedReader.read(self, HTTP_BUF_SIZE)
runHook("httpSend", len(data))
hooks.run_http_data_sent_hook(len(data))
return data
@ -707,13 +708,13 @@ class FullSyncer(HttpSyncer):
self.col = col
def download(self) -> Optional[str]:
runHook("sync", "download")
hooks.run_sync_stage_hook("download")
localNotEmpty = self.col.db.scalar("select 1 from cards")
self.col.close()
cont = self.req("download")
tpath = self.col.path + ".tmp"
if cont == "upgradeRequired":
runHook("sync", "upgradeRequired")
hooks.run_sync_stage_hook("upgradeRequired")
return None
open(tpath, "wb").write(cont)
# check the received file is ok
@ -733,7 +734,7 @@ class FullSyncer(HttpSyncer):
def upload(self) -> bool:
"True if upload successful."
runHook("sync", "upload")
hooks.run_sync_stage_hook("upload")
# make sure it's ok before we try to upload
if self.col.db.scalar("pragma integrity_check") != "ok":
return False
@ -765,7 +766,7 @@ class MediaSyncer:
def sync(self) -> Any:
# check if there have been any changes
runHook("sync", "findMedia")
hooks.run_sync_stage_hook("findMedia")
self.col.log("findChanges")
try:
self.col.media.findChanges()

View file

@ -16,6 +16,7 @@ import re
from typing import Callable, Dict, List, Tuple
import anki # pylint: disable=unused-import
from anki import hooks
from anki.hooks import runHook
from anki.utils import ids2str, intTime
@ -50,6 +51,7 @@ class TagManager:
self.tags[t] = self.col.usn() if usn is None else usn
self.changed = True
if found:
hooks.run_tag_created_hook(t) # pylint: disable=undefined-loop-variable
runHook("newTag")
def all(self) -> List:

View file

@ -12,18 +12,18 @@ and applied using the hook system. For example,
and then attempt to apply myfilter. If no add-ons have provided the filter,
the filter is skipped.
Add-ons can register a filter by adding a hook to "fmod_<filter name>".
As standard filters will not be run after a custom filter, it is up to the
add-on to do any further processing that is required.
Add-ons can register a filter with the following code:
The hook is called with the arguments
(field_text, filter_args, field_map, field_name, "").
The last argument is no longer used.
If the field name contains a hyphen, it is split on the hyphen, eg
{{foo-bar:baz}} calls fmod_foo with filter_args set to "bar".
from anki import hooks
hooks.field_replacement_filter.append(myfunc)
This will call myfunc, passing the field text in as the first argument.
Your function should decide if it wants to modify the text by checking
the filter_name argument, and then return the text whether it has been
modified or not.
A Python implementation of the standard filters is currently available in the
template_legacy.py file.
template_legacy.py file, using the legacy addHook() system.
"""
from __future__ import annotations
@ -32,6 +32,7 @@ import re
from typing import Dict, List, Optional, Tuple
import anki
from anki import hooks
from anki.hooks import runFilter
from anki.rsbackend import TemplateReplacementList
@ -44,7 +45,6 @@ def render_card(
card_ord: int,
) -> Tuple[str, str]:
"Renders the provided templates, returning rendered q & a text."
(qnodes, anodes) = col.backend.render_card(qfmt, afmt, fields, card_ord)
qtext = apply_custom_filters(qnodes, fields, front_side=None)
@ -70,30 +70,20 @@ def apply_custom_filters(
if node.field_name == "FrontSide" and front_side is not None:
node.current_text = front_side
res += apply_field_filters(
node.field_name, node.current_text, fields, node.filters
)
field_text = node.current_text
for filter_name in node.filters:
field_text = hooks.run_field_replacement_filter(
field_text, node.field_name, filter_name, fields
)
# legacy hook - the second and fifth argument are no longer used
field_text = runFilter(
"fmod_" + filter_name, field_text, "", fields, node.field_name, ""
)
res += field_text
return res
def apply_field_filters(
field_name: str, field_text: str, fields: Dict[str, str], filters: List[str]
) -> str:
"""Apply filters to field text, returning modified text."""
for filter in filters:
if "-" in filter:
filter_base, filter_args = filter.split("-", maxsplit=1)
else:
filter_base = filter
filter_args = ""
# the fifth argument is no longer used
field_text = runFilter(
"fmod_" + filter_base, field_text, filter_args, fields, field_name, ""
)
return field_text
# Cloze handling
##########################################################################

View file

@ -13,13 +13,64 @@ To add a new hook:
import os
from anki.hooks_gen import Hook, update_file
# Hook list
# Hook/filter list
######################################################################
hooks = [
Hook(name="leech", cb_args="card: Card", legacy_hook="leech"),
Hook(name="leech", args=["card: Card"], legacy_hook="leech"),
Hook(name="odue_invalid"),
Hook(name="mod_schema", cb_args="proceed: bool", return_type="bool"),
Hook(name="mod_schema", args=["proceed: bool"], return_type="bool"),
Hook(
name="remove_notes",
args=["col: anki.storage._Collection", "ids: List[int]"],
legacy_hook="remNotes",
),
Hook(name="deck_created", args=["deck: Dict[str, Any]"]),
Hook(name="exported_media_files", args=["count: int"]),
Hook(
name="create_exporters_list",
args=["exporters: List[Tuple[str, Any]]"],
legacy_hook="exportersList",
),
Hook(
name="prepare_searches",
args=["searches: Dict[str, Callable]"],
legacy_hook="search",
),
Hook(name="note_type_created", args=["notetype: Dict[str, Any]"]),
Hook(name="sync_stage", args=["stage: str"]),
Hook(name="http_data_sent", args=["bytes: int"]),
Hook(name="http_data_received", args=["bytes: int"]),
Hook(name="tag_created", args=["tag: str"]),
Hook(
name="modify_fields_for_rendering",
args=["fields: Dict[str, str]", "notetype: Dict[str, Any]", "data: QAData",],
),
Hook(
name="rendered_card_template",
args=[
"text: str",
"side: str",
"fields: Dict[str, str]",
"notetype: Dict[str, Any]",
"data: QAData",
# the hook in latex.py needs access to the collection and
# can't rely on the GUI's mw.col
"col: anki.storage._Collection",
],
return_type="str",
legacy_hook="mungeQA",
),
Hook(
name="field_replacement",
args=[
"field_text: str",
"field_name: str",
"filter_name: str",
"fields: Dict[str, str]",
],
return_type="str",
),
]
if __name__ == "__main__":

View file

@ -14,7 +14,7 @@ from anki.hooks_gen import Hook, update_file
hooks = [
Hook(name="mpv_idle"),
Hook(name="mpv_will_play", cb_args="file: str", legacy_hook="mpvWillPlay"),
Hook(name="mpv_will_play", args=["file: str"], legacy_hook="mpvWillPlay"),
]
if __name__ == "__main__":