lang/media/storage

This commit is contained in:
Damien Elmes 2019-12-20 11:27:01 +10:00
parent f74ee52c73
commit c415a70e72
3 changed files with 31 additions and 30 deletions

View file

@ -116,10 +116,10 @@ def localTranslation() -> Any:
else: else:
return currentTranslation return currentTranslation
def _(str) -> Any: def _(str: str) -> str:
return localTranslation().gettext(str) return localTranslation().gettext(str)
def ngettext(single, plural, n) -> Any: def ngettext(single: str, plural: str, n: int) -> str:
return localTranslation().ngettext(single, plural, n) return localTranslation().ngettext(single, plural, n)
def langDir() -> str: def langDir() -> str:
@ -135,7 +135,7 @@ def langDir() -> str:
dir = os.path.abspath(os.path.join(filedir, "..", "locale")) dir = os.path.abspath(os.path.join(filedir, "..", "locale"))
return dir return dir
def setLang(lang, local=True) -> None: def setLang(lang: str, local: bool = True) -> None:
lang = mungeCode(lang) lang = mungeCode(lang)
trans = gettext.translation( trans = gettext.translation(
'anki', langDir(), languages=[lang], fallback=True) 'anki', langDir(), languages=[lang], fallback=True)
@ -147,7 +147,7 @@ def setLang(lang, local=True) -> None:
currentLang = lang currentLang = lang
currentTranslation = trans currentTranslation = trans
def getLang() -> Any: def getLang() -> str:
"Return the language local to this thread, or the default." "Return the language local to this thread, or the default."
if getattr(threadLocal, 'currentLang', None): if getattr(threadLocal, 'currentLang', None):
return threadLocal.currentLang return threadLocal.currentLang
@ -158,7 +158,7 @@ def noHint(str) -> str:
"Remove translation hint from end of string." "Remove translation hint from end of string."
return re.sub(r"(^.*?)( ?\(.+?\))?$", "\\1", str) return re.sub(r"(^.*?)( ?\(.+?\))?$", "\\1", str)
def mungeCode(code) -> Any: def mungeCode(code: str) -> Any:
code = code.replace("-", "_") code = code.replace("-", "_")
if code in compatMap: if code in compatMap:
code = compatMap[code] code = compatMap[code]

View file

