add a bunch of annotations for mypy

This commit is contained in:
Damien Elmes 2019-12-20 15:07:40 +10:00
parent 068b10103c
commit b6b8df2dcf
24 changed files with 196 additions and 162 deletions

View file

@ -7,7 +7,8 @@ import time
from anki.hooks import runHook
from anki.utils import intTime, timestampID, joinFields
from anki.consts import *
from typing import Any, Optional
from anki.notes import Note
from typing import Any, Optional, Dict
# Cards
##########################################################################
@ -21,6 +22,10 @@ from typing import Any, Optional
# - lrn queue: integer timestamp
class Card:
_qa: Optional[Dict[str,str]]
_note: Optional[Note]
timerStarted: Optional[float]
lastIvl: Optional[int]
def __init__(self, col, id: Optional[int] = None) -> None:
from anki.collection import _Collection
@ -133,10 +138,10 @@ lapses=?, left=?, odue=?, odid=?, did=? where id = ?""",
data = [self.id, f.id, m['id'], self.odid or self.did, self.ord,
f.stringTags(), f.joinedFields(), self.flags]
if browser:
args = (t.get('bqfmt'), t.get('bafmt'))
args = [t.get('bqfmt'), t.get('bafmt')]
else:
args = tuple()
self._qa = self.col._renderQA(data, *args)
args = []
self._qa = self.col._renderQA(data, *args) # type: ignore
return self._qa
def note(self, reload: bool = False) -> Any:
@ -176,6 +181,7 @@ lapses=?, left=?, odue=?, odid=?, did=? where id = ?""",
self.model(), joinFields(self.note().fields))
if self.ord not in ords:
return True
return False
def __repr__(self) -> str:
d = dict(self.__dict__)

View file

