diff --git a/pylib/anki/collection.py b/pylib/anki/collection.py index 9a057e23d..d227391f2 100644 --- a/pylib/anki/collection.py +++ b/pylib/anki/collection.py @@ -281,7 +281,7 @@ class Collection: self.db.rollback() self.db.begin() - def reopen(self, after_full_sync=False) -> None: + def reopen(self, after_full_sync: bool = False) -> None: assert not self.db assert self.path.endswith(".anki2") @@ -410,7 +410,7 @@ class Collection: def cardCount(self) -> Any: return self.db.scalar("select count() from cards") - def remove_cards_and_orphaned_notes(self, card_ids: Sequence[int]): + def remove_cards_and_orphaned_notes(self, card_ids: Sequence[int]) -> None: "You probably want .remove_notes_by_card() instead." self._backend.remove_cards(card_ids=card_ids) @@ -506,7 +506,7 @@ class Collection: dupes = [] fields: Dict[int, int] = {} - def ordForMid(mid): + def ordForMid(mid: int) -> int: if mid not in fields: model = self.models.get(mid) for c, f in enumerate(model["flds"]): @@ -540,7 +540,10 @@ class Collection: ########################################################################## def build_search_string( - self, *terms: Union[str, SearchTerm], negate=False, match_any=False + self, + *terms: Union[str, SearchTerm], + negate: bool = False, + match_any: bool = False, ) -> str: """Helper function for the backend's search string operations. @@ -577,11 +580,11 @@ class Collection: except KeyError: return default - def set_config(self, key: str, val: Any): + def set_config(self, key: str, val: Any) -> None: self.setMod() self.conf.set(key, val) - def remove_config(self, key): + def remove_config(self, key: str) -> None: self.setMod() self.conf.remove(key) @@ -780,11 +783,11 @@ table.review-log {{ {revlog_style} }} # Logging ########################################################################## - def log(self, *args, **kwargs) -> None: + def log(self, *args: Any, **kwargs: Any) -> None: if not self._should_log: return - def customRepr(x): + def customRepr(x: Any) -> str: if isinstance(x, str): return x return pprint.pformat(x) @@ -866,7 +869,7 @@ table.review-log {{ {revlog_style} }} def get_preferences(self) -> Preferences: return self._backend.get_preferences() - def set_preferences(self, prefs: Preferences): + def set_preferences(self, prefs: Preferences) -> None: self._backend.set_preferences(prefs) diff --git a/pylib/anki/config.py b/pylib/anki/config.py index 46bb1e1b7..e77e3e7a8 100644 --- a/pylib/anki/config.py +++ b/pylib/anki/config.py @@ -21,6 +21,7 @@ from __future__ import annotations import copy import weakref from typing import Any +from weakref import ref import anki from anki.errors import NotFoundError @@ -46,7 +47,7 @@ class ConfigManager: # Legacy dict interface ######################### - def __getitem__(self, key): + def __getitem__(self, key: str) -> Any: val = self.get_immutable(key) if isinstance(val, list): print( @@ -61,28 +62,28 @@ class ConfigManager: else: return val - def __setitem__(self, key, value): + def __setitem__(self, key: str, value: Any) -> None: self.set(key, value) - def get(self, key, default=None): + def get(self, key: str, default: Any = None) -> Any: try: return self[key] except KeyError: return default - def setdefault(self, key, default): + def setdefault(self, key: str, default: Any) -> Any: if key not in self: self[key] = default return self[key] - def __contains__(self, key): + def __contains__(self, key: str) -> bool: try: self.get_immutable(key) return True except KeyError: return False - def __delitem__(self, key): + def __delitem__(self, key: str) -> None: self.remove(key) @@ -95,13 +96,13 @@ class ConfigManager: class WrappedList(list): - def __init__(self, conf, key, val): + def __init__(self, conf: ref[ConfigManager], key: str, val: Any) -> None: self.key = key self.conf = conf self.orig = copy.deepcopy(val) super().__init__(val) - def __del__(self): + def __del__(self) -> None: cur = list(self) conf = self.conf() if conf and self.orig != cur: @@ -109,13 +110,13 @@ class WrappedList(list): class WrappedDict(dict): - def __init__(self, conf, key, val): + def __init__(self, conf: ref[ConfigManager], key: str, val: Any) -> None: self.key = key self.conf = conf self.orig = copy.deepcopy(val) super().__init__(val) - def __del__(self): + def __del__(self) -> None: cur = dict(self) conf = self.conf() if conf and self.orig != cur: diff --git a/pylib/anki/decks.py b/pylib/anki/decks.py index 9c0aec102..5242b23ab 100644 --- a/pylib/anki/decks.py +++ b/pylib/anki/decks.py @@ -5,6 +5,8 @@ from __future__ import annotations import copy import pprint +import sys +import traceback from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple, Union import anki # pylint: disable=unused-import @@ -43,36 +45,37 @@ class DecksDictProxy: def __init__(self, col: anki.collection.Collection): self._col = col.weakref() - def _warn(self): + def _warn(self) -> None: + traceback.print_stack(file=sys.stdout) print("add-on should use methods on col.decks, not col.decks.decks dict") - def __getitem__(self, item): + def __getitem__(self, item: Any) -> Any: self._warn() return self._col.decks.get(int(item)) - def __setitem__(self, key, val): + def __setitem__(self, key: Any, val: Any) -> None: self._warn() self._col.decks.save(val) - def __len__(self): + def __len__(self) -> int: self._warn() return len(self._col.decks.all_names_and_ids()) - def keys(self): + def keys(self) -> Any: self._warn() return [str(nt.id) for nt in self._col.decks.all_names_and_ids()] - def values(self): + def values(self) -> Any: self._warn() return self._col.decks.all() - def items(self): + def items(self) -> Any: self._warn() return [(str(nt["id"]), nt) for nt in self._col.decks.all()] - def __contains__(self, item): + def __contains__(self, item: Any) -> bool: self._warn() - self._col.decks.have(item) + return self._col.decks.have(item) class DeckManager: @@ -97,7 +100,7 @@ class DeckManager: self.update(g, preserve_usn=False) # legacy - def flush(self): + def flush(self) -> None: pass def __repr__(self) -> str: @@ -135,7 +138,7 @@ class DeckManager: self.col._backend.remove_deck(did) def all_names_and_ids( - self, skip_empty_default=False, include_filtered=True + self, skip_empty_default: bool = False, include_filtered: bool = True ) -> Sequence[DeckNameID]: "A sorted sequence of deck names and IDs." return self.col._backend.get_deck_names( @@ -195,12 +198,12 @@ class DeckManager: ) ] - def collapse(self, did) -> None: + def collapse(self, did: int) -> None: deck = self.get(did) deck["collapsed"] = not deck["collapsed"] self.save(deck) - def collapseBrowser(self, did) -> None: + def collapseBrowser(self, did: int) -> None: deck = self.get(did) collapsed = deck.get("browserCollapsed", False) deck["browserCollapsed"] = not collapsed @@ -241,7 +244,7 @@ class DeckManager: return self.get_legacy(id) return None - def update(self, g: Deck, preserve_usn=True) -> None: + def update(self, g: Deck, preserve_usn: bool = True) -> None: "Add or update an existing deck. Used for syncing and merging." try: g["id"] = self.col._backend.add_or_update_deck_legacy( @@ -303,7 +306,7 @@ class DeckManager: except NotFoundError: return None - def update_config(self, conf: DeckConfig, preserve_usn=False) -> None: + def update_config(self, conf: DeckConfig, preserve_usn: bool = False) -> None: conf["id"] = self.col._backend.add_or_update_deck_config_legacy( config=to_json_bytes(conf), preserve_usn_and_mtime=preserve_usn ) @@ -325,7 +328,7 @@ class DeckManager: ) -> int: return self.add_config(name, clone_from)["id"] - def remove_config(self, id) -> None: + def remove_config(self, id: int) -> None: "Remove a configuration and update all decks using it." self.col.modSchema(check=True) for g in self.all(): @@ -341,14 +344,14 @@ class DeckManager: grp["conf"] = id self.save(grp) - def didsForConf(self, conf) -> List[int]: + def didsForConf(self, conf: DeckConfig) -> List[int]: dids = [] for deck in self.all(): if "conf" in deck and deck["conf"] == conf["id"]: dids.append(deck["id"]) return dids - def restoreToDefault(self, conf) -> None: + def restoreToDefault(self, conf: DeckConfig) -> None: oldOrder = conf["new"]["order"] new = from_json_bytes(self.col._backend.new_deck_config_legacy()) new["id"] = conf["id"] @@ -380,7 +383,7 @@ class DeckManager: return deck["name"] return None - def setDeck(self, cids, did) -> None: + def setDeck(self, cids: List[int], did: int) -> None: self.col.db.execute( "update cards set did=?,usn=?,mod=? where id in " + ids2str(cids), did, @@ -424,7 +427,7 @@ class DeckManager: self.col.conf["activeDecks"] = active # don't use this, it will likely go away - def update_active(self): + def update_active(self) -> None: self.select(self.current()["id"]) # Parents/children @@ -480,7 +483,7 @@ class DeckManager: # Change to Dict[int, "DeckManager.childMapNode"] when MyPy allow recursive type def childDids(self, did: int, childMap: DeckManager.childMapNode) -> List: - def gather(node: DeckManager.childMapNode, arr): + def gather(node: DeckManager.childMapNode, arr: List) -> None: for did, child in node.items(): arr.append(did) gather(child, arr) diff --git a/pylib/anki/find.py b/pylib/anki/find.py index 553c9b35e..af829cb7d 100644 --- a/pylib/anki/find.py +++ b/pylib/anki/find.py @@ -16,10 +16,10 @@ class Finder: self.col = col.weakref() print("Finder() is deprecated, please use col.find_cards() or .find_notes()") - def findCards(self, query, order): + def findCards(self, query: Any, order: Any) -> Any: return self.col.find_cards(query, order) - def findNotes(self, query): + def findNotes(self, query: Any) -> Any: return self.col.find_notes(query) @@ -55,7 +55,7 @@ def fieldNamesForNotes(col: Collection, nids: List[int]) -> List[str]: ########################################################################## -def fieldNames(col, downcase=True) -> List: +def fieldNames(col: Collection, downcase: bool = True) -> List: fields: Set[str] = set() for m in col.models.all(): for f in m["flds"]: diff --git a/pylib/anki/hooks.py b/pylib/anki/hooks.py index 8a65d7bf2..87ca01ee8 100644 --- a/pylib/anki/hooks.py +++ b/pylib/anki/hooks.py @@ -25,7 +25,7 @@ from anki.hooks_gen import * _hooks: Dict[str, List[Callable[..., Any]]] = {} -def runHook(hook: str, *args) -> None: +def runHook(hook: str, *args: Any) -> None: "Run all functions on hook." hookFuncs = _hooks.get(hook, None) if hookFuncs: @@ -37,7 +37,7 @@ def runHook(hook: str, *args) -> None: raise -def runFilter(hook: str, arg: Any, *args) -> Any: +def runFilter(hook: str, arg: Any, *args: Any) -> Any: hookFuncs = _hooks.get(hook, None) if hookFuncs: for func in hookFuncs: @@ -57,7 +57,7 @@ def addHook(hook: str, func: Callable) -> None: _hooks[hook].append(func) -def remHook(hook, func) -> None: +def remHook(hook: Any, func: Any) -> None: "Remove a function if is on hook." hook = _hooks.get(hook, []) if func in hook: @@ -72,10 +72,10 @@ def remHook(hook, func) -> None: # # If you call wrap() with pos='around', the original function will not be called # automatically but can be called with _old(). -def wrap(old, new, pos="after") -> Callable: +def wrap(old: Any, new: Any, pos: str = "after") -> Callable: "Override an existing function." - def repl(*args, **kwargs): + def repl(*args: Any, **kwargs: Any) -> Any: if pos == "after": old(*args, **kwargs) return new(*args, **kwargs) @@ -85,7 +85,7 @@ def wrap(old, new, pos="after") -> Callable: else: return new(_old=old, *args, **kwargs) - def decorator_wrapper(f, *args, **kwargs): + def decorator_wrapper(f: Any, *args: Any, **kwargs: Any) -> Any: return repl(*args, **kwargs) return decorator.decorator(decorator_wrapper)(old) diff --git a/pylib/anki/media.py b/pylib/anki/media.py index 0b294d45c..c9b3b293e 100644 --- a/pylib/anki/media.py +++ b/pylib/anki/media.py @@ -11,7 +11,7 @@ import time import urllib.error import urllib.parse import urllib.request -from typing import Any, Callable, List, Optional, Tuple +from typing import Any, Callable, List, Match, Optional, Tuple import anki import anki._backend.backend_pb2 as _pb @@ -197,7 +197,7 @@ class MediaManager: else: fn = urllib.parse.quote - def repl(match): + def repl(match: Match) -> str: tag = match.group(0) fname = match.group("fname") if re.match("(https?|ftp)://", fname): diff --git a/pylib/anki/models.py b/pylib/anki/models.py index e437171b3..363d12cc1 100644 --- a/pylib/anki/models.py +++ b/pylib/anki/models.py @@ -5,7 +5,9 @@ from __future__ import annotations import copy import pprint +import sys import time +import traceback from typing import Any, Dict, List, Optional, Sequence, Tuple, Union import anki # pylint: disable=unused-import @@ -39,36 +41,37 @@ class ModelsDictProxy: def __init__(self, col: anki.collection.Collection): self._col = col.weakref() - def _warn(self): + def _warn(self) -> None: + traceback.print_stack(file=sys.stdout) print("add-on should use methods on col.models, not col.models.models dict") - def __getitem__(self, item): + def __getitem__(self, item: Any) -> Any: self._warn() return self._col.models.get(int(item)) - def __setitem__(self, key, val): + def __setitem__(self, key: str, val: Any) -> None: self._warn() self._col.models.save(val) - def __len__(self): + def __len__(self) -> int: self._warn() return len(self._col.models.all_names_and_ids()) - def keys(self): + def keys(self) -> Any: self._warn() return [str(nt.id) for nt in self._col.models.all_names_and_ids()] - def values(self): + def values(self) -> Any: self._warn() return self._col.models.all() - def items(self): + def items(self) -> Any: self._warn() return [(str(nt["id"]), nt) for nt in self._col.models.all()] - def __contains__(self, item): + def __contains__(self, item: Any) -> bool: self._warn() - self._col.models.have(item) + return self._col.models.have(item) class ModelManager: @@ -123,7 +126,7 @@ class ModelManager: def _get_cached(self, ntid: int) -> Optional[NoteType]: return self._cache.get(ntid) - def _clear_cache(self): + def _clear_cache(self) -> None: self._cache = {} # Listing note types @@ -218,7 +221,7 @@ class ModelManager: "Delete model, and all its cards/notes." self.remove(m["id"]) - def remove_all_notetypes(self): + def remove_all_notetypes(self) -> None: for nt in self.all_names_and_ids(): self._remove_from_cache(nt.id) self.col._backend.remove_notetype(nt.id) @@ -236,7 +239,7 @@ class ModelManager: if existing_id is not None and existing_id != m["id"]: m["name"] += "-" + checksum(str(time.time()))[:5] - def update(self, m: NoteType, preserve_usn=True) -> None: + def update(self, m: NoteType, preserve_usn: bool = True) -> None: "Add or update an existing model. Use .save() instead." self._remove_from_cache(m["id"]) self.ensureNameUnique(m) diff --git a/pylib/anki/notes.py b/pylib/anki/notes.py index 4b33da9f8..456fd28bd 100644 --- a/pylib/anki/notes.py +++ b/pylib/anki/notes.py @@ -113,7 +113,7 @@ class Note: def __setitem__(self, key: str, value: str) -> None: self.fields[self._fieldOrd(key)] = value - def __contains__(self, key) -> bool: + def __contains__(self, key: str) -> bool: return key in self._fmap # Tags diff --git a/pylib/anki/sched.py b/pylib/anki/sched.py index 7c60fda30..ad4967a64 100644 --- a/pylib/anki/sched.py +++ b/pylib/anki/sched.py @@ -350,7 +350,7 @@ limit %d""" lastIvl = -(self._delayForGrade(conf, lastLeft)) ivl = card.ivl if leaving else -(self._delayForGrade(conf, card.left)) - def log(): + def log() -> None: self.col.db.execute( "insert into revlog values (?,?,?,?,?,?,?,?,?)", int(time.time() * 1000), @@ -450,7 +450,7 @@ and due <= ? limit ?)""", self._revQueue: List[Any] = [] self._revDids = self.col.decks.active()[:] - def _fillRev(self, recursing=False) -> bool: + def _fillRev(self, recursing: bool = False) -> bool: "True if a review card can be fetched." if self._revQueue: return True diff --git a/pylib/anki/schedv2.py b/pylib/anki/schedv2.py index 682974a29..b470eb1dc 100644 --- a/pylib/anki/schedv2.py +++ b/pylib/anki/schedv2.py @@ -166,7 +166,7 @@ class Scheduler: self._restorePreviewCard(card) self._removeFromFiltered(card) - def _reset_counts(self): + def _reset_counts(self) -> None: tree = self.deck_due_tree(self.col.decks.selected()) node = self.col.decks.find_deck_in_tree(tree, int(self.col.conf["curDeck"])) if not node: @@ -187,7 +187,7 @@ class Scheduler: new, lrn, rev = counts return (new, lrn, rev) - def _is_finished(self): + def _is_finished(self) -> bool: "Don't use this, it is a stop-gap until this code is refactored." return not any((self.newCount, self.revCount, self._immediate_learn_count)) @@ -229,8 +229,12 @@ order by due""" ########################################################################## def update_stats( - self, deck_id: int, new_delta=0, review_delta=0, milliseconds_delta=0 - ): + self, + deck_id: int, + new_delta: int = 0, + review_delta: int = 0, + milliseconds_delta: int = 0, + ) -> None: self.col._backend.update_stats( deck_id=deck_id, new_delta=new_delta, @@ -321,7 +325,7 @@ order by due""" self._newQueue: List[int] = [] self._updateNewCardRatio() - def _fillNew(self, recursing=False) -> bool: + def _fillNew(self, recursing: bool = False) -> bool: if self._newQueue: return True if not self.newCount: @@ -841,7 +845,7 @@ and due <= ? limit ?)""" def _resetRev(self) -> None: self._revQueue: List[int] = [] - def _fillRev(self, recursing=False) -> bool: + def _fillRev(self, recursing: bool = False) -> bool: "True if a review card can be fetched." if self._revQueue: return True @@ -947,7 +951,7 @@ select id from cards where did in %s and queue = {QUEUE_TYPE_REV} and due <= ? l self._removeFromFiltered(card) def _logRev(self, card: Card, ease: int, delay: int, type: int) -> None: - def log(): + def log() -> None: self.col.db.execute( "insert into revlog values (?,?,?,?,?,?,?,?,?)", int(time.time() * 1000), @@ -1344,7 +1348,7 @@ due = (case when odue>0 then odue else due end), odue = 0, odid = 0, usn = ? whe mode = BuryOrSuspendMode.BURY_SCHED self.col._backend.bury_or_suspend_cards(card_ids=ids, mode=mode) - def bury_note(self, note: Note): + def bury_note(self, note: Note) -> None: self.bury_cards(note.card_ids()) # legacy @@ -1472,7 +1476,7 @@ and (queue={QUEUE_TYPE_NEW} or (queue={QUEUE_TYPE_REV} and due<=?))""", def orderCards(self, did: int) -> None: self.col._backend.sort_deck(deck_id=did, randomize=False) - def resortConf(self, conf) -> None: + def resortConf(self, conf: DeckConfig) -> None: for did in self.col.decks.didsForConf(conf): if conf["new"]["order"] == 0: self.randomizeCards(did) diff --git a/pylib/anki/stats.py b/pylib/anki/stats.py index 24cdcc1b9..789d5e852 100644 --- a/pylib/anki/stats.py +++ b/pylib/anki/stats.py @@ -140,7 +140,7 @@ from revlog where id > ? """ relrn = relrn or 0 filt = filt or 0 # studied - def bold(s): + def bold(s: str) -> str: return "" + str(s) + "" if cards: @@ -298,7 +298,7 @@ group by day order by day""" # pylint: disable=invalid-unary-operand-type conf["xaxis"]["min"] = -days + 0.5 - def plot(id, data, ylabel, ylabel2): + def plot(id: str, data: Any, ylabel: str, ylabel2: str) -> str: return self._graph( id, data=data, conf=conf, xunit=chunk, ylabel=ylabel, ylabel2=ylabel2 ) @@ -333,7 +333,7 @@ group by day order by day""" # pylint: disable=invalid-unary-operand-type conf["xaxis"]["min"] = -days + 0.5 - def plot(id, data, ylabel, ylabel2): + def plot(id: str, data: Any, ylabel: str, ylabel2: str) -> str: return self._graph( id, data=data, conf=conf, xunit=chunk, ylabel=ylabel, ylabel2=ylabel2 ) diff --git a/pylib/anki/syncserver/__init__.py b/pylib/anki/syncserver/__init__.py index 71ae39196..aff0a35e0 100644 --- a/pylib/anki/syncserver/__init__.py +++ b/pylib/anki/syncserver/__init__.py @@ -11,9 +11,10 @@ import os import socket import sys import time +from http import HTTPStatus from io import BytesIO from tempfile import NamedTemporaryFile -from typing import Optional +from typing import Iterable, Optional try: import flask @@ -89,7 +90,7 @@ def handle_sync_request(method_str: str) -> Response: elif method == Method.FULL_DOWNLOAD: path = outdata.decode("utf8") - def stream_reply(): + def stream_reply() -> Iterable[bytes]: with open(path, "rb") as f: while chunk := f.read(16 * 1024): yield chunk @@ -106,7 +107,7 @@ def handle_sync_request(method_str: str) -> Response: return resp -def after_full_sync(): +def after_full_sync() -> None: # the server methods do not reopen the collection after a full sync, # so we need to col.reopen(after_full_sync=False) @@ -146,15 +147,17 @@ def get_method( @app.route("/", methods=["POST"]) -def handle_request(pathin: str): +def handle_request(pathin: str) -> Response: path = pathin print(int(time.time()), flask.request.remote_addr, path) if path.startswith("sync/"): return handle_sync_request(path.split("/", maxsplit=1)[1]) + else: + return flask.make_response("not found", HTTPStatus.NOT_FOUND) -def folder(): +def folder() -> str: folder = os.getenv("FOLDER", os.path.expanduser("~/.syncserver")) if not os.path.exists(folder): print("creating", folder) @@ -162,11 +165,11 @@ def folder(): return folder -def col_path(): +def col_path() -> str: return os.path.join(folder(), "collection.server.anki2") -def serve(): +def serve() -> None: global col col = Collection(col_path(), server=True) diff --git a/pylib/anki/tags.py b/pylib/anki/tags.py index e64e9d434..67ae46c20 100644 --- a/pylib/anki/tags.py +++ b/pylib/anki/tags.py @@ -13,7 +13,7 @@ from __future__ import annotations import pprint import re -from typing import Collection, List, Optional, Sequence, Tuple +from typing import Collection, List, Match, Optional, Sequence, Tuple import anki # pylint: disable=unused-import import anki._backend.backend_pb2 as _pb @@ -139,7 +139,7 @@ class TagManager: def remFromStr(self, deltags: str, tags: str) -> str: "Delete tags if they exist." - def wildcard(pat: str, repl: str): + def wildcard(pat: str, repl: str) -> Match: pat = re.escape(pat).replace("\\*", ".*") return re.match("^" + pat + "$", repl, re.IGNORECASE) diff --git a/pylib/mypy.ini b/pylib/mypy.ini index 7324ddfbc..074377dad 100644 --- a/pylib/mypy.ini +++ b/pylib/mypy.ini @@ -9,6 +9,14 @@ warn_redundant_casts = True warn_unused_configs = True strict_equality = true +[mypy-anki.*] +disallow_untyped_defs = True +[mypy-anki.importing.*] +disallow_untyped_defs = False +[mypy-anki.exporting] +disallow_untyped_defs = False + + [mypy-win32file] ignore_missing_imports = True [mypy-win32pipe]