@ -17,9 +17,9 @@ from anki.db import DB, DBError
from anki.consts import * from anki.consts import *
from anki.latex import mungeQA from anki.latex import mungeQA
from anki.lang import _ from anki.lang import _
from typing import Any, List, Optional, Tuple, TypeVar, Union from typing import Any, List, Optional, Tuple
_T0 = TypeVar('_T0') from typing import Callable, Union
class MediaManager: class MediaManager:
@ -31,8 +31,9 @@ class MediaManager:
r"(?i)(<img[^>]* src=(?!['\"])(?P<fname>[^ >]+)[^>]*?>)", r"(?i)(<img[^>]* src=(?!['\"])(?P<fname>[^ >]+)[^>]*?>)",
] ]
regexps = soundRegexps + imgRegexps regexps = soundRegexps + imgRegexps
db: Optional[DB]
def __init__(self, col, server) -> None: def __init__(self, col, server: bool) -> None:
self.col = col self.col = col
if server: if server:
self._dir = None self._dir = None
@ -144,11 +145,11 @@ create table meta (dirMod int, lastUsn int); insert into meta values (0, 0);
########################################################################## ##########################################################################
# opath must be in unicode # opath must be in unicode
def addFile(self, opath) -> Any: def addFile(self, opath: str) -> Any:
with open(opath, "rb") as f: with open(opath, "rb") as f:
return self.writeData(opath, f.read()) return self.writeData(opath, f.read())
def writeData(self, opath, data, typeHint=None) -> Any: def writeData(self, opath: str, data: bytes, typeHint: Optional[str] = None) -> Any:
# if fname is a full path, use only the basename # if fname is a full path, use only the basename
fname = os.path.basename(opath) fname = os.path.basename(opath)
@ -196,7 +197,7 @@ create table meta (dirMod int, lastUsn int); insert into meta values (0, 0);
# String manipulation # String manipulation
########################################################################## ##########################################################################
def filesInStr(self, mid, string, includeRemote=False) -> List[str]: def filesInStr(self, mid: Union[int, str], string: str, includeRemote: bool = False) -> List[str]:
l = [] l = []
model = self.col.models.get(mid) model = self.col.models.get(mid)
strings = [] strings = []
@ -236,17 +237,17 @@ create table meta (dirMod int, lastUsn int); insert into meta values (0, 0);
strings.append(re.sub(clozeReg%".+?", arepl, string)) strings.append(re.sub(clozeReg%".+?", arepl, string))
return strings return strings
def transformNames(self, txt, func) -> Any: def transformNames(self, txt: str, func: Callable) -> Any:
for reg in self.regexps: for reg in self.regexps:
txt = re.sub(reg, func, txt) txt = re.sub(reg, func, txt)
return txt return txt
def strip(self, txt: _T0) -> Union[str, _T0]: def strip(self, txt: str) -> str:
for reg in self.regexps: for reg in self.regexps:
txt = re.sub(reg, "", txt) txt = re.sub(reg, "", txt)
return txt return txt
def escapeImages(self, string: _T0, unescape=False) -> Union[str, _T0]: def escapeImages(self, string: str, unescape: bool = False) -> str:
if unescape: if unescape:
fn = urllib.parse.unquote fn = urllib.parse.unquote
else: else:
@ -264,7 +265,7 @@ create table meta (dirMod int, lastUsn int); insert into meta values (0, 0);
# Rebuilding DB # Rebuilding DB
########################################################################## ##########################################################################
def check(self, local=None) -> Any: def check(self, local: Optional[List[str]]) -> Tuple[List[str],List[str],List[str]]:
"Return (missingFiles, unusedFiles)." "Return (missingFiles, unusedFiles)."
mdir = self.dir() mdir = self.dir()
# gather all media references in NFC form # gather all media references in NFC form
@ -349,7 +350,7 @@ create table meta (dirMod int, lastUsn int); insert into meta values (0, 0);
# Copying on import # Copying on import
########################################################################## ##########################################################################
def have(self, fname) -> bool: def have(self, fname: str) -> bool:
return os.path.exists(os.path.join(self.dir(), fname)) return os.path.exists(os.path.join(self.dir(), fname))
# Illegal characters and paths # Illegal characters and paths
@ -357,10 +358,10 @@ create table meta (dirMod int, lastUsn int); insert into meta values (0, 0);
_illegalCharReg = re.compile(r'[][><:"/?*^\\|\0\r\n]') _illegalCharReg = re.compile(r'[][><:"/?*^\\|\0\r\n]')
def stripIllegal(self, str) -> str: def stripIllegal(self, str: str) -> str:
return re.sub(self._illegalCharReg, "", str) return re.sub(self._illegalCharReg, "", str)
def hasIllegal(self, s: str): def hasIllegal(self, s: str) -> bool:
if re.search(self._illegalCharReg, s): if re.search(self._illegalCharReg, s):
return True return True
try: try:
@ -369,7 +370,7 @@ create table meta (dirMod int, lastUsn int); insert into meta values (0, 0);
return True return True
return False return False
def cleanFilename(self, fname) -> str: def cleanFilename(self, fname: str) -> str:
fname = self.stripIllegal(fname) fname = self.stripIllegal(fname)
fname = self._cleanWin32Filename(fname) fname = self._cleanWin32Filename(fname)
fname = self._cleanLongFilename(fname) fname = self._cleanLongFilename(fname)
@ -378,7 +379,7 @@ create table meta (dirMod int, lastUsn int); insert into meta values (0, 0);
return fname return fname
def _cleanWin32Filename(self, fname: _T0) -> Union[str, _T0]: def _cleanWin32Filename(self, fname: str) -> str:
if not isWin: if not isWin:
return fname return fname
@ -390,7 +391,7 @@ create table meta (dirMod int, lastUsn int); insert into meta values (0, 0);
return fname return fname
def _cleanLongFilename(self, fname) -> Any: def _cleanLongFilename(self, fname: str) -> Any:
# a fairly safe limit that should work on typical windows # a fairly safe limit that should work on typical windows
# paths and on eCryptfs partitions, even with a duplicate # paths and on eCryptfs partitions, even with a duplicate
# suffix appended # suffix appended
@ -427,10 +428,10 @@ create table meta (dirMod int, lastUsn int); insert into meta values (0, 0);
def haveDirty(self) -> Any: def haveDirty(self) -> Any:
return self.db.scalar("select 1 from media where dirty=1 limit 1") return self.db.scalar("select 1 from media where dirty=1 limit 1")
def _mtime(self, path) -> int: def _mtime(self, path: str) -> int:
return int(os.stat(path).st_mtime) return int(os.stat(path).st_mtime)
def _checksum(self, path) -> str: def _checksum(self, path: str) -> str:
with open(path, "rb") as f: with open(path, "rb") as f:
return checksum(f.read()) return checksum(f.read())

