From d8d7e78b6b9564454bca0a713b022977988ba633 Mon Sep 17 00:00:00 2001 From: Damien Elmes Date: Fri, 20 Dec 2019 16:33:49 +1000 Subject: [PATCH] remove typings from some other files not used frequently enough to deal with the mypy errors they're causing at the moment --- anki/importing/mnemo.py | 10 +- anki/importing/supermemo_xml.py | 49 ++++---- anki/mpv.py | 71 +++++------ anki/sched.py | 210 ++++++++++++++++---------------- 4 files changed, 167 insertions(+), 173 deletions(-) diff --git a/anki/importing/mnemo.py b/anki/importing/mnemo.py index 09e159f4e..294253882 100644 --- a/anki/importing/mnemo.py +++ b/anki/importing/mnemo.py @@ -101,7 +101,7 @@ acq_reps+ret_reps, lapses, card_type_id from cards"""): def fields(self): return self._fields - def _mungeField(self, fld) -> str: + def _mungeField(self, fld): # \n -> br fld = re.sub("\r?\n", "
", fld) # latex differences @@ -110,7 +110,7 @@ acq_reps+ret_reps, lapses, card_type_id from cards"""): fld = re.sub(")?", "[sound:\\1]", fld) return fld - def _addFronts(self, notes, model=None, fields=("f", "b")) -> None: + def _addFronts(self, notes, model=None, fields=("f", "b")): data = [] for orig in notes: # create a foreign note object @@ -135,7 +135,7 @@ acq_reps+ret_reps, lapses, card_type_id from cards"""): # import self.importNotes(data) - def _addFrontBacks(self, notes) -> None: + def _addFrontBacks(self, notes): m = addBasicModel(self.col) m['name'] = "Mnemosyne-FrontBack" mm = self.col.models @@ -145,7 +145,7 @@ acq_reps+ret_reps, lapses, card_type_id from cards"""): mm.addTemplate(m, t) self._addFronts(notes, m) - def _addVocabulary(self, notes) -> None: + def _addVocabulary(self, notes): mm = self.col.models m = mm.new("Mnemosyne-Vocabulary") for f in "Expression", "Pronunciation", "Meaning", "Notes": @@ -164,7 +164,7 @@ acq_reps+ret_reps, lapses, card_type_id from cards"""): mm.add(m) self._addFronts(notes, m, fields=("f", "p_1", "m_1", "n")) - def _addCloze(self, notes) -> None: + def _addCloze(self, notes): data = [] notes = list(notes.values()) for orig in notes: diff --git a/anki/importing/supermemo_xml.py b/anki/importing/supermemo_xml.py index 9d32733b8..e2d428725 100644 --- a/anki/importing/supermemo_xml.py +++ b/anki/importing/supermemo_xml.py @@ -13,10 +13,7 @@ from anki.lang import ngettext from xml.dom import minidom from string import capwords import re, unicodedata, time -from typing import Any, List, Optional -from anki.collection import _Collection -from xml.dom.minidom import Element, Text class SmartDict(dict): """ See http://www.peterbe.com/plog/SmartDict @@ -28,7 +25,7 @@ class SmartDict(dict): x.get('first_name'). """ - def __init__(self, *a, **kw) -> None: + def __init__(self, *a, **kw): if a: if isinstance(type(a[0]), dict): kw.update(a[0]) @@ -43,7 +40,7 @@ class SmartDict(dict): class SuperMemoElement(SmartDict): "SmartDict wrapper to store SM Element data" - def __init__(self, *a, **kw) -> None: + def __init__(self, *a, **kw): SmartDict.__init__(self, *a, **kw) #default content self.__dict__['lTitle'] = None @@ -82,7 +79,7 @@ class SupermemoXmlImporter(NoteImporter): Code should be upgrade to support importing of SM2006 exports. """ - def __init__(self, col: _Collection, file: str) -> None: + def __init__(self, col, file): """Initialize internal varables. Pameters to be exposed to GUI are stored in self.META""" NoteImporter.__init__(self, col, file) @@ -122,17 +119,17 @@ class SupermemoXmlImporter(NoteImporter): ## TOOLS - def _fudgeText(self, text: str) -> Any: + def _fudgeText(self, text): "Replace sm syntax to Anki syntax" text = text.replace("\n\r", "
") text = text.replace("\n", "
") return text - def _unicode2ascii(self,str: str) -> str: + def _unicode2ascii(self,str): "Remove diacritic punctuation from strings (titles)" return "".join([ c for c in unicodedata.normalize('NFKD', str) if not unicodedata.combining(c)]) - def _decode_htmlescapes(self,s: str) -> str: + def _decode_htmlescapes(self,s): """Unescape HTML code.""" #In case of bad formated html you can import MinimalSoup etc.. see btflsoup source code from bs4 import BeautifulSoup as btflsoup @@ -145,7 +142,7 @@ class SupermemoXmlImporter(NoteImporter): return str(btflsoup(s, "html.parser")) - def _afactor2efactor(self, af: float) -> Any: + def _afactor2efactor(self, af): # Adapted from # Ranges for A-factors and E-factors @@ -169,7 +166,7 @@ class SupermemoXmlImporter(NoteImporter): ## DEFAULT IMPORTER METHODS - def foreignNotes(self) -> List[ForeignNote]: + def foreignNotes(self): # Load file and parse it by minidom self.loadSource(self.file) @@ -185,12 +182,12 @@ class SupermemoXmlImporter(NoteImporter): self.log.append(ngettext("%d card imported.", "%d cards imported.", self.total) % self.total) return self.notes - def fields(self) -> int: + def fields(self): return 2 ## PARSER METHODS - def addItemToCards(self,item: SuperMemoElement) -> None: + def addItemToCards(self,item): "This method actually do conversion" # new anki card @@ -250,7 +247,7 @@ class SupermemoXmlImporter(NoteImporter): self.notes.append(note) - def logger(self,text: str,level: int = 1) -> None: + def logger(self,text,level=1): "Wrapper for Anki logger" dLevels={0:'',1:'Info',2:'Verbose',3:'Debug'} @@ -262,7 +259,7 @@ class SupermemoXmlImporter(NoteImporter): # OPEN AND LOAD - def openAnything(self,source) -> Any: + def openAnything(self,source): "Open any source / actually only openig of files is used" if source == "-": @@ -285,7 +282,7 @@ class SupermemoXmlImporter(NoteImporter): import io return io.StringIO(str(source)) - def loadSource(self, source: str) -> None: + def loadSource(self, source): """Load source file and parse with xml.dom.minidom""" self.source = source self.logger('Load started...') @@ -296,7 +293,7 @@ class SupermemoXmlImporter(NoteImporter): # PARSE - def parse(self, node: Optional[Any] = None) -> None: + def parse(self, node=None): "Parse method - parses document elements" if node is None and self.xmldoc is not None: @@ -309,12 +306,12 @@ class SupermemoXmlImporter(NoteImporter): else: self.logger('No handler for method %s' % _method, level=3) - def parse_Document(self, node) -> None: + def parse_Document(self, node): "Parse XML document" self.parse(node.documentElement) - def parse_Element(self, node: Element) -> None: + def parse_Element(self, node): "Parse XML element" _method = "do_%s" % node.tagName @@ -325,7 +322,7 @@ class SupermemoXmlImporter(NoteImporter): self.logger('No handler for method %s' % _method, level=3) #print traceback.print_exc() - def parse_Text(self, node: Text) -> None: + def parse_Text(self, node): "Parse text inside elements. Text is stored into local buffer." text = node.data @@ -339,12 +336,12 @@ class SupermemoXmlImporter(NoteImporter): # DO - def do_SuperMemoCollection(self, node: Element) -> None: + def do_SuperMemoCollection(self, node): "Process SM Collection" for child in node.childNodes: self.parse(child) - def do_SuperMemoElement(self, node: Element) -> None: + def do_SuperMemoElement(self, node): "Process SM Element (Type - Title,Topics)" self.logger('='*45, level=3) @@ -394,14 +391,14 @@ class SupermemoXmlImporter(NoteImporter): t = self.cntMeta['title'].pop() self.logger('End of topic \t- %s' % (t), level=2) - def do_Content(self, node: Element) -> None: + def do_Content(self, node): "Process SM element Content" for child in node.childNodes: if hasattr(child,'tagName') and child.firstChild is not None: self.cntElm[-1][child.tagName]=child.firstChild.data - def do_LearningData(self, node: Element) -> None: + def do_LearningData(self, node): "Process SM element LearningData" for child in node.childNodes: @@ -418,7 +415,7 @@ class SupermemoXmlImporter(NoteImporter): # for child in node.childNodes: self.parse(child) # self.cntElm[-1][node.tagName]=self.cntBuf.pop() - def do_Title(self, node: Element) -> None: + def do_Title(self, node): "Process SM element Title" t = self._decode_htmlescapes(node.firstChild.data) @@ -428,7 +425,7 @@ class SupermemoXmlImporter(NoteImporter): self.logger('Start of topic \t- ' + " / ".join(self.cntMeta['title']), level=2) - def do_Type(self, node: Element) -> None: + def do_Type(self, node): "Process SM element Type" if len(self.cntBuf) >=1 : diff --git a/anki/mpv.py b/anki/mpv.py index c111f1343..1f23fa6f6 100644 --- a/anki/mpv.py +++ b/anki/mpv.py @@ -39,6 +39,8 @@ import inspect from distutils.spawn import find_executable # pylint: disable=import-error,no-name-in-module from queue import Queue, Empty, Full +from typing import Dict, Optional + class MPVError(Exception): pass @@ -56,7 +58,6 @@ class MPVTimeoutError(MPVError): pass from anki.utils import isWin -from typing import Any if isWin: # pylint: disable=import-error import win32file, win32pipe, pywintypes, winerror # pytype: disable=import-error @@ -66,7 +67,7 @@ class MPVBase: """ executable = find_executable("mpv") - popenEnv = None + popenEnv: Optional[Dict[str,str]] = None default_argv = [ "--idle", @@ -77,7 +78,7 @@ class MPVBase: "--keep-open=no", ] - def __init__(self, window_id=None, debug=False) -> None: + def __init__(self, window_id=None, debug=False): self.window_id = window_id self.debug = debug @@ -88,18 +89,18 @@ class MPVBase: self._prepare_thread() self._start_thread() - def __del__(self) -> None: + def __del__(self): self._stop_thread() self._stop_process() self._stop_socket() - def _thread_id(self) -> int: + def _thread_id(self): return threading.get_ident() # # Process # - def _prepare_process(self) -> None: + def _prepare_process(self): """Prepare the argument list for the mpv process. """ self.argv = [self.executable] @@ -108,12 +109,12 @@ class MPVBase: if self.window_id is not None: self.argv += ["--wid", str(self.window_id)] - def _start_process(self) -> None: + def _start_process(self): """Start the mpv process. """ self._proc = subprocess.Popen(self.argv, env=self.popenEnv) - def _stop_process(self) -> None: + def _stop_process(self): """Stop the mpv process. """ if hasattr(self, "_proc"): @@ -126,7 +127,7 @@ class MPVBase: # # Socket communication # - def _prepare_socket(self) -> None: + def _prepare_socket(self): """Create a random socket filename which we pass to mpv with the --input-unix-socket option. """ @@ -137,7 +138,7 @@ class MPVBase: os.close(fd) os.remove(self._sock_filename) - def _start_socket(self) -> None: + def _start_socket(self): """Wait for the mpv process to create the unix socket and finish startup. """ @@ -174,7 +175,7 @@ class MPVBase: else: raise MPVProcessError("unable to start process") - def _stop_socket(self) -> None: + def _stop_socket(self): """Clean up the socket. """ if hasattr(self, "_sock"): @@ -185,7 +186,7 @@ class MPVBase: except OSError: pass - def _prepare_thread(self) -> None: + def _prepare_thread(self): """Set up the queues for the communication threads. """ self._request_queue = Queue(1) @@ -193,14 +194,14 @@ class MPVBase: self._event_queue = Queue() self._stop_event = threading.Event() - def _start_thread(self) -> None: + def _start_thread(self): """Start up the communication threads. """ self._thread = threading.Thread(target=self._reader) self._thread.daemon = True self._thread.start() - def _stop_thread(self) -> None: + def _stop_thread(self): """Stop the communication threads. """ if hasattr(self, "_stop_event"): @@ -208,7 +209,7 @@ class MPVBase: if hasattr(self, "_thread"): self._thread.join() - def _reader(self) -> None: + def _reader(self): """Read the incoming json messages from the unix socket that is connected to the mpv process. Pass them on to the message handler. """ @@ -250,21 +251,21 @@ class MPVBase: # # Message handling # - def _compose_message(self, message) -> bytes: + def _compose_message(self, message): """Return a json representation from a message dictionary. """ # XXX may be strict is too strict ;-) data = json.dumps(message) return data.encode("utf8", "strict") + b"\n" - def _parse_message(self, data) -> Any: + def _parse_message(self, data): """Return a message dictionary from a json representation. """ # XXX may be strict is too strict ;-) data = data.decode("utf8", "strict") return json.loads(data) - def _handle_message(self, message) -> None: + def _handle_message(self, message): """Handle different types of incoming messages, i.e. responses to commands or asynchronous events. """ @@ -284,7 +285,7 @@ class MPVBase: else: raise MPVCommunicationError("invalid message %r" % message) - def _send_message(self, message, timeout=None) -> None: + def _send_message(self, message, timeout=None): """Send a message/command to the mpv process, message must be a dictionary of the form {"command": ["arg1", "arg2", ...]}. Responses from the mpv process must be collected using _get_response(). @@ -321,7 +322,7 @@ class MPVBase: raise MPVCommunicationError("broken sender socket") data = data[size:] - def _get_response(self, timeout=None) -> Any: + def _get_response(self, timeout=None): """Collect the response message to a previous request. If there was an error a MPVCommandError exception is raised, otherwise the command specific data is returned. @@ -336,7 +337,7 @@ class MPVBase: else: return message.get("data") - def _get_event(self, timeout=None) -> Any: + def _get_event(self, timeout=None): """Collect a single event message that has been received out-of-band from the mpv process. If a timeout is specified and there have not been any events during that period, None is returned. @@ -346,7 +347,7 @@ class MPVBase: except Empty: return None - def _send_request(self, message, timeout=None, _retry=1) -> Any: + def _send_request(self, message, timeout=None, _retry=1): """Send a command to the mpv process and collect the result. """ self.ensure_running() @@ -366,12 +367,12 @@ class MPVBase: # # Public API # - def is_running(self) -> bool: + def is_running(self): """Return True if the mpv process is still active. """ return self._proc.poll() is None - def ensure_running(self) -> None: + def ensure_running(self): if not self.is_running(): self._stop_thread() self._stop_process() @@ -383,7 +384,7 @@ class MPVBase: self._prepare_thread() self._start_thread() - def close(self) -> None: + def close(self): """Shutdown the mpv process and our communication setup. """ if self.is_running(): @@ -414,7 +415,7 @@ class MPV(MPVBase): threads to the same MPV instance are synchronized. """ - def __init__(self, *args, **kwargs) -> None: + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._callbacks = {} @@ -464,7 +465,7 @@ class MPV(MPVBase): # # Event/callback API # - def _event_reader(self) -> None: + def _event_reader(self): """Collect incoming event messages and call the event handler. """ while not self._stop_event.is_set(): @@ -474,7 +475,7 @@ class MPV(MPVBase): self._handle_event(message) - def _handle_event(self, message) -> None: + def _handle_event(self, message): """Lookup and call the callbacks for a particular event message. """ if message["event"] == "property-change": @@ -488,7 +489,7 @@ class MPV(MPVBase): else: callback() - def register_callback(self, name, callback) -> None: + def register_callback(self, name, callback): """Register a function `callback` for the event `name`. """ try: @@ -498,7 +499,7 @@ class MPV(MPVBase): self._callbacks.setdefault(name, []).append(callback) - def unregister_callback(self, name, callback) -> None: + def unregister_callback(self, name, callback): """Unregister a previously registered function `callback` for the event `name`. """ @@ -512,7 +513,7 @@ class MPV(MPVBase): except ValueError: raise MPVError("callback %r not registered for event %r" % (callback, name)) - def register_property_callback(self, name, callback) -> int: + def register_property_callback(self, name, callback): """Register a function `callback` for the property-change event on property `name`. """ @@ -534,7 +535,7 @@ class MPV(MPVBase): self._property_serials[(name, callback)] = serial return serial - def unregister_property_callback(self, name, callback) -> None: + def unregister_property_callback(self, name, callback): """Unregister a previously registered function `callback` for the property-change event on property `name`. """ @@ -554,17 +555,17 @@ class MPV(MPVBase): # # Public API # - def command(self, *args, timeout=1) -> Any: + def command(self, *args, timeout=1): """Execute a single command on the mpv process and return the result. """ return self._send_request({"command": list(args)}, timeout=timeout) - def get_property(self, name) -> Any: + def get_property(self, name): """Return the value of property `name`. """ return self.command("get_property", name) - def set_property(self, name, value) -> Any: + def set_property(self, name, value): """Set the value of property `name`. """ return self.command("set_property", name, value) diff --git a/anki/sched.py b/anki/sched.py index 20e65c9de..dcce7963f 100644 --- a/anki/sched.py +++ b/anki/sched.py @@ -14,10 +14,6 @@ from anki.lang import _ from anki.consts import * from anki.hooks import runHook -from anki.cards import Card -#from anki.collection import _Collection -from typing import Any, Callable, Dict, List, Optional, Union, Tuple - # queue types: 0=new/cram, 1=lrn, 2=rev, 3=day lrn, -1=suspended, -2=buried # revlog types: 0=lrn, 1=rev, 2=relrn, 3=cram # positive revlog intervals are in days (rev), negative in seconds (lrn) @@ -28,7 +24,7 @@ class Scheduler: _spreadRev = True _burySiblingsOnAnswer = True - def __init__(self, col) -> None: + def __init__(self, col): self.col = col self.queueLimit = 50 self.reportLimit = 1000 @@ -37,7 +33,7 @@ class Scheduler: self._haveQueues = False self._updateCutoff() - def getCard(self) -> Any: + def getCard(self): "Pop the next card from the queue. None if finished." self._checkDay() if not self._haveQueues: @@ -51,14 +47,14 @@ class Scheduler: card.startTimer() return card - def reset(self) -> None: + def reset(self): self._updateCutoff() self._resetLrn() self._resetRev() self._resetNew() self._haveQueues = True - def answerCard(self, card: Card, ease: int) -> None: + def answerCard(self, card, ease): self.col.log() assert 1 <= ease <= 4 self.col.markReview(card) @@ -97,7 +93,7 @@ class Scheduler: card.usn = self.col.usn() card.flushSched() - def counts(self, card: None = None) -> tuple: + def counts(self, card=None): counts = [self.newCount, self.lrnCount, self.revCount] if card: idx = self.countIdx(card) @@ -107,7 +103,7 @@ class Scheduler: counts[idx] += 1 return tuple(counts) - def dueForecast(self, days=7) -> List: + def dueForecast(self, days=7): "Return counts over next DAYS. Includes today." daysd = dict(self.col.db.all(""" select due, count() from cards @@ -125,12 +121,12 @@ order by due""" % self._deckLimit(), ret = [x[1] for x in sorted(daysd.items())] return ret - def countIdx(self, card: Card) -> Any: + def countIdx(self, card): if card.queue == 3: return 1 return card.queue - def answerButtons(self, card: Card) -> int: + def answerButtons(self, card): if card.odue: # normal review in dyn deck? if card.odid and card.queue == 2: @@ -144,7 +140,7 @@ order by due""" % self._deckLimit(), else: return 3 - def unburyCards(self) -> None: + def unburyCards(self): "Unbury cards." self.col.conf['lastUnburied'] = self.today self.col.log( @@ -152,7 +148,7 @@ order by due""" % self._deckLimit(), self.col.db.execute( "update cards set queue=type where queue = -2") - def unburyCardsForDeck(self) -> None: + def unburyCardsForDeck(self): sids = ids2str(self.col.decks.active()) self.col.log( self.col.db.list("select id from cards where queue = -2 and did in %s" @@ -164,7 +160,7 @@ order by due""" % self._deckLimit(), # Rev/lrn/time daily stats ########################################################################## - def _updateStats(self, card: Card, type: str, cnt: int = 1) -> None: + def _updateStats(self, card, type, cnt=1): key = type+"Today" for g in ([self.col.decks.get(card.did)] + self.col.decks.parents(card.did)): @@ -172,7 +168,7 @@ order by due""" % self._deckLimit(), g[key][1] += cnt self.col.decks.save(g) - def extendLimits(self, new, rev) -> None: + def extendLimits(self, new, rev): cur = self.col.decks.current() parents = self.col.decks.parents(cur['id']) children = [self.col.decks.get(did) for (name, did) in @@ -183,7 +179,7 @@ order by due""" % self._deckLimit(), g['revToday'][1] -= rev self.col.decks.save(g) - def _walkingCount(self, limFn: Optional[Callable] = None, cntFn: Optional[Callable] = None) -> Any: + def _walkingCount(self, limFn=None, cntFn=None): tot = 0 pcounts = {} # for each of the active decks @@ -217,7 +213,7 @@ order by due""" % self._deckLimit(), # Deck list ########################################################################## - def deckDueList(self) -> List[list]: + def deckDueList(self): "Returns [deckname, did, rev, lrn, new]" self._checkDay() self.col.decks.checkIntegrity() @@ -251,10 +247,10 @@ order by due""" % self._deckLimit(), lims[deck['name']] = [nlim, rlim] return data - def deckDueTree(self) -> Any: + def deckDueTree(self): return self._groupChildren(self.deckDueList()) - def _groupChildren(self, grps: List[List]) -> Tuple[Tuple[Any, Any, Any, Any, Any, Any], ...]: + def _groupChildren(self, grps): # first, split the group names into components for g in grps: g[0] = g[0].split("::") @@ -263,7 +259,7 @@ order by due""" % self._deckLimit(), # then run main function return self._groupChildrenMain(grps) - def _groupChildrenMain(self, grps: List[List[Union[List[str], int]]]) -> Tuple[Tuple[Any, Any, Any, Any, Any, Any], ...]: + def _groupChildrenMain(self, grps): tree = [] # group and recurse def key(grp): @@ -304,7 +300,7 @@ order by due""" % self._deckLimit(), # Getting the next card ########################################################################## - def _getCard(self) -> Any: + def _getCard(self): "Return the next due card id, or None." # learning card due? c = self._getLrnCard() @@ -333,19 +329,19 @@ order by due""" % self._deckLimit(), # New cards ########################################################################## - def _resetNewCount(self) -> None: + def _resetNewCount(self): cntFn = lambda did, lim: self.col.db.scalar(""" select count() from (select 1 from cards where did = ? and queue = 0 limit ?)""", did, lim) self.newCount = self._walkingCount(self._deckNewLimitSingle, cntFn) - def _resetNew(self) -> None: + def _resetNew(self): self._resetNewCount() self._newDids = self.col.decks.active()[:] self._newQueue = [] self._updateNewCardRatio() - def _fillNew(self) -> Any: + def _fillNew(self): if self._newQueue: return True if not self.newCount: @@ -369,12 +365,12 @@ did = ? and queue = 0 limit ?)""", did, lim) self._resetNew() return self._fillNew() - def _getNewCard(self) -> Any: + def _getNewCard(self): if self._fillNew(): self.newCount -= 1 return self.col.getCard(self._newQueue.pop()) - def _updateNewCardRatio(self) -> None: + def _updateNewCardRatio(self): if self.col.conf['newSpread'] == NEW_CARDS_DISTRIBUTE: if self.newCount: self.newCardModulus = ( @@ -385,7 +381,7 @@ did = ? and queue = 0 limit ?)""", did, lim) return self.newCardModulus = 0 - def _timeForNewCard(self) -> Optional[int]: + def _timeForNewCard(self): "True if it's time to display a new card when distributing." if not self.newCount: return False @@ -396,7 +392,7 @@ did = ? and queue = 0 limit ?)""", did, lim) elif self.newCardModulus: return self.reps and self.reps % self.newCardModulus == 0 - def _deckNewLimit(self, did: int, fn: Optional[Callable] = None) -> Any: + def _deckNewLimit(self, did, fn=None): if not fn: fn = self._deckNewLimitSingle sel = self.col.decks.get(did) @@ -410,7 +406,7 @@ did = ? and queue = 0 limit ?)""", did, lim) lim = min(rem, lim) return lim - def _newForDeck(self, did: int, lim: int) -> Any: + def _newForDeck(self, did, lim): "New count for a single deck." if not lim: return 0 @@ -419,14 +415,14 @@ did = ? and queue = 0 limit ?)""", did, lim) select count() from (select 1 from cards where did = ? and queue = 0 limit ?)""", did, lim) - def _deckNewLimitSingle(self, g: Dict[str, Any]) -> Any: + def _deckNewLimitSingle(self, g): "Limit for deck without parent limits." if g['dyn']: return self.reportLimit c = self.col.decks.confForDid(g['id']) return max(0, c['new']['perDay'] - g['newToday'][1]) - def totalNewForCurrentDeck(self) -> Any: + def totalNewForCurrentDeck(self): return self.col.db.scalar( """ select count() from cards where id in ( @@ -436,7 +432,7 @@ select id from cards where did in %s and queue = 0 limit ?)""" # Learning queues ########################################################################## - def _resetLrnCount(self) -> None: + def _resetLrnCount(self): # sub-day self.lrnCount = self.col.db.scalar(""" select sum(left/1000) from (select left from cards where @@ -449,14 +445,14 @@ select count() from cards where did in %s and queue = 3 and due <= ? limit %d""" % (self._deckLimit(), self.reportLimit), self.today) - def _resetLrn(self) -> None: + def _resetLrn(self): self._resetLrnCount() self._lrnQueue = [] self._lrnDayQueue = [] self._lrnDids = self.col.decks.active()[:] # sub-day learning - def _fillLrn(self) -> Any: + def _fillLrn(self): if not self.lrnCount: return False if self._lrnQueue: @@ -469,7 +465,7 @@ limit %d""" % (self._deckLimit(), self.reportLimit), lim=self.dayCutoff) self._lrnQueue.sort() return self._lrnQueue - def _getLrnCard(self, collapse: bool = False) -> Any: + def _getLrnCard(self, collapse=False): if self._fillLrn(): cutoff = time.time() if collapse: @@ -481,7 +477,7 @@ limit %d""" % (self._deckLimit(), self.reportLimit), lim=self.dayCutoff) return card # daily learning - def _fillLrnDay(self) -> Optional[bool]: + def _fillLrnDay(self): if not self.lrnCount: return False if self._lrnDayQueue: @@ -505,15 +501,15 @@ did = ? and queue = 3 and due <= ? limit ?""", # nothing left in the deck; move to next self._lrnDids.pop(0) - def _getLrnDayCard(self) -> Any: + def _getLrnDayCard(self): if self._fillLrnDay(): self.lrnCount -= 1 return self.col.getCard(self._lrnDayQueue.pop()) - def _answerLrnCard(self, card: Card, ease: int) -> None: + def _answerLrnCard(self, card, ease): # ease 1=no, 2=yes, 3=remove conf = self._lrnConf(card) - if card.odid and not card.wasNew: # type: ignore + if card.odid and not card.wasNew: type = 3 elif card.type == 2: type = 2 @@ -572,7 +568,7 @@ did = ? and queue = 3 and due <= ? limit ?""", card.queue = 3 self._logLrn(card, ease, conf, leaving, type, lastLeft) - def _delayForGrade(self, conf: Dict[str, Any], left: int) -> Any: + def _delayForGrade(self, conf, left): left = left % 1000 try: delay = conf['delays'][-left] @@ -584,13 +580,13 @@ did = ? and queue = 3 and due <= ? limit ?""", delay = 1 return delay*60 - def _lrnConf(self, card: Card) -> Any: + def _lrnConf(self, card): if card.type == 2: return self._lapseConf(card) else: return self._newConf(card) - def _rescheduleAsRev(self, card: Card, conf: Dict[str, Any], early: bool) -> None: + def _rescheduleAsRev(self, card, conf, early): lapse = card.type == 2 if lapse: if self._resched(card): @@ -613,7 +609,7 @@ did = ? and queue = 3 and due <= ? limit ?""", card.queue = card.type = 0 card.due = self.col.nextID("pos") - def _startingLeft(self, card: Card) -> int: + def _startingLeft(self, card): if card.type == 2: conf = self._lapseConf(card) else: @@ -622,7 +618,7 @@ did = ? and queue = 3 and due <= ? limit ?""", tod = self._leftToday(conf['delays'], tot) return tot + tod*1000 - def _leftToday(self, delays: Any, left: int, now: None = None) -> int: + def _leftToday(self, delays, left, now=None): "The number of steps that can be completed by the day cutoff." if not now: now = intTime() @@ -635,7 +631,7 @@ did = ? and queue = 3 and due <= ? limit ?""", ok = i return ok+1 - def _graduatingIvl(self, card: Card, conf: Dict[str, Any], early: bool, adj: bool = True) -> Any: + def _graduatingIvl(self, card, conf, early, adj=True): if card.type == 2: # lapsed card being relearnt if card.odid: @@ -653,13 +649,13 @@ did = ? and queue = 3 and due <= ? limit ?""", else: return ideal - def _rescheduleNew(self, card: Card, conf: Dict[str, Any], early: bool) -> None: + def _rescheduleNew(self, card, conf, early): "Reschedule a new card that's graduated for the first time." card.ivl = self._graduatingIvl(card, conf, early) card.due = self.today+card.ivl card.factor = conf['initialFactor'] - def _logLrn(self, card: Card, ease: int, conf: Dict[str, Any], leaving: bool, type: int, lastLeft: int) -> None: + def _logLrn(self, card, ease, conf, leaving, type, lastLeft): lastIvl = -(self._delayForGrade(conf, lastLeft)) ivl = card.ivl if leaving else -(self._delayForGrade(conf, card.left)) def log(): @@ -674,7 +670,7 @@ did = ? and queue = 3 and due <= ? limit ?""", time.sleep(0.01) log() - def removeLrn(self, ids: Optional[List[int]] = None) -> None: + def removeLrn(self, ids=None): "Remove cards from the learning queues." if ids: extra = " and id in "+ids2str(ids) @@ -693,7 +689,7 @@ where queue in (1,3) and type = 2 self.forgetCards(self.col.db.list( "select id from cards where queue in (1,3) %s" % extra)) - def _lrnForDeck(self, did: int) -> Any: + def _lrnForDeck(self, did): cnt = self.col.db.scalar( """ select sum(left/1000) from @@ -709,16 +705,16 @@ and due <= ? limit ?)""", # Reviews ########################################################################## - def _deckRevLimit(self, did: int) -> Any: + def _deckRevLimit(self, did): return self._deckNewLimit(did, self._deckRevLimitSingle) - def _deckRevLimitSingle(self, d: Dict[str, Any]) -> Any: + def _deckRevLimitSingle(self, d): if d['dyn']: return self.reportLimit c = self.col.decks.confForDid(d['id']) return max(0, c['rev']['perDay'] - d['revToday'][1]) - def _revForDeck(self, did: int, lim: int) -> Any: + def _revForDeck(self, did, lim): lim = min(lim, self.reportLimit) return self.col.db.scalar( """ @@ -727,7 +723,7 @@ select count() from and due <= ? limit ?)""", did, self.today, lim) - def _resetRevCount(self) -> None: + def _resetRevCount(self): def cntFn(did, lim): return self.col.db.scalar(""" select count() from (select id from cards where @@ -736,12 +732,12 @@ did = ? and queue = 2 and due <= ? limit %d)""" % lim, self.revCount = self._walkingCount( self._deckRevLimitSingle, cntFn) - def _resetRev(self) -> None: + def _resetRev(self): self._resetRevCount() self._revQueue = [] self._revDids = self.col.decks.active()[:] - def _fillRev(self) -> Any: + def _fillRev(self): if self._revQueue: return True if not self.revCount: @@ -778,12 +774,12 @@ did = ? and queue = 2 and due <= ? limit ?""", self._resetRev() return self._fillRev() - def _getRevCard(self) -> Any: + def _getRevCard(self): if self._fillRev(): self.revCount -= 1 return self.col.getCard(self._revQueue.pop()) - def totalRevForCurrentDeck(self) -> Any: + def totalRevForCurrentDeck(self): return self.col.db.scalar( """ select count() from cards where id in ( @@ -793,7 +789,7 @@ select id from cards where did in %s and queue = 2 and due <= ? limit ?)""" # Answering a review card ########################################################################## - def _answerRevCard(self, card: Card, ease: int) -> None: + def _answerRevCard(self, card, ease): delay = 0 if ease == 1: delay = self._rescheduleLapse(card) @@ -801,7 +797,7 @@ select id from cards where did in %s and queue = 2 and due <= ? limit ?)""" self._rescheduleRev(card, ease) self._logRev(card, ease, delay) - def _rescheduleLapse(self, card: Card) -> Any: + def _rescheduleLapse(self, card): conf = self._lapseConf(card) card.lastIvl = card.ivl if self._resched(card): @@ -837,10 +833,10 @@ select id from cards where did in %s and queue = 2 and due <= ? limit ?)""" card.queue = 3 return delay - def _nextLapseIvl(self, card: Card, conf: Dict[str, Any]) -> Any: + def _nextLapseIvl(self, card, conf): return max(conf['minInt'], int(card.ivl*conf['mult'])) - def _rescheduleRev(self, card: Card, ease: int) -> None: + def _rescheduleRev(self, card, ease): # update interval card.lastIvl = card.ivl if self._resched(card): @@ -855,7 +851,7 @@ select id from cards where did in %s and queue = 2 and due <= ? limit ?)""" card.odid = 0 card.odue = 0 - def _logRev(self, card: Card, ease: int, delay: Union[int, float]) -> None: + def _logRev(self, card, ease, delay): def log(): self.col.db.execute( "insert into revlog values (?,?,?,?,?,?,?,?,?)", @@ -872,7 +868,7 @@ select id from cards where did in %s and queue = 2 and due <= ? limit ?)""" # Interval management ########################################################################## - def _nextRevIvl(self, card: Card, ease: int) -> Any: + def _nextRevIvl(self, card, ease): "Ideal next interval for CARD, given EASE." delay = self._daysLate(card) conf = self._revConf(card) @@ -890,11 +886,11 @@ select id from cards where did in %s and queue = 2 and due <= ? limit ?)""" # interval capped? return min(interval, conf['maxIvl']) - def _fuzzedIvl(self, ivl: int) -> int: + def _fuzzedIvl(self, ivl): min, max = self._fuzzIvlRange(ivl) return random.randint(min, max) - def _fuzzIvlRange(self, ivl: int) -> List: + def _fuzzIvlRange(self, ivl): if ivl < 2: return [1, 1] elif ivl == 2: @@ -909,22 +905,22 @@ select id from cards where did in %s and queue = 2 and due <= ? limit ?)""" fuzz = max(fuzz, 1) return [ivl-fuzz, ivl+fuzz] - def _constrainedIvl(self, ivl: float, conf: Dict[str, Any], prev: int) -> int: + def _constrainedIvl(self, ivl, conf, prev): "Integer interval after interval factor and prev+1 constraints applied." new = ivl * conf.get('ivlFct', 1) return int(max(new, prev+1)) - def _daysLate(self, card: Card) -> Any: + def _daysLate(self, card): "Number of days later than scheduled." due = card.odue if card.odid else card.due return max(0, self.today - due) - def _updateRevIvl(self, card: Card, ease: int) -> None: + def _updateRevIvl(self, card, ease): idealIvl = self._nextRevIvl(card, ease) card.ivl = min(max(self._adjRevIvl(card, idealIvl), card.ivl+1), self._revConf(card)['maxIvl']) - def _adjRevIvl(self, card: Card, idealIvl: int) -> int: + def _adjRevIvl(self, card, idealIvl): if self._spreadRev: idealIvl = self._fuzzedIvl(idealIvl) return idealIvl @@ -932,7 +928,7 @@ select id from cards where did in %s and queue = 2 and due <= ? limit ?)""" # Dynamic deck handling ########################################################################## - def rebuildDyn(self, did: Optional[int] = None) -> Any: + def rebuildDyn(self, did=None): "Rebuild a dynamic deck." did = did or self.col.decks.selected() deck = self.col.decks.get(did) @@ -946,7 +942,7 @@ select id from cards where did in %s and queue = 2 and due <= ? limit ?)""" self.col.decks.select(did) return ids - def _fillDyn(self, deck: Dict[str, Any]) -> Any: + def _fillDyn(self, deck): search, limit, order = deck['terms'][0] orderlimit = self._dynOrder(order, limit) if search.strip(): @@ -962,7 +958,7 @@ select id from cards where did in %s and queue = 2 and due <= ? limit ?)""" self._moveToDyn(deck['id'], ids) return ids - def emptyDyn(self, did: Optional[int], lim: Optional[str] = None) -> None: + def emptyDyn(self, did, lim=None): if not lim: lim = "did = %s" % did self.col.log(self.col.db.list("select id from cards where %s" % lim)) @@ -973,10 +969,10 @@ else type end), type = (case when type = 1 then 0 else type end), due = odue, odue = 0, odid = 0, usn = ? where %s""" % lim, self.col.usn()) - def remFromDyn(self, cids: List[int]) -> None: + def remFromDyn(self, cids): self.emptyDyn(None, "id in %s and odid" % ids2str(cids)) - def _dynOrder(self, o: int, l: int) -> str: + def _dynOrder(self, o, l): if o == DYN_OLDEST: t = "(select max(id) from revlog where cid=c.id)" elif o == DYN_RANDOM: @@ -1001,7 +997,7 @@ due = odue, odue = 0, odid = 0, usn = ? where %s""" % lim, t = "c.due" return t + " limit %d" % l - def _moveToDyn(self, did: int, ids: List[int]) -> None: + def _moveToDyn(self, did, ids): deck = self.col.decks.get(did) data = [] t = intTime(); u = self.col.usn() @@ -1020,7 +1016,7 @@ odid = (case when odid then odid else did end), odue = (case when odue then odue else due end), did = ?, queue = %s, due = ?, usn = ? where id = ?""" % queue, data) - def _dynIvlBoost(self, card: Card) -> Any: + def _dynIvlBoost(self, card): assert card.odid and card.type == 2 assert card.factor elapsed = card.ivl - (card.odue - self.today) @@ -1032,7 +1028,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?""" % queue, data) # Leeches ########################################################################## - def _checkLeech(self, card: Card, conf: Dict[str, Any]) -> Optional[bool]: + def _checkLeech(self, card, conf): "Leech handler. True if card was a leech." lf = conf['leechFails'] if not lf: @@ -1061,10 +1057,10 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?""" % queue, data) # Tools ########################################################################## - def _cardConf(self, card: Card) -> Any: + def _cardConf(self, card): return self.col.decks.confForDid(card.did) - def _newConf(self, card: Card) -> Any: + def _newConf(self, card): conf = self._cardConf(card) # normal deck if not card.odid: @@ -1084,7 +1080,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?""" % queue, data) perDay=self.reportLimit ) - def _lapseConf(self, card: Card) -> Any: + def _lapseConf(self, card): conf = self._cardConf(card) # normal deck if not card.odid: @@ -1103,7 +1099,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?""" % queue, data) resched=conf['resched'], ) - def _revConf(self, card: Card) -> Any: + def _revConf(self, card): conf = self._cardConf(card) # normal deck if not card.odid: @@ -1111,10 +1107,10 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?""" % queue, data) # dynamic deck return self.col.decks.confForDid(card.odid)['rev'] - def _deckLimit(self) -> str: + def _deckLimit(self): return ids2str(self.col.decks.active()) - def _resched(self, card: Card) -> Any: + def _resched(self, card): conf = self._cardConf(card) if not conf['dyn']: return True @@ -1123,7 +1119,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?""" % queue, data) # Daily cutoff ########################################################################## - def _updateCutoff(self) -> None: + def _updateCutoff(self): oldToday = self.today # days since col created self.today = int((time.time() - self.col.crt) // 86400) @@ -1145,7 +1141,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?""" % queue, data) if unburied < self.today: self.unburyCards() - def _checkDay(self) -> None: + def _checkDay(self): # check if the day has rolled over if time.time() > self.dayCutoff: self.reset() @@ -1153,12 +1149,12 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?""" % queue, data) # Deck finished state ########################################################################## - def finishedMsg(self) -> str: + def finishedMsg(self): return (""+_( "Congratulations! You have finished this deck for now.")+ "

