Build a multi-stage chatbot with CRM integration, database enrichment, and nested agent systems
This example demonstrates how tinychat handles real-world complexity: a chatbot that enriches every user message with CRM data and conversation history before generating context-aware responses.
Each message type represents a distinct stage of enrichment:
IngressMessage → Raw user input
CRMMessage → Input + user profile
EnrichedMessage → Input + profile + history
ReplyRequestMessage → Formatted request for agent
ReplyMessage → Generated response
EgressMessage → Final output
The types create a type-driven pipeline where each processor knows exactly what data it receives and what data it must produce.
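To make the pipeline concrete, here is one way the message types could be declared as frozen dataclasses. This is a sketch, not tinychat's actual definitions: the field names come from the processor code later in this example, and a minimal local Message base (with the content field the processors use) stands in for the library's so the snippet runs on its own.

from dataclasses import dataclass, field
from typing import Any

@dataclass(frozen=True)
class Message:
    # Stand-in for tinychat's Message base class; real types would subclass the library's.
    content: Any = None

@dataclass(frozen=True)
class IngressMessage(Message):
    # Raw user input
    user_id: str = ""

@dataclass(frozen=True)
class CRMMessage(Message):
    # Input + user profile; carries the original ingress message forward
    user_id: str = ""
    user_info: dict[str, Any] = field(default_factory=dict)
    user_message: IngressMessage | None = None

@dataclass(frozen=True)
class EnrichedMessage(Message):
    # Input + profile + history
    user_id: str = ""
    user_info: dict[str, Any] = field(default_factory=dict)
    history: list[str] = field(default_factory=list)
    user_message: IngressMessage | None = None

# ReplyRequestMessage, ReplyMessage, and EgressMessage follow the same pattern,
# each adding only the fields its stage produces.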
Immutability with accumulated state
Notice how each message type carries forward previous data:
CRMMessage includes user_message (the original ingress)
EnrichedMessage includes user_message AND user_info
ReplyRequestMessage includes everything
This is immutable accumulation—we’re not mutating messages, we’re creating new messages with more information. Each message is a complete snapshot of the processing state at that stage.
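A CRM stage makes the pattern visible: it never modifies the incoming IngressMessage, it wraps it inside a new CRMMessage alongside the looked-up profile. This is a sketch under assumptions: the _crm_client attribute and the message constructor fields are illustrative, mirroring the DB processor shown below.

class CRMProcessor(MessageProcessor):
    async def _process(self, message: IngressMessage) -> Optional[Message]:
        # Hypothetical CRM lookup -- swap in your real client
        user_info = await self._crm_client.get_user(message.user_id)
        if user_info is None:
            return None  # lookup failed: stop the pipeline here, explicitly

        # The ingress message is untouched; we return a new, richer message
        return CRMMessage(
            content={"user_id": message.user_id, "user_info": user_info},
            user_id=message.user_id,
            user_info=user_info,
            user_message=message,
        )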
Self-documenting architecture
Just by reading the message type definitions, you understand the entire system:
# The progression tells the story:
IngressMessage            # "User said something"
  → CRMMessage            # "We know who they are"
  → EnrichedMessage       # "We know their history"
  → ReplyRequestMessage   # "Agent needs to respond"
  → ReplyMessage          # "Agent generated response"
  → EgressMessage         # "Send to user"
No documentation needed. The types ARE the documentation.
The processor maintains internal state (the user database) but produces immutable messages. This separation means you can cache, replicate, or hot-swap the processor without worrying about message consistency.
class DBProcessor(MessageProcessor):
    def __init__(self, name: str = "DB", output_types: set[type[Message]] | None = None):
        super().__init__(name=name, output_types=output_types)
        # Mock chat history database
        self._history = {
            "user_123": [
                "User: Hello!",
                "Bot: Hi Alice! How can I help?",
                ...
            ],
        }

    async def _process(self, message: CRMMessage) -> Optional[Message]:
        user_id = message.user_id
        history = self._history.get(user_id, [])
        return EnrichedMessage(
            content={"user_id": user_id, "user_info": message.user_info, "history": history},
            user_id=user_id,
            user_info=message.user_info,
            history=history,
            user_message=message.user_message,
        )
The DB processor receives CRMMessage and produces EnrichedMessage. The type system ensures we can’t accidentally skip the CRM stage—the types enforce the enrichment order.
Each subsystem has its own max_hops limit, error handling, and lifecycle. The outer bus doesn’t need to know about the inner agent routing—it just knows “send EnrichedMessage to agent system, get back EgressMessage.”
CompositeProcessor is a MessageProcessor
This is the key insight: CompositeProcessor implements the MessageProcessor interface, which means a composite can be a handler inside another composite.
# agent_bus is a MessageProcessor
handlers={
    EnrichedMessage: agent_bus,  # It's just another processor!
}
This is recursive composition—the same abstraction at every level.
The inner agent bus can do up to 10 message transformations internally. The outer bus can do up to 5 transformations in its own routing. These limits are independent and composable.
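To make the nesting concrete, here is a wiring sketch. The handler-mapping style follows the snippet above, but the constructor keywords (name, handlers, max_hops) and the inner handler names (orchestrator, reply_agent, egress_formatter) are assumptions for illustration rather than tinychat's confirmed API.

# Inner bus: routes agent-side messages, with its own hop budget.
agent_bus = CompositeProcessor(
    name="AgentSystem",
    handlers={
        EnrichedMessage: orchestrator,       # → ReplyRequestMessage
        ReplyRequestMessage: reply_agent,    # → ReplyMessage
        ReplyMessage: egress_formatter,      # → EgressMessage
    },
    max_hops=10,
)

# Outer bus: treats the entire agent system as a single handler.
# Its max_hops budget is counted independently of the inner one.
chat_bus = CompositeProcessor(
    name="ChatPipeline",
    handlers={
        IngressMessage: crm_processor,       # → CRMMessage
        CRMMessage: db_processor,            # → EnrichedMessage
        EnrichedMessage: agent_bus,          # → EgressMessage (via inner routing)
    },
    max_hops=5,
)

The reply_agent referenced here is the ReplyAgent shown next.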
class ReplyAgent(MessageProcessor):
    async def _process(self, message: ReplyRequestMessage) -> Optional[Message]:
        # All context available in the message
        user_name = message.user_info.get("name", "there")
        tier = message.user_info.get("tier", "free")
        history_count = len(message.history)

        # Generate personalized response
        reply_text = (
            f"Hello {user_name}! Thanks for your message: '{message.user_message.content}'. "
            f"I can see you're a {tier} member and we've chatted {history_count} times before. "
            f"How can I assist you today?"
        )

        return ReplyMessage(
            content=reply_text,
            user_message=message.user_message,
        )
The reply agent doesn’t need to know about CRM processors or database processors. It just receives a ReplyRequestMessage with all the context it needs. This is message-driven decoupling.
To change how replies are generated, just swap the handler in the composite; the rest of the system doesn't change. The orchestrator doesn't care how replies are produced, only that ReplyRequestMessage → ReplyMessage.
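For example, replacing the rule-based ReplyAgent with an LLM-backed agent (LLMReplyAgent is a hypothetical class, not part of this example) touches exactly one mapping in the sketch above:

handlers = {
    EnrichedMessage: orchestrator,
    ReplyRequestMessage: LLMReplyAgent(),   # was: ReplyAgent()
    ReplyMessage: egress_formatter,
    # everything else in the pipeline stays the same
}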
Output type validation happens at composite construction. If you declare that a processor outputs a message type but no handler exists for that type, you get an error immediately at wiring time, not midway through processing a message.
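A sketch of what that catches, reusing the assumed constructor keywords from the wiring sketch above; the exact exception type isn't shown here, the point is that the failure surfaces while wiring, not while messages are flowing:

db_processor = DBProcessor(output_types={EnrichedMessage})

# No handler accepts EnrichedMessage, so this composite is mis-wired.
# Per the validation described above, construction fails right here.
broken_bus = CompositeProcessor(
    name="Broken",
    handlers={CRMMessage: db_processor},
)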
If the CRM lookup fails, return None and stop. If the DB lookup fails, return None. If the reply agent fails, the orchestrator can fall back to a default response. Failures are explicit, not cascading.
Easy to test each component
Mock the CRM response, test DB processor. Mock enriched messages, test agent logic. Unit test each processor, integration test the composite.
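A minimal unit-test sketch for the DB stage, assuming pytest with pytest-asyncio and the message fields used earlier in this example; it drives the _process hook directly:

import pytest

@pytest.mark.asyncio
async def test_db_processor_enriches_crm_message():
    processor = DBProcessor()
    crm_message = CRMMessage(
        content={"user_id": "user_123"},
        user_id="user_123",
        user_info={"name": "Alice", "tier": "premium"},
        user_message=IngressMessage(content="Hello!", user_id="user_123"),
    )

    result = await processor._process(crm_message)

    assert isinstance(result, EnrichedMessage)
    assert result.user_info == crm_message.user_info          # profile carried forward
    assert result.history == processor._history["user_123"]   # history attached
    assert result.user_message is crm_message.user_message    # original ingress preserved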
Observable end-to-end
Observers track every transformation, calculate latencies, log errors. Full visibility without code changes.
Scales horizontally
Run multiple instances of the bus, process messages concurrently. Stateless messages + stateful processors = horizontal scaling.
Create specialist agents for different question types:
class TechnicalAgent(MessageProcessor): ...
class BillingAgent(MessageProcessor): ...

# Update orchestrator to route based on message content
class SmartOrchestrator(MessageProcessor):
    async def _process(self, message: EnrichedMessage) -> Message:
        if "technical" in message.user_message.content.lower():
            return TechnicalQuestion(...)
        elif "billing" in message.user_message.content.lower():
            return BillingQuestion(...)
        else:
            return GeneralQuestion(...)
Add observability
Create a metrics observer:
class MetricsObserver(BaseObserver):
    async def on_message_processed(self, message: MessageProcessed):
        # Send to your metrics system
        metrics.increment(f"{message.source_processor.name}.messages")
        metrics.timing(f"{message.source_processor.name}.latency", latency)
Register it in the setup config, and you have metrics without touching processor code.
For contrast, here is the same flow written as manual, coupled orchestration:

# Manual orchestration, coupled logic
async def handle_message(user_message: str, user_id: str):
    user_info = await crm_client.get_user(user_id)
    history = await db.get_history(user_id)
    request = format_request(user_message, user_info, history)
    reply = await agent.generate(request)
    approved_reply = approve_reply(reply)
    return approved_reply

# Everything coupled, hard to test, hard to extend
# Where do you add caching? Retry logic? Metrics?
# How do you compose this with other flows?