CDX-301i · Module 1
Queue-Based Dispatch
Direct agent invocation works for ad-hoc workflows but does not hold up at enterprise scale. When hundreds of tasks arrive per hour (CI-triggered reviews, issue triage, automated fixes, documentation updates), you need a queue. Queue-based dispatch decouples task submission from task execution: producers add tasks to a queue, and a dispatcher assigns them to available agents based on priority, capacity, and agent specialization.
The dispatch architecture has three components: the task queue (ordered by priority, then submission time), the agent pool (the registered agents, each with its current status and capabilities), and the dispatcher (the logic that matches tasks to agents). The dispatcher runs continuously, pulling the highest-priority unassigned task and assigning it to the best-matched idle agent. When an agent completes its task, it reports back and becomes available for the next assignment.
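from __future__ import annotations

import heapq
from dataclasses import dataclass, field
from datetime import datetime
from enum import IntEnum


class Priority(IntEnum):
    CRITICAL = 0  # Production incidents
    HIGH = 1      # CI failures, blocking PRs
    NORMAL = 2    # Standard tasks
    LOW = 3       # Documentation, cleanup


@dataclass(order=True)
class QueuedTask:
    # Ordering uses (priority, submitted_at); the other fields are excluded.
    priority: Priority
    submitted_at: datetime
    task_id: str = field(compare=False)
    task_type: str = field(compare=False)  # "review", "fix", "test"
    payload: dict = field(compare=False, default_factory=dict)


class TaskDispatcher:
    def __init__(self):
        self.queue: list[QueuedTask] = []  # min-heap managed by heapq
        self.agents: dict[str, dict] = {}  # name -> {"status": ..., ...}

    def submit(self, task: QueuedTask):
        heapq.heappush(self.queue, task)

    def dispatch_next(self) -> tuple[str, QueuedTask] | None:
        available = [
            name for name, info in self.agents.items()
            if info["status"] == "idle"
        ]
        if not available or not self.queue:
            return None
        # Pop the most urgent task, then mark the chosen agent busy.
        task = heapq.heappop(self.queue)
        agent = self._best_match(available, task)
        self.agents[agent]["status"] = "busy"
        return (agent, task)

    def _best_match(self, available: list[str], task: QueuedTask) -> str:
        # Placeholder (an assumption; the original listing leaves this undefined):
        # prefer an agent whose optional "capabilities" include the task type,
        # otherwise fall back to the first idle agent.
        for name in available:
            if task.task_type in self.agents[name].get("capabilities", ()):
                return name
        return available[0]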
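To see the ordering rule in isolation, here is a minimal sketch using plain tuples instead of `QueuedTask`. It assumes a monotonically increasing counter as the tie-breaker in place of a timestamp, since two tasks submitted at the same instant would otherwise compare equal and `heapq` does not promise FIFO order among equal entries. The `push` and `drain` helpers and the task IDs are hypothetical names used only for illustration.

```python
import heapq
from itertools import count

# Hypothetical priority levels mirroring the Priority enum (lower = more urgent).
CRITICAL, HIGH, NORMAL, LOW = range(4)

# Assumption: a sequence number stands in for the submission timestamp.
_sequence = count()


def push(queue, priority, task_id):
    """Add a task; entries sort by (priority, arrival order)."""
    heapq.heappush(queue, (priority, next(_sequence), task_id))


def drain(queue):
    """Pop every task and return the IDs in dispatch order."""
    order = []
    while queue:
        _, _, task_id = heapq.heappop(queue)
        order.append(task_id)
    return order


queue = []
push(queue, NORMAL, "review-1")
push(queue, LOW, "docs-1")
push(queue, CRITICAL, "incident-1")
push(queue, NORMAL, "review-2")

result = drain(queue)
print(result)  # ['incident-1', 'review-1', 'review-2', 'docs-1']
```

The critical task is dispatched first even though it arrived third, and the two normal-priority reviews keep their arrival order.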
- Define task types: Enumerate the task types your system handles: review, fix, test, document, triage. Each type maps to a queue priority and an agent specialization.
- Implement the queue: Use a priority queue ordered by (priority, submission_time). Critical tasks are dequeued ahead of normal tasks; within the same priority, tasks are served in FIFO order, which keeps scheduling fair.
- Build the dispatcher loop: The dispatcher polls continuously: pull the highest-priority task, find the best-matched idle agent, assign, repeat. Add backpressure when all agents are busy.
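The three steps above can be sketched together as follows. This is one possible shape under stated assumptions, not a reference implementation: the `TASK_TYPES` mapping, the `skills` field on each agent, the `BoundedDispatcher` class, and the `QueueFull` exception are all hypothetical names, and backpressure is modeled simply as rejecting submissions once the queue reaches a fixed size.

```python
import heapq
from itertools import count

# Hypothetical mapping: task type -> (priority, required specialization).
# Lower number = more urgent, matching the Priority enum used earlier.
TASK_TYPES = {
    "triage": (1, "triage"),
    "fix": (1, "coding"),
    "review": (2, "review"),
    "test": (2, "coding"),
    "document": (3, "docs"),
}


class QueueFull(Exception):
    """Raised to push back on producers when the queue is at capacity."""


class BoundedDispatcher:
    def __init__(self, agents, max_queued=100):
        self.agents = agents          # name -> {"status": str, "skills": set}
        self.max_queued = max_queued  # backpressure threshold (assumption)
        self.queue = []
        self._seq = count()           # FIFO tie-breaker within a priority

    def submit(self, task_type, task_id):
        if len(self.queue) >= self.max_queued:
            raise QueueFull(task_id)
        priority, _ = TASK_TYPES[task_type]
        heapq.heappush(self.queue, (priority, next(self._seq), task_type, task_id))

    def run_once(self):
        """Assign tasks until the head task has no idle, suitably skilled agent."""
        assigned = []
        while self.queue:
            _, _, task_type, task_id = self.queue[0]  # peek without popping
            skill = TASK_TYPES[task_type][1]
            agent = next(
                (name for name, info in self.agents.items()
                 if info["status"] == "idle" and skill in info["skills"]),
                None,
            )
            if agent is None:
                break  # head task stays queued until an agent frees up
            heapq.heappop(self.queue)
            self.agents[agent]["status"] = "busy"
            assigned.append((agent, task_id))
        return assigned


agents = {
    "agent-a": {"status": "idle", "skills": {"coding"}},
    "agent-b": {"status": "idle", "skills": {"review", "docs"}},
}
dispatcher = BoundedDispatcher(agents, max_queued=3)
dispatcher.submit("document", "t1")
dispatcher.submit("review", "t2")
dispatcher.submit("fix", "t3")
try:
    dispatcher.submit("test", "t4")
    rejected = False
except QueueFull:
    rejected = True  # queue already holds three tasks

assignments = dispatcher.run_once()
print(rejected, assignments, len(dispatcher.queue))
```

Unlike `dispatch_next` in the earlier listing, this sketch peeks at the head task and only pops it once a suitable agent is found, so a task is never removed from the queue without being assigned. The trade-off is head-of-line blocking: a task waiting on a busy specialist holds up lower-priority tasks behind it. A production loop would call `run_once` on a timer or whenever an agent reports completion.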