mirror of
https://github.com/ankitects/anki.git
synced 2025-09-19 06:22:22 -04:00
Require an auth token for API access
We were previously inspecting the referrer, but that is spoofable, and doesn't guard against other processes on the machine. To accomplish this, we use a request interceptor to automatically add an auth token to webviews with the right context. Some related changes were required: - We avoid storing _page, which was leading to leaks & warning on exit - At webview creation (or set_kind() invocation), we assign either an authenticated or unauthenticated web profile. - Some of our screens initialize the AnkiWebView when calling, e.g., aqt.forms.stats.Ui_Dialog(). They then immediately call .set_kind(). This reveals a race condition in our DOM handling code: the webview initialization creates an empty page with the injected script, which causes a domDone signal to be sent back. This signal arrives after we've created another page with .set_kind(), causing our code to think the DOM is ready when it's not. Then when we try to inject the dynamic styling, we get an error, as the DOM is not ready yet. In the absence of better solutions, I've added a hack to set_kind() to deal with this for now.
This commit is contained in:
parent
1c156905f8
commit
24bca15fd3
3 changed files with 126 additions and 92 deletions
|
@ -7,7 +7,9 @@ import enum
|
|||
import logging
|
||||
import mimetypes
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
import string
|
||||
import sys
|
||||
import threading
|
||||
import traceback
|
||||
|
@ -698,7 +700,6 @@ def _extract_collection_post_request(path: str) -> DynamicRequest | NotFound:
|
|||
def _check_dynamic_request_permissions():
|
||||
if request.method == "GET":
|
||||
return
|
||||
context = _extract_page_context()
|
||||
|
||||
def warn() -> None:
|
||||
show_warning(
|
||||
|
@ -710,24 +711,16 @@ def _check_dynamic_request_permissions():
|
|||
aqt.mw.taskman.run_on_main(warn)
|
||||
abort(403)
|
||||
|
||||
if context in [
|
||||
PageContext.NON_LEGACY_PAGE,
|
||||
PageContext.EDITOR,
|
||||
PageContext.ADDON_PAGE,
|
||||
PageContext.DECK_OPTIONS,
|
||||
]:
|
||||
pass
|
||||
elif context == PageContext.REVIEWER and request.path in (
|
||||
# does page have access to entire API?
|
||||
if _have_api_access():
|
||||
return
|
||||
|
||||
# whitelisted API endpoints for reviewer/previewer
|
||||
if request.path in (
|
||||
"/_anki/getSchedulingStatesWithContext",
|
||||
"/_anki/setSchedulingStates",
|
||||
"/_anki/i18nResources",
|
||||
):
|
||||
# reviewer is only allowed to access custom study methods
|
||||
pass
|
||||
elif (
|
||||
context == PageContext.PREVIEWER or context == PageContext.CARD_LAYOUT
|
||||
) and request.path == "/_anki/i18nResources":
|
||||
# previewers are only allowed to access i18n resources
|
||||
pass
|
||||
else:
|
||||
# other legacy pages may contain third-party JS, so we do not
|
||||
|
@ -752,23 +745,11 @@ def legacy_page_data() -> Response:
|
|||
return _text_response(HTTPStatus.NOT_FOUND, "page not found")
|
||||
|
||||
|
||||
def _extract_page_context() -> PageContext:
|
||||
"Get context based on referer header."
|
||||
from urllib.parse import parse_qs, urlparse
|
||||
_APIKEY = "".join(random.choices(string.ascii_letters + string.digits, k=32))
|
||||
|
||||
referer = urlparse(request.headers.get("Referer", ""))
|
||||
if referer.path.startswith("/_anki/pages/") or is_sveltekit_page(referer.path[1:]):
|
||||
return PageContext.NON_LEGACY_PAGE
|
||||
elif referer.path == "/_anki/legacyPageData":
|
||||
query_params = parse_qs(referer.query)
|
||||
query_id = query_params.get("id")
|
||||
if not query_id:
|
||||
return PageContext.UNKNOWN
|
||||
id = int(query_id[0])
|
||||
page_context = aqt.mw.mediaServer.get_page_context(id)
|
||||
return page_context if page_context else PageContext.UNKNOWN
|
||||
else:
|
||||
return PageContext.UNKNOWN
|
||||
|
||||
def _have_api_access() -> bool:
|
||||
return request.headers.get("Authorization") == f"Bearer {_APIKEY}"
|
||||
|
||||
|
||||
# this currently only handles a single method; in the future, idempotent
|
||||
|
|
|
@ -141,7 +141,7 @@ class AVPlayer:
|
|||
# audio be stopped?
|
||||
interrupt_current_audio = True
|
||||
# caller key for the current playback (optional)
|
||||
current_caller = None
|
||||
current_caller: Any = None
|
||||
# whether the last call to play_file_with_caller interrupted another
|
||||
current_caller_interrupted = False
|
||||
|
||||
|
@ -167,7 +167,7 @@ class AVPlayer:
|
|||
self._enqueued = []
|
||||
self._stop_if_playing()
|
||||
|
||||
def stop_and_clear_queue_if_caller(self, caller) -> None:
|
||||
def stop_and_clear_queue_if_caller(self, caller: Any) -> None:
|
||||
if caller == self.current_caller:
|
||||
self.stop_and_clear_queue()
|
||||
|
||||
|
@ -179,7 +179,7 @@ class AVPlayer:
|
|||
def play_file(self, filename: str) -> None:
|
||||
self.play_tags([SoundOrVideoTag(filename=filename)])
|
||||
|
||||
def play_file_with_caller(self, filename: str, caller) -> None:
|
||||
def play_file_with_caller(self, filename: str, caller: Any) -> None:
|
||||
if self.current_caller:
|
||||
self.current_caller_interrupted = True
|
||||
self.current_caller = caller
|
||||
|
|
|
@ -28,35 +28,88 @@ serverbaseurl = re.compile(r"^.+:\/\/[^\/]+")
|
|||
if TYPE_CHECKING:
|
||||
from aqt.mediasrv import PageContext
|
||||
|
||||
|
||||
# Page for debug messages
|
||||
##########################################################################
|
||||
|
||||
BridgeCommandHandler = Callable[[str], Any]
|
||||
|
||||
|
||||
class AnkiWebViewKind(Enum):
|
||||
"""Enum registry of all web views managed by Anki
|
||||
|
||||
The value of each entry corresponds to the web view's title.
|
||||
|
||||
When introducing a new web view, please add it to the registry below.
|
||||
"""
|
||||
|
||||
DEFAULT = "default"
|
||||
MAIN = "main webview"
|
||||
TOP_TOOLBAR = "top toolbar"
|
||||
BOTTOM_TOOLBAR = "bottom toolbar"
|
||||
DECK_OPTIONS = "deck options"
|
||||
EDITOR = "editor"
|
||||
LEGACY_DECK_STATS = "legacy deck stats"
|
||||
DECK_STATS = "deck stats"
|
||||
PREVIEWER = "previewer"
|
||||
CHANGE_NOTETYPE = "change notetype"
|
||||
CARD_LAYOUT = "card layout"
|
||||
BROWSER_CARD_INFO = "browser card info"
|
||||
IMPORT_CSV = "csv import"
|
||||
EMPTY_CARDS = "empty cards"
|
||||
FIND_DUPLICATES = "find duplicates"
|
||||
FIELDS = "fields"
|
||||
IMPORT_LOG = "import log"
|
||||
IMPORT_ANKI_PACKAGE = "anki package import"
|
||||
|
||||
|
||||
class AuthInterceptor(QWebEngineUrlRequestInterceptor):
|
||||
_api_enabled = False
|
||||
|
||||
def __init__(self, parent: QObject | None = None, api_enabled: bool = False):
|
||||
super().__init__(parent)
|
||||
self._api_enabled = api_enabled
|
||||
|
||||
def interceptRequest(self, info):
|
||||
from aqt.mediasrv import _APIKEY
|
||||
|
||||
if self._api_enabled:
|
||||
info.setHttpHeader(b"Authorization", f"Bearer {_APIKEY}".encode("utf-8"))
|
||||
|
||||
|
||||
_profile_with_api_access: QWebEngineProfile | None = None
|
||||
_profile_without_api_access: QWebEngineProfile | None = None
|
||||
|
||||
|
||||
class AnkiWebPage(QWebEnginePage):
|
||||
def __init__(self, onBridgeCmd: BridgeCommandHandler) -> None:
|
||||
QWebEnginePage.__init__(self)
|
||||
def __init__(
|
||||
self,
|
||||
onBridgeCmd: BridgeCommandHandler,
|
||||
kind: AnkiWebViewKind,
|
||||
parent: QObject | None,
|
||||
) -> None:
|
||||
profile = self._profileForPage(kind)
|
||||
QWebEnginePage.__init__(self, profile, parent)
|
||||
self._onBridgeCmd = onBridgeCmd
|
||||
self._kind = kind
|
||||
self._setupBridge()
|
||||
self.open_links_externally = True
|
||||
|
||||
def _setupBridge(self) -> None:
|
||||
class Bridge(QObject):
|
||||
def __init__(self, bridge_handler: Callable[[str], Any]) -> None:
|
||||
super().__init__()
|
||||
self.onCmd = bridge_handler
|
||||
def _profileForPage(self, kind: AnkiWebViewKind) -> QWebEngineProfile:
|
||||
have_api_access = kind in (
|
||||
AnkiWebViewKind.DECK_OPTIONS,
|
||||
AnkiWebViewKind.EDITOR,
|
||||
AnkiWebViewKind.DECK_STATS,
|
||||
AnkiWebViewKind.CHANGE_NOTETYPE,
|
||||
AnkiWebViewKind.BROWSER_CARD_INFO,
|
||||
)
|
||||
|
||||
@pyqtSlot(str, result=str) # type: ignore
|
||||
def cmd(self, str: str) -> Any:
|
||||
return json.dumps(self.onCmd(str))
|
||||
global _profile_with_api_access, _profile_without_api_access
|
||||
|
||||
self._bridge = Bridge(self._onCmd)
|
||||
# Use cached profile if available
|
||||
if have_api_access and _profile_with_api_access is not None:
|
||||
return _profile_with_api_access
|
||||
elif not have_api_access and _profile_without_api_access is not None:
|
||||
return _profile_without_api_access
|
||||
|
||||
self._channel = QWebChannel(self)
|
||||
self._channel.registerObject("py", self._bridge)
|
||||
self.setWebChannel(self._channel)
|
||||
# Create a new profile if not cached
|
||||
profile = QWebEngineProfile()
|
||||
|
||||
qwebchannel = ":/qtwebchannel/qwebchannel.js"
|
||||
jsfile = QFile(qwebchannel)
|
||||
|
@ -90,12 +143,35 @@ class AnkiWebPage(QWebEnginePage):
|
|||
script.setInjectionPoint(QWebEngineScript.InjectionPoint.DocumentReady)
|
||||
script.setRunsOnSubFrames(False)
|
||||
|
||||
profile = self.profile()
|
||||
assert profile is not None
|
||||
scripts = profile.scripts()
|
||||
assert scripts is not None
|
||||
scripts.insert(script)
|
||||
|
||||
interceptor = AuthInterceptor(profile, api_enabled=have_api_access)
|
||||
profile.setUrlRequestInterceptor(interceptor)
|
||||
if have_api_access:
|
||||
_profile_with_api_access = profile
|
||||
else:
|
||||
_profile_without_api_access = profile
|
||||
|
||||
return profile
|
||||
|
||||
def _setupBridge(self) -> None:
|
||||
class Bridge(QObject):
|
||||
def __init__(self, bridge_handler: Callable[[str], Any]) -> None:
|
||||
super().__init__()
|
||||
self.onCmd = bridge_handler
|
||||
|
||||
@pyqtSlot(str, result=str) # type: ignore
|
||||
def cmd(self, str: str) -> Any:
|
||||
return json.dumps(self.onCmd(str))
|
||||
|
||||
self._bridge = Bridge(self._onCmd)
|
||||
|
||||
self._channel = QWebChannel(self)
|
||||
self._channel.registerObject("py", self._bridge)
|
||||
self.setWebChannel(self._channel)
|
||||
|
||||
def javaScriptConsoleMessage(
|
||||
self,
|
||||
level: QWebEnginePage.JavaScriptConsoleMessageLevel,
|
||||
|
@ -247,34 +323,6 @@ class WebContent:
|
|||
##########################################################################
|
||||
|
||||
|
||||
class AnkiWebViewKind(Enum):
|
||||
"""Enum registry of all web views managed by Anki
|
||||
|
||||
The value of each entry corresponds to the web view's title.
|
||||
|
||||
When introducing a new web view, please add it to the registry below.
|
||||
"""
|
||||
|
||||
DEFAULT = "default"
|
||||
MAIN = "main webview"
|
||||
TOP_TOOLBAR = "top toolbar"
|
||||
BOTTOM_TOOLBAR = "bottom toolbar"
|
||||
DECK_OPTIONS = "deck options"
|
||||
EDITOR = "editor"
|
||||
LEGACY_DECK_STATS = "legacy deck stats"
|
||||
DECK_STATS = "deck stats"
|
||||
PREVIEWER = "previewer"
|
||||
CHANGE_NOTETYPE = "change notetype"
|
||||
CARD_LAYOUT = "card layout"
|
||||
BROWSER_CARD_INFO = "browser card info"
|
||||
IMPORT_CSV = "csv import"
|
||||
EMPTY_CARDS = "empty cards"
|
||||
FIND_DUPLICATES = "find duplicates"
|
||||
FIELDS = "fields"
|
||||
IMPORT_LOG = "import log"
|
||||
IMPORT_ANKI_PACKAGE = "anki package import"
|
||||
|
||||
|
||||
class AnkiWebView(QWebEngineView):
|
||||
allow_drops = False
|
||||
_kind: AnkiWebViewKind
|
||||
|
@ -287,11 +335,8 @@ class AnkiWebView(QWebEngineView):
|
|||
) -> None:
|
||||
QWebEngineView.__init__(self, parent=parent)
|
||||
self.set_kind(kind)
|
||||
if title:
|
||||
self.set_title(title)
|
||||
self._page = AnkiWebPage(self._onBridgeCmd)
|
||||
# reduce flicker
|
||||
self._page.setBackgroundColor(theme_manager.qcolor(colors.CANVAS))
|
||||
self.page().setBackgroundColor(theme_manager.qcolor(colors.CANVAS))
|
||||
|
||||
# in new code, use .set_bridge_command() instead of setting this directly
|
||||
self.onBridgeCmd: Callable[[str], Any] = self.defaultOnBridgeCmd
|
||||
|
@ -299,7 +344,6 @@ class AnkiWebView(QWebEngineView):
|
|||
self._domDone = True
|
||||
self._pendingActions: list[tuple[str, Sequence[Any]]] = []
|
||||
self.requiresCol = True
|
||||
self.setPage(self._page)
|
||||
self._disable_zoom = False
|
||||
|
||||
self.resetHandlers()
|
||||
|
@ -323,6 +367,15 @@ class AnkiWebView(QWebEngineView):
|
|||
def set_kind(self, kind: AnkiWebViewKind) -> None:
|
||||
self._kind = kind
|
||||
self.set_title(kind.value)
|
||||
# this is an ugly hack to avoid breakages caused by
|
||||
# creating a default webview then immediately calling set_kind, which results
|
||||
# in the creation of two pages, and the second fails as the domDone
|
||||
# signal from the first one is received
|
||||
if kind != AnkiWebViewKind.DEFAULT:
|
||||
self.setPage(AnkiWebPage(self._onBridgeCmd, kind, self))
|
||||
|
||||
def page(self) -> AnkiWebPage:
|
||||
return cast(AnkiWebPage, super().page())
|
||||
|
||||
@property
|
||||
def kind(self) -> AnkiWebViewKind:
|
||||
|
@ -357,7 +410,7 @@ class AnkiWebView(QWebEngineView):
|
|||
return False
|
||||
|
||||
def set_open_links_externally(self, enable: bool) -> None:
|
||||
self._page.open_links_externally = enable
|
||||
self.page().open_links_externally = enable
|
||||
|
||||
def onEsc(self) -> None:
|
||||
w = self.parent()
|
||||
|
@ -822,7 +875,7 @@ html {{ {font} }}
|
|||
Must be done on Windows prior to changing current working directory."""
|
||||
self.requiresCol = False
|
||||
self._domReady = False
|
||||
self._page.setContent(cast(QByteArray, bytes("", "ascii")))
|
||||
self.page().setContent(cast(QByteArray, bytes("", "ascii")))
|
||||
|
||||
def cleanup(self) -> None:
|
||||
try:
|
||||
|
@ -836,14 +889,14 @@ html {{ {font} }}
|
|||
# defer page cleanup so that in-flight requests have a chance to complete first
|
||||
# https://forums.ankiweb.net/t/error-when-exiting-browsing-when-the-software-is-installed-in-the-path-c-program-files-anki/38363
|
||||
mw.progress.single_shot(5000, lambda: mw.mediaServer.clear_page_html(id(self)))
|
||||
self._page.deleteLater()
|
||||
self.page().deleteLater()
|
||||
|
||||
def on_theme_did_change(self) -> None:
|
||||
# avoid flashes if page reloaded
|
||||
self._page.setBackgroundColor(theme_manager.qcolor(colors.CANVAS))
|
||||
self.page().setBackgroundColor(theme_manager.qcolor(colors.CANVAS))
|
||||
if hasattr(QWebEngineSettings.WebAttribute, "ForceDarkMode"):
|
||||
force_dark_mode = getattr(QWebEngineSettings.WebAttribute, "ForceDarkMode")
|
||||
page_settings = self._page.settings()
|
||||
page_settings = self.page().settings()
|
||||
if page_settings is not None:
|
||||
page_settings.setAttribute(
|
||||
force_dark_mode,
|
||||
|
|
Loading…
Reference in a new issue