switch to classes for hooks

This allows us to add a docstring to .append() so users can see
the names of the arguments that are being passed, and means we
don't have to remember to prepend run_ when calling a hook.
This commit is contained in:
Damien Elmes 2020-01-14 08:54:07 +10:00
parent 9d84f8fb70
commit 662eb53e6a
19 changed files with 629 additions and 311 deletions

View file

@ -87,7 +87,7 @@ class Card:
self.usn = self.col.usn()
# bug check
if self.queue == 2 and self.odue and not self.col.decks.isDyn(self.did):
hooks.run_odue_invalid_hook()
hooks.odue_invalid_hook()
assert self.due < 4294967296
self.col.db.execute(
"""
@ -119,7 +119,7 @@ insert or replace into cards values
self.usn = self.col.usn()
# bug checks
if self.queue == 2 and self.odue and not self.col.decks.isDyn(self.did):
hooks.run_odue_invalid_hook()
hooks.odue_invalid_hook()
assert self.due < 4294967296
self.col.db.execute(
"""update cards set

View file

@ -272,7 +272,7 @@ crt=?, mod=?, scm=?, dty=?, usn=?, ls=?, conf=?""",
def modSchema(self, check: bool) -> None:
"Mark schema modified. Call this first so user can abort if necessary."
if not self.schemaChanged():
if check and not hooks.run_mod_schema_filter(proceed=True):
if check and not hooks.mod_schema_filter(proceed=True):
raise AnkiError("abortSchemaMod")
self.scm = intTime(1000)
self.setMod()
@ -372,7 +372,7 @@ crt=?, mod=?, scm=?, dty=?, usn=?, ls=?, conf=?""",
strids = ids2str(ids)
# we need to log these independently of cards, as one side may have
# more card templates
hooks.run_remove_notes_hook(self, ids)
hooks.remove_notes_hook(self, ids)
self._logRem(ids, REM_NOTE)
self.db.execute("delete from notes where id in %s" % strids)
@ -665,12 +665,12 @@ where c.nid = n.id and c.id in %s group by nid"""
fields["c%d" % (card_ord + 1)] = "1"
# allow add-ons to modify the available fields
hooks.run_modify_fields_for_rendering_hook(fields, model, data)
hooks.modify_fields_for_rendering_hook(fields, model, data)
fields = runFilter("mungeFields", fields, model, data, self) # legacy
# and the template prior to rendering
qfmt = hooks.run_original_card_template_filter(qfmt, True)
afmt = hooks.run_original_card_template_filter(qfmt, False)
qfmt = hooks.original_card_template_filter(qfmt, True)
afmt = hooks.original_card_template_filter(qfmt, False)
# render fields
qatext = render_card(self, qfmt, afmt, fields, card_ord)
@ -678,7 +678,7 @@ where c.nid = n.id and c.id in %s group by nid"""
# allow add-ons to modify the generated result
for type in "q", "a":
ret[type] = hooks.run_rendered_card_template_filter(
ret[type] = hooks.rendered_card_template_filter(
ret[type], type, fields, model, data, self
)

View file

@ -165,7 +165,7 @@ class DeckManager:
self.decks[str(id)] = g
self.save(g)
self.maybeAddToActive()
hooks.run_deck_created_hook(g)
hooks.deck_created_hook(g)
return int(id)
def rem(self, did: int, cardsToo: bool = False, childrenToo: bool = True) -> None:

View file

@ -347,7 +347,7 @@ class AnkiPackageExporter(AnkiExporter):
else:
z.write(mpath, cStr, zipfile.ZIP_STORED)
media[cStr] = unicodedata.normalize("NFC", file)
hooks.run_exported_media_files_hook(c)
hooks.exported_media_files_hook(c)
return media
@ -417,5 +417,5 @@ def exporters() -> List[Tuple[str, Any]]:
id(TextNoteExporter),
id(TextCardExporter),
]
hooks.run_create_exporters_list_hook(exps)
hooks.create_exporters_list_hook(exps)
return exps

View file

@ -40,7 +40,7 @@ class Finder:
flag=self._findFlag,
)
self.search["is"] = self._findCardState
hooks.run_prepare_searches_hook(self.search)
hooks.prepare_searches_hook(self.search)
def findCards(self, query, order=False) -> Any:
"Return a list of card ids for QUERY."

View file

