remove old finder code; add search hooks to browser & remove old one

This commit is contained in:
Damien Elmes 2020-03-21 16:38:46 +10:00
parent 9696e959be
commit 9afbcd4178
7 changed files with 107 additions and 553 deletions

View file

@ -4,500 +4,25 @@
from __future__ import annotations
import re
import sre_constants
import unicodedata
from typing import TYPE_CHECKING, Any, List, Optional, Set, Tuple, Union, cast
from typing import TYPE_CHECKING, Optional, Set
from anki import hooks
from anki.consts import *
from anki.hooks import *
from anki.utils import (
fieldChecksum,
ids2str,
intTime,
joinFields,
splitFields,
stripHTMLMedia,
)
from anki.utils import ids2str, intTime, joinFields, splitFields, stripHTMLMedia
if TYPE_CHECKING:
from anki.collection import _Collection
# Find
##########################################################################
class Finder:
def __init__(self, col: Optional[_Collection]) -> None:
self.col = col.weakref()
self.search = dict(
added=self._findAdded,
card=self._findTemplate,
deck=self._findDeck,
mid=self._findMid,
nid=self._findNids,
cid=self._findCids,
note=self._findModel,
prop=self._findProp,
rated=self._findRated,
tag=self._findTag,
dupe=self._findDupes,
flag=self._findFlag,
)
self.search["is"] = self._findCardState
hooks.search_terms_prepared(self.search)
print("Finder() is deprecated, please use col.find_cards() or .find_notes()")
def findCards(self, query: str, order: Union[bool, str] = False) -> List[Any]:
"Return a list of card ids for QUERY."
tokens = self._tokenize(query)
preds, args = self._where(tokens)
if preds is None:
raise Exception("invalidSearch")
order, rev = self._order(order)
sql = self._query(preds, order)
try:
res = self.col.db.list(sql, *args)
except:
# invalid grouping
return []
if rev:
res.reverse()
return res
def findCards(self, query, order):
return self.col.find_cards(query, order)
def findNotes(self, query: str) -> List[Any]:
tokens = self._tokenize(query)
preds, args = self._where(tokens)
if preds is None:
return []
if preds:
preds = "(" + preds + ")"
else:
preds = "1"
sql = (
"""
select distinct(n.id) from cards c, notes n where c.nid=n.id and """
+ preds
)
try:
res = self.col.db.list(sql, *args)
except:
# invalid grouping
return []
return res
# Tokenizing
######################################################################
def _tokenize(self, query: str) -> List[str]:
inQuote: Union[bool, str] = False
tokens = []
token = ""
for c in query:
# quoted text
if c in ("'", '"'):
if inQuote:
if c == inQuote:
inQuote = False
else:
token += c
elif token:
# quotes are allowed to start directly after a :
if token[-1] == ":":
inQuote = c
else:
token += c
else:
inQuote = c
# separator (space and ideographic space)
elif c in (" ", "\u3000"):
if inQuote:
token += c
elif token:
# space marks token finished
tokens.append(token)
token = ""
# nesting
elif c in ("(", ")"):
if inQuote:
token += c
else:
if c == ")" and token:
tokens.append(token)
token = ""
tokens.append(c)
# negation
elif c == "-":
if token:
token += c
elif not tokens or tokens[-1] != "-":
tokens.append("-")
# normal character
else:
token += c
# if we finished in a token, add it
if token:
tokens.append(token)
return tokens
# Query building
######################################################################
def _where(self, tokens: List[str]) -> Tuple[str, Optional[List[str]]]:
# state and query
s: Dict[str, Any] = dict(isnot=False, isor=False, join=False, q="", bad=False)
args: List[Any] = []
def add(txt, wrap=True):
# failed command?
if not txt:
# if it was to be negated then we can just ignore it
if s["isnot"]:
s["isnot"] = False
return None, None
else:
s["bad"] = True
return None, None
elif txt == "skip":
return None, None
# do we need a conjunction?
if s["join"]:
if s["isor"]:
s["q"] += " or "
s["isor"] = False
else:
s["q"] += " and "
if s["isnot"]:
s["q"] += " not "
s["isnot"] = False
if wrap:
txt = "(" + txt + ")"
s["q"] += txt
s["join"] = True
for token in tokens:
if s["bad"]:
return None, None
# special tokens
if token == "-":
s["isnot"] = True
elif token.lower() == "or":
s["isor"] = True
elif token == "(":
add(token, wrap=False)
s["join"] = False
elif token == ")":
s["q"] += ")"
# commands
elif ":" in token:
cmd, val = token.split(":", 1)
cmd = cmd.lower()
if cmd in self.search:
add(self.search[cmd]((val, args)))
else:
add(self._findField(cmd, val))
# normal text search
else:
add(self._findText(token, args))
if s["bad"]:
return None, None
return s["q"], args
def _query(self, preds: str, order: str) -> str:
# can we skip the note table?
if "n." not in preds and "n." not in order:
sql = "select c.id from cards c where "
else:
sql = "select c.id from cards c, notes n where c.nid=n.id and "
# combine with preds
if preds:
sql += "(" + preds + ")"
else:
sql += "1"
# order
if order:
sql += " " + order
return sql
# Ordering
######################################################################
def _order(self, order: Union[bool, str]) -> Tuple[str, bool]:
if not order:
return "", False
elif order is not True:
# custom order string provided
return " order by " + cast(str, order), False
# use deck default
type = self.col.conf["sortType"]
sort = None
if type.startswith("note"):
if type == "noteCrt":
sort = "n.id, c.ord"
elif type == "noteMod":
sort = "n.mod, c.ord"
elif type == "noteFld":
sort = "n.sfld collate nocase, c.ord"
elif type.startswith("card"):
if type == "cardMod":
sort = "c.mod"
elif type == "cardReps":
sort = "c.reps"
elif type == "cardDue":
sort = "c.type, c.due"
elif type == "cardEase":
sort = f"c.type == {CARD_TYPE_NEW}, c.factor"
elif type == "cardLapses":
sort = "c.lapses"
elif type == "cardIvl":
sort = "c.ivl"
if not sort:
# deck has invalid sort order; revert to noteCrt
sort = "n.id, c.ord"
return " order by " + sort, self.col.conf["sortBackwards"]
# Commands
######################################################################
def _findTag(self, args: Tuple[str, List[Any]]) -> str:
(val, list_args) = args
if val == "none":
return 'n.tags = ""'
val = val.replace("*", "%")
if not val.startswith("%"):
val = "% " + val
if not val.endswith("%") or val.endswith("\\%"):
val += " %"
list_args.append(val)
return "n.tags like ? escape '\\'"
def _findCardState(self, args: Tuple[str, List[Any]]) -> Optional[str]:
(val, __) = args
if val in ("review", "new", "learn"):
if val == "review":
n = 2
elif val == "new":
n = CARD_TYPE_NEW
else:
return f"queue in ({QUEUE_TYPE_LRN}, {QUEUE_TYPE_DAY_LEARN_RELEARN})"
return "type = %d" % n
elif val == "suspended":
return "c.queue = -1"
elif val == "buried":
return f"c.queue in ({QUEUE_TYPE_SIBLING_BURIED}, {QUEUE_TYPE_MANUALLY_BURIED})"
elif val == "due":
return f"""
(c.queue in ({QUEUE_TYPE_REV},{QUEUE_TYPE_DAY_LEARN_RELEARN}) and c.due <= %d) or
(c.queue = {QUEUE_TYPE_LRN} and c.due <= %d)""" % (
self.col.sched.today,
self.col.sched.dayCutoff,
)
else:
# unknown
return None
def _findFlag(self, args: Tuple[str, List[Any]]) -> Optional[str]:
(val, __) = args
if not val or len(val) != 1 or val not in "01234":
return None
mask = 2 ** 3 - 1
return "(c.flags & %d) == %d" % (mask, int(val))
def _findRated(self, args: Tuple[str, List[Any]]) -> Optional[str]:
# days(:optional_ease)
(val, __) = args
r = val.split(":")
try:
days = int(r[0])
except ValueError:
return None
days = min(days, 31)
# ease
ease = ""
if len(r) > 1:
if r[1] not in ("1", "2", "3", "4"):
return None
ease = "and ease=%s" % r[1]
cutoff = (self.col.sched.dayCutoff - 86400 * days) * 1000
return "c.id in (select cid from revlog where id>%d %s)" % (cutoff, ease)
def _findAdded(self, args: Tuple[str, List[Any]]) -> Optional[str]:
(val, __) = args
try:
days = int(val)
except ValueError:
return None
cutoff = (self.col.sched.dayCutoff - 86400 * days) * 1000
return "c.id > %d" % cutoff
def _findProp(self, args: Tuple[str, List[Any]]) -> Optional[str]:
# extract
(strval, __) = args
m = re.match("(^.+?)(<=|>=|!=|=|<|>)(.+?$)", strval)
if not m:
return None
prop, cmp, strval = m.groups()
prop = prop.lower() # pytype: disable=attribute-error
# is val valid?
try:
if prop == "ease":
val = float(strval)
else:
val = int(strval)
except ValueError:
return None
# is prop valid?
if prop not in ("due", "ivl", "reps", "lapses", "ease"):
return None
# query
q = []
if prop == "due":
val += self.col.sched.today
# only valid for review/daily learning
q.append(f"(c.queue in ({QUEUE_TYPE_REV},{QUEUE_TYPE_DAY_LEARN_RELEARN}))")
elif prop == "ease":
prop = "factor"
val = int(val * 1000)
q.append("(%s %s %s)" % (prop, cmp, val))
return " and ".join(q)
def _findText(self, val: str, args: List[str]) -> str:
val = val.replace("*", "%")
args.append("%" + val + "%")
args.append("%" + val + "%")
return "(n.sfld like ? escape '\\' or n.flds like ? escape '\\')"
def _findNids(self, args: Tuple[str, List[Any]]) -> Optional[str]:
(val, __) = args
if re.search("[^0-9,]", val):
return None
return "n.id in (%s)" % val
def _findCids(self, args) -> Optional[str]:
(val, __) = args
if re.search("[^0-9,]", val):
return None
return "c.id in (%s)" % val
def _findMid(self, args) -> Optional[str]:
(val, __) = args
if re.search("[^0-9]", val):
return None
return "n.mid = %s" % val
def _findModel(self, args: Tuple[str, List[Any]]) -> str:
(val, __) = args
ids = []
val = val.lower()
for m in self.col.models.all():
if unicodedata.normalize("NFC", m["name"].lower()) == val:
ids.append(m["id"])
return "n.mid in %s" % ids2str(ids)
def _findDeck(self, args: Tuple[str, List[Any]]) -> Optional[str]:
# if searching for all decks, skip
(val, __) = args
if val == "*":
return "skip"
# deck types
elif val == "filtered":
return "c.odid"
def dids(did):
if not did:
return None
return [did] + [a[1] for a in self.col.decks.children(did)]
# current deck?
ids = None
if val.lower() == "current":
ids = dids(self.col.decks.current()["id"])
elif "*" not in val:
# single deck
ids = dids(self.col.decks.id(val, create=False))
else:
# wildcard
ids = set()
val = re.escape(val).replace(r"\*", ".*")
for d in self.col.decks.all():
if re.match("(?i)" + val, unicodedata.normalize("NFC", d["name"])):
ids.update(dids(d["id"]))
if not ids:
return None
sids = ids2str(ids)
return "c.did in %s or c.odid in %s" % (sids, sids)
def _findTemplate(self, args: Tuple[str, List[Any]]) -> str:
# were we given an ordinal number?
(val, __) = args
try:
num = int(val) - 1
except:
num = None
if num is not None:
return "c.ord = %d" % num
# search for template names
lims = []
for m in self.col.models.all():
for t in m["tmpls"]:
if unicodedata.normalize("NFC", t["name"].lower()) == val.lower():
if m["type"] == MODEL_CLOZE:
# if the user has asked for a cloze card, we want
# to give all ordinals, so we just limit to the
# model instead
lims.append("(n.mid = %s)" % m["id"])
else:
lims.append("(n.mid = %s and c.ord = %s)" % (m["id"], t["ord"]))
return " or ".join(lims)
def _findField(self, field: str, val: str) -> Optional[str]:
field = field.lower()
val = val.replace("*", "%")
# find models that have that field
mods = {}
for m in self.col.models.all():
for f in m["flds"]:
if unicodedata.normalize("NFC", f["name"].lower()) == field:
mods[str(m["id"])] = (m, f["ord"])
if not mods:
# nothing has that field
return None
# gather nids
regex = re.escape(val).replace("_", ".").replace(re.escape("%"), ".*")
nids = []
for (id, mid, flds) in self.col.db.execute(
"""
select id, mid, flds from notes
where mid in %s and flds like ? escape '\\'"""
% (ids2str(list(mods.keys()))),
"%" + val + "%",
):
flds = splitFields(flds)
ord = mods[str(mid)][1]
strg = flds[ord]
try:
if re.search("(?si)^" + regex + "$", strg):
nids.append(id)
except sre_constants.error:
return None
if not nids:
return "0"
return "n.id in %s" % ids2str(nids)
def _findDupes(self, args) -> Optional[str]:
# caller must call stripHTMLMedia on passed val
(val, __) = args
try:
mid, val = val.split(",", 1)
except OSError:
return None
csum = fieldChecksum(val)
nids = []
for nid, flds in self.col.db.execute(
"select id, flds from notes where mid=? and csum=?", mid, csum
):
if stripHTMLMedia(splitFields(flds)[0]) == val:
nids.append(nid)
return "n.id in %s" % ids2str(nids)
def findNotes(self, query):
return self.col.find_notes(query)
# Find and replace

