# Copyright: Ankitects Pty Ltd and contributors # License: GNU AGPL, version 3 or later; http://www.gnu.org/licenses/agpl.html # pylint: enable=invalid-name from __future__ import annotations import json as _json import os import platform import random import re import shutil import string import subprocess import sys import tempfile import time from contextlib import contextmanager from hashlib import sha1 from typing import Any, Iterable, Iterator, no_type_check from anki._legacy import DeprecatedNamesMixinForModule from anki.dbproxy import DBProxy _tmpdir: str | None try: # pylint: disable=c-extension-no-member import orjson to_json_bytes = orjson.dumps from_json_bytes = orjson.loads except: print("orjson is missing; DB operations will be slower") to_json_bytes = lambda obj: _json.dumps(obj).encode("utf8") # type: ignore from_json_bytes = _json.loads # Time handling ############################################################################## def int_time(scale: int = 1) -> int: "The time in integer seconds. Pass scale=1000 to get milliseconds." return int(time.time() * scale) # HTML ############################################################################## def strip_html(txt: str) -> str: import anki.lang from anki.collection import StripHtmlMode return anki.lang.current_i18n.strip_html(text=txt, mode=StripHtmlMode.NORMAL) def strip_html_media(txt: str) -> str: "Strip HTML but keep media filenames" import anki.lang from anki.collection import StripHtmlMode return anki.lang.current_i18n.strip_html( text=txt, mode=StripHtmlMode.PRESERVE_MEDIA_FILENAMES ) def html_to_text_line(txt: str) -> str: txt = txt.replace("
", " ") txt = txt.replace("
", " ") txt = txt.replace("
", " ") txt = txt.replace("\n", " ") txt = re.sub(r"\[sound:[^]]+\]", "", txt) txt = re.sub(r"\[\[type:[^]]+\]\]", "", txt) txt = strip_html_media(txt) txt = txt.strip() return txt # IDs ############################################################################## def ids2str(ids: Iterable[int | str]) -> str: """Given a list of integers, return a string '(int1,int2,...)'.""" return f"({','.join(str(i) for i in ids)})" def timestamp_id(db: DBProxy, table: str) -> int: "Return a non-conflicting timestamp for table." # be careful not to create multiple objects without flushing them, or they # may share an ID. timestamp = int_time(1000) while db.scalar(f"select id from {table} where id = ?", timestamp): timestamp += 1 return timestamp def max_id(db: DBProxy) -> int: "Return the first safe ID to use." now = int_time(1000) for tbl in "cards", "notes": now = max(now, db.scalar(f"select max(id) from {tbl}") or 0) return now + 1 # used in ankiweb def base62(num: int, extra: str = "") -> str: table = string.ascii_letters + string.digits + extra buf = "" while num: num, mod = divmod(num, len(table)) buf = table[mod] + buf return buf _BASE91_EXTRA_CHARS = "!#$%&()*+,-./:;<=>?@[]^_`{|}~" def base91(num: int) -> str: # all printable characters minus quotes, backslash and separators return base62(num, _BASE91_EXTRA_CHARS) def guid64() -> str: "Return a base91-encoded 64bit random number." return base91(random.randint(0, 2 ** 64 - 1)) # Fields ############################################################################## def join_fields(list: list[str]) -> str: return "\x1f".join(list) def split_fields(string: str) -> list[str]: return string.split("\x1f") # Checksums ############################################################################## def checksum(data: bytes | str) -> str: if isinstance(data, str): data = data.encode("utf-8") return sha1(data).hexdigest() def field_checksum(data: str) -> int: # 32 bit unsigned number from first 8 digits of sha1 hash return int(checksum(strip_html_media(data).encode("utf-8"))[:8], 16) # Temp files ############################################################################## _tmpdir = None # pylint: disable=invalid-name def tmpdir() -> str: "A reusable temp folder which we clean out on each program invocation." global _tmpdir # pylint: disable=invalid-name if not _tmpdir: def cleanup() -> None: if os.path.exists(_tmpdir): shutil.rmtree(_tmpdir) import atexit atexit.register(cleanup) _tmpdir = os.path.join(tempfile.gettempdir(), "anki_temp") try: os.mkdir(_tmpdir) except FileExistsError: pass return _tmpdir def tmpfile(prefix: str = "", suffix: str = "") -> str: (descriptor, name) = tempfile.mkstemp(dir=tmpdir(), prefix=prefix, suffix=suffix) os.close(descriptor) return name def namedtmp(name: str, remove: bool = True) -> str: "Return tmpdir+name. Deletes any existing file." path = os.path.join(tmpdir(), name) if remove: try: os.unlink(path) except OSError: pass return path # Cmd invocation ############################################################################## @contextmanager def no_bundled_libs() -> Iterator[None]: oldlpath = os.environ.pop("LD_LIBRARY_PATH", None) yield if oldlpath is not None: os.environ["LD_LIBRARY_PATH"] = oldlpath def call(argv: list[str], wait: bool = True, **kwargs: Any) -> int: "Execute a command. If WAIT, return exit code." # ensure we don't open a separate window for forking process on windows if isWin: info = subprocess.STARTUPINFO() # type: ignore try: info.dwFlags |= subprocess.STARTF_USESHOWWINDOW # type: ignore except: # pylint: disable=no-member info.dwFlags |= subprocess._subprocess.STARTF_USESHOWWINDOW # type: ignore else: info = None # run try: with no_bundled_libs(): process = subprocess.Popen(argv, startupinfo=info, **kwargs) except OSError: # command not found return -1 # wait for command to finish if wait: while 1: try: ret = process.wait() except OSError: # interrupted system call continue break else: ret = 0 return ret # OS helpers ############################################################################## isMac = sys.platform.startswith("darwin") isWin = sys.platform.startswith("win32") isLin = not isMac and not isWin devMode = os.getenv("ANKIDEV", "") INVALID_FILENAME_CHARS = ':*?"<>|' def invalid_filename(str: str, dirsep: bool = True) -> str | None: for char in INVALID_FILENAME_CHARS: if char in str: return char if (dirsep or isWin) and "/" in str: return "/" elif (dirsep or not isWin) and "\\" in str: return "\\" elif str.strip().startswith("."): return "." return None def plat_desc() -> str: # we may get an interrupted system call, so try this in a loop theos = "unknown" for _ in range(100): try: system = platform.system() if isMac: theos = f"mac:{platform.mac_ver()[0]}" elif isWin: theos = f"win:{platform.win32_ver()[0]}" elif system == "Linux": import distro # pytype: disable=import-error # pylint: disable=import-error dist_id = distro.id() dist_version = distro.version() theos = f"lin:{dist_id}:{dist_version}" else: theos = system break except: continue return theos # Version ############################################################################## def version_with_build() -> str: from anki.buildinfo import buildhash, version return f"{version} ({buildhash})" def point_version() -> int: from anki.buildinfo import version return int(version.split(".")[-1]) _deprecated_names = DeprecatedNamesMixinForModule(globals()) _deprecated_names.register_deprecated_aliases( stripHTML=strip_html, stripHTMLMedia=strip_html_media, timestampID=timestamp_id, maxID=max_id, invalidFilenameChars=(INVALID_FILENAME_CHARS, "INVALID_FILENAME_CHARS"), ) _deprecated_names.register_deprecated_attributes(json=((_json, "_json"), None)) @no_type_check def __getattr__(name: str) -> Any: return _deprecated_names.__getattr__(name)