@ -33,248 +33,497 @@ from anki.types import QAData
#
# @@AUTOGEN@@
create_exporters_list_hook: List[Callable[[List[Tuple[str, Any]]], None]] = []
deck_created_hook: List[Callable[[Dict[str, Any]], None]] = []
exported_media_files_hook: List[Callable[[int], None]] = []
field_replacement_filter: List[Callable[[str, str, str, Dict[str, str]], str]] = []
http_data_received_hook: List[Callable[[int], None]] = []
http_data_sent_hook: List[Callable[[int], None]] = []
leech_hook: List[Callable[[Card], None]] = []
mod_schema_filter: List[Callable[[bool], bool]] = []
modify_fields_for_rendering_hook: List[
Callable[[Dict[str, str], Dict[str, Any], QAData], None]
] = []
note_type_created_hook: List[Callable[[Dict[str, Any]], None]] = []
odue_invalid_hook: List[Callable[[], None]] = []
original_card_template_filter: List[Callable[[str, bool], str]] = []
prepare_searches_hook: List[Callable[[Dict[str, Callable]], None]] = []
remove_notes_hook: List[Callable[[anki.storage._Collection, List[int]], None]] = []
rendered_card_template_filter: List[
Callable[
[str, str, Dict[str, str], Dict[str, Any], QAData, anki.storage._Collection],
str,
]
] = []
sync_progress_message_hook: List[Callable[[str], None]] = []
sync_stage_hook: List[Callable[[str], None]] = []
tag_created_hook: List[Callable[[str], None]] = []
class CreateExportersListHook:
_hooks: List[Callable[[List[Tuple[str, Any]]], None]] = []
def append(self, cb: Callable[[List[Tuple[str, Any]]], None]) -> None:
"""(exporters: List[Tuple[str, Any]])"""
self._hooks.append(cb)
def remove(self, cb: Callable[[List[Tuple[str, Any]]], None]) -> None:
self._hooks.remove(cb)
def __call__(self, exporters: List[Tuple[str, Any]]) -> None:
for hook in self._hooks:
try:
hook(exporters)
except:
# if the hook fails, remove it
self._hooks.remove(hook)
raise
# legacy support
runHook("exportersList", exporters)
def run_create_exporters_list_hook(exporters: List[Tuple[str, Any]]) -> None:
for hook in create_exporters_list_hook:
try:
hook(exporters)
except:
# if the hook fails, remove it
create_exporters_list_hook.remove(hook)
raise
# legacy support
runHook("exportersList", exporters)
create_exporters_list_hook = CreateExportersListHook()
def run_deck_created_hook(deck: Dict[str, Any]) -> None:
for hook in deck_created_hook:
try:
hook(deck)
except:
# if the hook fails, remove it
deck_created_hook.remove(hook)
raise
# legacy support
runHook("newDeck")
class DeckCreatedHook:
_hooks: List[Callable[[Dict[str, Any]], None]] = []
def append(self, cb: Callable[[Dict[str, Any]], None]) -> None:
"""(deck: Dict[str, Any])"""
self._hooks.append(cb)
def remove(self, cb: Callable[[Dict[str, Any]], None]) -> None:
self._hooks.remove(cb)
def __call__(self, deck: Dict[str, Any]) -> None:
for hook in self._hooks:
try:
hook(deck)
except:
# if the hook fails, remove it
self._hooks.remove(hook)
raise
# legacy support
runHook("newDeck")
def run_exported_media_files_hook(count: int) -> None:
for hook in exported_media_files_hook:
try:
hook(count)
except:
# if the hook fails, remove it
exported_media_files_hook.remove(hook)
raise
deck_created_hook = DeckCreatedHook()
def run_field_replacement_filter(
field_text: str, field_name: str, filter_name: str, fields: Dict[str, str]
) -> str:
for filter in field_replacement_filter:
try:
field_text = filter(field_text, field_name, filter_name, fields)
except:
# if the hook fails, remove it
field_replacement_filter.remove(filter)
raise
return field_text
class ExportedMediaFilesHook:
_hooks: List[Callable[[int], None]] = []
def append(self, cb: Callable[[int], None]) -> None:
"""(count: int)"""
self._hooks.append(cb)
def remove(self, cb: Callable[[int], None]) -> None:
self._hooks.remove(cb)
def __call__(self, count: int) -> None:
for hook in self._hooks:
try:
hook(count)
except:
# if the hook fails, remove it
self._hooks.remove(hook)
raise
def run_http_data_received_hook(bytes: int) -> None:
for hook in http_data_received_hook:
try:
hook(bytes)
except:
# if the hook fails, remove it
http_data_received_hook.remove(hook)
raise
exported_media_files_hook = ExportedMediaFilesHook()
def run_http_data_sent_hook(bytes: int) -> None:
for hook in http_data_sent_hook:
try:
hook(bytes)
except:
# if the hook fails, remove it
http_data_sent_hook.remove(hook)
raise
class FieldReplacementFilter:
_hooks: List[Callable[[str, str, str, Dict[str, str]], str]] = []
def append(self, cb: Callable[[str, str, str, Dict[str, str]], str]) -> None:
"""(field_text: str, field_name: str, filter_name: str, fields: Dict[str, str])"""
self._hooks.append(cb)
def remove(self, cb: Callable[[str, str, str, Dict[str, str]], str]) -> None:
self._hooks.remove(cb)
def __call__(
self, field_text: str, field_name: str, filter_name: str, fields: Dict[str, str]
) -> str:
for filter in self._hooks:
try:
field_text = filter(field_text, field_name, filter_name, fields)
except:
# if the hook fails, remove it
self._hooks.remove(filter)
raise
return field_text
def run_leech_hook(card: Card) -> None:
for hook in leech_hook:
try:
hook(card)
except:
# if the hook fails, remove it
leech_hook.remove(hook)
raise
# legacy support
runHook("leech", card)
field_replacement_filter = FieldReplacementFilter()
def run_mod_schema_filter(proceed: bool) -> bool:
for filter in mod_schema_filter:
try:
proceed = filter(proceed)
except:
# if the hook fails, remove it
mod_schema_filter.remove(filter)
raise
return proceed
class HttpDataReceivedHook:
_hooks: List[Callable[[int], None]] = []
def append(self, cb: Callable[[int], None]) -> None:
"""(bytes: int)"""
self._hooks.append(cb)
def remove(self, cb: Callable[[int], None]) -> None:
self._hooks.remove(cb)
def __call__(self, bytes: int) -> None:
for hook in self._hooks:
try:
hook(bytes)
except:
# if the hook fails, remove it
self._hooks.remove(hook)
raise
def run_modify_fields_for_rendering_hook(
fields: Dict[str, str], notetype: Dict[str, Any], data: QAData
) -> None:
for hook in modify_fields_for_rendering_hook:
try:
hook(fields, notetype, data)
except:
# if the hook fails, remove it
modify_fields_for_rendering_hook.remove(hook)
raise
http_data_received_hook = HttpDataReceivedHook()
def run_note_type_created_hook(notetype: Dict[str, Any]) -> None:
for hook in note_type_created_hook:
try:
hook(notetype)
except:
# if the hook fails, remove it
note_type_created_hook.remove(hook)
raise
# legacy support
runHook("newModel")
class HttpDataSentHook:
_hooks: List[Callable[[int], None]] = []
def append(self, cb: Callable[[int], None]) -> None:
"""(bytes: int)"""
self._hooks.append(cb)
def remove(self, cb: Callable[[int], None]) -> None:
self._hooks.remove(cb)
def __call__(self, bytes: int) -> None:
for hook in self._hooks:
try:
hook(bytes)
except:
# if the hook fails, remove it
self._hooks.remove(hook)
raise
def run_odue_invalid_hook() -> None:
for hook in odue_invalid_hook:
try:
hook()
except:
# if the hook fails, remove it
odue_invalid_hook.remove(hook)
raise
http_data_sent_hook = HttpDataSentHook()
def run_original_card_template_filter(template: str, question_side: bool) -> str:
for filter in original_card_template_filter:
try:
template = filter(template, question_side)
except:
# if the hook fails, remove it
original_card_template_filter.remove(filter)
raise
return template
class LeechHook:
_hooks: List[Callable[[Card], None]] = []
def append(self, cb: Callable[[Card], None]) -> None:
"""(card: Card)"""
self._hooks.append(cb)
def remove(self, cb: Callable[[Card], None]) -> None:
self._hooks.remove(cb)
def __call__(self, card: Card) -> None:
for hook in self._hooks:
try:
hook(card)
except:
# if the hook fails, remove it
self._hooks.remove(hook)
raise
# legacy support
runHook("leech", card)
def run_prepare_searches_hook(searches: Dict[str, Callable]) -> None:
for hook in prepare_searches_hook:
try:
hook(searches)
except:
# if the hook fails, remove it
prepare_searches_hook.remove(hook)
raise
# legacy support
runHook("search", searches)
leech_hook = LeechHook()
def run_remove_notes_hook(col: anki.storage._Collection, ids: List[int]) -> None:
for hook in remove_notes_hook:
try:
hook(col, ids)
except:
# if the hook fails, remove it
remove_notes_hook.remove(hook)
raise
# legacy support
runHook("remNotes", col, ids)
class ModSchemaFilter:
_hooks: List[Callable[[bool], bool]] = []
def append(self, cb: Callable[[bool], bool]) -> None:
"""(proceed: bool)"""
self._hooks.append(cb)
def remove(self, cb: Callable[[bool], bool]) -> None:
self._hooks.remove(cb)
def __call__(self, proceed: bool) -> bool:
for filter in self._hooks:
try:
proceed = filter(proceed)
except:
# if the hook fails, remove it
self._hooks.remove(filter)
raise
return proceed
def run_rendered_card_template_filter(
text: str,
side: str,
fields: Dict[str, str],
notetype: Dict[str, Any],
data: QAData,
col: anki.storage._Collection,
) -> str:
for filter in rendered_card_template_filter:
try:
text = filter(text, side, fields, notetype, data, col)
except:
# if the hook fails, remove it
rendered_card_template_filter.remove(filter)
raise
# legacy support
runFilter("mungeQA", text, side, fields, notetype, data, col)
return text
mod_schema_filter = ModSchemaFilter()
def run_sync_progress_message_hook(msg: str) -> None:
for hook in sync_progress_message_hook:
try:
hook(msg)
except:
# if the hook fails, remove it
sync_progress_message_hook.remove(hook)
raise
# legacy support
runHook("syncMsg", msg)
class ModifyFieldsForRenderingHook:
_hooks: List[Callable[[Dict[str, str], Dict[str, Any], QAData], None]] = []
def append(
self, cb: Callable[[Dict[str, str], Dict[str, Any], QAData], None]
) -> None:
"""(fields: Dict[str, str], notetype: Dict[str, Any], data: QAData)"""
self._hooks.append(cb)
def remove(
self, cb: Callable[[Dict[str, str], Dict[str, Any], QAData], None]
) -> None:
self._hooks.remove(cb)
def __call__(
self, fields: Dict[str, str], notetype: Dict[str, Any], data: QAData
) -> None:
for hook in self._hooks:
try:
hook(fields, notetype, data)
except:
# if the hook fails, remove it
self._hooks.remove(hook)
raise
def run_sync_stage_hook(stage: str) -> None:
for hook in sync_stage_hook:
try:
hook(stage)
except:
# if the hook fails, remove it
sync_stage_hook.remove(hook)
raise
# legacy support
runHook("sync", stage)
modify_fields_for_rendering_hook = ModifyFieldsForRenderingHook()
def run_tag_created_hook(tag: str) -> None:
for hook in tag_created_hook:
try:
hook(tag)
except:
# if the hook fails, remove it
tag_created_hook.remove(hook)
raise
# legacy support
runHook("newTag")
class NoteTypeCreatedHook:
_hooks: List[Callable[[Dict[str, Any]], None]] = []
def append(self, cb: Callable[[Dict[str, Any]], None]) -> None:
"""(notetype: Dict[str, Any])"""
self._hooks.append(cb)
def remove(self, cb: Callable[[Dict[str, Any]], None]) -> None:
self._hooks.remove(cb)
def __call__(self, notetype: Dict[str, Any]) -> None:
for hook in self._hooks:
try:
hook(notetype)
except:
# if the hook fails, remove it
self._hooks.remove(hook)
raise
# legacy support
runHook("newModel")
note_type_created_hook = NoteTypeCreatedHook()
class OdueInvalidHook:
_hooks: List[Callable[[], None]] = []
def append(self, cb: Callable[[], None]) -> None:
"""()"""
self._hooks.append(cb)
def remove(self, cb: Callable[[], None]) -> None:
self._hooks.remove(cb)
def __call__(self) -> None:
for hook in self._hooks:
try:
hook()
except:
# if the hook fails, remove it
self._hooks.remove(hook)
raise
odue_invalid_hook = OdueInvalidHook()
class OriginalCardTemplateFilter:
_hooks: List[Callable[[str, bool], str]] = []
def append(self, cb: Callable[[str, bool], str]) -> None:
"""(template: str, question_side: bool)"""
self._hooks.append(cb)
def remove(self, cb: Callable[[str, bool], str]) -> None:
self._hooks.remove(cb)
def __call__(self, template: str, question_side: bool) -> str:
for filter in self._hooks:
try:
template = filter(template, question_side)
except:
# if the hook fails, remove it
self._hooks.remove(filter)
raise
return template
original_card_template_filter = OriginalCardTemplateFilter()
class PrepareSearchesHook:
_hooks: List[Callable[[Dict[str, Callable]], None]] = []
def append(self, cb: Callable[[Dict[str, Callable]], None]) -> None:
"""(searches: Dict[str, Callable])"""
self._hooks.append(cb)
def remove(self, cb: Callable[[Dict[str, Callable]], None]) -> None:
self._hooks.remove(cb)
def __call__(self, searches: Dict[str, Callable]) -> None:
for hook in self._hooks:
try:
hook(searches)
except:
# if the hook fails, remove it
self._hooks.remove(hook)
raise
# legacy support
runHook("search", searches)
prepare_searches_hook = PrepareSearchesHook()
class RemoveNotesHook:
_hooks: List[Callable[[anki.storage._Collection, List[int]], None]] = []
def append(self, cb: Callable[[anki.storage._Collection, List[int]], None]) -> None:
"""(col: anki.storage._Collection, ids: List[int])"""
self._hooks.append(cb)
def remove(self, cb: Callable[[anki.storage._Collection, List[int]], None]) -> None:
self._hooks.remove(cb)
def __call__(self, col: anki.storage._Collection, ids: List[int]) -> None:
for hook in self._hooks:
try:
hook(col, ids)
except:
# if the hook fails, remove it
self._hooks.remove(hook)
raise
# legacy support
runHook("remNotes", col, ids)
remove_notes_hook = RemoveNotesHook()
class RenderedCardTemplateFilter:
_hooks: List[
Callable[
[
str,
str,
Dict[str, str],
Dict[str, Any],
QAData,
anki.storage._Collection,
],
str,
]
] = []
def append(
self,
cb: Callable[
[
str,
str,
Dict[str, str],
Dict[str, Any],
QAData,
anki.storage._Collection,
],
str,
],
) -> None:
"""(text: str, side: str, fields: Dict[str, str], notetype: Dict[str, Any], data: QAData, col: anki.storage._Collection)"""
self._hooks.append(cb)
def remove(
self,
cb: Callable[
[
str,
str,
Dict[str, str],
Dict[str, Any],
QAData,
anki.storage._Collection,
],
str,
],
) -> None:
self._hooks.remove(cb)
def __call__(
self,
text: str,
side: str,
fields: Dict[str, str],
notetype: Dict[str, Any],
data: QAData,
col: anki.storage._Collection,
) -> str:
for filter in self._hooks:
try:
text = filter(text, side, fields, notetype, data, col)
except:
# if the hook fails, remove it
self._hooks.remove(filter)
raise
# legacy support
runFilter("mungeQA", text, side, fields, notetype, data, col)
return text
rendered_card_template_filter = RenderedCardTemplateFilter()
class SyncProgressMessageHook:
_hooks: List[Callable[[str], None]] = []
def append(self, cb: Callable[[str], None]) -> None:
"""(msg: str)"""
self._hooks.append(cb)
def remove(self, cb: Callable[[str], None]) -> None:
self._hooks.remove(cb)
def __call__(self, msg: str) -> None:
for hook in self._hooks:
try:
hook(msg)
except:
# if the hook fails, remove it
self._hooks.remove(hook)
raise
# legacy support
runHook("syncMsg", msg)
sync_progress_message_hook = SyncProgressMessageHook()
class SyncStageHook:
_hooks: List[Callable[[str], None]] = []
def append(self, cb: Callable[[str], None]) -> None:
"""(stage: str)"""
self._hooks.append(cb)
def remove(self, cb: Callable[[str], None]) -> None:
self._hooks.remove(cb)
def __call__(self, stage: str) -> None:
for hook in self._hooks:
try:
hook(stage)
except:
# if the hook fails, remove it
self._hooks.remove(hook)
raise
# legacy support
runHook("sync", stage)
sync_stage_hook = SyncStageHook()
class TagCreatedHook:
_hooks: List[Callable[[str], None]] = []
def append(self, cb: Callable[[str], None]) -> None:
"""(tag: str)"""
self._hooks.append(cb)
def remove(self, cb: Callable[[str], None]) -> None:
self._hooks.remove(cb)
def __call__(self, tag: str) -> None:
for hook in self._hooks:
try:
hook(tag)
except:
# if the hook fails, remove it
self._hooks.remove(hook)
raise
# legacy support
runHook("newTag")
tag_created_hook = TagCreatedHook()
# @@AUTOGEN@@
# Legacy hook handling

