diff --git a/qt/aqt/mediasrv.py b/qt/aqt/mediasrv.py index b444d1ec0..6cb61d916 100644 --- a/qt/aqt/mediasrv.py +++ b/qt/aqt/mediasrv.py @@ -7,7 +7,9 @@ import enum import logging import mimetypes import os +import random import re +import string import sys import threading import traceback @@ -698,7 +700,6 @@ def _extract_collection_post_request(path: str) -> DynamicRequest | NotFound: def _check_dynamic_request_permissions(): if request.method == "GET": return - context = _extract_page_context() def warn() -> None: show_warning( @@ -710,24 +711,16 @@ def _check_dynamic_request_permissions(): aqt.mw.taskman.run_on_main(warn) abort(403) - if context in [ - PageContext.NON_LEGACY_PAGE, - PageContext.EDITOR, - PageContext.ADDON_PAGE, - PageContext.DECK_OPTIONS, - ]: - pass - elif context == PageContext.REVIEWER and request.path in ( + # does page have access to entire API? + if _have_api_access(): + return + + # whitelisted API endpoints for reviewer/previewer + if request.path in ( "/_anki/getSchedulingStatesWithContext", "/_anki/setSchedulingStates", "/_anki/i18nResources", ): - # reviewer is only allowed to access custom study methods - pass - elif ( - context == PageContext.PREVIEWER or context == PageContext.CARD_LAYOUT - ) and request.path == "/_anki/i18nResources": - # previewers are only allowed to access i18n resources pass else: # other legacy pages may contain third-party JS, so we do not @@ -752,23 +745,11 @@ def legacy_page_data() -> Response: return _text_response(HTTPStatus.NOT_FOUND, "page not found") -def _extract_page_context() -> PageContext: - "Get context based on referer header." - from urllib.parse import parse_qs, urlparse +_APIKEY = "".join(random.choices(string.ascii_letters + string.digits, k=32)) - referer = urlparse(request.headers.get("Referer", "")) - if referer.path.startswith("/_anki/pages/") or is_sveltekit_page(referer.path[1:]): - return PageContext.NON_LEGACY_PAGE - elif referer.path == "/_anki/legacyPageData": - query_params = parse_qs(referer.query) - query_id = query_params.get("id") - if not query_id: - return PageContext.UNKNOWN - id = int(query_id[0]) - page_context = aqt.mw.mediaServer.get_page_context(id) - return page_context if page_context else PageContext.UNKNOWN - else: - return PageContext.UNKNOWN + +def _have_api_access() -> bool: + return request.headers.get("Authorization") == f"Bearer {_APIKEY}" # this currently only handles a single method; in the future, idempotent diff --git a/qt/aqt/sound.py b/qt/aqt/sound.py index 48a1852d6..55e052e1d 100644 --- a/qt/aqt/sound.py +++ b/qt/aqt/sound.py @@ -141,7 +141,7 @@ class AVPlayer: # audio be stopped? interrupt_current_audio = True # caller key for the current playback (optional) - current_caller = None + current_caller: Any = None # whether the last call to play_file_with_caller interrupted another current_caller_interrupted = False @@ -167,7 +167,7 @@ class AVPlayer: self._enqueued = [] self._stop_if_playing() - def stop_and_clear_queue_if_caller(self, caller) -> None: + def stop_and_clear_queue_if_caller(self, caller: Any) -> None: if caller == self.current_caller: self.stop_and_clear_queue() @@ -179,7 +179,7 @@ class AVPlayer: def play_file(self, filename: str) -> None: self.play_tags([SoundOrVideoTag(filename=filename)]) - def play_file_with_caller(self, filename: str, caller) -> None: + def play_file_with_caller(self, filename: str, caller: Any) -> None: if self.current_caller: self.current_caller_interrupted = True self.current_caller = caller diff --git a/qt/aqt/webview.py b/qt/aqt/webview.py index b5b50ad72..704af4cf7 100644 --- a/qt/aqt/webview.py +++ b/qt/aqt/webview.py @@ -28,35 +28,88 @@ serverbaseurl = re.compile(r"^.+:\/\/[^\/]+") if TYPE_CHECKING: from aqt.mediasrv import PageContext - -# Page for debug messages -########################################################################## - BridgeCommandHandler = Callable[[str], Any] +class AnkiWebViewKind(Enum): + """Enum registry of all web views managed by Anki + + The value of each entry corresponds to the web view's title. + + When introducing a new web view, please add it to the registry below. + """ + + DEFAULT = "default" + MAIN = "main webview" + TOP_TOOLBAR = "top toolbar" + BOTTOM_TOOLBAR = "bottom toolbar" + DECK_OPTIONS = "deck options" + EDITOR = "editor" + LEGACY_DECK_STATS = "legacy deck stats" + DECK_STATS = "deck stats" + PREVIEWER = "previewer" + CHANGE_NOTETYPE = "change notetype" + CARD_LAYOUT = "card layout" + BROWSER_CARD_INFO = "browser card info" + IMPORT_CSV = "csv import" + EMPTY_CARDS = "empty cards" + FIND_DUPLICATES = "find duplicates" + FIELDS = "fields" + IMPORT_LOG = "import log" + IMPORT_ANKI_PACKAGE = "anki package import" + + +class AuthInterceptor(QWebEngineUrlRequestInterceptor): + _api_enabled = False + + def __init__(self, parent: QObject | None = None, api_enabled: bool = False): + super().__init__(parent) + self._api_enabled = api_enabled + + def interceptRequest(self, info): + from aqt.mediasrv import _APIKEY + + if self._api_enabled: + info.setHttpHeader(b"Authorization", f"Bearer {_APIKEY}".encode("utf-8")) + + +_profile_with_api_access: QWebEngineProfile | None = None +_profile_without_api_access: QWebEngineProfile | None = None + + class AnkiWebPage(QWebEnginePage): - def __init__(self, onBridgeCmd: BridgeCommandHandler) -> None: - QWebEnginePage.__init__(self) + def __init__( + self, + onBridgeCmd: BridgeCommandHandler, + kind: AnkiWebViewKind, + parent: QObject | None, + ) -> None: + profile = self._profileForPage(kind) + QWebEnginePage.__init__(self, profile, parent) self._onBridgeCmd = onBridgeCmd + self._kind = kind self._setupBridge() self.open_links_externally = True - def _setupBridge(self) -> None: - class Bridge(QObject): - def __init__(self, bridge_handler: Callable[[str], Any]) -> None: - super().__init__() - self.onCmd = bridge_handler + def _profileForPage(self, kind: AnkiWebViewKind) -> QWebEngineProfile: + have_api_access = kind in ( + AnkiWebViewKind.DECK_OPTIONS, + AnkiWebViewKind.EDITOR, + AnkiWebViewKind.DECK_STATS, + AnkiWebViewKind.CHANGE_NOTETYPE, + AnkiWebViewKind.BROWSER_CARD_INFO, + ) - @pyqtSlot(str, result=str) # type: ignore - def cmd(self, str: str) -> Any: - return json.dumps(self.onCmd(str)) + global _profile_with_api_access, _profile_without_api_access - self._bridge = Bridge(self._onCmd) + # Use cached profile if available + if have_api_access and _profile_with_api_access is not None: + return _profile_with_api_access + elif not have_api_access and _profile_without_api_access is not None: + return _profile_without_api_access - self._channel = QWebChannel(self) - self._channel.registerObject("py", self._bridge) - self.setWebChannel(self._channel) + # Create a new profile if not cached + profile = QWebEngineProfile() qwebchannel = ":/qtwebchannel/qwebchannel.js" jsfile = QFile(qwebchannel) @@ -90,12 +143,35 @@ class AnkiWebPage(QWebEnginePage): script.setInjectionPoint(QWebEngineScript.InjectionPoint.DocumentReady) script.setRunsOnSubFrames(False) - profile = self.profile() - assert profile is not None scripts = profile.scripts() assert scripts is not None scripts.insert(script) + interceptor = AuthInterceptor(profile, api_enabled=have_api_access) + profile.setUrlRequestInterceptor(interceptor) + if have_api_access: + _profile_with_api_access = profile + else: + _profile_without_api_access = profile + + return profile + + def _setupBridge(self) -> None: + class Bridge(QObject): + def __init__(self, bridge_handler: Callable[[str], Any]) -> None: + super().__init__() + self.onCmd = bridge_handler + + @pyqtSlot(str, result=str) # type: ignore + def cmd(self, str: str) -> Any: + return json.dumps(self.onCmd(str)) + + self._bridge = Bridge(self._onCmd) + + self._channel = QWebChannel(self) + self._channel.registerObject("py", self._bridge) + self.setWebChannel(self._channel) + def javaScriptConsoleMessage( self, level: QWebEnginePage.JavaScriptConsoleMessageLevel, @@ -247,34 +323,6 @@ class WebContent: ########################################################################## -class AnkiWebViewKind(Enum): - """Enum registry of all web views managed by Anki - - The value of each entry corresponds to the web view's title. - - When introducing a new web view, please add it to the registry below. - """ - - DEFAULT = "default" - MAIN = "main webview" - TOP_TOOLBAR = "top toolbar" - BOTTOM_TOOLBAR = "bottom toolbar" - DECK_OPTIONS = "deck options" - EDITOR = "editor" - LEGACY_DECK_STATS = "legacy deck stats" - DECK_STATS = "deck stats" - PREVIEWER = "previewer" - CHANGE_NOTETYPE = "change notetype" - CARD_LAYOUT = "card layout" - BROWSER_CARD_INFO = "browser card info" - IMPORT_CSV = "csv import" - EMPTY_CARDS = "empty cards" - FIND_DUPLICATES = "find duplicates" - FIELDS = "fields" - IMPORT_LOG = "import log" - IMPORT_ANKI_PACKAGE = "anki package import" - - class AnkiWebView(QWebEngineView): allow_drops = False _kind: AnkiWebViewKind @@ -287,11 +335,8 @@ class AnkiWebView(QWebEngineView): ) -> None: QWebEngineView.__init__(self, parent=parent) self.set_kind(kind) - if title: - self.set_title(title) - self._page = AnkiWebPage(self._onBridgeCmd) # reduce flicker - self._page.setBackgroundColor(theme_manager.qcolor(colors.CANVAS)) + self.page().setBackgroundColor(theme_manager.qcolor(colors.CANVAS)) # in new code, use .set_bridge_command() instead of setting this directly self.onBridgeCmd: Callable[[str], Any] = self.defaultOnBridgeCmd @@ -299,7 +344,6 @@ class AnkiWebView(QWebEngineView): self._domDone = True self._pendingActions: list[tuple[str, Sequence[Any]]] = [] self.requiresCol = True - self.setPage(self._page) self._disable_zoom = False self.resetHandlers() @@ -323,6 +367,15 @@ class AnkiWebView(QWebEngineView): def set_kind(self, kind: AnkiWebViewKind) -> None: self._kind = kind self.set_title(kind.value) + # this is an ugly hack to avoid breakages caused by + # creating a default webview then immediately calling set_kind, which results + # in the creation of two pages, and the second fails as the domDone + # signal from the first one is received + if kind != AnkiWebViewKind.DEFAULT: + self.setPage(AnkiWebPage(self._onBridgeCmd, kind, self)) + + def page(self) -> AnkiWebPage: + return cast(AnkiWebPage, super().page()) @property def kind(self) -> AnkiWebViewKind: @@ -357,7 +410,7 @@ class AnkiWebView(QWebEngineView): return False def set_open_links_externally(self, enable: bool) -> None: - self._page.open_links_externally = enable + self.page().open_links_externally = enable def onEsc(self) -> None: w = self.parent() @@ -822,7 +875,7 @@ html {{ {font} }} Must be done on Windows prior to changing current working directory.""" self.requiresCol = False self._domReady = False - self._page.setContent(cast(QByteArray, bytes("", "ascii"))) + self.page().setContent(cast(QByteArray, bytes("", "ascii"))) def cleanup(self) -> None: try: @@ -836,14 +889,14 @@ html {{ {font} }} # defer page cleanup so that in-flight requests have a chance to complete first # https://forums.ankiweb.net/t/error-when-exiting-browsing-when-the-software-is-installed-in-the-path-c-program-files-anki/38363 mw.progress.single_shot(5000, lambda: mw.mediaServer.clear_page_html(id(self))) - self._page.deleteLater() + self.page().deleteLater() def on_theme_did_change(self) -> None: # avoid flashes if page reloaded - self._page.setBackgroundColor(theme_manager.qcolor(colors.CANVAS)) + self.page().setBackgroundColor(theme_manager.qcolor(colors.CANVAS)) if hasattr(QWebEngineSettings.WebAttribute, "ForceDarkMode"): force_dark_mode = getattr(QWebEngineSettings.WebAttribute, "ForceDarkMode") - page_settings = self._page.settings() + page_settings = self.page().settings() if page_settings is not None: page_settings.setAttribute( force_dark_mode,