Serialize CollectionOp, and QueryOp by default

Closes #2628
This commit is contained in:
Damien Elmes 2023-09-15 15:26:46 +10:00
parent e2603a73f5
commit 055d663970
2 changed files with 32 additions and 6 deletions

View file

@ -201,11 +201,22 @@ class QueryOp(Generic[T]):
self._parent = parent
self._op = op
self._success = success
self._uses_collection = True
def failure(self, failure: Callable[[Exception], Any] | None) -> QueryOp[T]:
self._failure = failure
return self
def without_collection(self) -> QueryOp[T]:
"""Flag this QueryOp as not needing the collection.
Operations that access the collection are serialized. If you're doing
something like a series of network queries, and your operation does not
access the collection, then you can call this to allow the requests to
run in parallel."""
self._uses_collection = False
return self
def with_progress(
self,
label: str | None = None,
@ -269,4 +280,6 @@ class QueryOp(Generic[T]):
elif self._progress:
mw.taskman.with_progress(op, on_done, label=label, parent=self._parent)
else:
mw.taskman.run_in_background(op, on_done)
mw.taskman.run_in_background(
op, on_done, uses_collection=self._uses_collection
)

View file

@ -28,7 +28,8 @@ class TaskManager(QObject):
def __init__(self, mw: aqt.AnkiQt) -> None:
QObject.__init__(self)
self.mw = mw.weakref()
self._executor = ThreadPoolExecutor()
self._no_collection_executor = ThreadPoolExecutor()
self._collection_executor = ThreadPoolExecutor(max_workers=1)
self._closures: list[Closure] = []
self._closures_lock = Lock()
qconnect(self._closures_pending, self._on_closures_pending)
@ -44,6 +45,7 @@ class TaskManager(QObject):
task: Callable,
on_done: Callable[[Future], None] | None = None,
args: dict[str, Any] | None = None,
uses_collection=True,
) -> Future:
"""Use QueryOp()/CollectionOp() in new code.
@ -52,7 +54,11 @@ class TaskManager(QObject):
If on_done is provided, it will be called on the main thread with
the completed future.
Args if provided will be passed on as keyword arguments to the task callable."""
Args if provided will be passed on as keyword arguments to the task callable.
Tasks that access the collection are serialized. If you're doing things that
don't require the collection (e.g. network requests), you can pass uses_collection
=False to allow multiple tasks to run in parallel."""
# Before we launch a background task, ensure any pending on_done closure are run on
# main. Qt's signal/slot system will have posted a notification, but it may
# not have been processed yet. The on_done() closures may make small queries
@ -64,7 +70,12 @@ class TaskManager(QObject):
if args is None:
args = {}
fut = self._executor.submit(task, **args)
executor = (
self._collection_executor
if uses_collection
else self._no_collection_executor
)
fut = executor.submit(task, **args)
if on_done is not None:
fut.add_done_callback(
@ -80,6 +91,7 @@ class TaskManager(QObject):
parent: QWidget | None = None,
label: str | None = None,
immediate: bool = False,
uses_collection=True,
) -> None:
"Use QueryOp()/CollectionOp() in new code."
self.mw.progress.start(parent=parent, label=label, immediate=immediate)
@ -89,7 +101,7 @@ class TaskManager(QObject):
if on_done:
on_done(fut)
self.run_in_background(task, wrapped_done)
self.run_in_background(task, wrapped_done, uses_collection=uses_collection)
def with_backend_progress(
self,
@ -98,6 +110,7 @@ class TaskManager(QObject):
on_done: Callable[[Future], None] | None = None,
parent: QWidget | None = None,
start_label: str | None = None,
uses_collection=True,
) -> None:
self.mw.progress.start_with_backend_updates(
progress_update,
@ -110,7 +123,7 @@ class TaskManager(QObject):
if on_done:
on_done(fut)
self.run_in_background(task, wrapped_done)
self.run_in_background(task, wrapped_done, uses_collection=uses_collection)
def _on_closures_pending(self) -> None:
"""Run any pending closures. This runs in the main thread."""