Events - Message Bus
Overview
The Events system is a core component in the TKM AI Agency Platform that provides asynchronous communication and event handling between agents. It implements a publish-subscribe pattern for event distribution, logging, and response handling.
Directory Structure
Backend/Core_Components/
├── events.py # Event system implementation
└── types.py # Event type definitions
Main Components
AgentEventBus Class
The core component that handles event management:
- Event subscription
- Event emission
- Response handling
- History tracking
Event Types
Predefined system events:
- Conversation events
- Session management
- State changes
- Processing notifications
Processing Pipeline
-
Event Reception
- Event type validation
- Data formatting
- Subscriber notification
- History logging
-
Event Processing
- Callback execution
- Response handling
- Timeout management
- Error handling
-
Response Management
- Future handling
- Response routing
- Timeout control
- State tracking
API Operations
Subscribe to Event
- Method:
subscribe
- Purpose: Registers callbacks for event types
- Parameters:
{ "event_type": str, "callback": Callable }
Emit Event
- Method:
emit
- Purpose: Broadcasts events to subscribers
- Parameters:
{ "event_type": str, "data": Dict[str, Any], "agent_type": Optional[str] }
Emit and Wait
- Method:
emit_and_wait
- Purpose: Emits events and waits for responses
- Parameters:
{ "event_type": str, "data": Dict[str, Any], "timeout": float }
Key Features
-
Event Management
- Event subscription
- Asynchronous emission
- Response handling
- History tracking
-
Response System
- Future-based responses
- Timeout handling
- Response routing
- State management
-
History Tracking
- Event logging
- Type filtering
- Timestamp tracking
- Data preservation
Event Types
System Events
- CONVERSATION_CREATED
- CONVERSATION_UPDATED
- CONTEXT_REQUESTED
- SESSION_CREATED
- SESSION_UPDATED
Process Events
- MESSAGE_RECEIVED
- MESSAGE_PROCESSED
- LANGUAGE_NORMALIZED
- TRANSCRIPTION_COMPLETED
State Events
- FOLDER_STRUCTURE_DETERMINED
- ORION_STATE_CHANGED
- UPGRADE_REQUESTED
- UPGRADE_COMPLETED
Error Handling
-
Emission Errors
- Callback failures
- Timeout handling
- Invalid events
- State conflicts
-
Response Errors
- Missing responses
- Invalid data
- Timeout recovery
- State cleanup
Performance Features
-
Optimization
- Async processing
- Efficient routing
- Resource management
- Memory control
-
Monitoring
- Event tracking
- Response timing
- Error logging
- Performance metrics
Data Models
Event Data
{
"type": str,
"data": Dict[str, Any],
"agent_type": Optional[str],
"timestamp": str
}
Response Data
{
"event_id": str,
"response": Dict[str, Any],
"timestamp": str
}