importing

This commit is contained in:
Damien Elmes 2019-12-20 11:41:24 +10:00
parent c415a70e72
commit 55795822b5
7 changed files with 60 additions and 47 deletions

View file

@ -59,8 +59,11 @@ def timezoneOffset() -> int:
else:
return time.timezone//60
from anki.schedv2 import Scheduler
# this is initialized by storage.Collection
class _Collection:
sched: Scheduler
def __init__(self, db: DB, server: bool = False, log: bool = False) -> None:
self._debugLog = log

View file

@ -8,8 +8,10 @@ from anki.storage import Collection
from anki.utils import intTime, splitFields, joinFields
from anki.importing.base import Importer
from anki.lang import _
from typing import Any
from typing import Any, Optional
from anki.collection import _Collection
from typing import List, Union
GUID = 1
MID = 2
MOD = 3
@ -20,7 +22,7 @@ class Anki2Importer(Importer):
deckPrefix = None
allowUpdate = True
def __init__(self, col, file):
def __init__(self, col: _Collection, file: str) -> None:
super().__init__(col, file)
# set later, defined here for typechecking
@ -28,7 +30,7 @@ class Anki2Importer(Importer):
self._decks = {}
self.mustResetLearning = False
def run(self, media=None) -> None:
def run(self, media: None = None) -> None:
self._prepareFiles()
if media is not None:
# Anki1 importer has provided us with a custom media folder
@ -69,7 +71,7 @@ class Anki2Importer(Importer):
# Notes
######################################################################
def _logNoteRow(self, action, noteRow) -> None:
def _logNoteRow(self, action: str, noteRow: List[str]) -> None:
self.log.append("[%s] %s" % (
action,
noteRow[6].replace("\x1f", ", ")
@ -186,7 +188,7 @@ class Anki2Importer(Importer):
# determine if note is a duplicate, and adjust mid and/or guid as required
# returns true if note should be added
def _uniquifyNote(self, note) -> bool:
def _uniquifyNote(self, note: List[Union[int, str]]) -> bool:
origGuid = note[GUID]
srcMid = note[MID]
dstMid = self._mid(srcMid)
@ -212,7 +214,7 @@ class Anki2Importer(Importer):
"Prepare index of schema hashes."
self._modelMap = {}
def _mid(self, srcMid) -> Any:
def _mid(self, srcMid: int) -> Any:
"Return local id for remote MID."
# already processed this mid?
if srcMid in self._modelMap:
@ -249,7 +251,7 @@ class Anki2Importer(Importer):
# Decks
######################################################################
def _did(self, did) -> Any:
def _did(self, did: int) -> Any:
"Given did in src col, return local id."
# already converted?
if did in self._decks:
@ -393,7 +395,7 @@ insert or ignore into revlog values (?,?,?,?,?,?,?,?,?)""", revlog)
if fname.startswith("_") and not self.dst.media.have(fname):
self._writeDstMedia(fname, self._srcMediaData(fname))
def _mediaData(self, fname, dir=None) -> bytes:
def _mediaData(self, fname: str, dir: Optional[str] = None) -> bytes:
if not dir:
dir = self.src.media.dir()
path = os.path.join(dir, fname)
@ -403,15 +405,15 @@ insert or ignore into revlog values (?,?,?,?,?,?,?,?,?)""", revlog)
except (IOError, OSError):
return
def _srcMediaData(self, fname) -> bytes:
def _srcMediaData(self, fname: str) -> bytes:
"Data for FNAME in src collection."
return self._mediaData(fname, self.src.media.dir())
def _dstMediaData(self, fname) -> bytes:
def _dstMediaData(self, fname: str) -> bytes:
"Data for FNAME in dst collection."
return self._mediaData(fname, self.dst.media.dir())
def _writeDstMedia(self, fname, data) -> None:
def _writeDstMedia(self, fname: str, data: bytes) -> None:
path = os.path.join(self.dst.media.dir(),
unicodedata.normalize("NFC", fname))
try:
@ -421,7 +423,7 @@ insert or ignore into revlog values (?,?,?,?,?,?,?,?,?)""", revlog)
# the user likely used subdirectories
pass
def _mungeMedia(self, mid, fields) -> str:
def _mungeMedia(self, mid: int, fields: str) -> str:
fields = splitFields(fields)
def repl(match):
fname = match.group("fname")

View file

@ -9,9 +9,10 @@ from anki.utils import tmpfile
from anki.importing.anki2 import Anki2Importer
from typing import Any
from anki.collection import _Collection
class AnkiPackageImporter(Anki2Importer):
def __init__(self, col, file):
def __init__(self, col: _Collection, file: str) -> None:
super().__init__(col, file)
# set later; set here for typechecking
self.nameToNum = {}
@ -53,7 +54,7 @@ class AnkiPackageImporter(Anki2Importer):
with open(path, "wb") as f:
f.write(z.read(c))
def _srcMediaData(self, fname) -> Any:
def _srcMediaData(self, fname: str) -> Any:
if fname in self.nameToNum:
return self.zip.read(self.nameToNum[fname])
return None

View file

@ -8,12 +8,13 @@ from typing import Any
# Base importer
##########################################################################
from anki.collection import _Collection
class Importer:
needMapper = False
needDelimiter = False
def __init__(self, col, file) -> None:
def __init__(self, col: _Collection, file: str) -> None:
self.file = file
self.log = []
self.col = col

View file

@ -10,12 +10,13 @@ from anki.lang import _
from typing import List
from anki.collection import _Collection
class TextImporter(NoteImporter):
needDelimiter = True
patterns = "\t|,;:"
def __init__(self, col, file):
def __init__(self, col: _Collection, file: str) -> None:
NoteImporter.__init__(self, col, file)
self.lines = None
self.fileobj = None
@ -56,7 +57,7 @@ class TextImporter(NoteImporter):
self.fileobj.close()
return notes
def open(self):
def open(self) -> None:
"Parse the top line and determine the pattern and number of fields."
# load & look for the right pattern
self.cacheFile()
@ -122,12 +123,12 @@ class TextImporter(NoteImporter):
err()
self.initMapping()
def fields(self):
def fields(self) -> int:
"Number of fields."
self.open()
return self.numFields
def noteFromFields(self, fields) -> ForeignNote:
def noteFromFields(self, fields: List[str]) -> ForeignNote:
note = ForeignNote()
note.fields.extend([x for x in fields])
note.tags.extend(self.tagsToAdd)

View file

@ -17,6 +17,8 @@ from typing import Any, List, Optional
# Stores a list of fields, tags and deck
######################################################################
from anki.collection import _Collection
from typing import List, Optional, Union
class ForeignNote:
"An temporary object storing fields and attributes."
def __init__(self) -> None:
@ -24,6 +26,7 @@ class ForeignNote:
self.tags = []
self.deck = None
self.cards = {} # map of ord -> card
self.fieldsStr = ""
class ForeignCard:
def __init__(self) -> None:
@ -54,14 +57,14 @@ class NoteImporter(Importer):
allowHTML = False
importMode = 0
def __init__(self, col, file):
def __init__(self, col: _Collection, file: str) -> None:
Importer.__init__(self, col, file)
self.model = col.models.current()
self.mapping = None
self._deckMap = {}
self._tagsMapped = False
def run(self):
def run(self) -> None:
"Import."
assert self.mapping
c = self.foreignNotes()
@ -93,7 +96,7 @@ class NoteImporter(Importer):
"Open file and ensure it's in the right format."
return
def importNotes(self, notes) -> None:
def importNotes(self, notes: List[ForeignNote]) -> None:
"Convert each card into a note, apply attributes and add to col."
assert self.mappingOk()
# note whether tags are mapped
@ -220,7 +223,7 @@ This can happen when you have empty fields or when you have not mapped the \
content in the text file to the correct fields."""))
self.total = len(self._ids)
def newData(self, n) -> Optional[list]:
def newData(self, n: ForeignNote) -> Optional[list]:
id = self._nextID
self._nextID += 1
self._ids.append(id)
@ -234,12 +237,12 @@ content in the text file to the correct fields."""))
intTime(), self.col.usn(), self.col.tags.join(n.tags),
n.fieldsStr, "", "", 0, ""]
def addNew(self, rows) -> None:
def addNew(self, rows: List[List[Union[int, str]]]) -> None:
self.col.db.executemany(
"insert or replace into notes values (?,?,?,?,?,?,?,?,?,?,?)",
rows)
def updateData(self, n, id, sflds) -> Optional[list]:
def updateData(self, n: ForeignNote, id: int, sflds: List[str]) -> Optional[list]:
self._ids.append(id)
if not self.processFields(n, sflds):
return
@ -252,7 +255,7 @@ content in the text file to the correct fields."""))
return [intTime(), self.col.usn(), n.fieldsStr,
id, n.fieldsStr]
def addUpdates(self, rows) -> None:
def addUpdates(self, rows: List[List[Union[int, str]]]) -> None:
old = self.col.db.totalChanges()
if self._tagsMapped:
self.col.db.executemany("""
@ -264,7 +267,7 @@ update notes set mod = ?, usn = ?, flds = ?
where id = ? and flds != ?""", rows)
self.updateCount = self.col.db.totalChanges() - old
def processFields(self, note, fields=None) -> Any:
def processFields(self, note: ForeignNote, fields: Optional[List[str]] = None) -> Any:
if not fields:
fields = [""]*len(self.model['flds'])
for c, f in enumerate(self.mapping):

