
Multi-Agent Systems


TL;DR

Multi-agent systems coordinate multiple specialized LLM agents to solve complex tasks beyond the reach of any single agent. The main patterns are hierarchical orchestration (a supervisor agent), peer-to-peer collaboration, debate/adversarial systems, and assembly-line pipelines. Success hinges on clear role definitions, structured communication protocols, state management, and conflict-resolution strategies.


The Single-Agent Challenge

A single agent runs into fundamental limits when faced with complex, multifaceted tasks.


Multi-Agent Architecture Patterns

Pattern 1: Hierarchical (Supervisor) Architecture

A supervisor agent coordinates specialized worker agents.

python
from typing import Literal
from pydantic import BaseModel
import asyncio

class TaskAssignment(BaseModel):
    """Supervisor's decision on task routing."""
    agent: Literal["researcher", "coder", "reviewer", "done"]
    task_description: str
    context: str
    priority: int = 1

class SupervisorAgent:
    """Coordinates specialized worker agents."""

    def __init__(self, llm_client):
        self.llm = llm_client
        self.workers = {
            "researcher": ResearchAgent(llm_client),
            "coder": CodeAgent(llm_client),
            "reviewer": ReviewAgent(llm_client),
        }
        self.conversation_history = []
        self.task_results = {}

    async def run(self, user_request: str, max_iterations: int = 10) -> str:
        """Main orchestration loop."""
        self.conversation_history.append({
            "role": "user",
            "content": user_request
        })

        for iteration in range(max_iterations):
            # Supervisor decides next action
            assignment = await self._decide_next_action()

            if assignment.agent == "done":
                return await self._synthesize_final_response()

            # Delegate to appropriate worker
            worker = self.workers[assignment.agent]
            result = await worker.execute(
                task=assignment.task_description,
                context=assignment.context,
                previous_results=self.task_results
            )

            # Track results
            self.task_results[f"{assignment.agent}_{iteration}"] = result
            self.conversation_history.append({
                "role": "assistant",
                "content": f"[{assignment.agent}]: {result.summary}"
            })

        return await self._synthesize_final_response()

    async def _decide_next_action(self) -> TaskAssignment:
        """Supervisor LLM decides which agent to invoke next."""
        system_prompt = """You are a supervisor coordinating a team of agents:
        - researcher: Gathers information, searches web, analyzes documents
        - coder: Writes, debugs, and tests code
        - reviewer: Reviews code for quality, security, best practices
        - done: All tasks complete, ready to deliver final response

        Based on the conversation and completed tasks, decide the next action.
        Return a JSON with: agent, task_description, context, priority"""

        response = await self.llm.generate(
            system=system_prompt,
            messages=self.conversation_history,
            response_format=TaskAssignment
        )
        return response

    async def _synthesize_final_response(self) -> str:
        """Combine all worker results into coherent response."""
        synthesis_prompt = f"""Synthesize the following agent outputs into
        a coherent final response:

        {self.task_results}

        Original request: {self.conversation_history[0]['content']}
        """
        return await self.llm.generate(synthesis_prompt)


class ResearchAgent:
    """Specialized agent for research tasks."""

    def __init__(self, llm_client):
        self.llm = llm_client
        self.tools = [WebSearchTool(), DocumentAnalyzer(), CitationManager()]

    async def execute(self, task: str, context: str, previous_results: dict):
        system_prompt = """You are a research specialist. Your capabilities:
        - Search the web for current information
        - Analyze and summarize documents
        - Provide properly cited sources

        Be thorough but concise. Always cite sources."""

        # ReAct loop for research
        return await self._react_loop(system_prompt, task, context)
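The supervisor loop above can be exercised end to end with a canned stub standing in for the real LLM client and workers. The stub class, its routing script, and the worker names below are illustrative assumptions, not part of any real API; this is a minimal sketch of the delegate-until-done control flow:

```python
import asyncio

class StubSupervisorLLM:
    """Scripted stand-in for the supervisor's LLM: routes twice, then signals done."""
    def __init__(self):
        self.script = iter([
            {"agent": "researcher", "task": "gather background"},
            {"agent": "coder", "task": "draft implementation"},
            {"agent": "done", "task": ""},
        ])

    async def decide(self, history):
        return next(self.script)

async def stub_worker(name, task):
    # Each worker returns a one-line summary of its result.
    return f"{name} finished: {task}"

async def supervise(llm, max_iterations=10):
    history, results = [], []
    for _ in range(max_iterations):
        assignment = await llm.decide(history)
        if assignment["agent"] == "done":
            break  # supervisor decided all tasks are complete
        result = await stub_worker(assignment["agent"], assignment["task"])
        results.append(result)
        history.append(result)  # feed worker output back into the next decision
    return results

results = asyncio.run(supervise(StubSupervisorLLM()))
```

The `max_iterations` cap mirrors the real loop: even a misbehaving supervisor terminates.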

Pattern 2: Peer-to-Peer Collaboration

Agents communicate with one another directly, without a central coordinator.

Each agent can:

  • Broadcast to all agents
  • Send direct messages to a specific agent
  • Request help
  • Vote on decisions

python
import asyncio
import uuid
from dataclasses import dataclass
from typing import Optional
from enum import Enum

class MessageType(Enum):
    REQUEST = "request"
    RESPONSE = "response"
    BROADCAST = "broadcast"
    VOTE_REQUEST = "vote_request"
    VOTE = "vote"

@dataclass
class AgentMessage:
    sender: str
    recipient: Optional[str]  # None = broadcast
    message_type: MessageType
    content: dict
    correlation_id: str

class MessageBus:
    """Central message broker for agent communication."""

    def __init__(self):
        self.subscribers: dict[str, asyncio.Queue] = {}
        self.message_history: list[AgentMessage] = []

    def register(self, agent_id: str) -> asyncio.Queue:
        """Register an agent to receive messages."""
        queue = asyncio.Queue()
        self.subscribers[agent_id] = queue
        return queue

    async def publish(self, message: AgentMessage):
        """Publish a message to appropriate recipients."""
        self.message_history.append(message)

        if message.recipient is None:
            # Broadcast to all except sender
            for agent_id, queue in self.subscribers.items():
                if agent_id != message.sender:
                    await queue.put(message)
        else:
            # Direct message
            if message.recipient in self.subscribers:
                await self.subscribers[message.recipient].put(message)


class PeerAgent:
    """Base class for peer-to-peer agents."""

    def __init__(self, agent_id: str, role: str, llm_client, message_bus: MessageBus):
        self.agent_id = agent_id
        self.role = role
        self.llm = llm_client
        self.bus = message_bus
        self.inbox = message_bus.register(agent_id)
        self.pending_requests: dict[str, asyncio.Future] = {}

    async def run(self):
        """Main agent loop - process incoming messages."""
        while True:
            message = await self.inbox.get()
            asyncio.create_task(self._handle_message(message))

    async def _handle_message(self, message: AgentMessage):
        """Process incoming message based on type."""
        if message.message_type == MessageType.REQUEST:
            response = await self._process_request(message)
            await self.send(
                recipient=message.sender,
                message_type=MessageType.RESPONSE,
                content=response,
                correlation_id=message.correlation_id
            )
        elif message.message_type == MessageType.RESPONSE:
            if message.correlation_id in self.pending_requests:
                self.pending_requests[message.correlation_id].set_result(message)
        elif message.message_type == MessageType.BROADCAST:
            await self._handle_broadcast(message)

    async def request(self, recipient: str, content: dict, timeout: float = 30) -> dict:
        """Send request and wait for response."""
        correlation_id = str(uuid.uuid4())
        future = asyncio.Future()
        self.pending_requests[correlation_id] = future

        await self.send(recipient, MessageType.REQUEST, content, correlation_id)

        try:
            response = await asyncio.wait_for(future, timeout)
            return response.content
        finally:
            del self.pending_requests[correlation_id]

    async def broadcast(self, content: dict):
        """Broadcast message to all agents."""
        await self.send(None, MessageType.BROADCAST, content, str(uuid.uuid4()))


class PlannerAgent(PeerAgent):
    """Plans and coordinates task execution."""

    async def _process_request(self, message: AgentMessage) -> dict:
        if message.content.get("action") == "create_plan":
            plan = await self._create_plan(message.content["task"])

            # Broadcast plan for feedback
            await self.broadcast({
                "action": "plan_proposal",
                "plan": plan,
                "request_feedback": True
            })

            return {"status": "plan_created", "plan": plan}

    async def _create_plan(self, task: str) -> list[dict]:
        response = await self.llm.generate(
            system="You are a planning specialist. Break down tasks into steps.",
            prompt=f"Create a plan for: {task}"
        )
        return response


class CriticAgent(PeerAgent):
    """Reviews and critiques other agents' outputs."""

    async def _handle_broadcast(self, message: AgentMessage):
        if message.content.get("action") == "plan_proposal":
            critique = await self._critique_plan(message.content["plan"])

            # Send critique back to planner
            await self.send(
                recipient=message.sender,
                message_type=MessageType.RESPONSE,
                content={"critique": critique},
                correlation_id=message.correlation_id
            )

Pattern 3: Debate / Adversarial Pattern

Multiple agents debate one another to improve output quality.

python
class DebateSystem:
    """Implements adversarial debate between agents."""

    def __init__(self, llm_client, num_rounds: int = 3):
        self.llm = llm_client
        self.num_rounds = num_rounds
        self.proposer = DebateAgent("proposer", "advocate", llm_client)
        self.challenger = DebateAgent("challenger", "critic", llm_client)
        self.judge = JudgeAgent(llm_client)

    async def debate(self, topic: str) -> dict:
        """Run a full debate on a topic."""
        debate_history = []

        # Initial proposal
        proposal = await self.proposer.make_argument(
            topic=topic,
            role="proposer",
            history=[]
        )
        debate_history.append({"agent": "proposer", "argument": proposal})

        # Debate rounds
        for round_num in range(self.num_rounds):
            # Challenger responds
            challenge = await self.challenger.make_argument(
                topic=topic,
                role="challenger",
                history=debate_history
            )
            debate_history.append({"agent": "challenger", "argument": challenge})

            # Proposer responds
            response = await self.proposer.make_argument(
                topic=topic,
                role="proposer",
                history=debate_history
            )
            debate_history.append({"agent": "proposer", "argument": response})

        # Judge decides
        verdict = await self.judge.evaluate(topic, debate_history)

        return {
            "debate_history": debate_history,
            "verdict": verdict,
            "final_answer": verdict["synthesized_answer"]
        }


class DebateAgent:
    """Agent that participates in debates."""

    ROLE_PROMPTS = {
        "proposer": """You are advocating for your position.
        Make strong arguments, address counterpoints, and defend your stance.
        Be persuasive but intellectually honest.""",

        "challenger": """You are challenging the proposal.
        Find weaknesses, ask probing questions, propose alternatives.
        Be critical but constructive."""
    }

    async def make_argument(self, topic: str, role: str, history: list) -> str:
        formatted_history = "\n".join([
            f"{h['agent']}: {h['argument']}" for h in history
        ])

        response = await self.llm.generate(
            system=self.ROLE_PROMPTS[role],
            prompt=f"""Topic: {topic}

            Debate so far:
            {formatted_history}

            Make your next argument (2-3 paragraphs):"""
        )
        return response


class JudgeAgent:
    """Evaluates debate and synthesizes final answer."""

    async def evaluate(self, topic: str, history: list) -> dict:
        formatted = "\n".join([f"{h['agent']}: {h['argument']}" for h in history])

        response = await self.llm.generate(
            system="""You are an impartial judge evaluating a debate.
            Consider the strength of arguments, evidence provided, and logical consistency.
            Synthesize the best elements from both sides into a final answer.""",
            prompt=f"""Topic: {topic}

            Full debate:
            {formatted}

            Provide:
            1. Analysis of each side's strongest points
            2. Weaknesses in each position
            3. Your synthesized final answer combining the best insights""",
            response_format=JudgeVerdict
        )
        return response
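The debate loop's structure (propose, alternate rounds, judge) can be shown with canned functions standing in for the LLM-backed agents. Everything here is an illustrative stub, not the classes above; the "judge" just counts turns per side:

```python
import asyncio

async def debate(topic, proposer, challenger, judge, num_rounds=2):
    # Initial proposal, then alternating challenger/proposer rounds.
    history = [("proposer", await proposer(topic, []))]
    for _ in range(num_rounds):
        history.append(("challenger", await challenger(topic, history)))
        history.append(("proposer", await proposer(topic, history)))
    return await judge(topic, history), history

async def proposer(topic, history):
    return f"pro argument #{len(history)}"

async def challenger(topic, history):
    return f"con argument #{len(history)}"

async def judge(topic, history):
    # Trivial verdict: tally how many turns each side took.
    pro = sum(1 for who, _ in history if who == "proposer")
    return {"turns": {"proposer": pro, "challenger": len(history) - pro}}

verdict, history = asyncio.run(debate("caching strategy", proposer, challenger, judge))
```

Note the proposer always gets the last word, matching the round structure in `DebateSystem.debate`.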

Pattern 4: Assembly Line (Pipeline)

Sequential processing through specialized stages.

Each stage:

  • Owns a specific responsibility
  • Can reject work or send it back to an earlier stage
  • Attaches metadata for the next stage

python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field

@dataclass
class PipelineContext:
    """Carries data through the pipeline."""
    original_input: str
    current_data: any
    stage_outputs: dict = field(default_factory=dict)
    metadata: dict = field(default_factory=dict)
    errors: list = field(default_factory=list)

class PipelineStage(ABC):
    """Base class for pipeline stages."""

    @abstractmethod
    async def process(self, context: PipelineContext) -> PipelineContext:
        pass

    @abstractmethod
    def can_process(self, context: PipelineContext) -> bool:
        pass


class CodeGenerationPipeline:
    """Assembly line for code generation."""

    def __init__(self, llm_client):
        self.stages = [
            RequirementsAgent(llm_client),
            ArchitectureAgent(llm_client),
            ImplementationAgent(llm_client),
            TestingAgent(llm_client),
            ReviewAgent(llm_client),
        ]

    async def run(self, user_request: str) -> PipelineContext:
        context = PipelineContext(
            original_input=user_request,
            current_data=user_request
        )

        for stage in self.stages:
            if not stage.can_process(context):
                context.errors.append(f"Stage {stage.__class__.__name__} cannot process")
                break

            context = await stage.process(context)
            context.stage_outputs[stage.__class__.__name__] = context.current_data

            # Check for stage failures
            if context.metadata.get("stage_failed"):
                # Optionally retry or route to error handling
                break

        return context


class RequirementsAgent(PipelineStage):
    """Extracts and clarifies requirements."""

    def __init__(self, llm_client):
        self.llm = llm_client

    def can_process(self, context: PipelineContext) -> bool:
        return isinstance(context.current_data, str)

    async def process(self, context: PipelineContext) -> PipelineContext:
        response = await self.llm.generate(
            system="""Extract structured requirements from the user request.
            Identify: functional requirements, non-functional requirements,
            constraints, and ambiguities that need clarification.""",
            prompt=context.current_data,
            response_format=Requirements
        )

        context.current_data = response
        context.metadata["requirements_extracted"] = True
        return context


class ArchitectureAgent(PipelineStage):
    """Designs system architecture based on requirements."""

    async def process(self, context: PipelineContext) -> PipelineContext:
        requirements = context.current_data

        response = await self.llm.generate(
            system="""Design a system architecture based on requirements.
            Include: components, interfaces, data flow, and technology choices.""",
            prompt=f"Requirements: {requirements}",
            response_format=Architecture
        )

        context.current_data = response
        context.metadata["architecture_designed"] = True
        return context
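The pipeline contract above — `can_process` gate, then `process`, then record the stage output — can be run end to end with two toy stages (the stage names and string transforms are illustrative, not the LLM-backed agents above):

```python
import asyncio
from dataclasses import dataclass, field

@dataclass
class Ctx:
    data: str
    stage_outputs: dict = field(default_factory=dict)
    errors: list = field(default_factory=list)

class Upper:
    def can_process(self, ctx):
        return isinstance(ctx.data, str)
    async def process(self, ctx):
        ctx.data = ctx.data.upper()
        return ctx

class Exclaim:
    def can_process(self, ctx):
        return bool(ctx.data)
    async def process(self, ctx):
        ctx.data = ctx.data + "!"
        return ctx

async def run_pipeline(stages, text):
    ctx = Ctx(data=text)
    for stage in stages:
        if not stage.can_process(ctx):
            # A stage refusing its input halts the line, as in CodeGenerationPipeline.
            ctx.errors.append(f"{stage.__class__.__name__} cannot process")
            break
        ctx = await stage.process(ctx)
        ctx.stage_outputs[stage.__class__.__name__] = ctx.data
    return ctx

ctx = asyncio.run(run_pipeline([Upper(), Exclaim()], "ship it"))
```

Each stage sees only the context object, so stages stay swappable and independently testable.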

Communication Protocols

Structured Message Format

python
import uuid
from pydantic import BaseModel
from datetime import datetime
from typing import Optional, Any

class AgentMessage(BaseModel):
    """Standardized message format for agent communication."""

    # Routing
    message_id: str
    correlation_id: Optional[str] = None  # For request-response pairing
    sender_id: str
    recipient_id: Optional[str] = None  # None = broadcast

    # Timing
    timestamp: datetime
    ttl_seconds: Optional[int] = None

    # Content
    message_type: str  # "request", "response", "event", "command"
    action: str        # Specific action requested
    payload: dict[str, Any]

    # Context
    conversation_id: str
    parent_message_id: Optional[str] = None

    # Metadata
    priority: int = 5  # 1-10, higher = more urgent
    require_ack: bool = False


class ConversationProtocol:
    """Manages structured conversations between agents."""

    def __init__(self):
        self.conversations: dict[str, list[AgentMessage]] = {}

    def start_conversation(self, initiator: str, topic: str) -> str:
        """Start a new conversation thread."""
        conv_id = str(uuid.uuid4())
        self.conversations[conv_id] = []
        return conv_id

    def add_message(self, message: AgentMessage):
        """Add message to conversation history."""
        if message.conversation_id in self.conversations:
            self.conversations[message.conversation_id].append(message)

    def get_context(self, conversation_id: str, max_messages: int = 10) -> list:
        """Get recent conversation context."""
        return self.conversations.get(conversation_id, [])[-max_messages:]
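A runnable, stdlib-only sketch of the same ideas — a message carrying `message_id`/`correlation_id` plus a conversation log — using a dataclass in place of the pydantic model (the `Msg` fields are a deliberately pared-down assumption):

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional

@dataclass
class Msg:
    sender_id: str
    conversation_id: str
    action: str
    payload: dict
    message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    correlation_id: Optional[str] = None  # pairs a response with its request
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

conversations: dict = {}

def start_conversation() -> str:
    conv_id = str(uuid.uuid4())
    conversations[conv_id] = []
    return conv_id

def add_message(msg: Msg):
    conversations[msg.conversation_id].append(msg)

def get_context(conv_id: str, max_messages: int = 10):
    # Most recent messages only, mirroring ConversationProtocol.get_context.
    return conversations[conv_id][-max_messages:]

conv = start_conversation()
req = Msg("planner", conv, "create_plan", {"task": "add caching"})
add_message(req)
add_message(Msg("critic", conv, "critique", {"ok": True},
                correlation_id=req.message_id))
context = get_context(conv)
```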

Shared Memory / Blackboard Pattern

python
from typing import Any, Optional
import asyncio
from datetime import datetime

class Blackboard:
    """Shared memory space for multi-agent collaboration."""

    def __init__(self):
        self._state: dict[str, Any] = {}
        self._locks: dict[str, asyncio.Lock] = {}
        self._subscribers: dict[str, list[callable]] = {}
        self._history: list[dict] = []

    async def read(self, key: str) -> Optional[Any]:
        """Read value from blackboard."""
        return self._state.get(key)

    async def write(self, key: str, value: Any, writer_id: str):
        """Write value to blackboard with locking."""
        if key not in self._locks:
            self._locks[key] = asyncio.Lock()

        async with self._locks[key]:
            old_value = self._state.get(key)
            self._state[key] = value

            # Record history
            self._history.append({
                "timestamp": datetime.now(),
                "key": key,
                "old_value": old_value,
                "new_value": value,
                "writer": writer_id
            })

            # Notify subscribers
            await self._notify(key, value, writer_id)

    def subscribe(self, key: str, callback: callable):
        """Subscribe to changes on a key."""
        if key not in self._subscribers:
            self._subscribers[key] = []
        self._subscribers[key].append(callback)

    async def _notify(self, key: str, value: Any, writer_id: str):
        """Notify all subscribers of a change."""
        for callback in self._subscribers.get(key, []):
            await callback(key, value, writer_id)


class BlackboardAgent:
    """Agent that collaborates via blackboard."""

    def __init__(self, agent_id: str, blackboard: Blackboard, llm_client):
        self.agent_id = agent_id
        self.blackboard = blackboard
        self.llm = llm_client

        # Subscribe to relevant keys
        blackboard.subscribe("problem_state", self.on_state_change)
        blackboard.subscribe("help_requests", self.on_help_request)

    async def on_state_change(self, key: str, value: Any, writer_id: str):
        """React to problem state changes."""
        if writer_id == self.agent_id:
            return  # Ignore own writes

        # Check if we can contribute
        contribution = await self._evaluate_contribution(value)
        if contribution:
            current = await self.blackboard.read("partial_solutions") or []
            current.append({
                "agent": self.agent_id,
                "contribution": contribution
            })
            await self.blackboard.write("partial_solutions", current, self.agent_id)

    async def _evaluate_contribution(self, state: dict) -> Optional[dict]:
        """Determine if agent can contribute to current state."""
        response = await self.llm.generate(
            system=f"You are a specialist in {self.specialty}. "
                   "Evaluate if you can contribute to solving this problem.",
            prompt=f"Current state: {state}\n\nCan you contribute? If so, what?"
        )
        return response if response.get("can_contribute") else None
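The write-then-notify cycle of the blackboard can be demonstrated with a trimmed-down version (a single lock instead of per-key locks, no history; the agent IDs are illustrative):

```python
import asyncio

class MiniBlackboard:
    def __init__(self):
        self._state = {}
        self._subscribers = {}
        self._lock = asyncio.Lock()

    def subscribe(self, key, callback):
        self._subscribers.setdefault(key, []).append(callback)

    async def write(self, key, value, writer_id):
        async with self._lock:
            self._state[key] = value
        # Notify after releasing the lock so callbacks may write back safely.
        for cb in self._subscribers.get(key, []):
            await cb(key, value, writer_id)

    async def read(self, key):
        return self._state.get(key)

async def demo():
    bb = MiniBlackboard()
    seen = []

    async def on_change(key, value, writer_id):
        # Subscriber reacts to writes from other agents, ignoring its own.
        if writer_id != "observer":
            seen.append((writer_id, value))

    bb.subscribe("problem_state", on_change)
    await bb.write("problem_state", {"phase": "design"}, "architect")
    await bb.write("problem_state", {"phase": "build"}, "developer")
    return seen, await bb.read("problem_state")

seen, final = asyncio.run(demo())
```

Notifying outside the lock is the design choice worth noting: a callback that writes the blackboard (as `on_state_change` above does) would deadlock if notification happened while the lock was held.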

State Management

Distributed State with Event Sourcing

python
import uuid
from dataclasses import dataclass
from typing import List
from datetime import datetime

@dataclass
class AgentEvent:
    """Immutable event representing a state change."""
    event_id: str
    agent_id: str
    event_type: str
    payload: dict
    timestamp: datetime
    version: int

class EventStore:
    """Persistent store for agent events."""

    def __init__(self, storage_backend):
        self.storage = storage_backend
        self.subscribers: list[callable] = []

    async def append(self, event: AgentEvent):
        """Append event to store."""
        await self.storage.save(event)

        # Notify subscribers
        for subscriber in self.subscribers:
            await subscriber(event)

    async def get_events(
        self,
        agent_id: str = None,
        event_type: str = None,
        since: datetime = None
    ) -> List[AgentEvent]:
        """Query events with filters."""
        return await self.storage.query(agent_id, event_type, since)

    async def rebuild_state(self, agent_id: str) -> dict:
        """Rebuild agent state from events."""
        events = await self.get_events(agent_id=agent_id)
        state = {}

        for event in events:
            state = self._apply_event(state, event)

        return state

    def _apply_event(self, state: dict, event: AgentEvent) -> dict:
        """Apply event to state (reducer pattern)."""
        if event.event_type == "task_started":
            state["current_task"] = event.payload["task"]
            state["status"] = "working"
        elif event.event_type == "task_completed":
            state["completed_tasks"] = state.get("completed_tasks", [])
            state["completed_tasks"].append(event.payload)
            state["status"] = "idle"
        elif event.event_type == "error_occurred":
            state["last_error"] = event.payload
            state["status"] = "error"

        return state


class StatefulAgent:
    """Agent with event-sourced state management."""

    def __init__(self, agent_id: str, event_store: EventStore, llm_client):
        self.agent_id = agent_id
        self.event_store = event_store
        self.llm = llm_client
        self._state = None

    async def initialize(self):
        """Rebuild state from event history."""
        self._state = await self.event_store.rebuild_state(self.agent_id)

    async def execute_task(self, task: dict):
        """Execute task with state tracking."""
        # Emit start event
        await self._emit_event("task_started", {"task": task})

        try:
            result = await self._do_work(task)
            await self._emit_event("task_completed", {
                "task": task,
                "result": result
            })
            return result
        except Exception as e:
            await self._emit_event("error_occurred", {
                "task": task,
                "error": str(e)
            })
            raise

    async def _emit_event(self, event_type: str, payload: dict):
        """Emit and persist an event."""
        event = AgentEvent(
            event_id=str(uuid.uuid4()),
            agent_id=self.agent_id,
            event_type=event_type,
            payload=payload,
            timestamp=datetime.now(),
            version=len(await self.event_store.get_events(self.agent_id)) + 1
        )
        await self.event_store.append(event)

        # Update local state
        self._state = self.event_store._apply_event(self._state, event)
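The heart of event sourcing is the pure reducer: state is never stored directly, only derived by folding events. A minimal runnable sketch of the same reducer logic, with events simplified to plain dicts for illustration:

```python
def apply_event(state, event):
    # Pure reducer: (old state, event) -> new state, no mutation.
    etype, payload = event["type"], event["payload"]
    if etype == "task_started":
        return {**state, "current_task": payload["task"], "status": "working"}
    if etype == "task_completed":
        done = state.get("completed_tasks", []) + [payload]
        return {**state, "completed_tasks": done, "status": "idle"}
    if etype == "error_occurred":
        return {**state, "last_error": payload, "status": "error"}
    return state  # unknown events are ignored, keeping replay forward-compatible

def rebuild_state(events):
    # Replaying the full log reconstructs current state from scratch.
    state = {}
    for event in events:
        state = apply_event(state, event)
    return state

events = [
    {"type": "task_started", "payload": {"task": "index docs"}},
    {"type": "task_completed", "payload": {"task": "index docs", "result": "ok"}},
]
state = rebuild_state(events)
```

Because the reducer is pure, replaying the same log always yields the same state — which is what makes crash recovery and auditing trivial.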

Conflict Resolution

Voting and Consensus

python
import json
from enum import Enum
from collections import Counter
from typing import Any

class VotingStrategy(Enum):
    MAJORITY = "majority"
    UNANIMOUS = "unanimous"
    WEIGHTED = "weighted"
    RANKED_CHOICE = "ranked_choice"

class ConflictResolver:
    """Resolves conflicts between agent outputs."""

    def __init__(self, strategy: VotingStrategy = VotingStrategy.MAJORITY):
        self.strategy = strategy

    async def resolve(
        self,
        question: str,
        agent_responses: dict[str, Any],
        weights: dict[str, float] = None
    ) -> dict:
        """Resolve conflicting agent responses."""

        if self.strategy == VotingStrategy.MAJORITY:
            return self._majority_vote(agent_responses)
        elif self.strategy == VotingStrategy.WEIGHTED:
            return self._weighted_vote(agent_responses, weights)
        elif self.strategy == VotingStrategy.RANKED_CHOICE:
            return await self._ranked_choice(agent_responses)

    def _majority_vote(self, responses: dict[str, Any]) -> dict:
        """Simple majority voting."""
        # Normalize responses for comparison
        normalized = {k: self._normalize(v) for k, v in responses.items()}

        # Count votes
        vote_counts = Counter(normalized.values())
        winner = vote_counts.most_common(1)[0]

        # Find original response
        for agent_id, response in responses.items():
            if self._normalize(response) == winner[0]:
                return {
                    "decision": response,
                    "method": "majority_vote",
                    "vote_count": winner[1],
                    "total_votes": len(responses),
                    "winning_agent": agent_id
                }

    def _weighted_vote(
        self,
        responses: dict[str, Any],
        weights: dict[str, float]
    ) -> dict:
        """Weighted voting based on agent expertise/reliability."""
        scores = {}

        for agent_id, response in responses.items():
            normalized = self._normalize(response)
            weight = weights.get(agent_id, 1.0)

            if normalized not in scores:
                scores[normalized] = {"score": 0, "response": response, "voters": []}

            scores[normalized]["score"] += weight
            scores[normalized]["voters"].append(agent_id)

        # Find highest scored response
        winner = max(scores.values(), key=lambda x: x["score"])

        return {
            "decision": winner["response"],
            "method": "weighted_vote",
            "score": winner["score"],
            "voters": winner["voters"]
        }

    async def _ranked_choice(self, responses: dict[str, Any]) -> dict:
        """Ranked choice voting with elimination rounds."""
        # Each agent ranks all responses (including their own)
        # Simulate by having an LLM rank them
        pass

    def _normalize(self, response: Any) -> str:
        """Normalize response for comparison."""
        if isinstance(response, dict):
            return json.dumps(response, sort_keys=True)
        return str(response).lower().strip()


class ConsensusBuilder:
    """Builds consensus through iterative refinement."""

    def __init__(self, llm_client, max_rounds: int = 3):
        self.llm = llm_client
        self.max_rounds = max_rounds

    async def build_consensus(
        self,
        topic: str,
        agent_positions: dict[str, str]
    ) -> dict:
        """Iteratively build consensus among agents."""

        positions = agent_positions.copy()

        for round_num in range(self.max_rounds):
            # Check if consensus reached
            if self._is_consensus(positions):
                return {
                    "consensus": True,
                    "rounds": round_num + 1,
                    "final_position": list(positions.values())[0]
                }

            # Each agent considers others' positions
            new_positions = {}
            for agent_id, position in positions.items():
                others = {k: v for k, v in positions.items() if k != agent_id}

                revised = await self._revise_position(
                    agent_id, position, others, topic
                )
                new_positions[agent_id] = revised

            positions = new_positions

        # No full consensus - find common ground
        return {
            "consensus": False,
            "rounds": self.max_rounds,
            "final_positions": positions,
            "common_ground": await self._find_common_ground(positions, topic)
        }

    async def _revise_position(
        self,
        agent_id: str,
        position: str,
        others: dict,
        topic: str
    ) -> str:
        """Agent revises position considering others' views."""
        response = await self.llm.generate(
            system=f"You are agent {agent_id}. Consider other perspectives "
                   "and revise your position if you find merit in their arguments. "
                   "Maintain your position if you believe you are correct.",
            prompt=f"""Topic: {topic}

            Your current position: {position}

            Other agents' positions:
            {json.dumps(others, indent=2)}

            Provide your revised position (or reaffirm your current one):"""
        )
        return response
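A runnable sketch of the voting helpers above, plus one plausible version of the `_is_consensus` check that `ConsensusBuilder` references but never defines — here assumed to mean "all normalized positions coincide" (that interpretation, and the sample inputs, are assumptions):

```python
import json
from collections import Counter

def normalize(response):
    # Canonical form so semantically equal answers compare equal.
    if isinstance(response, dict):
        return json.dumps(response, sort_keys=True)
    return str(response).lower().strip()

def majority_vote(responses):
    counts = Counter(normalize(v) for v in responses.values())
    winner_key, count = counts.most_common(1)[0]
    for agent_id, response in responses.items():
        if normalize(response) == winner_key:
            return {"decision": response, "vote_count": count,
                    "winning_agent": agent_id}

def weighted_vote(responses, weights):
    scores = {}
    for agent_id, response in responses.items():
        key = normalize(response)
        entry = scores.setdefault(key, {"score": 0.0, "response": response,
                                        "voters": []})
        entry["score"] += weights.get(agent_id, 1.0)
        entry["voters"].append(agent_id)
    return max(scores.values(), key=lambda x: x["score"])

def is_consensus(positions: dict) -> bool:
    # Consensus when every agent's normalized position is identical.
    return len({normalize(p) for p in positions.values()}) <= 1

majority = majority_vote({"a": "Yes", "b": "yes", "c": "no"})
weighted = weighted_vote({"a": "yes", "b": "no"}, {"a": 0.5, "b": 2.0})
converged = is_consensus({"a": "Use Redis", "b": "use redis"})
split = is_consensus({"a": "Use Redis", "b": "Use Memcached"})
```

Normalization matters in all three: "Yes" and "yes" count as the same vote, and two agents phrased differently can still register as consensus.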

Real-World Example: A Software Development Team

python
class SoftwareDevTeam:
    """Multi-agent system simulating a software development team."""

    def __init__(self, llm_client):
        self.llm = llm_client
        self.message_bus = MessageBus()
        self.blackboard = Blackboard()
        self.event_store = EventStore(InMemoryStorage())

        # Create specialized agents
        self.agents = {
            "product_manager": ProductManagerAgent(
                "pm", self.message_bus, self.blackboard, llm_client
            ),
            "architect": ArchitectAgent(
                "arch", self.message_bus, self.blackboard, llm_client
            ),
            "developer": DeveloperAgent(
                "dev", self.message_bus, self.blackboard, llm_client
            ),
            "qa": QAAgent(
                "qa", self.message_bus, self.blackboard, llm_client
            ),
            "tech_writer": TechWriterAgent(
                "docs", self.message_bus, self.blackboard, llm_client
            ),
        }

        self.supervisor = TeamLeadAgent(
            "lead", self.agents, self.message_bus, llm_client
        )

    async def develop_feature(self, feature_request: str) -> dict:
        """Complete feature development lifecycle."""

        # Phase 1: Requirements
        await self.blackboard.write(
            "feature_request",
            feature_request,
            "system"
        )

        requirements = await self.supervisor.delegate(
            agent="product_manager",
            task="analyze_requirements",
            input=feature_request
        )
        await self.blackboard.write("requirements", requirements, "pm")

        # Phase 2: Architecture
        architecture = await self.supervisor.delegate(
            agent="architect",
            task="design_architecture",
            input=requirements
        )

        # Review architecture with debate
        debate = DebateSystem(self.llm)
        refined_arch = await debate.debate(
            topic=f"Review architecture: {architecture}"
        )
        await self.blackboard.write("architecture", refined_arch, "arch")

        # Phase 3: Implementation
        implementation = await self.supervisor.delegate(
            agent="developer",
            task="implement",
            input={"requirements": requirements, "architecture": refined_arch}
        )

        # Phase 4: Testing
        test_results = await self.supervisor.delegate(
            agent="qa",
            task="test",
            input=implementation
        )

        # Iterate if tests fail
        while not test_results["all_passed"]:
            fixes = await self.supervisor.delegate(
                agent="developer",
                task="fix_bugs",
                input=test_results["failures"]
            )
            test_results = await self.supervisor.delegate(
                agent="qa",
                task="test",
                input=fixes
            )

        # Phase 5: Documentation
        docs = await self.supervisor.delegate(
            agent="tech_writer",
            task="document",
            input={"code": implementation, "architecture": refined_arch}
        )

        return {
            "requirements": requirements,
            "architecture": refined_arch,
            "implementation": implementation,
            "tests": test_results,
            "documentation": docs
        }

Trade-offs

| Approach     | Pros                            | Cons                                  |
|--------------|---------------------------------|---------------------------------------|
| Hierarchical | Clear control, easier debugging | Single point of failure, bottleneck   |
| Peer-to-peer | Fault tolerant, flexible        | Complex coordination, potential chaos |
| Debate       | Higher-quality output           | Slow, costly (multiple LLM calls)     |
| Pipeline     | Simple, predictable             | Less flexible, no backtracking        |
| Blackboard   | Flexible collaboration          | Race conditions, complexity           |

When to Use Multi-Agent Systems

Good fit:

  • Complex tasks requiring diverse expertise
  • Tasks that benefit from multiple perspectives
  • Long-running workflows with distinct phases
  • Scenarios that need checks and balances

Poor fit:

  • Simple, single-step tasks
  • Latency-critical applications
  • Cost-sensitive environments
  • Tasks requiring strong consistency


Released under the MIT license. Built by the Babushkai community.