Backend
Events - Message Bus

Events - Message Bus

Overview

The Events system is a core component in the TKM AI Agency Platform that provides asynchronous communication and event handling between agents. It implements a publish-subscribe pattern for event distribution, logging, and response handling.

Directory Structure

Backend/Core_Components/
├── events.py           # Event system implementation
└── types.py           # Event type definitions

Main Components

AgentEventBus Class

The core component that handles event management:

  • Event subscription
  • Event emission
  • Response handling
  • History tracking

Event Types

Predefined system events:

  • Conversation events
  • Session management
  • State changes
  • Processing notifications

Processing Pipeline

  1. Event Reception

    • Event type validation
    • Data formatting
    • Subscriber notification
    • History logging
  2. Event Processing

    • Callback execution
    • Response handling
    • Timeout management
    • Error handling
  3. Response Management

    • Future handling
    • Response routing
    • Timeout control
    • State tracking

API Operations

Subscribe to Event

  • Method: subscribe
  • Purpose: Registers callbacks for event types
  • Parameters:
    {
      "event_type": str,
      "callback": Callable
    }

Emit Event

  • Method: emit
  • Purpose: Broadcasts events to subscribers
  • Parameters:
    {
      "event_type": str,
      "data": Dict[str, Any],
      "agent_type": Optional[str]
    }

Emit and Wait

  • Method: emit_and_wait
  • Purpose: Emits events and waits for responses
  • Parameters:
    {
      "event_type": str,
      "data": Dict[str, Any],
      "timeout": float
    }

Key Features

  1. Event Management

    • Event subscription
    • Asynchronous emission
    • Response handling
    • History tracking
  2. Response System

    • Future-based responses
    • Timeout handling
    • Response routing
    • State management
  3. History Tracking

    • Event logging
    • Type filtering
    • Timestamp tracking
    • Data preservation

Event Types

System Events

  • CONVERSATION_CREATED
  • CONVERSATION_UPDATED
  • CONTEXT_REQUESTED
  • SESSION_CREATED
  • SESSION_UPDATED

Process Events

  • MESSAGE_RECEIVED
  • MESSAGE_PROCESSED
  • LANGUAGE_NORMALIZED
  • TRANSCRIPTION_COMPLETED

State Events

  • FOLDER_STRUCTURE_DETERMINED
  • ORION_STATE_CHANGED
  • UPGRADE_REQUESTED
  • UPGRADE_COMPLETED

Error Handling

  1. Emission Errors

    • Callback failures
    • Timeout handling
    • Invalid events
    • State conflicts
  2. Response Errors

    • Missing responses
    • Invalid data
    • Timeout recovery
    • State cleanup

Performance Features

  1. Optimization

    • Async processing
    • Efficient routing
    • Resource management
    • Memory control
  2. Monitoring

    • Event tracking
    • Response timing
    • Error logging
    • Performance metrics

Data Models

Event Data

{
    "type": str,
    "data": Dict[str, Any],
    "agent_type": Optional[str],
    "timestamp": str
}

Response Data

{
    "event_id": str,
    "response": Dict[str, Any],
    "timestamp": str
}