More tweaks to API security

- Allow custom study methods in reviewer to prevent errors
- Ensure we 'fail closed' if referer header has been removed
- Ensure we ignore opaque POST requests from other origins

Thanks again to Daniel for the feedback.
This commit is contained in:
Damien Elmes 2023-11-09 20:34:22 +10:00
parent b3da6f117b
commit f73eb01047
3 changed files with 57 additions and 31 deletions

View file

@ -64,15 +64,18 @@ class NotFound:
DynamicRequest = Callable[[], Response] DynamicRequest = Callable[[], Response]
class LegacyPageContext(enum.Enum): class PageContext(enum.Enum):
OTHER = 0 UNKNOWN = 0
EDITOR = 1 EDITOR = 1
REVIEWER = 2
# something in /_anki/pages/
NON_LEGACY_PAGE = 3
@dataclass @dataclass
class LegacyPage: class LegacyPage:
html: str html: str
context: LegacyPageContext context: PageContext
class MediaServer(threading.Thread): class MediaServer(threading.Thread):
@ -126,7 +129,7 @@ class MediaServer(threading.Thread):
return int(self.server.effective_port) # type: ignore return int(self.server.effective_port) # type: ignore
def set_page_html( def set_page_html(
self, id: int, html: str, context: LegacyPageContext = LegacyPageContext.OTHER self, id: int, html: str, context: PageContext = PageContext.UNKNOWN
) -> None: ) -> None:
self._legacy_pages[id] = LegacyPage(html, context) self._legacy_pages[id] = LegacyPage(html, context)
@ -136,7 +139,7 @@ class MediaServer(threading.Thread):
else: else:
return None return None
def get_page_context(self, id: int) -> LegacyPageContext | None: def get_page_context(self, id: int) -> PageContext | None:
if page := self._legacy_pages.get(id): if page := self._legacy_pages.get(id):
return page.context return page.context
else: else:
@ -627,21 +630,40 @@ def _extract_collection_post_request(path: str) -> DynamicRequest | NotFound:
return NotFound(message=f"{path} not found") return NotFound(message=f"{path} not found")
def _handle_dynamic_request(request: DynamicRequest) -> Response: def _check_dynamic_request_permissions():
if legacy_context := _extract_legacy_page_context(): if request.method == "GET":
# legacy pages, apart from the editor, may contain third-party JS, so we do not return
context = _extract_page_context()
def warn() -> None:
show_warning(
"Unexpected API access. Please report this message on the Anki forums."
)
# check content type header to ensure this isn't an opaque request from another origin
if request.headers["Content-type"] != "application/binary":
aqt.mw.taskman.run_on_main(warn)
abort(403)
if context == PageContext.NON_LEGACY_PAGE or context == PageContext.EDITOR:
pass
elif context == PageContext.REVIEWER and request.path in (
"/_anki/getSchedulingStatesWithContext",
"/_anki/setSchedulingStates",
):
# reviewer is only allowed to access custom study methods
pass
else:
# other legacy pages may contain third-party JS, so we do not
# allow them to access our API # allow them to access our API
if legacy_context != LegacyPageContext.EDITOR: aqt.mw.taskman.run_on_main(warn)
abort(403)
def warn() -> None:
show_warning(
"Unexpected API access. Please report this message on the Anki forums."
)
aqt.mw.taskman.run_on_main(warn) def _handle_dynamic_request(req: DynamicRequest) -> Response:
abort(403) _check_dynamic_request_permissions()
try: try:
return request() return req()
except Exception as e: except Exception as e:
return flask.make_response(str(e), HTTPStatus.INTERNAL_SERVER_ERROR) return flask.make_response(str(e), HTTPStatus.INTERNAL_SERVER_ERROR)
@ -654,18 +676,19 @@ def legacy_page_data() -> Response:
return flask.make_response("page not found", HTTPStatus.NOT_FOUND) return flask.make_response("page not found", HTTPStatus.NOT_FOUND)
def _extract_legacy_page_context() -> LegacyPageContext | None: def _extract_page_context() -> PageContext:
"Get context based on referer header." "Get context based on referer header."
from urllib.parse import parse_qs, urlparse from urllib.parse import parse_qs, urlparse
referer = request.headers.get("Referer", "") referer = urlparse(request.headers.get("Referer", ""))
if "legacyPageData" in referer: if referer.path.startswith("/_anki/pages/"):
parsed_url = urlparse(referer) return PageContext.NON_LEGACY_PAGE
query_params = parse_qs(parsed_url.query) elif referer.path == "/_anki/legacyPageData":
query_params = parse_qs(referer.query)
id = int(query_params.get("id", [None])[0]) id = int(query_params.get("id", [None])[0])
return aqt.mw.mediaServer.get_page_context(id) return aqt.mw.mediaServer.get_page_context(id)
else: else:
return None return PageContext.UNKNOWN
# this currently only handles a single method; in the future, idempotent # this currently only handles a single method; in the future, idempotent