@ -59,11 +59,20 @@ def timezoneOffset() -> int:
else:
return time.timezone//60
from anki.schedv2 import Scheduler
from anki.sched import Scheduler as V1Scheduler
from anki.schedv2 import Scheduler as V2Scheduler
# this is initialized by storage.Collection
class _Collection:
sched: Scheduler
db: Optional[DB]
sched: Union[V1Scheduler, V2Scheduler]
crt: int
mod: int
scm: int
dty: bool # no longer used
_usn: int
ls: int
conf: Dict[str, Any]
_undo: List[Any]
def __init__(self, db: DB, server: bool = False, log: bool = False) -> None:
self._debugLog = log
@ -111,11 +120,9 @@ class _Collection:
def _loadScheduler(self) -> None:
ver = self.schedVer()
if ver == 1:
from anki.sched import Scheduler
self.sched = V1Scheduler(self)
elif ver == 2:
from anki.schedv2 import Scheduler
self.sched = Scheduler(self)
self.sched = V2Scheduler(self)
def changeSchedulerVer(self, ver: int) -> None:
if ver == self.schedVer():
@ -126,8 +133,7 @@ class _Collection:
self.modSchema(check=True)
self.clearUndo()
from anki.schedv2 import Scheduler
v2Sched = Scheduler(self)
v2Sched = V2Scheduler(self)
if ver == 1:
v2Sched.moveToV1()
@ -149,14 +155,14 @@ class _Collection:
self.dty, # no longer used
self._usn,
self.ls,
self.conf,
conf,
models,
decks,
dconf,
tags) = self.db.first("""
select crt, mod, scm, dty, usn, ls,
conf, models, decks, dconf, tags from col""")
self.conf = json.loads(self.conf)
self.conf = json.loads(conf) # type: ignore
self.models.load(models)
self.decks.load(decks, dconf)
self.tags.load(tags)
@ -197,6 +203,7 @@ crt=?, mod=?, scm=?, dty=?, usn=?, ls=?, conf=?""",
if time.time() - self._lastSave > 300:
self.save()
return True
return None
def lock(self) -> None:
# make sure we don't accidentally bump mod time
@ -361,13 +368,13 @@ crt=?, mod=?, scm=?, dty=?, usn=?, ls=?, conf=?""",
ok.append(t)
return ok
def genCards(self, nids: List[int]) -> List:
def genCards(self, nids: List[int]) -> List[int]:
"Generate cards for non-empty templates, return ids to remove."
# build map of (nid,ord) so we don't create dupes
snids = ids2str(nids)
have = {}
dids = {}
dues = {}
have: Dict[int,Dict[int, int]] = {}
dids: Dict[int,Optional[int]] = {}
dues: Dict[int,int] = {}
for id, nid, ord, did, due, odue, odid, type in self.db.execute(
"select id, nid, ord, did, due, odue, odid, type from cards where nid in "+snids):
# existing cards
@ -514,8 +521,8 @@ select id from notes where id in %s and id not in (select nid from cards)""" %
ids2str(nids))
self._remNotes(nids)
def emptyCids(self) -> List:
rem = []
def emptyCids(self) -> List[int]:
rem: List[int] = []
for m in self.models.all():
rem += self.genCards(self.models.nids(m))
return rem
@ -592,7 +599,7 @@ where c.nid = n.id and c.id in %s group by nid""" % ids2str(cids)):
fields['Card'] = template['name']
fields['c%d' % (data[4]+1)] = "1"
# render q & a
d = dict(id=data[0])
d: Dict[str,Any] = dict(id=data[0])
qfmt = qfmt or template['qfmt']
afmt = afmt or template['afmt']
for (type, format) in (("q", qfmt), ("a", afmt)):
@ -664,7 +671,7 @@ where c.nid == f.id
self._startTime = time.time()
self._startReps = self.sched.reps
def timeboxReached(self) -> Optional[Union[bool, Tuple[Any, int]]]:
def timeboxReached(self) -> Union[bool, Tuple[Any, int]]:
"Return (elapsedTime, reps) if timebox reached, or False."
if not self.conf['timeLim']:
# timeboxing disabled
@ -672,6 +679,7 @@ where c.nid == f.id
elapsed = time.time() - self._startTime
if elapsed > self.conf['timeLim']:
return (self.conf['timeLim'], self.sched.reps - self._startReps)
return False
# Undo
##########################################################################
@ -694,7 +702,7 @@ where c.nid == f.id
self._undoOp()
def markReview(self, card: Card) -> None:
old = []
old: List[Any] = []
if self._undo:
if self._undo[0] == 1:
old = self._undo[2]
@ -746,17 +754,17 @@ where c.nid == f.id
# DB maintenance
##########################################################################
def basicCheck(self) -> Optional[bool]:
def basicCheck(self) -> bool:
"Basic integrity check for syncing. True if ok."
# cards without notes
if self.db.scalar("""
select 1 from cards where nid not in (select id from notes) limit 1"""):
return
return False
# notes without cards or models
if self.db.scalar("""
select 1 from notes where id not in (select distinct nid from cards)
or mid not in %s limit 1""" % ids2str(self.models.ids())):
return
return False
# invalid ords
for m in self.models.all():
# ignore clozes
@ -767,7 +775,7 @@ select 1 from cards where ord not in %s and nid in (
select id from notes where mid = ?) limit 1""" %
ids2str([t['ord'] for t in m['tmpls']]),
m['id']):
return
return False
return True
def fixIntegrity(self) -> Tuple[Any, bool]:

View file

@ -11,12 +11,11 @@ from anki.hooks import runHook
from anki.consts import *
from anki.lang import _
from anki.errors import DeckRenameError
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Dict, List, Optional, Tuple, Set, Union
# fixmes:
# - make sure users can't set grad interval < 1
from typing import Any, Dict, List, Optional, Union
defaultDeck = {
'newToday': [0, 0], # currentDay, count
'revToday': [0, 0],
@ -92,6 +91,8 @@ defaultConf = {
}
class DeckManager:
decks: Dict[str, Any]
dconf: Dict[str, Any]
# Registry save/load
#############################################################
@ -457,7 +458,7 @@ class DeckManager:
def _checkDeckTree(self) -> None:
decks = self.col.decks.all()
decks.sort(key=operator.itemgetter('name'))
names = set()
names: Set[str] = set()
for deck in decks:
# two decks with the same name?
@ -527,7 +528,7 @@ class DeckManager:
arr.append(did)
gather(child, arr)
arr = []
arr: List = []
gather(childMap[did], arr)
return arr
@ -537,7 +538,7 @@ class DeckManager:
# go through all decks, sorted by name
for deck in sorted(self.all(), key=operator.itemgetter("name")):
node = {}
node: Dict[int, Any] = {}
childMap[deck['id']] = node
# add note to immediate parent
@ -552,7 +553,7 @@ class DeckManager:
def parents(self, did: int, nameMap: Optional[Any] = None) -> List:
"All parents of did."
# get parent and grandparent names
parents = []
parents: List[str] = []
for part in self.get(did)['name'].split("::")[:-1]:
if not parents:
parents.append(part)

View file

@ -202,6 +202,7 @@ class AnkiExporter(Exporter):
if int(m['id']) in mids:
self.dst.models.update(m)
# decks
dids: List[int]
if not self.did:
dids = []
else:
@ -294,7 +295,7 @@ class AnkiPackageExporter(AnkiExporter):
z.writestr("media", json.dumps(media))
z.close()
def doExport(self, z: ZipFile, path: str) -> Dict[str, str]:
def doExport(self, z: ZipFile, path: str) -> Dict[str, str]: # type: ignore
# export into the anki2 file
colfile = path.replace(".apkg", ".anki2")
AnkiExporter.exportInto(self, colfile)

View file

@ -9,7 +9,7 @@ import unicodedata
from anki.utils import ids2str, splitFields, joinFields, intTime, fieldChecksum, stripHTMLMedia
from anki.consts import *
from anki.hooks import *
from typing import Any, List, Optional, Tuple
from typing import Any, List, Optional, Tuple, Set
# Find
@ -130,20 +130,20 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """+preds
def _where(self, tokens) -> Tuple[Any, Optional[List[str]]]:
# state and query
s = dict(isnot=False, isor=False, join=False, q="", bad=False)
args = []
s: Dict[str, Any] = dict(isnot=False, isor=False, join=False, q="", bad=False)
args: List[Any] = []
def add(txt, wrap=True):
# failed command?
if not txt:
# if it was to be negated then we can just ignore it
if s['isnot']:
s['isnot'] = False
return
return None, None
else:
s['bad'] = True
return
return None, None
elif txt == "skip":
return
return None, None
# do we need a conjunction?
if s['join']:
if s['isor']:
@ -273,11 +273,14 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """+preds
(c.queue in (2,3) and c.due <= %d) or
(c.queue = 1 and c.due <= %d)""" % (
self.col.sched.today, self.col.sched.dayCutoff)
else:
# unknown
return None
def _findFlag(self, args) -> Optional[str]:
(val, args) = args
if not val or len(val)!=1 or val not in "01234":
return
return None
val = int(val)
mask = 2**3 - 1
return "(c.flags & %d) == %d" % (mask, val)
@ -289,13 +292,13 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """+preds
try:
days = int(r[0])
except ValueError:
return
return None
days = min(days, 31)
# ease
ease = ""
if len(r) > 1:
if r[1] not in ("1", "2", "3", "4"):
return
return None
ease = "and ease=%s" % r[1]
cutoff = (self.col.sched.dayCutoff - 86400*days)*1000
return ("c.id in (select cid from revlog where id>%d %s)" %
@ -306,7 +309,7 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """+preds
try:
days = int(val)
except ValueError:
return
return None
cutoff = (self.col.sched.dayCutoff - 86400*days)*1000
return "c.id > %d" % cutoff
@ -315,7 +318,7 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """+preds
(val, args) = args
m = re.match("(^.+?)(<=|>=|!=|=|<|>)(.+?$)", val)
if not m:
return
return None
prop, cmp, val = m.groups()
prop = prop.lower() # pytype: disable=attribute-error
# is val valid?
@ -325,10 +328,10 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """+preds
else:
val = int(val)
except ValueError:
return
return None
# is prop valid?
if prop not in ("due", "ivl", "reps", "lapses", "ease"):
return
return None
# query
q = []
if prop == "due":
@ -350,19 +353,19 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """+preds
def _findNids(self, args) -> Optional[str]:
(val, args) = args
if re.search("[^0-9,]", val):
return
return None
return "n.id in (%s)" % val
def _findCids(self, args) -> Optional[str]:
(val, args) = args
if re.search("[^0-9,]", val):
return
return None
return "c.id in (%s)" % val
def _findMid(self, args) -> Optional[str]:
(val, args) = args
if re.search("[^0-9]", val):
return
return None
return "n.mid = %s" % val
def _findModel(self, args) -> str:
@ -401,7 +404,7 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """+preds
if re.match("(?i)"+val, unicodedata.normalize("NFC", d['name'])):
ids.update(dids(d['id']))
if not ids:
return
return None
sids = ids2str(ids)
return "c.did in %s or c.odid in %s" % (sids, sids)
@ -440,7 +443,7 @@ select distinct(n.id) from cards c, notes n where c.nid=n.id and """+preds
mods[str(m['id'])] = (m, f['ord'])
if not mods:
# nothing has that field
return
return None
# gather nids
regex = re.escape(val).replace("_", ".").replace(re.escape("%"), ".*")
nids = []
@ -456,7 +459,7 @@ where mid in %s and flds like ? escape '\\'""" % (
if re.search("(?si)^"+regex+"$", strg):
nids.append(id)
except sre_constants.error:
return
return None
if not nids:
return "0"
return "n.id in %s" % ids2str(nids)
@ -467,7 +470,7 @@ where mid in %s and flds like ? escape '\\'""" % (
try:
mid, val = val.split(",", 1)
except OSError:
return
return None
csum = fieldChecksum(val)
nids = []
for nid, flds in self.col.db.execute(
@ -531,7 +534,7 @@ def findReplace(col, nids, src, dst, regex=False, field=None, fold=True) -> int:
return len(d)
def fieldNames(col, downcase=True) -> List:
fields = set()
fields: Set[str] = set()
for m in col.models.all():
for f in m['flds']:
name=f['name'].lower() if downcase else f['name']
@ -540,7 +543,7 @@ def fieldNames(col, downcase=True) -> List:
return list(fields)
def fieldNamesForNotes(col, nids) -> List:
fields = set()
fields: Set[str] = set()
mids = col.db.list("select distinct mid from notes where id in %s" % ids2str(nids))
for mid in mids:
model = col.models.get(mid)
@ -558,9 +561,9 @@ def findDupes(col, fieldName, search="") -> List[Tuple[Any, List]]:
search = "("+search+") "
search += "'%s:*'" % fieldName
# go through notes
vals = {}
vals: Dict[str, List[int]] = {}
dupes = []
fields = {}
fields: Dict[int, int] = {}
def ordForMid(mid):
if mid not in fields:
model = col.models.get(mid)

View file

@ -14,33 +14,32 @@ automatically but can be called with _old().
"""
import decorator
from typing import Dict, List, Callable, Any
from typing import List, Any, Callable, Dict
# Hooks
##############################################################################
from typing import Callable, Dict, Union
_hooks: Dict[str, List[Callable[..., Any]]] = {}
def runHook(hook: str, *args) -> None:
"Run all functions on hook."
hook = _hooks.get(hook, None)
if hook:
for func in hook:
hookFuncs = _hooks.get(hook, None)
if hookFuncs:
for func in hookFuncs:
try:
func(*args)
except:
hook.remove(func)
hookFuncs.remove(func)
raise
def runFilter(hook: str, arg: Any, *args) -> Any:
hook = _hooks.get(hook, None)
if hook:
for func in hook:
hookFuncs = _hooks.get(hook, None)
if hookFuncs:
for func in hookFuncs:
try:
arg = func(arg, *args)
except:
hook.remove(func)
hookFuncs.remove(func)
raise
return arg

View file

@ -106,8 +106,8 @@ compatMap = {
threadLocal = threading.local()
# global defaults
currentLang = None
currentTranslation = None
currentLang: Any = None
currentTranslation: Any = None
def localTranslation() -> Any:
"Return the translation local to this thread, or the default."

View file

@ -6,10 +6,8 @@ import re, os, shutil, html
from anki.utils import checksum, call, namedtmp, tmpdir, isMac, stripHTML
from anki.hooks import addHook
from anki.lang import _
from typing import Any
from typing import Any, Dict, List, Optional, Union
pngCommands = [
["latex", "-interaction=nonstopmode", "tmp.tex"],
["dvipng", "-D", "200", "-T", "tight", "tmp.dvi", "-o", "tmp.png"]

View file

@ -128,18 +128,19 @@ create table meta (dirMod int, lastUsn int); insert into meta values (0, 0);
def dir(self) -> Any:
return self._dir
def _isFAT32(self) -> Optional[bool]:
def _isFAT32(self) -> bool:
if not isWin:
return
return False
# pylint: disable=import-error
import win32api, win32file # pytype: disable=import-error
try:
name = win32file.GetVolumeNameForVolumeMountPoint(self._dir[:3])
except:
# mapped & unmapped network drive; pray that it's not vfat
return
return False
if win32api.GetVolumeInformation(name)[4].lower().startswith("fat"):
return True
return False
# Adding media
##########################################################################
@ -200,7 +201,7 @@ create table meta (dirMod int, lastUsn int); insert into meta values (0, 0);
def filesInStr(self, mid: Union[int, str], string: str, includeRemote: bool = False) -> List[str]:
l = []
model = self.col.models.get(mid)
strings = []
strings: List[str] = []
if model['type'] == MODEL_CLOZE and "{{c" in string:
# if the field has clozes in it, we'll need to expand the
# possibilities so we can render latex
@ -248,6 +249,7 @@ create table meta (dirMod int, lastUsn int); insert into meta values (0, 0);
return txt
def escapeImages(self, string: str, unescape: bool = False) -> str:
fn: Callable
if unescape:
fn = urllib.parse.unquote
else:
@ -458,7 +460,7 @@ create table meta (dirMod int, lastUsn int); insert into meta values (0, 0);
self.db.commit()
def _changes(self) -> Tuple[List[Tuple[str, int]], List[str]]:
self.cache = {}
self.cache: Dict[str, Any] = {}
for (name, csum, mod) in self.db.execute(
"select fname, csum, mtime from media where csum is not null"):
# previous entries may not have been in NFC form
@ -614,7 +616,7 @@ create table meta (dirMod int, lastUsn int); insert into meta values (0, 0);
# normalize name
name = unicodedata.normalize("NFC", name)
# save file
with open(name, "wb") as f:
with open(name, "wb") as f: # type: ignore
f.write(data)
# update db
media.append((name, csum, self._mtime(name), 0))

View file

@ -3,21 +3,19 @@
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
import copy, re, json
from typing import Dict, Any
from anki.utils import intTime, joinFields, splitFields, ids2str,\
checksum
from anki.lang import _
from anki.consts import *
from anki.hooks import runHook
import time
from typing import List, Optional, Tuple, Union
from typing import Tuple, Union, Any, Callable, Dict, List, Optional
# Models
##########################################################################
# - careful not to add any lists/dicts/etc here, as they aren't deep copied
from typing import Any, Callable, Dict, List, Optional
defaultModel = {
'sortf': 0,
'did': 1,
@ -73,6 +71,7 @@ defaultTemplate = {
}
class ModelManager:
models: Dict[str, Any]
# Saving/loading registry
#############################################################
@ -112,6 +111,7 @@ class ModelManager:
from anki.stdmodels import addBasicModel
addBasicModel(self.col)
return True
return None
# Retrieving and creating models
#############################################################

View file

@ -8,6 +8,7 @@ from typing import List, Tuple
from typing import Any, Optional
class Note:
tags: List[str]
def __init__(self, col, model: Optional[Any] = None, id: Optional[int] = None) -> None:
assert not (model and id)
@ -34,12 +35,12 @@ class Note:
self.mod,
self.usn,
self.tags,
self.fields,
fields,
self.flags,
self.data) = self.col.db.first("""
select guid, mid, mod, usn, tags, flds, flags, data
from notes where id = ?""", self.id)
self.fields = splitFields(self.fields)
self.fields = splitFields(fields)
self.tags = self.col.tags.split(self.tags)
self._model = self.col.models.get(self.mid)
self._fmap = self.col.models.fieldMap(self._model)

View file

@ -23,7 +23,9 @@ from anki.hooks import runHook
from anki.cards import Card
#from anki.collection import _Collection
from typing import Any, Callable, Dict, List, Optional, Union, Tuple
from typing import Any, Callable, Dict, List, Optional, Union, Tuple, Set
class Scheduler:
name = "std2"
haveCustomStudy = True
@ -35,7 +37,7 @@ class Scheduler:
self.reportLimit = 1000
self.dynReportLimit = 99999
self.reps = 0
self.today = None
self.today: Optional[int] = None
self._haveQueues = False
self._lrnCutoff = 0
self._updateCutoff()
@ -179,7 +181,7 @@ order by due""" % self._deckLimit(),
def _walkingCount(self, limFn: Optional[Callable] = None, cntFn: Optional[Callable] = None) -> Any:
tot = 0
pcounts = {}
pcounts: Dict[int, int] = {}
# for each of the active decks
nameMap = self.col.decks.nameMap()
for did in self.col.decks.active():
@ -217,7 +219,7 @@ order by due""" % self._deckLimit(),
self.col.decks.checkIntegrity()
decks = self.col.decks.all()
decks.sort(key=itemgetter('name'))
lims = {}
lims: Dict[str, List[int]] = {}
data = []
def parent(name):
parts = name.split("::")
@ -260,18 +262,18 @@ order by due""" % self._deckLimit(),
# then run main function
return self._groupChildrenMain(grps)
def _groupChildrenMain(self, grps: List[List[Union[List[str], int]]]) -> Tuple[Tuple[Any, Any, Any, Any, Any, Any], ...]:
def _groupChildrenMain(self, grps: Any) -> Any:
tree = []
# group and recurse
def key(grp):
return grp[0][0]
for (head, tail) in itertools.groupby(grps, key=key):
tail = list(tail)
tail = list(tail) # type: ignore
did = None
rev = 0
new = 0
lrn = 0
children = []
children: Any = []
for c in tail:
if len(c[0]) == 1:
# current node
@ -350,7 +352,7 @@ did = ? and queue = 0 limit ?)""", did, lim)
def _resetNew(self) -> None:
self._resetNewCount()
self._newDids = self.col.decks.active()[:]
self._newQueue = []
self._newQueue: List[int] = []
self._updateNewCardRatio()
def _fillNew(self) -> Any:
@ -403,8 +405,11 @@ did = ? and queue = 0 limit ?)""", did, lim)
return True
elif self.newCardModulus:
return self.reps and self.reps % self.newCardModulus == 0
else:
# shouldn't reach
return False
def _deckNewLimit(self, did: int, fn: None = None) -> Any:
def _deckNewLimit(self, did: int, fn: Callable[[Dict[str, Any]], int] = None) -> Any:
if not fn:
fn = self._deckNewLimitSingle
sel = self.col.decks.get(did)
@ -476,8 +481,8 @@ select count() from cards where did in %s and queue = 4
def _resetLrn(self) -> None:
self._updateLrnCutoff(force=True)
self._resetLrnCount()
self._lrnQueue = []
self._lrnDayQueue = []
self._lrnQueue: List[Tuple[int,int]] = []
self._lrnDayQueue: List[int] = []
self._lrnDids = self.col.decks.active()[:]
# sub-day learning
@ -531,6 +536,8 @@ did = ? and queue = 3 and due <= ? limit ?""",
return True
# nothing left in the deck; move to next
self._lrnDids.pop(0)
# shouldn't reach here
return False
def _getLrnDayCard(self) -> Any:
if self._fillLrnDay():
@ -678,14 +685,14 @@ did = ? and queue = 3 and due <= ? limit ?""",
tod = self._leftToday(conf['delays'], tot)
return tot + tod*1000
def _leftToday(self, delays: Union[List[int], List[Union[float, int]]], left: int, now: None = None) -> int:
def _leftToday(self, delays: Union[List[int], List[Union[float, int]]], left: int, now: Optional[int] = None) -> int:
"The number of steps that can be completed by the day cutoff."
if not now:
now = intTime()
delays = delays[-left:]
ok = 0
for i in range(len(delays)):
now += delays[i]*60
now += int(delays[i]*60)
if now > self.dayCutoff:
break
ok = i
@ -787,7 +794,7 @@ did in %s and queue = 2 and due <= ? limit %d)""" % (
def _resetRev(self) -> None:
self._resetRevCount()
self._revQueue = []
self._revQueue: List[int] = []
def _fillRev(self) -> Any:
if self._revQueue:
@ -1009,7 +1016,7 @@ select id from cards where did in %s and queue = 2 and due <= ? limit ?)"""
self.emptyDyn(did)
cnt = self._fillDyn(deck)
if not cnt:
return
return None
# and change to our new deck
self.col.decks.select(did)
return cnt
@ -1120,7 +1127,7 @@ where id = ?
"Leech handler. True if card was a leech."
lf = conf['leechFails']
if not lf:
return
return None
# if over threshold or every half threshold reps after that
if (card.lapses >= lf and
(card.lapses-lf) % (max(lf // 2, 1)) == 0):
@ -1135,6 +1142,7 @@ where id = ?
# notify UI
runHook("leech", card)
return True
return None
# Tools
##########################################################################
@ -1521,7 +1529,7 @@ usn=:usn,mod=:mod,factor=:fact where id=:id""",
scids = ids2str(cids)
now = intTime()
nids = []
nidsSet = set()
nidsSet: Set[int] = set()
for id in cids:
nid = self.col.db.scalar("select nid from cards where id = ?", id)
if nid not in nidsSet:

View file

@ -67,6 +67,7 @@ processingChain = [
]
# don't show box on windows
si: Optional[Any]
if sys.platform == "win32":
si = subprocess.STARTUPINFO() # pytype: disable=module-attr
try:
@ -93,7 +94,7 @@ from anki.mpv import MPV, MPVBase
_player: Optional[Callable[[Any], Any]]
_queueEraser: Optional[Callable[[], Any]]
_soundReg: str
mpvManager: Optional["MpvManager"] = None
mpvPath, mpvEnv = _packagedCmd(["mpv"])
@ -135,8 +136,6 @@ def setMpvConfigBase(base) -> None:
"--include="+mpvConfPath,
]
mpvManager = None
def setupMPV() -> None:
global mpvManager, _player, _queueEraser
mpvManager = MpvManager()
@ -185,14 +184,12 @@ if isWin:
cleanupOldMplayerProcesses()
mplayerQueue: List[str] = []
mplayerManager = None
mplayerReader = None
mplayerEvt = threading.Event()
mplayerClear = False
class MplayerMonitor(threading.Thread):
mplayer = None
mplayer: Optional[subprocess.Popen] = None
deadPlayers: List[subprocess.Popen] = []
def run(self) -> NoReturn:
@ -273,6 +270,8 @@ class MplayerMonitor(threading.Thread):
mplayerEvt.clear()
raise Exception("Did you install mplayer?")
mplayerManager: Optional[MplayerMonitor] = None
def queueMplayer(path) -> None:
ensureMplayerThreads()
if isWin and os.path.exists(path):
@ -326,7 +325,7 @@ try:
PYAU_FORMAT = pyaudio.paInt16
PYAU_CHANNELS = 1
PYAU_INPUT_INDEX = None
PYAU_INPUT_INDEX: Optional[int] = None
except:
pyaudio = None

