From 9afbcd4178a746a599a5c50648ddc85d3c559cd0 Mon Sep 17 00:00:00 2001 From: Damien Elmes Date: Sat, 21 Mar 2020 16:38:46 +1000 Subject: [PATCH] remove old finder code; add search hooks to browser & remove old one --- pylib/anki/find.py | 489 +-------------------------------------- pylib/anki/hooks.py | 26 --- pylib/tests/test_find.py | 27 --- pylib/tools/genhooks.py | 5 - qt/aqt/browser.py | 31 +-- qt/aqt/gui_hooks.py | 62 +++++ qt/tools/genhooks_gui.py | 20 ++ 7 files changed, 107 insertions(+), 553 deletions(-) diff --git a/pylib/anki/find.py b/pylib/anki/find.py index 5beb1d044..8ecc8211e 100644 --- a/pylib/anki/find.py +++ b/pylib/anki/find.py @@ -4,500 +4,25 @@ from __future__ import annotations import re -import sre_constants -import unicodedata -from typing import TYPE_CHECKING, Any, List, Optional, Set, Tuple, Union, cast +from typing import TYPE_CHECKING, Optional, Set -from anki import hooks -from anki.consts import * from anki.hooks import * -from anki.utils import ( - fieldChecksum, - ids2str, - intTime, - joinFields, - splitFields, - stripHTMLMedia, -) +from anki.utils import ids2str, intTime, joinFields, splitFields, stripHTMLMedia if TYPE_CHECKING: from anki.collection import _Collection -# Find -########################################################################## - class Finder: def __init__(self, col: Optional[_Collection]) -> None: self.col = col.weakref() - self.search = dict( - added=self._findAdded, - card=self._findTemplate, - deck=self._findDeck, - mid=self._findMid, - nid=self._findNids, - cid=self._findCids, - note=self._findModel, - prop=self._findProp, - rated=self._findRated, - tag=self._findTag, - dupe=self._findDupes, - flag=self._findFlag, - ) - self.search["is"] = self._findCardState - hooks.search_terms_prepared(self.search) + print("Finder() is deprecated, please use col.find_cards() or .find_notes()") - def findCards(self, query: str, order: Union[bool, str] = False) -> List[Any]: - "Return a list of card ids for QUERY." - tokens = self._tokenize(query) - preds, args = self._where(tokens) - if preds is None: - raise Exception("invalidSearch") - order, rev = self._order(order) - sql = self._query(preds, order) - try: - res = self.col.db.list(sql, *args) - except: - # invalid grouping - return [] - if rev: - res.reverse() - return res + def findCards(self, query, order): + return self.col.find_cards(query, order) - def findNotes(self, query: str) -> List[Any]: - tokens = self._tokenize(query) - preds, args = self._where(tokens) - if preds is None: - return [] - if preds: - preds = "(" + preds + ")" - else: - preds = "1" - sql = ( - """ -select distinct(n.id) from cards c, notes n where c.nid=n.id and """ - + preds - ) - try: - res = self.col.db.list(sql, *args) - except: - # invalid grouping - return [] - return res - - # Tokenizing - ###################################################################### - - def _tokenize(self, query: str) -> List[str]: - inQuote: Union[bool, str] = False - tokens = [] - token = "" - for c in query: - # quoted text - if c in ("'", '"'): - if inQuote: - if c == inQuote: - inQuote = False - else: - token += c - elif token: - # quotes are allowed to start directly after a : - if token[-1] == ":": - inQuote = c - else: - token += c - else: - inQuote = c - # separator (space and ideographic space) - elif c in (" ", "\u3000"): - if inQuote: - token += c - elif token: - # space marks token finished - tokens.append(token) - token = "" - # nesting - elif c in ("(", ")"): - if inQuote: - token += c - else: - if c == ")" and token: - tokens.append(token) - token = "" - tokens.append(c) - # negation - elif c == "-": - if token: - token += c - elif not tokens or tokens[-1] != "-": - tokens.append("-") - # normal character - else: - token += c - # if we finished in a token, add it - if token: - tokens.append(token) - return tokens - - # Query building - ###################################################################### - - def _where(self, tokens: List[str]) -> Tuple[str, Optional[List[str]]]: - # state and query - s: Dict[str, Any] = dict(isnot=False, isor=False, join=False, q="", bad=False) - args: List[Any] = [] - - def add(txt, wrap=True): - # failed command? - if not txt: - # if it was to be negated then we can just ignore it - if s["isnot"]: - s["isnot"] = False - return None, None - else: - s["bad"] = True - return None, None - elif txt == "skip": - return None, None - # do we need a conjunction? - if s["join"]: - if s["isor"]: - s["q"] += " or " - s["isor"] = False - else: - s["q"] += " and " - if s["isnot"]: - s["q"] += " not " - s["isnot"] = False - if wrap: - txt = "(" + txt + ")" - s["q"] += txt - s["join"] = True - - for token in tokens: - if s["bad"]: - return None, None - # special tokens - if token == "-": - s["isnot"] = True - elif token.lower() == "or": - s["isor"] = True - elif token == "(": - add(token, wrap=False) - s["join"] = False - elif token == ")": - s["q"] += ")" - # commands - elif ":" in token: - cmd, val = token.split(":", 1) - cmd = cmd.lower() - if cmd in self.search: - add(self.search[cmd]((val, args))) - else: - add(self._findField(cmd, val)) - # normal text search - else: - add(self._findText(token, args)) - if s["bad"]: - return None, None - return s["q"], args - - def _query(self, preds: str, order: str) -> str: - # can we skip the note table? - if "n." not in preds and "n." not in order: - sql = "select c.id from cards c where " - else: - sql = "select c.id from cards c, notes n where c.nid=n.id and " - # combine with preds - if preds: - sql += "(" + preds + ")" - else: - sql += "1" - # order - if order: - sql += " " + order - return sql - - # Ordering - ###################################################################### - - def _order(self, order: Union[bool, str]) -> Tuple[str, bool]: - if not order: - return "", False - elif order is not True: - # custom order string provided - return " order by " + cast(str, order), False - # use deck default - type = self.col.conf["sortType"] - sort = None - if type.startswith("note"): - if type == "noteCrt": - sort = "n.id, c.ord" - elif type == "noteMod": - sort = "n.mod, c.ord" - elif type == "noteFld": - sort = "n.sfld collate nocase, c.ord" - elif type.startswith("card"): - if type == "cardMod": - sort = "c.mod" - elif type == "cardReps": - sort = "c.reps" - elif type == "cardDue": - sort = "c.type, c.due" - elif type == "cardEase": - sort = f"c.type == {CARD_TYPE_NEW}, c.factor" - elif type == "cardLapses": - sort = "c.lapses" - elif type == "cardIvl": - sort = "c.ivl" - if not sort: - # deck has invalid sort order; revert to noteCrt - sort = "n.id, c.ord" - return " order by " + sort, self.col.conf["sortBackwards"] - - # Commands - ###################################################################### - - def _findTag(self, args: Tuple[str, List[Any]]) -> str: - (val, list_args) = args - if val == "none": - return 'n.tags = ""' - val = val.replace("*", "%") - if not val.startswith("%"): - val = "% " + val - if not val.endswith("%") or val.endswith("\\%"): - val += " %" - list_args.append(val) - return "n.tags like ? escape '\\'" - - def _findCardState(self, args: Tuple[str, List[Any]]) -> Optional[str]: - (val, __) = args - if val in ("review", "new", "learn"): - if val == "review": - n = 2 - elif val == "new": - n = CARD_TYPE_NEW - else: - return f"queue in ({QUEUE_TYPE_LRN}, {QUEUE_TYPE_DAY_LEARN_RELEARN})" - return "type = %d" % n - elif val == "suspended": - return "c.queue = -1" - elif val == "buried": - return f"c.queue in ({QUEUE_TYPE_SIBLING_BURIED}, {QUEUE_TYPE_MANUALLY_BURIED})" - elif val == "due": - return f""" -(c.queue in ({QUEUE_TYPE_REV},{QUEUE_TYPE_DAY_LEARN_RELEARN}) and c.due <= %d) or -(c.queue = {QUEUE_TYPE_LRN} and c.due <= %d)""" % ( - self.col.sched.today, - self.col.sched.dayCutoff, - ) - else: - # unknown - return None - - def _findFlag(self, args: Tuple[str, List[Any]]) -> Optional[str]: - (val, __) = args - if not val or len(val) != 1 or val not in "01234": - return None - mask = 2 ** 3 - 1 - return "(c.flags & %d) == %d" % (mask, int(val)) - - def _findRated(self, args: Tuple[str, List[Any]]) -> Optional[str]: - # days(:optional_ease) - (val, __) = args - r = val.split(":") - try: - days = int(r[0]) - except ValueError: - return None - days = min(days, 31) - # ease - ease = "" - if len(r) > 1: - if r[1] not in ("1", "2", "3", "4"): - return None - ease = "and ease=%s" % r[1] - cutoff = (self.col.sched.dayCutoff - 86400 * days) * 1000 - return "c.id in (select cid from revlog where id>%d %s)" % (cutoff, ease) - - def _findAdded(self, args: Tuple[str, List[Any]]) -> Optional[str]: - (val, __) = args - try: - days = int(val) - except ValueError: - return None - cutoff = (self.col.sched.dayCutoff - 86400 * days) * 1000 - return "c.id > %d" % cutoff - - def _findProp(self, args: Tuple[str, List[Any]]) -> Optional[str]: - # extract - (strval, __) = args - m = re.match("(^.+?)(<=|>=|!=|=|<|>)(.+?$)", strval) - if not m: - return None - prop, cmp, strval = m.groups() - prop = prop.lower() # pytype: disable=attribute-error - # is val valid? - try: - if prop == "ease": - val = float(strval) - else: - val = int(strval) - except ValueError: - return None - # is prop valid? - if prop not in ("due", "ivl", "reps", "lapses", "ease"): - return None - # query - q = [] - if prop == "due": - val += self.col.sched.today - # only valid for review/daily learning - q.append(f"(c.queue in ({QUEUE_TYPE_REV},{QUEUE_TYPE_DAY_LEARN_RELEARN}))") - elif prop == "ease": - prop = "factor" - val = int(val * 1000) - q.append("(%s %s %s)" % (prop, cmp, val)) - return " and ".join(q) - - def _findText(self, val: str, args: List[str]) -> str: - val = val.replace("*", "%") - args.append("%" + val + "%") - args.append("%" + val + "%") - return "(n.sfld like ? escape '\\' or n.flds like ? escape '\\')" - - def _findNids(self, args: Tuple[str, List[Any]]) -> Optional[str]: - (val, __) = args - if re.search("[^0-9,]", val): - return None - return "n.id in (%s)" % val - - def _findCids(self, args) -> Optional[str]: - (val, __) = args - if re.search("[^0-9,]", val): - return None - return "c.id in (%s)" % val - - def _findMid(self, args) -> Optional[str]: - (val, __) = args - if re.search("[^0-9]", val): - return None - return "n.mid = %s" % val - - def _findModel(self, args: Tuple[str, List[Any]]) -> str: - (val, __) = args - ids = [] - val = val.lower() - for m in self.col.models.all(): - if unicodedata.normalize("NFC", m["name"].lower()) == val: - ids.append(m["id"]) - return "n.mid in %s" % ids2str(ids) - - def _findDeck(self, args: Tuple[str, List[Any]]) -> Optional[str]: - # if searching for all decks, skip - (val, __) = args - if val == "*": - return "skip" - # deck types - elif val == "filtered": - return "c.odid" - - def dids(did): - if not did: - return None - return [did] + [a[1] for a in self.col.decks.children(did)] - - # current deck? - ids = None - if val.lower() == "current": - ids = dids(self.col.decks.current()["id"]) - elif "*" not in val: - # single deck - ids = dids(self.col.decks.id(val, create=False)) - else: - # wildcard - ids = set() - val = re.escape(val).replace(r"\*", ".*") - for d in self.col.decks.all(): - if re.match("(?i)" + val, unicodedata.normalize("NFC", d["name"])): - ids.update(dids(d["id"])) - if not ids: - return None - sids = ids2str(ids) - return "c.did in %s or c.odid in %s" % (sids, sids) - - def _findTemplate(self, args: Tuple[str, List[Any]]) -> str: - # were we given an ordinal number? - (val, __) = args - try: - num = int(val) - 1 - except: - num = None - if num is not None: - return "c.ord = %d" % num - # search for template names - lims = [] - for m in self.col.models.all(): - for t in m["tmpls"]: - if unicodedata.normalize("NFC", t["name"].lower()) == val.lower(): - if m["type"] == MODEL_CLOZE: - # if the user has asked for a cloze card, we want - # to give all ordinals, so we just limit to the - # model instead - lims.append("(n.mid = %s)" % m["id"]) - else: - lims.append("(n.mid = %s and c.ord = %s)" % (m["id"], t["ord"])) - return " or ".join(lims) - - def _findField(self, field: str, val: str) -> Optional[str]: - field = field.lower() - val = val.replace("*", "%") - # find models that have that field - mods = {} - for m in self.col.models.all(): - for f in m["flds"]: - if unicodedata.normalize("NFC", f["name"].lower()) == field: - mods[str(m["id"])] = (m, f["ord"]) - if not mods: - # nothing has that field - return None - # gather nids - regex = re.escape(val).replace("_", ".").replace(re.escape("%"), ".*") - nids = [] - for (id, mid, flds) in self.col.db.execute( - """ -select id, mid, flds from notes -where mid in %s and flds like ? escape '\\'""" - % (ids2str(list(mods.keys()))), - "%" + val + "%", - ): - flds = splitFields(flds) - ord = mods[str(mid)][1] - strg = flds[ord] - try: - if re.search("(?si)^" + regex + "$", strg): - nids.append(id) - except sre_constants.error: - return None - if not nids: - return "0" - return "n.id in %s" % ids2str(nids) - - def _findDupes(self, args) -> Optional[str]: - # caller must call stripHTMLMedia on passed val - (val, __) = args - try: - mid, val = val.split(",", 1) - except OSError: - return None - csum = fieldChecksum(val) - nids = [] - for nid, flds in self.col.db.execute( - "select id, flds from notes where mid=? and csum=?", mid, csum - ): - if stripHTMLMedia(splitFields(flds)[0]) == val: - nids.append(nid) - return "n.id in %s" % ids2str(nids) + def findNotes(self, query): + return self.col.find_notes(query) # Find and replace diff --git a/pylib/anki/hooks.py b/pylib/anki/hooks.py index 912d42fb3..2439eff3c 100644 --- a/pylib/anki/hooks.py +++ b/pylib/anki/hooks.py @@ -492,32 +492,6 @@ class _SchemaWillChangeFilter: schema_will_change = _SchemaWillChangeFilter() -class _SearchTermsPreparedHook: - _hooks: List[Callable[[Dict[str, Callable]], None]] = [] - - def append(self, cb: Callable[[Dict[str, Callable]], None]) -> None: - """(searches: Dict[str, Callable])""" - self._hooks.append(cb) - - def remove(self, cb: Callable[[Dict[str, Callable]], None]) -> None: - if cb in self._hooks: - self._hooks.remove(cb) - - def __call__(self, searches: Dict[str, Callable]) -> None: - for hook in self._hooks: - try: - hook(searches) - except: - # if the hook fails, remove it - self._hooks.remove(hook) - raise - # legacy support - runHook("search", searches) - - -search_terms_prepared = _SearchTermsPreparedHook() - - class _SyncProgressDidChangeHook: _hooks: List[Callable[[str], None]] = [] diff --git a/pylib/tests/test_find.py b/pylib/tests/test_find.py index 2591211e2..16d604b17 100644 --- a/pylib/tests/test_find.py +++ b/pylib/tests/test_find.py @@ -2,7 +2,6 @@ import pytest from anki.consts import * -from anki.find import Finder from tests.shared import getEmptyCol @@ -11,32 +10,6 @@ class DummyCollection: return None -def test_parse(): - f = Finder(DummyCollection()) - assert f._tokenize("hello world") == ["hello", "world"] - assert f._tokenize("hello world") == ["hello", "world"] - assert f._tokenize("one -two") == ["one", "-", "two"] - assert f._tokenize("one --two") == ["one", "-", "two"] - assert f._tokenize("one - two") == ["one", "-", "two"] - assert f._tokenize("one or -two") == ["one", "or", "-", "two"] - assert f._tokenize("'hello \"world\"'") == ['hello "world"'] - assert f._tokenize('"hello world"') == ["hello world"] - assert f._tokenize("one (two or ( three or four))") == [ - "one", - "(", - "two", - "or", - "(", - "three", - "or", - "four", - ")", - ")", - ] - assert f._tokenize("embedded'string") == ["embedded'string"] - assert f._tokenize("deck:'two words'") == ["deck:two words"] - - def test_findCards(): deck = getEmptyCol() f = deck.newNote() diff --git a/pylib/tools/genhooks.py b/pylib/tools/genhooks.py index 465a064cc..4ab2e90e8 100644 --- a/pylib/tools/genhooks.py +++ b/pylib/tools/genhooks.py @@ -37,11 +37,6 @@ hooks = [ args=["exporters: List[Tuple[str, Any]]"], legacy_hook="exportersList", ), - Hook( - name="search_terms_prepared", - args=["searches: Dict[str, Callable]"], - legacy_hook="search", - ), Hook( name="note_type_added", args=["notetype: Dict[str, Any]"], diff --git a/qt/aqt/browser.py b/qt/aqt/browser.py index efcea412d..3b5abeb7e 100644 --- a/qt/aqt/browser.py +++ b/qt/aqt/browser.py @@ -13,7 +13,7 @@ import unicodedata from dataclasses import dataclass from enum import Enum from operator import itemgetter -from typing import Callable, List, Optional, Union +from typing import Callable, List, Optional, Sequence, Union import anki import aqt.forms @@ -69,6 +69,13 @@ class FindDupesDialog: browser: Browser +@dataclass +class SearchContext: + search: str + # if set, provided card ids will be used instead of the regular search + card_ids: Optional[Sequence[int]] = None + + # Data model ########################################################################## @@ -82,7 +89,7 @@ class DataModel(QAbstractTableModel): self.activeCols = self.col.conf.get( "activeCols", ["noteFld", "template", "cardDue", "deck"] ) - self.cards: List[int] = [] + self.cards: Sequence[int] = [] self.cardObjs: Dict[int, Card] = {} def getCard(self, index: QModelIndex) -> Card: @@ -169,23 +176,21 @@ class DataModel(QAbstractTableModel): # Filtering ###################################################################### - def search(self, txt): + def search(self, txt: str) -> None: self.beginReset() - t = time.time() - # the db progress handler may cause a refresh, so we need to zero out - # old data first self.cards = [] invalid = False try: - self.cards = self.col.findCards(txt, order=True) + ctx = SearchContext(search=txt) + gui_hooks.browser_will_search(ctx) + if ctx.card_ids is None: + ctx.card_ids = self.col.find_cards(txt) + gui_hooks.browser_did_search(ctx) + self.cards = ctx.card_ids except Exception as e: - if str(e) == "invalidSearch": - self.cards = [] - invalid = True - else: - raise + print("search failed:", e) + invalid = True finally: - # print "fetch cards in %dms" % ((time.time() - t)*1000) self.endReset() if invalid: diff --git a/qt/aqt/gui_hooks.py b/qt/aqt/gui_hooks.py index 30a9ec0a7..a066504ab 100644 --- a/qt/aqt/gui_hooks.py +++ b/qt/aqt/gui_hooks.py @@ -359,6 +359,32 @@ class _BrowserDidChangeRowHook: browser_did_change_row = _BrowserDidChangeRowHook() +class _BrowserDidSearchHook: + """Allows you to modify the list of returned card ids from a search.""" + + _hooks: List[Callable[["aqt.browser.SearchContext"], None]] = [] + + def append(self, cb: Callable[["aqt.browser.SearchContext"], None]) -> None: + """(context: aqt.browser.SearchContext)""" + self._hooks.append(cb) + + def remove(self, cb: Callable[["aqt.browser.SearchContext"], None]) -> None: + if cb in self._hooks: + self._hooks.remove(cb) + + def __call__(self, context: aqt.browser.SearchContext) -> None: + for hook in self._hooks: + try: + hook(context) + except: + # if the hook fails, remove it + self._hooks.remove(hook) + raise + + +browser_did_search = _BrowserDidSearchHook() + + class _BrowserMenusDidInitHook: _hooks: List[Callable[["aqt.browser.Browser"], None]] = [] @@ -484,6 +510,42 @@ class _BrowserWillBuildTreeFilter: browser_will_build_tree = _BrowserWillBuildTreeFilter() +class _BrowserWillSearchHook: + """Allows you to modify the search text, or perform your own search. + + You can modify context.search to change the text that is sent to the + searching backend. + + If you set context.card_ids to a list of ids, the regular search will + not be performed, and the provided ids will be used instead. + + Your add-on should check if context.card_ids is not None, and return + without making changes if it has been set. + """ + + _hooks: List[Callable[["aqt.browser.SearchContext"], None]] = [] + + def append(self, cb: Callable[["aqt.browser.SearchContext"], None]) -> None: + """(context: aqt.browser.SearchContext)""" + self._hooks.append(cb) + + def remove(self, cb: Callable[["aqt.browser.SearchContext"], None]) -> None: + if cb in self._hooks: + self._hooks.remove(cb) + + def __call__(self, context: aqt.browser.SearchContext) -> None: + for hook in self._hooks: + try: + hook(context) + except: + # if the hook fails, remove it + self._hooks.remove(hook) + raise + + +browser_will_search = _BrowserWillSearchHook() + + class _BrowserWillShowHook: _hooks: List[Callable[["aqt.browser.Browser"], None]] = [] diff --git a/qt/tools/genhooks_gui.py b/qt/tools/genhooks_gui.py index 21f8de560..be3232a92 100644 --- a/qt/tools/genhooks_gui.py +++ b/qt/tools/genhooks_gui.py @@ -235,6 +235,26 @@ hooks = [ return True """, ), + Hook( + name="browser_will_search", + args=["context: aqt.browser.SearchContext"], + doc="""Allows you to modify the search text, or perform your own search. + + You can modify context.search to change the text that is sent to the + searching backend. + + If you set context.card_ids to a list of ids, the regular search will + not be performed, and the provided ids will be used instead. + + Your add-on should check if context.card_ids is not None, and return + without making changes if it has been set. + """, + ), + Hook( + name="browser_did_search", + args=["context: aqt.browser.SearchContext"], + doc="""Allows you to modify the list of returned card ids from a search.""", + ), # States ################### Hook(