" + self._nextDueMsg()) - def _nextDueMsg(self) -> str: + def _nextDueMsg(self): line = [] # the new line replacements are so we don't break translations # in a point release @@ -1185,20 +1181,20 @@ Some related or buried cards were delayed until a later session.""")+now) To study outside of the normal schedule, click the Custom Study button below.""")) return "

".join(line) - def revDue(self) -> Any: + def revDue(self): "True if there are any rev cards due." return self.col.db.scalar( ("select 1 from cards where did in %s and queue = 2 " "and due <= ? limit 1") % self._deckLimit(), self.today) - def newDue(self) -> Any: + def newDue(self): "True if there are any new cards due." return self.col.db.scalar( ("select 1 from cards where did in %s and queue = 0 " "limit 1") % self._deckLimit()) - def haveBuried(self) -> bool: + def haveBuried(self): sdids = ids2str(self.col.decks.active()) cnt = self.col.db.scalar( "select 1 from cards where queue = -2 and did in %s limit 1" % sdids) @@ -1207,7 +1203,7 @@ To study outside of the normal schedule, click the Custom Study button below.""" # Next time reports ########################################################################## - def nextIvlStr(self, card: Card, ease: int, short: bool = False) -> Any: + def nextIvlStr(self, card, ease, short=False): "Return the next interval for CARD as a string." ivl = self.nextIvl(card, ease) if not ivl: @@ -1217,7 +1213,7 @@ To study outside of the normal schedule, click the Custom Study button below.""" s = "<"+s return s - def nextIvl(self, card: Card, ease: int) -> Any: + def nextIvl(self, card, ease): "Return the next interval for CARD, in seconds." if card.queue in (0,1,3): return self._nextLrnIvl(card, ease) @@ -1232,7 +1228,7 @@ To study outside of the normal schedule, click the Custom Study button below.""" return self._nextRevIvl(card, ease)*86400 # this isn't easily extracted from the learn code - def _nextLrnIvl(self, card: Card, ease: int) -> Any: + def _nextLrnIvl(self, card, ease): if card.queue == 0: card.left = self._startingLeft(card) conf = self._lrnConf(card) @@ -1257,7 +1253,7 @@ To study outside of the normal schedule, click the Custom Study button below.""" # Suspending ########################################################################## - def suspendCards(self, ids: List[int]) -> None: + def suspendCards(self, ids): "Suspend cards." self.col.log(ids) self.remFromDyn(ids) @@ -1266,7 +1262,7 @@ To study outside of the normal schedule, click the Custom Study button below.""" "update cards set queue=-1,mod=?,usn=? where id in "+ ids2str(ids), intTime(), self.col.usn()) - def unsuspendCards(self, ids: List[int]) -> None: + def unsuspendCards(self, ids): "Unsuspend cards." self.col.log(ids) self.col.db.execute( @@ -1274,7 +1270,7 @@ To study outside of the normal schedule, click the Custom Study button below.""" "where queue = -1 and id in "+ ids2str(ids), intTime(), self.col.usn()) - def buryCards(self, cids: List[int]) -> None: + def buryCards(self, cids): self.col.log(cids) self.remFromDyn(cids) self.removeLrn(cids) @@ -1282,7 +1278,7 @@ To study outside of the normal schedule, click the Custom Study button below.""" update cards set queue=-2,mod=?,usn=? where id in """+ids2str(cids), intTime(), self.col.usn()) - def buryNote(self, nid: int) -> None: + def buryNote(self, nid): "Bury all cards for note until next session." cids = self.col.db.list( "select id from cards where nid = ? and queue >= 0", nid) @@ -1291,7 +1287,7 @@ update cards set queue=-2,mod=?,usn=? where id in """+ids2str(cids), # Sibling spacing ########################################################################## - def _burySiblings(self, card: Card) -> None: + def _burySiblings(self, card): toBury = [] nconf = self._newConf(card) buryNew = nconf.get("bury", True) @@ -1328,7 +1324,7 @@ and (queue=0 or (queue=2 and due<=?))""", # Resetting ########################################################################## - def forgetCards(self, ids: List[int]) -> None: + def forgetCards(self, ids): "Put cards at the end of the new queue." self.remFromDyn(ids) self.col.db.execute( @@ -1340,7 +1336,7 @@ and (queue=0 or (queue=2 and due<=?))""", self.sortCards(ids, start=pmax+1) self.col.log(ids) - def reschedCards(self, ids: List[int], imin: int, imax: int) -> None: + def reschedCards(self, ids, imin, imax): "Put cards in review queue with a new interval in days (min, max)." d = [] t = self.today @@ -1356,7 +1352,7 @@ usn=:usn,mod=:mod,factor=:fact where id=:id""", d) self.col.log(ids) - def resetCards(self, ids) -> None: + def resetCards(self, ids): "Completely reset cards for export." sids = ids2str(ids) # we want to avoid resetting due number of existing new cards on export @@ -1375,7 +1371,7 @@ usn=:usn,mod=:mod,factor=:fact where id=:id""", # Repositioning new cards ########################################################################## - def sortCards(self, cids: List[int], start: int = 1, step: int = 1, shuffle: bool = False, shift: bool = False) -> None: + def sortCards(self, cids, start=1, step=1, shuffle=False, shift=False): scids = ids2str(cids) now = intTime() nids = [] @@ -1415,15 +1411,15 @@ and due >= ? and queue = 0""" % scids, now, self.col.usn(), shiftby, low) self.col.db.executemany( "update cards set due=:due,mod=:now,usn=:usn where id = :cid", d) - def randomizeCards(self, did: int) -> None: + def randomizeCards(self, did): cids = self.col.db.list("select id from cards where did = ?", did) self.sortCards(cids, shuffle=True) - def orderCards(self, did: int) -> None: + def orderCards(self, did): cids = self.col.db.list("select id from cards where did = ? order by id", did) self.sortCards(cids) - def resortConf(self, conf) -> None: + def resortConf(self, conf): for did in self.col.decks.didsForConf(conf): if conf['new']['order'] == 0: self.randomizeCards(did) @@ -1431,7 +1427,7 @@ and due >= ? and queue = 0""" % scids, now, self.col.usn(), shiftby, low) self.orderCards(did) # for post-import - def maybeRandomizeDeck(self, did=None) -> None: + def maybeRandomizeDeck(self, did=None): if not did: did = self.col.decks.selected() conf = self.col.decks.confForDid(did)