Let’s build something real: a calculator agent that uses LLM tool calling to perform mathematical operations. This example demonstrates how tinychat’s three primitives combine to create a functional conversational AI system.

What We’re Building

A conversational agent that:
  • Receives natural language math questions
  • Uses an LLM to understand intent and call tools
  • Performs calculations through a custom calculator tool
  • Returns formatted results

Input

“What is 42 multiplied by 13? Then add 100 to that result.”

Output

“Result: 646.00”

The Building Blocks in Action

This example perfectly illustrates why tinychat’s primitives are powerful: each abstraction handles exactly one concern, and they compose naturally.

Message: Defining Information Flow

We use two message types to mark system boundaries:
# Entry point - natural language from user
IngressMessage(
    content="What is 42 multiplied by 13? Then add 100 to that result.",
    conversation_id="calculator-demo",
)

# Exit point - final answer
EgressMessage(
    content="Result: 646.00",
    conversation_id="calculator-demo",
)
IngressMessage and EgressMessage are semantic markers. They tell us “this is where data enters” and “this is where data exits,” making the system’s boundaries explicit.
These messages are immutable value objects—they carry information without behavior. The calculator agent doesn’t need to know where the question came from (a chat UI, API, CLI) or where the answer goes. It just transforms information.
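The immutability described above can be sketched with frozen dataclasses. These are hypothetical stand-ins, not tinychat's actual class definitions, but they capture the key property: messages are value objects that reject mutation.

```python
from dataclasses import dataclass

# Hypothetical stand-ins for tinychat's message classes; the real ones may
# carry more fields, but the key property is the same: frozen value objects.
@dataclass(frozen=True)
class IngressMessage:
    content: str
    conversation_id: str

@dataclass(frozen=True)
class EgressMessage:
    content: str
    conversation_id: str

msg = IngressMessage(
    content="What is 42 multiplied by 13? Then add 100 to that result.",
    conversation_id="calculator-demo",
)

# Frozen dataclasses reject mutation, so messages can be shared,
# logged, and replayed safely.
try:
    msg.content = "tampered"
except Exception as e:
    print(type(e).__name__)  # FrozenInstanceError
```

Because nothing can mutate a message after construction, any component may hold a reference to it without defensive copying.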

MessageProcessor: Encapsulating Behavior

The LLMProcessor is where transformation happens:
class LLMProcessor(MessageProcessor):
    # Configure the agent with tools
    calculator_tool = CalculatorTool(
        name="calculate",
        description="Perform basic arithmetic operations",
        parameters=[...],
    )
    
    agent = OpenAIAgent(config)  # config: LLM provider settings, defined elsewhere
    
    async def _process(self, message: IngressMessage) -> Optional[EgressMessage]:
        # Convert to LLM format
        messages = [{"role": "user", "content": message.content}]
        
        # Generate response (agent handles tool calling internally)
        response = await self.agent.reply(messages)
        
        # Return as EgressMessage
        return EgressMessage(
            content=response,
            conversation_id=message.conversation_id,
        )
The processor is stateful: the OpenAIAgent maintains conversation context and tool definitions. The messages it processes, however, are stateless values. This separation is crucial:
  • The processor can be warm-started, cached, or replicated
  • Messages can be logged, replayed, or tested independently
  • State is explicit and contained, not scattered across message objects
The signature IngressMessage → Optional[EgressMessage] documents the entire flow:
  • Input: Natural language question
  • Output: Final answer (or None if processing fails)
No need to trace through complex inheritance hierarchies or hidden state mutations. The types tell you exactly what data flows where.
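Because the interface is a single typed async function, the processor can be unit-tested with a stubbed agent. The following is a sketch using hypothetical stand-in classes (StubAgent, simplified message types), not tinychat's real test helpers:

```python
import asyncio
from dataclasses import dataclass

@dataclass(frozen=True)
class IngressMessage:
    content: str
    conversation_id: str

@dataclass(frozen=True)
class EgressMessage:
    content: str
    conversation_id: str

class StubAgent:
    """Stands in for OpenAIAgent: always returns a canned answer."""
    async def reply(self, messages):
        return "Result: 646.00"

class LLMProcessor:
    def __init__(self, agent):
        self.agent = agent

    async def _process(self, message):
        # Same shape as the processor above: convert, reply, wrap.
        messages = [{"role": "user", "content": message.content}]
        response = await self.agent.reply(messages)
        return EgressMessage(content=response,
                             conversation_id=message.conversation_id)

out = asyncio.run(LLMProcessor(StubAgent())._process(
    IngressMessage("What is 42 * 13 + 100?", "calculator-demo")))
print(out.content)  # Result: 646.00
```

No network, no API key, no framework setup: the stateless message boundary is what makes this isolation possible.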

CompositeProcessor: Orchestrating Flow

Even though this example has just one processor, we use CompositeProcessor to set up the routing topology:
chatbot = CompositeProcessor(
    handlers={
        IngressMessage: llm,
    },
)
This might seem like overkill for a single processor, but it establishes a pattern that scales. When you need to add preprocessing, validation, or multi-step reasoning, you just add more handlers—the composite takes care of routing.

Scaling the Topology

Imagine extending this to a more sophisticated agent:
chatbot = CompositeProcessor(
    handlers={
        IngressMessage: validator,        # Check input safety
        ValidatedInput: llm,               # Process with LLM
        ToolCallMessage: tool_executor,    # Execute tools
        ToolResultMessage: llm,            # LLM processes results
        FinalAnswer: formatter,            # Format output
        # Produces EgressMessage (terminal)
    },
    max_hops=10,
)
The same three primitives, the same composition pattern—just more message types and more processors. Complexity grows linearly, not exponentially.
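The routing loop described above can be sketched in a few lines. This MiniComposite is a hypothetical illustration of type-based dispatch, not tinychat's actual CompositeProcessor, which adds validation, observers, and error handling:

```python
import asyncio
from dataclasses import dataclass

# Hypothetical message types for a two-hop flow.
@dataclass(frozen=True)
class IngressMessage:
    content: str

@dataclass(frozen=True)
class ValidatedInput:
    content: str

@dataclass(frozen=True)
class EgressMessage:
    content: str

class MiniComposite:
    def __init__(self, handlers, max_hops=10):
        self.handlers = handlers
        self.max_hops = max_hops

    async def process(self, message):
        # Route by message type until no handler matches (terminal message).
        for _ in range(self.max_hops):
            handler = self.handlers.get(type(message))
            if handler is None:
                return message
            message = await handler(message)
        raise RuntimeError("max_hops exceeded")

async def validator(msg: IngressMessage) -> ValidatedInput:
    return ValidatedInput(content=msg.content.strip())

async def answer(msg: ValidatedInput) -> EgressMessage:
    return EgressMessage(content=f"echo: {msg.content}")

chatbot = MiniComposite(handlers={IngressMessage: validator,
                                  ValidatedInput: answer})
result = asyncio.run(chatbot.process(IngressMessage(content="  hello  ")))
print(result.content)  # echo: hello
```

Adding a processing stage means adding one message type and one dictionary entry; the loop itself never changes, which is why complexity grows linearly.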

The Tool: Arbitrary Complexity Welcome

The CalculatorTool demonstrates that processors can wrap arbitrarily complex behavior:
@dataclass
class CalculatorTool(Tool):
    precision: int = 2
    
    def _validate_operation(self, operation: str) -> bool:
        return operation in ["add", "subtract", "multiply", "divide"]
    
    def _perform_calculation(self, operation: str, a: float, b: float) -> float | str:
        operations = {
            "add": lambda x, y: x + y,
            "subtract": lambda x, y: x - y,
            "multiply": lambda x, y: x * y,
            "divide": lambda x, y: x / y if y != 0 else "Error: Division by zero",
        }
        return operations[operation](a, b)
    
    async def run(self, operation: str, a: float, b: float) -> str:
        if not self._validate_operation(operation):
            return "Error: Invalid operation..."
        
        result = self._perform_calculation(operation, a, b)
        if isinstance(result, str):
            # Propagate the division-by-zero message; formatting a string
            # with a float spec would raise a ValueError
            return result
        return f"Result: {result:.{self.precision}f}"
The calculator validates, computes, and formats: all the behaviors of a processor. But because it's defined as a Tool, the LLM can invoke it automatically. This is composition at the abstraction level: the LLM processor composes with tool processors, but the framework doesn't dictate how tools work internally.
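Run in isolation, the tool behaves like any async function. Here is a self-contained check that reproduces the class above without the Tool base, guarding against formatting the division-by-zero error string as a float (an assumption about the intended behavior):

```python
import asyncio
from dataclasses import dataclass

@dataclass
class CalculatorTool:
    precision: int = 2

    def _validate_operation(self, operation: str) -> bool:
        return operation in ["add", "subtract", "multiply", "divide"]

    def _perform_calculation(self, operation, a, b):
        operations = {
            "add": lambda x, y: x + y,
            "subtract": lambda x, y: x - y,
            "multiply": lambda x, y: x * y,
            "divide": lambda x, y: x / y if y != 0 else "Error: Division by zero",
        }
        return operations[operation](a, b)

    async def run(self, operation: str, a: float, b: float) -> str:
        if not self._validate_operation(operation):
            return "Error: Invalid operation"
        result = self._perform_calculation(operation, a, b)
        if isinstance(result, str):  # error string from division by zero
            return result
        return f"Result: {result:.{self.precision}f}"

tool = CalculatorTool()
print(asyncio.run(tool.run("multiply", 42, 13)))  # Result: 546.00
print(asyncio.run(tool.run("add", 546, 100)))     # Result: 646.00
print(asyncio.run(tool.run("divide", 1, 0)))      # Error: Division by zero
```

The two calls mirror the example input: 42 × 13 = 546, then 546 + 100 = 646, matching the "Result: 646.00" output shown earlier.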
Want to swap the calculator for a database query tool? A web search tool? An API call? Just implement the run method:
@dataclass
class DatabaseTool(Tool):
    db_connection: DatabaseConnection
    
    async def run(self, query: str) -> str:
        # Arbitrary complexity
        results = await self.db_connection.execute(query)
        return self._format_results(results)
The framework doesn’t care. It just passes messages.

Lifecycle: Setup and Execution

The main flow demonstrates the lifecycle pattern:
async def main():
    # 1. Create configuration
    config = SetupConfig(
        task_manager_params=TaskManagerParams(loop=asyncio.get_running_loop()),
        observers=[LoggingObserver()],
    )
    
    # 2. Create processors with output types
    llm = LLMProcessor(name="calculator_agent", output_types={EgressMessage})
    
    # 3. Build topology
    chatbot = CompositeProcessor(
        handlers={IngressMessage: llm},
    )
    
    # 4. Setup (must happen before processing)
    await chatbot.setup(config)
    
    # 5. Process message
    result = await chatbot.process(message)
1. Configuration phase: Create shared resources like the TaskManager and observers. These are injected into all processors during setup.
2. Assembly phase: Define your processors and declare their output types, then build the routing topology by mapping message types to handlers. Output type declaration enables validation: the composite verifies that all declared types have handlers before you process any messages.
3. Setup phase: Call setup() to initialize processors with shared configuration. This is where observers get registered and async resources are prepared. You must call setup() before process(); this explicit lifecycle prevents subtle bugs from implicit initialization.
4. Execution phase: Process messages. The composite automatically routes through handlers, chains transformations, and returns when a terminal condition is met.
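The setup-before-process contract can be enforced with a simple readiness flag. This is a sketch of the pattern, not tinychat's actual MessageProcessor implementation:

```python
import asyncio

# Sketch of the explicit lifecycle contract; tinychat's base class
# may enforce this differently.
class LifecycleProcessor:
    def __init__(self):
        self._ready = False

    async def setup(self, config):
        # Initialize shared resources here (observers, task manager, ...).
        self.config = config
        self._ready = True

    async def process(self, message):
        if not self._ready:
            raise RuntimeError("setup() must be called before process()")
        return message.upper()  # stand-in transformation

async def main():
    proc = LifecycleProcessor()
    try:
        await proc.process("hello")
    except RuntimeError as e:
        print(e)  # setup() must be called before process()
    await proc.setup(config={})
    print(await proc.process("hello"))  # HELLO

asyncio.run(main())
```

Failing loudly on a missed setup() turns an implicit-initialization bug into an immediate, diagnosable error.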

Observability Without Coupling

Notice the LoggingObserver:
class LoggingObserver(BaseObserver):
    async def on_message_received(self, message: MessageReceived) -> None:
        logger.debug(f"📨 [{message.source_processor.name}] Received: {message.content}")
    
    async def on_message_processed(self, message: MessageProcessed) -> None:
        logger.debug(f"✅ [{message.source_processor.name}] Returned: {message.content}")
Observers handle cross-cutting concerns like logging, metrics, tracing, and debugging without polluting processor logic.
The processor doesn’t know or care about logging:
# Clean processor - no logging code
async def _process(self, message: IngressMessage) -> Optional[EgressMessage]:
    messages = [{"role": "user", "content": message.content}]
    response = await self.agent.reply(messages)
    return EgressMessage(content=response, conversation_id=message.conversation_id)
Want to add metrics? Create a MetricsObserver. Want distributed tracing? Create a TracingObserver. The processor code never changes.
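The decoupling works because the runtime, not the processor, fans events out to whatever observers are registered. A minimal sketch with hypothetical hook names (tinychat's BaseObserver events carry richer payloads):

```python
import asyncio

class RecordingObserver:
    """Collects events instead of logging them, so the effect is testable."""
    def __init__(self):
        self.events = []

    async def on_message_received(self, content):
        self.events.append(("received", content))

    async def on_message_processed(self, content):
        self.events.append(("processed", content))

class ObservableProcessor:
    def __init__(self, observers):
        self.observers = observers

    async def process(self, content):
        # The runtime notifies observers around the transformation;
        # the transformation itself contains no instrumentation code.
        for obs in self.observers:
            await obs.on_message_received(content)
        result = content.upper()  # stand-in for the real transformation
        for obs in self.observers:
            await obs.on_message_processed(result)
        return result

obs = RecordingObserver()
result = asyncio.run(ObservableProcessor([obs]).process("hi"))
print(result)      # HI
print(obs.events)  # [('received', 'hi'), ('processed', 'HI')]
```

Swapping RecordingObserver for a metrics or tracing observer changes the list passed in, never the processor.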

Why This Architecture Works

Let’s connect this example back to tinychat’s philosophy:

Simplicity

Three primitives, clearly separated. Messages carry data, processors transform it, composites route it.

Composability

Add more processors, more message types, more complex routing—the pattern stays the same.

Explicitness

Types document flow, setup is explicit, boundaries are marked. No magic, no surprises.

Flexibility

Swap tools, change LLMs, add preprocessing, inject state—it’s all just processors.

What You Don’t See (By Design)

This example notably doesn’t include:
  • Retry logic with exponential backoff
  • Rate limiting for API calls
  • Caching of responses
  • Authentication/authorization
  • Request idempotency
  • Persistent conversation history
These are policy concerns, not primitive concerns. tinychat gives you the building blocks; you implement the policies that make sense for your use case.
Want retry logic? Wrap your processor:
class RetryProcessor(MessageProcessor):
    def __init__(self, inner: MessageProcessor, max_retries: int = 3):
        self.inner = inner
        self.max_retries = max_retries
    
    async def _process(self, message: Message) -> Optional[Message]:
        for attempt in range(self.max_retries):
            try:
                return await self.inner.process(message)
            except Exception:
                if attempt == self.max_retries - 1:
                    raise
                await asyncio.sleep(2 ** attempt)
The framework doesn’t dictate this—you build what you need.
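Exercising the retry pattern above with a flaky stand-in processor shows the backoff loop in action; FlakyProcessor and the short base_delay are illustrative additions, not part of tinychat:

```python
import asyncio

class FlakyProcessor:
    """Fails a fixed number of times, then succeeds."""
    def __init__(self, fail_times):
        self.calls = 0
        self.fail_times = fail_times

    async def process(self, message):
        self.calls += 1
        if self.calls <= self.fail_times:
            raise ConnectionError("transient failure")
        return f"ok: {message}"

class RetryProcessor:
    def __init__(self, inner, max_retries=3, base_delay=0.01):
        self.inner = inner
        self.max_retries = max_retries
        self.base_delay = base_delay

    async def process(self, message):
        for attempt in range(self.max_retries):
            try:
                return await self.inner.process(message)
            except Exception:
                if attempt == self.max_retries - 1:
                    raise
                # Exponential backoff: base_delay, 2x, 4x, ...
                await asyncio.sleep(self.base_delay * 2 ** attempt)

flaky = FlakyProcessor(fail_times=2)
result = asyncio.run(RetryProcessor(flaky).process("ping"))
print(result, flaky.calls)  # ok: ping 3
```

The wrapper and the wrapped processor share nothing but the process() interface, so the same RetryProcessor works around any inner processor.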

Key Takeaways

Well-defined message types document your system’s information flow. Look at the types, understand the system.
Each processor does one thing. State is internal, interface is simple. Test in isolation, compose in production.
CompositeProcessor turns independent units into sophisticated flows. Type-based routing makes the topology explicit and verifiable.
The same three primitives work for a single LLM call or a multi-agent system with dozens of processors. Complexity is in the topology, not the abstractions.

Ready to dive deeper?

Learn about the information theory foundations and advanced topology patterns in the Core Primitives guide.