implement anki1 importer

This commit is contained in:
Damien Elmes 2011-10-21 23:45:42 +09:00
parent b242b06052
commit 119217290e
7 changed files with 100 additions and 110 deletions

View file

@ -2,74 +2,30 @@
# Copyright: Damien Elmes <anki@ichi2.net>
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
from anki import Deck
from anki.importing.base import Importer
import traceback
from anki.lang import _
from anki.utils import ids2str
#from anki.deck import NEW_CARDS_RANDOM
import time
from anki.upgrade import Upgrader
from anki.importing.anki2 import Anki2Importer
class Anki1Importer(Importer):
needMapper = False
class Anki1Importer(Anki2Importer):
def run(self):
"Import."
random = self.deck.newCardOrder == NEW_CARDS_RANDOM
num = 4
if random:
num += 1
src = DeckStorage.Deck(self.file, backup=False)
client = SyncClient(self.deck)
server = SyncServer(src)
client.setServer(server)
# if there is a conflict, sync local -> src
client.localTime = self.deck.modified
client.remoteTime = 0
src.s.execute("update facts set modified = 1")
src.s.execute("update models set modified = 1")
src.s.execute("update cards set modified = 1")
src.s.execute("update media set created = 1")
self.deck.db.flush()
# set up a custom change list and sync
lsum = client.summary(0)
self._clearDeleted(lsum)
rsum = server.summary(0)
self._clearDeleted(rsum)
payload = client.genPayload((lsum, rsum))
# no need to add anything to src
payload['added-models'] = []
payload['added-cards'] = []
payload['added-facts'] = {'facts': [], 'fields': []}
assert payload['deleted-facts'] == []
assert payload['deleted-cards'] == []
assert payload['deleted-models'] == []
res = server.applyPayload(payload)
client.applyPayloadReply(res)
copyLocalMedia(server.deck, client.deck)
# add tags
fids = [f[0] for f in res['added-facts']['facts']]
self.deck.tags.add(fids, self.tagsToAdd)
# mark import material as newly added
self.deck.db.execute(
"update cards set modified = :t where id in %s" %
ids2str([x[0] for x in res['added-cards']]), t=time.time())
self.deck.db.execute(
"update facts set modified = :t where id in %s" %
ids2str([x[0] for x in res['added-facts']['facts']]), t=time.time())
self.deck.db.execute(
"update models set modified = :t where id in %s" %
ids2str([x['id'] for x in res['added-models']]), t=time.time())
# update total and refresh
self.total = len(res['added-facts']['facts'])
src.s.rollback()
src.engine.dispose()
# randomize?
if random:
self.deck.randomizeNewCards([x[0] for x in res['added-cards']])
self.deck.flushMod()
u = Upgrader()
# check
if not u.check(self.file):
self.log.append(_(
"File is damaged; please run Tools>Advanced>Check DB "
"in Anki 1.2 first."))
return
# upgrade
try:
deck = u.upgrade(self.file)
except:
self.log.append(traceback.format_exc())
return
# merge
deck.close()
mdir = self.file.replace(".anki", ".media")
self.file = deck.path
Anki2Importer.run(self, mdir)
def _clearDeleted(self, sum):
sum['delcards'] = []
sum['delfacts'] = []
sum['delmodels'] = []

View file

@ -16,18 +16,18 @@ from anki.importing.base import Importer
# - compare cards by fact guid + ordinal
# - compare groups by name
#
#
# When importing facts
class Anki2Importer(Importer):
needMapper = False
groupPrefix = None
def run(self):
"Import."
def run(self, media=None):
self.dst = self.deck
self.src = Deck(self.file, queue=False)
if media is not None:
# Anki1 importer has provided us with a custom media folder
self.src.media._dir = media
try:
self._import()
finally:

View file

