What's Changed
* pipelining, idle task reclaiming by Graeme22 in https://github.com/tastyware/streaq/pull/14
- Idle tasks are now re-enqueued by workers with a running scheduler process. This gives stronger pessimistic execution guarantees when, e.g., a worker shuts down suddenly and doesn't get time to handle a signal.
- Tasks can now be pipelined (so results of one task can be fed into another):
```python
@worker.task()
async def double(ctx: WrappedContext[Context], val: int) -> int:
    return val * 2

@worker.task()
async def is_even(ctx: WrappedContext[Context], val: int) -> bool:
    return val % 2 == 0

async with worker:
    # enqueue double(3), then feed its result into is_even
    task = await double.enqueue(3).then(is_even)
    print(await task.result())
```
This is useful for ETL pipelines or similar tasks, where each task builds upon the result of the previous one. For pipelined tasks, positional arguments must all come from the previous task (tuple outputs will be unpacked), and any additional arguments can be passed as kwargs to `then()`.
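To make the argument-passing rule concrete, here is a minimal sketch in plain `asyncio` that mimics it. This is not streaq's implementation and does not use its API: `run_pipeline`, `divmod_by`, and `describe` are hypothetical names invented for illustration, and the `ctx` parameter that real tasks take is omitted for brevity.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Hypothetical helper, NOT part of streaq: it only illustrates how a
# previous result is turned into arguments for the next step.
Step = tuple[Callable[..., Awaitable[Any]], dict[str, Any]]


async def run_pipeline(first_result: Any, *steps: Step) -> Any:
    result = first_result
    for func, kwargs in steps:
        # Tuple outputs are unpacked into positional arguments;
        # any other value is passed as a single positional argument.
        args = result if isinstance(result, tuple) else (result,)
        # Additional arguments are supplied as keyword arguments.
        result = await func(*args, **kwargs)
    return result


async def divmod_by(val: int, divisor: int = 2) -> tuple[int, int]:
    # Returns a tuple, so the next step receives two positional arguments.
    return divmod(val, divisor)


async def describe(quotient: int, remainder: int, label: str = "") -> str:
    return f"{label}{quotient} r{remainder}"


async def main() -> str:
    start = await divmod_by(7)
    return await run_pipeline(start, (describe, {"label": "7/2 = "}))


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In streaq itself, the equivalent of the `label` argument here would be passed as a keyword argument to `then()`, as described above.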
**Full Changelog**: https://github.com/tastyware/streaq/compare/v0.6.2...v0.7.0