View file

@ -8,7 +8,7 @@ import json
from anki.utils import fmtTimeSpan, ids2str
from anki.lang import _, ngettext
from typing import Any, List, Tuple, Optional
from typing import Any, List, Tuple, Optional, Dict
# Card stats
@ -253,7 +253,7 @@ from revlog where id > ? """+lim, (self.col.sched.dayCutoff-86400)*1000)
return txt
def _dueInfo(self, tot, num) -> str:
i = []
i: List[str] = []
self._line(i, _("Total"), ngettext("%d review", "%d reviews", tot) % tot)
self._line(i, _("Average"), self._avgDay(
tot, num, _("reviews")))
@ -289,7 +289,7 @@ group by day order by day""" % (self._limit(), lim),
data = self._added(days, chunk)
if not data:
return ""
conf = dict(
conf: Dict[str, Any] = dict(
xaxis=dict(tickDecimals=0, max=0.5),
yaxes=[dict(min=0), dict(position="right", min=0)])
if days is not None:
@ -309,7 +309,7 @@ group by day order by day""" % (self._limit(), lim),
if not period:
# base off date of earliest added card
period = self._deckAge('add')
i = []
i: List[str] = []
self._line(i, _("Total"), ngettext("%d card", "%d cards", tot) % tot)
self._line(i, _("Average"), self._avgDay(tot, period, _("cards")))
txt += self._lineTbl(i)
@ -321,7 +321,7 @@ group by day order by day""" % (self._limit(), lim),
data = self._done(days, chunk)
if not data:
return ""
conf = dict(
conf: Dict[str, Any] = dict(
xaxis=dict(tickDecimals=0, max=0.5),
yaxes=[dict(min=0), dict(position="right", min=0)])
if days is not None:
@ -371,7 +371,7 @@ group by day order by day""" % (self._limit(), lim),
if not period:
# base off earliest repetition date
period = self._deckAge('review')
i = []
i: List[str] = []
self._line(i, _("Days studied"),
_("<b>%(pct)d%%</b> (%(x)s of %(y)s)") % dict(
x=studied, y=period, pct=studied/float(period)*100),
@ -406,9 +406,9 @@ group by day order by day""" % (self._limit(), lim),
return self._lineTbl(i), int(tot)
def _splitRepData(self, data, spec) -> Tuple[List[dict], List[Tuple[Any, Any]]]:
sep = {}
sep: Dict[int, Any] = {}
totcnt = {}
totd = {}
totd: Dict[int, Any] = {}
alltot = []
allcnt = 0
for (n, col, lab) in spec:
@ -541,7 +541,7 @@ group by day order by day)""" % lim,
], conf=dict(
xaxis=dict(min=-0.5, max=ivlmax+0.5),
yaxes=[dict(), dict(position="right", max=105)]))
i = []
i: List[str] = []
self._line(i, _("Average interval"), fmtTimeSpan(avg*86400))
self._line(i, _("Longest interval"), fmtTimeSpan(max_*86400))
return txt + self._lineTbl(i)
@ -565,7 +565,7 @@ select count(), avg(ivl), max(ivl) from cards where did in %s and queue = 2""" %
# 3 + 4 + 4 + spaces on sides and middle = 15
# yng starts at 1+3+1 = 5
# mtr starts at 5+4+1 = 10
d = {'lrn':[], 'yng':[], 'mtr':[]}
d: Dict[str, List] = {'lrn':[], 'yng':[], 'mtr':[]}
types = ("lrn", "yng", "mtr")
eases = self._eases()
for (type, ease, cnt) in eases:
@ -651,7 +651,7 @@ order by thetype, ease""" % (ease4repl, lim))
shifted = []
counts = []
mcount = 0
trend = []
trend: List[Tuple[int,int]] = []
peak = 0
for d in data:
hour = (d[0] - 4) % 24
@ -727,7 +727,7 @@ group by hour having count() > 30 order by hour""" % lim,
(_("Suspended+Buried"), colSusp))):
d.append(dict(data=div[c], label="%s: %s" % (t, div[c]), color=col))
# text data
i = []
i: List[str] = []
(c, f) = self.col.db.first("""
select count(id), count(distinct nid) from cards
where did in %s """ % self._limit())
@ -839,8 +839,8 @@ from cards where did in %s""" % self._limit())
elif type == "fill":
conf['series']['lines'] = dict(show=True, fill=True)
elif type == "pie":
width /= 2.3
height *= 1.5
width = int(float(width)/2.3)
height = int(float(height)*1.5)
ylabel = ""
conf['series']['pie'] = dict(
show=True,

View file

@ -14,9 +14,7 @@ from anki.collection import _Collection
from anki.consts import *
from anki.stdmodels import addBasicModel, addClozeModel, addForwardReverse, \
addForwardOptionalReverse, addBasicTypingModel
from typing import Any, Dict, List, Optional, Tuple, Type, Union
_Collection: Type[_Collection]
from typing import Any, Dict, Tuple
def Collection(path: str, lock: bool = True, server: bool = False, log: bool = False) -> _Collection:
"Open a new or existing collection. Path must be unicode."

View file

@ -8,6 +8,7 @@ import random
import requests
import json
import os
import sqlite3
from anki.db import DB, DBError
from anki.utils import ids2str, intTime, platDesc, checksum, devMode
@ -20,7 +21,6 @@ from typing import Any, Dict, List, Optional, Tuple, Union
# syncing vars
HTTP_TIMEOUT = 90
HTTP_PROXY = None
HTTP_BUF_SIZE = 64*1024
class UnexpectedSchemaChange(Exception):
@ -30,6 +30,7 @@ class UnexpectedSchemaChange(Exception):
##########################################################################
class Syncer:
cursor: Optional[sqlite3.Cursor]
def __init__(self, col, server=None) -> None:
self.col = col
@ -38,7 +39,7 @@ class Syncer:
# these are set later; provide dummy values for type checking
self.lnewer = False
self.maxUsn = 0
self.tablesLeft = []
self.tablesLeft: List[str] = []
def sync(self) -> str:
"Returns 'noChanges', 'fullSync', 'success', etc"
@ -148,7 +149,7 @@ class Syncer:
def _gravesChunk(self, graves: Dict) -> Tuple[Dict, Optional[Dict]]:
lim = 250
chunk = dict(notes=[], cards=[], decks=[])
chunk: Dict[str, Any] = dict(notes=[], cards=[], decks=[])
for cat in "notes", "cards", "decks":
if lim and graves[cat]:
chunk[cat] = graves[cat][:lim]
@ -245,7 +246,7 @@ class Syncer:
self.tablesLeft = ["revlog", "cards", "notes"]
self.cursor = None
def cursorForTable(self, table) -> Any:
def cursorForTable(self, table) -> sqlite3.Cursor:
lim = self.usnLim()
x = self.col.db.execute
d = (self.maxUsn, lim)
@ -263,7 +264,7 @@ select id, guid, mid, mod, %d, tags, flds, '', '', flags, data
from notes where %s""" % d)
def chunk(self) -> dict:
buf = dict(done=False)
buf: Dict[str, Any] = dict(done=False)
lim = 250
while self.tablesLeft and lim:
curTable = self.tablesLeft[0]
@ -505,7 +506,7 @@ class HttpSyncer:
self.hkey = hkey
self.skey = checksum(str(random.random()))[:8]
self.client = client or AnkiRequestsClient()
self.postVars = {}
self.postVars: Dict[str,str] = {}
self.hostNum = hostNum
self.prefix = "sync/"
@ -532,7 +533,7 @@ class HttpSyncer:
bdry = b"--"+BOUNDARY
buf = io.BytesIO()
# post vars
self.postVars['c'] = 1 if comp else 0
self.postVars['c'] = "1" if comp else "0"
for (key, value) in list(self.postVars.items()):
buf.write(bdry + b"\r\n")
buf.write(
@ -550,7 +551,7 @@ Content-Type: application/octet-stream\r\n\r\n""")
if comp:
tgt = gzip.GzipFile(mode="wb", fileobj=buf, compresslevel=comp)
else:
tgt = buf
tgt = buf # type: ignore
while 1:
data = fobj.read(65536)
if not data:
@ -668,7 +669,7 @@ class FullSyncer(HttpSyncer):
tpath = self.col.path + ".tmp"
if cont == "upgradeRequired":
runHook("sync", "upgradeRequired")
return
return None
open(tpath, "wb").write(cont)
# check the received file is ok
d = DB(tpath)
@ -683,6 +684,7 @@ class FullSyncer(HttpSyncer):
os.unlink(self.col.path)
os.rename(tpath, self.col.path)
self.col = None
return None
def upload(self) -> bool:
"True if upload successful."

View file

@ -14,7 +14,8 @@ import json
from anki.utils import intTime, ids2str
from anki.hooks import runHook
import re
from typing import Any, List, Tuple
from typing import Any, List, Tuple, Callable, Dict
class TagManager:
@ -23,7 +24,7 @@ class TagManager:
def __init__(self, col) -> None:
self.col = col
self.tags = {}
self.tags: Dict[str, int] = {}
def load(self, json_) -> None:
self.tags = json.loads(json_)
@ -95,6 +96,7 @@ class TagManager:
if add:
self.register(newTags)
# find notes missing the tags
fn: Callable[[str, str], str]
if add:
l = "tags not "
fn = self.addToStr

View file

@ -1,11 +1,11 @@
import re
from anki.utils import stripHTML, stripHTMLMedia
from anki.hooks import runFilter
from typing import Any, Callable, NoReturn, Optional
from typing import Any, Callable, Pattern, Dict
clozeReg = r"(?si)\{\{(c)%s::(.*?)(::(.*?))?\}\}"
modifiers = {}
modifiers: Dict[str, Callable] = {}
def modifier(symbol) -> Callable[[Any], Any]:
"""Decorator for associating a function with a Mustache tag modifier.
@ -35,10 +35,10 @@ def get_or_attr(obj, name, default=None) -> Any:
class Template:
# The regular expression used to find a #section
section_re = None
section_re: Pattern = None
# The regular expression used to find a tag.
tag_re = None
tag_re: Pattern = None
# Opening tag delimiter
otag = '{{'
@ -58,8 +58,8 @@ class Template:
template = self.render_sections(template, context)
result = self.render_tags(template, context)
if encoding is not None:
result = result.encode(encoding)
# if encoding is not None:
# result = result.encode(encoding)
return result
def compile_regexps(self) -> None:
@ -72,7 +72,7 @@ class Template:
tag = r"%(otag)s(#|=|&|!|>|\{)?(.+?)\1?%(ctag)s+"
self.tag_re = re.compile(tag % tags)
def render_sections(self, template, context) -> NoReturn:
def render_sections(self, template, context) -> str:
"""Expands sections."""
while 1:
match = self.section_re.search(template)
@ -238,12 +238,12 @@ class Template:
return txt
@modifier('=')
def render_delimiter(self, tag_name=None, context=None) -> Optional[str]:
def render_delimiter(self, tag_name=None, context=None) -> str:
"""Changes the Mustache delimiter."""
try:
self.otag, self.ctag = tag_name.split(' ')
except ValueError:
# invalid
return
return ''
self.compile_regexps()
return ''

View file

@ -12,18 +12,18 @@ class View:
# The name of this template. If none is given the View will try
# to infer it based on the class name.
template_name = None
template_name: str = None
# Absolute path to the template itself. Pystache will try to guess
# if it's not provided.
template_file = None
template_file: str = None
# Contents of the template.
template = None
template: str = None
# Character encoding of the template file. If None, Pystache will not
# do any decoding of the template.
template_encoding = None
template_encoding: str = None
def __init__(self, template=None, context=None, **kwargs) -> None:
self.template = template

View file

@ -22,10 +22,10 @@ from anki.lang import _, ngettext
# some add-ons expect json to be in the utils module
import json # pylint: disable=unused-import
from typing import Any, Optional, Tuple
from anki.db import DB
from typing import Any, Iterator, List, Union
from typing import Any, Iterator, List, Union, Optional, Tuple
_tmpdir: Optional[str]
# Time handling
@ -339,12 +339,12 @@ def call(argv: List[str], wait: bool = True, **kwargs) -> int:
"Execute a command. If WAIT, return exit code."
# ensure we don't open a separate window for forking process on windows
if isWin:
si = subprocess.STARTUPINFO() # pytype: disable=module-attr
si = subprocess.STARTUPINFO() # type: ignore
try:
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW # pytype: disable=module-attr
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW # type: ignore
except:
# pylint: disable=no-member
si.dwFlags |= subprocess._subprocess.STARTF_USESHOWWINDOW # pytype: disable=module-attr
si.dwFlags |= subprocess._subprocess.STARTF_USESHOWWINDOW # type: ignore
else:
si = None
# run
@ -387,6 +387,7 @@ def invalidFilename(str, dirsep=True) -> Optional[str]:
return "\\"
elif str.strip().startswith("."):
return "."
return None
def platDesc() -> str:
# we may get an interrupted system call, so try this in a loop

View file

@ -10,7 +10,7 @@ import tempfile
import builtins
import locale
import gettext
from typing import Optional
from typing import Optional, Any
from aqt.qt import *
import anki.lang
@ -127,8 +127,8 @@ dialogs = DialogManager()
# loaded, and we need the Qt language to match the gettext language or
# translated shortcuts will not work.
_gtrans = None
_qtrans = None
_gtrans: Optional[Any] = None
_qtrans: Optional[QTranslator] = None
def setupLang(pm, app, force=None):
global _gtrans, _qtrans

View file

@ -1,7 +1,9 @@
# Copyright: Ankitects Pty Ltd and contributors
# -*- coding: utf-8 -*-
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
from typing import Optional
from anki.collection import _Collection
from aqt.qt import *
from http import HTTPStatus
import http.server
@ -45,7 +47,7 @@ class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
class MediaServer(threading.Thread):
_port = None
_port: Optional[int] = None
_ready = threading.Event()
daemon = True
@ -69,7 +71,7 @@ class MediaServer(threading.Thread):
class RequestHandler(http.server.SimpleHTTPRequestHandler):
timeout = 1
mw = None
mw: Optional[_Collection] = None
def do_GET(self):
f = self.send_head()

View file

@ -1,6 +1,7 @@
# Copyright: Ankitects Pty Ltd and contributors
# -*- coding: utf-8 -*-
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
from typing import Optional
from aqt.qt import *
import re, os, sys, subprocess
@ -424,8 +425,8 @@ def downArrow():
# Tooltips
######################################################################
_tooltipTimer = None
_tooltipLabel = None
_tooltipTimer: Optional[QTimer] = None
_tooltipLabel: Optional[str] = None
def tooltip(msg, period=3000, parent=None):
global _tooltipTimer, _tooltipLabel

View file

@ -1,6 +1,8 @@
[mypy]
python_version = 3.6
pretty = true
no_strict_optional = true
show_error_codes = true
[mypy-win32file]
ignore_missing_imports = True