Merge pull request #469 from alanhdu/monkeytype

Monkeytype the rest of pylib [2/n]
This commit is contained in:
Damien Elmes 2020-02-28 17:25:39 +10:00 committed by GitHub
commit 6828b98c99
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 267 additions and 214 deletions

View file

@ -1,10 +1,11 @@
# Copyright: Ankitects Pty Ltd and contributors
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
from __future__ import annotations
import re
import sre_constants
import unicodedata
from typing import Any, List, Optional, Set, Tuple
from typing import TYPE_CHECKING, Any, List, Optional, Set, Tuple, Union, cast
from anki import hooks
from anki.consts import *
@ -18,12 +19,15 @@ from anki.utils import (
stripHTMLMedia,
)
if TYPE_CHECKING:
from anki.collection import _Collection
# Find
##########################################################################
class Finder:
def __init__(self, col) -> None:
def __init__(self, col: Optional[_Collection]) -> None:
self.col = col
self.search = dict(
added=self._findAdded,
@ -42,7 +46,7 @@ class Finder:
self.search["is"] = self._findCardState
hooks.search_terms_prepared(self.search)
def findCards(self, query, order=False) -> Any:
def findCards(self, query: str, order: Union[bool, str] = False) -> List[Any]:
"Return a list of card ids for QUERY."
tokens = self._tokenize(query)
preds, args = self._where(tokens)
@ -59,7 +63,7 @@ class Finder:
res.reverse()
return res
def findNotes(self, query) -> Any:
def findNotes(self, query: str) -> List[Any]:
tokens = self._tokenize(query)
preds, args = self._where(tokens)
if preds is None:
@ -83,8 +87,8 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """
# Tokenizing
######################################################################
def _tokenize(self, query) -> List:
inQuote = False
def _tokenize(self, query: str) -> List[str]:
inQuote: Union[bool, str] = False
tokens = []
token = ""
for c in query:
@ -137,7 +141,7 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """
# Query building
######################################################################
def _where(self, tokens) -> Tuple[Any, Optional[List[str]]]:
def _where(self, tokens: List[str]) -> Tuple[str, Optional[List[str]]]:
# state and query
s: Dict[str, Any] = dict(isnot=False, isor=False, join=False, q="", bad=False)
args: List[Any] = []
@ -197,7 +201,7 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """
return None, None
return s["q"], args
def _query(self, preds, order) -> str:
def _query(self, preds: str, order: str) -> str:
# can we skip the note table?
if "n." not in preds and "n." not in order:
sql = "select c.id from cards c where "
@ -216,12 +220,12 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """
# Ordering
######################################################################
def _order(self, order) -> Tuple[Any, Any]:
def _order(self, order: Union[bool, str]) -> Tuple[str, bool]:
if not order:
return "", False
elif order is not True:
# custom order string provided
return " order by " + order, False
return " order by " + cast(str, order), False
# use deck default
type = self.col.conf["sortType"]
sort = None
@ -253,8 +257,8 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """
# Commands
######################################################################
def _findTag(self, args) -> str:
(val, args) = args
def _findTag(self, args: Tuple[str, List[Any]]) -> str:
(val, list_args) = args
if val == "none":
return 'n.tags = ""'
val = val.replace("*", "%")
@ -262,11 +266,11 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """
val = "% " + val
if not val.endswith("%") or val.endswith("\\%"):
val += " %"
args.append(val)
list_args.append(val)
return "n.tags like ? escape '\\'"
def _findCardState(self, args) -> Optional[str]:
(val, args) = args
def _findCardState(self, args: Tuple[str, List[Any]]) -> Optional[str]:
(val, __) = args
if val in ("review", "new", "learn"):
if val == "review":
n = 2
@ -290,17 +294,16 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """
# unknown
return None
def _findFlag(self, args) -> Optional[str]:
(val, args) = args
def _findFlag(self, args: Tuple[str, List[Any]]) -> Optional[str]:
(val, __) = args
if not val or len(val) != 1 or val not in "01234":
return None
val = int(val)
mask = 2 ** 3 - 1
return "(c.flags & %d) == %d" % (mask, val)
return "(c.flags & %d) == %d" % (mask, int(val))
def _findRated(self, args) -> Optional[str]:
def _findRated(self, args: Tuple[str, List[Any]]) -> Optional[str]:
# days(:optional_ease)
(val, args) = args
(val, __) = args
r = val.split(":")
try:
days = int(r[0])
@ -316,8 +319,8 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """
cutoff = (self.col.sched.dayCutoff - 86400 * days) * 1000
return "c.id in (select cid from revlog where id>%d %s)" % (cutoff, ease)
def _findAdded(self, args) -> Optional[str]:
(val, args) = args
def _findAdded(self, args: Tuple[str, List[Any]]) -> Optional[str]:
(val, __) = args
try:
days = int(val)
except ValueError:
@ -325,20 +328,20 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """
cutoff = (self.col.sched.dayCutoff - 86400 * days) * 1000
return "c.id > %d" % cutoff
def _findProp(self, args) -> Optional[str]:
def _findProp(self, args: Tuple[str, List[Any]]) -> Optional[str]:
# extract
(val, args) = args
m = re.match("(^.+?)(<=|>=|!=|=|<|>)(.+?$)", val)
(strval, __) = args
m = re.match("(^.+?)(<=|>=|!=|=|<|>)(.+?$)", strval)
if not m:
return None
prop, cmp, val = m.groups()
prop, cmp, strval = m.groups()
prop = prop.lower() # pytype: disable=attribute-error
# is val valid?
try:
if prop == "ease":
val = float(val)
val = float(strval)
else:
val = int(val)
val = int(strval)
except ValueError:
return None
# is prop valid?
@ -356,32 +359,32 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """
q.append("(%s %s %s)" % (prop, cmp, val))
return " and ".join(q)
def _findText(self, val, args) -> str:
def _findText(self, val: str, args: List[str]) -> str:
val = val.replace("*", "%")
args.append("%" + val + "%")
args.append("%" + val + "%")
return "(n.sfld like ? escape '\\' or n.flds like ? escape '\\')"
def _findNids(self, args) -> Optional[str]:
(val, args) = args
def _findNids(self, args: Tuple[str, List[Any]]) -> Optional[str]:
(val, __) = args
if re.search("[^0-9,]", val):
return None
return "n.id in (%s)" % val
def _findCids(self, args) -> Optional[str]:
(val, args) = args
(val, __) = args
if re.search("[^0-9,]", val):
return None
return "c.id in (%s)" % val
def _findMid(self, args) -> Optional[str]:
(val, args) = args
(val, __) = args
if re.search("[^0-9]", val):
return None
return "n.mid = %s" % val
def _findModel(self, args) -> str:
(val, args) = args
def _findModel(self, args: Tuple[str, List[Any]]) -> str:
(val, __) = args
ids = []
val = val.lower()
for m in self.col.models.all():
@ -389,9 +392,9 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """
ids.append(m["id"])
return "n.mid in %s" % ids2str(ids)
def _findDeck(self, args) -> Optional[str]:
def _findDeck(self, args: Tuple[str, List[Any]]) -> Optional[str]:
# if searching for all decks, skip
(val, args) = args
(val, __) = args
if val == "*":
return "skip"
# deck types
@ -422,9 +425,9 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """
sids = ids2str(ids)
return "c.did in %s or c.odid in %s" % (sids, sids)
def _findTemplate(self, args) -> str:
def _findTemplate(self, args: Tuple[str, List[Any]]) -> str:
# were we given an ordinal number?
(val, args) = args
(val, __) = args
try:
num = int(val) - 1
except:
@ -445,7 +448,7 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """
lims.append("(n.mid = %s and c.ord = %s)" % (m["id"], t["ord"]))
return " or ".join(lims)
def _findField(self, field, val) -> Optional[str]:
def _findField(self, field: str, val: str) -> Optional[str]:
field = field.lower()
val = val.replace("*", "%")
# find models that have that field
@ -481,7 +484,7 @@ where mid in %s and flds like ? escape '\\'"""
def _findDupes(self, args) -> Optional[str]:
# caller must call stripHTMLMedia on passed val
(val, args) = args
(val, __) = args
try:
mid, val = val.split(",", 1)
except OSError:
@ -500,9 +503,17 @@ where mid in %s and flds like ? escape '\\'"""
##########################################################################
def findReplace(col, nids, src, dst, regex=False, field=None, fold=True) -> int:
def findReplace(
col: _Collection,
nids: List[int],
src: str,
dst: str,
regex: bool = False,
field: Optional[str] = None,
fold: bool = True,
) -> int:
"Find and replace fields in a note."
mmap = {}
mmap: Dict[str, Any] = {}
if field:
for m in col.models.all():
for f in m["flds"]:
@ -516,10 +527,10 @@ def findReplace(col, nids, src, dst, regex=False, field=None, fold=True) -> int:
dst = dst.replace("\\", "\\\\")
if fold:
src = "(?i)" + src
regex = re.compile(src)
compiled_re = re.compile(src)
def repl(str):
return re.sub(regex, dst, str)
def repl(s: str):
return compiled_re.sub(dst, s)
d = []
snids = ids2str(nids)
@ -577,7 +588,9 @@ def fieldNamesForNotes(col, nids) -> List:
# Find duplicates
##########################################################################
# returns array of ("dupestr", [nids])
def findDupes(col, fieldName, search="") -> List[Tuple[Any, List]]:
def findDupes(
col: _Collection, fieldName: str, search: str = ""
) -> List[Tuple[Any, List]]:
# limit search to notes with applicable field name
if search:
search = "(" + search + ") "

View file

@ -8,8 +8,11 @@ import sys
import time
import unicodedata
from string import capwords
from typing import List, Optional, Union
from xml.dom import minidom
from xml.dom.minidom import Element, Text
from anki.collection import _Collection
from anki.importing.noteimp import ForeignCard, ForeignNote, NoteImporter
from anki.lang import _, ngettext
from anki.stdmodels import addBasicModel
@ -26,7 +29,7 @@ class SmartDict(dict):
x.get('first_name').
"""
def __init__(self, *a, **kw):
def __init__(self, *a, **kw) -> None:
if a:
if isinstance(type(a[0]), dict):
kw.update(a[0])
@ -42,7 +45,7 @@ class SmartDict(dict):
class SuperMemoElement(SmartDict):
"SmartDict wrapper to store SM Element data"
def __init__(self, *a, **kw):
def __init__(self, *a, **kw) -> None:
SmartDict.__init__(self, *a, **kw)
# default content
self.__dict__["lTitle"] = None
@ -80,7 +83,7 @@ class SupermemoXmlImporter(NoteImporter):
Code should be upgrade to support importing of SM2006 exports.
"""
def __init__(self, col, file):
def __init__(self, col: _Collection, file: str) -> None:
"""Initialize internal varables.
Pameters to be exposed to GUI are stored in self.META"""
NoteImporter.__init__(self, col, file)
@ -124,13 +127,13 @@ class SupermemoXmlImporter(NoteImporter):
## TOOLS
def _fudgeText(self, text):
def _fudgeText(self, text: str) -> str:
"Replace sm syntax to Anki syntax"
text = text.replace("\n\r", "<br>")
text = text.replace("\n", "<br>")
return text
def _unicode2ascii(self, str):
def _unicode2ascii(self, str: str) -> str:
"Remove diacritic punctuation from strings (titles)"
return "".join(
[
@ -140,7 +143,7 @@ class SupermemoXmlImporter(NoteImporter):
]
)
def _decode_htmlescapes(self, s):
def _decode_htmlescapes(self, s: str) -> str:
"""Unescape HTML code."""
# In case of bad formated html you can import MinimalSoup etc.. see btflsoup source code
from bs4 import BeautifulSoup as btflsoup
@ -153,7 +156,7 @@ class SupermemoXmlImporter(NoteImporter):
return str(btflsoup(s, "html.parser"))
def _afactor2efactor(self, af):
def _afactor2efactor(self, af: float) -> float:
# Adapted from <http://www.supermemo.com/beta/xml/xml-core.htm>
# Ranges for A-factors and E-factors
@ -177,7 +180,7 @@ class SupermemoXmlImporter(NoteImporter):
## DEFAULT IMPORTER METHODS
def foreignNotes(self):
def foreignNotes(self) -> List[ForeignNote]:
# Load file and parse it by minidom
self.loadSource(self.file)
@ -195,12 +198,12 @@ class SupermemoXmlImporter(NoteImporter):
)
return self.notes
def fields(self):
def fields(self) -> int:
return 2
## PARSER METHODS
def addItemToCards(self, item):
def addItemToCards(self, item: SuperMemoElement) -> None:
"This method actually do conversion"
# new anki card
@ -274,7 +277,7 @@ class SupermemoXmlImporter(NoteImporter):
self.notes.append(note)
def logger(self, text, level=1):
def logger(self, text: str, level: int = 1) -> None:
"Wrapper for Anki logger"
dLevels = {0: "", 1: "Info", 2: "Verbose", 3: "Debug"}
@ -316,7 +319,7 @@ class SupermemoXmlImporter(NoteImporter):
return io.StringIO(str(source))
def loadSource(self, source):
def loadSource(self, source: str) -> None:
"""Load source file and parse with xml.dom.minidom"""
self.source = source
self.logger("Load started...")
@ -326,7 +329,7 @@ class SupermemoXmlImporter(NoteImporter):
self.logger("Load done.")
# PARSE
def parse(self, node=None):
def parse(self, node: Optional[Union[Text, Element]] = None) -> None:
"Parse method - parses document elements"
if node is None and self.xmldoc is not None:
@ -344,7 +347,7 @@ class SupermemoXmlImporter(NoteImporter):
self.parse(node.documentElement)
def parse_Element(self, node):
def parse_Element(self, node: Element) -> None:
"Parse XML element"
_method = "do_%s" % node.tagName
@ -355,7 +358,7 @@ class SupermemoXmlImporter(NoteImporter):
self.logger("No handler for method %s" % _method, level=3)
# print traceback.print_exc()
def parse_Text(self, node):
def parse_Text(self, node: Text) -> None:
"Parse text inside elements. Text is stored into local buffer."
text = node.data
@ -368,13 +371,13 @@ class SupermemoXmlImporter(NoteImporter):
# pass
# DO
def do_SuperMemoCollection(self, node):
def do_SuperMemoCollection(self, node: Element) -> None:
"Process SM Collection"
for child in node.childNodes:
self.parse(child)
def do_SuperMemoElement(self, node):
def do_SuperMemoElement(self, node: Element) -> None:
"Process SM Element (Type - Title,Topics)"
self.logger("=" * 45, level=3)
@ -426,14 +429,14 @@ class SupermemoXmlImporter(NoteImporter):
t = self.cntMeta["title"].pop()
self.logger("End of topic \t- %s" % (t), level=2)
def do_Content(self, node):
def do_Content(self, node: Element) -> None:
"Process SM element Content"
for child in node.childNodes:
if hasattr(child, "tagName") and child.firstChild is not None:
self.cntElm[-1][child.tagName] = child.firstChild.data
def do_LearningData(self, node):
def do_LearningData(self, node: Element) -> None:
"Process SM element LearningData"
for child in node.childNodes:
@ -450,7 +453,7 @@ class SupermemoXmlImporter(NoteImporter):
# for child in node.childNodes: self.parse(child)
# self.cntElm[-1][node.tagName]=self.cntBuf.pop()
def do_Title(self, node):
def do_Title(self, node: Element) -> None:
"Process SM element Title"
t = self._decode_htmlescapes(node.firstChild.data)
@ -459,7 +462,7 @@ class SupermemoXmlImporter(NoteImporter):
self.cntElm[-1]["lTitle"] = self.cntMeta["title"]
self.logger("Start of topic \t- " + " / ".join(self.cntMeta["title"]), level=2)
def do_Type(self, node):
def do_Type(self, node: Element) -> None:
"Process SM element Type"
if len(self.cntBuf) >= 1:

View file

@ -178,7 +178,9 @@ def proto_progress_to_native(progress: pb.Progress) -> Progress:
class RustBackend:
def __init__(self, col_path: str, media_folder_path: str, media_db_path: str):
def __init__(
self, col_path: str, media_folder_path: str, media_db_path: str
) -> None:
ftl_folder = os.path.join(anki.lang.locale_folder, "fluent")
init_msg = pb.BackendInit(
collection_path=col_path,
@ -340,7 +342,7 @@ class RustBackend:
)
).format_time_span
def studied_today(self, cards: int, seconds: float,) -> str:
def studied_today(self, cards: int, seconds: float) -> str:
return self._run_command(
pb.BackendInput(
studied_today=pb.StudiedTodayIn(cards=cards, seconds=seconds)
@ -376,7 +378,7 @@ class I18nBackend:
)
self._backend = ankirspy.open_i18n(init_msg.SerializeToString())
def translate(self, key: TR, **kwargs: Union[str, int, float]):
def translate(self, key: TR, **kwargs: Union[str, int, float]) -> str:
return self._backend.translate(
translate_string_in(key, **kwargs).SerializeToString()
)

View file

@ -8,7 +8,7 @@ import random
import time
from heapq import *
from operator import itemgetter
from typing import List, Optional, Set
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
import anki
from anki import hooks
@ -56,14 +56,14 @@ class Scheduler:
return card
return None
def reset(self):
def reset(self) -> None:
self._updateCutoff()
self._resetLrn()
self._resetRev()
self._resetNew()
self._haveQueues = True
def answerCard(self, card, ease):
def answerCard(self, card: Card, ease: int) -> None:
self.col.log()
assert 1 <= ease <= 4
self.col.markReview(card)
@ -71,7 +71,7 @@ class Scheduler:
self._burySiblings(card)
card.reps += 1
# former is for logging new cards, latter also covers filt. decks
card.wasNew = card.type == CARD_TYPE_NEW
card.wasNew = card.type == CARD_TYPE_NEW # type: ignore
wasNewQ = card.queue == QUEUE_TYPE_NEW
if wasNewQ:
# came from the new queue, move to learning
@ -102,7 +102,7 @@ class Scheduler:
card.usn = self.col.usn()
card.flushSched()
def counts(self, card=None):
def counts(self, card: Optional[Card] = None) -> Tuple[int, int, int]:
counts = [self.newCount, self.lrnCount, self.revCount]
if card:
idx = self.countIdx(card)
@ -110,9 +110,11 @@ class Scheduler:
counts[1] += card.left // 1000
else:
counts[idx] += 1
return tuple(counts)
def dueForecast(self, days=7):
new, lrn, rev = counts
return (new, lrn, rev)
def dueForecast(self, days: int = 7) -> List[Any]:
"Return counts over next DAYS. Includes today."
daysd = dict(
self.col.db.all(
@ -135,12 +137,12 @@ order by due"""
ret = [x[1] for x in sorted(daysd.items())]
return ret
def countIdx(self, card):
def countIdx(self, card: Card) -> int:
if card.queue == QUEUE_TYPE_DAY_LEARN_RELEARN:
return 1
return card.queue
def answerButtons(self, card):
def answerButtons(self, card: Card) -> int:
if card.odue:
# normal review in dyn deck?
if card.odid and card.queue == QUEUE_TYPE_REV:
@ -154,7 +156,7 @@ order by due"""
else:
return 3
def unburyCards(self):
def unburyCards(self) -> None:
"Unbury cards."
self.col.conf["lastUnburied"] = self.today
self.col.log(
@ -166,7 +168,7 @@ order by due"""
f"update cards set queue=type where queue = {QUEUE_TYPE_SIBLING_BURIED}"
)
def unburyCardsForDeck(self):
def unburyCardsForDeck(self) -> None:
sids = ids2str(self.col.decks.active())
self.col.log(
self.col.db.list(
@ -184,14 +186,14 @@ order by due"""
# Rev/lrn/time daily stats
##########################################################################
def _updateStats(self, card, type, cnt=1):
def _updateStats(self, card: Card, type: str, cnt: int = 1) -> None:
key = type + "Today"
for g in [self.col.decks.get(card.did)] + self.col.decks.parents(card.did):
# add
g[key][1] += cnt
self.col.decks.save(g)
def extendLimits(self, new, rev):
def extendLimits(self, new: int, rev: int) -> None:
cur = self.col.decks.current()
parents = self.col.decks.parents(cur["id"])
children = [
@ -204,7 +206,11 @@ order by due"""
g["revToday"][1] -= rev
self.col.decks.save(g)
def _walkingCount(self, limFn=None, cntFn=None):
def _walkingCount(
self,
limFn: Optional[Callable[[Any], Optional[int]]] = None,
cntFn: Optional[Callable[[int, int], int]] = None,
) -> int:
tot = 0
pcounts: Dict[int, int] = {}
# for each of the active decks
@ -238,7 +244,7 @@ order by due"""
# Deck list
##########################################################################
def deckDueList(self):
def deckDueList(self) -> List[List[Any]]:
"Returns [deckname, did, rev, lrn, new]"
self._checkDay()
self.col.decks.checkIntegrity()
@ -274,10 +280,10 @@ order by due"""
lims[deck["name"]] = [nlim, rlim]
return data
def deckDueTree(self):
def deckDueTree(self) -> Any:
return self._groupChildren(self.deckDueList())
def _groupChildren(self, grps):
def _groupChildren(self, grps: List[List[Any]]) -> Any:
# first, split the group names into components
for g in grps:
g[0] = g[0].split("::")
@ -286,7 +292,7 @@ order by due"""
# then run main function
return self._groupChildrenMain(grps)
def _groupChildrenMain(self, grps):
def _groupChildrenMain(self, grps: List[List[Any]]) -> Any:
tree = []
# group and recurse
def key(grp):
@ -328,7 +334,7 @@ order by due"""
# Getting the next card
##########################################################################
def _getCard(self):
def _getCard(self) -> Optional[Card]:
"Return the next due card id, or None."
# learning card due?
c = self._getLrnCard()
@ -357,7 +363,7 @@ order by due"""
# New cards
##########################################################################
def _resetNewCount(self):
def _resetNewCount(self) -> None:
cntFn = lambda did, lim: self.col.db.scalar(
f"""
select count() from (select 1 from cards where
@ -367,13 +373,13 @@ did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""",
)
self.newCount = self._walkingCount(self._deckNewLimitSingle, cntFn)
def _resetNew(self):
def _resetNew(self) -> None:
self._resetNewCount()
self._newDids = self.col.decks.active()[:]
self._newQueue = []
self._newQueue: List[Any] = []
self._updateNewCardRatio()
def _fillNew(self):
def _fillNew(self) -> Optional[bool]:
if self._newQueue:
return True
if not self.newCount:
@ -400,13 +406,15 @@ did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""",
# removed from the queue but not buried
self._resetNew()
return self._fillNew()
return None
def _getNewCard(self):
def _getNewCard(self) -> Optional[Card]:
if self._fillNew():
self.newCount -= 1
return self.col.getCard(self._newQueue.pop())
return None
def _updateNewCardRatio(self):
def _updateNewCardRatio(self) -> None:
if self.col.conf["newSpread"] == NEW_CARDS_DISTRIBUTE:
if self.newCount:
self.newCardModulus = (self.newCount + self.revCount) // self.newCount
@ -416,7 +424,7 @@ did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""",
return
self.newCardModulus = 0
def _timeForNewCard(self):
def _timeForNewCard(self) -> Optional[bool]:
"True if it's time to display a new card when distributing."
if not self.newCount:
return False
@ -425,9 +433,12 @@ did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""",
elif self.col.conf["newSpread"] == NEW_CARDS_FIRST:
return True
elif self.newCardModulus:
return self.reps and self.reps % self.newCardModulus == 0
return self.reps != 0 and self.reps % self.newCardModulus == 0
return None
def _deckNewLimit(self, did, fn=None):
def _deckNewLimit(
self, did: int, fn: Optional[Callable[[Dict[str, Any]], int]] = None
) -> int:
if not fn:
fn = self._deckNewLimitSingle
sel = self.col.decks.get(did)
@ -441,7 +452,7 @@ did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""",
lim = min(rem, lim)
return lim
def _newForDeck(self, did, lim):
def _newForDeck(self, did: int, lim: int) -> int:
"New count for a single deck."
if not lim:
return 0
@ -454,14 +465,14 @@ select count() from
lim,
)
def _deckNewLimitSingle(self, g):
def _deckNewLimitSingle(self, g: Dict[str, Any]) -> int:
"Limit for deck without parent limits."
if g["dyn"]:
return self.reportLimit
c = self.col.decks.confForDid(g["id"])
return max(0, c["new"]["perDay"] - g["newToday"][1])
def totalNewForCurrentDeck(self):
def totalNewForCurrentDeck(self) -> int:
return self.col.db.scalar(
f"""
select count() from cards where id in (
@ -473,7 +484,7 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_NEW} limit ?)"""
# Learning queues
##########################################################################
def _resetLrnCount(self):
def _resetLrnCount(self) -> None:
# sub-day
self.lrnCount = (
self.col.db.scalar(
@ -494,14 +505,14 @@ and due <= ? limit %d"""
self.today,
)
def _resetLrn(self):
def _resetLrn(self) -> None:
self._resetLrnCount()
self._lrnQueue = []
self._lrnDayQueue = []
self._lrnQueue: List[Any] = []
self._lrnDayQueue: List[Any] = []
self._lrnDids = self.col.decks.active()[:]
# sub-day learning
def _fillLrn(self):
def _fillLrn(self) -> Union[bool, List[Any]]:
if not self.lrnCount:
return False
if self._lrnQueue:
@ -518,7 +529,7 @@ limit %d"""
self._lrnQueue.sort()
return self._lrnQueue
def _getLrnCard(self, collapse=False):
def _getLrnCard(self, collapse: bool = False) -> Optional[Card]:
if self._fillLrn():
cutoff = time.time()
if collapse:
@ -528,9 +539,10 @@ limit %d"""
card = self.col.getCard(id)
self.lrnCount -= card.left // 1000
return card
return None
# daily learning
def _fillLrnDay(self):
def _fillLrnDay(self) -> Optional[bool]:
if not self.lrnCount:
return False
if self._lrnDayQueue:
@ -557,16 +569,18 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""",
return True
# nothing left in the deck; move to next
self._lrnDids.pop(0)
return None
def _getLrnDayCard(self):
def _getLrnDayCard(self) -> Optional[Card]:
if self._fillLrnDay():
self.lrnCount -= 1
return self.col.getCard(self._lrnDayQueue.pop())
return None
def _answerLrnCard(self, card, ease):
def _answerLrnCard(self, card: Card, ease: int) -> None:
# ease 1=no, 2=yes, 3=remove
conf = self._lrnConf(card)
if card.odid and not card.wasNew:
if card.odid and not card.wasNew: # type: ignore
type = REVLOG_CRAM
elif card.type == CARD_TYPE_REV:
type = REVLOG_RELRN
@ -625,7 +639,7 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""",
card.queue = QUEUE_TYPE_DAY_LEARN_RELEARN
self._logLrn(card, ease, conf, leaving, type, lastLeft)
def _delayForGrade(self, conf, left):
def _delayForGrade(self, conf: Dict[str, Any], left: int) -> float:
left = left % 1000
try:
delay = conf["delays"][-left]
@ -637,13 +651,13 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""",
delay = 1
return delay * 60
def _lrnConf(self, card):
def _lrnConf(self, card: Card) -> Dict[str, Any]:
if card.type == CARD_TYPE_REV:
return self._lapseConf(card)
else:
return self._newConf(card)
def _rescheduleAsRev(self, card, conf, early):
def _rescheduleAsRev(self, card: Card, conf: Dict[str, Any], early: bool) -> None:
lapse = card.type == CARD_TYPE_REV
if lapse:
if self._resched(card):
@ -666,7 +680,7 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""",
card.queue = card.type = CARD_TYPE_NEW
card.due = self.col.nextID("pos")
def _startingLeft(self, card):
def _startingLeft(self, card: Card) -> int:
if card.type == CARD_TYPE_REV:
conf = self._lapseConf(card)
else:
@ -675,7 +689,9 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""",
tod = self._leftToday(conf["delays"], tot)
return tot + tod * 1000
def _leftToday(self, delays, left, now=None):
def _leftToday(
self, delays: List[int], left: int, now: Optional[int] = None
) -> int:
"The number of steps that can be completed by the day cutoff."
if not now:
now = intTime()
@ -688,7 +704,9 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""",
ok = i
return ok + 1
def _graduatingIvl(self, card, conf, early, adj=True):
def _graduatingIvl(
self, card: Card, conf: Dict[str, Any], early: bool, adj: bool = True
) -> int:
if card.type == CARD_TYPE_REV:
# lapsed card being relearnt
if card.odid:
@ -706,13 +724,21 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""",
else:
return ideal
def _rescheduleNew(self, card, conf, early):
def _rescheduleNew(self, card: Card, conf: Dict[str, Any], early: bool) -> None:
"Reschedule a new card that's graduated for the first time."
card.ivl = self._graduatingIvl(card, conf, early)
card.due = self.today + card.ivl
card.factor = conf["initialFactor"]
def _logLrn(self, card, ease, conf, leaving, type, lastLeft):
def _logLrn(
self,
card: Card,
ease: int,
conf: Dict[str, Any],
leaving: bool,
type: int,
lastLeft: int,
) -> None:
lastIvl = -(self._delayForGrade(conf, lastLeft))
ivl = card.ivl if leaving else -(self._delayForGrade(conf, card.left))
@ -737,7 +763,7 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""",
time.sleep(0.01)
log()
def removeLrn(self, ids=None):
def removeLrn(self, ids: Optional[List[int]] = None) -> None:
"Remove cards from the learning queues."
if ids:
extra = " and id in " + ids2str(ids)
@ -763,7 +789,7 @@ where queue in ({QUEUE_TYPE_LRN},{QUEUE_TYPE_DAY_LEARN_RELEARN}) and type = {CAR
)
)
def _lrnForDeck(self, did):
def _lrnForDeck(self, did: int) -> int:
cnt = (
self.col.db.scalar(
f"""
@ -788,16 +814,16 @@ and due <= ? limit ?)""",
# Reviews
##########################################################################
def _deckRevLimit(self, did):
def _deckRevLimit(self, did: int) -> int:
return self._deckNewLimit(did, self._deckRevLimitSingle)
def _deckRevLimitSingle(self, d):
def _deckRevLimitSingle(self, d: Dict[str, Any]) -> int:
if d["dyn"]:
return self.reportLimit
c = self.col.decks.confForDid(d["id"])
return max(0, c["rev"]["perDay"] - d["revToday"][1])
def _revForDeck(self, did, lim):
def _revForDeck(self, did: int, lim: int) -> int:
lim = min(lim, self.reportLimit)
return self.col.db.scalar(
f"""
@ -809,7 +835,7 @@ and due <= ? limit ?)""",
lim,
)
def _resetRevCount(self):
def _resetRevCount(self) -> None:
def cntFn(did, lim):
return self.col.db.scalar(
f"""
@ -822,12 +848,12 @@ did = ? and queue = {QUEUE_TYPE_REV} and due <= ? limit %d)"""
self.revCount = self._walkingCount(self._deckRevLimitSingle, cntFn)
def _resetRev(self):
def _resetRev(self) -> None:
self._resetRevCount()
self._revQueue = []
self._revQueue: List[Any] = []
self._revDids = self.col.decks.active()[:]
def _fillRev(self):
def _fillRev(self) -> Optional[bool]:
if self._revQueue:
return True
if not self.revCount:
@ -868,12 +894,15 @@ did = ? and queue = {QUEUE_TYPE_REV} and due <= ? limit ?""",
self._resetRev()
return self._fillRev()
def _getRevCard(self):
return None
def _getRevCard(self) -> Optional[Card]:
if self._fillRev():
self.revCount -= 1
return self.col.getCard(self._revQueue.pop())
return None
def totalRevForCurrentDeck(self):
def totalRevForCurrentDeck(self) -> int:
return self.col.db.scalar(
f"""
select count() from cards where id in (
@ -886,15 +915,15 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l
# Answering a review card
##########################################################################
def _answerRevCard(self, card, ease):
delay = 0
def _answerRevCard(self, card: Card, ease: int) -> None:
delay: float = 0
if ease == BUTTON_ONE:
delay = self._rescheduleLapse(card)
else:
self._rescheduleRev(card, ease)
self._logRev(card, ease, delay)
def _rescheduleLapse(self, card):
def _rescheduleLapse(self, card: Card) -> float:
conf = self._lapseConf(card)
card.lastIvl = card.ivl
if self._resched(card):
@ -906,7 +935,7 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l
if card.odid:
card.odue = card.due
# if suspended as a leech, nothing to do
delay = 0
delay: float = 0
if self._checkLeech(card, conf) and card.queue == QUEUE_TYPE_SUSPENDED:
return delay
# if no relearning steps, nothing to do
@ -930,10 +959,10 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l
card.queue = QUEUE_TYPE_DAY_LEARN_RELEARN
return delay
def _nextLapseIvl(self, card, conf):
def _nextLapseIvl(self, card: Card, conf: Dict[str, Any]) -> int:
return max(conf["minInt"], int(card.ivl * conf["mult"]))
def _rescheduleRev(self, card, ease):
def _rescheduleRev(self, card: Card, ease: int) -> None:
# update interval
card.lastIvl = card.ivl
if self._resched(card):
@ -948,7 +977,7 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l
card.odid = 0
card.odue = 0
def _logRev(self, card, ease, delay):
def _logRev(self, card: Card, ease: int, delay: float) -> None:
def log():
self.col.db.execute(
"insert into revlog values (?,?,?,?,?,?,?,?,?)",
@ -973,7 +1002,7 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l
# Interval management
##########################################################################
def _nextRevIvl(self, card, ease):
def _nextRevIvl(self, card: Card, ease: int) -> int:
"Ideal next interval for CARD, given EASE."
delay = self._daysLate(card)
conf = self._revConf(card)
@ -992,11 +1021,11 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l
# interval capped?
return min(interval, conf["maxIvl"])
def _fuzzedIvl(self, ivl):
def _fuzzedIvl(self, ivl: int) -> int:
min, max = self._fuzzIvlRange(ivl)
return random.randint(min, max)
def _fuzzIvlRange(self, ivl):
def _fuzzIvlRange(self, ivl: int) -> List[int]:
if ivl < 2:
return [1, 1]
elif ivl == 2:
@ -1011,24 +1040,24 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l
fuzz = max(fuzz, 1)
return [ivl - fuzz, ivl + fuzz]
def _constrainedIvl(self, ivl, conf, prev):
def _constrainedIvl(self, ivl: float, conf: Dict[str, Any], prev: int) -> int:
"Integer interval after interval factor and prev+1 constraints applied."
new = ivl * conf.get("ivlFct", 1)
return int(max(new, prev + 1))
def _daysLate(self, card):
def _daysLate(self, card: Card) -> int:
"Number of days later than scheduled."
due = card.odue if card.odid else card.due
return max(0, self.today - due)
def _updateRevIvl(self, card, ease):
def _updateRevIvl(self, card: Card, ease: int) -> None:
idealIvl = self._nextRevIvl(card, ease)
card.ivl = min(
max(self._adjRevIvl(card, idealIvl), card.ivl + 1),
self._revConf(card)["maxIvl"],
)
def _adjRevIvl(self, card, idealIvl):
def _adjRevIvl(self, card: Card, idealIvl: int) -> int:
if self._spreadRev:
idealIvl = self._fuzzedIvl(idealIvl)
return idealIvl
@ -1036,7 +1065,7 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l
# Dynamic deck handling
##########################################################################
def rebuildDyn(self, did=None):
def rebuildDyn(self, did: Optional[int] = None) -> Optional[List[int]]:
"Rebuild a dynamic deck."
did = did or self.col.decks.selected()
deck = self.col.decks.get(did)
@ -1045,12 +1074,12 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l
self.emptyDyn(did)
ids = self._fillDyn(deck)
if not ids:
return
return None
# and change to our new deck
self.col.decks.select(did)
return ids
def _fillDyn(self, deck):
def _fillDyn(self, deck: Dict[str, Any]) -> List[int]:
search, limit, order = deck["terms"][0]
orderlimit = self._dynOrder(order, limit)
if search.strip():
@ -1066,7 +1095,7 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l
self._moveToDyn(deck["id"], ids)
return ids
def emptyDyn(self, did, lim=None):
def emptyDyn(self, did: Optional[int], lim: Optional[str] = None) -> None:
if not lim:
lim = "did = %s" % did
self.col.log(self.col.db.list("select id from cards where %s" % lim))
@ -1080,10 +1109,10 @@ due = odue, odue = 0, odid = 0, usn = ? where %s"""
self.col.usn(),
)
def remFromDyn(self, cids):
def remFromDyn(self, cids: List[int]) -> None:
self.emptyDyn(None, "id in %s and odid" % ids2str(cids))
def _dynOrder(self, o, l):
def _dynOrder(self, o: int, l: int) -> str:
if o == DYN_OLDEST:
t = "(select max(id) from revlog where cid=c.id)"
elif o == DYN_RANDOM:
@ -1110,7 +1139,7 @@ due = odue, odue = 0, odid = 0, usn = ? where %s"""
t = "c.due"
return t + " limit %d" % l
def _moveToDyn(self, did, ids):
def _moveToDyn(self, did: int, ids: List[int]) -> None:
deck = self.col.decks.get(did)
data = []
t = intTime()
@ -1134,7 +1163,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?"""
data,
)
def _dynIvlBoost(self, card):
def _dynIvlBoost(self, card: Card) -> int:
assert card.odid and card.type == CARD_TYPE_REV
assert card.factor
elapsed = card.ivl - (card.odue - self.today)
@ -1146,7 +1175,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?"""
# Leeches
##########################################################################
def _checkLeech(self, card, conf) -> bool:
def _checkLeech(self, card: Card, conf: Dict[str, Any]) -> bool:
"Leech handler. True if card was a leech."
lf = conf["leechFails"]
if not lf:
@ -1176,10 +1205,10 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?"""
# Tools
##########################################################################
def _cardConf(self, card):
def _cardConf(self, card: Card) -> Dict[str, Any]:
return self.col.decks.confForDid(card.did)
def _newConf(self, card):
def _newConf(self, card: Card) -> Dict[str, Any]:
conf = self._cardConf(card)
# normal deck
if not card.odid:
@ -1199,7 +1228,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?"""
perDay=self.reportLimit,
)
def _lapseConf(self, card):
def _lapseConf(self, card: Card) -> Dict[str, Any]:
conf = self._cardConf(card)
# normal deck
if not card.odid:
@ -1218,7 +1247,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?"""
resched=conf["resched"],
)
def _revConf(self, card):
def _revConf(self, card: Card) -> Dict[str, Any]:
conf = self._cardConf(card)
# normal deck
if not card.odid:
@ -1226,10 +1255,10 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?"""
# dynamic deck
return self.col.decks.confForDid(card.odid)["rev"]
def _deckLimit(self):
def _deckLimit(self) -> str:
return ids2str(self.col.decks.active())
def _resched(self, card):
def _resched(self, card: Card) -> bool:
conf = self._cardConf(card)
if not conf["dyn"]:
return True
@ -1238,7 +1267,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?"""
# Daily cutoff
##########################################################################
def _updateCutoff(self):
def _updateCutoff(self) -> None:
oldToday = self.today
# days since col created
self.today = int((time.time() - self.col.crt) // 86400)
@ -1261,7 +1290,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?"""
if unburied < self.today:
self.unburyCards()
def _checkDay(self):
def _checkDay(self) -> None:
# check if the day has rolled over
if time.time() > self.dayCutoff:
self.reset()
@ -1269,7 +1298,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?"""
# Deck finished state
##########################################################################
def finishedMsg(self):
def finishedMsg(self) -> str:
return (
"<b>"
+ _("Congratulations! You have finished this deck for now.")
@ -1277,7 +1306,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?"""
+ self._nextDueMsg()
)
def _nextDueMsg(self):
def _nextDueMsg(self) -> str:
line = []
# the new line replacements are so we don't break translations
# in a point release
@ -1321,7 +1350,7 @@ To study outside of the normal schedule, click the Custom Study button below."""
)
return "<p>".join(line)
def revDue(self):
def revDue(self) -> Optional[int]:
"True if there are any rev cards due."
return self.col.db.scalar(
(
@ -1332,7 +1361,7 @@ To study outside of the normal schedule, click the Custom Study button below."""
self.today,
)
def newDue(self):
def newDue(self) -> Optional[int]:
"True if there are any new cards due."
return self.col.db.scalar(
(
@ -1342,7 +1371,7 @@ To study outside of the normal schedule, click the Custom Study button below."""
% self._deckLimit()
)
def haveBuried(self):
def haveBuried(self) -> bool:
sdids = ids2str(self.col.decks.active())
cnt = self.col.db.scalar(
f"select 1 from cards where queue = {QUEUE_TYPE_SIBLING_BURIED} and did in %s limit 1"
@ -1353,7 +1382,7 @@ To study outside of the normal schedule, click the Custom Study button below."""
# Next time reports
##########################################################################
def nextIvlStr(self, card, ease, short=False):
def nextIvlStr(self, card: Card, ease: int, short: bool = False) -> str:
"Return the next interval for CARD as a string."
ivl_secs = self.nextIvl(card, ease)
if not ivl_secs:
@ -1365,7 +1394,7 @@ To study outside of the normal schedule, click the Custom Study button below."""
s = "<" + s
return s
def nextIvl(self, card, ease):
def nextIvl(self, card: Card, ease: int) -> float:
"Return the next interval for CARD, in seconds."
if card.queue in (QUEUE_TYPE_NEW, QUEUE_TYPE_LRN, QUEUE_TYPE_DAY_LEARN_RELEARN):
return self._nextLrnIvl(card, ease)
@ -1380,7 +1409,7 @@ To study outside of the normal schedule, click the Custom Study button below."""
return self._nextRevIvl(card, ease) * 86400
# this isn't easily extracted from the learn code
def _nextLrnIvl(self, card, ease):
def _nextLrnIvl(self, card: Card, ease: int) -> float:
if card.queue == 0:
card.left = self._startingLeft(card)
conf = self._lrnConf(card)
@ -1405,7 +1434,7 @@ To study outside of the normal schedule, click the Custom Study button below."""
# Suspending
##########################################################################
def suspendCards(self, ids):
def suspendCards(self, ids: List[int]) -> None:
"Suspend cards."
self.col.log(ids)
self.remFromDyn(ids)
@ -1417,7 +1446,7 @@ To study outside of the normal schedule, click the Custom Study button below."""
self.col.usn(),
)
def unsuspendCards(self, ids):
def unsuspendCards(self, ids: List[int]) -> None:
"Unsuspend cards."
self.col.log(ids)
self.col.db.execute(
@ -1427,7 +1456,7 @@ To study outside of the normal schedule, click the Custom Study button below."""
self.col.usn(),
)
def buryCards(self, cids):
def buryCards(self, cids: List[int]) -> None:
self.col.log(cids)
self.remFromDyn(cids)
self.removeLrn(cids)
@ -1439,7 +1468,7 @@ update cards set queue={QUEUE_TYPE_SIBLING_BURIED},mod=?,usn=? where id in """
self.col.usn(),
)
def buryNote(self, nid):
def buryNote(self, nid: int) -> None:
"Bury all cards for note until next session."
cids = self.col.db.list(
"select id from cards where nid = ? and queue >= 0", nid
@ -1449,7 +1478,7 @@ update cards set queue={QUEUE_TYPE_SIBLING_BURIED},mod=?,usn=? where id in """
# Sibling spacing
##########################################################################
def _burySiblings(self, card):
def _burySiblings(self, card: Card) -> None:
toBury = []
nconf = self._newConf(card)
buryNew = nconf.get("bury", True)
@ -1493,7 +1522,7 @@ and (queue={QUEUE_TYPE_NEW} or (queue={QUEUE_TYPE_REV} and due<=?))""",
# Resetting
##########################################################################
def forgetCards(self, ids):
def forgetCards(self, ids: List[int]) -> None:
"Put cards at the end of the new queue."
self.remFromDyn(ids)
self.col.db.execute(
@ -1509,7 +1538,7 @@ and (queue={QUEUE_TYPE_NEW} or (queue={QUEUE_TYPE_REV} and due<=?))""",
self.sortCards(ids, start=pmax + 1)
self.col.log(ids)
def reschedCards(self, ids, imin, imax):
def reschedCards(self, ids: List[int], imin: int, imax: int) -> None:
"Put cards in review queue with a new interval in days (min, max)."
d = []
t = self.today
@ -1535,7 +1564,7 @@ usn=:usn,mod=:mod,factor=:fact where id=:id""",
)
self.col.log(ids)
def resetCards(self, ids):
def resetCards(self, ids: List[int]) -> None:
"Completely reset cards for export."
sids = ids2str(ids)
# we want to avoid resetting due number of existing new cards on export
@ -1555,7 +1584,14 @@ usn=:usn,mod=:mod,factor=:fact where id=:id""",
# Repositioning new cards
##########################################################################
def sortCards(self, cids, start=1, step=1, shuffle=False, shift=False):
def sortCards(
self,
cids: List[int],
start: int = 1,
step: int = 1,
shuffle: bool = False,
shift: bool = False,
) -> None:
scids = ids2str(cids)
now = intTime()
nids = []
@ -1605,15 +1641,15 @@ and due >= ? and queue = {QUEUE_TYPE_NEW}"""
"update cards set due=:due,mod=:now,usn=:usn where id = :cid", d
)
def randomizeCards(self, did):
def randomizeCards(self, did: int) -> None:
cids = self.col.db.list("select id from cards where did = ?", did)
self.sortCards(cids, shuffle=True)
def orderCards(self, did):
def orderCards(self, did: int) -> None:
cids = self.col.db.list("select id from cards where did = ? order by nid", did)
self.sortCards(cids)
def resortConf(self, conf):
def resortConf(self, conf) -> None:
for did in self.col.decks.didsForConf(conf):
if conf["new"]["order"] == 0:
self.randomizeCards(did)
@ -1621,7 +1657,7 @@ and due >= ? and queue = {QUEUE_TYPE_NEW}"""
self.orderCards(did)
# for post-import
def maybeRandomizeDeck(self, did=None):
def maybeRandomizeDeck(self, did: Optional[int] = None) -> None:
if not did:
did = self.col.decks.selected()
conf = self.col.decks.confForDid(did)

View file

@ -128,14 +128,15 @@ class Scheduler:
self._restorePreviewCard(card)
self._removeFromFiltered(card)
def counts(self, card: None = None) -> tuple:
def counts(self, card: Optional[Card] = None) -> Tuple[int, int, int]:
counts = [self.newCount, self.lrnCount, self.revCount]
if card:
idx = self.countIdx(card)
counts[idx] += 1
return tuple(counts)
new, lrn, rev = counts
return (new, lrn, rev)
def dueForecast(self, days=7) -> List:
def dueForecast(self, days: int = 7) -> List[Any]:
"Return counts over next DAYS. Includes today."
daysd = dict(
self.col.db.all(
@ -158,7 +159,7 @@ order by due"""
ret = [x[1] for x in sorted(daysd.items())]
return ret
def countIdx(self, card: Card) -> Any:
def countIdx(self, card: Card) -> int:
if card.queue in (QUEUE_TYPE_DAY_LEARN_RELEARN, QUEUE_TYPE_PREVIEW):
return 1
return card.queue
@ -179,7 +180,7 @@ order by due"""
g[key][1] += cnt
self.col.decks.save(g)
def extendLimits(self, new, rev) -> None:
def extendLimits(self, new: int, rev: int) -> None:
cur = self.col.decks.current()
parents = self.col.decks.parents(cur["id"])
children = [
@ -193,8 +194,10 @@ order by due"""
self.col.decks.save(g)
def _walkingCount(
self, limFn: Optional[Callable] = None, cntFn: Optional[Callable] = None
) -> Any:
self,
limFn: Optional[Callable[[Any], Optional[int]]] = None,
cntFn: Optional[Callable[[int, int], int]] = None,
) -> int:
tot = 0
pcounts: Dict[int, int] = {}
# for each of the active decks
@ -228,7 +231,7 @@ order by due"""
# Deck list
##########################################################################
def deckDueList(self) -> List[list]:
def deckDueList(self) -> List[List[Any]]:
"Returns [deckname, did, rev, lrn, new]"
self._checkDay()
self.col.decks.checkIntegrity()
@ -270,9 +273,7 @@ order by due"""
def deckDueTree(self) -> Any:
return self._groupChildren(self.deckDueList())
def _groupChildren(
self, grps: List[List]
) -> Tuple[Tuple[Any, Any, Any, Any, Any, Any], ...]:
def _groupChildren(self, grps: List[List[Any]]) -> Any:
# first, split the group names into components
for g in grps:
g[0] = g[0].split("::")
@ -281,7 +282,7 @@ order by due"""
# then run main function
return self._groupChildrenMain(grps)
def _groupChildrenMain(self, grps: Any) -> Any:
def _groupChildrenMain(self, grps: List[List[Any]]) -> Any:
tree = []
# group and recurse
def key(grp):
@ -379,7 +380,7 @@ did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""",
self._newQueue: List[int] = []
self._updateNewCardRatio()
def _fillNew(self) -> Any:
def _fillNew(self) -> Optional[bool]:
if self._newQueue:
return True
if not self.newCount:
@ -406,6 +407,7 @@ did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""",
# removed from the queue but not buried
self._resetNew()
return self._fillNew()
return None
def _getNewCard(self) -> Optional[Card]:
if self._fillNew():
@ -423,7 +425,7 @@ did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""",
return
self.newCardModulus = 0
def _timeForNewCard(self) -> Optional[int]:
def _timeForNewCard(self) -> Optional[bool]:
"True if it's time to display a new card when distributing."
if not self.newCount:
return False
@ -432,10 +434,10 @@ did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""",
elif self.col.conf["newSpread"] == NEW_CARDS_FIRST:
return True
elif self.newCardModulus:
return self.reps and self.reps % self.newCardModulus == 0
return self.reps != 0 and self.reps % self.newCardModulus == 0
else:
# shouldn't reach
return False
return None
def _deckNewLimit(
self, did: int, fn: Callable[[Dict[str, Any]], int] = None
@ -453,7 +455,7 @@ did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""",
lim = min(rem, lim)
return lim
def _newForDeck(self, did: int, lim: int) -> Any:
def _newForDeck(self, did: int, lim: int) -> int:
"New count for a single deck."
if not lim:
return 0
@ -466,14 +468,14 @@ select count() from
lim,
)
def _deckNewLimitSingle(self, g: Dict[str, Any]) -> Any:
def _deckNewLimitSingle(self, g: Dict[str, Any]) -> int:
"Limit for deck without parent limits."
if g["dyn"]:
return self.dynReportLimit
c = self.col.decks.confForDid(g["id"])
return max(0, c["new"]["perDay"] - g["newToday"][1])
def totalNewForCurrentDeck(self) -> Any:
def totalNewForCurrentDeck(self) -> int:
return self.col.db.scalar(
f"""
select count() from cards where id in (
@ -533,7 +535,7 @@ select count() from cards where did in %s and queue = {QUEUE_TYPE_PREVIEW}
self._lrnDids = self.col.decks.active()[:]
# sub-day learning
def _fillLrn(self) -> Any:
def _fillLrn(self) -> Union[bool, List[Any]]:
if not self.lrnCount:
return False
if self._lrnQueue:
@ -745,10 +747,7 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""",
return tot + tod * 1000
def _leftToday(
self,
delays: Union[List[int], List[Union[float, int]]],
left: int,
now: Optional[int] = None,
self, delays: List[int], left: int, now: Optional[int] = None,
) -> int:
"The number of steps that can be completed by the day cutoff."
if not now:
@ -803,7 +802,7 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""",
else:
ivl = -self._delayForGrade(conf, card.left)
def log():
def log() -> None:
self.col.db.execute(
"insert into revlog values (?,?,?,?,?,?,?,?,?)",
int(time.time() * 1000),
@ -824,7 +823,7 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""",
time.sleep(0.01)
log()
def _lrnForDeck(self, did: int) -> Any:
def _lrnForDeck(self, did: int) -> int:
cnt = (
self.col.db.scalar(
f"""
@ -849,13 +848,13 @@ and due <= ? limit ?)""",
# Reviews
##########################################################################
def _currentRevLimit(self) -> Any:
def _currentRevLimit(self) -> int:
d = self.col.decks.get(self.col.decks.selected(), default=False)
return self._deckRevLimitSingle(d)
def _deckRevLimitSingle(
self, d: Dict[str, Any], parentLimit: Optional[int] = None
) -> Any:
) -> int:
# invalid deck selected?
if not d:
return 0
@ -993,7 +992,7 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l
ivl = max(1, conf["minInt"], int(card.ivl * conf["mult"]))
return ivl
def _rescheduleRev(self, card: Card, ease: int, early: Union[bool, int]) -> None:
def _rescheduleRev(self, card: Card, ease: int, early: bool) -> None:
# update interval
card.lastIvl = card.ivl
if early:
@ -1076,7 +1075,7 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l
return [ivl - fuzz, ivl + fuzz]
def _constrainedIvl(
self, ivl: Union[int, float], conf: Dict[str, Any], prev: int, fuzz: bool
self, ivl: float, conf: Dict[str, Any], prev: int, fuzz: bool
) -> int:
ivl = int(ivl * conf.get("ivlFct", 1))
if fuzz: