mirror of
https://github.com/bec-project/ophyd_devices.git
synced 2025-06-12 14:27:14 +02:00
feat (psi-device-base-utils): enhance TaskHandler to support task arguments in submit_task method
This commit is contained in:
@ -48,6 +48,44 @@ def test_utils_task_status(device):
|
||||
assert status.state == "completed"
|
||||
|
||||
|
||||
def test_utils_task_handler_submit_task_with_args(task_handler):
|
||||
"""Ensure that task_handler has a submit_task method"""
|
||||
|
||||
def my_task(input_arg: bool, input_kwarg: bool = False):
|
||||
if input_kwarg is True:
|
||||
raise ValueError("input_kwarg is True")
|
||||
if input_arg is True:
|
||||
return True
|
||||
return False
|
||||
|
||||
# This should fail
|
||||
with pytest.raises(TypeError):
|
||||
status = task_handler.submit_task(my_task)
|
||||
status.wait()
|
||||
# This should pass
|
||||
|
||||
task_stopped = threading.Event()
|
||||
|
||||
def finished_cb():
|
||||
task_stopped.set()
|
||||
|
||||
status = task_handler.submit_task(
|
||||
my_task, task_args=(True,), task_kwargs={"input_kwarg": False}
|
||||
)
|
||||
status.add_callback(finished_cb)
|
||||
task_stopped.wait()
|
||||
assert status.done is True
|
||||
assert status.state == TaskState.COMPLETED
|
||||
# This should fail
|
||||
task_stopped = threading.Event()
|
||||
status = task_handler.submit_task(my_task, task_args=(True,), task_kwargs={"input_kwarg": True})
|
||||
with pytest.raises(ValueError):
|
||||
status.wait()
|
||||
assert status.state == TaskState.ERROR
|
||||
assert status.done is True
|
||||
assert status.exception().__class__ == ValueError
|
||||
|
||||
|
||||
@pytest.mark.timeout(100)
|
||||
def test_utils_task_handler_task_killed(task_handler):
|
||||
"""Ensure that task_handler has a submit_task method"""
|
||||
|
Reference in New Issue
Block a user