Skip to main content
tinychat provides three fundamental abstractions that allow you to conceptualize complex conversational flows as simple, independent units that communicate through message passing.

Message

Information quanta — the bits

MessageProcessor

Information transformations — the channels

CompositeProcessor

Information topology — the graph
Everything else in tinychat is derived from these three primitives. Databases, APIs, LLMs, and CRMs are all processors, while conversations and agents are composite processors.

Message: Information Quanta

Messages are the atomic units of information that flow through your system. They represent discrete pieces of data that processors transform and route.

Core Characteristics

Messages in tinychat are designed as immutable value objects with the following properties:
Message structure is frozen after creation, ensuring predictable behavior throughout the processing pipeline.
AIMessage.tool_calls uses mutable containers for updates as an acknowledged exception for LLM API compatibility.
Messages prefer bounded structures to keep them efficient and fast to transmit.
AIMessage.tool_calls is an acknowledged exception to support standard LLM API patterns.
No generic dictionaries. Every message has a specific, well-defined type that determines how it’s routed and processed.
Each message includes metadata like id and timestamp for tracking and debugging.

Semantic Boundaries

tinychat provides semantic markers for system boundaries:
class IngressMessage:
    """Entry point into a processing system"""
    pass

class EgressMessage:
    """Exit point from a processing system"""
    pass
These boundaries are semantic markers that help you reason about your system’s inputs and outputs. They’re not enforced by the framework, giving you flexibility in how you structure your flows.

MessageProcessor: Information Channels

Processors are stateful transformations that convert one message type into another. Think of them as channels through which information flows and gets transformed.

Processing Contract

Every processor implements a simple, powerful contract:
async def _process(self, message: Message) -> Optional[Message]:
    # Transform the message
    return transformed_message
The signature Message → Optional[Message] defines a processor as a potentially noisy channel that can fail (returning None) or succeed (returning a transformed message).

Key Properties

Processors maintain their own internal state, allowing them to learn, adapt, and make context-aware decisions across multiple message transformations.
class ConversationProcessor(MessageProcessor):
    def __init__(self):
        self._history = []  # Internal state
    
    async def _process(self, message: Message) -> Optional[Message]:
        self._history.append(message)
        # Use history to inform transformation
        return self._transform_with_context(message)
Processors can return None to signal processing failure or message filtering, modeling real-world noisy communication channels.
async def _process(self, message: Message) -> Optional[Message]:
    if not self._is_valid(message):
        return None  # Filter out invalid messages
    return self._transform(message)
Processors are asynchronous by default, allowing concurrent I/O operations while respecting system resource bounds.
Everything in tinychat is async. Concurrency is fundamental to I/O-bound conversational systems. Build sync wrappers yourself if needed.

Lifecycle Management

Processors require explicit setup before use:
config = SetupConfig(
    task_manager_params=TaskManagerParams(loop=asyncio.get_running_loop()),
    observers=[MyObserver()],
)

await processor.setup(config)
# Now ready to process messages
result = await processor.process(message)
1

Setup before processing

Call setup() before calling process() on any processor. This initializes shared resources like the TaskManager and registers observers.
2

Shared configuration

SetupConfig provides shared TaskManager and observers across all processors. In recursive composition, parent CompositeProcessor shares the TaskManager with child processors.
3

Cleanup when done

Call cleanup() to properly release resources when you’re finished with a processor.

Output Type Declaration

Processors can declare their output types for topology validation:
enricher = EnricherProcessor(output_types={EnrichedData})
analyzer = AnalyzerProcessor(output_types={ProcessedResult})
formatter = FormatterProcessor(output_types={EgressMessage})
Declaring output types is optional but highly recommended for complex topologies. It helps catch routing errors at construction time rather than runtime.

CompositeProcessor: Information Topology

The CompositeProcessor is where the magic happens. It creates a type-based routing graph that automatically chains message transformations based on message types.

Type-Based Routing