@ -163,6 +163,8 @@ If the same name exists, compare checksums."""
# Copying on import
##########################################################################
# FIXME: check if the files are actually identical, and rewrite references
# if necessary
def copyTo(self, rdir):
ldir = self.dir()

View file

@ -4,7 +4,7 @@
import os, time, simplejson, re, datetime, shutil
from anki.lang import _
from anki.utils import intTime, namedtmp
from anki.utils import intTime, tmpfile
from anki.db import DB
from anki.deck import _Deck
from anki.consts import *
@ -108,7 +108,7 @@ class Upgrader(object):
######################################################################
def _openDB(self, path):
self.tmppath = namedtmp(os.path.basename(path))
(fd, self.tmppath) = tmpfile(suffix=".anki2")
shutil.copy(path, self.tmppath)
self.db = DB(self.tmppath)

View file

@ -225,7 +225,7 @@ def tmpdir():
os.mkdir(_tmpdir)
return _tmpdir
def tmpfile(prefix=None, suffix=None):
def tmpfile(prefix="", suffix=""):
return tempfile.mkstemp(dir=tmpdir(), prefix=prefix, suffix=suffix)
def namedtmp(name):

View file

@ -1,8 +1,9 @@
# coding: utf-8
import nose, os, shutil
from tests.shared import assertException
from tests.shared import assertException, getUpgradeDeckPath, getEmptyDeck
from anki.upgrade import Upgrader
from anki.utils import ids2str
from anki.errors import *
from anki import Deck
from anki.importing import Anki1Importer, Anki2Importer, TextImporter, \
@ -13,6 +14,68 @@ from anki.db import *
testDir = os.path.dirname(__file__)
srcFacts=None
srcCards=None
def test_anki2():
global srcFacts, srcCards
# get the deck to import
tmp = getUpgradeDeckPath()
u = Upgrader()
src = u.upgrade(tmp)
srcpath = src.path
srcFacts = src.factCount()
srcCards = src.cardCount()
srcRev = src.db.scalar("select count() from revlog")
# add a media file for testing
open(os.path.join(src.media.dir(), "foo.jpg"), "w").write("foo")
src.close()
# create a new empty deck
dst = getEmptyDeck()
# import src into dst
imp = Anki2Importer(dst, srcpath)
imp.run()
def check():
assert dst.factCount() == srcFacts
assert dst.cardCount() == srcCards
assert srcRev == dst.db.scalar("select count() from revlog")
mids = [int(x) for x in dst.models.models.keys()]
assert not dst.db.scalar(
"select count() from facts where mid not in "+ids2str(mids))
assert not dst.db.scalar(
"select count() from cards where fid not in (select id from facts)")
assert not dst.db.scalar(
"select count() from revlog where cid not in (select id from cards)")
check()
# importing should be idempotent
imp.run()
check()
assert len(os.listdir(dst.media.dir())) == 1
print dst.path
def test_anki1():
# get the deck path to import
tmp = getUpgradeDeckPath()
# make sure media is imported properly through the upgrade
mdir = tmp.replace(".anki", ".media")
if not os.path.exists(mdir):
os.mkdir(mdir)
open(os.path.join(mdir, "foo.jpg"), "w").write("foo")
# create a new empty deck
dst = getEmptyDeck()
# import src into dst
imp = Anki1Importer(dst, tmp)
imp.run()
def check():
assert dst.factCount() == srcFacts
assert dst.cardCount() == srcCards
assert len(os.listdir(dst.media.dir())) == 1
check()
# importing should be idempotent
imp = Anki1Importer(dst, tmp)
imp.run()
check()
def test_csv():
print "disabled"; return
deck = Deck()

View file

@ -5,7 +5,7 @@ from anki.consts import *
from shared import getUpgradeDeckPath, getEmptyDeck
from anki.upgrade import Upgrader
from anki.importing import Anki2Importer
from anki.utils import ids2str
from anki.utils import ids2str, checksum
def test_check():
dst = getUpgradeDeckPath()
@ -17,9 +17,12 @@ def test_check():
def test_upgrade():
dst = getUpgradeDeckPath()
csum = checksum(open(dst).read())
u = Upgrader()
print "upgrade to", dst
deck = u.upgrade(dst)
# src file must not have changed
assert csum == checksum(open(dst).read())
# creation time should have been adjusted
d = datetime.datetime.fromtimestamp(deck.crt)
assert d.hour == 4 and d.minute == 0
@ -29,37 +32,3 @@ def test_upgrade():
assert deck.sched.cardCounts() == (3,2,1)
# now's a good time to test the integrity check too
deck.fixIntegrity()
def test_import():
# get the deck to import
tmp = getUpgradeDeckPath()
u = Upgrader()
src = u.upgrade(tmp)
srcpath = src.path
srcFacts = src.factCount()
srcCards = src.cardCount()
srcRev = src.db.scalar("select count() from revlog")
# add a media file for testing
open(os.path.join(src.media.dir(), "foo.jpg"), "w").write("foo")
src.close()
# create a new empty deck
dst = getEmptyDeck()
# import src into dst
imp = Anki2Importer(dst, srcpath)
imp.run()
def check():
assert dst.factCount() == srcFacts
assert dst.cardCount() == srcCards
assert srcRev == dst.db.scalar("select count() from revlog")
mids = [int(x) for x in dst.models.models.keys()]
assert not dst.db.scalar(
"select count() from facts where mid not in "+ids2str(mids))
assert not dst.db.scalar(
"select count() from cards where fid not in (select id from facts)")
assert not dst.db.scalar(
"select count() from revlog where cid not in (select id from cards)")
check()
# importing should be idempotent
imp.run()
check()
print dst.path