Back to Deck
[utility]
Python Async Queue Runner
ИСХОДНИК Python
ВЕРСИЯ 1.0
АВТОР Cododel
Утилитарный класс для создания очереди асинхронных задач с chainable interface (цепным интерфейсом). Позволяет последовательно добавлять задачи, устанавливать аргументы и запускать их одну за другой.
Основные возможности
- Цепной интерфейс для удобного добавления задач
- Последовательное выполнение всех задач в очереди
- Поддержка позиционных и именованных аргументов
- Простая и понятная структура
Использование
import asyncioimport randomfrom services.QueueRunner import QueueRunner
async def task(val): sleep = random.uniform(0.05, 1.0) print(f'{val} sleeping for {sleep:.2f} seconds') await asyncio.sleep(sleep) print(f'{val} woke up')
async def main(): queue = QueueRunner()
# Добавляем 10 задач в очередь с цепным интерфейсом for i in range(10): queue.task(task).args(i)
# Запускаем последовательное выполнение await queue.run()
asyncio.run(main())Исходный код
AsyncQueueRunner.py
from typing import List, Callable
class QueueTask: task: Callable _args: tuple = tuple() _kwargs: dict = dict()
def __init__(self, task: Callable): self.task = task
def args(self, *args, **kwargs): self._args = args self._kwargs = kwargs return self
async def run(self): await self.task(*self._args, **self._kwargs)
class QueueRunner: queue: List[QueueTask] = []
def task(self, task: Callable): self.queue.append(QueueTask(task)) return self
def args(self, *args, **kwargs): self.queue[-1].args(*args, **kwargs) return self
async def run(self): for task in self.queue: await task.run()