This example demonstrates how tinychat handles real-world complexity: a chatbot that enriches every user message with CRM data and conversation history before generating context-aware responses.

What We’re Building

A production-ready conversational system that:
  • Retrieves user information from a CRM system
  • Loads conversation history from a database
  • Routes enriched messages through an agent system
  • Generates personalized responses based on full context
  • Uses nested composites to organize logical subsystems

Message Types: Documenting the Flow

This example defines four custom message types that make the information flow explicit:
@dataclass(frozen=True)
class CRMMessage(Message):
    content: dict
    user_id: str
    user_info: dict
    user_message: Message

@dataclass(frozen=True)
class EnrichedMessage(Message):
    content: dict
    user_id: str
    user_info: dict
    history: list[str]
    user_message: Message

@dataclass(frozen=True)
class ReplyRequestMessage(Message):
    content: str
    user_id: str
    user_info: dict
    history: list[str]
    user_message: Message

@dataclass(frozen=True)
class ReplyMessage(Message):
    content: str
    user_message: Message
Each message type represents a distinct stage of enrichment:
  1. IngressMessage → Raw user input
  2. CRMMessage → Input + user profile
  3. EnrichedMessage → Input + profile + history
  4. ReplyRequestMessage → Formatted request for agent
  5. ReplyMessage → Generated response
  6. EgressMessage → Final output
Together, the types form a type-driven pipeline: each processor knows exactly what data it receives and what data it must produce.
Notice how each message type carries forward previous data:
  • CRMMessage includes user_message (the original ingress)
  • EnrichedMessage includes user_message AND user_info
  • ReplyRequestMessage includes everything
This is immutable accumulation—we’re not mutating messages, we’re creating new messages with more information. Each message is a complete snapshot of the processing state at that stage.
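The accumulation pattern can be sketched with nothing but stdlib dataclasses. The names below are minimal stand-ins for the real Message types, just to show the mechanics:

```python
from dataclasses import dataclass

# Stand-ins for the real Message types, just to show the pattern.
@dataclass(frozen=True)
class UserMsg:
    content: str
    user_id: str

@dataclass(frozen=True)
class CRMStage:
    user_id: str
    user_info: dict
    user_message: UserMsg  # the original message rides along, untouched

raw = UserMsg(content="Hi", user_id="user_123")
enriched = CRMStage(
    user_id=raw.user_id,
    user_info={"name": "Alice", "tier": "premium"},
    user_message=raw,
)

assert enriched.user_message is raw  # same object, never copied or mutated
# raw.content = "bye"  # frozen=True makes this raise FrozenInstanceError
```

Because every stage is frozen, "enrichment" is always construction of a new message, never mutation of an old one.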
Just by reading the message type definitions, you understand the entire system:
# The progression tells the story:
IngressMessage          # → "User said something"
  → CRMMessage         # → "We know who they are"
    → EnrichedMessage  # → "We know their history"
      → ReplyRequest   # → "Agent needs to respond"
        → ReplyMessage # → "Agent generated response"
          → Egress     # → "Send to user"
No documentation needed. The types ARE the documentation.

Stage 1: CRM Enrichment

The CRMProcessor retrieves user profile information:
class CRMProcessor(MessageProcessor):
    def __init__(self, name: str = "CRM", output_types: set[type[Message]] | None = None):
        super().__init__(name=name, output_types=output_types)
        # Mock user database
        self._users = {
            "user_123": {
                "name": "Alice",
                "email": "[email protected]",
                "tier": "premium",
            },
        }
    
    async def _process(self, message: IngressMessage) -> Optional[Message]:
        user_id = message.user_id
        user_info = self._users.get(user_id, {"name": "Unknown", ...})
        
        return CRMMessage(
            content={"user_id": user_id, "user_info": user_info},
            user_id=user_id,
            user_message=message,
            user_info=user_info,
        )
The processor maintains internal state (the user database) but produces immutable messages. This separation means you can cache, replicate, or hot-swap the processor without worrying about message consistency.

Real-World Extensions

class CRMProcessor(MessageProcessor):
    def __init__(self, crm_client: CRMClient):
        super().__init__(name="CRM", output_types={CRMMessage})
        self._crm = crm_client
    
    async def _process(self, message: IngressMessage) -> Optional[Message]:
        # Call external CRM API
        user_info = await self._crm.get_user(message.user_id)
        
        if not user_info:
            return None  # Filter out unknown users
        
        return CRMMessage(
            user_id=message.user_id,
            user_info=user_info,
            user_message=message,
        )
Notice how policies like caching and retry logic are your choice, not framework requirements. Implement what makes sense for your system.
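As one sketch of the caching choice, here is a hypothetical TTL cache you could wrap around any async user-lookup callable before handing it to the CRM stage. `CachingCRMLookup` and its parameters are illustrative, not tinychat APIs:

```python
import time
from typing import Awaitable, Callable

class CachingCRMLookup:
    """Hypothetical TTL cache around an async user-lookup callable."""

    def __init__(self, fetch: Callable[[str], Awaitable[dict]], ttl_seconds: float = 60.0):
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._cache: dict[str, tuple[float, dict]] = {}

    async def get_user(self, user_id: str) -> dict:
        hit = self._cache.get(user_id)
        if hit is not None and time.monotonic() - hit[0] < self._ttl:
            return hit[1]                       # fresh cache entry
        user_info = await self._fetch(user_id)  # miss or expired: refetch
        self._cache[user_id] = (time.monotonic(), user_info)
        return user_info
```

You could then pass `CachingCRMLookup(real_client.get_user)` wherever a `get_user`-style lookup is expected; TTL, eviction, and invalidation policy stay entirely in your hands.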

Stage 2: Database Enrichment

The DBProcessor adds conversation history:
class DBProcessor(MessageProcessor):
    def __init__(self, name: str = "DB", output_types: set[type[Message]] | None = None):
        super().__init__(name=name, output_types=output_types)
        # Mock chat history database
        self._history = {
            "user_123": [
                "User: Hello!",
                "Bot: Hi Alice! How can I help?",
                ...
            ],
        }
    
    async def _process(self, message: CRMMessage) -> Optional[Message]:
        user_id = message.user_id
        history = self._history.get(user_id, [])
        
        return EnrichedMessage(
            content={"user_id": user_id, "user_info": message.user_info, "history": history},
            user_id=user_id,
            user_info=message.user_info,
            history=history,
            user_message=message.user_message,
        )
The DB processor receives CRMMessage and produces EnrichedMessage. The type system ensures we can’t accidentally skip the CRM stage—the types enforce the enrichment order.

Automatic Chaining

Notice we didn’t write any code to chain CRM → DB. The CompositeProcessor handles it automatically:
bus = CompositeProcessor(
    handlers={
        IngressMessage: crm,   # Produces CRMMessage
        CRMMessage: db,        # Produces EnrichedMessage
        EnrichedMessage: agent_bus,
    },
)

# This automatically chains:
# Ingress → CRM → CRMMessage → DB → EnrichedMessage → agent_bus
The routing is type-driven and automatic. No manual wiring, no callbacks, no complex orchestration code.
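The core of type-driven routing fits in a few lines. This is an illustrative sketch, not the real CompositeProcessor internals:

```python
# Minimal sketch of type-driven routing: look up a handler by the message's
# type, feed the output back in, and stop when no handler claims the type.
def route(handlers: dict, message, max_hops: int = 5):
    for _ in range(max_hops):
        handler = handlers.get(type(message))
        if handler is None:
            return message          # nobody handles this type: it's the output
        message = handler(message)
    raise RuntimeError("max_hops exceeded")

# Toy two-stage pipeline standing in for CRM -> DB:
handlers = {
    int: lambda m: str(m * 2),      # "CRM" stage
    str: lambda m: [m],             # "DB" stage
}
result = route(handlers, 21)        # int -> "42" -> ["42"], then stop
```

Each hop is one dictionary lookup plus one call, which is why adding a stage is just adding a key.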

Stage 3: Agent System (Nested Composite)

Here’s where it gets interesting. We create a nested composite to handle the agent logic:
# Inner message bus for agents
agent_bus = CompositeProcessor(
    handlers={
        EnrichedMessage: orchestrator,      # Routes to reply agent
        ReplyRequestMessage: reply_agent,   # Generates response
        ReplyMessage: orchestrator,         # Approves and returns
    },
    max_hops=10,
    output_types={EgressMessage},
)

# Outer message bus
bus = CompositeProcessor(
    handlers={
        IngressMessage: crm,
        CRMMessage: db,
        EnrichedMessage: agent_bus,  # Nested composite!
    },
    max_hops=5,
    output_types={EgressMessage},
)
Nesting creates logical subsystems with clear boundaries:
  • Outer bus: Handles enrichment (CRM → DB)
  • Inner bus: Handles agent logic (Orchestrator → Reply → Orchestrator)
Each subsystem has its own max_hops limit, error handling, and lifecycle. The outer bus doesn’t need to know about the inner agent routing—it just knows “send EnrichedMessage to agent system, get back EgressMessage.”
This is the key insight: CompositeProcessor implements the MessageProcessor interface. That means a composite can be a handler in another composite.
# agent_bus is a MessageProcessor
handlers={
    EnrichedMessage: agent_bus,  # It's just another processor!
}
This is recursive composition—the same abstraction at every level.
agent_bus = CompositeProcessor(max_hops=10)  # Inner limit
bus = CompositeProcessor(max_hops=5)         # Outer limit
The inner agent bus can do up to 10 message transformations internally. The outer bus can do up to 5 transformations in its own routing. These limits are independent and composable.

The Orchestrator: State-Dependent Routing (1:N)

The OrchestratorAgent demonstrates the 1:N handler-to-types pattern:
class OrchestratorAgent(MessageProcessor):
    async def _process(self, message: Message) -> Optional[Message]:
        if isinstance(message, EnrichedMessage):
            # Forward to reply agent
            return ReplyRequestMessage(
                content=f"New message from {message.user_id}: {message.user_message.content}",
                user_id=message.user_id,
                user_info=message.user_info,
                history=message.history,
                user_message=message.user_message,
            )
        
        elif isinstance(message, ReplyMessage):
            # Approve and return as egress
            return EgressMessage(
                content=message.content,
            )
        
        else:
            raise ValueError(f"Unexpected message type: {type(message)}")
The orchestrator handles two different input types and produces two different output types. This is what enables complex routing topologies.

Flow Through the Agent Bus

Let’s trace a message through the agent bus:
1. EnrichedMessage arrives

The outer bus routes EnrichedMessage to the agent bus.
# agent_bus receives EnrichedMessage
2. Orchestrator transforms to ReplyRequest

The orchestrator receives EnrichedMessage and transforms it to ReplyRequestMessage.
EnrichedMessage → Orchestrator → ReplyRequestMessage
3. ReplyAgent generates response

The agent bus routes ReplyRequestMessage to the reply agent, which produces ReplyMessage.
ReplyRequestMessage → ReplyAgent → ReplyMessage
4. Orchestrator approves and returns

The agent bus routes ReplyMessage back to the orchestrator, which approves and produces EgressMessage.
ReplyMessage → Orchestrator → EgressMessage
5. Agent bus returns to outer bus

The agent bus returns EgressMessage to the outer bus, which completes processing.
Three message transformations within the agent bus, all automatic. The outer bus just sees “sent EnrichedMessage, got back EgressMessage.”

Why This Pattern?

Separation of Concerns

Orchestrator = routing logic. ReplyAgent = generation logic. Each processor does one thing.

Easy to Extend

Need approval logic? Add an approval processor. Need multiple agents? Add more handlers.

Testable in Isolation

Test orchestrator routing separately from reply generation. Mock agents, test flows.

Composable State Machines

The orchestrator can route to different processors based on any logic—user tier, message type, business rules.
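As a sketch of tier-based routing, an orchestrator could emit different request types per business rule. `PriorityRequest` and `StandardRequest` are hypothetical message types for illustration:

```python
from dataclasses import dataclass

# Hypothetical request types; the real system would register a handler
# for each of them in the agent bus.
@dataclass(frozen=True)
class PriorityRequest:
    content: str

@dataclass(frozen=True)
class StandardRequest:
    content: str

def route_by_tier(user_info: dict, content: str):
    # Business rule: premium users go to the priority queue.
    if user_info.get("tier") == "premium":
        return PriorityRequest(content)
    return StandardRequest(content)
```

Because the rule lives in one processor and the wiring lives in the composite, changing the rule never touches the topology.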

The Reply Agent: Context-Aware Generation

The ReplyAgent uses all the enriched context:
class ReplyAgent(MessageProcessor):
    async def _process(self, message: ReplyRequestMessage) -> Optional[Message]:
        # All context available in the message
        user_name = message.user_info.get("name", "there")
        tier = message.user_info.get("tier", "free")
        history_count = len(message.history)
        
        # Generate personalized response
        reply_text = (
            f"Hello {user_name}! Thanks for your message: '{message.user_message.content}'. "
            f"I can see you're a {tier} member and we've chatted {history_count} times before. "
            f"How can I assist you today?"
        )
        
        return ReplyMessage(
            content=reply_text,
            user_message=message.user_message,
        )
The reply agent doesn’t need to know about CRM processors or database processors. It just receives a ReplyRequestMessage with all the context it needs. This is message-driven decoupling.

Swapping Implementations

Want to use an LLM instead of template-based replies?
class LLMReplyAgent(MessageProcessor):
    def __init__(self, llm_client: LLMClient):
        super().__init__(name="llm_reply_agent", output_types={ReplyMessage})
        self._llm = llm_client
    
    async def _process(self, message: ReplyRequestMessage) -> Optional[Message]:
        # Build prompt with enriched context
        prompt = f"""
        User: {message.user_info['name']} (tier: {message.user_info['tier']})
        History: {message.history}
        Current message: {message.user_message.content}
        
        Generate a helpful, personalized response.
        """
        
        response = await self._llm.generate(prompt)
        
        return ReplyMessage(
            content=response,
            user_message=message.user_message,
        )
Just swap the handler in the composite—the rest of the system doesn’t change. The orchestrator doesn’t care how replies are generated, only that ReplyRequestMessage → ReplyMessage.

Observability: Tracking the Journey

The LoggingObserver tracks each message transformation with latency metrics:
class LoggingObserver(BaseObserver):
    def __init__(self):
        self._received_messages: dict[str, MessageReceived] = {}
    
    async def on_message_received(self, message: MessageReceived) -> None:
        logger.info(f"📨 [{message.source_processor.name}] Received: {message.content}")
        self._received_messages[message.source_message.id] = message
    
    async def on_message_processed(self, message: MessageProcessed) -> None:
        logger.info(f"✅ [{message.source_processor.name}] Processed: {message.content}")
        
        # Calculate latency
        received_msg = self._received_messages.get(message.source_message.id)
        if received_msg:
            latency_ns = message.timestamp - received_msg.timestamp
            latency_us = latency_ns / 1_000
            logger.info(f"⏱️  [{message.source_processor.name}] Latency: {latency_us:.2f}μs")
The observer receives notifications for:
  • Every message received by every processor
  • Every message processed by every processor
  • Every exception that occurs
This gives you complete visibility into the system without touching processor logic.
The observer tracks message IDs to calculate per-processor latency. You could also calculate:
  • End-to-end latency (Ingress → Egress)
  • Throughput (messages/second per processor)
  • Error rates (exceptions/total messages)
  • Hop counts (messages hitting max_hops)
All without modifying any processor code.
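End-to-end latency, for example, only needs the ingress timestamp per conversation. A self-contained sketch with a stubbed event type (the real MessageProcessed event is assumed to carry comparable fields):

```python
from dataclasses import dataclass

# Stubbed event for illustration; the real MessageProcessed event is
# assumed to carry a conversation id and a nanosecond timestamp.
@dataclass(frozen=True)
class ProcessedEvent:
    conversation_id: str
    message_type: str
    timestamp: int  # nanoseconds

class EndToEndLatencyTracker:
    """Record ingress time per conversation; report latency at egress."""

    def __init__(self):
        self._ingress_ts: dict[str, int] = {}
        self.latencies_us: list[float] = []

    def on_event(self, event: ProcessedEvent) -> None:
        if event.message_type == "IngressMessage":
            self._ingress_ts[event.conversation_id] = event.timestamp
        elif event.message_type == "EgressMessage":
            start = self._ingress_ts.pop(event.conversation_id, None)
            if start is not None:
                self.latencies_us.append((event.timestamp - start) / 1_000)

tracker = EndToEndLatencyTracker()
tracker.on_event(ProcessedEvent("enriched-demo", "IngressMessage", 0))
tracker.on_event(ProcessedEvent("enriched-demo", "EgressMessage", 5_000_000))
```

Register the equivalent observer in your setup config like any other; it never touches processor code.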
config = SetupConfig(
    observers=[
        LoggingObserver(),
        MetricsObserver(),
        TracingObserver(),
        DebugObserver(),
    ],
)
Each observer handles different concerns. They don’t interfere with each other or with processing logic.

Lifecycle: Assembly and Execution

The main flow demonstrates the full lifecycle:
async def main():
    # 1. Configuration
    config = SetupConfig(
        task_manager_params=TaskManagerParams(loop=asyncio.get_running_loop()),
        observers=[LoggingObserver()],
    )
    
    # 2. Create processors with output types
    crm = CRMProcessor(output_types={CRMMessage})
    db = DBProcessor(output_types={EnrichedMessage})
    orchestrator = OrchestratorAgent(
        name="orchestrator",
        output_types={ReplyRequestMessage, EgressMessage}  # 1:N!
    )
    reply_agent = ReplyAgent(name="reply_agent", output_types={ReplyMessage})
    
    # 3. Build nested topology
    agent_bus = CompositeProcessor(
        handlers={
            EnrichedMessage: orchestrator,
            ReplyRequestMessage: reply_agent,
            ReplyMessage: orchestrator,
        },
        max_hops=10,
        output_types={EgressMessage},
    )
    
    bus = CompositeProcessor(
        handlers={
            IngressMessage: crm,
            CRMMessage: db,
            EnrichedMessage: agent_bus,  # Nested!
        },
        max_hops=5,
        output_types={EgressMessage},
    )
    
    # 4. Setup
    await bus.setup(config)
    
    # 5. Process
    message = IngressMessage(
        content="I need help with my account",
        conversation_id="enriched-demo",
        user_id="user_123",
    )
    
    result = await bus.process(message)
Output type validation happens at composite construction. If you declare that a processor outputs MessageType but no handler exists for MessageType, you get an error when the composite is built—not midway through processing a message.
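The check itself can be sketched as a pure function. This is illustrative only; the real validation is assumed to live inside CompositeProcessor:

```python
# Illustrative sketch of construction-time validation: every type a handler
# can emit must either have its own handler or be a declared composite output.
def validate_topology(handlers: dict, handler_outputs: dict, output_types: set) -> None:
    for handler_name, produced in handler_outputs.items():
        for msg_type in produced:
            if msg_type not in handlers and msg_type not in output_types:
                raise ValueError(
                    f"{handler_name} emits {msg_type.__name__}, "
                    "but nothing handles or outputs it"
                )

class A: ...
class B: ...
class C: ...

handlers = {A: "crm", B: "db"}
produced = {"crm": {B}, "db": {C}}
validate_topology(handlers, produced, output_types={C})  # passes silently
# validate_topology(handlers, produced, output_types=set())  # raises ValueError
```

Declaring `output_types` on each processor is what gives the composite enough information to run this check up front.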

Topology Visualization

The system’s structure is self-documenting:
# Get topology for debugging
topology = bus.get_topology()
# Shows:
# IngressMessage → CRMProcessor → CRMMessage
# CRMMessage → DBProcessor → EnrichedMessage  
# EnrichedMessage → CompositeProcessor (agent_bus) → EgressMessage
#   [Inside agent_bus]
#   EnrichedMessage → Orchestrator → ReplyRequestMessage
#   ReplyRequestMessage → ReplyAgent → ReplyMessage
#   ReplyMessage → Orchestrator → EgressMessage
Use get_topology() to generate documentation, debug routing issues, or visualize complex systems in dashboards.

Scaling Patterns

This architecture scales both vertically and horizontally:
bus = CompositeProcessor(
    handlers={
        IngressMessage: crm,
        CRMMessage: preferences_processor,  # New stage
        PreferencesMessage: db,
        EnrichedMessage: sentiment_analyzer, # New stage
        AnalyzedMessage: agent_bus,
    },
)
Just add message types and handlers. The composite handles routing.

Real-World Production Patterns

class RobustCRMProcessor(MessageProcessor):
    def __init__(self, primary_crm: CRMClient, fallback_crm: CRMClient):
        super().__init__(name="robust_crm", output_types={CRMMessage})
        self._primary = primary_crm
        self._fallback = fallback_crm
    
    async def _process(self, message: IngressMessage) -> Optional[Message]:
        try:
            user_info = await self._primary.get_user(message.user_id)
        except CRMError:
            logger.warning("Primary CRM failed, trying fallback")
            try:
                user_info = await self._fallback.get_user(message.user_id)
            except CRMError:
                # Return default user info instead of failing
                user_info = {"name": "Valued Customer", "tier": "standard"}
        
        return CRMMessage(user_info=user_info, ...)
Processors handle their own errors. The framework doesn’t dictate policy.
class RateLimitedProcessor(MessageProcessor):
    def __init__(self, inner: MessageProcessor, rate_limiter: RateLimiter):
        super().__init__(name=f"rate_limited_{inner.name}")
        self._inner = inner
        self._rate_limiter = rate_limiter
    
    async def _process(self, message: Message) -> Optional[Message]:
        await self._rate_limiter.acquire(message.user_id)
        return await self._inner.process(message)
Wrap processors with rate limiting. Composition over framework features.
class CircuitBreakerProcessor(MessageProcessor):
    def __init__(self, inner: MessageProcessor, breaker: CircuitBreaker):
        super().__init__(name=f"circuit_breaker_{inner.name}")
        self._inner = inner
        self._breaker = breaker
    
    async def _process(self, message: Message) -> Optional[Message]:
        if self._breaker.is_open():
            logger.warning("Circuit breaker open, skipping processor")
            return None
        
        try:
            result = await self._inner.process(message)
            self._breaker.record_success()
            return result
        except Exception:
            self._breaker.record_failure()
            raise
Standard patterns, implemented as wrappers.
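Retry logic, flagged earlier as a policy choice, fits the same wrapper shape. A minimal sketch with exponential backoff, assuming the inner processor is an awaitable callable:

```python
import asyncio

class RetryingProcessor:
    """Hypothetical retry wrapper (not a tinychat API): re-invoke the inner
    processor on failure, backing off exponentially between attempts."""

    def __init__(self, inner, attempts: int = 3, base_delay: float = 0.05):
        self._inner = inner
        self._attempts = attempts
        self._base_delay = base_delay

    async def process(self, message):
        for attempt in range(self._attempts):
            try:
                return await self._inner(message)
            except Exception:
                if attempt == self._attempts - 1:
                    raise                                   # out of retries
                await asyncio.sleep(self._base_delay * 2 ** attempt)
```

Like the other wrappers, it composes: you can rate-limit a retried processor or retry a circuit-broken one without either knowing about the other.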

Key Insights

Types Document Flow

Custom message types make the enrichment pipeline self-documenting. Read the types, understand the system.

Processors Are Independent

CRM, DB, Orchestrator, ReplyAgent—each testable in isolation, swappable, and reusable.

Composition Creates Structure

Nested composites organize complex systems into logical subsystems with clear boundaries.

Routing Is Automatic

Type-based routing eliminates manual chaining. Define handlers, let the composite wire them up.

What Makes This Production-Ready

If CRM fails, return None and stop. If DB fails, return None. If reply agent fails, orchestrator can return a default response. Failures are explicit, not cascading.
Mock the CRM response, test DB processor. Mock enriched messages, test agent logic. Unit test each processor, integration test the composite.
Observers track every transformation, calculate latencies, log errors. Full visibility without code changes.
Run multiple instances of the bus, process messages concurrently. Stateless messages + stateful processors = horizontal scaling.
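Concurrent processing is plain asyncio. A sketch with a stubbed coroutine standing in for `bus.process`, fanning out independent messages with `asyncio.gather`:

```python
import asyncio

# Stub standing in for bus.process, which is assumed to be a coroutine.
async def process(message: str) -> str:
    await asyncio.sleep(0)          # stand-in for CRM/DB/agent work
    return f"reply to {message}"

async def main() -> list[str]:
    messages = ["msg-1", "msg-2", "msg-3"]
    # Independent conversations share no message state, so they can
    # run concurrently without coordination.
    return await asyncio.gather(*(process(m) for m in messages))

results = asyncio.run(main())
```

Per-user state (CRM cache, history) lives in the processors, so correctness only requires those processors to be safe under concurrent calls.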

Try It Yourself

1. Run the example

Copy the full code and run it. Watch the logs to see:
  • CRM lookup for user info
  • Database fetch for history
  • Orchestrator routing to reply agent
  • Agent generating personalized response
  • Orchestrator approving and returning
2. Extend the enrichment

Add another processor between CRM and DB:
class PreferencesProcessor(MessageProcessor):
    async def _process(self, message: CRMMessage) -> Optional[Message]:
        prefs = await self._load_preferences(message.user_id)
        return PreferencesMessage(
            user_id=message.user_id,
            user_info=message.user_info,
            preferences=prefs,
            user_message=message.user_message,
        )
Update the handlers (CRMMessage → PreferencesProcessor, PreferencesMessage → DB), and it just works.
3. Add more agents

Create specialist agents for different question types:
class TechnicalAgent(MessageProcessor): ...
class BillingAgent(MessageProcessor): ...

# Update orchestrator to route based on message content
class SmartOrchestrator(MessageProcessor):
    async def _process(self, message: EnrichedMessage) -> Message:
        if "technical" in message.user_message.content.lower():
            return TechnicalQuestion(...)
        elif "billing" in message.user_message.content.lower():
            return BillingQuestion(...)
        else:
            return GeneralQuestion(...)
4. Add observability

Create a metrics observer:
class MetricsObserver(BaseObserver):
    async def on_message_processed(self, message: MessageProcessed):
        # Send to your metrics system; compute latency from tracked
        # receive timestamps, as LoggingObserver does above
        metrics.increment(f"{message.source_processor.name}.messages")
        metrics.timing(f"{message.source_processor.name}.latency", latency)
Register it in setup config, and you have metrics without touching processor code.

Comparing Complexity

Without tinychat’s primitives:
# Manual orchestration, coupled logic
async def handle_message(user_message: str, user_id: str):
    user_info = await crm_client.get_user(user_id)
    history = await db.get_history(user_id)
    request = format_request(user_message, user_info, history)
    reply = await agent.generate(request)
    approved_reply = approve_reply(reply)
    return approved_reply

# Everything coupled, hard to test, hard to extend
# Where do you add caching? Retry logic? Metrics?
# How do you compose this with other flows?
With tinychat’s primitives:
# Declarative topology, automatic routing
bus = CompositeProcessor(
    handlers={
        IngressMessage: crm,
        CRMMessage: db,
        EnrichedMessage: agent_bus,
    },
)

result = await bus.process(message)

# Each processor independent, testable, reusable
# Add caching: wrap processor. Add retry: wrap processor.
# Add metrics: add observer. Compose: nest composites.
tinychat turns complex orchestration into simple composition. The primitives do the heavy lifting.

Ready for more?

Explore the information theory foundations and advanced topology patterns in the Core Primitives guide.