add more typing, and enable checks for missing types for most of pylib

This commit is contained in:
Damien Elmes 2021-01-31 21:38:36 +10:00
parent bb92dde2d7
commit 1741ce1ed8
14 changed files with 112 additions and 87 deletions

View file

@ -281,7 +281,7 @@ class Collection:
self.db.rollback()
self.db.begin()
def reopen(self, after_full_sync=False) -> None:
def reopen(self, after_full_sync: bool = False) -> None:
assert not self.db
assert self.path.endswith(".anki2")
@ -410,7 +410,7 @@ class Collection:
def cardCount(self) -> Any:
return self.db.scalar("select count() from cards")
def remove_cards_and_orphaned_notes(self, card_ids: Sequence[int]):
def remove_cards_and_orphaned_notes(self, card_ids: Sequence[int]) -> None:
"You probably want .remove_notes_by_card() instead."
self._backend.remove_cards(card_ids=card_ids)
@ -506,7 +506,7 @@ class Collection:
dupes = []
fields: Dict[int, int] = {}
def ordForMid(mid):
def ordForMid(mid: int) -> int:
if mid not in fields:
model = self.models.get(mid)
for c, f in enumerate(model["flds"]):
@ -540,7 +540,10 @@ class Collection:
##########################################################################
def build_search_string(
self, *terms: Union[str, SearchTerm], negate=False, match_any=False
self,
*terms: Union[str, SearchTerm],
negate: bool = False,
match_any: bool = False,
) -> str:
"""Helper function for the backend's search string operations.
@ -577,11 +580,11 @@ class Collection:
except KeyError:
return default
def set_config(self, key: str, val: Any):
def set_config(self, key: str, val: Any) -> None:
self.setMod()
self.conf.set(key, val)
def remove_config(self, key):
def remove_config(self, key: str) -> None:
self.setMod()
self.conf.remove(key)
@ -780,11 +783,11 @@ table.review-log {{ {revlog_style} }}
# Logging
##########################################################################
def log(self, *args, **kwargs) -> None:
def log(self, *args: Any, **kwargs: Any) -> None:
if not self._should_log:
return
def customRepr(x):
def customRepr(x: Any) -> str:
if isinstance(x, str):
return x
return pprint.pformat(x)
@ -866,7 +869,7 @@ table.review-log {{ {revlog_style} }}
def get_preferences(self) -> Preferences:
return self._backend.get_preferences()
def set_preferences(self, prefs: Preferences):
def set_preferences(self, prefs: Preferences) -> None:
self._backend.set_preferences(prefs)

View file

@ -21,6 +21,7 @@ from __future__ import annotations
import copy
import weakref
from typing import Any
from weakref import ref
import anki
from anki.errors import NotFoundError
@ -46,7 +47,7 @@ class ConfigManager:
# Legacy dict interface
#########################
def __getitem__(self, key):
def __getitem__(self, key: str) -> Any:
val = self.get_immutable(key)
if isinstance(val, list):
print(
@ -61,28 +62,28 @@ class ConfigManager:
else:
return val
def __setitem__(self, key, value):
def __setitem__(self, key: str, value: Any) -> None:
self.set(key, value)
def get(self, key, default=None):
def get(self, key: str, default: Any = None) -> Any:
try:
return self[key]
except KeyError:
return default
def setdefault(self, key, default):
def setdefault(self, key: str, default: Any) -> Any:
if key not in self:
self[key] = default
return self[key]
def __contains__(self, key):
def __contains__(self, key: str) -> bool:
try:
self.get_immutable(key)
return True
except KeyError:
return False
def __delitem__(self, key):
def __delitem__(self, key: str) -> None:
self.remove(key)
@ -95,13 +96,13 @@ class ConfigManager:
class WrappedList(list):
def __init__(self, conf, key, val):
def __init__(self, conf: ref[ConfigManager], key: str, val: Any) -> None:
self.key = key
self.conf = conf
self.orig = copy.deepcopy(val)
super().__init__(val)
def __del__(self):
def __del__(self) -> None:
cur = list(self)
conf = self.conf()
if conf and self.orig != cur:
@ -109,13 +110,13 @@ class WrappedList(list):
class WrappedDict(dict):
def __init__(self, conf, key, val):
def __init__(self, conf: ref[ConfigManager], key: str, val: Any) -> None:
self.key = key
self.conf = conf
self.orig = copy.deepcopy(val)
super().__init__(val)
def __del__(self):
def __del__(self) -> None:
cur = dict(self)
conf = self.conf()
if conf and self.orig != cur:

View file

@ -5,6 +5,8 @@ from __future__ import annotations
import copy
import pprint
import sys
import traceback
from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union
import anki # pylint: disable=unused-import
@ -43,36 +45,37 @@ class DecksDictProxy:
def __init__(self, col: anki.collection.Collection):
self._col = col.weakref()
def _warn(self):
def _warn(self) -> None:
traceback.print_stack(file=sys.stdout)
print("add-on should use methods on col.decks, not col.decks.decks dict")
def __getitem__(self, item):
def __getitem__(self, item: Any) -> Any:
self._warn()
return self._col.decks.get(int(item))
def __setitem__(self, key, val):
def __setitem__(self, key: Any, val: Any) -> None:
self._warn()
self._col.decks.save(val)
def __len__(self):
def __len__(self) -> int:
self._warn()
return len(self._col.decks.all_names_and_ids())
def keys(self):
def keys(self) -> Any:
self._warn()
return [str(nt.id) for nt in self._col.decks.all_names_and_ids()]
def values(self):
def values(self) -> Any:
self._warn()
return self._col.decks.all()
def items(self):
def items(self) -> Any:
self._warn()
return [(str(nt["id"]), nt) for nt in self._col.decks.all()]
def __contains__(self, item):
def __contains__(self, item: Any) -> bool:
self._warn()
self._col.decks.have(item)
return self._col.decks.have(item)
class DeckManager:
@ -97,7 +100,7 @@ class DeckManager:
self.update(g, preserve_usn=False)
# legacy
def flush(self):
def flush(self) -> None:
pass
def __repr__(self) -> str:
@ -135,7 +138,7 @@ class DeckManager:
self.col._backend.remove_deck(did)
def all_names_and_ids(
self, skip_empty_default=False, include_filtered=True
self, skip_empty_default: bool = False, include_filtered: bool = True
) -> Sequence[DeckNameID]:
"A sorted sequence of deck names and IDs."
return self.col._backend.get_deck_names(
@ -195,12 +198,12 @@ class DeckManager:
)
]
def collapse(self, did) -> None:
def collapse(self, did: int) -> None:
deck = self.get(did)
deck["collapsed"] = not deck["collapsed"]
self.save(deck)
def collapseBrowser(self, did) -> None:
def collapseBrowser(self, did: int) -> None:
deck = self.get(did)
collapsed = deck.get("browserCollapsed", False)
deck["browserCollapsed"] = not collapsed
@ -241,7 +244,7 @@ class DeckManager:
return self.get_legacy(id)
return None
def update(self, g: Deck, preserve_usn=True) -> None:
def update(self, g: Deck, preserve_usn: bool = True) -> None:
"Add or update an existing deck. Used for syncing and merging."
try:
g["id"] = self.col._backend.add_or_update_deck_legacy(
@ -303,7 +306,7 @@ class DeckManager:
except NotFoundError:
return None
def update_config(self, conf: DeckConfig, preserve_usn=False) -> None:
def update_config(self, conf: DeckConfig, preserve_usn: bool = False) -> None:
conf["id"] = self.col._backend.add_or_update_deck_config_legacy(
config=to_json_bytes(conf), preserve_usn_and_mtime=preserve_usn
)
@ -325,7 +328,7 @@ class DeckManager:
) -> int:
return self.add_config(name, clone_from)["id"]
def remove_config(self, id) -> None:
def remove_config(self, id: int) -> None:
"Remove a configuration and update all decks using it."
self.col.modSchema(check=True)
for g in self.all():
@ -341,14 +344,14 @@ class DeckManager:
grp["conf"] = id
self.save(grp)
def didsForConf(self, conf) -> List[int]:
def didsForConf(self, conf: DeckConfig) -> List[int]:
dids = []
for deck in self.all():
if "conf" in deck and deck["conf"] == conf["id"]:
dids.append(deck["id"])
return dids
def restoreToDefault(self, conf) -> None:
def restoreToDefault(self, conf: DeckConfig) -> None:
oldOrder = conf["new"]["order"]
new = from_json_bytes(self.col._backend.new_deck_config_legacy())
new["id"] = conf["id"]
@ -380,7 +383,7 @@ class DeckManager:
return deck["name"]
return None
def setDeck(self, cids, did) -> None:
def setDeck(self, cids: List[int], did: int) -> None:
self.col.db.execute(
"update cards set did=?,usn=?,mod=? where id in " + ids2str(cids),
did,
@ -424,7 +427,7 @@ class DeckManager:
self.col.conf["activeDecks"] = active
# don't use this, it will likely go away
def update_active(self):
def update_active(self) -> None:
self.select(self.current()["id"])
# Parents/children
@ -480,7 +483,7 @@ class DeckManager:
# Change to Dict[int, "DeckManager.childMapNode"] when MyPy allow recursive type
def childDids(self, did: int, childMap: DeckManager.childMapNode) -> List:
def gather(node: DeckManager.childMapNode, arr):
def gather(node: DeckManager.childMapNode, arr: List) -> None:
for did, child in node.items():
arr.append(did)
gather(child, arr)

View file

@ -16,10 +16,10 @@ class Finder:
self.col = col.weakref()
print("Finder() is deprecated, please use col.find_cards() or .find_notes()")
def findCards(self, query, order):
def findCards(self, query: Any, order: Any) -> Any:
return self.col.find_cards(query, order)
def findNotes(self, query):
def findNotes(self, query: Any) -> Any:
return self.col.find_notes(query)
@ -55,7 +55,7 @@ def fieldNamesForNotes(col: Collection, nids: List[int]) -> List[str]:
##########################################################################
def fieldNames(col, downcase=True) -> List:
def fieldNames(col: Collection, downcase: bool = True) -> List:
fields: Set[str] = set()
for m in col.models.all():
for f in m["flds"]:

View file

@ -25,7 +25,7 @@ from anki.hooks_gen import *
_hooks: Dict[str, List[Callable[..., Any]]] = {}
def runHook(hook: str, *args) -> None:
def runHook(hook: str, *args: Any) -> None:
"Run all functions on hook."
hookFuncs = _hooks.get(hook, None)
if hookFuncs:
@ -37,7 +37,7 @@ def runHook(hook: str, *args) -> None:
raise
def runFilter(hook: str, arg: Any, *args) -> Any:
def runFilter(hook: str, arg: Any, *args: Any) -> Any:
hookFuncs = _hooks.get(hook, None)
if hookFuncs:
for func in hookFuncs:
@ -57,7 +57,7 @@ def addHook(hook: str, func: Callable) -> None:
_hooks[hook].append(func)
def remHook(hook, func) -> None:
def remHook(hook: Any, func: Any) -> None:
"Remove a function if is on hook."
hook = _hooks.get(hook, [])
if func in hook:
@ -72,10 +72,10 @@ def remHook(hook, func) -> None:
#
# If you call wrap() with pos='around', the original function will not be called
# automatically but can be called with _old().
def wrap(old, new, pos="after") -> Callable:
def wrap(old: Any, new: Any, pos: str = "after") -> Callable:
"Override an existing function."
def repl(*args, **kwargs):
def repl(*args: Any, **kwargs: Any) -> Any:
if pos == "after":
old(*args, **kwargs)
return new(*args, **kwargs)
@ -85,7 +85,7 @@ def wrap(old, new, pos="after") -> Callable:
else:
return new(_old=old, *args, **kwargs)
def decorator_wrapper(f, *args, **kwargs):
def decorator_wrapper(f: Any, *args: Any, **kwargs: Any) -> Any:
return repl(*args, **kwargs)
return decorator.decorator(decorator_wrapper)(old)

View file

@ -11,7 +11,7 @@ import time
import urllib.error
import urllib.parse
import urllib.request
from typing import Any, Callable, List, Optional, Tuple
from typing import Any, Callable, List, Match, Optional, Tuple
import anki
import anki._backend.backend_pb2 as _pb
@ -197,7 +197,7 @@ class MediaManager:
else:
fn = urllib.parse.quote
def repl(match):
def repl(match: Match) -> str:
tag = match.group(0)
fname = match.group("fname")
if re.match("(https?|ftp)://", fname):

View file

@ -5,7 +5,9 @@ from __future__ import annotations
import copy
import pprint
import sys
import time
import traceback
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union
import anki # pylint: disable=unused-import
@ -39,36 +41,37 @@ class ModelsDictProxy:
def __init__(self, col: anki.collection.Collection):
self._col = col.weakref()
def _warn(self):
def _warn(self) -> None:
traceback.print_stack(file=sys.stdout)
print("add-on should use methods on col.models, not col.models.models dict")
def __getitem__(self, item):
def __getitem__(self, item: Any) -> Any:
self._warn()
return self._col.models.get(int(item))
def __setitem__(self, key, val):
def __setitem__(self, key: str, val: Any) -> None:
self._warn()
self._col.models.save(val)
def __len__(self):
def __len__(self) -> int:
self._warn()
return len(self._col.models.all_names_and_ids())
def keys(self):
def keys(self) -> Any:
self._warn()
return [str(nt.id) for nt in self._col.models.all_names_and_ids()]
def values(self):
def values(self) -> Any:
self._warn()
return self._col.models.all()
def items(self):
def items(self) -> Any:
self._warn()
return [(str(nt["id"]), nt) for nt in self._col.models.all()]
def __contains__(self, item):
def __contains__(self, item: Any) -> bool:
self._warn()
self._col.models.have(item)
return self._col.models.have(item)
class ModelManager:
@ -123,7 +126,7 @@ class ModelManager:
def _get_cached(self, ntid: int) -> Optional[NoteType]:
return self._cache.get(ntid)
def _clear_cache(self):
def _clear_cache(self) -> None:
self._cache = {}
# Listing note types
@ -218,7 +221,7 @@ class ModelManager:
"Delete model, and all its cards/notes."
self.remove(m["id"])
def remove_all_notetypes(self):
def remove_all_notetypes(self) -> None:
for nt in self.all_names_and_ids():
self._remove_from_cache(nt.id)
self.col._backend.remove_notetype(nt.id)
@ -236,7 +239,7 @@ class ModelManager:
if existing_id is not None and existing_id != m["id"]:
m["name"] += "-" + checksum(str(time.time()))[:5]
def update(self, m: NoteType, preserve_usn=True) -> None:
def update(self, m: NoteType, preserve_usn: bool = True) -> None:
"Add or update an existing model. Use .save() instead."
self._remove_from_cache(m["id"])
self.ensureNameUnique(m)

View file

@ -113,7 +113,7 @@ class Note:
def __setitem__(self, key: str, value: str) -> None:
self.fields[self._fieldOrd(key)] = value
def __contains__(self, key) -> bool:
def __contains__(self, key: str) -> bool:
return key in self._fmap
# Tags

View file

@ -350,7 +350,7 @@ limit %d"""
lastIvl = -(self._delayForGrade(conf, lastLeft))
ivl = card.ivl if leaving else -(self._delayForGrade(conf, card.left))
def log():
def log() -> None:
self.col.db.execute(
"insert into revlog values (?,?,?,?,?,?,?,?,?)",
int(time.time() * 1000),
@ -450,7 +450,7 @@ and due <= ? limit ?)""",
self._revQueue: List[Any] = []
self._revDids = self.col.decks.active()[:]
def _fillRev(self, recursing=False) -> bool:
def _fillRev(self, recursing: bool = False) -> bool:
"True if a review card can be fetched."
if self._revQueue:
return True

View file

@ -166,7 +166,7 @@ class Scheduler:
self._restorePreviewCard(card)
self._removeFromFiltered(card)
def _reset_counts(self):
def _reset_counts(self) -> None:
tree = self.deck_due_tree(self.col.decks.selected())
node = self.col.decks.find_deck_in_tree(tree, int(self.col.conf["curDeck"]))
if not node:
@ -187,7 +187,7 @@ class Scheduler:
new, lrn, rev = counts
return (new, lrn, rev)
def _is_finished(self):
def _is_finished(self) -> bool:
"Don't use this, it is a stop-gap until this code is refactored."
return not any((self.newCount, self.revCount, self._immediate_learn_count))
@ -229,8 +229,12 @@ order by due"""
##########################################################################
def update_stats(
self, deck_id: int, new_delta=0, review_delta=0, milliseconds_delta=0
):
self,
deck_id: int,
new_delta: int = 0,
review_delta: int = 0,
milliseconds_delta: int = 0,
) -> None:
self.col._backend.update_stats(
deck_id=deck_id,
new_delta=new_delta,
@ -321,7 +325,7 @@ order by due"""
self._newQueue: List[int] = []
self._updateNewCardRatio()
def _fillNew(self, recursing=False) -> bool:
def _fillNew(self, recursing: bool = False) -> bool:
if self._newQueue:
return True
if not self.newCount:
@ -841,7 +845,7 @@ and due <= ? limit ?)"""
def _resetRev(self) -> None:
self._revQueue: List[int] = []
def _fillRev(self, recursing=False) -> bool:
def _fillRev(self, recursing: bool = False) -> bool:
"True if a review card can be fetched."
if self._revQueue:
return True
@ -947,7 +951,7 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l
self._removeFromFiltered(card)
def _logRev(self, card: Card, ease: int, delay: int, type: int) -> None:
def log():
def log() -> None:
self.col.db.execute(
"insert into revlog values (?,?,?,?,?,?,?,?,?)",
int(time.time() * 1000),
@ -1344,7 +1348,7 @@ due = (case when odue>0 then odue else due end), odue = 0, odid = 0, usn = ? whe
mode = BuryOrSuspendMode.BURY_SCHED
self.col._backend.bury_or_suspend_cards(card_ids=ids, mode=mode)
def bury_note(self, note: Note):
def bury_note(self, note: Note) -> None:
self.bury_cards(note.card_ids())
# legacy
@ -1472,7 +1476,7 @@ and (queue={QUEUE_TYPE_NEW} or (queue={QUEUE_TYPE_REV} and due<=?))""",
def orderCards(self, did: int) -> None:
self.col._backend.sort_deck(deck_id=did, randomize=False)
def resortConf(self, conf) -> None:
def resortConf(self, conf: DeckConfig) -> None:
for did in self.col.decks.didsForConf(conf):
if conf["new"]["order"] == 0:
self.randomizeCards(did)

View file

@ -140,7 +140,7 @@ from revlog where id > ? """
relrn = relrn or 0
filt = filt or 0
# studied
def bold(s):
def bold(s: str) -> str:
return "<b>" + str(s) + "</b>"
if cards:
@ -298,7 +298,7 @@ group by day order by day"""
# pylint: disable=invalid-unary-operand-type
conf["xaxis"]["min"] = -days + 0.5
def plot(id, data, ylabel, ylabel2):
def plot(id: str, data: Any, ylabel: str, ylabel2: str) -> str:
return self._graph(
id, data=data, conf=conf, xunit=chunk, ylabel=ylabel, ylabel2=ylabel2
)
@ -333,7 +333,7 @@ group by day order by day"""
# pylint: disable=invalid-unary-operand-type
conf["xaxis"]["min"] = -days + 0.5
def plot(id, data, ylabel, ylabel2):
def plot(id: str, data: Any, ylabel: str, ylabel2: str) -> str:
return self._graph(
id, data=data, conf=conf, xunit=chunk, ylabel=ylabel, ylabel2=ylabel2
)

View file

@ -11,9 +11,10 @@ import os
import socket
import sys
import time
from http import HTTPStatus
from io import BytesIO
from tempfile import NamedTemporaryFile
from typing import Optional
from typing import Iterable, Optional
try:
import flask
@ -89,7 +90,7 @@ def handle_sync_request(method_str: str) -> Response:
elif method == Method.FULL_DOWNLOAD:
path = outdata.decode("utf8")
def stream_reply():
def stream_reply() -> Iterable[bytes]:
with open(path, "rb") as f:
while chunk := f.read(16 * 1024):
yield chunk
@ -106,7 +107,7 @@ def handle_sync_request(method_str: str) -> Response:
return resp
def after_full_sync():
def after_full_sync() -> None:
# the server methods do not reopen the collection after a full sync,
# so we need to
col.reopen(after_full_sync=False)
@ -146,15 +147,17 @@ def get_method(
@app.route("/<path:pathin>", methods=["POST"])
def handle_request(pathin: str):
def handle_request(pathin: str) -> Response:
path = pathin
print(int(time.time()), flask.request.remote_addr, path)
if path.startswith("sync/"):
return handle_sync_request(path.split("/", maxsplit=1)[1])
else:
return flask.make_response("not found", HTTPStatus.NOT_FOUND)
def folder():
def folder() -> str:
folder = os.getenv("FOLDER", os.path.expanduser("~/.syncserver"))
if not os.path.exists(folder):
print("creating", folder)
@ -162,11 +165,11 @@ def folder():
return folder
def col_path():
def col_path() -> str:
return os.path.join(folder(), "collection.server.anki2")
def serve():
def serve() -> None:
global col
col = Collection(col_path(), server=True)

View file

@ -13,7 +13,7 @@ from __future__ import annotations
import pprint
import re
from typing import Collection, List, Optional, Sequence, Tuple
from typing import Collection, List, Match, Optional, Sequence, Tuple
import anki # pylint: disable=unused-import
import anki._backend.backend_pb2 as _pb
@ -139,7 +139,7 @@ class TagManager:
def remFromStr(self, deltags: str, tags: str) -> str:
"Delete tags if they exist."
def wildcard(pat: str, repl: str):
def wildcard(pat: str, repl: str) -> Match:
pat = re.escape(pat).replace("\\*", ".*")
return re.match("^" + pat + "$", repl, re.IGNORECASE)

View file

@ -9,6 +9,14 @@ warn_redundant_casts = True
warn_unused_configs = True
strict_equality = true
[mypy-anki.*]
disallow_untyped_defs = True
[mypy-anki.importing.*]
disallow_untyped_defs = False
[mypy-anki.exporting]
disallow_untyped_defs = False
[mypy-win32file]
ignore_missing_imports = True
[mypy-win32pipe]