View file

@ -23,7 +23,7 @@ from aqt.utils import askUser, is_gesture_or_zoom_event, openLink, showInfo, tr
serverbaseurl = re.compile(r"^.+:\/\/[^\/]+") serverbaseurl = re.compile(r"^.+:\/\/[^\/]+")
if TYPE_CHECKING: if TYPE_CHECKING:
from aqt.mediasrv import LegacyPageContext from aqt.mediasrv import PageContext
# Page for debug messages # Page for debug messages
@ -383,21 +383,21 @@ class AnkiWebView(QWebEngineView):
super().dropEvent(evt) super().dropEvent(evt)
def setHtml( # type: ignore[override] def setHtml( # type: ignore[override]
self, html: str, context: LegacyPageContext | None = None self, html: str, context: PageContext | None = None
) -> None: ) -> None:
from aqt.mediasrv import LegacyPageContext from aqt.mediasrv import PageContext
# discard any previous pending actions # discard any previous pending actions
self._pendingActions = [] self._pendingActions = []
self._domDone = True self._domDone = True
if context is None: if context is None:
context = LegacyPageContext.OTHER context = PageContext.UNKNOWN
self._queueAction("setHtml", html, context) self._queueAction("setHtml", html, context)
self.set_open_links_externally(True) self.set_open_links_externally(True)
self.allow_drops = False self.allow_drops = False
self.show() self.show()
def _setHtml(self, html: str, context: LegacyPageContext) -> None: def _setHtml(self, html: str, context: PageContext) -> None:
"""Send page data to media server, then surf to it. """Send page data to media server, then surf to it.
This function used to be implemented by QWebEngine's This function used to be implemented by QWebEngine's
@ -582,12 +582,15 @@ html {{ {font} }}
</html>""" </html>"""
# print(html) # print(html)
import aqt.editor import aqt.editor
from aqt.mediasrv import LegacyPageContext import aqt.reviewer
from aqt.mediasrv import PageContext
if isinstance(context, aqt.editor.Editor): if isinstance(context, aqt.editor.Editor):
page_context = LegacyPageContext.EDITOR page_context = PageContext.EDITOR
elif isinstance(context, aqt.reviewer.Reviewer):
page_context = PageContext.REVIEWER
else: else:
page_context = LegacyPageContext.OTHER page_context = PageContext.UNKNOWN
self.setHtml(html, page_context) self.setHtml(html, page_context)
@classmethod @classmethod

View file

@ -29,7 +29,7 @@ async function postProtoInner(url: string, body: Uint8Array): Promise<Uint8Array
const result = await fetch(url, { const result = await fetch(url, {
method: "POST", method: "POST",
headers: { headers: {
"Content-Type": "application/octet-stream", "Content-Type": "application/binary",
}, },
body, body,
}); });