View file

@ -13,8 +13,10 @@ from anki.lang import ngettext
from xml.dom import minidom
from string import capwords
import re, unicodedata, time
from typing import Any, List
from typing import Any, List, Optional
from anki.collection import _Collection
from xml.dom.minidom import Element, Text
class SmartDict(dict):
"""
See http://www.peterbe.com/plog/SmartDict
@ -41,7 +43,7 @@ class SmartDict(dict):
class SuperMemoElement(SmartDict):
"SmartDict wrapper to store SM Element data"
def __init__(self, *a, **kw):
def __init__(self, *a, **kw) -> None:
SmartDict.__init__(self, *a, **kw)
#default content
self.__dict__['lTitle'] = None
@ -80,7 +82,7 @@ class SupermemoXmlImporter(NoteImporter):
Code should be upgrade to support importing of SM2006 exports.
"""
def __init__(self, col, file):
def __init__(self, col: _Collection, file: str) -> None:
"""Initialize internal varables.
Pameters to be exposed to GUI are stored in self.META"""
NoteImporter.__init__(self, col, file)
@ -120,17 +122,17 @@ class SupermemoXmlImporter(NoteImporter):
## TOOLS
def _fudgeText(self, text) -> Any:
def _fudgeText(self, text: str) -> Any:
"Replace sm syntax to Anki syntax"
text = text.replace("\n\r", "<br>")
text = text.replace("\n", "<br>")
return text
def _unicode2ascii(self,str) -> str:
def _unicode2ascii(self,str: str) -> str:
"Remove diacritic punctuation from strings (titles)"
return "".join([ c for c in unicodedata.normalize('NFKD', str) if not unicodedata.combining(c)])
def _decode_htmlescapes(self,s) -> str:
def _decode_htmlescapes(self,s: str) -> str:
"""Unescape HTML code."""
#In case of bad formated html you can import MinimalSoup etc.. see btflsoup source code
from bs4 import BeautifulSoup as btflsoup
@ -143,7 +145,7 @@ class SupermemoXmlImporter(NoteImporter):
return str(btflsoup(s, "html.parser"))
def _afactor2efactor(self, af) -> Any:
def _afactor2efactor(self, af: float) -> Any:
# Adapted from <http://www.supermemo.com/beta/xml/xml-core.htm>
# Ranges for A-factors and E-factors
@ -183,12 +185,12 @@ class SupermemoXmlImporter(NoteImporter):
self.log.append(ngettext("%d card imported.", "%d cards imported.", self.total) % self.total)
return self.notes
def fields(self):
def fields(self) -> int:
return 2
## PARSER METHODS
def addItemToCards(self,item) -> None:
def addItemToCards(self,item: SuperMemoElement) -> None:
"This method actually do conversion"
# new anki card
@ -248,7 +250,7 @@ class SupermemoXmlImporter(NoteImporter):
self.notes.append(note)
def logger(self,text,level=1) -> None:
def logger(self,text: str,level: int = 1) -> None:
"Wrapper for Anki logger"
dLevels={0:'',1:'Info',2:'Verbose',3:'Debug'}
@ -283,7 +285,7 @@ class SupermemoXmlImporter(NoteImporter):
import io
return io.StringIO(str(source))
def loadSource(self, source) -> None:
def loadSource(self, source: str) -> None:
"""Load source file and parse with xml.dom.minidom"""
self.source = source
self.logger('Load started...')
@ -294,7 +296,7 @@ class SupermemoXmlImporter(NoteImporter):
# PARSE
def parse(self, node=None) -> None:
def parse(self, node: Optional[Any] = None) -> None:
"Parse method - parses document elements"
if node is None and self.xmldoc is not None:
@ -312,7 +314,7 @@ class SupermemoXmlImporter(NoteImporter):
self.parse(node.documentElement)
def parse_Element(self, node) -> None:
def parse_Element(self, node: Element) -> None:
"Parse XML element"
_method = "do_%s" % node.tagName
@ -323,7 +325,7 @@ class SupermemoXmlImporter(NoteImporter):
self.logger('No handler for method %s' % _method, level=3)
#print traceback.print_exc()
def parse_Text(self, node) -> None:
def parse_Text(self, node: Text) -> None:
"Parse text inside elements. Text is stored into local buffer."
text = node.data
@ -337,12 +339,12 @@ class SupermemoXmlImporter(NoteImporter):
# DO
def do_SuperMemoCollection(self, node) -> None:
def do_SuperMemoCollection(self, node: Element) -> None:
"Process SM Collection"
for child in node.childNodes: self.parse(child)
def do_SuperMemoElement(self, node) -> None:
def do_SuperMemoElement(self, node: Element) -> None:
"Process SM Element (Type - Title,Topics)"
self.logger('='*45, level=3)
@ -392,14 +394,14 @@ class SupermemoXmlImporter(NoteImporter):
t = self.cntMeta['title'].pop()
self.logger('End of topic \t- %s' % (t), level=2)
def do_Content(self, node) -> None:
def do_Content(self, node: Element) -> None:
"Process SM element Content"
for child in node.childNodes:
if hasattr(child,'tagName') and child.firstChild is not None:
self.cntElm[-1][child.tagName]=child.firstChild.data
def do_LearningData(self, node) -> None:
def do_LearningData(self, node: Element) -> None:
"Process SM element LearningData"
for child in node.childNodes:
@ -416,7 +418,7 @@ class SupermemoXmlImporter(NoteImporter):
# for child in node.childNodes: self.parse(child)
# self.cntElm[-1][node.tagName]=self.cntBuf.pop()
def do_Title(self, node) -> None:
def do_Title(self, node: Element) -> None:
"Process SM element Title"
t = self._decode_htmlescapes(node.firstChild.data)
@ -426,7 +428,7 @@ class SupermemoXmlImporter(NoteImporter):
self.logger('Start of topic \t- ' + " / ".join(self.cntMeta['title']), level=2)
def do_Type(self, node) -> None:
def do_Type(self, node: Element) -> None:
"Process SM element Type"
if len(self.cntBuf) >=1 :