View file

@ -10,6 +10,8 @@ from dataclasses import dataclass
from operator import attrgetter
from typing import List, Optional
import stringcase
@dataclass
class Hook:
@ -52,11 +54,31 @@ class Hook:
else:
return "hook"
def classname(self) -> str:
return stringcase.pascalcase(self.full_name())
def list_code(self) -> str:
return f"""\
{self.full_name()}: List[{self.callable()}] = []
_hooks: List[{self.callable()}] = []
"""
def code(self) -> str:
doc = f"({', '.join(self.args or [])})"
code = f"""\
class {self.classname()}:
{self.list_code()}
def append(self, cb: {self.callable()}) -> None:
'''{doc}'''
self._hooks.append(cb)
def remove(self, cb: {self.callable()}) -> None:
self._hooks.remove(cb)
{self.fire_code()}
{self.full_name()} = {self.classname()}()
"""
return code
def fire_code(self) -> str:
if self.return_type is not None:
# filter
@ -74,43 +96,45 @@ class Hook:
def hook_fire_code(self) -> str:
arg_names = self.arg_names()
args_including_self = ["self"] + (self.args or [])
out = f"""\
def run_{self.full_name()}({", ".join(self.args or [])}) -> None:
for hook in {self.full_name()}:
try:
hook({", ".join(arg_names)})
except:
# if the hook fails, remove it
{self.full_name()}.remove(hook)
raise
def __call__({", ".join(args_including_self)}) -> None:
for hook in self._hooks:
try:
hook({", ".join(arg_names)})
except:
# if the hook fails, remove it
self._hooks.remove(hook)
raise
"""
if self.legacy_hook:
out += f"""\
# legacy support
runHook({self.legacy_args()})
# legacy support
runHook({self.legacy_args()})
"""
return out + "\n\n"
def filter_fire_code(self) -> str:
arg_names = self.arg_names()
args_including_self = ["self"] + (self.args or [])
out = f"""\
def run_{self.full_name()}({", ".join(self.args or [])}) -> {self.return_type}:
for filter in {self.full_name()}:
try:
{arg_names[0]} = filter({", ".join(arg_names)})
except:
# if the hook fails, remove it
{self.full_name()}.remove(filter)
raise
def __call__({", ".join(args_including_self)}) -> {self.return_type}:
for filter in self._hooks:
try:
{arg_names[0]} = filter({", ".join(arg_names)})
except:
# if the hook fails, remove it
self._hooks.remove(filter)
raise
"""
if self.legacy_hook:
out += f"""\
# legacy support
runFilter({self.legacy_args()})
# legacy support
runFilter({self.legacy_args()})
"""
out += f"""\
return {arg_names[0]}
return {arg_names[0]}
"""
return out + "\n\n"
@ -119,10 +143,7 @@ def update_file(path: str, hooks: List[Hook]):
hooks.sort(key=attrgetter("name"))
code = ""
for hook in hooks:
code += hook.list_code()
code += "\n\n"
for hook in hooks:
code += hook.fire_code()
code += hook.code()
orig = open(path).read()
new = re.sub(

View file

@ -183,5 +183,5 @@ def _errMsg(type: str, texpath: str) -> Any:
return msg
# setup q/a filter
hooks.rendered_card_template_filter.append(mungeQA)
# setup q/a filter - type ignored due to import cycle
hooks.rendered_card_template_filter.append(mungeQA) # type: ignore

View file

@ -107,7 +107,7 @@ class ModelManager:
if templates:
self._syncTemplates(m)
self.changed = True
hooks.run_note_type_created_hook(m)
hooks.note_type_created_hook(m)
def flush(self) -> None:
"Flush the registry if any models were changed."

View file

@ -1150,7 +1150,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?"""
card.odue = card.odid = 0
card.queue = -1
# notify UI
hooks.run_leech_hook(card)
hooks.leech_hook(card)
return True
# Tools

View file

@ -1270,7 +1270,7 @@ where id = ?
if a == 0:
card.queue = -1
# notify UI
hooks.run_leech_hook(card)
hooks.leech_hook(card)
return True
return None

View file

@ -55,7 +55,7 @@ class Syncer:
self.col.save()
# step 1: login & metadata
hooks.run_sync_stage_hook("login")
hooks.sync_stage_hook("login")
meta = self.server.meta()
self.col.log("rmeta", meta)
if not meta:
@ -95,7 +95,7 @@ class Syncer:
self.col.log("basic check")
return "basicCheckFailed"
# step 2: startup and deletions
hooks.run_sync_stage_hook("meta")
hooks.sync_stage_hook("meta")
rrem = self.server.start(
minUsn=self.minUsn, lnewer=self.lnewer, offset=self.col.localOffset()
)
@ -118,31 +118,31 @@ class Syncer:
self.server.abort()
return self._forceFullSync()
# step 3: stream large tables from server
hooks.run_sync_stage_hook("server")
hooks.sync_stage_hook("server")
while 1:
hooks.run_sync_stage_hook("stream")
hooks.sync_stage_hook("stream")
chunk = self.server.chunk()
self.col.log("server chunk", chunk)
self.applyChunk(chunk=chunk)
if chunk["done"]:
break
# step 4: stream to server
hooks.run_sync_stage_hook("client")
hooks.sync_stage_hook("client")
while 1:
hooks.run_sync_stage_hook("stream")
hooks.sync_stage_hook("stream")
chunk = self.chunk()
self.col.log("client chunk", chunk)
self.server.applyChunk(chunk=chunk)
if chunk["done"]:
break
# step 5: sanity check
hooks.run_sync_stage_hook("sanity")
hooks.sync_stage_hook("sanity")
c = self.sanityCheck()
ret = self.server.sanityCheck2(client=c)
if ret["status"] != "ok":
return self._forceFullSync()
# finalize
hooks.run_sync_stage_hook("finalize")
hooks.sync_stage_hook("finalize")
mod = self.server.finish()
self.finish(mod)
return "success"
@ -501,7 +501,7 @@ class AnkiRequestsClient:
buf = io.BytesIO()
for chunk in resp.iter_content(chunk_size=HTTP_BUF_SIZE):
hooks.run_http_data_received_hook(len(chunk))
hooks.http_data_received_hook(len(chunk))
buf.write(chunk)
return buf.getvalue()
@ -523,7 +523,7 @@ if os.environ.get("ANKI_NOVERIFYSSL"):
class _MonitoringFile(io.BufferedReader):
def read(self, size=-1) -> bytes:
data = io.BufferedReader.read(self, HTTP_BUF_SIZE)
hooks.run_http_data_sent_hook(len(data))
hooks.http_data_sent_hook(len(data))
return data
@ -707,13 +707,13 @@ class FullSyncer(HttpSyncer):
self.col = col
def download(self) -> Optional[str]:
hooks.run_sync_stage_hook("download")
hooks.sync_stage_hook("download")
localNotEmpty = self.col.db.scalar("select 1 from cards")
self.col.close()
cont = self.req("download")
tpath = self.col.path + ".tmp"
if cont == "upgradeRequired":
hooks.run_sync_stage_hook("upgradeRequired")
hooks.sync_stage_hook("upgradeRequired")
return None
open(tpath, "wb").write(cont)
# check the received file is ok
@ -733,7 +733,7 @@ class FullSyncer(HttpSyncer):
def upload(self) -> bool:
"True if upload successful."
hooks.run_sync_stage_hook("upload")
hooks.sync_stage_hook("upload")
# make sure it's ok before we try to upload
if self.col.db.scalar("pragma integrity_check") != "ok":
return False
@ -765,7 +765,7 @@ class MediaSyncer:
def sync(self) -> Any:
# check if there have been any changes
hooks.run_sync_stage_hook("findMedia")
hooks.sync_stage_hook("findMedia")
self.col.log("findChanges")
try:
self.col.media.findChanges()
@ -835,7 +835,7 @@ class MediaSyncer:
if not fnames:
break
hooks.run_sync_progress_message_hook(
hooks.sync_progress_message_hook(
ngettext(
"%d media change to upload", "%d media changes to upload", toSend
)
@ -886,7 +886,7 @@ class MediaSyncer:
fnames = fnames[cnt:]
n = self.downloadCount
hooks.run_sync_progress_message_hook(
hooks.sync_progress_message_hook(
ngettext("%d media file downloaded", "%d media files downloaded", n)
% n,
)

View file

@ -50,7 +50,7 @@ class TagManager:
self.tags[t] = self.col.usn() if usn is None else usn
self.changed = True
if found:
hooks.run_tag_created_hook(t) # pylint: disable=undefined-loop-variable
hooks.tag_created_hook(t) # pylint: disable=undefined-loop-variable
def all(self) -> List:
return list(self.tags.keys())

View file

@ -72,7 +72,7 @@ def apply_custom_filters(
field_text = node.current_text
for filter_name in node.filters:
field_text = hooks.run_field_replacement_filter(
field_text = hooks.field_replacement_filter(
field_text, node.field_name, filter_name, fields
)
# legacy hook - the second and fifth argument are no longer used

View file

@ -26,3 +26,5 @@ ignore_missing_imports = True
ignore_missing_imports = True
[mypy-ankirspy]
ignore_missing_imports = True
[mypy-stringcase]
ignore_missing_imports = True

View file

@ -7,3 +7,4 @@ pytest
git+https://github.com/ankitects/isort#egg=isort
# fixme: when pylint supports isort 5.0, switch to pypi
git+https://github.com/ankitects/pylint#egg=pylint
stringcase==1.2.0

View file

@ -19,56 +19,101 @@ from anki.hooks import runFilter, runHook # pylint: disable=unused-import
#
# @@AUTOGEN@@
mpv_idle_hook: List[Callable[[], None]] = []
mpv_will_play_hook: List[Callable[[str], None]] = []
reviewer_showing_answer_hook: List[Callable[[Card], None]] = []
reviewer_showing_question_hook: List[Callable[[Card], None]] = []
class MpvIdleHook:
_hooks: List[Callable[[], None]] = []
def append(self, cb: Callable[[], None]) -> None:
"""()"""
self._hooks.append(cb)
def remove(self, cb: Callable[[], None]) -> None:
self._hooks.remove(cb)
def __call__(self) -> None:
for hook in self._hooks:
try:
hook()
except:
# if the hook fails, remove it
self._hooks.remove(hook)
raise
def run_mpv_idle_hook() -> None:
for hook in mpv_idle_hook:
try:
hook()
except:
# if the hook fails, remove it
mpv_idle_hook.remove(hook)
raise
mpv_idle_hook = MpvIdleHook()
def run_mpv_will_play_hook(file: str) -> None:
for hook in mpv_will_play_hook:
try:
hook(file)
except:
# if the hook fails, remove it
mpv_will_play_hook.remove(hook)
raise
# legacy support
runHook("mpvWillPlay", file)
class MpvWillPlayHook:
_hooks: List[Callable[[str], None]] = []
def append(self, cb: Callable[[str], None]) -> None:
"""(file: str)"""
self._hooks.append(cb)
def remove(self, cb: Callable[[str], None]) -> None:
self._hooks.remove(cb)
def __call__(self, file: str) -> None:
for hook in self._hooks:
try:
hook(file)
except:
# if the hook fails, remove it
self._hooks.remove(hook)
raise
# legacy support
runHook("mpvWillPlay", file)
def run_reviewer_showing_answer_hook(card: Card) -> None:
for hook in reviewer_showing_answer_hook:
try:
hook(card)
except:
# if the hook fails, remove it
reviewer_showing_answer_hook.remove(hook)
raise
# legacy support
runHook("showAnswer")
mpv_will_play_hook = MpvWillPlayHook()
def run_reviewer_showing_question_hook(card: Card) -> None:
for hook in reviewer_showing_question_hook:
try:
hook(card)
except:
# if the hook fails, remove it
reviewer_showing_question_hook.remove(hook)
raise
# legacy support
runHook("showQuestion")
class ReviewerShowingAnswerHook:
_hooks: List[Callable[[Card], None]] = []
def append(self, cb: Callable[[Card], None]) -> None:
"""(card: Card)"""
self._hooks.append(cb)
def remove(self, cb: Callable[[Card], None]) -> None:
self._hooks.remove(cb)
def __call__(self, card: Card) -> None:
for hook in self._hooks:
try:
hook(card)
except:
# if the hook fails, remove it
self._hooks.remove(hook)
raise
# legacy support
runHook("showAnswer")
reviewer_showing_answer_hook = ReviewerShowingAnswerHook()
class ReviewerShowingQuestionHook:
_hooks: List[Callable[[Card], None]] = []
def append(self, cb: Callable[[Card], None]) -> None:
"""(card: Card)"""
self._hooks.append(cb)
def remove(self, cb: Callable[[Card], None]) -> None:
self._hooks.remove(cb)
def __call__(self, card: Card) -> None:
for hook in self._hooks:
try:
hook(card)
except:
# if the hook fails, remove it
self._hooks.remove(hook)
raise
# legacy support
runHook("showQuestion")
reviewer_showing_question_hook = ReviewerShowingQuestionHook()
# @@AUTOGEN@@

View file

@ -200,7 +200,7 @@ The front of this card is empty. Please run Tools>Empty Cards."""
if self.typeCorrect:
self.mw.web.setFocus()
# user hook
gui_hooks.run_reviewer_showing_question_hook(c)
gui_hooks.reviewer_showing_question_hook(c)
def autoplay(self, card):
return self.mw.col.decks.confForDid(card.odid or card.did)["autoplay"]
@ -235,7 +235,7 @@ The front of this card is empty. Please run Tools>Empty Cards."""
self.web.eval("_showAnswer(%s);" % json.dumps(a))
self._showEaseButtons()
# user hook
gui_hooks.run_reviewer_showing_answer_hook(c)
gui_hooks.reviewer_showing_answer_hook(c)
# Answering a card
############################################################

View file

@ -158,7 +158,7 @@ class MpvManager(MPV):
super().__init__(window_id=None, debug=False)
def queueFile(self, file: str) -> None:
gui_hooks.run_mpv_will_play_hook(file)
gui_hooks.mpv_will_play_hook(file)
path = os.path.join(os.getcwd(), file)
self.command("loadfile", path, "append-play")
@ -173,7 +173,7 @@ class MpvManager(MPV):
self.command("seek", secs, "relative")
def on_idle(self) -> None:
gui_hooks.run_mpv_idle_hook()
gui_hooks.mpv_idle_hook()
def setMpvConfigBase(base) -> None: