# Creating application
This guide covers everything you need to know about creating Temporal-boost applications, from basic setup to advanced patterns.
## Table of Contents
- Application Structure
- BoostApp Initialization
- Defining Activities
- Defining Workflows
- Adding Workers
- CRON Workers
- ASGI Workers
- FastStream Workers
- Best Practices
## Application Structure

A typical Temporal-boost application follows this structure:

```
my_app/
├── main.py           # Application entry point
├── activities.py     # Activity definitions
├── workflows.py      # Workflow definitions
├── config.py         # Configuration (optional)
└── requirements.txt  # Dependencies
```
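To tie these files together, here is a minimal `main.py` sketch. It assumes the `activities`/`workflows` module layout above and that `app.run()` starts the temporal-boost CLI; adjust names to your project:

```python
# main.py: minimal entry point sketch (module names and app.run()
# are assumptions based on the layout above; adjust to your project).
from temporal_boost import BoostApp

from activities import process_payment
from workflows import PaymentWorkflow

app = BoostApp(name="my-service")

app.add_worker(
    "payment_worker",
    "payment_queue",
    activities=[process_payment],
    workflows=[PaymentWorkflow],
)

if __name__ == "__main__":
    app.run()
```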
## BoostApp Initialization

The `BoostApp` class is the central component of your application. Initialize it at the start of your application:

```python
from temporal_boost import BoostApp

app = BoostApp(
    name="my-service",        # Application name (optional)
    temporal_endpoint=None,   # Override TEMPORAL_TARGET_HOST (optional)
    temporal_namespace=None,  # Override TEMPORAL_NAMESPACE (optional)
    debug_mode=False,         # Enable debug mode (optional)
    use_pydantic=None,        # Override Pydantic converter (optional)
    logger_config=None,       # Custom logging config (optional)
)
```
### Configuration Priority

Configuration is loaded in this order:

1. Environment variables (highest priority)
2. `BoostApp` initialization parameters
3. Default values (lowest priority)

For example, if you set `TEMPORAL_TARGET_HOST` in your environment, it will override any value passed to `BoostApp`.
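A small illustration of the precedence rule (the endpoint values here are hypothetical; the variable is assumed to be read when the client is configured):

```python
import os

# Pretend the deployment environment already sets this variable:
os.environ["TEMPORAL_TARGET_HOST"] = "temporal.prod.internal:7233"

from temporal_boost import BoostApp

# The environment variable wins, so connections target
# temporal.prod.internal:7233, not the endpoint passed below.
app = BoostApp(name="my-service", temporal_endpoint="localhost:7233")
```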
## Defining Activities

Activities are functions that perform actual work. Unlike workflows, they do not need to be deterministic and can perform I/O operations.
### Basic Activity

```python
from temporalio import activity


@activity.defn(name="process_payment")
async def process_payment(amount: float, currency: str) -> dict:
    """Process a payment transaction."""
    # Your business logic here
    return {"status": "success", "amount": amount, "currency": currency}
```
### Activity with Pydantic Models

Using Pydantic models provides type safety and validation:

```python
from pydantic import BaseModel
from temporalio import activity


class PaymentRequest(BaseModel):
    amount: float
    currency: str
    customer_id: str


class PaymentResponse(BaseModel):
    transaction_id: str
    status: str
    amount: float


@activity.defn(name="process_payment")
async def process_payment(request: PaymentRequest) -> PaymentResponse:
    """Process a payment with type-safe models."""
    # Process payment...
    return PaymentResponse(
        transaction_id="tx_123",
        status="completed",
        amount=request.amount,
    )
```
### Activity with Retry Options

Activities can run with custom retry policies. In the Temporal Python SDK, timeouts and retry policies are not parameters of `@activity.defn`; they are passed when the activity is executed from a workflow:

```python
from datetime import timedelta

import httpx
from temporalio import activity, workflow
from temporalio.common import RetryPolicy


@activity.defn(name="unreliable_api_call")
async def unreliable_api_call(url: str) -> dict:
    """Call an external API that may fail transiently."""
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.json()


# Inside a @workflow.run method:
result = await workflow.execute_activity(
    unreliable_api_call,
    "https://api.example.com/data",
    start_to_close_timeout=timedelta(seconds=30),
    retry_policy=RetryPolicy(
        initial_interval=timedelta(seconds=1),
        backoff_coefficient=2.0,
        maximum_interval=timedelta(seconds=60),
        maximum_attempts=5,
    ),
)
```
### Activity Best Practices

- ✅ Keep activities idempotent when possible
- ✅ Use appropriate timeouts (`start_to_close_timeout`)
- ✅ Handle errors gracefully
- ✅ Use Pydantic models for complex data structures
- ✅ Log important operations
- ❌ Don't depend on random values or the current time for results (retries re-execute the activity)
- ❌ Don't perform operations that can't be retried
## Defining Workflows
Workflows orchestrate activities and define business logic. They must be deterministic.
### Basic Workflow

```python
from datetime import timedelta

from temporalio import workflow

# Activities defined in activities.py (module name from the layout above)
from activities import validate_order, process_payment, fulfill_order


@workflow.defn(sandboxed=False, name="OrderProcessingWorkflow")
class OrderProcessingWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> dict:
        """Process an order through multiple steps."""
        # Step 1: Validate order
        validation_result = await workflow.execute_activity(
            validate_order,
            order_id,
            task_queue="order_queue",
            start_to_close_timeout=timedelta(minutes=5),
        )
        if not validation_result["valid"]:
            return {"status": "failed", "reason": "validation_failed"}

        # Step 2: Process payment
        payment_result = await workflow.execute_activity(
            process_payment,
            validation_result["amount"],
            task_queue="payment_queue",
            start_to_close_timeout=timedelta(minutes=10),
        )

        # Step 3: Fulfill order
        fulfillment_result = await workflow.execute_activity(
            fulfill_order,
            order_id,
            task_queue="fulfillment_queue",
            start_to_close_timeout=timedelta(minutes=30),
        )

        return {
            "status": "completed",
            "order_id": order_id,
            "payment": payment_result,
            "fulfillment": fulfillment_result,
        }
```
### Workflow with Signals

Signals allow external systems to send data to running workflows:

```python
from temporalio import workflow


@workflow.defn(sandboxed=False, name="ApprovalWorkflow")
class ApprovalWorkflow:
    def __init__(self):
        self.approved = False
        self.rejected = False

    @workflow.run
    async def run(self, request_id: str) -> dict:
        """Wait for approval signal."""
        await workflow.wait_condition(lambda: self.approved or self.rejected)
        if self.approved:
            return {"status": "approved", "request_id": request_id}
        return {"status": "rejected", "request_id": request_id}

    @workflow.signal(name="approve")
    def approve(self) -> None:
        """Signal handler for approval."""
        self.approved = True

    @workflow.signal(name="reject")
    def reject(self) -> None:
        """Signal handler for rejection."""
        self.rejected = True
```
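To show the other side of the contract, here is a minimal client-side sketch that sends the `approve` signal (the workflow ID is hypothetical; the workflow must already be running):

```python
import asyncio

from temporalio.client import Client


async def approve_request(workflow_id: str) -> None:
    """Send the "approve" signal to a running ApprovalWorkflow."""
    client = await Client.connect("localhost:7233")
    handle = client.get_workflow_handle(workflow_id)
    await handle.signal("approve")


asyncio.run(approve_request("approval-request-42"))
```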
### Workflow with Queries

Queries allow reading workflow state without affecting execution:

```python
from datetime import timedelta

from temporalio import workflow

from activities import process_order  # defined in activities.py


@workflow.defn(sandboxed=False, name="OrderStatusWorkflow")
class OrderStatusWorkflow:
    def __init__(self):
        self.status = "pending"
        self.progress = 0

    @workflow.run
    async def run(self, order_id: str) -> dict:
        """Process order and update status."""
        self.status = "processing"
        await workflow.execute_activity(
            process_order,
            order_id,
            task_queue="order_queue",
            start_to_close_timeout=timedelta(minutes=30),
        )
        self.status = "completed"
        self.progress = 100
        return {"status": self.status, "order_id": order_id}

    @workflow.query(name="status")
    def get_status(self) -> dict:
        """Query workflow status."""
        return {"status": self.status, "progress": self.progress}
```
### Workflow Best Practices

- ✅ Keep workflows deterministic (no random, no time, no I/O)
- ✅ Use appropriate timeouts for activities
- ✅ Handle errors with try/except blocks
- ✅ Use signals for external input
- ✅ Use queries for state inspection
- ✅ Use `sandboxed=False` for most workflows (better performance)
- ❌ Don't use `datetime.now()` - use `workflow.now()`
- ❌ Don't perform I/O operations directly
## Adding Workers
Workers connect your activities and workflows to Temporal task queues.
### Basic Worker Registration

```python
app.add_worker(
    worker_name="payment_worker",
    task_queue="payment_queue",
    activities=[process_payment, refund_payment],
    workflows=[PaymentWorkflow],
)
```
### Worker Parameters

The `add_worker` method accepts:

```python
app.add_worker(
    worker_name: str,                        # Unique worker name
    task_queue: str,                         # Temporal task queue name
    activities: list[Callable] | None,       # List of activity functions
    workflows: list[type] | None,            # List of workflow classes
    interceptors: list[Interceptor] | None,  # Optional interceptors
    cron_schedule: str | None,               # CRON schedule (for CRON workers)
    cron_runner: Callable | None,            # CRON runner method
    **worker_kwargs: Any,                    # Additional worker options
) -> TemporalBoostWorker
```
### Multiple Workers Example

```python
# Activity-only worker
app.add_worker(
    "payment_activities",
    "payment_queue",
    activities=[process_payment, refund_payment, validate_payment],
)

# Workflow-only worker
app.add_worker(
    "order_workflows",
    "order_queue",
    workflows=[OrderWorkflow, RefundWorkflow],
)

# Combined worker
app.add_worker(
    "combined_worker",
    "main_queue",
    activities=[process_order, send_notification],
    workflows=[OrderWorkflow],
)
```
### Worker Configuration

After adding a worker, you can configure it further:

```python
worker = app.add_worker(
    "custom_worker",
    "custom_queue",
    activities=[my_activity],
)

# Configure Temporal client
worker.configure_temporal_client(
    target_host="custom-host:7233",
    namespace="custom_namespace",
    use_pydantic_data_converter=True,
)

# Configure runtime with Prometheus metrics
worker.configure_temporal_runtime(
    prometheus_bind_address="0.0.0.0:9090",
)
```
## CRON Workers
CRON workers automatically start workflows on a schedule.
### Creating a CRON Worker

```python
from datetime import timedelta

from temporalio import workflow

from activities import generate_report  # defined in activities.py


@workflow.defn(sandboxed=False, name="DailyReportWorkflow")
class DailyReportWorkflow:
    @workflow.run
    async def run(self) -> None:
        """Generate daily report."""
        await workflow.execute_activity(
            generate_report,
            task_queue="report_queue",
            start_to_close_timeout=timedelta(minutes=30),
        )


app.add_worker(
    "daily_report_cron",
    "report_queue",
    workflows=[DailyReportWorkflow],
    cron_schedule="0 0 * * *",  # Run at midnight every day
    cron_runner=DailyReportWorkflow.run,
)
```
### CRON Schedule Format

CRON schedules use the standard five-field format: `minute hour day month weekday`.

Examples:

- `"0 * * * *"` - Every hour at minute 0
- `"0 0 * * *"` - Every day at midnight
- `"0 9 * * 1"` - Every Monday at 9 AM
- `"*/5 * * * *"` - Every 5 minutes
- `"0 0 1 * *"` - First day of every month
### Running CRON Workers
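A CRON worker runs like any other worker; once it registers, Temporal handles the schedule. Assuming the temporal-boost CLI entry point (`app.run()` in `main.py`, as sketched earlier), you can start it by name or together with the rest of the application:

```bash
# Start only the CRON worker (worker name from the example above)
python3 main.py run daily_report_cron

# Or start every registered worker
python3 main.py run all
```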
## ASGI Workers
ASGI workers allow you to run FastAPI, Starlette, or any ASGI application alongside your Temporal workers.
### Basic ASGI Worker

```python
from fastapi import FastAPI
from temporal_boost import BoostApp, ASGIWorkerType

app = BoostApp("my-service")

# Create your FastAPI app
fastapi_app = FastAPI(title="My API")


@fastapi_app.get("/health")
async def health():
    return {"status": "healthy"}


# Add ASGI worker
app.add_asgi_worker(
    "api_worker",
    fastapi_app,
    host="0.0.0.0",
    port=8000,
    asgi_worker_type=ASGIWorkerType.auto,  # Auto-detect available server
)
```
### Specifying ASGI Server

```python
# Use Uvicorn
app.add_asgi_worker(
    "api_worker",
    fastapi_app,
    "0.0.0.0",
    8000,
    asgi_worker_type=ASGIWorkerType.uvicorn,
)

# Use Hypercorn
app.add_asgi_worker(
    "api_worker",
    fastapi_app,
    "0.0.0.0",
    8000,
    asgi_worker_type=ASGIWorkerType.hypercorn,
)

# Use Granian
app.add_asgi_worker(
    "api_worker",
    fastapi_app,
    "0.0.0.0",
    8000,
    asgi_worker_type=ASGIWorkerType.granian,
)
```
### ASGI Worker from String Path

You can also load ASGI apps from a string path:
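A hypothetical sketch: it assumes `add_asgi_worker` accepts a `module:attribute` import string (the same convention uvicorn uses), so check this against your temporal-boost version:

```python
# "my_app.api:fastapi_app" is a hypothetical module path; the string
# form itself is an assumption modeled on uvicorn's convention.
app.add_asgi_worker(
    "api_worker",
    "my_app.api:fastapi_app",
    host="0.0.0.0",
    port=8000,
)
```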
### ASGI Worker with Temporal Integration

Combine ASGI endpoints with Temporal workflows:

```python
from fastapi import FastAPI
from temporalio.client import Client

fastapi_app = FastAPI()


@fastapi_app.post("/orders")
async def create_order(order_data: dict):
    """Create an order via Temporal workflow."""
    # In production, connect once at startup and reuse the client.
    client = await Client.connect("localhost:7233")
    handle = await client.start_workflow(
        "OrderWorkflow",
        order_data,
        id=f"order-{order_data['id']}",
        task_queue="order_queue",
    )
    # start_workflow returns a handle; return its ID
    return {"workflow_id": handle.id}
```
## FastStream Workers
FastStream workers integrate event-driven architectures with Temporal. FastStream is a framework for building async message consumers and producers, supporting multiple message brokers (Redis, RabbitMQ, Kafka, etc.).
### Basic FastStream Worker

```python
import logging

from faststream import FastStream
from faststream.redis import RedisBroker
from temporal_boost import BoostApp

logger = logging.getLogger(__name__)

# Initialize FastStream broker and app
broker = RedisBroker("redis://localhost:6379")
faststream_app = FastStream(broker)


@broker.subscriber("tasks")
async def process_task(message: dict):
    """Process task from message queue."""
    logger.info(f"Processing task: {message['task_id']}")


app = BoostApp("event-driven-service")
app.add_faststream_worker("message_processor", faststream_app)
```
### FastStream with Pydantic Models

Use Pydantic models for type-safe message handling:

```python
import logging

from pydantic import BaseModel
from faststream import FastStream
from faststream.redis import RedisBroker
from temporal_boost import BoostApp

logger = logging.getLogger(__name__)


class OrderMessage(BaseModel):
    order_id: str
    customer_id: str
    items: list[dict]
    total: float


broker = RedisBroker("redis://localhost:6379")
faststream_app = FastStream(broker)


@broker.subscriber("orders")
async def handle_order(message: OrderMessage):
    """Handle order messages."""
    logger.info(f"Received order: {message.order_id}")


app = BoostApp("order-service")
app.add_faststream_worker("order_processor", faststream_app)
```
### FastStream with Temporal Workflows

Combine FastStream message consumers with Temporal workflows:

```python
import logging

from faststream import FastStream
from faststream.redis import RedisBroker
from pydantic import BaseModel
from temporalio import workflow
from temporalio.client import Client
from temporal_boost import BoostApp

logger = logging.getLogger(__name__)

app = BoostApp("faststream-temporal")


class OrderMessage(BaseModel):
    # Trimmed model; see the Pydantic example above
    order_id: str
    customer_id: str


# Temporal workflow
@workflow.defn(sandboxed=False, name="OrderWorkflow")
class OrderWorkflow:
    @workflow.run
    async def run(self, order_data: dict) -> dict:
        # Process order...
        return {"status": "completed"}


# FastStream app
broker = RedisBroker("redis://localhost:6379")
faststream_app = FastStream(broker)


@broker.subscriber("orders")
async def handle_order(message: OrderMessage):
    """Handle order and start Temporal workflow."""
    client = await Client.connect("localhost:7233")
    handle = await client.start_workflow(
        "OrderWorkflow",
        message.model_dump(),  # use .dict() on Pydantic v1
        id=f"order-{message.order_id}",
        task_queue="order_queue",
    )
    logger.info(f"Started workflow {handle.id}")


# Register both workers
app.add_worker("order_worker", "order_queue", workflows=[OrderWorkflow])
app.add_faststream_worker("message_processor", faststream_app)
```
### Multiple Message Queues

Handle multiple message queues:

```python
# OrderMessage and NotificationMessage are Pydantic models
# defined as in the previous examples.
broker = RedisBroker("redis://localhost:6379")
faststream_app = FastStream(broker)


@broker.subscriber("orders")
async def handle_orders(message: OrderMessage):
    """Handle order messages."""
    # Process orders...


@broker.subscriber("notifications")
async def handle_notifications(message: NotificationMessage):
    """Handle notification messages."""
    # Process notifications...


app.add_faststream_worker("message_processor", faststream_app)
```
### FastStream with Different Brokers

FastStream supports multiple brokers. The snippets below mirror the Redis and RabbitMQ connection strings used in this guide; the Kafka address is an assumed local default.

**Redis:**
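```python
# Same broker and URL used throughout the examples above
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")
```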
**RabbitMQ:**

```python
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
```
**Kafka:**
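```python
# Assumes a local Kafka bootstrap server on the default port
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
```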
### FastStream Worker Configuration

Configure the FastStream worker with custom options:

```python
import logging

app.add_faststream_worker(
    "message_processor",
    faststream_app,
    log_level=logging.DEBUG,  # Custom log level
    # Additional FastStream options can be passed here
)
```
### Best Practices for FastStream Integration

- **Use Pydantic models**: Define message schemas with Pydantic for validation
- **Error handling**: Handle errors in message consumers gracefully
- **Idempotency**: Make message processing idempotent when possible
- **Workflow orchestration**: Use Temporal workflows for complex processing
- **Message filtering**: Use FastStream filtering for routing messages
- **Dead-letter queues**: Implement dead-letter queues for failed messages
- **Monitoring**: Monitor message processing rates and errors
### FastStream Producer Example

Publish messages to queues:

```python
import asyncio

from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")


async def publish_order(order_data: dict):
    """Publish order message."""
    # The broker must be connected before publishing;
    # the context manager connects and closes it.
    async with broker:
        await broker.publish(order_data, "orders")


# Use in your application
asyncio.run(publish_order({"order_id": "123", "total": 99.99}))
```
## Best Practices
### Application Structure

- **Separate concerns**: Keep activities, workflows, and configuration in separate files
- **Use modules**: Organize code into logical modules
- **Environment configuration**: Use environment variables for all configuration
- **Type hints**: Use type hints throughout for better IDE support
### Worker Organization

- **One worker per queue**: Each task queue should have dedicated workers
- **Group related workers**: Put related activities/workflows in the same worker
- **Separate concerns**: Keep different business domains in separate workers
- **Resource limits**: Set appropriate concurrency limits per worker
### Error Handling

- **Activity retries**: Configure retry policies for activities
- **Workflow timeouts**: Set appropriate timeouts for workflow execution
- **Error propagation**: Handle errors appropriately in workflows (see the sketch below)
- **Logging**: Log errors with context for debugging
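A minimal sketch of error propagation in a workflow; `process_order` is assumed to be an activity defined elsewhere:

```python
from datetime import timedelta

from temporalio import workflow
from temporalio.exceptions import ActivityError


@workflow.defn(sandboxed=False, name="SafeOrderWorkflow")
class SafeOrderWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> dict:
        try:
            result = await workflow.execute_activity(
                process_order,  # hypothetical activity defined elsewhere
                order_id,
                task_queue="order_queue",
                start_to_close_timeout=timedelta(minutes=5),
            )
        except ActivityError:
            # Raised after the activity exhausts its retry policy;
            # log with context and fail gracefully.
            workflow.logger.exception("process_order failed for %s", order_id)
            return {"status": "failed", "order_id": order_id}
        return {"status": "completed", "order_id": order_id, "result": result}
```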
### Performance

- **Activity concurrency**: Tune `TEMPORAL_MAX_CONCURRENT_ACTIVITIES` (see below)
- **Workflow concurrency**: Tune `TEMPORAL_MAX_CONCURRENT_WORKFLOW_TASKS`
- **Task queue separation**: Use separate queues for different workloads
- **Monitoring**: Enable Prometheus metrics for observability
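For example (variable names from this guide; the values are arbitrary starting points, not recommendations):

```bash
# Tune worker concurrency via the environment
export TEMPORAL_MAX_CONCURRENT_ACTIVITIES=100
export TEMPORAL_MAX_CONCURRENT_WORKFLOW_TASKS=50
```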
### Security

- **TLS**: Enable TLS for production Temporal connections
- **API keys**: Use API keys for Temporal Cloud or secured clusters
- **Secrets**: Store sensitive data in environment variables or secret managers
- **Validation**: Validate all inputs in activities and workflows