From b157ee7570eff2fe47f254f422367dc327403531 Mon Sep 17 00:00:00 2001 From: Alan Du Date: Wed, 26 Feb 2020 00:24:32 -0500 Subject: [PATCH 1/6] Monkeytype pylib/anki/find.py --- pylib/anki/find.py | 109 +++++++++++++++++++++++++-------------------- 1 file changed, 61 insertions(+), 48 deletions(-) diff --git a/pylib/anki/find.py b/pylib/anki/find.py index 847d4e05f..3faa6fe2a 100644 --- a/pylib/anki/find.py +++ b/pylib/anki/find.py @@ -1,10 +1,11 @@ # Copyright: Ankitects Pty Ltd and contributors # License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html +from __future__ import annotations import re import sre_constants import unicodedata -from typing import Any, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, Any, List, Optional, Set, Tuple, Union, cast from anki import hooks from anki.consts import * @@ -18,12 +19,15 @@ from anki.utils import ( stripHTMLMedia, ) +if TYPE_CHECKING: + from anki.collection import _Collection + # Find ########################################################################## class Finder: - def __init__(self, col) -> None: + def __init__(self, col: Optional[_Collection]) -> None: self.col = col self.search = dict( added=self._findAdded, @@ -42,7 +46,7 @@ class Finder: self.search["is"] = self._findCardState hooks.search_terms_prepared(self.search) - def findCards(self, query, order=False) -> Any: + def findCards(self, query: str, order: Union[bool, str] = False) -> List[Any]: "Return a list of card ids for QUERY." tokens = self._tokenize(query) preds, args = self._where(tokens) @@ -59,7 +63,7 @@ class Finder: res.reverse() return res - def findNotes(self, query) -> Any: + def findNotes(self, query: str) -> List[Any]: tokens = self._tokenize(query) preds, args = self._where(tokens) if preds is None: @@ -83,8 +87,8 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """ # Tokenizing ###################################################################### - def _tokenize(self, query) -> List: - inQuote = False + def _tokenize(self, query: str) -> List[str]: + inQuote: Union[bool, str] = False tokens = [] token = "" for c in query: @@ -137,7 +141,7 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """ # Query building ###################################################################### - def _where(self, tokens) -> Tuple[Any, Optional[List[str]]]: + def _where(self, tokens: List[str]) -> Tuple[str, Optional[List[str]]]: # state and query s: Dict[str, Any] = dict(isnot=False, isor=False, join=False, q="", bad=False) args: List[Any] = [] @@ -197,7 +201,7 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """ return None, None return s["q"], args - def _query(self, preds, order) -> str: + def _query(self, preds: str, order: str) -> str: # can we skip the note table? if "n." not in preds and "n." not in order: sql = "select c.id from cards c where " @@ -216,12 +220,12 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """ # Ordering ###################################################################### - def _order(self, order) -> Tuple[Any, Any]: + def _order(self, order: Union[bool, str]) -> Tuple[str, bool]: if not order: return "", False elif order is not True: # custom order string provided - return " order by " + order, False + return " order by " + cast(str, order), False # use deck default type = self.col.conf["sortType"] sort = None @@ -253,8 +257,8 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """ # Commands ###################################################################### - def _findTag(self, args) -> str: - (val, args) = args + def _findTag(self, args: Tuple[str, List[Any]]) -> str: + (val, list_args) = args if val == "none": return 'n.tags = ""' val = val.replace("*", "%") @@ -262,11 +266,11 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """ val = "% " + val if not val.endswith("%") or val.endswith("\\%"): val += " %" - args.append(val) + list_args.append(val) return "n.tags like ? escape '\\'" - def _findCardState(self, args) -> Optional[str]: - (val, args) = args + def _findCardState(self, args: Tuple[str, List[Any]]) -> Optional[str]: + (val, __) = args if val in ("review", "new", "learn"): if val == "review": n = 2 @@ -290,17 +294,16 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """ # unknown return None - def _findFlag(self, args) -> Optional[str]: - (val, args) = args + def _findFlag(self, args: Tuple[str, List[Any]]) -> Optional[str]: + (val, __) = args if not val or len(val) != 1 or val not in "01234": return None - val = int(val) mask = 2 ** 3 - 1 - return "(c.flags & %d) == %d" % (mask, val) + return "(c.flags & %d) == %d" % (mask, int(val)) - def _findRated(self, args) -> Optional[str]: + def _findRated(self, args: Tuple[str, List[Any]]) -> Optional[str]: # days(:optional_ease) - (val, args) = args + (val, __) = args r = val.split(":") try: days = int(r[0]) @@ -316,8 +319,8 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """ cutoff = (self.col.sched.dayCutoff - 86400 * days) * 1000 return "c.id in (select cid from revlog where id>%d %s)" % (cutoff, ease) - def _findAdded(self, args) -> Optional[str]: - (val, args) = args + def _findAdded(self, args: Tuple[str, List[Any]]) -> Optional[str]: + (val, __) = args try: days = int(val) except ValueError: @@ -325,20 +328,20 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """ cutoff = (self.col.sched.dayCutoff - 86400 * days) * 1000 return "c.id > %d" % cutoff - def _findProp(self, args) -> Optional[str]: + def _findProp(self, args: Tuple[str, List[Any]]) -> Optional[str]: # extract - (val, args) = args - m = re.match("(^.+?)(<=|>=|!=|=|<|>)(.+?$)", val) + (strval, __) = args + m = re.match("(^.+?)(<=|>=|!=|=|<|>)(.+?$)", strval) if not m: return None - prop, cmp, val = m.groups() + prop, cmp, strval = m.groups() prop = prop.lower() # pytype: disable=attribute-error # is val valid? try: if prop == "ease": - val = float(val) + val = float(strval) else: - val = int(val) + val = int(strval) except ValueError: return None # is prop valid? @@ -356,32 +359,32 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """ q.append("(%s %s %s)" % (prop, cmp, val)) return " and ".join(q) - def _findText(self, val, args) -> str: + def _findText(self, val: str, args: List[str]) -> str: val = val.replace("*", "%") args.append("%" + val + "%") args.append("%" + val + "%") return "(n.sfld like ? escape '\\' or n.flds like ? escape '\\')" - def _findNids(self, args) -> Optional[str]: - (val, args) = args + def _findNids(self, args: Tuple[str, List[Any]]) -> Optional[str]: + (val, __) = args if re.search("[^0-9,]", val): return None return "n.id in (%s)" % val def _findCids(self, args) -> Optional[str]: - (val, args) = args + (val, __) = args if re.search("[^0-9,]", val): return None return "c.id in (%s)" % val def _findMid(self, args) -> Optional[str]: - (val, args) = args + (val, __) = args if re.search("[^0-9]", val): return None return "n.mid = %s" % val - def _findModel(self, args) -> str: - (val, args) = args + def _findModel(self, args: Tuple[str, List[Any]]) -> str: + (val, __) = args ids = [] val = val.lower() for m in self.col.models.all(): @@ -389,9 +392,9 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """ ids.append(m["id"]) return "n.mid in %s" % ids2str(ids) - def _findDeck(self, args) -> Optional[str]: + def _findDeck(self, args: Tuple[str, List[Any]]) -> Optional[str]: # if searching for all decks, skip - (val, args) = args + (val, __) = args if val == "*": return "skip" # deck types @@ -422,9 +425,9 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """ sids = ids2str(ids) return "c.did in %s or c.odid in %s" % (sids, sids) - def _findTemplate(self, args) -> str: + def _findTemplate(self, args: Tuple[str, List[Any]]) -> str: # were we given an ordinal number? - (val, args) = args + (val, __) = args try: num = int(val) - 1 except: @@ -445,7 +448,7 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """ lims.append("(n.mid = %s and c.ord = %s)" % (m["id"], t["ord"])) return " or ".join(lims) - def _findField(self, field, val) -> Optional[str]: + def _findField(self, field: str, val: str) -> Optional[str]: field = field.lower() val = val.replace("*", "%") # find models that have that field @@ -481,7 +484,7 @@ where mid in %s and flds like ? escape '\\'""" def _findDupes(self, args) -> Optional[str]: # caller must call stripHTMLMedia on passed val - (val, args) = args + (val, __) = args try: mid, val = val.split(",", 1) except OSError: @@ -500,9 +503,17 @@ where mid in %s and flds like ? escape '\\'""" ########################################################################## -def findReplace(col, nids, src, dst, regex=False, field=None, fold=True) -> int: +def findReplace( + col: _Collection, + nids: List[int], + src: str, + dst: str, + regex: bool = False, + field: Optional[str] = None, + fold: bool = True, +) -> int: "Find and replace fields in a note." - mmap = {} + mmap: Dict[str, Any] = {} if field: for m in col.models.all(): for f in m["flds"]: @@ -516,10 +527,10 @@ def findReplace(col, nids, src, dst, regex=False, field=None, fold=True) -> int: dst = dst.replace("\\", "\\\\") if fold: src = "(?i)" + src - regex = re.compile(src) + compiled_re = re.compile(src) - def repl(str): - return re.sub(regex, dst, str) + def repl(s: str): + return compiled_re.sub(dst, s) d = [] snids = ids2str(nids) @@ -577,7 +588,9 @@ def fieldNamesForNotes(col, nids) -> List: # Find duplicates ########################################################################## # returns array of ("dupestr", [nids]) -def findDupes(col, fieldName, search="") -> List[Tuple[Any, List]]: +def findDupes( + col: _Collection, fieldName: str, search: str = "" +) -> List[Tuple[Any, List]]: # limit search to notes with applicable field name if search: search = "(" + search + ") " From 02dd30f2a07cb53a247fa2cccc8e99e4f350f53b Mon Sep 17 00:00:00 2001 From: Alan Du Date: Wed, 26 Feb 2020 00:49:07 -0500 Subject: [PATCH 2/6] Monkeytype pylib/anki/sched.py --- pylib/anki/sched.py | 260 +++++++++++++++++++++++++------------------- 1 file changed, 148 insertions(+), 112 deletions(-) diff --git a/pylib/anki/sched.py b/pylib/anki/sched.py index b88174ace..d13d4154d 100644 --- a/pylib/anki/sched.py +++ b/pylib/anki/sched.py @@ -8,7 +8,7 @@ import random import time from heapq import * from operator import itemgetter -from typing import List, Optional, Set +from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union import anki from anki import hooks @@ -56,14 +56,14 @@ class Scheduler: return card return None - def reset(self): + def reset(self) -> None: self._updateCutoff() self._resetLrn() self._resetRev() self._resetNew() self._haveQueues = True - def answerCard(self, card, ease): + def answerCard(self, card: Card, ease: int) -> None: self.col.log() assert 1 <= ease <= 4 self.col.markReview(card) @@ -71,7 +71,7 @@ class Scheduler: self._burySiblings(card) card.reps += 1 # former is for logging new cards, latter also covers filt. decks - card.wasNew = card.type == CARD_TYPE_NEW + card.wasNew = card.type == CARD_TYPE_NEW # type: ignore wasNewQ = card.queue == QUEUE_TYPE_NEW if wasNewQ: # came from the new queue, move to learning @@ -102,7 +102,7 @@ class Scheduler: card.usn = self.col.usn() card.flushSched() - def counts(self, card=None): + def counts(self, card: Optional[Card] = None) -> Tuple[int, int, int]: counts = [self.newCount, self.lrnCount, self.revCount] if card: idx = self.countIdx(card) @@ -110,9 +110,11 @@ class Scheduler: counts[1] += card.left // 1000 else: counts[idx] += 1 - return tuple(counts) - def dueForecast(self, days=7): + new, lrn, rev = counts + return (new, lrn, rev) + + def dueForecast(self, days: int = 7) -> List[Any]: "Return counts over next DAYS. Includes today." daysd = dict( self.col.db.all( @@ -135,12 +137,12 @@ order by due""" ret = [x[1] for x in sorted(daysd.items())] return ret - def countIdx(self, card): + def countIdx(self, card: Card) -> int: if card.queue == QUEUE_TYPE_DAY_LEARN_RELEARN: return 1 return card.queue - def answerButtons(self, card): + def answerButtons(self, card: Card) -> int: if card.odue: # normal review in dyn deck? if card.odid and card.queue == QUEUE_TYPE_REV: @@ -154,7 +156,7 @@ order by due""" else: return 3 - def unburyCards(self): + def unburyCards(self) -> None: "Unbury cards." self.col.conf["lastUnburied"] = self.today self.col.log( @@ -166,7 +168,7 @@ order by due""" f"update cards set queue=type where queue = {QUEUE_TYPE_SIBLING_BURIED}" ) - def unburyCardsForDeck(self): + def unburyCardsForDeck(self) -> None: sids = ids2str(self.col.decks.active()) self.col.log( self.col.db.list( @@ -184,14 +186,14 @@ order by due""" # Rev/lrn/time daily stats ########################################################################## - def _updateStats(self, card, type, cnt=1): + def _updateStats(self, card: Card, type: str, cnt: int = 1) -> None: key = type + "Today" for g in [self.col.decks.get(card.did)] + self.col.decks.parents(card.did): # add g[key][1] += cnt self.col.decks.save(g) - def extendLimits(self, new, rev): + def extendLimits(self, new: int, rev: int) -> None: cur = self.col.decks.current() parents = self.col.decks.parents(cur["id"]) children = [ @@ -204,7 +206,11 @@ order by due""" g["revToday"][1] -= rev self.col.decks.save(g) - def _walkingCount(self, limFn=None, cntFn=None): + def _walkingCount( + self, + limFn: Optional[Callable[[Any], Optional[int]]] = None, + cntFn: Optional[Callable[[int, int], int]] = None, + ) -> int: tot = 0 pcounts: Dict[int, int] = {} # for each of the active decks @@ -238,7 +244,7 @@ order by due""" # Deck list ########################################################################## - def deckDueList(self): + def deckDueList(self) -> List[List[Any]]: "Returns [deckname, did, rev, lrn, new]" self._checkDay() self.col.decks.checkIntegrity() @@ -274,10 +280,10 @@ order by due""" lims[deck["name"]] = [nlim, rlim] return data - def deckDueTree(self): + def deckDueTree(self) -> Any: return self._groupChildren(self.deckDueList()) - def _groupChildren(self, grps): + def _groupChildren(self, grps: List[List[Any]]) -> Any: # first, split the group names into components for g in grps: g[0] = g[0].split("::") @@ -286,7 +292,7 @@ order by due""" # then run main function return self._groupChildrenMain(grps) - def _groupChildrenMain(self, grps): + def _groupChildrenMain(self, grps: List[List[Any]]) -> Any: tree = [] # group and recurse def key(grp): @@ -328,7 +334,7 @@ order by due""" # Getting the next card ########################################################################## - def _getCard(self): + def _getCard(self) -> Optional[Card]: "Return the next due card id, or None." # learning card due? c = self._getLrnCard() @@ -357,7 +363,7 @@ order by due""" # New cards ########################################################################## - def _resetNewCount(self): + def _resetNewCount(self) -> None: cntFn = lambda did, lim: self.col.db.scalar( f""" select count() from (select 1 from cards where @@ -367,13 +373,13 @@ did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""", ) self.newCount = self._walkingCount(self._deckNewLimitSingle, cntFn) - def _resetNew(self): + def _resetNew(self) -> None: self._resetNewCount() self._newDids = self.col.decks.active()[:] - self._newQueue = [] + self._newQueue: List[Any] = [] self._updateNewCardRatio() - def _fillNew(self): + def _fillNew(self) -> Optional[bool]: if self._newQueue: return True if not self.newCount: @@ -400,13 +406,15 @@ did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""", # removed from the queue but not buried self._resetNew() return self._fillNew() + return None - def _getNewCard(self): + def _getNewCard(self) -> Optional[Card]: if self._fillNew(): self.newCount -= 1 return self.col.getCard(self._newQueue.pop()) + return None - def _updateNewCardRatio(self): + def _updateNewCardRatio(self) -> None: if self.col.conf["newSpread"] == NEW_CARDS_DISTRIBUTE: if self.newCount: self.newCardModulus = (self.newCount + self.revCount) // self.newCount @@ -416,7 +424,7 @@ did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""", return self.newCardModulus = 0 - def _timeForNewCard(self): + def _timeForNewCard(self) -> Optional[bool]: "True if it's time to display a new card when distributing." if not self.newCount: return False @@ -425,9 +433,12 @@ did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""", elif self.col.conf["newSpread"] == NEW_CARDS_FIRST: return True elif self.newCardModulus: - return self.reps and self.reps % self.newCardModulus == 0 + return self.reps != 0 and self.reps % self.newCardModulus == 0 + return None - def _deckNewLimit(self, did, fn=None): + def _deckNewLimit( + self, did: int, fn: Optional[Callable[[Dict[str, Any]], int]] = None + ) -> int: if not fn: fn = self._deckNewLimitSingle sel = self.col.decks.get(did) @@ -441,7 +452,7 @@ did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""", lim = min(rem, lim) return lim - def _newForDeck(self, did, lim): + def _newForDeck(self, did: int, lim: int) -> int: "New count for a single deck." if not lim: return 0 @@ -454,14 +465,14 @@ select count() from lim, ) - def _deckNewLimitSingle(self, g): + def _deckNewLimitSingle(self, g: Dict[str, Any]) -> int: "Limit for deck without parent limits." if g["dyn"]: return self.reportLimit c = self.col.decks.confForDid(g["id"]) return max(0, c["new"]["perDay"] - g["newToday"][1]) - def totalNewForCurrentDeck(self): + def totalNewForCurrentDeck(self) -> int: return self.col.db.scalar( f""" select count() from cards where id in ( @@ -473,7 +484,7 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_NEW} limit ?)""" # Learning queues ########################################################################## - def _resetLrnCount(self): + def _resetLrnCount(self) -> None: # sub-day self.lrnCount = ( self.col.db.scalar( @@ -494,14 +505,14 @@ and due <= ? limit %d""" self.today, ) - def _resetLrn(self): + def _resetLrn(self) -> None: self._resetLrnCount() - self._lrnQueue = [] - self._lrnDayQueue = [] + self._lrnQueue: List[Any] = [] + self._lrnDayQueue: List[Any] = [] self._lrnDids = self.col.decks.active()[:] # sub-day learning - def _fillLrn(self): + def _fillLrn(self) -> Union[bool, List[Any]]: if not self.lrnCount: return False if self._lrnQueue: @@ -518,7 +529,7 @@ limit %d""" self._lrnQueue.sort() return self._lrnQueue - def _getLrnCard(self, collapse=False): + def _getLrnCard(self, collapse: bool = False) -> Optional[Card]: if self._fillLrn(): cutoff = time.time() if collapse: @@ -528,9 +539,10 @@ limit %d""" card = self.col.getCard(id) self.lrnCount -= card.left // 1000 return card + return None # daily learning - def _fillLrnDay(self): + def _fillLrnDay(self) -> Optional[bool]: if not self.lrnCount: return False if self._lrnDayQueue: @@ -557,16 +569,18 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""", return True # nothing left in the deck; move to next self._lrnDids.pop(0) + return None - def _getLrnDayCard(self): + def _getLrnDayCard(self) -> Optional[Card]: if self._fillLrnDay(): self.lrnCount -= 1 return self.col.getCard(self._lrnDayQueue.pop()) + return None - def _answerLrnCard(self, card, ease): + def _answerLrnCard(self, card: Card, ease: int) -> None: # ease 1=no, 2=yes, 3=remove conf = self._lrnConf(card) - if card.odid and not card.wasNew: + if card.odid and not card.wasNew: # type: ignore type = REVLOG_CRAM elif card.type == CARD_TYPE_REV: type = REVLOG_RELRN @@ -625,7 +639,7 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""", card.queue = QUEUE_TYPE_DAY_LEARN_RELEARN self._logLrn(card, ease, conf, leaving, type, lastLeft) - def _delayForGrade(self, conf, left): + def _delayForGrade(self, conf: Dict[str, Any], left: int) -> Union[int, float]: left = left % 1000 try: delay = conf["delays"][-left] @@ -637,13 +651,13 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""", delay = 1 return delay * 60 - def _lrnConf(self, card): + def _lrnConf(self, card: Card) -> Dict[str, Any]: if card.type == CARD_TYPE_REV: return self._lapseConf(card) else: return self._newConf(card) - def _rescheduleAsRev(self, card, conf, early): + def _rescheduleAsRev(self, card: Card, conf: Dict[str, Any], early: bool) -> None: lapse = card.type == CARD_TYPE_REV if lapse: if self._resched(card): @@ -666,7 +680,7 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""", card.queue = card.type = CARD_TYPE_NEW card.due = self.col.nextID("pos") - def _startingLeft(self, card): + def _startingLeft(self, card: Card) -> int: if card.type == CARD_TYPE_REV: conf = self._lapseConf(card) else: @@ -675,7 +689,9 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""", tod = self._leftToday(conf["delays"], tot) return tot + tod * 1000 - def _leftToday(self, delays, left, now=None): + def _leftToday( + self, delays: List[int], left: int, now: Optional[int] = None + ) -> int: "The number of steps that can be completed by the day cutoff." if not now: now = intTime() @@ -688,7 +704,9 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""", ok = i return ok + 1 - def _graduatingIvl(self, card, conf, early, adj=True): + def _graduatingIvl( + self, card: Card, conf: Dict[str, Any], early: bool, adj: bool = True + ) -> int: if card.type == CARD_TYPE_REV: # lapsed card being relearnt if card.odid: @@ -706,13 +724,21 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""", else: return ideal - def _rescheduleNew(self, card, conf, early): + def _rescheduleNew(self, card: Card, conf: Dict[str, Any], early: bool) -> None: "Reschedule a new card that's graduated for the first time." card.ivl = self._graduatingIvl(card, conf, early) card.due = self.today + card.ivl card.factor = conf["initialFactor"] - def _logLrn(self, card, ease, conf, leaving, type, lastLeft): + def _logLrn( + self, + card: Card, + ease: int, + conf: Dict[str, Any], + leaving: bool, + type: int, + lastLeft: int, + ) -> None: lastIvl = -(self._delayForGrade(conf, lastLeft)) ivl = card.ivl if leaving else -(self._delayForGrade(conf, card.left)) @@ -737,7 +763,7 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""", time.sleep(0.01) log() - def removeLrn(self, ids=None): + def removeLrn(self, ids: Optional[List[int]] = None) -> None: "Remove cards from the learning queues." if ids: extra = " and id in " + ids2str(ids) @@ -763,7 +789,7 @@ where queue in ({QUEUE_TYPE_LRN},{QUEUE_TYPE_DAY_LEARN_RELEARN}) and type = {CAR ) ) - def _lrnForDeck(self, did): + def _lrnForDeck(self, did: int) -> int: cnt = ( self.col.db.scalar( f""" @@ -788,16 +814,16 @@ and due <= ? limit ?)""", # Reviews ########################################################################## - def _deckRevLimit(self, did): + def _deckRevLimit(self, did: int) -> int: return self._deckNewLimit(did, self._deckRevLimitSingle) - def _deckRevLimitSingle(self, d): + def _deckRevLimitSingle(self, d: Dict[str, Any]) -> int: if d["dyn"]: return self.reportLimit c = self.col.decks.confForDid(d["id"]) return max(0, c["rev"]["perDay"] - d["revToday"][1]) - def _revForDeck(self, did, lim): + def _revForDeck(self, did: int, lim: int) -> int: lim = min(lim, self.reportLimit) return self.col.db.scalar( f""" @@ -809,7 +835,7 @@ and due <= ? limit ?)""", lim, ) - def _resetRevCount(self): + def _resetRevCount(self) -> None: def cntFn(did, lim): return self.col.db.scalar( f""" @@ -822,12 +848,12 @@ did = ? and queue = {QUEUE_TYPE_REV} and due <= ? limit %d)""" self.revCount = self._walkingCount(self._deckRevLimitSingle, cntFn) - def _resetRev(self): + def _resetRev(self) -> None: self._resetRevCount() - self._revQueue = [] + self._revQueue: List[Any] = [] self._revDids = self.col.decks.active()[:] - def _fillRev(self): + def _fillRev(self) -> Optional[bool]: if self._revQueue: return True if not self.revCount: @@ -868,12 +894,15 @@ did = ? and queue = {QUEUE_TYPE_REV} and due <= ? limit ?""", self._resetRev() return self._fillRev() - def _getRevCard(self): + return None + + def _getRevCard(self) -> Optional[Card]: if self._fillRev(): self.revCount -= 1 return self.col.getCard(self._revQueue.pop()) + return None - def totalRevForCurrentDeck(self): + def totalRevForCurrentDeck(self) -> int: return self.col.db.scalar( f""" select count() from cards where id in ( @@ -886,15 +915,15 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l # Answering a review card ########################################################################## - def _answerRevCard(self, card, ease): - delay = 0 + def _answerRevCard(self, card: Card, ease: int) -> None: + delay: float = 0 if ease == BUTTON_ONE: delay = self._rescheduleLapse(card) else: self._rescheduleRev(card, ease) self._logRev(card, ease, delay) - def _rescheduleLapse(self, card): + def _rescheduleLapse(self, card: Card) -> Union[int, float]: conf = self._lapseConf(card) card.lastIvl = card.ivl if self._resched(card): @@ -906,7 +935,7 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l if card.odid: card.odue = card.due # if suspended as a leech, nothing to do - delay = 0 + delay: float = 0 if self._checkLeech(card, conf) and card.queue == QUEUE_TYPE_SUSPENDED: return delay # if no relearning steps, nothing to do @@ -930,10 +959,10 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l card.queue = QUEUE_TYPE_DAY_LEARN_RELEARN return delay - def _nextLapseIvl(self, card, conf): + def _nextLapseIvl(self, card: Card, conf: Dict[str, Any]) -> int: return max(conf["minInt"], int(card.ivl * conf["mult"])) - def _rescheduleRev(self, card, ease): + def _rescheduleRev(self, card: Card, ease: int) -> None: # update interval card.lastIvl = card.ivl if self._resched(card): @@ -948,7 +977,7 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l card.odid = 0 card.odue = 0 - def _logRev(self, card, ease, delay): + def _logRev(self, card: Card, ease: int, delay: Union[int, float]) -> None: def log(): self.col.db.execute( "insert into revlog values (?,?,?,?,?,?,?,?,?)", @@ -973,7 +1002,7 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l # Interval management ########################################################################## - def _nextRevIvl(self, card, ease): + def _nextRevIvl(self, card: Card, ease: int) -> int: "Ideal next interval for CARD, given EASE." delay = self._daysLate(card) conf = self._revConf(card) @@ -992,11 +1021,11 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l # interval capped? return min(interval, conf["maxIvl"]) - def _fuzzedIvl(self, ivl): + def _fuzzedIvl(self, ivl: int) -> int: min, max = self._fuzzIvlRange(ivl) return random.randint(min, max) - def _fuzzIvlRange(self, ivl): + def _fuzzIvlRange(self, ivl: int) -> List[int]: if ivl < 2: return [1, 1] elif ivl == 2: @@ -1011,24 +1040,24 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l fuzz = max(fuzz, 1) return [ivl - fuzz, ivl + fuzz] - def _constrainedIvl(self, ivl, conf, prev): + def _constrainedIvl(self, ivl: float, conf: Dict[str, Any], prev: int) -> int: "Integer interval after interval factor and prev+1 constraints applied." new = ivl * conf.get("ivlFct", 1) return int(max(new, prev + 1)) - def _daysLate(self, card): + def _daysLate(self, card: Card) -> int: "Number of days later than scheduled." due = card.odue if card.odid else card.due return max(0, self.today - due) - def _updateRevIvl(self, card, ease): + def _updateRevIvl(self, card: Card, ease: int) -> None: idealIvl = self._nextRevIvl(card, ease) card.ivl = min( max(self._adjRevIvl(card, idealIvl), card.ivl + 1), self._revConf(card)["maxIvl"], ) - def _adjRevIvl(self, card, idealIvl): + def _adjRevIvl(self, card: Card, idealIvl: int) -> int: if self._spreadRev: idealIvl = self._fuzzedIvl(idealIvl) return idealIvl @@ -1036,7 +1065,7 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l # Dynamic deck handling ########################################################################## - def rebuildDyn(self, did=None): + def rebuildDyn(self, did: Optional[int] = None) -> Optional[List[int]]: "Rebuild a dynamic deck." did = did or self.col.decks.selected() deck = self.col.decks.get(did) @@ -1045,12 +1074,12 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l self.emptyDyn(did) ids = self._fillDyn(deck) if not ids: - return + return None # and change to our new deck self.col.decks.select(did) return ids - def _fillDyn(self, deck): + def _fillDyn(self, deck: Dict[str, Any]) -> List[int]: search, limit, order = deck["terms"][0] orderlimit = self._dynOrder(order, limit) if search.strip(): @@ -1066,7 +1095,7 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l self._moveToDyn(deck["id"], ids) return ids - def emptyDyn(self, did, lim=None): + def emptyDyn(self, did: Optional[int], lim: Optional[str] = None) -> None: if not lim: lim = "did = %s" % did self.col.log(self.col.db.list("select id from cards where %s" % lim)) @@ -1080,10 +1109,10 @@ due = odue, odue = 0, odid = 0, usn = ? where %s""" self.col.usn(), ) - def remFromDyn(self, cids): + def remFromDyn(self, cids: List[int]) -> None: self.emptyDyn(None, "id in %s and odid" % ids2str(cids)) - def _dynOrder(self, o, l): + def _dynOrder(self, o: int, l: int) -> str: if o == DYN_OLDEST: t = "(select max(id) from revlog where cid=c.id)" elif o == DYN_RANDOM: @@ -1110,7 +1139,7 @@ due = odue, odue = 0, odid = 0, usn = ? where %s""" t = "c.due" return t + " limit %d" % l - def _moveToDyn(self, did, ids): + def _moveToDyn(self, did: int, ids: List[int]) -> None: deck = self.col.decks.get(did) data = [] t = intTime() @@ -1134,7 +1163,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?""" data, ) - def _dynIvlBoost(self, card): + def _dynIvlBoost(self, card: Card) -> int: assert card.odid and card.type == CARD_TYPE_REV assert card.factor elapsed = card.ivl - (card.odue - self.today) @@ -1146,7 +1175,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?""" # Leeches ########################################################################## - def _checkLeech(self, card, conf) -> bool: + def _checkLeech(self, card: Card, conf: Dict[str, Any]) -> bool: "Leech handler. True if card was a leech." lf = conf["leechFails"] if not lf: @@ -1176,10 +1205,10 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?""" # Tools ########################################################################## - def _cardConf(self, card): + def _cardConf(self, card: Card) -> Dict[str, Any]: return self.col.decks.confForDid(card.did) - def _newConf(self, card): + def _newConf(self, card: Card) -> Dict[str, Any]: conf = self._cardConf(card) # normal deck if not card.odid: @@ -1199,7 +1228,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?""" perDay=self.reportLimit, ) - def _lapseConf(self, card): + def _lapseConf(self, card: Card) -> Dict[str, Any]: conf = self._cardConf(card) # normal deck if not card.odid: @@ -1218,7 +1247,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?""" resched=conf["resched"], ) - def _revConf(self, card): + def _revConf(self, card: Card) -> Dict[str, Any]: conf = self._cardConf(card) # normal deck if not card.odid: @@ -1226,10 +1255,10 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?""" # dynamic deck return self.col.decks.confForDid(card.odid)["rev"] - def _deckLimit(self): + def _deckLimit(self) -> str: return ids2str(self.col.decks.active()) - def _resched(self, card): + def _resched(self, card: Card) -> bool: conf = self._cardConf(card) if not conf["dyn"]: return True @@ -1238,7 +1267,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?""" # Daily cutoff ########################################################################## - def _updateCutoff(self): + def _updateCutoff(self) -> None: oldToday = self.today # days since col created self.today = int((time.time() - self.col.crt) // 86400) @@ -1261,7 +1290,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?""" if unburied < self.today: self.unburyCards() - def _checkDay(self): + def _checkDay(self) -> None: # check if the day has rolled over if time.time() > self.dayCutoff: self.reset() @@ -1269,7 +1298,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?""" # Deck finished state ########################################################################## - def finishedMsg(self): + def finishedMsg(self) -> str: return ( "" + _("Congratulations! You have finished this deck for now.") @@ -1277,7 +1306,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?""" + self._nextDueMsg() ) - def _nextDueMsg(self): + def _nextDueMsg(self) -> str: line = [] # the new line replacements are so we don't break translations # in a point release @@ -1321,7 +1350,7 @@ To study outside of the normal schedule, click the Custom Study button below.""" ) return "

