mirror of
https://github.com/ankitects/anki.git
synced 2025-09-19 06:22:22 -04:00

I've corrected some obvious issues, and we can fix others over time. Mypy tests are currently broken, as adding the type hints has increased mypy's testing surface.
134 lines
4.3 KiB
Python
134 lines
4.3 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright: Ankitects Pty Ltd and contributors
|
|
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
|
|
|
|
import csv
|
|
import re
|
|
|
|
from anki.importing.noteimp import NoteImporter, ForeignNote
|
|
from anki.lang import _
|
|
from typing import List
|
|
|
|
|
|
class TextImporter(NoteImporter):
    """Import notes from a delimited text (CSV/TSV-style) file.

    The file is read once as UTF-8 (a leading BOM is skipped). Lines that
    start with ``#`` are dropped, and if the first remaining line starts
    with ``tags:`` the space-separated words after it are added as tags to
    every imported note. The field delimiter is either taken from
    ``self.delimiter`` or guessed from the file contents.
    """

    # NOTE(review): presumably tells callers this importer accepts a
    # delimiter choice - confirm against the code that reads this flag.
    needDelimiter = True
    # Candidate delimiter characters handed to csv.Sniffer.sniff().
    patterns = "\t|,;:"

    def __init__(self, col, file):
        NoteImporter.__init__(self, col, file)
        self.lines = None
        # File handle; also serves as the "file already loaded" marker
        # checked by cacheFile().
        self.fileobj = None
        # Explicit delimiter; when unset, updateDelimiter() guesses one.
        self.delimiter = None
        # Sniffed csv dialect; only set while no explicit delimiter is used.
        self.dialect = None
        # Non-comment lines of the file, each terminated by "\n".
        self.data = []
        self.tagsToAdd = []
        self.numFields = 0

    def foreignNotes(self) -> List[ForeignNote]:
        """Parse the loaded file and return one ForeignNote per valid row.

        Rows whose field count differs from ``self.numFields`` are skipped.
        Non-empty skipped rows are counted in ``self.ignored`` and described
        in ``self.log``; empty rows are dropped silently. A ``csv.Error``
        stops processing, is appended to the log, and the notes collected so
        far are still returned.
        """
        self.open()
        log = []
        notes = []
        ignored = 0
        if self.delimiter:
            reader = csv.reader(
                self.data, delimiter=self.delimiter, doublequote=True)
        else:
            reader = csv.reader(self.data, self.dialect, doublequote=True)
        try:
            for row in reader:
                if len(row) == self.numFields:
                    notes.append(self.noteFromFields(row))
                    continue
                # Blank lines are not worth reporting; only real rows are.
                if row:
                    log.append(_(
                        "'%(row)s' had %(num1)d fields, "
                        "expected %(num2)d") % {
                            "row": " ".join(row),
                            "num1": len(row),
                            "num2": self.numFields,
                        })
                    ignored += 1
        except csv.Error as e:
            log.append(_("Aborted: %s") % str(e))
        self.log = log
        self.ignored = ignored
        # Closing an already-closed handle is a no-op, so this stays safe
        # now that openFile() closes the file itself.
        self.fileobj.close()
        return notes

    def open(self) -> None:
        """Load the file if needed, which also determines the delimiter
        and the number of fields."""
        self.cacheFile()

    def cacheFile(self) -> None:
        """Read the file into self.data unless it has been loaded already."""
        if not self.fileobj:
            self.openFile()

    def openFile(self) -> None:
        """Read the file, strip comments and the tags line, detect the format.

        Raises:
            Exception: with the message ``"unknownFormat"`` when neither a
                dialect nor a delimiter could be determined.
        """
        self.dialect = None
        # The whole file is read up front, so the handle can be released
        # immediately instead of staying open until foreignNotes() runs.
        with open(self.file, "r", encoding='utf-8-sig') as fileobj:
            text = fileobj.read()
        # Only mark the file as loaded once reading has succeeded.
        self.fileobj = fileobj
        # Test the prefix directly rather than substituting a sentinel
        # string, so a data line that equals the sentinel is not lost.
        self.data = [
            line + "\n" for line in text.split("\n")
            if not line.startswith("#")
        ]
        if self.data:
            if self.data[0].startswith("tags:"):
                tags = self.data[0][5:].strip()
                # Skip empty entries produced by a bare "tags:" line or by
                # repeated spaces.
                self.tagsToAdd = [tag for tag in tags.split(" ") if tag]
                del self.data[0]
            self.updateDelimiter()
        if not self.dialect and not self.delimiter:
            raise Exception("unknownFormat")

    def updateDelimiter(self) -> None:
        """Work out how to split rows and record the number of fields.

        Without an explicit ``self.delimiter`` the dialect is sniffed from
        the first ten lines, then from the first line alone; if both fail
        the delimiter falls back to the first of tab, ``;`` and ``,`` found
        in the first line, or a space. ``self.numFields`` is taken from the
        first non-empty row, after which ``initMapping()`` is called.

        Raises:
            Exception: with the message ``"unknownFormat"`` when there is no
                data or no non-empty row can be parsed.
        """
        self.dialect = None
        if not self.data:
            # Nothing to inspect (e.g. the file held only a tags line);
            # report it like every other undetectable format instead of
            # failing with an IndexError below.
            raise Exception("unknownFormat")
        if not self.delimiter:
            sniffer = csv.Sniffer()
            try:
                self.dialect = sniffer.sniff(
                    "\n".join(self.data[:10]), self.patterns)
            except Exception:
                try:
                    self.dialect = sniffer.sniff(self.data[0], self.patterns)
                except Exception:
                    # Sniffing is best-effort; the simple guess below
                    # takes over when it fails.
                    pass
        if self.dialect:
            try:
                reader = csv.reader(self.data, self.dialect, doublequote=True)
            except Exception as exc:
                raise Exception("unknownFormat") from exc
        else:
            if not self.delimiter:
                first_line = self.data[0]
                self.delimiter = next(
                    (c for c in ("\t", ";", ",") if c in first_line), " ")
            reader = csv.reader(
                self.data, delimiter=self.delimiter, doublequote=True)
        try:
            # StopIteration here means every row was empty.
            first_row = next(row for row in reader if row)
        except Exception as exc:
            raise Exception("unknownFormat") from exc
        self.numFields = len(first_row)
        self.initMapping()

    def fields(self) -> int:
        """Return the number of fields per row, loading the file if needed."""
        self.open()
        return self.numFields

    def noteFromFields(self, fields) -> ForeignNote:
        """Build a ForeignNote from one parsed row plus the file-level tags."""
        note = ForeignNote()
        note.fields.extend(fields)
        note.tags.extend(self.tagsToAdd)
        return note
|