remove typings from some other files

not used frequently enough to deal with the mypy errors they're causing
at the moment
This commit is contained in:
Damien Elmes 2019-12-20 16:33:49 +10:00
parent b6b8df2dcf
commit d8d7e78b6b
4 changed files with 167 additions and 173 deletions

View file

@ -101,7 +101,7 @@ acq_reps+ret_reps, lapses, card_type_id from cards"""):
def fields(self):
return self._fields
def _mungeField(self, fld) -> str:
def _mungeField(self, fld):
# \n -> br
fld = re.sub("\r?\n", "<br>", fld)
# latex differences
@ -110,7 +110,7 @@ acq_reps+ret_reps, lapses, card_type_id from cards"""):
fld = re.sub("<audio src=\"(.+?)\">(</audio>)?", "[sound:\\1]", fld)
return fld
def _addFronts(self, notes, model=None, fields=("f", "b")) -> None:
def _addFronts(self, notes, model=None, fields=("f", "b")):
data = []
for orig in notes:
# create a foreign note object
@ -135,7 +135,7 @@ acq_reps+ret_reps, lapses, card_type_id from cards"""):
# import
self.importNotes(data)
def _addFrontBacks(self, notes) -> None:
def _addFrontBacks(self, notes):
m = addBasicModel(self.col)
m['name'] = "Mnemosyne-FrontBack"
mm = self.col.models
@ -145,7 +145,7 @@ acq_reps+ret_reps, lapses, card_type_id from cards"""):
mm.addTemplate(m, t)
self._addFronts(notes, m)
def _addVocabulary(self, notes) -> None:
def _addVocabulary(self, notes):
mm = self.col.models
m = mm.new("Mnemosyne-Vocabulary")
for f in "Expression", "Pronunciation", "Meaning", "Notes":
@ -164,7 +164,7 @@ acq_reps+ret_reps, lapses, card_type_id from cards"""):
mm.add(m)
self._addFronts(notes, m, fields=("f", "p_1", "m_1", "n"))
def _addCloze(self, notes) -> None:
def _addCloze(self, notes):
data = []
notes = list(notes.values())
for orig in notes:

View file

@ -13,10 +13,7 @@ from anki.lang import ngettext
from xml.dom import minidom
from string import capwords
import re, unicodedata, time
from typing import Any, List, Optional
from anki.collection import _Collection
from xml.dom.minidom import Element, Text
class SmartDict(dict):
"""
See http://www.peterbe.com/plog/SmartDict
@ -28,7 +25,7 @@ class SmartDict(dict):
x.get('first_name').
"""
def __init__(self, *a, **kw) -> None:
def __init__(self, *a, **kw):
if a:
if isinstance(type(a[0]), dict):
kw.update(a[0])
@ -43,7 +40,7 @@ class SmartDict(dict):
class SuperMemoElement(SmartDict):
"SmartDict wrapper to store SM Element data"
def __init__(self, *a, **kw) -> None:
def __init__(self, *a, **kw):
SmartDict.__init__(self, *a, **kw)
#default content
self.__dict__['lTitle'] = None
@ -82,7 +79,7 @@ class SupermemoXmlImporter(NoteImporter):
Code should be upgrade to support importing of SM2006 exports.
"""
def __init__(self, col: _Collection, file: str) -> None:
def __init__(self, col, file):
"""Initialize internal varables.
Pameters to be exposed to GUI are stored in self.META"""
NoteImporter.__init__(self, col, file)
@ -122,17 +119,17 @@ class SupermemoXmlImporter(NoteImporter):
## TOOLS
def _fudgeText(self, text: str) -> Any:
def _fudgeText(self, text):
"Replace sm syntax to Anki syntax"
text = text.replace("\n\r", "<br>")
text = text.replace("\n", "<br>")
return text
def _unicode2ascii(self,str: str) -> str:
def _unicode2ascii(self,str):
"Remove diacritic punctuation from strings (titles)"
return "".join([ c for c in unicodedata.normalize('NFKD', str) if not unicodedata.combining(c)])
def _decode_htmlescapes(self,s: str) -> str:
def _decode_htmlescapes(self,s):
"""Unescape HTML code."""
#In case of bad formated html you can import MinimalSoup etc.. see btflsoup source code
from bs4 import BeautifulSoup as btflsoup
@ -145,7 +142,7 @@ class SupermemoXmlImporter(NoteImporter):
return str(btflsoup(s, "html.parser"))
def _afactor2efactor(self, af: float) -> Any:
def _afactor2efactor(self, af):
# Adapted from <http://www.supermemo.com/beta/xml/xml-core.htm>
# Ranges for A-factors and E-factors
@ -169,7 +166,7 @@ class SupermemoXmlImporter(NoteImporter):
## DEFAULT IMPORTER METHODS
def foreignNotes(self) -> List[ForeignNote]:
def foreignNotes(self):
# Load file and parse it by minidom
self.loadSource(self.file)
@ -185,12 +182,12 @@ class SupermemoXmlImporter(NoteImporter):
self.log.append(ngettext("%d card imported.", "%d cards imported.", self.total) % self.total)
return self.notes
def fields(self) -> int:
def fields(self):
return 2
## PARSER METHODS
def addItemToCards(self,item: SuperMemoElement) -> None:
def addItemToCards(self,item):
"This method actually do conversion"
# new anki card
@ -250,7 +247,7 @@ class SupermemoXmlImporter(NoteImporter):
self.notes.append(note)
def logger(self,text: str,level: int = 1) -> None:
def logger(self,text,level=1):
"Wrapper for Anki logger"
dLevels={0:'',1:'Info',2:'Verbose',3:'Debug'}
@ -262,7 +259,7 @@ class SupermemoXmlImporter(NoteImporter):
# OPEN AND LOAD
def openAnything(self,source) -> Any:
def openAnything(self,source):
"Open any source / actually only openig of files is used"
if source == "-":
@ -285,7 +282,7 @@ class SupermemoXmlImporter(NoteImporter):
import io
return io.StringIO(str(source))
def loadSource(self, source: str) -> None:
def loadSource(self, source):
"""Load source file and parse with xml.dom.minidom"""
self.source = source
self.logger('Load started...')
@ -296,7 +293,7 @@ class SupermemoXmlImporter(NoteImporter):
# PARSE
def parse(self, node: Optional[Any] = None) -> None:
def parse(self, node=None):
"Parse method - parses document elements"
if node is None and self.xmldoc is not None:
@ -309,12 +306,12 @@ class SupermemoXmlImporter(NoteImporter):
else:
self.logger('No handler for method %s' % _method, level=3)
def parse_Document(self, node) -> None:
def parse_Document(self, node):
"Parse XML document"
self.parse(node.documentElement)
def parse_Element(self, node: Element) -> None:
def parse_Element(self, node):
"Parse XML element"
_method = "do_%s" % node.tagName
@ -325,7 +322,7 @@ class SupermemoXmlImporter(NoteImporter):
self.logger('No handler for method %s' % _method, level=3)
#print traceback.print_exc()
def parse_Text(self, node: Text) -> None:
def parse_Text(self, node):
"Parse text inside elements. Text is stored into local buffer."
text = node.data
@ -339,12 +336,12 @@ class SupermemoXmlImporter(NoteImporter):
# DO
def do_SuperMemoCollection(self, node: Element) -> None:
def do_SuperMemoCollection(self, node):
"Process SM Collection"
for child in node.childNodes: self.parse(child)
def do_SuperMemoElement(self, node: Element) -> None:
def do_SuperMemoElement(self, node):
"Process SM Element (Type - Title,Topics)"
self.logger('='*45, level=3)
@ -394,14 +391,14 @@ class SupermemoXmlImporter(NoteImporter):
t = self.cntMeta['title'].pop()
self.logger('End of topic \t- %s' % (t), level=2)
def do_Content(self, node: Element) -> None:
def do_Content(self, node):
"Process SM element Content"
for child in node.childNodes:
if hasattr(child,'tagName') and child.firstChild is not None:
self.cntElm[-1][child.tagName]=child.firstChild.data
def do_LearningData(self, node: Element) -> None:
def do_LearningData(self, node):
"Process SM element LearningData"
for child in node.childNodes:
@ -418,7 +415,7 @@ class SupermemoXmlImporter(NoteImporter):
# for child in node.childNodes: self.parse(child)
# self.cntElm[-1][node.tagName]=self.cntBuf.pop()
def do_Title(self, node: Element) -> None:
def do_Title(self, node):
"Process SM element Title"
t = self._decode_htmlescapes(node.firstChild.data)
@ -428,7 +425,7 @@ class SupermemoXmlImporter(NoteImporter):
self.logger('Start of topic \t- ' + " / ".join(self.cntMeta['title']), level=2)
def do_Type(self, node: Element) -> None:
def do_Type(self, node):
"Process SM element Type"
if len(self.cntBuf) >=1 :