CompositeProcessor routes messages through handlers based on their types, creating a 1:1 mapping from message type to handler:
# Define processors for each stage
crm = CRMProcessor(output_types={CRMMessage})
db = DBProcessor(output_types={EnrichedMessage})
llm = LLMProcessor(output_types={ReplyMessage})
formatter = FormatterProcessor(output_types={EgressMessage})

# Build routing topology via type-based handlers
chatbot = CompositeProcessor(
    handlers={
        IngressMessage: crm,      # Entry point
        CRMMessage: db,           # CRM → DB
        EnrichedMessage: llm,     # DB → LLM
        ReplyMessage: formatter,  # LLM → Format
        # Formatter produces EgressMessage (terminal)
    },
    max_hops=10,
)

# Process returns when EgressMessage is produced or max_hops reached
result = await chatbot.process(user_message)

Message flow through a type-based routing topology

Routing Semantics

Each message type maps to exactly one handler. This ensures deterministic routing and prevents ambiguity.
handlers={
    UserInput: processor_a,    # UserInput always goes to processor_a
    DataRequest: processor_b,  # DataRequest always goes to processor_b
}
Each handler can produce multiple different message types based on internal state and logic. This creates state-dependent routing paths.
class RouterProcessor(MessageProcessor):
    async def _process(self, message: InputMessage) -> Message:
        if message.needs_approval:
            return ApprovalMessage(content=message.content)
        elif message.is_urgent:
            return UrgentMessage(content=message.content)
        else:
            return StandardMessage(content=message.content)
Results are automatically re-routed through handlers until:
  • An EgressMessage is produced (terminal condition)
  • None is returned by a processor
  • max_hops is reached (prevents infinite loops)
# No manual chaining needed - the composite handles it
result = await composite.process(ingress_message)
# Automatically routes through: Ingress → A → B → C → Egress

Topology Patterns

The 1:N handler-to-types relationship enables rich topology patterns:
Linear chain where each processor produces exactly one message type:
# A → B → C → D
handlers={
    MessageA: processor_b,  # Produces MessageB
    MessageB: processor_c,  # Produces MessageC
    MessageC: processor_d,  # Produces MessageD
    MessageD: formatter,    # Produces EgressMessage
}

Execution Model

Processing is sequential within a chain but concurrent across chains. One message transformation happens at a time within a single message flow, but multiple independent message flows can be processed concurrently.
# Sequential: A → B → C (one at a time within this chain)
chain1 = await composite.process(message1)

# Concurrent: Multiple independent chains process simultaneously
results = await asyncio.gather(
    composite.process(message1),  # Chain 1
    composite.process(message2),  # Chain 2  
    composite.process(message3),  # Chain 3
)

Topology Validation

Validate your graph structure at construction time:
# CompositeProcessor validates that all declared output types have handlers
composite = CompositeProcessor(
    handlers={
        IngressMessage: processor_a,  # Declares output: MessageB
        MessageB: processor_b,        # Declares output: MessageC
        # ❌ Missing handler for MessageC!
    },
    max_hops=10,
)
# Raises validation error: MessageC has no handler
Use composite.get_topology() to visualize your graph structure and verify routing logic.
Terminal types (None and EgressMessage) are excluded from validation but still shown in topology visualizations.

Interruption Control

Stop processing gracefully when needed:
# Start processing
task = asyncio.create_task(composite.process(message))

# Interrupt if needed
await composite.interrupt_processing()

# Processing will stop at the next message transformation

Design Philosophy

Simplicity First

Minimal, essential building blocks. No assumptions about caching, rate limiting, or retry logic in core primitives.

Information Theory

Grounded in information theory: messages as quanta, processors as channels, composition as topology.

Composable by Nature

Simple primitives combine to create arbitrarily complex systems. CompositeProcessor is itself a MessageProcessor.

Async by Default

Built for I/O-bound conversational systems. Concurrency is fundamental, not an afterthought.
The framework provides the primitives. You implement the policies. Features like caching, rate limiting, retry logic, circuit breakers, authentication, persistence, and monitoring are user concerns that should be implemented as observers or custom processors.