View file

@ -18,7 +18,7 @@ from typing import Any, Dict, List, Optional, Tuple, Type, Union
_Collection: Type[_Collection] _Collection: Type[_Collection]
def Collection(path, lock=True, server=False, log=False) -> _Collection: def Collection(path: str, lock: bool = True, server: bool = False, log: bool = False) -> _Collection:
"Open a new or existing collection. Path must be unicode." "Open a new or existing collection. Path must be unicode."
assert path.endswith(".anki2") assert path.endswith(".anki2")
path = os.path.abspath(path) path = os.path.abspath(path)
@ -57,7 +57,7 @@ def Collection(path, lock=True, server=False, log=False) -> _Collection:
col.lock() col.lock()
return col return col
def _upgradeSchema(db) -> Any: def _upgradeSchema(db: DB) -> Any:
ver = db.scalar("select ver from col") ver = db.scalar("select ver from col")
if ver == SCHEMA_VERSION: if ver == SCHEMA_VERSION:
return ver return ver
@ -208,7 +208,7 @@ def _upgradeClozeModel(col, m) -> None:
# Creating a new collection # Creating a new collection
###################################################################### ######################################################################
def _createDB(db) -> int: def _createDB(db: DB) -> int:
db.execute("pragma page_size = 4096") db.execute("pragma page_size = 4096")
db.execute("pragma legacy_file_format = 0") db.execute("pragma legacy_file_format = 0")
db.execute("vacuum") db.execute("vacuum")
@ -217,7 +217,7 @@ def _createDB(db) -> int:
db.execute("analyze") db.execute("analyze")
return SCHEMA_VERSION return SCHEMA_VERSION
def _addSchema(db, setColConf=True) -> None: def _addSchema(db: DB, setColConf: bool = True) -> None:
db.executescript(""" db.executescript("""
create table if not exists col ( create table if not exists col (
id integer primary key, id integer primary key,
@ -294,7 +294,7 @@ values(1,0,0,%(s)s,%(v)s,0,0,0,'','{}','','','{}');
if setColConf: if setColConf:
_addColVars(db, *_getColVars(db)) _addColVars(db, *_getColVars(db))
def _getColVars(db) -> Tuple[Any, Any, Dict[str, Optional[Union[int, str, List[int]]]]]: def _getColVars(db: DB) -> Tuple[Any, Any, Dict[str, Any]]:
import anki.collection import anki.collection
import anki.decks import anki.decks
g = copy.deepcopy(anki.decks.defaultDeck) g = copy.deepcopy(anki.decks.defaultDeck)
@ -306,14 +306,14 @@ def _getColVars(db) -> Tuple[Any, Any, Dict[str, Optional[Union[int, str, List[i
gc['id'] = 1 gc['id'] = 1
return g, gc, anki.collection.defaultConf.copy() return g, gc, anki.collection.defaultConf.copy()
def _addColVars(db, g, gc, c) -> None: def _addColVars(db: DB, g: Dict[str, Any], gc: Dict[str, Any], c: Dict[str, Any]) -> None:
db.execute(""" db.execute("""
update col set conf = ?, decks = ?, dconf = ?""", update col set conf = ?, decks = ?, dconf = ?""",
json.dumps(c), json.dumps(c),
json.dumps({'1': g}), json.dumps({'1': g}),
json.dumps({'1': gc})) json.dumps({'1': gc}))
def _updateIndices(db) -> None: def _updateIndices(db: DB) -> None:
"Add indices to the DB." "Add indices to the DB."
db.executescript(""" db.executescript("""
-- syncing -- syncing