FastStream Integration
FastStream is a modern framework for building async message consumers and producers. Temporal-boost provides seamless integration with FastStream, allowing you to combine event-driven architectures with Temporal workflows.
Overview
FastStream integration allows you to: - Consume messages from message queues (Redis, RabbitMQ, Kafka, etc.) - Trigger Temporal workflows from message events - Process events asynchronously with reliable execution - Combine event-driven and workflow-based architectures
Installation
Install Temporal-boost with FastStream support:
This installs FastStream and its dependencies (including anyio).
Quick Start
Basic Example
from faststream import FastStream
from faststream.redis import RedisBroker
from pydantic import BaseModel
from temporal_boost import BoostApp
# Message model
class TaskMessage(BaseModel):
task_id: str
description: str
priority: int
# Initialize FastStream
broker = RedisBroker("redis://localhost:6379")
faststream_app = FastStream(broker)
@broker.subscriber("tasks")
async def process_task(message: TaskMessage):
"""Process task messages."""
logger.info(f"Processing task: {message.task_id}")
# Initialize Temporal-boost
app = BoostApp("faststream-example")
# Register FastStream worker
app.add_faststream_worker("message_processor", faststream_app)
if __name__ == "__main__":
app.run()
Integration Patterns
Pattern 1: Message Queue → Temporal Workflow
Trigger Temporal workflows from message queue events:
from faststream import FastStream
from faststream.redis import RedisBroker
from temporalio.client import Client
from temporal_boost import BoostApp
broker = RedisBroker("redis://localhost:6379")
faststream_app = FastStream(broker)
@broker.subscriber("orders")
async def handle_order(message: OrderMessage):
"""Handle order message and start workflow."""
client = await Client.connect("localhost:7233")
await client.start_workflow(
"OrderWorkflow",
message.dict(),
id=f"order-{message.order_id}",
task_queue="order_queue",
)
app = BoostApp("order-service")
app.add_worker("order_worker", "order_queue", workflows=[OrderWorkflow])
app.add_faststream_worker("message_processor", faststream_app)
Pattern 2: Multiple Message Queues
Handle multiple message queues with different handlers:
broker = RedisBroker("redis://localhost:6379")
faststream_app = FastStream(broker)
@broker.subscriber("orders")
async def handle_orders(message: OrderMessage):
"""Handle order messages."""
# Process orders...
@broker.subscriber("notifications")
async def handle_notifications(message: NotificationMessage):
"""Handle notification messages."""
# Process notifications...
@broker.subscriber("emails")
async def handle_emails(message: EmailMessage):
"""Handle email messages."""
# Process emails...
app.add_faststream_worker("message_processor", faststream_app)
Pattern 3: Conditional Processing
Route messages based on content:
@broker.subscriber("tasks")
async def handle_task(message: TaskMessage):
"""Handle tasks with conditional routing."""
if message.priority > 5:
# High priority: execute immediately
await process_high_priority_task(message)
else:
# Normal priority: start workflow
client = await Client.connect("localhost:7233")
await client.start_workflow(
"TaskWorkflow",
message.dict(),
task_queue="task_queue",
)
Supported Brokers
FastStream supports multiple message brokers:
Redis
from faststream.redis import RedisBroker
broker = RedisBroker("redis://localhost:6379")
faststream_app = FastStream(broker)
RabbitMQ
from faststream.rabbit import RabbitBroker
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
faststream_app = FastStream(broker)
Kafka
from faststream.kafka import KafkaBroker
broker = KafkaBroker("localhost:9092")
faststream_app = FastStream(broker)
Configuration
Worker Configuration
Configure FastStream worker with custom options:
app.add_faststream_worker(
"message_processor",
faststream_app,
log_level=logging.DEBUG, # Custom log level
# Additional FastStream options
)
Message Broker Configuration
Configure broker connection:
# Redis with authentication
broker = RedisBroker("redis://user:password@localhost:6379")
# Redis with custom settings
broker = RedisBroker(
"redis://localhost:6379",
max_connections=10,
socket_keepalive=True,
)
# RabbitMQ with custom settings
broker = RabbitBroker(
"amqp://guest:guest@localhost:5672/",
max_connections=10,
virtualhost="/",
)
Error Handling
Message Processing Errors
Handle errors in message consumers:
@broker.subscriber("orders")
async def handle_order(message: OrderMessage):
"""Handle order with error handling."""
try:
client = await Client.connect("localhost:7233")
await client.start_workflow(
"OrderWorkflow",
message.dict(),
task_queue="order_queue",
)
except Exception as e:
logger.error(f"Failed to process order {message.order_id}: {e}")
# Optionally publish to dead-letter queue
await broker.publish(message.dict(), "orders-dlq")
raise
Dead-Letter Queues
Implement dead-letter queues for failed messages:
@broker.subscriber("orders")
async def handle_order(message: OrderMessage):
"""Handle order with DLQ support."""
try:
# Process message...
pass
except Exception:
# Publish to dead-letter queue
await broker.publish(message.dict(), "orders-dlq")
raise
Best Practices
- Use Pydantic Models: Define message schemas for type safety and validation
- Idempotency: Make message processing idempotent when possible
- Error Handling: Always handle errors gracefully with retries or DLQ
- Workflow Orchestration: Use Temporal workflows for complex processing
- Message Filtering: Use FastStream filtering for routing messages
- Monitoring: Monitor message processing rates and errors
- Resource Limits: Set appropriate concurrency limits for message processing
Examples
See the examples directory for comprehensive FastStream examples:
example_simple_faststream.py- Basic FastStream integrationexample_faststream_temporal.py- FastStream with Temporal workflowsexample_faststream_advanced.py- Advanced patterns and error handlingexample_faststream_producer.py- Message producer example