This commit is contained in:
Damien Elmes 2019-12-19 14:26:39 +10:00
parent 6ffe82ac54
commit efd78e66ef

View file

@ -17,6 +17,7 @@ from typing import List, Optional, Tuple, Union
# - careful not to add any lists/dicts/etc here, as they aren't deep copied # - careful not to add any lists/dicts/etc here, as they aren't deep copied
from typing import Any, Callable, Dict, List, Optional
defaultModel = { defaultModel = {
'sortf': 0, 'sortf': 0,
'did': 1, 'did': 1,
@ -81,12 +82,12 @@ class ModelManager:
self.models = {} self.models = {}
self.changed = False self.changed = False
def load(self, json_) -> None: def load(self, json_: str) -> None:
"Load registry from JSON." "Load registry from JSON."
self.changed = False self.changed = False
self.models = json.loads(json_) self.models = json.loads(json_)
def save(self, m=None, templates=False, updateReqs=True) -> None: def save(self, m: Optional[Dict[str, Any]] = None, templates: bool = False, updateReqs: bool = True) -> None:
"Mark M modified if provided, and schedule registry flush." "Mark M modified if provided, and schedule registry flush."
if m and m['id']: if m and m['id']:
m['mod'] = intTime() m['mod'] = intTime()
@ -115,18 +116,18 @@ class ModelManager:
# Retrieving and creating models # Retrieving and creating models
############################################################# #############################################################
def current(self, forDeck=True) -> Any: def current(self, forDeck: bool = True) -> Any:
"Get current model." "Get current model."
m = self.get(self.col.decks.current().get('mid')) m = self.get(self.col.decks.current().get('mid'))
if not forDeck or not m: if not forDeck or not m:
m = self.get(self.col.conf['curModel']) m = self.get(self.col.conf['curModel'])
return m or list(self.models.values())[0] return m or list(self.models.values())[0]
def setCurrent(self, m) -> None: def setCurrent(self, m: Dict[str, Any]) -> None:
self.col.conf['curModel'] = m['id'] self.col.conf['curModel'] = m['id']
self.col.setMod() self.col.setMod()
def get(self, id) -> Any: def get(self, id: Any) -> Any:
"Get model with ID, or None." "Get model with ID, or None."
id = str(id) id = str(id)
if id in self.models: if id in self.models:
@ -139,7 +140,7 @@ class ModelManager:
def allNames(self) -> List: def allNames(self) -> List:
return [m['name'] for m in self.all()] return [m['name'] for m in self.all()]
def byName(self, name) -> Any: def byName(self, name: str) -> Any:
"Get model with NAME." "Get model with NAME."
for m in list(self.models.values()): for m in list(self.models.values()):
if m['name'] == name: if m['name'] == name:
@ -157,7 +158,7 @@ class ModelManager:
m['id'] = None m['id'] = None
return m return m
def rem(self, m) -> None: def rem(self, m: Dict[str, Any]) -> None:
"Delete model, and all its cards/notes." "Delete model, and all its cards/notes."
self.col.modSchema(check=True) self.col.modSchema(check=True)
current = self.current()['id'] == m['id'] current = self.current()['id'] == m['id']
@ -172,33 +173,33 @@ select id from cards where nid in (select id from notes where mid = ?)""",
if current: if current:
self.setCurrent(list(self.models.values())[0]) self.setCurrent(list(self.models.values())[0])
def add(self, m) -> None: def add(self, m: Dict[str, Any]) -> None:
self._setID(m) self._setID(m)
self.update(m) self.update(m)
self.setCurrent(m) self.setCurrent(m)
self.save(m) self.save(m)
def ensureNameUnique(self, m) -> None: def ensureNameUnique(self, m: Dict[str, Any]) -> None:
for mcur in self.all(): for mcur in self.all():
if (mcur['name'] == m['name'] and mcur['id'] != m['id']): if (mcur['name'] == m['name'] and mcur['id'] != m['id']):
m['name'] += "-" + checksum(str(time.time()))[:5] m['name'] += "-" + checksum(str(time.time()))[:5]
break break
def update(self, m) -> None: def update(self, m: Dict[str, Any]) -> None:
"Add or update an existing model. Used for syncing and merging." "Add or update an existing model. Used for syncing and merging."
self.ensureNameUnique(m) self.ensureNameUnique(m)
self.models[str(m['id'])] = m self.models[str(m['id'])] = m
# mark registry changed, but don't bump mod time # mark registry changed, but don't bump mod time
self.save() self.save()
def _setID(self, m) -> None: def _setID(self, m: Dict[str, Any]) -> None:
while 1: while 1:
id = str(intTime(1000)) id = str(intTime(1000))
if id not in self.models: if id not in self.models:
break break
m['id'] = id m['id'] = id
def have(self, id) -> bool: def have(self, id: int) -> bool:
return str(id) in self.models return str(id) in self.models
def ids(self) -> List[str]: def ids(self) -> List[str]:
@ -207,12 +208,12 @@ select id from cards where nid in (select id from notes where mid = ?)""",
# Tools # Tools
################################################## ##################################################
def nids(self, m) -> Any: def nids(self, m: Dict[str, Any]) -> Any:
"Note ids for M." "Note ids for M."
return self.col.db.list( return self.col.db.list(
"select id from notes where mid = ?", m['id']) "select id from notes where mid = ?", m['id'])
def useCount(self, m) -> Any: def useCount(self, m: Dict[str, Any]) -> Any:
"Number of note using M." "Number of note using M."
return self.col.db.scalar( return self.col.db.scalar(
"select count() from notes where mid = ?", m['id']) "select count() from notes where mid = ?", m['id'])
@ -225,7 +226,7 @@ and notes.mid = ? and cards.ord = ?""", m['id'], ord)
# Copying # Copying
################################################## ##################################################
def copy(self, m) -> Any: def copy(self, m: Dict[str, Any]) -> Any:
"Copy, save and return." "Copy, save and return."
m2 = copy.deepcopy(m) m2 = copy.deepcopy(m)
m2['name'] = _("%s copy") % m2['name'] m2['name'] = _("%s copy") % m2['name']
@ -235,20 +236,20 @@ and notes.mid = ? and cards.ord = ?""", m['id'], ord)
# Fields # Fields
################################################## ##################################################
def newField(self, name) -> Dict[str, Any]: def newField(self, name: str) -> Dict[str, Any]:
assert(isinstance(name, str)) assert(isinstance(name, str))
f = defaultField.copy() f = defaultField.copy()
f['name'] = name f['name'] = name
return f return f
def fieldMap(self, m) -> Dict[Any, Tuple[Any, Any]]: def fieldMap(self, m: Dict[str, Any]) -> Dict[Any, Tuple[Any, Any]]:
"Mapping of field name -> (ord, field)." "Mapping of field name -> (ord, field)."
return dict((f['name'], (f['ord'], f)) for f in m['flds']) return dict((f['name'], (f['ord'], f)) for f in m['flds'])
def fieldNames(self, m) -> List: def fieldNames(self, m) -> List:
return [f['name'] for f in m['flds']] return [f['name'] for f in m['flds']]
def sortIdx(self, m) -> Any: def sortIdx(self, m: Dict[str, Any]) -> Any:
return m['sortf'] return m['sortf']
def setSortIdx(self, m, idx) -> None: def setSortIdx(self, m, idx) -> None:
@ -258,7 +259,7 @@ and notes.mid = ? and cards.ord = ?""", m['id'], ord)
self.col.updateFieldCache(self.nids(m)) self.col.updateFieldCache(self.nids(m))
self.save(m, updateReqs=False) self.save(m, updateReqs=False)
def addField(self, m, field) -> None: def addField(self, m: Dict[str, Any], field: Dict[str, Any]) -> None:
# only mod schema if model isn't new # only mod schema if model isn't new
if m['id']: if m['id']:
self.col.modSchema(check=True) self.col.modSchema(check=True)
@ -270,7 +271,7 @@ and notes.mid = ? and cards.ord = ?""", m['id'], ord)
return fields return fields
self._transformFields(m, add) self._transformFields(m, add)
def remField(self, m, field) -> None: def remField(self, m: Dict[str, Any], field: Dict[str, Any]) -> None:
self.col.modSchema(check=True) self.col.modSchema(check=True)
# save old sort field # save old sort field
sortFldName = m['flds'][m['sortf']]['name'] sortFldName = m['flds'][m['sortf']]['name']
@ -293,7 +294,7 @@ and notes.mid = ? and cards.ord = ?""", m['id'], ord)
# saves # saves
self.renameField(m, field, None) self.renameField(m, field, None)
def moveField(self, m, field, idx) -> None: def moveField(self, m: Dict[str, Any], field: Dict[str, Any], idx: int) -> None:
self.col.modSchema(check=True) self.col.modSchema(check=True)
oldidx = m['flds'].index(field) oldidx = m['flds'].index(field)
if oldidx == idx: if oldidx == idx:
@ -314,7 +315,7 @@ and notes.mid = ? and cards.ord = ?""", m['id'], ord)
return fields return fields
self._transformFields(m, move) self._transformFields(m, move)
def renameField(self, m, field, newName) -> None: def renameField(self, m: Dict[str, Any], field: Dict[str, Any], newName: Optional[str]) -> None:
self.col.modSchema(check=True) self.col.modSchema(check=True)
pat = r'{{([^{}]*)([:#^/]|[^:#/^}][^:}]*?:|)%s}}' pat = r'{{([^{}]*)([:#^/]|[^:#/^}][^:}]*?:|)%s}}'
def wrap(txt): def wrap(txt):
@ -332,11 +333,11 @@ and notes.mid = ? and cards.ord = ?""", m['id'], ord)
field['name'] = newName field['name'] = newName
self.save(m) self.save(m)
def _updateFieldOrds(self, m) -> None: def _updateFieldOrds(self, m: Dict[str, Any]) -> None:
for c, f in enumerate(m['flds']): for c, f in enumerate(m['flds']):
f['ord'] = c f['ord'] = c
def _transformFields(self, m, fn) -> None: def _transformFields(self, m: Dict[str, Any], fn: Callable) -> None:
# model hasn't been added yet? # model hasn't been added yet?
if not m['id']: if not m['id']:
return return
@ -356,7 +357,7 @@ and notes.mid = ? and cards.ord = ?""", m['id'], ord)
t['name'] = name t['name'] = name
return t return t
def addTemplate(self, m, template) -> None: def addTemplate(self, m: Dict[str, Any], template: Dict[str, Union[str, None]]) -> None:
"Note: should col.genCards() afterwards." "Note: should col.genCards() afterwards."
if m['id']: if m['id']:
self.col.modSchema(check=True) self.col.modSchema(check=True)
@ -364,7 +365,7 @@ and notes.mid = ? and cards.ord = ?""", m['id'], ord)
self._updateTemplOrds(m) self._updateTemplOrds(m)
self.save(m) self.save(m)
def remTemplate(self, m, template) -> bool: def remTemplate(self, m: Dict[str, Any], template: Dict[str, Any]) -> bool:
"False if removing template would leave orphan notes." "False if removing template would leave orphan notes."
assert len(m['tmpls']) > 1 assert len(m['tmpls']) > 1
# find cards using this template # find cards using this template
@ -394,7 +395,7 @@ update cards set ord = ord - 1, usn = ?, mod = ?
self.save(m) self.save(m)
return True return True
def _updateTemplOrds(self, m) -> None: def _updateTemplOrds(self, m: Dict[str, Any]) -> None:
for c, t in enumerate(m['tmpls']): for c, t in enumerate(m['tmpls']):
t['ord'] = c t['ord'] = c
@ -417,7 +418,7 @@ update cards set ord = (case %s end),usn=?,mod=? where nid in (
select id from notes where mid = ?)""" % " ".join(map), select id from notes where mid = ?)""" % " ".join(map),
self.col.usn(), intTime(), m['id']) self.col.usn(), intTime(), m['id'])
def _syncTemplates(self, m) -> None: def _syncTemplates(self, m: Dict[str, Any]) -> None:
rem = self.col.genCards(self.nids(m)) rem = self.col.genCards(self.nids(m))
# Model changing # Model changing
@ -425,7 +426,7 @@ select id from notes where mid = ?)""" % " ".join(map),
# - maps are ord->ord, and there should not be duplicate targets # - maps are ord->ord, and there should not be duplicate targets
# - newModel should be self if model is not changing # - newModel should be self if model is not changing
def change(self, m, nids, newModel, fmap, cmap) -> None: def change(self, m: Dict[str, Any], nids: List[int], newModel: Dict[str, Any], fmap: Any, cmap: Any) -> None:
self.col.modSchema(check=True) self.col.modSchema(check=True)
assert newModel['id'] == m['id'] or (fmap and cmap) assert newModel['id'] == m['id'] or (fmap and cmap)
if fmap: if fmap:
@ -434,7 +435,7 @@ select id from notes where mid = ?)""" % " ".join(map),
self._changeCards(nids, m, newModel, cmap) self._changeCards(nids, m, newModel, cmap)
self.col.genCards(nids) self.col.genCards(nids)
def _changeNotes(self, nids, newModel, map) -> None: def _changeNotes(self, nids: List[int], newModel: Dict[str, Any], map: Dict[int, Union[None, int]]) -> None:
d = [] d = []
nfields = len(newModel['flds']) nfields = len(newModel['flds'])
for (nid, flds) in self.col.db.execute( for (nid, flds) in self.col.db.execute(
@ -453,7 +454,7 @@ select id from notes where mid = ?)""" % " ".join(map),
"update notes set flds=:flds,mid=:mid,mod=:m,usn=:u where id = :nid", d) "update notes set flds=:flds,mid=:mid,mod=:m,usn=:u where id = :nid", d)
self.col.updateFieldCache(nids) self.col.updateFieldCache(nids)
def _changeCards(self, nids, oldModel, newModel, map) -> None: def _changeCards(self, nids: List[int], oldModel: Dict[str, Any], newModel: Dict[str, Any], map: Dict[int, Union[None, int]]) -> None:
d = [] d = []
deleted = [] deleted = []
for (cid, ord) in self.col.db.execute( for (cid, ord) in self.col.db.execute(
@ -483,7 +484,7 @@ select id from notes where mid = ?)""" % " ".join(map),
# Schema hash # Schema hash
########################################################################## ##########################################################################
def scmhash(self, m) -> str: def scmhash(self, m: Dict[str, Any]) -> str:
"Return a hash of the schema, to see if models are compatible." "Return a hash of the schema, to see if models are compatible."
s = "" s = ""
for f in m['flds']: for f in m['flds']:
@ -495,7 +496,7 @@ select id from notes where mid = ?)""" % " ".join(map),
# Required field/text cache # Required field/text cache
########################################################################## ##########################################################################
def _updateRequired(self, m) -> None: def _updateRequired(self, m: Dict[str, Any]) -> None:
if m['type'] == MODEL_CLOZE: if m['type'] == MODEL_CLOZE:
# nothing to do # nothing to do
return return
@ -506,7 +507,7 @@ select id from notes where mid = ?)""" % " ".join(map),
req.append([t['ord'], ret[0], ret[1]]) req.append([t['ord'], ret[0], ret[1]])
m['req'] = req m['req'] = req
def _reqForTemplate(self, m, flds, t) -> Tuple[Union[str, List[int]], ...]: def _reqForTemplate(self, m: Dict[str, Any], flds: List[str], t: Dict[str, Any]) -> Tuple[Union[str, List[int]], ...]:
a = [] a = []
b = [] b = []
for f in flds: for f in flds:
@ -543,7 +544,7 @@ select id from notes where mid = ?)""" % " ".join(map),
req.append(i) req.append(i)
return type, req return type, req
def availOrds(self, m, flds) -> List: def availOrds(self, m: Dict[str, Any], flds: str) -> List:
"Given a joined field string, return available template ordinals." "Given a joined field string, return available template ordinals."
if m['type'] == MODEL_CLOZE: if m['type'] == MODEL_CLOZE:
return self._availClozeOrds(m, flds) return self._availClozeOrds(m, flds)
@ -577,7 +578,7 @@ select id from notes where mid = ?)""" % " ".join(map),
avail.append(ord) avail.append(ord)
return avail return avail
def _availClozeOrds(self, m, flds, allowEmpty=True) -> List: def _availClozeOrds(self, m: Dict[str, Any], flds: str, allowEmpty: bool = True) -> List:
sflds = splitFields(flds) sflds = splitFields(flds)
map = self.fieldMap(m) map = self.fieldMap(m)
ords = set() ords = set()