View file

@ -39,6 +39,8 @@ import inspect
from distutils.spawn import find_executable # pylint: disable=import-error,no-name-in-module
from queue import Queue, Empty, Full
from typing import Dict, Optional
class MPVError(Exception):
pass
@ -56,7 +58,6 @@ class MPVTimeoutError(MPVError):
pass
from anki.utils import isWin
from typing import Any
if isWin:
# pylint: disable=import-error
import win32file, win32pipe, pywintypes, winerror # pytype: disable=import-error
@ -66,7 +67,7 @@ class MPVBase:
"""
executable = find_executable("mpv")
popenEnv = None
popenEnv: Optional[Dict[str,str]] = None
default_argv = [
"--idle",
@ -77,7 +78,7 @@ class MPVBase:
"--keep-open=no",
]
def __init__(self, window_id=None, debug=False) -> None:
def __init__(self, window_id=None, debug=False):
self.window_id = window_id
self.debug = debug
@ -88,18 +89,18 @@ class MPVBase:
self._prepare_thread()
self._start_thread()
def __del__(self) -> None:
def __del__(self):
self._stop_thread()
self._stop_process()
self._stop_socket()
def _thread_id(self) -> int:
def _thread_id(self):
return threading.get_ident()
#
# Process
#
def _prepare_process(self) -> None:
def _prepare_process(self):
"""Prepare the argument list for the mpv process.
"""
self.argv = [self.executable]
@ -108,12 +109,12 @@ class MPVBase:
if self.window_id is not None:
self.argv += ["--wid", str(self.window_id)]
def _start_process(self) -> None:
def _start_process(self):
"""Start the mpv process.
"""
self._proc = subprocess.Popen(self.argv, env=self.popenEnv)
def _stop_process(self) -> None:
def _stop_process(self):
"""Stop the mpv process.
"""
if hasattr(self, "_proc"):
@ -126,7 +127,7 @@ class MPVBase:
#
# Socket communication
#
def _prepare_socket(self) -> None:
def _prepare_socket(self):
"""Create a random socket filename which we pass to mpv with the
--input-unix-socket option.
"""
@ -137,7 +138,7 @@ class MPVBase:
os.close(fd)
os.remove(self._sock_filename)
def _start_socket(self) -> None:
def _start_socket(self):
"""Wait for the mpv process to create the unix socket and finish
startup.
"""
@ -174,7 +175,7 @@ class MPVBase:
else:
raise MPVProcessError("unable to start process")
def _stop_socket(self) -> None:
def _stop_socket(self):
"""Clean up the socket.
"""
if hasattr(self, "_sock"):
@ -185,7 +186,7 @@ class MPVBase:
except OSError:
pass
def _prepare_thread(self) -> None:
def _prepare_thread(self):
"""Set up the queues for the communication threads.
"""
self._request_queue = Queue(1)
@ -193,14 +194,14 @@ class MPVBase:
self._event_queue = Queue()
self._stop_event = threading.Event()
def _start_thread(self) -> None:
def _start_thread(self):
"""Start up the communication threads.
"""
self._thread = threading.Thread(target=self._reader)
self._thread.daemon = True
self._thread.start()
def _stop_thread(self) -> None:
def _stop_thread(self):
"""Stop the communication threads.
"""
if hasattr(self, "_stop_event"):
@ -208,7 +209,7 @@ class MPVBase:
if hasattr(self, "_thread"):
self._thread.join()
def _reader(self) -> None:
def _reader(self):
"""Read the incoming json messages from the unix socket that is
connected to the mpv process. Pass them on to the message handler.
"""
@ -250,21 +251,21 @@ class MPVBase:
#
# Message handling
#
def _compose_message(self, message) -> bytes:
def _compose_message(self, message):
"""Return a json representation from a message dictionary.
"""
# XXX may be strict is too strict ;-)
data = json.dumps(message)
return data.encode("utf8", "strict") + b"\n"
def _parse_message(self, data) -> Any:
def _parse_message(self, data):
"""Return a message dictionary from a json representation.
"""
# XXX may be strict is too strict ;-)
data = data.decode("utf8", "strict")
return json.loads(data)
def _handle_message(self, message) -> None:
def _handle_message(self, message):
"""Handle different types of incoming messages, i.e. responses to
commands or asynchronous events.
"""
@ -284,7 +285,7 @@ class MPVBase:
else:
raise MPVCommunicationError("invalid message %r" % message)
def _send_message(self, message, timeout=None) -> None:
def _send_message(self, message, timeout=None):
"""Send a message/command to the mpv process, message must be a
dictionary of the form {"command": ["arg1", "arg2", ...]}. Responses
from the mpv process must be collected using _get_response().
@ -321,7 +322,7 @@ class MPVBase:
raise MPVCommunicationError("broken sender socket")
data = data[size:]
def _get_response(self, timeout=None) -> Any:
def _get_response(self, timeout=None):
"""Collect the response message to a previous request. If there was an
error a MPVCommandError exception is raised, otherwise the command
specific data is returned.
@ -336,7 +337,7 @@ class MPVBase:
else:
return message.get("data")
def _get_event(self, timeout=None) -> Any:
def _get_event(self, timeout=None):
"""Collect a single event message that has been received out-of-band
from the mpv process. If a timeout is specified and there have not
been any events during that period, None is returned.
@ -346,7 +347,7 @@ class MPVBase:
except Empty:
return None
def _send_request(self, message, timeout=None, _retry=1) -> Any:
def _send_request(self, message, timeout=None, _retry=1):
"""Send a command to the mpv process and collect the result.
"""
self.ensure_running()
@ -366,12 +367,12 @@ class MPVBase:
#
# Public API
#
def is_running(self) -> bool:
def is_running(self):
"""Return True if the mpv process is still active.
"""
return self._proc.poll() is None
def ensure_running(self) -> None:
def ensure_running(self):
if not self.is_running():
self._stop_thread()
self._stop_process()
@ -383,7 +384,7 @@ class MPVBase:
self._prepare_thread()
self._start_thread()
def close(self) -> None:
def close(self):
"""Shutdown the mpv process and our communication setup.
"""
if self.is_running():
@ -414,7 +415,7 @@ class MPV(MPVBase):
threads to the same MPV instance are synchronized.
"""
def __init__(self, *args, **kwargs) -> None:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._callbacks = {}
@ -464,7 +465,7 @@ class MPV(MPVBase):
#
# Event/callback API
#
def _event_reader(self) -> None:
def _event_reader(self):
"""Collect incoming event messages and call the event handler.
"""
while not self._stop_event.is_set():
@ -474,7 +475,7 @@ class MPV(MPVBase):
self._handle_event(message)
def _handle_event(self, message) -> None:
def _handle_event(self, message):
"""Lookup and call the callbacks for a particular event message.
"""
if message["event"] == "property-change":
@ -488,7 +489,7 @@ class MPV(MPVBase):
else:
callback()
def register_callback(self, name, callback) -> None:
def register_callback(self, name, callback):
"""Register a function `callback` for the event `name`.
"""
try:
@ -498,7 +499,7 @@ class MPV(MPVBase):
self._callbacks.setdefault(name, []).append(callback)
def unregister_callback(self, name, callback) -> None:
def unregister_callback(self, name, callback):
"""Unregister a previously registered function `callback` for the event
`name`.
"""
@ -512,7 +513,7 @@ class MPV(MPVBase):
except ValueError:
raise MPVError("callback %r not registered for event %r" % (callback, name))
def register_property_callback(self, name, callback) -> int:
def register_property_callback(self, name, callback):
"""Register a function `callback` for the property-change event on
property `name`.
"""
@ -534,7 +535,7 @@ class MPV(MPVBase):
self._property_serials[(name, callback)] = serial
return serial
def unregister_property_callback(self, name, callback) -> None:
def unregister_property_callback(self, name, callback):
"""Unregister a previously registered function `callback` for the
property-change event on property `name`.
"""
@ -554,17 +555,17 @@ class MPV(MPVBase):
#
# Public API
#
def command(self, *args, timeout=1) -> Any:
def command(self, *args, timeout=1):
"""Execute a single command on the mpv process and return the result.
"""
return self._send_request({"command": list(args)}, timeout=timeout)
def get_property(self, name) -> Any:
def get_property(self, name):
"""Return the value of property `name`.
"""
return self.command("get_property", name)
def set_property(self, name, value) -> Any:
def set_property(self, name, value):
"""Set the value of property `name`.
"""
return self.command("set_property", name, value)

View file

@ -14,10 +14,6 @@ from anki.lang import _
from anki.consts import *
from anki.hooks import runHook
from anki.cards import Card
#from anki.collection import _Collection
from typing import Any, Callable, Dict, List, Optional, Union, Tuple
# queue types: 0=new/cram, 1=lrn, 2=rev, 3=day lrn, -1=suspended, -2=buried
# revlog types: 0=lrn, 1=rev, 2=relrn, 3=cram
# positive revlog intervals are in days (rev), negative in seconds (lrn)
@ -28,7 +24,7 @@ class Scheduler:
_spreadRev = True
_burySiblingsOnAnswer = True
def __init__(self, col) -> None:
def __init__(self, col):
self.col = col
self.queueLimit = 50
self.reportLimit = 1000
@ -37,7 +33,7 @@ class Scheduler:
self._haveQueues = False
self._updateCutoff()
def getCard(self) -> Any:
def getCard(self):
"Pop the next card from the queue. None if finished."
self._checkDay()
if not self._haveQueues:
@ -51,14 +47,14 @@ class Scheduler:
card.startTimer()
return card
def reset(self) -> None:
def reset(self):
self._updateCutoff()
self._resetLrn()
self._resetRev()
self._resetNew()
self._haveQueues = True
def answerCard(self, card: Card, ease: int) -> None:
def answerCard(self, card, ease):
self.col.log()
assert 1 <= ease <= 4
self.col.markReview(card)
@ -97,7 +93,7 @@ class Scheduler:
card.usn = self.col.usn()
card.flushSched()
def counts(self, card: None = None) -> tuple:
def counts(self, card=None):
counts = [self.newCount, self.lrnCount, self.revCount]
if card:
idx = self.countIdx(card)
@ -107,7 +103,7 @@ class Scheduler:
counts[idx] += 1
return tuple(counts)
def dueForecast(self, days=7) -> List:
def dueForecast(self, days=7):
"Return counts over next DAYS. Includes today."
daysd = dict(self.col.db.all("""
select due, count() from cards
@ -125,12 +121,12 @@ order by due""" % self._deckLimit(),
ret = [x[1] for x in sorted(daysd.items())]
return ret
def countIdx(self, card: Card) -> Any:
def countIdx(self, card):
if card.queue == 3:
return 1
return card.queue
def answerButtons(self, card: Card) -> int:
def answerButtons(self, card):
if card.odue:
# normal review in dyn deck?
if card.odid and card.queue == 2:
@ -144,7 +140,7 @@ order by due""" % self._deckLimit(),
else:
return 3
def unburyCards(self) -> None:
def unburyCards(self):
"Unbury cards."
self.col.conf['lastUnburied'] = self.today
self.col.log(
@ -152,7 +148,7 @@ order by due""" % self._deckLimit(),
self.col.db.execute(
"update cards set queue=type where queue = -2")
def unburyCardsForDeck(self) -> None:
def unburyCardsForDeck(self):
sids = ids2str(self.col.decks.active())
self.col.log(
self.col.db.list("select id from cards where queue = -2 and did in %s"
@ -164,7 +160,7 @@ order by due""" % self._deckLimit(),
# Rev/lrn/time daily stats
##########################################################################
def _updateStats(self, card: Card, type: str, cnt: int = 1) -> None:
def _updateStats(self, card, type, cnt=1):
key = type+"Today"
for g in ([self.col.decks.get(card.did)] +
self.col.decks.parents(card.did)):
@ -172,7 +168,7 @@ order by due""" % self._deckLimit(),
g[key][1] += cnt
self.col.decks.save(g)
def extendLimits(self, new, rev) -> None:
def extendLimits(self, new, rev):
cur = self.col.decks.current()
parents = self.col.decks.parents(cur['id'])
children = [self.col.decks.get(did) for (name, did) in
@ -183,7 +179,7 @@ order by due""" % self._deckLimit(),
g['revToday'][1] -= rev
self.col.decks.save(g)
def _walkingCount(self, limFn: Optional[Callable] = None, cntFn: Optional[Callable] = None) -> Any:
def _walkingCount(self, limFn=None, cntFn=None):
tot = 0
pcounts = {}
# for each of the active decks
@ -217,7 +213,7 @@ order by due""" % self._deckLimit(),
# Deck list
##########################################################################
def deckDueList(self) -> List[list]:
def deckDueList(self):
"Returns [deckname, did, rev, lrn, new]"
self._checkDay()
self.col.decks.checkIntegrity()
@ -251,10 +247,10 @@ order by due""" % self._deckLimit(),
lims[deck['name']] = [nlim, rlim]
return data
def deckDueTree(self) -> Any:
def deckDueTree(self):
return self._groupChildren(self.deckDueList())
def _groupChildren(self, grps: List[List]) -> Tuple[Tuple[Any, Any, Any, Any, Any, Any], ...]:
def _groupChildren(self, grps):
# first, split the group names into components
for g in grps:
g[0] = g[0].split("::")
@ -263,7 +259,7 @@ order by due""" % self._deckLimit(),
# then run main function
return self._groupChildrenMain(grps)
def _groupChildrenMain(self, grps: List[List[Union[List[str], int]]]) -> Tuple[Tuple[Any, Any, Any, Any, Any, Any], ...]:
def _groupChildrenMain(self, grps):
tree = []
# group and recurse
def key(grp):
@ -304,7 +300,7 @@ order by due""" % self._deckLimit(),
# Getting the next card
##########################################################################
def _getCard(self) -> Any:
def _getCard(self):
"Return the next due card id, or None."
# learning card due?
c = self._getLrnCard()
@ -333,19 +329,19 @@ order by due""" % self._deckLimit(),
# New cards
##########################################################################
def _resetNewCount(self) -> None:
def _resetNewCount(self):
cntFn = lambda did, lim: self.col.db.scalar("""
select count() from (select 1 from cards where
did = ? and queue = 0 limit ?)""", did, lim)
self.newCount = self._walkingCount(self._deckNewLimitSingle, cntFn)
def _resetNew(self) -> None:
def _resetNew(self):
self._resetNewCount()
self._newDids = self.col.decks.active()[:]
self._newQueue = []
self._updateNewCardRatio()
def _fillNew(self) -> Any:
def _fillNew(self):
if self._newQueue:
return True
if not self.newCount:
@ -369,12 +365,12 @@ did = ? and queue = 0 limit ?)""", did, lim)
self._resetNew()
return self._fillNew()
def _getNewCard(self) -> Any:
def _getNewCard(self):
if self._fillNew():
self.newCount -= 1
return self.col.getCard(self._newQueue.pop())
def _updateNewCardRatio(self) -> None:
def _updateNewCardRatio(self):
if self.col.conf['newSpread'] == NEW_CARDS_DISTRIBUTE:
if self.newCount:
self.newCardModulus = (
@ -385,7 +381,7 @@ did = ? and queue = 0 limit ?)""", did, lim)
return
self.newCardModulus = 0
def _timeForNewCard(self) -> Optional[int]:
def _timeForNewCard(self):
"True if it's time to display a new card when distributing."
if not self.newCount:
return False
@ -396,7 +392,7 @@ did = ? and queue = 0 limit ?)""", did, lim)
elif self.newCardModulus:
return self.reps and self.reps % self.newCardModulus == 0
def _deckNewLimit(self, did: int, fn: Optional[Callable] = None) -> Any:
def _deckNewLimit(self, did, fn=None):
if not fn:
fn = self._deckNewLimitSingle
sel = self.col.decks.get(did)
@ -410,7 +406,7 @@ did = ? and queue = 0 limit ?)""", did, lim)
lim = min(rem, lim)
return lim
def _newForDeck(self, did: int, lim: int) -> Any:
def _newForDeck(self, did, lim):
"New count for a single deck."
if not lim:
return 0
@ -419,14 +415,14 @@ did = ? and queue = 0 limit ?)""", did, lim)
select count() from
(select 1 from cards where did = ? and queue = 0 limit ?)""", did, lim)
def _deckNewLimitSingle(self, g: Dict[str, Any]) -> Any:
def _deckNewLimitSingle(self, g):
"Limit for deck without parent limits."
if g['dyn']:
return self.reportLimit
c = self.col.decks.confForDid(g['id'])
return max(0, c['new']['perDay'] - g['newToday'][1])
def totalNewForCurrentDeck(self) -> Any:
def totalNewForCurrentDeck(self):
return self.col.db.scalar(
"""
select count() from cards where id in (
@ -436,7 +432,7 @@ select id from cards where did in %s and queue = 0 limit ?)"""
# Learning queues
##########################################################################
def _resetLrnCount(self) -> None:
def _resetLrnCount(self):
# sub-day
self.lrnCount = self.col.db.scalar("""
select sum(left/1000) from (select left from cards where
@ -449,14 +445,14 @@ select count() from cards where did in %s and queue = 3
and due <= ? limit %d""" % (self._deckLimit(), self.reportLimit),
self.today)
def _resetLrn(self) -> None:
def _resetLrn(self):
self._resetLrnCount()
self._lrnQueue = []
self._lrnDayQueue = []
self._lrnDids = self.col.decks.active()[:]
# sub-day learning
def _fillLrn(self) -> Any:
def _fillLrn(self):
if not self.lrnCount:
return False
if self._lrnQueue:
@ -469,7 +465,7 @@ limit %d""" % (self._deckLimit(), self.reportLimit), lim=self.dayCutoff)
self._lrnQueue.sort()
return self._lrnQueue
def _getLrnCard(self, collapse: bool = False) -> Any:
def _getLrnCard(self, collapse=False):
if self._fillLrn():
cutoff = time.time()
if collapse:
@ -481,7 +477,7 @@ limit %d""" % (self._deckLimit(), self.reportLimit), lim=self.dayCutoff)
return card
# daily learning
def _fillLrnDay(self) -> Optional[bool]:
def _fillLrnDay(self):
if not self.lrnCount:
return False
if self._lrnDayQueue:
@ -505,15 +501,15 @@ did = ? and queue = 3 and due <= ? limit ?""",
# nothing left in the deck; move to next
self._lrnDids.pop(0)
def _getLrnDayCard(self) -> Any:
def _getLrnDayCard(self):
if self._fillLrnDay():
self.lrnCount -= 1
return self.col.getCard(self._lrnDayQueue.pop())
def _answerLrnCard(self, card: Card, ease: int) -> None:
def _answerLrnCard(self, card, ease):
# ease 1=no, 2=yes, 3=remove
conf = self._lrnConf(card)
if card.odid and not card.wasNew: # type: ignore
if card.odid and not card.wasNew:
type = 3
elif card.type == 2:
type = 2
@ -572,7 +568,7 @@ did = ? and queue = 3 and due <= ? limit ?""",
card.queue = 3
self._logLrn(card, ease, conf, leaving, type, lastLeft)
def _delayForGrade(self, conf: Dict[str, Any], left: int) -> Any:
def _delayForGrade(self, conf, left):
left = left % 1000
try:
delay = conf['delays'][-left]
@ -584,13 +580,13 @@ did = ? and queue = 3 and due <= ? limit ?""",
delay = 1
return delay*60
def _lrnConf(self, card: Card) -> Any:
def _lrnConf(self, card):
if card.type == 2:
return self._lapseConf(card)
else:
return self._newConf(card)
def _rescheduleAsRev(self, card: Card, conf: Dict[str, Any], early: bool) -> None:
def _rescheduleAsRev(self, card, conf, early):
lapse = card.type == 2
if lapse:
if self._resched(card):
@ -613,7 +609,7 @@ did = ? and queue = 3 and due <= ? limit ?""",
card.queue = card.type = 0
card.due = self.col.nextID("pos")
def _startingLeft(self, card: Card) -> int:
def _startingLeft(self, card):
if card.type == 2:
conf = self._lapseConf(card)
else:
@ -622,7 +618,7 @@ did = ? and queue = 3 and due <= ? limit ?""",
tod = self._leftToday(conf['delays'], tot)
return tot + tod*1000
def _leftToday(self, delays: Any, left: int, now: None = None) -> int:
def _leftToday(self, delays, left, now=None):
"The number of steps that can be completed by the day cutoff."
if not now:
now = intTime()
@ -635,7 +631,7 @@ did = ? and queue = 3 and due <= ? limit ?""",
ok = i
return ok+1
def _graduatingIvl(self, card: Card, conf: Dict[str, Any], early: bool, adj: bool = True) -> Any:
def _graduatingIvl(self, card, conf, early, adj=True):
if card.type == 2:
# lapsed card being relearnt
if card.odid:
@ -653,13 +649,13 @@ did = ? and queue = 3 and due <= ? limit ?""",
else:
return ideal
def _rescheduleNew(self, card: Card, conf: Dict[str, Any], early: bool) -> None:
def _rescheduleNew(self, card, conf, early):
"Reschedule a new card that's graduated for the first time."
card.ivl = self._graduatingIvl(card, conf, early)
card.due = self.today+card.ivl
card.factor = conf['initialFactor']
def _logLrn(self, card: Card, ease: int, conf: Dict[str, Any], leaving: bool, type: int, lastLeft: int) -> None:
def _logLrn(self, card, ease, conf, leaving, type, lastLeft):
lastIvl = -(self._delayForGrade(conf, lastLeft))
ivl = card.ivl if leaving else -(self._delayForGrade(conf, card.left))
def log():
@ -674,7 +670,7 @@ did = ? and queue = 3 and due <= ? limit ?""",
time.sleep(0.01)
log()
def removeLrn(self, ids: Optional[List[int]] = None) -> None:
def removeLrn(self, ids=None):
"Remove cards from the learning queues."
if ids:
extra = " and id in "+ids2str(ids)
@ -693,7 +689,7 @@ where queue in (1,3) and type = 2
self.forgetCards(self.col.db.list(
"select id from cards where queue in (1,3) %s" % extra))
def _lrnForDeck(self, did: int) -> Any:
def _lrnForDeck(self, did):
cnt = self.col.db.scalar(
"""
select sum(left/1000) from
@ -709,16 +705,16 @@ and due <= ? limit ?)""",
# Reviews
##########################################################################
def _deckRevLimit(self, did: int) -> Any:
def _deckRevLimit(self, did):
return self._deckNewLimit(did, self._deckRevLimitSingle)
def _deckRevLimitSingle(self, d: Dict[str, Any]) -> Any:
def _deckRevLimitSingle(self, d):
if d['dyn']:
return self.reportLimit
c = self.col.decks.confForDid(d['id'])
return max(0, c['rev']['perDay'] - d['revToday'][1])
def _revForDeck(self, did: int, lim: int) -> Any:
def _revForDeck(self, did, lim):
lim = min(lim, self.reportLimit)
return self.col.db.scalar(
"""
@ -727,7 +723,7 @@ select count() from
and due <= ? limit ?)""",
did, self.today, lim)
def _resetRevCount(self) -> None:
def _resetRevCount(self):
def cntFn(did, lim):
return self.col.db.scalar("""
select count() from (select id from cards where
@ -736,12 +732,12 @@ did = ? and queue = 2 and due <= ? limit %d)""" % lim,
self.revCount = self._walkingCount(
self._deckRevLimitSingle, cntFn)
def _resetRev(self) -> None:
def _resetRev(self):
self._resetRevCount()
self._revQueue = []
self._revDids = self.col.decks.active()[:]
def _fillRev(self) -> Any:
def _fillRev(self):
if self._revQueue:
return True
if not self.revCount:
@ -778,12 +774,12 @@ did = ? and queue = 2 and due <= ? limit ?""",
self._resetRev()
return self._fillRev()
def _getRevCard(self) -> Any:
def _getRevCard(self):
if self._fillRev():
self.revCount -= 1
return self.col.getCard(self._revQueue.pop())
def totalRevForCurrentDeck(self) -> Any:
def totalRevForCurrentDeck(self):
return self.col.db.scalar(
"""
select count() from cards where id in (
@ -793,7 +789,7 @@ select id from cards where did in %s and queue = 2 and due <= ? limit ?)"""
# Answering a review card
##########################################################################
def _answerRevCard(self, card: Card, ease: int) -> None:
def _answerRevCard(self, card, ease):
delay = 0
if ease == 1:
delay = self._rescheduleLapse(card)
@ -801,7 +797,7 @@ select id from cards where did in %s and queue = 2 and due <= ? limit ?)"""
self._rescheduleRev(card, ease)
self._logRev(card, ease, delay)
def _rescheduleLapse(self, card: Card) -> Any:
def _rescheduleLapse(self, card):
conf = self._lapseConf(card)
card.lastIvl = card.ivl
if self._resched(card):
@ -837,10 +833,10 @@ select id from cards where did in %s and queue = 2 and due <= ? limit ?)"""
card.queue = 3
return delay
def _nextLapseIvl(self, card: Card, conf: Dict[str, Any]) -> Any:
def _nextLapseIvl(self, card, conf):
return max(conf['minInt'], int(card.ivl*conf['mult']))
def _rescheduleRev(self, card: Card, ease: int) -> None:
def _rescheduleRev(self, card, ease):
# update interval
card.lastIvl = card.ivl
if self._resched(card):
@ -855,7 +851,7 @@ select id from cards where did in %s and queue = 2 and due <= ? limit ?)"""
card.odid = 0
card.odue = 0
def _logRev(self, card: Card, ease: int, delay: Union[int, float]) -> None:
def _logRev(self, card, ease, delay):
def log():
self.col.db.execute(
"insert into revlog values (?,?,?,?,?,?,?,?,?)",
@ -872,7 +868,7 @@ select id from cards where did in %s and queue = 2 and due <= ? limit ?)"""
# Interval management
##########################################################################
def _nextRevIvl(self, card: Card, ease: int) -> Any:
def _nextRevIvl(self, card, ease):
"Ideal next interval for CARD, given EASE."
delay = self._daysLate(card)
conf = self._revConf(card)
@ -890,11 +886,11 @@ select id from cards where did in %s and queue = 2 and due <= ? limit ?)"""
# interval capped?
return min(interval, conf['maxIvl'])
def _fuzzedIvl(self, ivl: int) -> int:
def _fuzzedIvl(self, ivl):
min, max = self._fuzzIvlRange(ivl)
return random.randint(min, max)
def _fuzzIvlRange(self, ivl: int) -> List:
def _fuzzIvlRange(self, ivl):
if ivl < 2:
return [1, 1]
elif ivl == 2:
@ -909,22 +905,22 @@ select id from cards where did in %s and queue = 2 and due <= ? limit ?)"""
fuzz = max(fuzz, 1)
return [ivl-fuzz, ivl+fuzz]
def _constrainedIvl(self, ivl: float, conf: Dict[str, Any], prev: int) -> int:
def _constrainedIvl(self, ivl, conf, prev):
"Integer interval after interval factor and prev+1 constraints applied."
new = ivl * conf.get('ivlFct', 1)
return int(max(new, prev+1))
def _daysLate(self, card: Card) -> Any:
def _daysLate(self, card):
"Number of days later than scheduled."
due = card.odue if card.odid else card.due
return max(0, self.today - due)
def _updateRevIvl(self, card: Card, ease: int) -> None:
def _updateRevIvl(self, card, ease):
idealIvl = self._nextRevIvl(card, ease)
card.ivl = min(max(self._adjRevIvl(card, idealIvl), card.ivl+1),
self._revConf(card)['maxIvl'])
def _adjRevIvl(self, card: Card, idealIvl: int) -> int:
def _adjRevIvl(self, card, idealIvl):
if self._spreadRev:
idealIvl = self._fuzzedIvl(idealIvl)
return idealIvl
@ -932,7 +928,7 @@ select id from cards where did in %s and queue = 2 and due <= ? limit ?)"""
# Dynamic deck handling
##########################################################################
def rebuildDyn(self, did: Optional[int] = None) -> Any:
def rebuildDyn(self, did=None):
"Rebuild a dynamic deck."
did = did or self.col.decks.selected()
deck = self.col.decks.get(did)
@ -946,7 +942,7 @@ select id from cards where did in %s and queue = 2 and due <= ? limit ?)"""
self.col.decks.select(did)
return ids
def _fillDyn(self, deck: Dict[str, Any]) -> Any:
def _fillDyn(self, deck):
search, limit, order = deck['terms'][0]
orderlimit = self._dynOrder(order, limit)
if search.strip():
@ -962,7 +958,7 @@ select id from cards where did in %s and queue = 2 and due <= ? limit ?)"""
self._moveToDyn(deck['id'], ids)
return ids
def emptyDyn(self, did: Optional[int], lim: Optional[str] = None) -> None:
def emptyDyn(self, did, lim=None):
if not lim:
lim = "did = %s" % did
self.col.log(self.col.db.list("select id from cards where %s" % lim))
@ -973,10 +969,10 @@ else type end), type = (case when type = 1 then 0 else type end),
due = odue, odue = 0, odid = 0, usn = ? where %s""" % lim,
self.col.usn())
def remFromDyn(self, cids: List[int]) -> None:
def remFromDyn(self, cids):
self.emptyDyn(None, "id in %s and odid" % ids2str(cids))
def _dynOrder(self, o: int, l: int) -> str:
def _dynOrder(self, o, l):
if o == DYN_OLDEST:
t = "(select max(id) from revlog where cid=c.id)"
elif o == DYN_RANDOM:
@ -1001,7 +997,7 @@ due = odue, odue = 0, odid = 0, usn = ? where %s""" % lim,
t = "c.due"
return t + " limit %d" % l
def _moveToDyn(self, did: int, ids: List[int]) -> None:
def _moveToDyn(self, did, ids):
deck = self.col.decks.get(did)
data = []
t = intTime(); u = self.col.usn()
@ -1020,7 +1016,7 @@ odid = (case when odid then odid else did end),
odue = (case when odue then odue else due end),
did = ?, queue = %s, due = ?, usn = ? where id = ?""" % queue, data)
def _dynIvlBoost(self, card: Card) -> Any:
def _dynIvlBoost(self, card):
assert card.odid and card.type == 2
assert card.factor
elapsed = card.ivl - (card.odue - self.today)
@ -1032,7 +1028,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?""" % queue, data)
# Leeches
##########################################################################
def _checkLeech(self, card: Card, conf: Dict[str, Any]) -> Optional[bool]:
def _checkLeech(self, card, conf):
"Leech handler. True if card was a leech."
lf = conf['leechFails']
if not lf:
@ -1061,10 +1057,10 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?""" % queue, data)
# Tools
##########################################################################
def _cardConf(self, card: Card) -> Any:
def _cardConf(self, card):
return self.col.decks.confForDid(card.did)
def _newConf(self, card: Card) -> Any:
def _newConf(self, card):
conf = self._cardConf(card)
# normal deck
if not card.odid:
@ -1084,7 +1080,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?""" % queue, data)
perDay=self.reportLimit
)
def _lapseConf(self, card: Card) -> Any:
def _lapseConf(self, card):
conf = self._cardConf(card)
# normal deck
if not card.odid:
@ -1103,7 +1099,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?""" % queue, data)
resched=conf['resched'],
)
def _revConf(self, card: Card) -> Any:
def _revConf(self, card):
conf = self._cardConf(card)
# normal deck
if not card.odid:
@ -1111,10 +1107,10 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?""" % queue, data)
# dynamic deck
return self.col.decks.confForDid(card.odid)['rev']
def _deckLimit(self) -> str:
def _deckLimit(self):
return ids2str(self.col.decks.active())
def _resched(self, card: Card) -> Any:
def _resched(self, card):
conf = self._cardConf(card)
if not conf['dyn']:
return True
@ -1123,7 +1119,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?""" % queue, data)
# Daily cutoff
##########################################################################
def _updateCutoff(self) -> None:
def _updateCutoff(self):
oldToday = self.today
# days since col created
self.today = int((time.time() - self.col.crt) // 86400)
@ -1145,7 +1141,7 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?""" % queue, data)
if unburied < self.today:
self.unburyCards()
def _checkDay(self) -> None:
def _checkDay(self):
# check if the day has rolled over
if time.time() > self.dayCutoff:
self.reset()
@ -1153,12 +1149,12 @@ did = ?, queue = %s, due = ?, usn = ? where id = ?""" % queue, data)
# Deck finished state
##########################################################################
def finishedMsg(self) -> str:
def finishedMsg(self):
return ("<b>"+_(
"Congratulations! You have finished this deck for now.")+
"</b><br><br>" + self._nextDueMsg())
def _nextDueMsg(self) -> str:
def _nextDueMsg(self):
line = []
# the new line replacements are so we don't break translations
# in a point release
@ -1185,20 +1181,20 @@ Some related or buried cards were delayed until a later session.""")+now)
To study outside of the normal schedule, click the Custom Study button below."""))
return "<p>".join(line)
def revDue(self) -> Any:
def revDue(self):
"True if there are any rev cards due."
return self.col.db.scalar(
("select 1 from cards where did in %s and queue = 2 "
"and due <= ? limit 1") % self._deckLimit(),
self.today)
def newDue(self) -> Any:
def newDue(self):
"True if there are any new cards due."
return self.col.db.scalar(
("select 1 from cards where did in %s and queue = 0 "
"limit 1") % self._deckLimit())
def haveBuried(self) -> bool:
def haveBuried(self):
sdids = ids2str(self.col.decks.active())
cnt = self.col.db.scalar(
"select 1 from cards where queue = -2 and did in %s limit 1" % sdids)
@ -1207,7 +1203,7 @@ To study outside of the normal schedule, click the Custom Study button below."""
# Next time reports
##########################################################################
def nextIvlStr(self, card: Card, ease: int, short: bool = False) -> Any:
def nextIvlStr(self, card, ease, short=False):
"Return the next interval for CARD as a string."
ivl = self.nextIvl(card, ease)
if not ivl:
@ -1217,7 +1213,7 @@ To study outside of the normal schedule, click the Custom Study button below."""
s = "<"+s
return s
def nextIvl(self, card: Card, ease: int) -> Any:
def nextIvl(self, card, ease):
"Return the next interval for CARD, in seconds."
if card.queue in (0,1,3):
return self._nextLrnIvl(card, ease)
@ -1232,7 +1228,7 @@ To study outside of the normal schedule, click the Custom Study button below."""
return self._nextRevIvl(card, ease)*86400
# this isn't easily extracted from the learn code
def _nextLrnIvl(self, card: Card, ease: int) -> Any:
def _nextLrnIvl(self, card, ease):
if card.queue == 0:
card.left = self._startingLeft(card)
conf = self._lrnConf(card)
@ -1257,7 +1253,7 @@ To study outside of the normal schedule, click the Custom Study button below."""
# Suspending
##########################################################################
def suspendCards(self, ids: List[int]) -> None:
def suspendCards(self, ids):
"Suspend cards."
self.col.log(ids)
self.remFromDyn(ids)
@ -1266,7 +1262,7 @@ To study outside of the normal schedule, click the Custom Study button below."""
"update cards set queue=-1,mod=?,usn=? where id in "+
ids2str(ids), intTime(), self.col.usn())
def unsuspendCards(self, ids: List[int]) -> None:
def unsuspendCards(self, ids):
"Unsuspend cards."
self.col.log(ids)
self.col.db.execute(
@ -1274,7 +1270,7 @@ To study outside of the normal schedule, click the Custom Study button below."""
"where queue = -1 and id in "+ ids2str(ids),
intTime(), self.col.usn())
def buryCards(self, cids: List[int]) -> None:
def buryCards(self, cids):
self.col.log(cids)
self.remFromDyn(cids)
self.removeLrn(cids)
@ -1282,7 +1278,7 @@ To study outside of the normal schedule, click the Custom Study button below."""
update cards set queue=-2,mod=?,usn=? where id in """+ids2str(cids),
intTime(), self.col.usn())
def buryNote(self, nid: int) -> None:
def buryNote(self, nid):
"Bury all cards for note until next session."
cids = self.col.db.list(
"select id from cards where nid = ? and queue >= 0", nid)
@ -1291,7 +1287,7 @@ update cards set queue=-2,mod=?,usn=? where id in """+ids2str(cids),
# Sibling spacing
##########################################################################
def _burySiblings(self, card: Card) -> None:
def _burySiblings(self, card):
toBury = []
nconf = self._newConf(card)
buryNew = nconf.get("bury", True)
@ -1328,7 +1324,7 @@ and (queue=0 or (queue=2 and due<=?))""",
# Resetting
##########################################################################
def forgetCards(self, ids: List[int]) -> None:
def forgetCards(self, ids):
"Put cards at the end of the new queue."
self.remFromDyn(ids)
self.col.db.execute(
@ -1340,7 +1336,7 @@ and (queue=0 or (queue=2 and due<=?))""",
self.sortCards(ids, start=pmax+1)
self.col.log(ids)
def reschedCards(self, ids: List[int], imin: int, imax: int) -> None:
def reschedCards(self, ids, imin, imax):
"Put cards in review queue with a new interval in days (min, max)."
d = []
t = self.today
@ -1356,7 +1352,7 @@ usn=:usn,mod=:mod,factor=:fact where id=:id""",
d)
self.col.log(ids)
def resetCards(self, ids) -> None:
def resetCards(self, ids):
"Completely reset cards for export."
sids = ids2str(ids)
# we want to avoid resetting due number of existing new cards on export
@ -1375,7 +1371,7 @@ usn=:usn,mod=:mod,factor=:fact where id=:id""",
# Repositioning new cards
##########################################################################
def sortCards(self, cids: List[int], start: int = 1, step: int = 1, shuffle: bool = False, shift: bool = False) -> None:
def sortCards(self, cids, start=1, step=1, shuffle=False, shift=False):
scids = ids2str(cids)
now = intTime()
nids = []
@ -1415,15 +1411,15 @@ and due >= ? and queue = 0""" % scids, now, self.col.usn(), shiftby, low)
self.col.db.executemany(
"update cards set due=:due,mod=:now,usn=:usn where id = :cid", d)
def randomizeCards(self, did: int) -> None:
def randomizeCards(self, did):
cids = self.col.db.list("select id from cards where did = ?", did)
self.sortCards(cids, shuffle=True)
def orderCards(self, did: int) -> None:
def orderCards(self, did):
cids = self.col.db.list("select id from cards where did = ? order by id", did)
self.sortCards(cids)
def resortConf(self, conf) -> None:
def resortConf(self, conf):
for did in self.col.decks.didsForConf(conf):
if conf['new']['order'] == 0:
self.randomizeCards(did)
@ -1431,7 +1427,7 @@ and due >= ? and queue = 0""" % scids, now, self.col.usn(), shiftby, low)
self.orderCards(did)
# for post-import
def maybeRandomizeDeck(self, did=None) -> None:
def maybeRandomizeDeck(self, did=None):
if not did:
did = self.col.decks.selected()
conf = self.col.decks.confForDid(did)