Type pylib/anki/schedv2.py

This commit is contained in:
Alan Du 2020-02-27 19:24:38 -05:00
parent b451f4e3f2
commit 2879dc1158

View file

@ -128,14 +128,15 @@ class Scheduler:
self._restorePreviewCard(card)
self._removeFromFiltered(card)
def counts(self, card: None = None) -> tuple:
def counts(self, card: Optional[Card] = None) -> Tuple[int, int, int]:
counts = [self.newCount, self.lrnCount, self.revCount]
if card:
idx = self.countIdx(card)
counts[idx] += 1
return tuple(counts)
new, lrn, rev = counts
return (new, lrn, rev)
def dueForecast(self, days=7) -> List:
def dueForecast(self, days: int = 7) -> List[Any]:
"Return counts over next DAYS. Includes today."
daysd = dict(
self.col.db.all(
@ -158,7 +159,7 @@ order by due"""
ret = [x[1] for x in sorted(daysd.items())]
return ret
def countIdx(self, card: Card) -> Any:
def countIdx(self, card: Card) -> int:
if card.queue in (QUEUE_TYPE_DAY_LEARN_RELEARN, QUEUE_TYPE_PREVIEW):
return 1
return card.queue
@ -179,7 +180,7 @@ order by due"""
g[key][1] += cnt
self.col.decks.save(g)
def extendLimits(self, new, rev) -> None:
def extendLimits(self, new: int, rev: int) -> None:
cur = self.col.decks.current()
parents = self.col.decks.parents(cur["id"])
children = [
@ -193,8 +194,10 @@ order by due"""
self.col.decks.save(g)
def _walkingCount(
self, limFn: Optional[Callable] = None, cntFn: Optional[Callable] = None
) -> Any:
self,
limFn: Optional[Callable[[Any], Optional[int]]] = None,
cntFn: Optional[Callable[[int, int], int]] = None,
) -> int:
tot = 0
pcounts: Dict[int, int] = {}
# for each of the active decks
@ -228,7 +231,7 @@ order by due"""
# Deck list
##########################################################################
def deckDueList(self) -> List[list]:
def deckDueList(self) -> List[List[Any]]:
"Returns [deckname, did, rev, lrn, new]"
self._checkDay()
self.col.decks.checkIntegrity()
@ -270,9 +273,7 @@ order by due"""
def deckDueTree(self) -> Any:
return self._groupChildren(self.deckDueList())
def _groupChildren(
self, grps: List[List]
) -> Tuple[Tuple[Any, Any, Any, Any, Any, Any], ...]:
def _groupChildren(self, grps: List[List[Any]]) -> Any:
# first, split the group names into components
for g in grps:
g[0] = g[0].split("::")
@ -281,7 +282,7 @@ order by due"""
# then run main function
return self._groupChildrenMain(grps)
def _groupChildrenMain(self, grps: Any) -> Any:
def _groupChildrenMain(self, grps: List[List[Any]]) -> Any:
tree = []
# group and recurse
def key(grp):
@ -379,7 +380,7 @@ did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""",
self._newQueue: List[int] = []
self._updateNewCardRatio()
def _fillNew(self) -> Any:
def _fillNew(self) -> Optional[bool]:
if self._newQueue:
return True
if not self.newCount:
@ -406,6 +407,7 @@ did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""",
# removed from the queue but not buried
self._resetNew()
return self._fillNew()
return None
def _getNewCard(self) -> Optional[Card]:
if self._fillNew():
@ -423,7 +425,7 @@ did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""",
return
self.newCardModulus = 0
def _timeForNewCard(self) -> Optional[int]:
def _timeForNewCard(self) -> Optional[bool]:
"True if it's time to display a new card when distributing."
if not self.newCount:
return False
@ -432,10 +434,10 @@ did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""",
elif self.col.conf["newSpread"] == NEW_CARDS_FIRST:
return True
elif self.newCardModulus:
return self.reps and self.reps % self.newCardModulus == 0
return self.reps != 0 and self.reps % self.newCardModulus == 0
else:
# shouldn't reach
return False
return None
def _deckNewLimit(
self, did: int, fn: Callable[[Dict[str, Any]], int] = None
@ -453,7 +455,7 @@ did = ? and queue = {QUEUE_TYPE_NEW} limit ?)""",
lim = min(rem, lim)
return lim
def _newForDeck(self, did: int, lim: int) -> Any:
def _newForDeck(self, did: int, lim: int) -> int:
"New count for a single deck."
if not lim:
return 0
@ -466,14 +468,14 @@ select count() from
lim,
)
def _deckNewLimitSingle(self, g: Dict[str, Any]) -> Any:
def _deckNewLimitSingle(self, g: Dict[str, Any]) -> int:
"Limit for deck without parent limits."
if g["dyn"]:
return self.dynReportLimit
c = self.col.decks.confForDid(g["id"])
return max(0, c["new"]["perDay"] - g["newToday"][1])
def totalNewForCurrentDeck(self) -> Any:
def totalNewForCurrentDeck(self) -> int:
return self.col.db.scalar(
f"""
select count() from cards where id in (
@ -533,7 +535,7 @@ select count() from cards where did in %s and queue = {QUEUE_TYPE_PREVIEW}
self._lrnDids = self.col.decks.active()[:]
# sub-day learning
def _fillLrn(self) -> Any:
def _fillLrn(self) -> Union[bool, List[Any]]:
if not self.lrnCount:
return False
if self._lrnQueue:
@ -745,10 +747,7 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""",
return tot + tod * 1000
def _leftToday(
self,
delays: Union[List[int], List[Union[float, int]]],
left: int,
now: Optional[int] = None,
self, delays: List[int], left: int, now: Optional[int] = None,
) -> int:
"The number of steps that can be completed by the day cutoff."
if not now:
@ -803,7 +802,7 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""",
else:
ivl = -self._delayForGrade(conf, card.left)
def log():
def log() -> None:
self.col.db.execute(
"insert into revlog values (?,?,?,?,?,?,?,?,?)",
int(time.time() * 1000),
@ -824,7 +823,7 @@ did = ? and queue = {QUEUE_TYPE_DAY_LEARN_RELEARN} and due <= ? limit ?""",
time.sleep(0.01)
log()
def _lrnForDeck(self, did: int) -> Any:
def _lrnForDeck(self, did: int) -> int:
cnt = (
self.col.db.scalar(
f"""
@ -849,13 +848,13 @@ and due <= ? limit ?)""",
# Reviews
##########################################################################
def _currentRevLimit(self) -> Any:
def _currentRevLimit(self) -> int:
d = self.col.decks.get(self.col.decks.selected(), default=False)
return self._deckRevLimitSingle(d)
def _deckRevLimitSingle(
self, d: Dict[str, Any], parentLimit: Optional[int] = None
) -> Any:
) -> int:
# invalid deck selected?
if not d:
return 0