View file

@ -492,32 +492,6 @@ class _SchemaWillChangeFilter:
schema_will_change = _SchemaWillChangeFilter()
class _SearchTermsPreparedHook:
_hooks: List[Callable[[Dict[str, Callable]], None]] = []
def append(self, cb: Callable[[Dict[str, Callable]], None]) -> None:
"""(searches: Dict[str, Callable])"""
self._hooks.append(cb)
def remove(self, cb: Callable[[Dict[str, Callable]], None]) -> None:
if cb in self._hooks:
self._hooks.remove(cb)
def __call__(self, searches: Dict[str, Callable]) -> None:
for hook in self._hooks:
try:
hook(searches)
except:
# if the hook fails, remove it
self._hooks.remove(hook)
raise
# legacy support
runHook("search", searches)
search_terms_prepared = _SearchTermsPreparedHook()
class _SyncProgressDidChangeHook:
_hooks: List[Callable[[str], None]] = []

View file

@ -2,7 +2,6 @@
import pytest
from anki.consts import *
from anki.find import Finder
from tests.shared import getEmptyCol
@ -11,32 +10,6 @@ class DummyCollection:
return None
def test_parse():
f = Finder(DummyCollection())
assert f._tokenize("hello world") == ["hello", "world"]
assert f._tokenize("hello world") == ["hello", "world"]
assert f._tokenize("one -two") == ["one", "-", "two"]
assert f._tokenize("one --two") == ["one", "-", "two"]
assert f._tokenize("one - two") == ["one", "-", "two"]
assert f._tokenize("one or -two") == ["one", "or", "-", "two"]
assert f._tokenize("'hello \"world\"'") == ['hello "world"']
assert f._tokenize('"hello world"') == ["hello world"]
assert f._tokenize("one (two or ( three or four))") == [
"one",
"(",
"two",
"or",
"(",
"three",
"or",
"four",
")",
")",
]
assert f._tokenize("embedded'string") == ["embedded'string"]
assert f._tokenize("deck:'two words'") == ["deck:two words"]
def test_findCards():
deck = getEmptyCol()
f = deck.newNote()

