Anki/pylib/anki/utils.py
Damien Elmes 9d853bbb03 start work on more clearly defining backend/protobuf boundaries
- anki._backend stores the protobuf files and rsbackend.py code
- pylib modules import protobuf messages directly from the
_pb2 files, and explicitly export any will be returned or consumed
by public pylib functions, so that calling code can import from pylib
- the "rsbackend" no longer imports and re-exports protobuf messages
- pylib can just consume them directly.
- move errors to errors.py

Still todo:

- rsbridge
- finishing the work on rsbackend, and check what we need to add
back to the original file location to avoid breaking add-ons
2021-01-31 18:55:45 +10:00

408 lines
10 KiB
Python

# Copyright: Ankitects Pty Ltd and contributors
# License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html
from __future__ import annotations
# some add-ons expect json to be in the utils module
import json # pylint: disable=unused-import
import locale
import os
import platform
import random
import re
import shutil
import string
import subprocess
import sys
import tempfile
import time
import traceback
from contextlib import contextmanager
from hashlib import sha1
from html.entities import name2codepoint
from typing import Iterable, Iterator, List, Optional, Union
from anki.dbproxy import DBProxy
_tmpdir: Optional[str]
try:
# pylint: disable=c-extension-no-member
import orjson
to_json_bytes = orjson.dumps
from_json_bytes = orjson.loads
except:
print("orjson is missing; DB operations will be slower")
to_json_bytes = lambda obj: json.dumps(obj).encode("utf8") # type: ignore
from_json_bytes = json.loads
# Time handling
##############################################################################
def intTime(scale: int = 1) -> int:
"The time in integer seconds. Pass scale=1000 to get milliseconds."
return int(time.time() * scale)
# Locale
##############################################################################
def fmtPercentage(float_value, point=1) -> str:
"Return float with percentage sign"
fmt = "%" + "0.%(b)df" % {"b": point}
return locale.format_string(fmt, float_value) + "%"
def fmtFloat(float_value, point=1) -> str:
"Return a string with decimal separator according to current locale"
fmt = "%" + "0.%(b)df" % {"b": point}
return locale.format_string(fmt, float_value)
# HTML
##############################################################################
reComment = re.compile("(?s)<!--.*?-->")
reStyle = re.compile("(?si)<style.*?>.*?</style>")
reScript = re.compile("(?si)<script.*?>.*?</script>")
reTag = re.compile("(?s)<.*?>")
reEnts = re.compile(r"&#?\w+;")
reMedia = re.compile("(?i)<img[^>]+src=[\"']?([^\"'>]+)[\"']?[^>]*>")
def stripHTML(s: str) -> str:
s = reComment.sub("", s)
s = reStyle.sub("", s)
s = reScript.sub("", s)
s = reTag.sub("", s)
s = entsToTxt(s)
return s
def stripHTMLMedia(s: str) -> str:
"Strip HTML but keep media filenames"
s = reMedia.sub(" \\1 ", s)
return stripHTML(s)
def minimizeHTML(s: str) -> str:
"Correct Qt's verbose bold/underline/etc."
s = re.sub('<span style="font-weight:600;">(.*?)</span>', "<b>\\1</b>", s)
s = re.sub('<span style="font-style:italic;">(.*?)</span>', "<i>\\1</i>", s)
s = re.sub(
'<span style="text-decoration: underline;">(.*?)</span>', "<u>\\1</u>", s
)
return s
def htmlToTextLine(s: str) -> str:
s = s.replace("<br>", " ")
s = s.replace("<br />", " ")
s = s.replace("<div>", " ")
s = s.replace("\n", " ")
s = re.sub(r"\[sound:[^]]+\]", "", s)
s = re.sub(r"\[\[type:[^]]+\]\]", "", s)
s = stripHTMLMedia(s)
s = s.strip()
return s
def entsToTxt(html: str) -> str:
# entitydefs defines nbsp as \xa0 instead of a standard space, so we
# replace it first
html = html.replace("&nbsp;", " ")
def fixup(m):
text = m.group(0)
if text[:2] == "&#":
# character reference
try:
if text[:3] == "&#x":
return chr(int(text[3:-1], 16))
else:
return chr(int(text[2:-1]))
except ValueError:
pass
else:
# named entity
try:
text = chr(name2codepoint[text[1:-1]])
except KeyError:
pass
return text # leave as is
return reEnts.sub(fixup, html)
# IDs
##############################################################################
def hexifyID(id) -> str:
return "%x" % int(id)
def dehexifyID(id) -> int:
return int(id, 16)
def ids2str(ids: Iterable[Union[int, str]]) -> str:
"""Given a list of integers, return a string '(int1,int2,...)'."""
return "(%s)" % ",".join(str(i) for i in ids)
def timestampID(db: DBProxy, table: str) -> int:
"Return a non-conflicting timestamp for table."
# be careful not to create multiple objects without flushing them, or they
# may share an ID.
t = intTime(1000)
while db.scalar("select id from %s where id = ?" % table, t):
t += 1
return t
def maxID(db: DBProxy) -> int:
"Return the first safe ID to use."
now = intTime(1000)
for tbl in "cards", "notes":
now = max(now, db.scalar("select max(id) from %s" % tbl) or 0)
return now + 1
# used in ankiweb
def base62(num: int, extra: str = "") -> str:
s = string
table = s.ascii_letters + s.digits + extra
buf = ""
while num:
num, i = divmod(num, len(table))
buf = table[i] + buf
return buf
_base91_extra_chars = "!#$%&()*+,-./:;<=>?@[]^_`{|}~"
def base91(num: int) -> str:
# all printable characters minus quotes, backslash and separators
return base62(num, _base91_extra_chars)
def guid64() -> str:
"Return a base91-encoded 64bit random number."
return base91(random.randint(0, 2 ** 64 - 1))
# increment a guid by one, for note type conflicts
def incGuid(guid) -> str:
return _incGuid(guid[::-1])[::-1]
def _incGuid(guid) -> str:
s = string
table = s.ascii_letters + s.digits + _base91_extra_chars
idx = table.index(guid[0])
if idx + 1 == len(table):
# overflow
guid = table[0] + _incGuid(guid[1:])
else:
guid = table[idx + 1] + guid[1:]
return guid
# Fields
##############################################################################
def joinFields(list: List[str]) -> str:
return "\x1f".join(list)
def splitFields(string: str) -> List[str]:
return string.split("\x1f")
# Checksums
##############################################################################
def checksum(data: Union[bytes, str]) -> str:
if isinstance(data, str):
data = data.encode("utf-8")
return sha1(data).hexdigest()
def fieldChecksum(data: str) -> int:
# 32 bit unsigned number from first 8 digits of sha1 hash
return int(checksum(stripHTMLMedia(data).encode("utf-8"))[:8], 16)
# Temp files
##############################################################################
_tmpdir = None
def tmpdir() -> str:
"A reusable temp folder which we clean out on each program invocation."
global _tmpdir
if not _tmpdir:
def cleanup():
if os.path.exists(_tmpdir):
shutil.rmtree(_tmpdir)
import atexit
atexit.register(cleanup)
_tmpdir = os.path.join(tempfile.gettempdir(), "anki_temp")
try:
os.mkdir(_tmpdir)
except FileExistsError:
pass
return _tmpdir
def tmpfile(prefix: str = "", suffix: str = "") -> str:
(fd, name) = tempfile.mkstemp(dir=tmpdir(), prefix=prefix, suffix=suffix)
os.close(fd)
return name
def namedtmp(name: str, rm: bool = True) -> str:
"Return tmpdir+name. Deletes any existing file."
path = os.path.join(tmpdir(), name)
if rm:
try:
os.unlink(path)
except (OSError, IOError):
pass
return path
# Cmd invocation
##############################################################################
@contextmanager
def noBundledLibs() -> Iterator[None]:
oldlpath = os.environ.pop("LD_LIBRARY_PATH", None)
yield
if oldlpath is not None:
os.environ["LD_LIBRARY_PATH"] = oldlpath
def call(argv: List[str], wait: bool = True, **kwargs) -> int:
"Execute a command. If WAIT, return exit code."
# ensure we don't open a separate window for forking process on windows
if isWin:
si = subprocess.STARTUPINFO() # type: ignore
try:
si.dwFlags |= subprocess.STARTF_USESHOWWINDOW # type: ignore
except:
# pylint: disable=no-member
si.dwFlags |= subprocess._subprocess.STARTF_USESHOWWINDOW # type: ignore
else:
si = None
# run
try:
with noBundledLibs():
o = subprocess.Popen(argv, startupinfo=si, **kwargs)
except OSError:
# command not found
return -1
# wait for command to finish
if wait:
while 1:
try:
ret = o.wait()
except OSError:
# interrupted system call
continue
break
else:
ret = 0
return ret
# OS helpers
##############################################################################
isMac = sys.platform.startswith("darwin")
isWin = sys.platform.startswith("win32")
isLin = not isMac and not isWin
devMode = os.getenv("ANKIDEV", "")
invalidFilenameChars = ':*?"<>|'
def invalidFilename(str, dirsep=True) -> Optional[str]:
for c in invalidFilenameChars:
if c in str:
return c
if (dirsep or isWin) and "/" in str:
return "/"
elif (dirsep or not isWin) and "\\" in str:
return "\\"
elif str.strip().startswith("."):
return "."
return None
def platDesc() -> str:
# we may get an interrupted system call, so try this in a loop
n = 0
theos = "unknown"
while n < 100:
n += 1
try:
system = platform.system()
if isMac:
theos = "mac:%s" % (platform.mac_ver()[0])
elif isWin:
theos = "win:%s" % (platform.win32_ver()[0])
elif system == "Linux":
import distro # pytype: disable=import-error # pylint: disable=import-error
r = distro.linux_distribution(full_distribution_name=False)
theos = "lin:%s:%s" % (r[0], r[1])
else:
theos = system
break
except:
continue
return theos
# Debugging
##############################################################################
class TimedLog:
def __init__(self) -> None:
self._last = time.time()
def log(self, s) -> None:
path, num, fn, y = traceback.extract_stack(limit=2)[0]
sys.stderr.write(
"%5dms: %s(): %s\n" % ((time.time() - self._last) * 1000, fn, s)
)
self._last = time.time()
# Version
##############################################################################
def versionWithBuild() -> str:
from anki.buildinfo import buildhash, version
return "%s (%s)" % (version, buildhash)
def pointVersion() -> int:
from anki.buildinfo import version
return int(version.split(".")[-1])