".join(line) - def revDue(self): + def revDue(self) -> Optional[int]: "True if there are any rev cards due." return self.col.db.scalar( ( @@ -1332,7 +1361,7 @@ To study outside of the normal schedule, click the Custom Study button below.""" self.today, ) - def newDue(self): + def newDue(self) -> Optional[int]: "True if there are any new cards due." return self.col.db.scalar( ( @@ -1342,7 +1371,7 @@ To study outside of the normal schedule, click the Custom Study button below.""" % self._deckLimit() ) - def haveBuried(self): + def haveBuried(self) -> bool: sdids = ids2str(self.col.decks.active()) cnt = self.col.db.scalar( f"select 1 from cards where queue = {QUEUE_TYPE_SIBLING_BURIED} and did in %s limit 1" @@ -1353,7 +1382,7 @@ To study outside of the normal schedule, click the Custom Study button below.""" # Next time reports ########################################################################## - def nextIvlStr(self, card, ease, short=False): + def nextIvlStr(self, card: Card, ease: int, short: bool = False) -> str: "Return the next interval for CARD as a string." ivl_secs = self.nextIvl(card, ease) if not ivl_secs: @@ -1365,7 +1394,7 @@ To study outside of the normal schedule, click the Custom Study button below.""" s = "<" + s return s - def nextIvl(self, card, ease): + def nextIvl(self, card: Card, ease: int) -> float: "Return the next interval for CARD, in seconds." if card.queue in (QUEUE_TYPE_NEW, QUEUE_TYPE_LRN, QUEUE_TYPE_DAY_LEARN_RELEARN): return self._nextLrnIvl(card, ease) @@ -1380,7 +1409,7 @@ To study outside of the normal schedule, click the Custom Study button below.""" return self._nextRevIvl(card, ease) * 86400 # this isn't easily extracted from the learn code - def _nextLrnIvl(self, card, ease): + def _nextLrnIvl(self, card: Card, ease: int) -> Union[int, float]: if card.queue == 0: card.left = self._startingLeft(card) conf = self._lrnConf(card) @@ -1405,7 +1434,7 @@ To study outside of the normal schedule, click the Custom Study button below.""" # Suspending ########################################################################## - def suspendCards(self, ids): + def suspendCards(self, ids: List[int]) -> None: "Suspend cards." self.col.log(ids) self.remFromDyn(ids) @@ -1417,7 +1446,7 @@ To study outside of the normal schedule, click the Custom Study button below.""" self.col.usn(), ) - def unsuspendCards(self, ids): + def unsuspendCards(self, ids: List[int]) -> None: "Unsuspend cards." self.col.log(ids) self.col.db.execute( @@ -1427,7 +1456,7 @@ To study outside of the normal schedule, click the Custom Study button below.""" self.col.usn(), ) - def buryCards(self, cids): + def buryCards(self, cids: List[int]) -> None: self.col.log(cids) self.remFromDyn(cids) self.removeLrn(cids) @@ -1439,7 +1468,7 @@ update cards set queue={QUEUE_TYPE_SIBLING_BURIED},mod=?,usn=? where id in """ self.col.usn(), ) - def buryNote(self, nid): + def buryNote(self, nid: int) -> None: "Bury all cards for note until next session." cids = self.col.db.list( "select id from cards where nid = ? and queue >= 0", nid @@ -1449,7 +1478,7 @@ update cards set queue={QUEUE_TYPE_SIBLING_BURIED},mod=?,usn=? where id in """ # Sibling spacing ########################################################################## - def _burySiblings(self, card): + def _burySiblings(self, card: Card) -> None: toBury = [] nconf = self._newConf(card) buryNew = nconf.get("bury", True) @@ -1493,7 +1522,7 @@ and (queue={QUEUE_TYPE_NEW} or (queue={QUEUE_TYPE_REV} and due<=?))""", # Resetting ########################################################################## - def forgetCards(self, ids): + def forgetCards(self, ids: List[int]) -> None: "Put cards at the end of the new queue." self.remFromDyn(ids) self.col.db.execute( @@ -1509,7 +1538,7 @@ and (queue={QUEUE_TYPE_NEW} or (queue={QUEUE_TYPE_REV} and due<=?))""", self.sortCards(ids, start=pmax + 1) self.col.log(ids) - def reschedCards(self, ids, imin, imax): + def reschedCards(self, ids: List[int], imin: int, imax: int) -> None: "Put cards in review queue with a new interval in days (min, max)." d = [] t = self.today @@ -1535,7 +1564,7 @@ usn=:usn,mod=:mod,factor=:fact where id=:id""", ) self.col.log(ids) - def resetCards(self, ids): + def resetCards(self, ids: List[int]) -> None: "Completely reset cards for export." sids = ids2str(ids) # we want to avoid resetting due number of existing new cards on export @@ -1555,7 +1584,14 @@ usn=:usn,mod=:mod,factor=:fact where id=:id""", # Repositioning new cards ########################################################################## - def sortCards(self, cids, start=1, step=1, shuffle=False, shift=False): + def sortCards( + self, + cids: List[int], + start: int = 1, + step: int = 1, + shuffle: bool = False, + shift: bool = False, + ) -> None: scids = ids2str(cids) now = intTime() nids = [] @@ -1605,15 +1641,15 @@ and due >= ? and queue = {QUEUE_TYPE_NEW}""" "update cards set due=:due,mod=:now,usn=:usn where id = :cid", d ) - def randomizeCards(self, did): + def randomizeCards(self, did: int) -> None: cids = self.col.db.list("select id from cards where did = ?", did) self.sortCards(cids, shuffle=True) - def orderCards(self, did): + def orderCards(self, did: int) -> None: cids = self.col.db.list("select id from cards where did = ? order by nid", did) self.sortCards(cids) - def resortConf(self, conf): + def resortConf(self, conf) -> None: for did in self.col.decks.didsForConf(conf): if conf["new"]["order"] == 0: self.randomizeCards(did) @@ -1621,7 +1657,7 @@ and due >= ? and queue = {QUEUE_TYPE_NEW}""" self.orderCards(did) # for post-import - def maybeRandomizeDeck(self, did=None): + def maybeRandomizeDeck(self, did: Optional[int] = None) -> None: if not did: did = self.col.decks.selected() conf = self.col.decks.confForDid(did) From b451f4e3f2e44ae6951fb76748be85821b4dc74a Mon Sep 17 00:00:00 2001 From: Alan Du Date: Thu, 27 Feb 2020 19:05:06 -0500 Subject: [PATCH 3/6] Monkeytype anki/rsbackend.py --- pylib/anki/rsbackend.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pylib/anki/rsbackend.py b/pylib/anki/rsbackend.py index def276f30..b5f10f8dd 100644 --- a/pylib/anki/rsbackend.py +++ b/pylib/anki/rsbackend.py @@ -178,7 +178,9 @@ def proto_progress_to_native(progress: pb.Progress) -> Progress: class RustBackend: - def __init__(self, col_path: str, media_folder_path: str, media_db_path: str): + def __init__( + self, col_path: str, media_folder_path: str, media_db_path: str + ) -> None: ftl_folder = os.path.join(anki.lang.locale_folder, "fluent") init_msg = pb.BackendInit( collection_path=col_path, @@ -340,7 +342,7 @@ class RustBackend: ) ).format_time_span - def studied_today(self, cards: int, seconds: float,) -> str: + def studied_today(self, cards: int, seconds: float) -> str: return self._run_command( pb.BackendInput( studied_today=pb.StudiedTodayIn(cards=cards, seconds=seconds) @@ -376,7 +378,7 @@ class I18nBackend: ) self._backend = ankirspy.open_i18n(init_msg.SerializeToString()) - def translate(self, key: TR, **kwargs: Union[str, int, float]): + def translate(self, key: TR, **kwargs: Union[str, int, float]) -> str: return self._backend.translate( translate_string_in(key, **kwargs).SerializeToString() ) From 2879dc1158345d0f303cfba688c63ad50108724b Mon Sep 17 00:00:00 2001 From: Alan Du Date: Thu, 27 Feb 2020 19:24:38 -0500 Subject: [PATCH 4/6] Type pylib/anki/schedv2.py --- pylib/anki/schedv2.py | 55 +++++++++++++++++++++---------------------- 1 file changed, 27 insertions(+), 28 deletions(-) diff --git a/pylib/anki/schedv2.py b/pylib/anki/schedv2.py index e876272bb..61e91e8e2 100644 --- a/pylib/anki/schedv2.py +++ b/pylib/anki/schedv2.py @@ -128,14 +128,15 @@ class Scheduler: self._restorePreviewCard(card) self._removeFromFiltered(card) - def counts(self, card: None = None) -> tuple: + def counts(self, card: Optional[Card] = None) -> Tuple[int, int, int]: counts = [self.newCount, self.lrnCount, self.revCount] if card: idx = self.countIdx(card) counts[idx] += 1 - return tuple(counts) + new, lrn, rev = counts + return (new, lrn, rev) - def dueForecast(self, days=7) -> List: + def dueForecast(self, days: int = 7) -> List[Any]: "Return counts over next DAYS. Includes today." daysd = dict( self.col.db.all( @@ -158,7 +159,7 @@ order by due""" ret = [x[1] for x in sorted(daysd.items())] return ret - def countIdx(self, card: Card) -> Any: + def countIdx(self, card: Card) -> int: if card.queue in (QUEUE_TYPE_DAY_LEARN_RELEARN, QUEUE_TYPE_PREVIEW): return 1 return card.queue @@ -179,7 +180,7 @@ order by due""" g[key][1] += cnt self.col.decks.save(g) - def extendLimits(self, new, rev) -> None: + def extendLimits(self, new: int, rev: int) -> None: cur = self.col.decks.current() parents = self.col.decks.parents(cur["id"]) children = [ @@ -193,8 +194,10 @@ order by due""" self.col.decks.save(g) def _walkingCount( - self, limFn: Optional[Callable] = None, cntFn: Optional[Callable] = None - ) -> Any: + self, + limFn: Optional[Callable[[Any], Optional[int]]] = None, + cntFn: Optional[Callable[[int, int], int]] = None, + ) -> int: tot = 0 pcounts: Dict[int, int] = {} # for each of the active decks @@ -228,7 +231,7 @@ order by due""" # Deck list ########################################################################## - def deckDueList(self) -> List[list]: + def deckDueList(self) -> List[List[Any]]: "Returns [deckname, did, rev, lrn, new]" self._checkDay() self.col.decks.checkIntegrity() @@ -270,9 +273,7 @@ order by due""" def deckDueTree(self) -> Any: return self._groupChildren(self.deckDueList()) - def _groupChildren( - self, grps: List[List] - ) -> Tuple[Tuple[Any, Any, Any, Any, Any, Any], ...]: + def _groupChildren(self, grps: List[List[Any]]) -> Any: # first, split the group names into components for g in grps: g[0] = g[0].split("::") @@ -281,7 +282,7 @@ order by due""" # then run main function return self._groupChildrenMain(grps) - def _groupChildrenMain(self, grps: Any) -> Any: + def _groupChildrenMain(self, grps: List[List[Any]]) -> Any: tree = [] # group and recurse def key(grp): @@ -379,7 +380,7 @@ did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""", self._newQueue: List[int] = [] self._updateNewCardRatio() - def _fillNew(self) -> Any: + def _fillNew(self) -> Optional[bool]: if self._newQueue: return True if not self.newCount: @@ -406,6 +407,7 @@ did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""", # removed from the queue but not buried self._resetNew() return self._fillNew() + return None def _getNewCard(self) -> Optional[Card]: if self._fillNew(): @@ -423,7 +425,7 @@ did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""", return self.newCardModulus = 0 - def _timeForNewCard(self) -> Optional[int]: + def _timeForNewCard(self) -> Optional[bool]: "True if it's time to display a new card when distributing." if not self.newCount: return False @@ -432,10 +434,10 @@ did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""", elif self.col.conf["newSpread"] == NEW_CARDS_FIRST: return True elif self.newCardModulus: - return self.reps and self.reps % self.newCardModulus == 0 + return self.reps != 0 and self.reps % self.newCardModulus == 0 else: # shouldn't reach - return False + return None def _deckNewLimit( self, did: int, fn: Callable[[Dict[str, Any]], int] = None @@ -453,7 +455,7 @@ did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""", lim = min(rem, lim) return lim - def _newForDeck(self, did: int, lim: int) -> Any: + def _newForDeck(self, did: int, lim: int) -> int: "New count for a single deck." if not lim: return 0 @@ -466,14 +468,14 @@ select count() from lim, ) - def _deckNewLimitSingle(self, g: Dict[str, Any]) -> Any: + def _deckNewLimitSingle(self, g: Dict[str, Any]) -> int: "Limit for deck without parent limits." if g["dyn"]: return self.dynReportLimit c = self.col.decks.confForDid(g["id"]) return max(0, c["new"]["perDay"] - g["newToday"][1]) - def totalNewForCurrentDeck(self) -> Any: + def totalNewForCurrentDeck(self) -> int: return self.col.db.scalar( f""" select count() from cards where id in ( @@ -533,7 +535,7 @@ select count() from cards where did in %s and queue = {QUEUE_TYPE_PREVIEW} self._lrnDids = self.col.decks.active()[:] # sub-day learning - def _fillLrn(self) -> Any: + def _fillLrn(self) -> Union[bool, List[Any]]: if not self.lrnCount: return False if self._lrnQueue: @@ -745,10 +747,7 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""", return tot + tod * 1000 def _leftToday( - self, - delays: Union[List[int], List[Union[float, int]]], - left: int, - now: Optional[int] = None, + self, delays: List[int], left: int, now: Optional[int] = None, ) -> int: "The number of steps that can be completed by the day cutoff." if not now: @@ -803,7 +802,7 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""", else: ivl = -self._delayForGrade(conf, card.left) - def log(): + def log() -> None: self.col.db.execute( "insert into revlog values (?,?,?,?,?,?,?,?,?)", int(time.time() * 1000), @@ -824,7 +823,7 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""", time.sleep(0.01) log() - def _lrnForDeck(self, did: int) -> Any: + def _lrnForDeck(self, did: int) -> int: cnt = ( self.col.db.scalar( f""" @@ -849,13 +848,13 @@ and due <= ? limit ?)""", # Reviews ########################################################################## - def _currentRevLimit(self) -> Any: + def _currentRevLimit(self) -> int: d = self.col.decks.get(self.col.decks.selected(), default=False) return self._deckRevLimitSingle(d) def _deckRevLimitSingle( self, d: Dict[str, Any], parentLimit: Optional[int] = None - ) -> Any: + ) -> int: # invalid deck selected? if not d: return 0 From f8af9c509be1cf0bc94296e1ac234ff763a92607 Mon Sep 17 00:00:00 2001 From: Alan Du Date: Thu, 27 Feb 2020 19:27:23 -0500 Subject: [PATCH 5/6] Monkeytype pylib/anki/importing/supermemo_xml.py --- pylib/anki/importing/supermemo_xml.py | 45 ++++++++++++++------------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/pylib/anki/importing/supermemo_xml.py b/pylib/anki/importing/supermemo_xml.py index 07ddb1c4c..a326daf7d 100644 --- a/pylib/anki/importing/supermemo_xml.py +++ b/pylib/anki/importing/supermemo_xml.py @@ -8,8 +8,11 @@ import sys import time import unicodedata from string import capwords +from typing import List, Optional, Union from xml.dom import minidom +from xml.dom.minidom import Element, Text +from anki.collection import _Collection from anki.importing.noteimp import ForeignCard, ForeignNote, NoteImporter from anki.lang import _, ngettext from anki.stdmodels import addBasicModel @@ -26,7 +29,7 @@ class SmartDict(dict): x.get('first_name'). """ - def __init__(self, *a, **kw): + def __init__(self, *a, **kw) -> None: if a: if isinstance(type(a[0]), dict): kw.update(a[0]) @@ -42,7 +45,7 @@ class SmartDict(dict): class SuperMemoElement(SmartDict): "SmartDict wrapper to store SM Element data" - def __init__(self, *a, **kw): + def __init__(self, *a, **kw) -> None: SmartDict.__init__(self, *a, **kw) # default content self.__dict__["lTitle"] = None @@ -80,7 +83,7 @@ class SupermemoXmlImporter(NoteImporter): Code should be upgrade to support importing of SM2006 exports. """ - def __init__(self, col, file): + def __init__(self, col: _Collection, file: str) -> None: """Initialize internal varables. Pameters to be exposed to GUI are stored in self.META""" NoteImporter.__init__(self, col, file) @@ -124,13 +127,13 @@ class SupermemoXmlImporter(NoteImporter): ## TOOLS - def _fudgeText(self, text): + def _fudgeText(self, text: str) -> str: "Replace sm syntax to Anki syntax" text = text.replace("\n\r", "
") text = text.replace("\n", "
") return text - def _unicode2ascii(self, str): + def _unicode2ascii(self, str: str) -> str: "Remove diacritic punctuation from strings (titles)" return "".join( [ @@ -140,7 +143,7 @@ class SupermemoXmlImporter(NoteImporter): ] ) - def _decode_htmlescapes(self, s): + def _decode_htmlescapes(self, s: str) -> str: """Unescape HTML code.""" # In case of bad formated html you can import MinimalSoup etc.. see btflsoup source code from bs4 import BeautifulSoup as btflsoup @@ -153,7 +156,7 @@ class SupermemoXmlImporter(NoteImporter): return str(btflsoup(s, "html.parser")) - def _afactor2efactor(self, af): + def _afactor2efactor(self, af: float) -> float: # Adapted from # Ranges for A-factors and E-factors @@ -177,7 +180,7 @@ class SupermemoXmlImporter(NoteImporter): ## DEFAULT IMPORTER METHODS - def foreignNotes(self): + def foreignNotes(self) -> List[ForeignNote]: # Load file and parse it by minidom self.loadSource(self.file) @@ -195,12 +198,12 @@ class SupermemoXmlImporter(NoteImporter): ) return self.notes - def fields(self): + def fields(self) -> int: return 2 ## PARSER METHODS - def addItemToCards(self, item): + def addItemToCards(self, item: SuperMemoElement) -> None: "This method actually do conversion" # new anki card @@ -274,7 +277,7 @@ class SupermemoXmlImporter(NoteImporter): self.notes.append(note) - def logger(self, text, level=1): + def logger(self, text: str, level: int = 1) -> None: "Wrapper for Anki logger" dLevels = {0: "", 1: "Info", 2: "Verbose", 3: "Debug"} @@ -316,7 +319,7 @@ class SupermemoXmlImporter(NoteImporter): return io.StringIO(str(source)) - def loadSource(self, source): + def loadSource(self, source: str) -> None: """Load source file and parse with xml.dom.minidom""" self.source = source self.logger("Load started...") @@ -326,7 +329,7 @@ class SupermemoXmlImporter(NoteImporter): self.logger("Load done.") # PARSE - def parse(self, node=None): + def parse(self, node: Optional[Union[Text, Element]] = None) -> None: "Parse method - parses document elements" if node is None and self.xmldoc is not None: @@ -344,7 +347,7 @@ class SupermemoXmlImporter(NoteImporter): self.parse(node.documentElement) - def parse_Element(self, node): + def parse_Element(self, node: Element) -> None: "Parse XML element" _method = "do_%s" % node.tagName @@ -355,7 +358,7 @@ class SupermemoXmlImporter(NoteImporter): self.logger("No handler for method %s" % _method, level=3) # print traceback.print_exc() - def parse_Text(self, node): + def parse_Text(self, node: Text) -> None: "Parse text inside elements. Text is stored into local buffer." text = node.data @@ -368,13 +371,13 @@ class SupermemoXmlImporter(NoteImporter): # pass # DO - def do_SuperMemoCollection(self, node): + def do_SuperMemoCollection(self, node: Element) -> None: "Process SM Collection" for child in node.childNodes: self.parse(child) - def do_SuperMemoElement(self, node): + def do_SuperMemoElement(self, node: Element) -> None: "Process SM Element (Type - Title,Topics)" self.logger("=" * 45, level=3) @@ -426,14 +429,14 @@ class SupermemoXmlImporter(NoteImporter): t = self.cntMeta["title"].pop() self.logger("End of topic \t- %s" % (t), level=2) - def do_Content(self, node): + def do_Content(self, node: Element) -> None: "Process SM element Content" for child in node.childNodes: if hasattr(child, "tagName") and child.firstChild is not None: self.cntElm[-1][child.tagName] = child.firstChild.data - def do_LearningData(self, node): + def do_LearningData(self, node: Element) -> None: "Process SM element LearningData" for child in node.childNodes: @@ -450,7 +453,7 @@ class SupermemoXmlImporter(NoteImporter): # for child in node.childNodes: self.parse(child) # self.cntElm[-1][node.tagName]=self.cntBuf.pop() - def do_Title(self, node): + def do_Title(self, node: Element) -> None: "Process SM element Title" t = self._decode_htmlescapes(node.firstChild.data) @@ -459,7 +462,7 @@ class SupermemoXmlImporter(NoteImporter): self.cntElm[-1]["lTitle"] = self.cntMeta["title"] self.logger("Start of topic \t- " + " / ".join(self.cntMeta["title"]), level=2) - def do_Type(self, node): + def do_Type(self, node: Element) -> None: "Process SM element Type" if len(self.cntBuf) >= 1: From 7c971837ffb4d3d591021a7cd9f270130e6489ac Mon Sep 17 00:00:00 2001 From: Alan Du Date: Thu, 27 Feb 2020 21:27:59 -0500 Subject: [PATCH 6/6] Add some more types --- pylib/anki/sched.py | 8 ++++---- pylib/anki/schedv2.py | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pylib/anki/sched.py b/pylib/anki/sched.py index d13d4154d..8e721fee0 100644 --- a/pylib/anki/sched.py +++ b/pylib/anki/sched.py @@ -639,7 +639,7 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""", card.queue = QUEUE_TYPE_DAY_LEARN_RELEARN self._logLrn(card, ease, conf, leaving, type, lastLeft) - def _delayForGrade(self, conf: Dict[str, Any], left: int) -> Union[int, float]: + def _delayForGrade(self, conf: Dict[str, Any], left: int) -> float: left = left % 1000 try: delay = conf["delays"][-left] @@ -923,7 +923,7 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l self._rescheduleRev(card, ease) self._logRev(card, ease, delay) - def _rescheduleLapse(self, card: Card) -> Union[int, float]: + def _rescheduleLapse(self, card: Card) -> float: conf = self._lapseConf(card) card.lastIvl = card.ivl if self._resched(card): @@ -977,7 +977,7 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l card.odid = 0 card.odue = 0 - def _logRev(self, card: Card, ease: int, delay: Union[int, float]) -> None: + def _logRev(self, card: Card, ease: int, delay: float) -> None: def log(): self.col.db.execute( "insert into revlog values (?,?,?,?,?,?,?,?,?)", @@ -1409,7 +1409,7 @@ To study outside of the normal schedule, click the Custom Study button below.""" return self._nextRevIvl(card, ease) * 86400 # this isn't easily extracted from the learn code - def _nextLrnIvl(self, card: Card, ease: int) -> Union[int, float]: + def _nextLrnIvl(self, card: Card, ease: int) -> float: if card.queue == 0: card.left = self._startingLeft(card) conf = self._lrnConf(card) diff --git a/pylib/anki/schedv2.py b/pylib/anki/schedv2.py index 61e91e8e2..142d2eef7 100644 --- a/pylib/anki/schedv2.py +++ b/pylib/anki/schedv2.py @@ -992,7 +992,7 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l ivl = max(1, conf["minInt"], int(card.ivl * conf["mult"])) return ivl - def _rescheduleRev(self, card: Card, ease: int, early: Union[bool, int]) -> None: + def _rescheduleRev(self, card: Card, ease: int, early: bool) -> None: # update interval card.lastIvl = card.ivl if early: @@ -1075,7 +1075,7 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l return [ivl - fuzz, ivl + fuzz] def _constrainedIvl( - self, ivl: Union[int, float], conf: Dict[str, Any], prev: int, fuzz: bool + self, ivl: float, conf: Dict[str, Any], prev: int, fuzz: bool ) -> int: ivl = int(ivl * conf.get("ivlFct", 1)) if fuzz: