Anki/pylib/anki/importing/noteimp.py
Abdo f94d05bcbe
Switch to Ruff (#4119)
* Add check:ruff build action

* Add fix:ruff action

* Add Ruff config

Mostly generated by Cursor

* Handle rest of lints

* Fix formatting

* Replace black and isort with ruff-format

* Run ruff-format

* Fix lint errors

* Remove pylint disables

* Remove .pylintrc

* Update docs

* Fix check:format not just checking

* Fix isort rule being ignored

* Sort imports

* Ensure ./ninja format also handles import sorting

* Remove unused isort cfg

* Enable unsafe fixes in fix:ruff, and enable unused var warning

* Re-run on config change; enable unnecessary ARG ignores

* Use all pycodestyle errors, and add some more commented-out ones

Latter logged on https://github.com/ankitects/anki/issues/4135
2025-06-29 14:38:35 +07:00

345 lines
12 KiB
Python

# Copyright: Ankitects Pty Ltd and contributors
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
from __future__ import annotations
import html
import unicodedata
from typing import Union
from anki.collection import Collection
from anki.config import Config
from anki.consts import NEW_CARDS_RANDOM, STARTING_FACTOR
from anki.importing.base import Importer
from anki.models import NotetypeId
from anki.notes import NoteId
from anki.utils import (
field_checksum,
guid64,
int_time,
join_fields,
split_fields,
timestamp_id,
)
# Row shapes returned by NoteImporter.updateData() and bound, in order, to the
# "update notes ..." statements in NoteImporter.addUpdates().
# Tags come from the input mapping: (mod, usn, flds, tags, id, flds, tags).
# The trailing flds/tags feed the "flds != ? or tags != ?" change check.
TagMappedUpdate = tuple[int, int, str, str, NoteId, str, str]
# Existing tags plus the tagModified tags: (mod, usn, flds, tags, id, flds).
TagModifiedUpdate = tuple[int, int, str, str, NoteId, str]
# Tags are left untouched: (mod, usn, flds, id, flds).
NoTagUpdate = tuple[int, int, str, NoteId, str]
# Any one of the three row shapes above.
Updates = Union[TagMappedUpdate, TagModifiedUpdate, NoTagUpdate]
# Stores a list of fields, tags and deck
######################################################################
class ForeignNote:
    """A temporary object storing fields and attributes."""

    def __init__(self) -> None:
        # Joined representation of the fields; filled in later by the importer.
        self.fieldsStr = ""
        # No deck is assigned until something sets one.
        self.deck = None
        # map of ord -> card
        self.cards: dict[int, ForeignCard] = {}
        # Tags and raw field values start out empty and are filled by the parser.
        self.tags: list[str] = []
        self.fields: list[str] = []
class ForeignCard:
    """Scheduling attributes carried alongside an imported note's card."""

    def __init__(self) -> None:
        # Counters and due position all start at zero.
        self.due = self.reps = self.lapses = 0
        # Interval starts at 1 and the ease factor at the collection default.
        self.ivl = 1
        self.factor = STARTING_FACTOR
# Base class for CSV and similar text-based imports
######################################################################
# The mapping is a list with one entry per input field, like:
# ['Expression', 'Reading', '_tags', None]
# - None means that the input field should be discarded
# - _tags maps the input field to the note's tags
# If the first field of the model is not in the map, the map is invalid.
# The import mode is one of:
# UPDATE_MODE: update if first field matches existing note
# IGNORE_MODE: ignore if first field matches existing note
# ADD_MODE: import even if first field matches existing note
UPDATE_MODE = 0
IGNORE_MODE = 1
ADD_MODE = 2
class NoteImporter(Importer):
    """Base class for CSV and similar text-based imports.

    ``fields()`` and ``foreignNotes()`` return placeholder values here (``0``
    and ``[]``); presumably concrete importers override them to describe and
    parse their input.

    ``mapping`` holds one entry per input column: the notetype field name the
    column feeds, ``"_tags"`` for a tags column, or ``None`` to discard it.
    ``importMode`` is one of ``UPDATE_MODE``, ``IGNORE_MODE`` or ``ADD_MODE``
    and decides what happens when a note's first field matches an existing
    note of the same notetype.
    """

    needMapper = True
    needDelimiter = False
    allowHTML = False
    importMode = UPDATE_MODE
    mapping: list[str] | None
    tagModified: str | None

    def __init__(self, col: Collection, file: str) -> None:
        """Target the collection's current notetype and start without a mapping."""
        Importer.__init__(self, col, file)
        self.model = col.models.current()
        self.mapping = None
        self.tagModified = None
        self._tagsMapped = False

    def run(self) -> None:
        """Import the notes returned by ``foreignNotes()``.

        A mapping must have been set beforehand (e.g. via ``initMapping()``).
        """
        assert self.mapping
        c = self.foreignNotes()
        self.importNotes(c)

    def fields(self) -> int:
        """Return the number of fields in the input (``0`` in this base class)."""
        return 0

    def initMapping(self) -> None:
        """Build a default mapping from the notetype's field names.

        Field names are assigned to input columns in order; if the input has
        more columns than the notetype has fields, the next column becomes
        ``"_tags"`` and any remaining ones are discarded (``None``).
        """
        field_count = self.fields()
        # truncate to provided count
        flds = [f["name"] for f in self.model["flds"]][:field_count]
        # if there's room left, add tags
        if field_count > len(flds):
            flds.append("_tags")
        # and if there's still room left, pad
        flds = flds + [None] * (field_count - len(flds))
        self.mapping = flds

    def mappingOk(self) -> bool:
        """Return True if the notetype's first field appears in the mapping."""
        if not self.mapping:
            # No mapping yet (or an empty one) can never contain the first field.
            return False
        return self.model["flds"][0]["name"] in self.mapping

    def foreignNotes(self) -> list:
        """Return a list of foreign notes for importing (empty in this base class)."""
        return []

    def importNotes(self, notes: list[ForeignNote]) -> None:
        """Convert each foreign note into a note row and write it to the collection.

        Every field is stripped of surrounding whitespace; unless ``allowHTML``
        is set it is also HTML-escaped and its newlines become ``<br>``.
        Notes whose first field is empty, or (outside ``ADD_MODE``) repeats an
        earlier first field from the same input, are logged and skipped. The
        remaining notes are compared by first field against existing notes of
        the same notetype and then updated, ignored or added according to
        ``importMode``. A summary line and per-note messages are appended to
        ``self.log`` and ``self.total`` is set to the number of touched notes.

        Raises:
            Exception: if ``mappingOk()`` reports an unusable mapping.
        """
        if not self.mappingOk():
            raise Exception("mapping not ok")
        # note whether tags are mapped
        self._tagsMapped = "_tags" in self.mapping
        # gather checksums of existing notes for duplicate comparison
        csums: dict[str, list[NoteId]] = {}
        for existing_csum, existing_id in self.col.db.execute(
            "select csum, id from notes where mid = ?", self.model["id"]
        ):
            csums.setdefault(existing_csum, []).append(existing_id)
        # first fields already seen earlier in this import
        firsts: set[str] = set()
        fld0idx = self.mapping.index(self.model["flds"][0]["name"])
        self._fmap = self.col.models.field_map(self.model)
        self._nextID = NoteId(timestamp_id(self.col.db, "notes"))
        # loop through the notes
        updates: list[Updates] = []
        updateLog = []
        new = []
        self._ids: list[NoteId] = []
        self._cards: list[tuple] = []
        dupeCount = 0
        # first fields for which the "added duplicate" message was already logged
        dupes: set[str] = set()
        for n in notes:
            # Clean every field in place. The steps are chained so that each
            # one builds on the previous result; assigning from the untouched
            # loop value each time would discard the escaping and stripping.
            for c, raw in enumerate(n.fields):
                if self.allowHTML:
                    n.fields[c] = raw.strip()
                else:
                    escaped = html.escape(raw, quote=False).strip()
                    n.fields[c] = escaped.replace("\n", "<br>")
            fld0 = unicodedata.normalize("NFC", n.fields[fld0idx])
            # first field must exist
            if not fld0:
                self.log.append(
                    self.col.tr.importing_empty_first_field(val=" ".join(n.fields))
                )
                continue
            csum = field_checksum(fld0)
            # earlier in import?
            if fld0 in firsts and self.importMode != ADD_MODE:
                # duplicates in source file; log and ignore
                self.log.append(self.col.tr.importing_appeared_twice_in_file(val=fld0))
                continue
            firsts.add(fld0)
            # already exists?
            found = False
            if csum in csums:  # type: ignore[comparison-overlap]
                # csum is not a guarantee; have to check
                for nid in csums[csum]:  # type: ignore[index]
                    flds = self.col.db.scalar(
                        "select flds from notes where id = ?", nid
                    )
                    sflds = split_fields(flds)
                    if fld0 != sflds[0]:
                        continue
                    # duplicate
                    found = True
                    if self.importMode == UPDATE_MODE:
                        data = self.updateData(n, nid, sflds)
                        if data:
                            updates.append(data)
                            updateLog.append(
                                self.col.tr.importing_first_field_matched(val=fld0)
                            )
                            dupeCount += 1
                    elif self.importMode == IGNORE_MODE:
                        dupeCount += 1
                    elif self.importMode == ADD_MODE:
                        # allow duplicates in this case
                        if fld0 not in dupes:
                            # only show message once, no matter how many
                            # duplicates are in the collection already
                            updateLog.append(
                                self.col.tr.importing_added_duplicate_with_first_field(
                                    val=fld0,
                                )
                            )
                            dupes.add(fld0)
                        found = False
            # newly add
            if not found:
                new_data = self.newData(n)
                if new_data:
                    new.append(new_data)
        self.addNew(new)
        self.addUpdates(updates)
        # generate cards + update field cache
        self.col.after_note_updates(self._ids, mark_modified=False)
        # apply scheduling updates
        self.updateCards()
        # we randomize or order here, to ensure that siblings
        # have the same due#
        did = self.col.decks.selected()
        conf = self.col.decks.config_dict_for_deck_id(did)
        # in order due?
        if not conf["dyn"] and conf["new"]["order"] == NEW_CARDS_RANDOM:
            self.col.sched.randomize_cards(did)
        part1 = self.col.tr.importing_note_added(count=len(new))
        part2 = self.col.tr.importing_note_updated(count=self.updateCount)
        if self.importMode == UPDATE_MODE:
            # matched notes whose row was not actually changed by the update
            unchanged = dupeCount - self.updateCount
        elif self.importMode == IGNORE_MODE:
            unchanged = dupeCount
        else:
            unchanged = 0
        part3 = self.col.tr.importing_note_unchanged(count=unchanged)
        self.log.append(f"{part1}, {part2}, {part3}.")
        self.log.extend(updateLog)
        self.total = len(self._ids)

    def newData(
        self, n: ForeignNote
    ) -> tuple[NoteId, str, NotetypeId, int, int, str, str, str, int, int, str]:
        """Allocate the next note id for ``n`` and build its insert row.

        The id is recorded in ``self._ids`` and each of the note's foreign
        cards is queued in ``self._cards`` for ``updateCards()``. The returned
        tuple matches the 11 placeholders of the insert in ``addNew()``.
        """
        nid = self._nextID
        self._nextID = NoteId(self._nextID + 1)
        self._ids.append(nid)
        self.processFields(n)
        # note id for card updates later
        for card_ord, card in n.cards.items():
            self._cards.append((nid, card_ord, card))
        return (
            nid,
            guid64(),
            self.model["id"],
            int_time(),
            self.col.usn(),
            self.col.tags.join(n.tags),
            n.fieldsStr,
            "",
            0,
            0,
            "",
        )

    def addNew(
        self,
        rows: list[
            tuple[NoteId, str, NotetypeId, int, int, str, str, str, int, int, str]
        ],
    ) -> None:
        """Insert (or replace) the given rows in the ``notes`` table."""
        self.col.db.executemany(
            "insert or replace into notes values (?,?,?,?,?,?,?,?,?,?,?)", rows
        )

    def updateData(
        self, n: ForeignNote, id: NoteId, sflds: list[str]
    ) -> Updates | None:
        """Build the update row for existing note ``id`` from foreign note ``n``.

        ``sflds`` holds the existing note's fields; mapped input fields are
        written over it in place by ``processFields()``. The shape of the
        returned tuple depends on whether tags are mapped, a ``tagModified``
        tag is configured, or neither, and matches the corresponding statement
        in ``addUpdates()``.
        """
        self._ids.append(id)
        self.processFields(n, sflds)
        if self._tagsMapped:
            tags = self.col.tags.join(n.tags)
            return (
                int_time(),
                self.col.usn(),
                n.fieldsStr,
                tags,
                id,
                n.fieldsStr,
                tags,
            )
        elif self.tagModified:
            # keep the note's current tags and append the configured ones
            tags = self.col.db.scalar("select tags from notes where id = ?", id)
            tagList = self.col.tags.split(tags) + self.tagModified.split()
            tags = self.col.tags.join(tagList)
            return (int_time(), self.col.usn(), n.fieldsStr, tags, id, n.fieldsStr)
        else:
            return (int_time(), self.col.usn(), n.fieldsStr, id, n.fieldsStr)

    def addUpdates(self, rows: list[Updates]) -> None:
        """Apply the update rows and record how many notes actually changed.

        The statement used depends on the same tag settings that shaped the
        rows in ``updateData()``. Each statement only touches a row whose
        fields (or, when tags are mapped, fields or tags) differ, and
        ``self.updateCount`` is set to the difference in ``total_changes()``
        before and after.
        """
        changes = self.col.db.scalar("select total_changes()")
        if self._tagsMapped:
            self.col.db.executemany(
                """
update notes set mod = ?, usn = ?, flds = ?, tags = ?
where id = ? and (flds != ? or tags != ?)""",
                rows,
            )
        elif self.tagModified:
            self.col.db.executemany(
                """
update notes set mod = ?, usn = ?, flds = ?, tags = ?
where id = ? and flds != ?""",
                rows,
            )
        else:
            self.col.db.executemany(
                """
update notes set mod = ?, usn = ?, flds = ?
where id = ? and flds != ?""",
                rows,
            )
        changes2 = self.col.db.scalar("select total_changes()")
        self.updateCount = changes2 - changes

    def processFields(self, note: ForeignNote, fields: list[str] | None = None) -> None:
        """Distribute ``note.fields`` according to the mapping.

        Columns mapped to ``"_tags"`` are split and appended to ``note.tags``;
        unmapped columns are skipped; every other column is written into
        ``fields`` at the index ``self._fmap`` gives for its field name. When
        ``fields`` is missing or empty, a blank list sized to the notetype is
        used; otherwise the passed list is modified in place. The joined
        result is stored in ``note.fieldsStr``.
        """
        if not fields:
            fields = [""] * len(self.model["flds"])
        for c, f in enumerate(self.mapping):
            if not f:
                continue
            elif f == "_tags":
                note.tags.extend(self.col.tags.split(note.fields[c]))
            else:
                sidx = self._fmap[f][0]
                fields[sidx] = note.fields[c]
        note.fieldsStr = join_fields(fields)
        # temporary fix for the following issue until we can update the code:
        # https://forums.ankiweb.net/t/python-checksum-rust-checksum/8195/16
        if self.col.get_config_bool(Config.Bool.NORMALIZE_NOTE_TEXT):
            note.fieldsStr = unicodedata.normalize("NFC", note.fieldsStr)

    def updateCards(self) -> None:
        """Write the queued foreign-card scheduling values to the ``cards`` table."""
        data = [
            (c.ivl, c.due, c.factor, c.reps, c.lapses, nid, card_ord)
            for nid, card_ord, c in self._cards
        ]
        # we assume any updated cards are reviews
        self.col.db.executemany(
            """
update cards set type = 2, queue = 2, ivl = ?, due = ?,
factor = ?, reps = ?, lapses = ? where nid = ? and ord = ?""",
            data,
        )