Core Classes
The core classes provide the main API for building and running agent teams.
TaskExecutor
The main entry point for executing tasks with agent teams.
Constructor
from agentx import TaskExecutor
executor = TaskExecutor(config_path="path/to/config")
Parameters:
config_path
(str): Path to the configuration directory containingteam.yaml
Methods
execute_task(task: str) -> TaskResult
Execute a task with the configured agent team.
result = await executor.execute_task("Write a report on renewable energy")
print(f"Success: {result.success}")
print(f"Summary: {result.summary}")
print(f"Messages: {len(result.conversation_history)}")
Parameters:
task
(str): The task description to execute
Returns:
TaskResult
: Contains success status, summary, and conversation history
stream_task(task: str) -> AsyncIterator[Event]
Stream task execution events in real-time.
async for event in executor.stream_task("Analyze this data"):
if event.type == "agent_message":
print(f"{event.sender}: {event.content}")
elif event.type == "tool_call":
print(f"Tool: {event.tool_name}")
Parameters:
task
(str): The task description to execute
Yields:
Event
: Stream of execution events
Agent
Represents an individual AI agent with specific capabilities.
Properties
agent.name # Agent name
agent.role # Agent role/specialization
agent.tools # Available tools
agent.memory # Memory system
agent.llm_config # LLM configuration
Methods
send_message(content: str, recipient: str = None)
Send a message to another agent or the team.
await agent.send_message("Please review this document", recipient="reviewer")
Team
Represents a collection of agents working together.
Class Methods
Team.from_config(config_path: str) -> Team
Create a team from configuration files.
team = Team.from_config("path/to/config")
Methods
run(task: str) -> TaskResult
Execute a task with the team.
result = await team.run("Create a marketing plan")
Brain
The LLM interface used by agents for reasoning and communication.
Methods
complete(messages: List[Message]) -> str
Generate a completion for the given messages.
response = await brain.complete([
Message(role="user", content="Hello")
])
stream_complete(messages: List[Message]) -> AsyncIterator[str]
Stream completion tokens.
async for token in brain.stream_complete(messages):
print(token, end="", flush=True)
ToolExecutor
Manages tool execution and security.
Methods
execute_tool(name: str, args: dict) -> ToolResult
Execute a tool with given arguments.
result = await tool_executor.execute_tool("web_search", {
"query": "AI agent frameworks"
})
Configuration Classes
TeamConfig
Configuration for agent teams.
@dataclass
class TeamConfig:
name: str
agents: List[AgentConfig]
speaker_selection_method: str = "auto"
max_rounds: int = 10
termination_condition: str = "TERMINATE"
AgentConfig
Configuration for individual agents.
@dataclass
class AgentConfig:
name: str
role: str
prompt_file: Optional[str] = None
tools: List[str] = field(default_factory=list)
enable_memory: bool = False
human_input_mode: str = "NEVER"
llm: Optional[LLMConfig] = None
LLMConfig
Configuration for language models.
@dataclass
class LLMConfig:
model: str
temperature: float = 0.7
max_tokens: Optional[int] = None
api_key: Optional[str] = None
Event Types
Events emitted during task execution:
AgentMessageEvent
@dataclass
class AgentMessageEvent:
type: str = "agent_message"
sender: str
content: str
timestamp: datetime
ToolCallEvent
@dataclass
class ToolCallEvent:
type: str = "tool_call"
tool_name: str
arguments: dict
timestamp: datetime
TaskCompleteEvent
@dataclass
class TaskCompleteEvent:
type: str = "task_complete"
success: bool
summary: str
timestamp: datetime
Error Handling
AgentXError
Base exception for all AgentX errors.
try:
result = await executor.execute_task("Invalid task")
except AgentXError as e:
print(f"AgentX error: {e}")
Common Exceptions
ConfigurationError
: Invalid configurationToolExecutionError
: Tool execution failedAgentCommunicationError
: Agent communication failedLLMError
: Language model error
Usage Patterns
Basic Usage
import asyncio
from agentx import TaskExecutor
async def main():
executor = TaskExecutor(config_path="config")
result = await executor.execute_task("Hello world")
print(result.summary)
asyncio.run(main())
Streaming Usage
async def stream_example():
executor = TaskExecutor(config_path="config")
async for event in executor.stream_task("Write a story"):
match event.type:
case "agent_message":
print(f"{event.sender}: {event.content}")
case "tool_call":
print(f"Using tool: {event.tool_name}")
case "task_complete":
print(f"Task completed: {event.success}")
Error Handling
async def robust_execution():
executor = TaskExecutor(config_path="config")
try:
result = await executor.execute_task("Complex task")
if result.success:
print("Task completed successfully")
else:
print(f"Task failed: {result.error}")
except ConfigurationError as e:
print(f"Configuration error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")