mirror of
https://github.com/ankitects/anki.git
synced 2025-09-21 15:32:23 -04:00
anki2 importing and reorganize import code
This commit is contained in:
parent
1ba75e8dc9
commit
83f8ef45ff
13 changed files with 687 additions and 489 deletions
|
@ -2,349 +2,15 @@
|
|||
# Copyright: Damien Elmes <anki@ichi2.net>
|
||||
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
|
||||
|
||||
"""\
|
||||
Importing support
|
||||
==============================
|
||||
|
||||
To import, a mapping is created of the form: [FieldModel, ...]. The mapping
|
||||
may be extended by calling code if a file has more fields. To ignore a
|
||||
particular FieldModel, replace it with None. A special number 0 donates a tags
|
||||
field. The same field model should not occur more than once."""
|
||||
|
||||
import time
|
||||
#from anki.cards import cardsTable
|
||||
#from anki.facts import factsTable, fieldsTable
|
||||
from anki.lang import _
|
||||
from anki.utils import fieldChecksum, ids2str
|
||||
from anki.errors import *
|
||||
#from anki.deck import NEW_CARDS_RANDOM
|
||||
|
||||
# Base importer
|
||||
##########################################################################
|
||||
|
||||
class ForeignCard(object):
|
||||
"An temporary object storing fields and attributes."
|
||||
def __init__(self):
|
||||
self.fields = []
|
||||
self.tags = u""
|
||||
|
||||
class Importer(object):
|
||||
|
||||
needMapper = True
|
||||
tagDuplicates = False
|
||||
# if set, update instead of regular importing
|
||||
# (foreignCardFieldIndex, fieldModelId)
|
||||
updateKey = None
|
||||
multipleCardsAllowed = True
|
||||
needDelimiter = False
|
||||
|
||||
def __init__(self, deck, file):
|
||||
self.file = file
|
||||
self._model = deck.currentModel
|
||||
self._mapping = None
|
||||
self.log = []
|
||||
self.deck = deck
|
||||
self.total = 0
|
||||
self.tagsToAdd = u""
|
||||
|
||||
def doImport(self):
|
||||
"Import."
|
||||
if self.updateKey is not None:
|
||||
return self.doUpdate()
|
||||
random = self.deck.newCardOrder == NEW_CARDS_RANDOM
|
||||
num = 6
|
||||
if random:
|
||||
num += 1
|
||||
c = self.foreignCards()
|
||||
if self.importCards(c):
|
||||
self.deck.updateCardTags(self.cardIds)
|
||||
if random:
|
||||
self.deck.randomizeNewCards(self.cardIds)
|
||||
if c:
|
||||
self.deck.setModified()
|
||||
|
||||
def doUpdate(self):
|
||||
# grab the data from the external file
|
||||
cards = self.foreignCards()
|
||||
# grab data from db
|
||||
fields = self.deck.db.all("""
|
||||
select factId, value from fields where fieldModelId = :id
|
||||
and value != ''""",
|
||||
id=self.updateKey[1])
|
||||
# hash it
|
||||
vhash = {}
|
||||
fids = []
|
||||
for (fid, val) in fields:
|
||||
fids.append(fid)
|
||||
vhash[val] = fid
|
||||
# prepare tags
|
||||
tagsIdx = None
|
||||
try:
|
||||
tagsIdx = self.mapping.index(0)
|
||||
for c in cards:
|
||||
c.tags = canonifyTags(self.tagsToAdd + " " + c.fields[tagsIdx])
|
||||
except ValueError:
|
||||
pass
|
||||
# look for matches
|
||||
upcards = []
|
||||
newcards = []
|
||||
for c in cards:
|
||||
v = c.fields[self.updateKey[0]]
|
||||
if v in vhash:
|
||||
# ignore empty keys
|
||||
if v:
|
||||
# fid, card
|
||||
upcards.append((vhash[v], c))
|
||||
else:
|
||||
newcards.append(c)
|
||||
# update fields
|
||||
for fm in self.model.fieldModels:
|
||||
if fm.id == self.updateKey[1]:
|
||||
# don't update key
|
||||
continue
|
||||
try:
|
||||
index = self.mapping.index(fm)
|
||||
except ValueError:
|
||||
# not mapped
|
||||
continue
|
||||
data = [{'fid': fid,
|
||||
'fmid': fm.id,
|
||||
'v': c.fields[index],
|
||||
'chk': self.maybeChecksum(c.fields[index], fm.unique)}
|
||||
for (fid, c) in upcards]
|
||||
self.deck.db.execute("""
|
||||
update fields set value = :v, chksum = :chk where factId = :fid
|
||||
and fieldModelId = :fmid""", data)
|
||||
# update tags
|
||||
if tagsIdx is not None:
|
||||
data = [{'fid': fid,
|
||||
't': c.fields[tagsIdx]}
|
||||
for (fid, c) in upcards]
|
||||
self.deck.db.execute(
|
||||
"update facts set tags = :t where id = :fid",
|
||||
data)
|
||||
# rebuild caches
|
||||
cids = self.deck.db.column0(
|
||||
"select id from cards where factId in %s" %
|
||||
ids2str(fids))
|
||||
self.deck.updateCardTags(cids)
|
||||
self.deck.updateCardsFromFactIds(fids)
|
||||
self.total = len(cards)
|
||||
self.deck.setModified()
|
||||
|
||||
def fields(self):
|
||||
"The number of fields."
|
||||
return 0
|
||||
|
||||
def maybeChecksum(self, data, unique):
|
||||
if not unique:
|
||||
return ""
|
||||
return fieldChecksum(data)
|
||||
|
||||
def foreignCards(self):
|
||||
"Return a list of foreign cards for importing."
|
||||
assert 0
|
||||
|
||||
def resetMapping(self):
|
||||
"Reset mapping to default."
|
||||
numFields = self.fields()
|
||||
m = [f for f in self.model.fieldModels]
|
||||
m.append(0)
|
||||
rem = max(0, self.fields() - len(m))
|
||||
m += [None] * rem
|
||||
del m[numFields:]
|
||||
self._mapping = m
|
||||
|
||||
def getMapping(self):
|
||||
if not self._mapping:
|
||||
self.resetMapping()
|
||||
return self._mapping
|
||||
|
||||
def setMapping(self, mapping):
|
||||
self._mapping = mapping
|
||||
|
||||
mapping = property(getMapping, setMapping)
|
||||
|
||||
def getModel(self):
|
||||
return self._model
|
||||
|
||||
def setModel(self, model):
|
||||
self._model = model
|
||||
# update the mapping for the new model
|
||||
self._mapping = None
|
||||
self.getMapping()
|
||||
|
||||
model = property(getModel, setModel)
|
||||
|
||||
def importCards(self, cards):
|
||||
"Convert each card into a fact, apply attributes and add to deck."
|
||||
# ensure all unique and required fields are mapped
|
||||
for fm in self.model.fieldModels:
|
||||
if fm.required or fm.unique:
|
||||
if fm not in self.mapping:
|
||||
raise ImportFormatError(
|
||||
type="missingRequiredUnique",
|
||||
info=_("Missing required/unique field '%(field)s'") %
|
||||
{'field': fm.name})
|
||||
active = 0
|
||||
for cm in self.model.cardModels:
|
||||
if cm.active: active += 1
|
||||
if active > 1 and not self.multipleCardsAllowed:
|
||||
raise ImportFormatError(type="tooManyCards",
|
||||
info=_("""\
|
||||
The current importer only supports a single active card template. Please disable\
|
||||
all but one card template."""))
|
||||
# strip invalid cards
|
||||
cards = self.stripInvalid(cards)
|
||||
cards = self.stripOrTagDupes(cards)
|
||||
self.cardIds = []
|
||||
if cards:
|
||||
self.addCards(cards)
|
||||
return cards
|
||||
|
||||
def addCards(self, cards):
|
||||
"Add facts in bulk from foreign cards."
|
||||
# map tags field to attr
|
||||
try:
|
||||
idx = self.mapping.index(0)
|
||||
for c in cards:
|
||||
c.tags += " " + c.fields[idx]
|
||||
except ValueError:
|
||||
pass
|
||||
# add facts
|
||||
factIds = [genID() for n in range(len(cards))]
|
||||
factCreated = {}
|
||||
def fudgeCreated(d, tmp=[]):
|
||||
if not tmp:
|
||||
tmp.append(time.time())
|
||||
else:
|
||||
tmp[0] += 0.0001
|
||||
d['created'] = tmp[0]
|
||||
factCreated[d['id']] = d['created']
|
||||
return d
|
||||
self.deck.db.execute(factsTable.insert(),
|
||||
[fudgeCreated({'modelId': self.model.id,
|
||||
'tags': canonifyTags(self.tagsToAdd + " " + cards[n].tags),
|
||||
'id': factIds[n]}) for n in range(len(cards))])
|
||||
self.deck.db.execute("""
|
||||
delete from factsDeleted
|
||||
where factId in (%s)""" % ",".join([str(s) for s in factIds]))
|
||||
# add all the fields
|
||||
for fm in self.model.fieldModels:
|
||||
try:
|
||||
index = self.mapping.index(fm)
|
||||
except ValueError:
|
||||
index = None
|
||||
data = [{'factId': factIds[m],
|
||||
'fieldModelId': fm.id,
|
||||
'ordinal': fm.ordinal,
|
||||
'id': genID(),
|
||||
'value': (index is not None and
|
||||
cards[m].fields[index] or u""),
|
||||
'chksum': self.maybeChecksum(
|
||||
index is not None and
|
||||
cards[m].fields[index] or u"", fm.unique)
|
||||
}
|
||||
for m in range(len(cards))]
|
||||
self.deck.db.execute(fieldsTable.insert(),
|
||||
data)
|
||||
# and cards
|
||||
active = 0
|
||||
for cm in self.model.cardModels:
|
||||
if cm.active:
|
||||
active += 1
|
||||
data = [self.addMeta({
|
||||
'id': genID(),
|
||||
'factId': factIds[m],
|
||||
'factCreated': factCreated[factIds[m]],
|
||||
'cardModelId': cm.id,
|
||||
'ordinal': cm.ordinal,
|
||||
'question': u"",
|
||||
'answer': u""
|
||||
},cards[m]) for m in range(len(cards))]
|
||||
self.deck.db.execute(cardsTable.insert(),
|
||||
data)
|
||||
self.deck.updateCardsFromFactIds(factIds)
|
||||
self.total = len(factIds)
|
||||
|
||||
def addMeta(self, data, card):
|
||||
"Add any scheduling metadata to cards"
|
||||
if 'fields' in card.__dict__:
|
||||
del card.fields
|
||||
t = data['factCreated'] + data['ordinal'] * 0.00001
|
||||
data['created'] = t
|
||||
data['modified'] = t
|
||||
data['due'] = t
|
||||
data.update(card.__dict__)
|
||||
data['tags'] = u""
|
||||
self.cardIds.append(data['id'])
|
||||
data['combinedDue'] = data['due']
|
||||
if data.get('successive', 0):
|
||||
t = 1
|
||||
elif data.get('reps', 0):
|
||||
t = 0
|
||||
else:
|
||||
t = 2
|
||||
data['type'] = t
|
||||
data['queue'] = t
|
||||
return data
|
||||
|
||||
def stripInvalid(self, cards):
|
||||
return [c for c in cards if self.cardIsValid(c)]
|
||||
|
||||
def cardIsValid(self, card):
|
||||
fieldNum = len(card.fields)
|
||||
for n in range(len(self.mapping)):
|
||||
if self.mapping[n] and self.mapping[n].required:
|
||||
if fieldNum <= n or not card.fields[n].strip():
|
||||
self.log.append("Fact is missing field '%s': %s" %
|
||||
(self.mapping[n].name,
|
||||
", ".join(card.fields)))
|
||||
return False
|
||||
return True
|
||||
|
||||
def stripOrTagDupes(self, cards):
|
||||
# build a cache of items
|
||||
self.uniqueCache = {}
|
||||
for field in self.mapping:
|
||||
if field and field.unique:
|
||||
self.uniqueCache[field.id] = self.getUniqueCache(field)
|
||||
return [c for c in cards if self.cardIsUnique(c)]
|
||||
|
||||
def getUniqueCache(self, field):
|
||||
"Return a dict with all fields, to test for uniqueness."
|
||||
return dict(self.deck.db.all(
|
||||
"select value, 1 from fields where fieldModelId = :fmid",
|
||||
fmid=field.id))
|
||||
|
||||
def cardIsUnique(self, card):
|
||||
fieldsAsTags = []
|
||||
for n in range(len(self.mapping)):
|
||||
if self.mapping[n] and self.mapping[n].unique:
|
||||
if card.fields[n] in self.uniqueCache[self.mapping[n].id]:
|
||||
if not self.tagDuplicates:
|
||||
self.log.append("Fact has duplicate '%s': %s" %
|
||||
(self.mapping[n].name,
|
||||
", ".join(card.fields)))
|
||||
return False
|
||||
fieldsAsTags.append(self.mapping[n].name.replace(" ", "-"))
|
||||
else:
|
||||
self.uniqueCache[self.mapping[n].id][card.fields[n]] = 1
|
||||
if fieldsAsTags:
|
||||
card.tags += u" Duplicate:" + (
|
||||
"+".join(fieldsAsTags))
|
||||
card.tags = canonifyTags(card.tags)
|
||||
return True
|
||||
|
||||
# Export modules
|
||||
##########################################################################
|
||||
|
||||
from anki.importing.csvfile import TextImporter
|
||||
from anki.importing.anki10 import Anki10Importer
|
||||
from anki.importing.anki2 import Anki2Importer
|
||||
from anki.importing.anki1 import Anki1Importer
|
||||
from anki.importing.supermemo_xml import SupermemoXmlImporter
|
||||
from anki.lang import _
|
||||
|
||||
Importers = (
|
||||
(_("Text separated by tabs or semicolons (*)"), TextImporter),
|
||||
(_("Anki Deck (*.anki)"), Anki10Importer),
|
||||
(_("Text separated by tabs or semicolons (*.txt,*.csv)"), TextImporter),
|
||||
(_("Anki 2.0 Deck (*.anki2)"), Anki2Importer),
|
||||
(_("Anki 1.2 Deck (*.anki)"), Anki1Importer),
|
||||
(_("Supermemo XML export (*.xml)"), SupermemoXmlImporter),
|
||||
)
|
||||
|
|
|
@ -3,17 +3,17 @@
|
|||
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
|
||||
|
||||
from anki import Deck
|
||||
from anki.importing import Importer
|
||||
from anki.importing.base import Importer
|
||||
from anki.lang import _
|
||||
from anki.utils import ids2str
|
||||
#from anki.deck import NEW_CARDS_RANDOM
|
||||
import time
|
||||
|
||||
class Anki10Importer(Importer):
|
||||
class Anki1Importer(Importer):
|
||||
|
||||
needMapper = False
|
||||
|
||||
def doImport(self):
|
||||
def run(self):
|
||||
"Import."
|
||||
random = self.deck.newCardOrder == NEW_CARDS_RANDOM
|
||||
num = 4
|
186
anki/importing/anki2.py
Normal file
186
anki/importing/anki2.py
Normal file
|
@ -0,0 +1,186 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright: Damien Elmes <anki@ichi2.net>
|
||||
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
|
||||
|
||||
from anki import Deck
|
||||
from anki.utils import intTime
|
||||
from anki.importing.base import Importer
|
||||
|
||||
#
|
||||
# Import a .anki2 file into the current deck. Used for migration from 1.x,
|
||||
# shared decks, and import from a packaged deck.
|
||||
#
|
||||
# We can't rely on internal ids, so we:
|
||||
# - compare facts by guid
|
||||
# - compare models by schema signature
|
||||
# - compare cards by fact guid + ordinal
|
||||
# - compare groups by name
|
||||
#
|
||||
#
|
||||
# When importing facts
|
||||
|
||||
class Anki2Importer(Importer):
|
||||
|
||||
needMapper = False
|
||||
groupPrefix = None
|
||||
|
||||
def run(self):
|
||||
"Import."
|
||||
self.dst = self.deck
|
||||
self.src = Deck(self.file, queue=False)
|
||||
try:
|
||||
self._import()
|
||||
finally:
|
||||
self.src.close(save=False)
|
||||
|
||||
def _import(self):
|
||||
self._groups = {}
|
||||
self._prepareTS()
|
||||
self._prepareModels()
|
||||
self._importFacts()
|
||||
self._importCards()
|
||||
|
||||
# Facts
|
||||
######################################################################
|
||||
# - should note new for wizard
|
||||
|
||||
def _importFacts(self):
|
||||
# build guid -> (id,mod,mid) hash
|
||||
self._facts = {}
|
||||
for id, guid, mod, mid in self.dst.db.execute(
|
||||
"select id, guid, mod, mid from facts"):
|
||||
self._facts[guid] = (id, mod, mid)
|
||||
# iterate over source deck
|
||||
add = []
|
||||
dirty = []
|
||||
for fact in self.src.db.execute(
|
||||
"select * from facts"):
|
||||
# turn the db result into a mutable list
|
||||
fact = list(fact)
|
||||
guid, mid = fact[1:3]
|
||||
# missing from local deck?
|
||||
if guid not in self._facts:
|
||||
# get corresponding local model
|
||||
lmid = self._mid(mid)
|
||||
# rewrite internal ids, models, etc
|
||||
fact[0] = self.ts()
|
||||
fact[2] = lmid
|
||||
fact[3] = self._gid(fact[3])
|
||||
fact[4] = intTime()
|
||||
fact[5] = -1 # usn
|
||||
add.append(fact)
|
||||
dirty.append(fact[0])
|
||||
# note we have the added fact
|
||||
self._facts[guid] = (fact[0], fact[4], fact[2])
|
||||
else:
|
||||
continue #raise Exception("merging facts nyi")
|
||||
# add to deck
|
||||
self.dst.db.executemany(
|
||||
"insert or replace into facts values (?,?,?,?,?,?,?,?,?,?,?)",
|
||||
add)
|
||||
self.dst.updateFieldCache(dirty)
|
||||
self.dst.tags.registerFacts(dirty)
|
||||
|
||||
# Models
|
||||
######################################################################
|
||||
|
||||
def _prepareModels(self):
|
||||
"Prepare index of schema hashes."
|
||||
self._srcModels = {}
|
||||
self._dstModels = {}
|
||||
self._dstHashes = {}
|
||||
for m in self.dst.models.all():
|
||||
h = self.dst.models.scmhash(m)
|
||||
mid = int(m['id'])
|
||||
self._dstHashes[h] = mid
|
||||
self._dstModels[mid] = h
|
||||
for m in self.src.models.all():
|
||||
mid = int(m['id'])
|
||||
self._srcModels[mid] = self.src.models.scmhash(m)
|
||||
|
||||
def _mid(self, mid):
|
||||
"Return local id for remote MID."
|
||||
hash = self._srcModels[mid]
|
||||
dmid = self._dstHashes.get(hash)
|
||||
if dmid:
|
||||
# dst deck already has this model
|
||||
return dmid
|
||||
# need to add to local and update index
|
||||
m = self.dst.models._add(self.src.models.get(mid))
|
||||
h = self.dst.models.scmhash(m)
|
||||
mid = int(m['id'])
|
||||
self._dstModels[mid] = h
|
||||
self._dstHashes[h] = mid
|
||||
return mid
|
||||
|
||||
# Groups
|
||||
######################################################################
|
||||
|
||||
def _gid(self, gid):
|
||||
"Given gid in src deck, return local id."
|
||||
# already converted?
|
||||
if gid in self._groups:
|
||||
return self._groups[gid]
|
||||
# get the name in src
|
||||
g = self.src.groups.get(gid)
|
||||
name = g['name']
|
||||
# add prefix if necessary
|
||||
if self.groupPrefix:
|
||||
name = self.groupPrefix + "::" + name
|
||||
# create in local
|
||||
newid = self.dst.groups.id(name)
|
||||
# add to group map and return
|
||||
self._groups[gid] = newid
|
||||
return newid
|
||||
|
||||
# Cards
|
||||
######################################################################
|
||||
|
||||
def _importCards(self):
|
||||
# build map of (guid, ord) -> cid
|
||||
self._cards = {}
|
||||
for guid, ord, cid in self.dst.db.execute(
|
||||
"select f.guid, c.ord, c.id from cards c, facts f "
|
||||
"where c.fid = f.id"):
|
||||
self._cards[(guid, ord)] = cid
|
||||
# loop through src
|
||||
cards = []
|
||||
revlog = []
|
||||
for card in self.src.db.execute(
|
||||
"select f.guid, f.mid, c.* from cards c, facts f "
|
||||
"where c.fid = f.id"):
|
||||
guid = card[0]
|
||||
shash = self._srcModels[card[1]]
|
||||
# does the card's fact exist in dst deck?
|
||||
if guid not in self._facts:
|
||||
continue
|
||||
dfid = self._facts[guid]
|
||||
# does the fact share the same schema?
|
||||
mid = self._facts[guid][2]
|
||||
if shash != self._dstModels[mid]:
|
||||
continue
|
||||
# does the card already exist in the dst deck?
|
||||
ord = card[5]
|
||||
if (guid, ord) in self._cards:
|
||||
# fixme: in future, could update if newer mod time
|
||||
continue
|
||||
# doesn't exist. strip off fact info, and save src id for later
|
||||
card = list(card[2:])
|
||||
scid = card[0]
|
||||
# update cid, fid, etc
|
||||
card[0] = self.ts()
|
||||
card[1] = self._facts[guid][0]
|
||||
card[2] = self._gid(card[2])
|
||||
card[4] = intTime()
|
||||
cards.append(card)
|
||||
# we need to import revlog, rewriting card ids
|
||||
for rev in self.src.db.execute(
|
||||
"select * from revlog where cid = ?", scid):
|
||||
rev = list(rev)
|
||||
rev[1] = card[0]
|
||||
revlog.append(rev)
|
||||
# apply
|
||||
self.dst.db.executemany("""
|
||||
insert into cards values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", cards)
|
||||
self.dst.db.executemany("""
|
||||
insert into revlog values (?,?,?,?,?,?,?,?,?)""", revlog)
|
38
anki/importing/base.py
Normal file
38
anki/importing/base.py
Normal file
|
@ -0,0 +1,38 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright: Damien Elmes <anki@ichi2.net>
|
||||
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
|
||||
|
||||
from anki.utils import intTime
|
||||
|
||||
# Base importer
|
||||
##########################################################################
|
||||
|
||||
class Importer(object):
|
||||
|
||||
needMapper = False
|
||||
|
||||
def __init__(self, deck, file):
|
||||
self.file = file
|
||||
self.log = []
|
||||
self.deck = deck
|
||||
self.total = 0
|
||||
|
||||
def run(self):
|
||||
pass
|
||||
|
||||
# Timestamps
|
||||
######################################################################
|
||||
# It's too inefficient to check for existing ids on every object,
|
||||
# and a previous import may have created timestamps in the future, so we
|
||||
# need to make sure our starting point is safe.
|
||||
|
||||
def _prepareTS(self):
|
||||
now = intTime(1000)
|
||||
for tbl in "cards", "facts":
|
||||
now = max(now, self.dst.db.scalar(
|
||||
"select max(id) from %s" % tbl))
|
||||
self._ts = now
|
||||
|
||||
def ts(self):
|
||||
self._ts += 1
|
||||
return self._ts
|
321
anki/importing/cardimp.py
Normal file
321
anki/importing/cardimp.py
Normal file
|
@ -0,0 +1,321 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright: Damien Elmes <anki@ichi2.net>
|
||||
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
|
||||
|
||||
import time
|
||||
from anki.lang import _
|
||||
from anki.utils import fieldChecksum, ids2str
|
||||
from anki.errors import *
|
||||
from anki.importing.base import Importer
|
||||
#from anki.deck import NEW_CARDS_RANDOM
|
||||
|
||||
# Stores a list of fields, tags, and optionally properties like 'ivl'
|
||||
######################################################################
|
||||
|
||||
class ForeignCard(object):
|
||||
"An temporary object storing fields and attributes."
|
||||
def __init__(self):
|
||||
self.fields = []
|
||||
self.tags = u""
|
||||
|
||||
# Base class for csv/supermemo/etc importers
|
||||
######################################################################
|
||||
|
||||
class CardImporter(Importer):
|
||||
|
||||
needMapper = True
|
||||
tagDuplicates = False
|
||||
# if set, update instead of regular importing
|
||||
# (foreignCardFieldIndex, fieldModelId)
|
||||
updateKey = None
|
||||
needDelimiter = False
|
||||
|
||||
def __init__(self, deck, file):
|
||||
Importer.__init__(self, deck, file)
|
||||
self._model = deck.currentModel
|
||||
self.tagsToAdd = u""
|
||||
self._mapping = None
|
||||
|
||||
def run(self):
|
||||
"Import."
|
||||
if self.updateKey is not None:
|
||||
return self.doUpdate()
|
||||
random = self.deck.newCardOrder == NEW_CARDS_RANDOM
|
||||
num = 6
|
||||
if random:
|
||||
num += 1
|
||||
c = self.foreignCards()
|
||||
if self.importCards(c):
|
||||
self.deck.updateCardTags(self.cardIds)
|
||||
if random:
|
||||
self.deck.randomizeNewCards(self.cardIds)
|
||||
if c:
|
||||
self.deck.setModified()
|
||||
|
||||
def doUpdate(self):
|
||||
# grab the data from the external file
|
||||
cards = self.foreignCards()
|
||||
# grab data from db
|
||||
fields = self.deck.db.all("""
|
||||
select factId, value from fields where fieldModelId = :id
|
||||
and value != ''""",
|
||||
id=self.updateKey[1])
|
||||
# hash it
|
||||
vhash = {}
|
||||
fids = []
|
||||
for (fid, val) in fields:
|
||||
fids.append(fid)
|
||||
vhash[val] = fid
|
||||
# prepare tags
|
||||
tagsIdx = None
|
||||
try:
|
||||
tagsIdx = self.mapping.index(0)
|
||||
for c in cards:
|
||||
c.tags = canonifyTags(self.tagsToAdd + " " + c.fields[tagsIdx])
|
||||
except ValueError:
|
||||
pass
|
||||
# look for matches
|
||||
upcards = []
|
||||
newcards = []
|
||||
for c in cards:
|
||||
v = c.fields[self.updateKey[0]]
|
||||
if v in vhash:
|
||||
# ignore empty keys
|
||||
if v:
|
||||
# fid, card
|
||||
upcards.append((vhash[v], c))
|
||||
else:
|
||||
newcards.append(c)
|
||||
# update fields
|
||||
for fm in self.model.fieldModels:
|
||||
if fm.id == self.updateKey[1]:
|
||||
# don't update key
|
||||
continue
|
||||
try:
|
||||
index = self.mapping.index(fm)
|
||||
except ValueError:
|
||||
# not mapped
|
||||
continue
|
||||
data = [{'fid': fid,
|
||||
'fmid': fm.id,
|
||||
'v': c.fields[index],
|
||||
'chk': self.maybeChecksum(c.fields[index], fm.unique)}
|
||||
for (fid, c) in upcards]
|
||||
self.deck.db.execute("""
|
||||
update fields set value = :v, chksum = :chk where factId = :fid
|
||||
and fieldModelId = :fmid""", data)
|
||||
# update tags
|
||||
if tagsIdx is not None:
|
||||
data = [{'fid': fid,
|
||||
't': c.fields[tagsIdx]}
|
||||
for (fid, c) in upcards]
|
||||
self.deck.db.execute(
|
||||
"update facts set tags = :t where id = :fid",
|
||||
data)
|
||||
# rebuild caches
|
||||
cids = self.deck.db.column0(
|
||||
"select id from cards where factId in %s" %
|
||||
ids2str(fids))
|
||||
self.deck.updateCardTags(cids)
|
||||
self.deck.updateCardsFromFactIds(fids)
|
||||
self.total = len(cards)
|
||||
self.deck.setModified()
|
||||
|
||||
def fields(self):
|
||||
"The number of fields."
|
||||
return 0
|
||||
|
||||
def maybeChecksum(self, data, unique):
|
||||
if not unique:
|
||||
return ""
|
||||
return fieldChecksum(data)
|
||||
|
||||
def foreignCards(self):
|
||||
"Return a list of foreign cards for importing."
|
||||
assert 0
|
||||
|
||||
def resetMapping(self):
|
||||
"Reset mapping to default."
|
||||
numFields = self.fields()
|
||||
m = [f for f in self.model.fieldModels]
|
||||
m.append(0)
|
||||
rem = max(0, self.fields() - len(m))
|
||||
m += [None] * rem
|
||||
del m[numFields:]
|
||||
self._mapping = m
|
||||
|
||||
def getMapping(self):
|
||||
if not self._mapping:
|
||||
self.resetMapping()
|
||||
return self._mapping
|
||||
|
||||
def setMapping(self, mapping):
|
||||
self._mapping = mapping
|
||||
|
||||
mapping = property(getMapping, setMapping)
|
||||
|
||||
def getModel(self):
|
||||
return self._model
|
||||
|
||||
def setModel(self, model):
|
||||
self._model = model
|
||||
# update the mapping for the new model
|
||||
self._mapping = None
|
||||
self.getMapping()
|
||||
|
||||
model = property(getModel, setModel)
|
||||
|
||||
def importCards(self, cards):
|
||||
"Convert each card into a fact, apply attributes and add to deck."
|
||||
# ensure all unique and required fields are mapped
|
||||
for fm in self.model.fieldModels:
|
||||
if fm.required or fm.unique:
|
||||
if fm not in self.mapping:
|
||||
raise ImportFormatError(
|
||||
type="missingRequiredUnique",
|
||||
info=_("Missing required/unique field '%(field)s'") %
|
||||
{'field': fm.name})
|
||||
active = 0
|
||||
for cm in self.model.cardModels:
|
||||
if cm.active: active += 1
|
||||
# strip invalid cards
|
||||
cards = self.stripInvalid(cards)
|
||||
cards = self.stripOrTagDupes(cards)
|
||||
self.cardIds = []
|
||||
if cards:
|
||||
self.addCards(cards)
|
||||
return cards
|
||||
|
||||
def addCards(self, cards):
|
||||
"Add facts in bulk from foreign cards."
|
||||
# map tags field to attr
|
||||
try:
|
||||
idx = self.mapping.index(0)
|
||||
for c in cards:
|
||||
c.tags += " " + c.fields[idx]
|
||||
except ValueError:
|
||||
pass
|
||||
# add facts
|
||||
factIds = [genID() for n in range(len(cards))]
|
||||
factCreated = {}
|
||||
def fudgeCreated(d, tmp=[]):
|
||||
if not tmp:
|
||||
tmp.append(time.time())
|
||||
else:
|
||||
tmp[0] += 0.0001
|
||||
d['created'] = tmp[0]
|
||||
factCreated[d['id']] = d['created']
|
||||
return d
|
||||
self.deck.db.execute(factsTable.insert(),
|
||||
[fudgeCreated({'modelId': self.model.id,
|
||||
'tags': canonifyTags(self.tagsToAdd + " " + cards[n].tags),
|
||||
'id': factIds[n]}) for n in range(len(cards))])
|
||||
self.deck.db.execute("""
|
||||
delete from factsDeleted
|
||||
where factId in (%s)""" % ",".join([str(s) for s in factIds]))
|
||||
# add all the fields
|
||||
for fm in self.model.fieldModels:
|
||||
try:
|
||||
index = self.mapping.index(fm)
|
||||
except ValueError:
|
||||
index = None
|
||||
data = [{'factId': factIds[m],
|
||||
'fieldModelId': fm.id,
|
||||
'ordinal': fm.ordinal,
|
||||
'id': genID(),
|
||||
'value': (index is not None and
|
||||
cards[m].fields[index] or u""),
|
||||
'chksum': self.maybeChecksum(
|
||||
index is not None and
|
||||
cards[m].fields[index] or u"", fm.unique)
|
||||
}
|
||||
for m in range(len(cards))]
|
||||
self.deck.db.execute(fieldsTable.insert(),
|
||||
data)
|
||||
# and cards
|
||||
active = 0
|
||||
for cm in self.model.cardModels:
|
||||
if cm.active:
|
||||
active += 1
|
||||
data = [self.addMeta({
|
||||
'id': genID(),
|
||||
'factId': factIds[m],
|
||||
'factCreated': factCreated[factIds[m]],
|
||||
'cardModelId': cm.id,
|
||||
'ordinal': cm.ordinal,
|
||||
'question': u"",
|
||||
'answer': u""
|
||||
},cards[m]) for m in range(len(cards))]
|
||||
self.deck.db.execute(cardsTable.insert(),
|
||||
data)
|
||||
self.deck.updateCardsFromFactIds(factIds)
|
||||
self.total = len(factIds)
|
||||
|
||||
def addMeta(self, data, card):
|
||||
"Add any scheduling metadata to cards"
|
||||
if 'fields' in card.__dict__:
|
||||
del card.fields
|
||||
t = data['factCreated'] + data['ordinal'] * 0.00001
|
||||
data['created'] = t
|
||||
data['modified'] = t
|
||||
data['due'] = t
|
||||
data.update(card.__dict__)
|
||||
data['tags'] = u""
|
||||
self.cardIds.append(data['id'])
|
||||
data['combinedDue'] = data['due']
|
||||
if data.get('successive', 0):
|
||||
t = 1
|
||||
elif data.get('reps', 0):
|
||||
t = 0
|
||||
else:
|
||||
t = 2
|
||||
data['type'] = t
|
||||
data['queue'] = t
|
||||
return data
|
||||
|
||||
def stripInvalid(self, cards):
|
||||
return [c for c in cards if self.cardIsValid(c)]
|
||||
|
||||
def cardIsValid(self, card):
|
||||
fieldNum = len(card.fields)
|
||||
for n in range(len(self.mapping)):
|
||||
if self.mapping[n] and self.mapping[n].required:
|
||||
if fieldNum <= n or not card.fields[n].strip():
|
||||
self.log.append("Fact is missing field '%s': %s" %
|
||||
(self.mapping[n].name,
|
||||
", ".join(card.fields)))
|
||||
return False
|
||||
return True
|
||||
|
||||
def stripOrTagDupes(self, cards):
|
||||
# build a cache of items
|
||||
self.uniqueCache = {}
|
||||
for field in self.mapping:
|
||||
if field and field.unique:
|
||||
self.uniqueCache[field.id] = self.getUniqueCache(field)
|
||||
return [c for c in cards if self.cardIsUnique(c)]
|
||||
|
||||
def getUniqueCache(self, field):
|
||||
"Return a dict with all fields, to test for uniqueness."
|
||||
return dict(self.deck.db.all(
|
||||
"select value, 1 from fields where fieldModelId = :fmid",
|
||||
fmid=field.id))
|
||||
|
||||
def cardIsUnique(self, card):
|
||||
fieldsAsTags = []
|
||||
for n in range(len(self.mapping)):
|
||||
if self.mapping[n] and self.mapping[n].unique:
|
||||
if card.fields[n] in self.uniqueCache[self.mapping[n].id]:
|
||||
if not self.tagDuplicates:
|
||||
self.log.append("Fact has duplicate '%s': %s" %
|
||||
(self.mapping[n].name,
|
||||
", ".join(card.fields)))
|
||||
return False
|
||||
fieldsAsTags.append(self.mapping[n].name.replace(" ", "-"))
|
||||
else:
|
||||
self.uniqueCache[self.mapping[n].id][card.fields[n]] = 1
|
||||
if fieldsAsTags:
|
||||
card.tags += u" Duplicate:" + (
|
||||
"+".join(fieldsAsTags))
|
||||
card.tags = canonifyTags(card.tags)
|
||||
return True
|
|
@ -3,11 +3,11 @@
|
|||
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
|
||||
|
||||
import codecs, csv, re
|
||||
from anki.importing import Importer, ForeignCard
|
||||
from anki.importing.cardimp import CardImporter, ForeignCard
|
||||
from anki.lang import _
|
||||
from anki.errors import *
|
||||
|
||||
class TextImporter(Importer):
|
||||
class TextImporter(CardImporter):
|
||||
|
||||
needDelimiter = True
|
||||
patterns = ("\t", ";")
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
import sys
|
||||
|
||||
from anki.importing import Importer, ForeignCard
|
||||
from anki.importing.cardimp import CardImporter, ForeignCard
|
||||
from anki.lang import _
|
||||
from anki.errors import *
|
||||
|
||||
|
@ -63,7 +63,7 @@ class SuperMemoElement(SmartDict):
|
|||
|
||||
|
||||
# This is an AnkiImporter
|
||||
class SupermemoXmlImporter(Importer):
|
||||
class SupermemoXmlImporter(CardImporter):
|
||||
"""
|
||||
Supermemo XML export's to Anki parser.
|
||||
Goes through a SM collection and fetch all elements.
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
import simplejson, copy
|
||||
from anki.utils import intTime, hexifyID, joinFields, splitFields, ids2str, \
|
||||
timestampID
|
||||
timestampID, fieldChecksum
|
||||
from anki.lang import _
|
||||
from anki.consts import *
|
||||
|
||||
|
@ -418,3 +418,15 @@ select id from facts where mid = ?)""" % " ".join(map),
|
|||
"update cards set ord=:new,usn=:u,mod=:m where id=:cid",
|
||||
d)
|
||||
self.deck.remCards(deleted)
|
||||
|
||||
# Schema hash
|
||||
##########################################################################
|
||||
|
||||
def scmhash(self, m):
|
||||
"Return a hash of the schema, to see if models are compatible."
|
||||
s = m['name']
|
||||
for f in m['flds']:
|
||||
s += f['name']
|
||||
for t in m['tmpls']:
|
||||
s += t['name']
|
||||
return fieldChecksum(s)
|
||||
|
|
|
@ -1,139 +0,0 @@
|
|||
# coding: utf-8
|
||||
|
||||
import nose, os, shutil
|
||||
from tests.shared import assertException
|
||||
|
||||
from anki.errors import *
|
||||
from anki import Deck
|
||||
from anki.importing import anki10, csvfile, mnemosyne10, supermemo_xml, dingsbums
|
||||
from anki.stdmodels import BasicModel
|
||||
from anki.facts import Fact
|
||||
from anki.sync import SyncClient, SyncServer
|
||||
|
||||
from anki.db import *
|
||||
|
||||
testDir = os.path.dirname(__file__)
|
||||
|
||||
def test_csv():
|
||||
deck = Deck()
|
||||
deck.addModel(BasicModel())
|
||||
file = unicode(os.path.join(testDir, "importing/text-2fields.txt"))
|
||||
i = csvfile.TextImporter(deck, file)
|
||||
i.doImport()
|
||||
# four problems - missing front, dupe front, wrong num of fields
|
||||
assert len(i.log) == 4
|
||||
assert i.total == 5
|
||||
deck.close()
|
||||
|
||||
def test_csv_tags():
|
||||
deck = Deck()
|
||||
deck.addModel(BasicModel())
|
||||
file = unicode(os.path.join(testDir, "importing/text-tags.txt"))
|
||||
i = csvfile.TextImporter(deck, file)
|
||||
i.doImport()
|
||||
facts = deck.db.query(Fact).all()
|
||||
assert len(facts) == 2
|
||||
assert facts[0].tags == "baz qux" or facts[1].tags == "baz qux"
|
||||
deck.close()
|
||||
|
||||
def test_mnemosyne10():
|
||||
deck = Deck()
|
||||
deck.addModel(BasicModel())
|
||||
file = unicode(os.path.join(testDir, "importing/test.mem"))
|
||||
i = mnemosyne10.Mnemosyne10Importer(deck, file)
|
||||
i.doImport()
|
||||
assert i.total == 5
|
||||
deck.close()
|
||||
|
||||
def test_supermemo_xml_01_unicode():
|
||||
deck = Deck()
|
||||
deck.addModel(BasicModel())
|
||||
file = unicode(os.path.join(testDir, "importing/supermemo1.xml"))
|
||||
i = supermemo_xml.SupermemoXmlImporter(deck, file)
|
||||
#i.META.logToStdOutput = True
|
||||
i.doImport()
|
||||
# only returning top-level elements?
|
||||
assert i.total == 1
|
||||
deck.close()
|
||||
|
||||
def test_anki10():
|
||||
# though these are not modified, sqlite updates the mtime, so copy to tmp
|
||||
# first
|
||||
file_ = unicode(os.path.join(testDir, "importing/test10.anki"))
|
||||
file = "/tmp/test10.anki"
|
||||
shutil.copy(file_, file)
|
||||
file2_ = unicode(os.path.join(testDir, "importing/test10-2.anki"))
|
||||
file2 = "/tmp/test10-2.anki"
|
||||
shutil.copy(file2_, file2)
|
||||
deck = Deck()
|
||||
i = anki10.Anki10Importer(deck, file)
|
||||
i.doImport()
|
||||
assert i.total == 2
|
||||
deck.db.rollback()
|
||||
deck.close()
|
||||
# import a deck into itself - 10-2 is the same as test10, but with one
|
||||
# card answered and another deleted. nothing should be synced to client
|
||||
deck = Deck(file, backup=False)
|
||||
i = anki10.Anki10Importer(deck, file2)
|
||||
i.doImport()
|
||||
assert i.total == 0
|
||||
deck.db.rollback()
|
||||
|
||||
def test_anki10_modtime():
|
||||
deck1 = Deck()
|
||||
deck2 = Deck()
|
||||
client = SyncClient(deck1)
|
||||
server = SyncServer(deck2)
|
||||
client.setServer(server)
|
||||
deck1.addModel(BasicModel())
|
||||
f = deck1.newFact()
|
||||
f['Front'] = u"foo"; f['Back'] = u"bar"
|
||||
deck1.addFact(f)
|
||||
assert deck1.cardCount() == 1
|
||||
assert deck2.cardCount() == 0
|
||||
client.sync()
|
||||
assert deck1.cardCount() == 1
|
||||
assert deck2.cardCount() == 1
|
||||
file_ = unicode(os.path.join(testDir, "importing/test10-3.anki"))
|
||||
file = "/tmp/test10-3.anki"
|
||||
shutil.copy(file_, file)
|
||||
i = anki10.Anki10Importer(deck1, file)
|
||||
i.doImport()
|
||||
client.sync()
|
||||
assert i.total == 1
|
||||
assert deck2.db.scalar("select count(*) from cards") == 2
|
||||
assert deck2.db.scalar("select count(*) from facts") == 2
|
||||
assert deck2.db.scalar("select count(*) from models") == 2
|
||||
|
||||
def test_dingsbums():
|
||||
deck = Deck()
|
||||
deck.addModel(BasicModel())
|
||||
startNumberOfFacts = deck.factCount()
|
||||
file = unicode(os.path.join(testDir, "importing/dingsbums.xml"))
|
||||
i = dingsbums.DingsBumsImporter(deck, file)
|
||||
i.doImport()
|
||||
assert 7 == i.total
|
||||
deck.close()
|
||||
|
||||
def test_updating():
|
||||
# get the standard csv deck first
|
||||
deck = Deck()
|
||||
deck.addModel(BasicModel())
|
||||
file = unicode(os.path.join(testDir, "importing/text-2fields.txt"))
|
||||
i = csvfile.TextImporter(deck, file)
|
||||
i.doImport()
|
||||
# now update
|
||||
file = unicode(os.path.join(testDir, "importing/text-update.txt"))
|
||||
i = csvfile.TextImporter(deck, file)
|
||||
# first field
|
||||
i.updateKey = (0, deck.currentModel.fieldModels[0].id)
|
||||
i.multipleCardsAllowed = False
|
||||
i.doImport()
|
||||
ans = deck.db.scalar(
|
||||
u"select answer from cards where question like '%食べる%'")
|
||||
assert "to ate" in ans
|
||||
# try again with tags
|
||||
i.updateKey = (0, deck.currentModel.fieldModels[0].id)
|
||||
i.mapping[1] = 0
|
||||
i.doImport()
|
||||
deck.close()
|
74
tests/test_importing.py
Normal file
74
tests/test_importing.py
Normal file
|
@ -0,0 +1,74 @@
|
|||
# coding: utf-8
|
||||
|
||||
import nose, os, shutil
|
||||
from tests.shared import assertException
|
||||
|
||||
from anki.errors import *
|
||||
from anki import Deck
|
||||
from anki.importing import Anki1Importer, Anki2Importer, TextImporter, \
|
||||
SupermemoXmlImporter
|
||||
from anki.facts import Fact
|
||||
|
||||
from anki.db import *
|
||||
|
||||
testDir = os.path.dirname(__file__)
|
||||
|
||||
def test_csv():
|
||||
print "disabled"; return
|
||||
deck = Deck()
|
||||
deck.addModel(BasicModel())
|
||||
file = unicode(os.path.join(testDir, "importing/text-2fields.txt"))
|
||||
i = csvfile.TextImporter(deck, file)
|
||||
i.run()
|
||||
# four problems - missing front, dupe front, wrong num of fields
|
||||
assert len(i.log) == 4
|
||||
assert i.total == 5
|
||||
deck.close()
|
||||
|
||||
def test_csv_tags():
|
||||
print "disabled"; return
|
||||
deck = Deck()
|
||||
deck.addModel(BasicModel())
|
||||
file = unicode(os.path.join(testDir, "importing/text-tags.txt"))
|
||||
i = csvfile.TextImporter(deck, file)
|
||||
i.run()
|
||||
facts = deck.db.query(Fact).all()
|
||||
assert len(facts) == 2
|
||||
assert facts[0].tags == "baz qux" or facts[1].tags == "baz qux"
|
||||
deck.close()
|
||||
|
||||
def test_supermemo_xml_01_unicode():
|
||||
print "disabled"; return
|
||||
deck = Deck()
|
||||
deck.addModel(BasicModel())
|
||||
file = unicode(os.path.join(testDir, "importing/supermemo1.xml"))
|
||||
i = supermemo_xml.SupermemoXmlImporter(deck, file)
|
||||
#i.META.logToStdOutput = True
|
||||
i.run()
|
||||
# only returning top-level elements?
|
||||
assert i.total == 1
|
||||
deck.close()
|
||||
|
||||
def test_updating():
|
||||
print "disabled"; return
|
||||
# get the standard csv deck first
|
||||
deck = Deck()
|
||||
deck.addModel(BasicModel())
|
||||
file = unicode(os.path.join(testDir, "importing/text-2fields.txt"))
|
||||
i = csvfile.TextImporter(deck, file)
|
||||
i.run()
|
||||
# now update
|
||||
file = unicode(os.path.join(testDir, "importing/text-update.txt"))
|
||||
i = csvfile.TextImporter(deck, file)
|
||||
# first field
|
||||
i.updateKey = (0, deck.currentModel.fieldModels[0].id)
|
||||
i.multipleCardsAllowed = False
|
||||
i.run()
|
||||
ans = deck.db.scalar(
|
||||
u"select answer from cards where question like '%食べる%'")
|
||||
assert "to ate" in ans
|
||||
# try again with tags
|
||||
i.updateKey = (0, deck.currentModel.fieldModels[0].id)
|
||||
i.mapping[1] = 0
|
||||
i.run()
|
||||
deck.close()
|
|
@ -24,6 +24,10 @@ def test_modelCopy():
|
|||
assert len(m2['flds']) == len(m['flds'])
|
||||
assert len(m['tmpls']) == 2
|
||||
assert len(m2['tmpls']) == 2
|
||||
# name changed
|
||||
assert deck.models.scmhash(m) != deck.models.scmhash(m2)
|
||||
m2['name'] = "Basic"
|
||||
assert deck.models.scmhash(m) == deck.models.scmhash(m2)
|
||||
|
||||
def test_fields():
|
||||
d = getEmptyDeck()
|
||||
|
@ -35,11 +39,13 @@ def test_fields():
|
|||
# make sure renaming a field updates the templates
|
||||
d.models.renameField(m, m['flds'][0], "NewFront")
|
||||
assert m['tmpls'][0]['qfmt'] == "{{NewFront}}"
|
||||
h = d.models.scmhash(m)
|
||||
# add a field
|
||||
f = d.models.newField(m)
|
||||
f['name'] = "foo"
|
||||
d.models.addField(m, f)
|
||||
assert d.getFact(d.models.fids(m)[0]).fields == ["1", "2", ""]
|
||||
assert d.models.scmhash(m) != h
|
||||
# rename it
|
||||
d.models.renameField(m, f, "bar")
|
||||
assert d.getFact(d.models.fids(m)[0])['bar'] == ''
|
||||
|
|
|
@ -25,7 +25,7 @@ def test_graphs_empty():
|
|||
|
||||
def test_graphs():
|
||||
from anki import Deck
|
||||
d = Deck(os.path.expanduser("~/test.anki"))
|
||||
d = Deck(os.path.expanduser("~/test.anki2"))
|
||||
g = d.stats()
|
||||
rep = g.report()
|
||||
open(os.path.expanduser("~/test.html"), "w").write(rep)
|
||||
|
|
|
@ -2,8 +2,10 @@
|
|||
|
||||
import datetime
|
||||
from anki.consts import *
|
||||
from shared import getUpgradeDeckPath
|
||||
from shared import getUpgradeDeckPath, getEmptyDeck
|
||||
from anki.upgrade import Upgrader
|
||||
from anki.importing import Anki2Importer
|
||||
from anki.utils import ids2str
|
||||
|
||||
def test_check():
|
||||
dst = getUpgradeDeckPath()
|
||||
|
@ -28,5 +30,37 @@ def test_upgrade():
|
|||
# now's a good time to test the integrity check too
|
||||
deck.fixIntegrity()
|
||||
|
||||
def test_import():
|
||||
# get the deck to import
|
||||
tmp = getUpgradeDeckPath()
|
||||
u = Upgrader()
|
||||
src = u.upgrade(tmp)
|
||||
srcpath = src.path
|
||||
srcFacts = src.factCount()
|
||||
srcCards = src.cardCount()
|
||||
srcRev = src.db.scalar("select count() from revlog")
|
||||
src.close()
|
||||
# create a new empty deck
|
||||
dst = getEmptyDeck()
|
||||
# import src into dst
|
||||
imp = Anki2Importer(dst, srcpath)
|
||||
imp.run()
|
||||
def check():
|
||||
assert dst.factCount() == srcFacts
|
||||
assert dst.cardCount() == srcCards
|
||||
assert srcRev == dst.db.scalar("select count() from revlog")
|
||||
mids = [int(x) for x in dst.models.models.keys()]
|
||||
assert not dst.db.scalar(
|
||||
"select count() from facts where mid not in "+ids2str(mids))
|
||||
assert not dst.db.scalar(
|
||||
"select count() from cards where fid not in (select id from facts)")
|
||||
assert not dst.db.scalar(
|
||||
"select count() from revlog where cid not in (select id from cards)")
|
||||
check()
|
||||
# importing should be idempotent
|
||||
imp.run()
|
||||
check()
|
||||
print dst.path
|
||||
|
||||
|
||||
|
||||
|
|
Loading…
Reference in a new issue