View file

@ -37,11 +37,6 @@ hooks = [
args=["exporters: List[Tuple[str, Any]]"],
legacy_hook="exportersList",
),
Hook(
name="search_terms_prepared",
args=["searches: Dict[str, Callable]"],
legacy_hook="search",
),
Hook(
name="note_type_added",
args=["notetype: Dict[str, Any]"],

View file

@ -13,7 +13,7 @@ import unicodedata
from dataclasses import dataclass
from enum import Enum
from operator import itemgetter
from typing import Callable, List, Optional, Union
from typing import Callable, List, Optional, Sequence, Union
import anki
import aqt.forms
@ -69,6 +69,13 @@ class FindDupesDialog:
browser: Browser
@dataclass
class SearchContext:
search: str
# if set, provided card ids will be used instead of the regular search
card_ids: Optional[Sequence[int]] = None
# Data model
##########################################################################
@ -82,7 +89,7 @@ class DataModel(QAbstractTableModel):
self.activeCols = self.col.conf.get(
"activeCols", ["noteFld", "template", "cardDue", "deck"]
)
self.cards: List[int] = []
self.cards: Sequence[int] = []
self.cardObjs: Dict[int, Card] = {}
def getCard(self, index: QModelIndex) -> Card:
@ -169,23 +176,21 @@ class DataModel(QAbstractTableModel):
# Filtering
######################################################################
def search(self, txt):
def search(self, txt: str) -> None:
self.beginReset()
t = time.time()
# the db progress handler may cause a refresh, so we need to zero out
# old data first
self.cards = []
invalid = False
try:
self.cards = self.col.findCards(txt, order=True)
ctx = SearchContext(search=txt)
gui_hooks.browser_will_search(ctx)
if ctx.card_ids is None:
ctx.card_ids = self.col.find_cards(txt)
gui_hooks.browser_did_search(ctx)
self.cards = ctx.card_ids
except Exception as e:
if str(e) == "invalidSearch":
self.cards = []
print("search failed:", e)
invalid = True
else:
raise
finally:
# print "fetch cards in %dms" % ((time.time() - t)*1000)
self.endReset()
if invalid:

View file

@ -359,6 +359,32 @@ class _BrowserDidChangeRowHook:
browser_did_change_row = _BrowserDidChangeRowHook()
class _BrowserDidSearchHook:
"""Allows you to modify the list of returned card ids from a search."""
_hooks: List[Callable[["aqt.browser.SearchContext"], None]] = []
def append(self, cb: Callable[["aqt.browser.SearchContext"], None]) -> None:
"""(context: aqt.browser.SearchContext)"""
self._hooks.append(cb)
def remove(self, cb: Callable[["aqt.browser.SearchContext"], None]) -> None:
if cb in self._hooks:
self._hooks.remove(cb)
def __call__(self, context: aqt.browser.SearchContext) -> None:
for hook in self._hooks:
try:
hook(context)
except:
# if the hook fails, remove it
self._hooks.remove(hook)
raise
browser_did_search = _BrowserDidSearchHook()
class _BrowserMenusDidInitHook:
_hooks: List[Callable[["aqt.browser.Browser"], None]] = []
@ -484,6 +510,42 @@ class _BrowserWillBuildTreeFilter:
browser_will_build_tree = _BrowserWillBuildTreeFilter()
class _BrowserWillSearchHook:
"""Allows you to modify the search text, or perform your own search.
You can modify context.search to change the text that is sent to the
searching backend.
If you set context.card_ids to a list of ids, the regular search will
not be performed, and the provided ids will be used instead.
Your add-on should check if context.card_ids is not None, and return
without making changes if it has been set.
"""
_hooks: List[Callable[["aqt.browser.SearchContext"], None]] = []
def append(self, cb: Callable[["aqt.browser.SearchContext"], None]) -> None:
"""(context: aqt.browser.SearchContext)"""
self._hooks.append(cb)
def remove(self, cb: Callable[["aqt.browser.SearchContext"], None]) -> None:
if cb in self._hooks:
self._hooks.remove(cb)
def __call__(self, context: aqt.browser.SearchContext) -> None:
for hook in self._hooks:
try:
hook(context)
except:
# if the hook fails, remove it
self._hooks.remove(hook)
raise
browser_will_search = _BrowserWillSearchHook()
class _BrowserWillShowHook:
_hooks: List[Callable[["aqt.browser.Browser"], None]] = []

View file

@ -235,6 +235,26 @@ hooks = [
return True
""",
),
Hook(
name="browser_will_search",
args=["context: aqt.browser.SearchContext"],
doc="""Allows you to modify the search text, or perform your own search.
You can modify context.search to change the text that is sent to the
searching backend.
If you set context.card_ids to a list of ids, the regular search will
not be performed, and the provided ids will be used instead.
Your add-on should check if context.card_ids is not None, and return
without making changes if it has been set.
""",
),
Hook(
name="browser_did_search",
args=["context: aqt.browser.SearchContext"],
doc="""Allows you to modify the list of returned card ids from a search.""",
),
# States
###################
Hook(