Skip to content

Examples

This page provides comprehensive examples covering common Temporal-boost patterns and use cases.

Table of Contents

Basic Examples

Example 1: Simple Activity and Workflow

The most basic Temporal-boost application:

import logging
from datetime import timedelta
from temporalio import activity, workflow
from temporal_boost import BoostApp

logging.basicConfig(level=logging.INFO)

app = BoostApp(name="simple-example")

@activity.defn(name="say_hello")
async def say_hello(name: str) -> str:
    return f"Hello, {name}!"

@workflow.defn(sandboxed=False, name="GreetingWorkflow")
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            say_hello,
            name,
            task_queue="greeting_queue",
            start_to_close_timeout=timedelta(minutes=1),
        )

app.add_worker(
    "greeting_worker",
    "greeting_queue",
    activities=[say_hello],
    workflows=[GreetingWorkflow],
)

if __name__ == "__main__":
    app.run()

Example 2: Pydantic Models

Using Pydantic for type-safe data structures:

from pydantic import BaseModel
from temporalio import activity, workflow
from temporal_boost import BoostApp

app = BoostApp(name="pydantic-example", use_pydantic=True)

class User(BaseModel):
    id: int
    name: str
    email: str

class UserResponse(BaseModel):
    user_id: int
    created_at: str
    status: str

@activity.defn(name="create_user")
async def create_user(user: User) -> UserResponse:
    # Simulate user creation
    return UserResponse(
        user_id=user.id,
        created_at="2024-01-01T00:00:00Z",
        status="created",
    )

@workflow.defn(sandboxed=False, name="UserCreationWorkflow")
class UserCreationWorkflow:
    @workflow.run
    async def run(self, user: User) -> UserResponse:
        return await workflow.execute_activity(
            create_user,
            user,
            task_queue="user_queue",
            start_to_close_timeout=timedelta(minutes=5),
        )

app.add_worker(
    "user_worker",
    "user_queue",
    activities=[create_user],
    workflows=[UserCreationWorkflow],
)

if __name__ == "__main__":
    app.run()

Example 3: Multiple Activities in Sequence

from datetime import timedelta
from temporalio import activity, workflow
from temporal_boost import BoostApp

app = BoostApp(name="multi-activity-example")

@activity.defn(name="validate_order")
async def validate_order(order_id: str) -> dict:
    return {"valid": True, "order_id": order_id}

@activity.defn(name="charge_payment")
async def charge_payment(order_id: str, amount: float) -> dict:
    return {"charged": True, "amount": amount}

@activity.defn(name="send_confirmation")
async def send_confirmation(order_id: str) -> dict:
    return {"sent": True, "order_id": order_id}

@workflow.defn(sandboxed=False, name="OrderWorkflow")
class OrderWorkflow:
    @workflow.run
    async def run(self, order_id: str, amount: float) -> dict:
        # Step 1: Validate
        validation = await workflow.execute_activity(
            validate_order,
            order_id,
            task_queue="order_queue",
            start_to_close_timeout=timedelta(minutes=5),
        )

        if not validation["valid"]:
            return {"status": "failed", "reason": "validation"}

        # Step 2: Charge
        payment = await workflow.execute_activity(
            charge_payment,
            order_id,
            amount,
            task_queue="payment_queue",
            start_to_close_timeout=timedelta(minutes=10),
        )

        # Step 3: Confirm
        confirmation = await workflow.execute_activity(
            send_confirmation,
            order_id,
            task_queue="notification_queue",
            start_to_close_timeout=timedelta(minutes=5),
        )

        return {
            "status": "completed",
            "order_id": order_id,
            "payment": payment,
            "confirmation": confirmation,
        }

app.add_worker("order_worker", "order_queue", activities=[validate_order])
app.add_worker("payment_worker", "payment_queue", activities=[charge_payment])
app.add_worker("notification_worker", "notification_queue", activities=[send_confirmation])
app.add_worker("workflow_worker", "workflow_queue", workflows=[OrderWorkflow])

if __name__ == "__main__":
    app.run()

Advanced Patterns

Example 4: Workflow with Signals

from temporalio import workflow
from temporal_boost import BoostApp

app = BoostApp(name="signal-example")

@workflow.defn(sandboxed=False, name="ApprovalWorkflow")
class ApprovalWorkflow:
    def __init__(self):
        self.approved = False
        self.rejected = False
        self.comments = ""

    @workflow.run
    async def run(self, request_id: str) -> dict:
        await workflow.wait_condition(lambda: self.approved or self.rejected)

        return {
            "request_id": request_id,
            "status": "approved" if self.approved else "rejected",
            "comments": self.comments,
        }

    @workflow.signal(name="approve")
    def approve(self, comments: str = "") -> None:
        self.approved = True
        self.comments = comments

    @workflow.signal(name="reject")
    def reject(self, comments: str) -> None:
        self.rejected = True
        self.comments = comments

app.add_worker("approval_worker", "approval_queue", workflows=[ApprovalWorkflow])

if __name__ == "__main__":
    app.run()

Example 5: CRON Worker

from datetime import timedelta
from temporalio import activity, workflow
from temporal_boost import BoostApp

app = BoostApp(name="cron-example")

@activity.defn(name="generate_report")
async def generate_report() -> dict:
    # Generate daily report
    return {"report_id": "report_123", "generated_at": "2024-01-01"}

@workflow.defn(sandboxed=False, name="DailyReportWorkflow")
class DailyReportWorkflow:
    @workflow.run
    async def run(self) -> None:
        await workflow.execute_activity(
            generate_report,
            task_queue="report_queue",
            start_to_close_timeout=timedelta(minutes=30),
        )

app.add_worker(
    "report_worker",
    "report_queue",
    activities=[generate_report],
    workflows=[DailyReportWorkflow],
    cron_schedule="0 0 * * *",  # Daily at midnight
    cron_runner=DailyReportWorkflow.run,
)

if __name__ == "__main__":
    app.run()

Example 6: Parallel Activities

import asyncio
from datetime import timedelta
from temporalio import activity, workflow
from temporal_boost import BoostApp

app = BoostApp(name="parallel-example")

@activity.defn(name="fetch_user_data")
async def fetch_user_data(user_id: str) -> dict:
    return {"user_id": user_id, "data": "user_data"}

@activity.defn(name="fetch_order_data")
async def fetch_order_data(order_id: str) -> dict:
    return {"order_id": order_id, "data": "order_data"}

@activity.defn(name="fetch_payment_data")
async def fetch_payment_data(payment_id: str) -> dict:
    return {"payment_id": payment_id, "data": "payment_data"}

@workflow.defn(sandboxed=False, name="DataAggregationWorkflow")
class DataAggregationWorkflow:
    @workflow.run
    async def run(self, user_id: str, order_id: str, payment_id: str) -> dict:
        # Execute activities in parallel
        user_data, order_data, payment_data = await asyncio.gather(
            workflow.execute_activity(
                fetch_user_data,
                user_id,
                task_queue="data_queue",
                start_to_close_timeout=timedelta(minutes=5),
            ),
            workflow.execute_activity(
                fetch_order_data,
                order_id,
                task_queue="data_queue",
                start_to_close_timeout=timedelta(minutes=5),
            ),
            workflow.execute_activity(
                fetch_payment_data,
                payment_id,
                task_queue="data_queue",
                start_to_close_timeout=timedelta(minutes=5),
            ),
        )

        return {
            "user": user_data,
            "order": order_data,
            "payment": payment_data,
        }

app.add_worker(
    "data_worker",
    "data_queue",
    activities=[fetch_user_data, fetch_order_data, fetch_payment_data],
    workflows=[DataAggregationWorkflow],
)

if __name__ == "__main__":
    app.run()

Example 7: Activity Retry Policy

from datetime import timedelta
from temporalio import activity
from temporalio.common import RetryPolicy
from temporal_boost import BoostApp

app = BoostApp(name="retry-example")

@activity.defn(
    name="unreliable_api_call",
    start_to_close_timeout=timedelta(seconds=30),
    retry_policy=RetryPolicy(
        initial_interval=timedelta(seconds=1),
        backoff_coefficient=2.0,
        maximum_interval=timedelta(seconds=60),
        maximum_attempts=5,
    ),
)
async def unreliable_api_call(url: str) -> dict:
    import httpx
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.json()

app.add_worker("api_worker", "api_queue", activities=[unreliable_api_call])

if __name__ == "__main__":
    app.run()

Integration Examples

Example 8: FastAPI Integration

from fastapi import FastAPI
from temporal_boost import BoostApp, ASGIWorkerType
from temporalio.client import Client

app = BoostApp(name="fastapi-example")

# Create FastAPI app
fastapi_app = FastAPI(title="Temporal API")

@fastapi_app.get("/health")
async def health():
    return {"status": "healthy"}

@fastapi_app.post("/workflows")
async def start_workflow(workflow_data: dict):
    client = await Client.connect("localhost:7233")
    workflow_id = await client.start_workflow(
        "MyWorkflow",
        workflow_data,
        id=f"workflow-{workflow_data['id']}",
        task_queue="workflow_queue",
    )
    return {"workflow_id": workflow_id}

# Add ASGI worker
app.add_asgi_worker(
    "api_worker",
    fastapi_app,
    "0.0.0.0",
    8000,
    asgi_worker_type=ASGIWorkerType.auto,
)

if __name__ == "__main__":
    app.run()

Example 9: FastStream Integration

Basic FastStream integration:

from faststream import FastStream
from faststream.redis import RedisBroker
from pydantic import BaseModel
from temporal_boost import BoostApp

app = BoostApp(name="faststream-example")

class TaskMessage(BaseModel):
    task_id: str
    description: str
    priority: int

broker = RedisBroker("redis://localhost:6379")
faststream_app = FastStream(broker)

@broker.subscriber("tasks")
async def process_task(message: TaskMessage):
    """Process task from message queue."""
    logger.info(f"Processing task: {message.task_id}")

app.add_faststream_worker("message_processor", faststream_app)

if __name__ == "__main__":
    app.run()

Example 10: FastStream with Temporal Workflows

Integrate FastStream consumers with Temporal workflows:

from faststream import FastStream
from faststream.redis import RedisBroker
from temporalio.client import Client
from temporalio import workflow
from temporal_boost import BoostApp
from datetime import timedelta
from pydantic import BaseModel

app = BoostApp(name="faststream-temporal")

class OrderMessage(BaseModel):
    order_id: str
    customer_id: str
    items: list[dict]
    total: float

@workflow.defn(sandboxed=False, name="OrderWorkflow")
class OrderWorkflow:
    @workflow.run
    async def run(self, order_data: dict) -> dict:
        # Process order...
        return {"status": "completed", "order_id": order_data["order_id"]}

broker = RedisBroker("redis://localhost:6379")
faststream_app = FastStream(broker)

@broker.subscriber("orders")
async def handle_order(message: OrderMessage):
    """Handle order and start Temporal workflow."""
    client = await Client.connect("localhost:7233")

    workflow_id = await client.start_workflow(
        "OrderWorkflow",
        message.dict(),
        id=f"order-{message.order_id}",
        task_queue="order_queue",
    )

    logger.info(f"Started workflow {workflow_id}")

app.add_worker("order_worker", "order_queue", workflows=[OrderWorkflow])
app.add_faststream_worker("message_processor", faststream_app)

if __name__ == "__main__":
    app.run()

Example 11: Multiple FastStream Subscribers

Handle multiple message queues:

from faststream import FastStream
from faststream.redis import RedisBroker
from pydantic import BaseModel
from temporal_boost import BoostApp

app = BoostApp(name="faststream-multi")

class EmailMessage(BaseModel):
    to: str
    subject: str
    body: str

class NotificationMessage(BaseModel):
    notification_id: str
    user_id: str
    content: dict

broker = RedisBroker("redis://localhost:6379")
faststream_app = FastStream(broker)

@broker.subscriber("emails")
async def handle_email(message: EmailMessage):
    """Handle email messages."""
    logger.info(f"Processing email to {message.to}")

@broker.subscriber("notifications")
async def handle_notification(message: NotificationMessage):
    """Handle notification messages."""
    logger.info(f"Processing notification {message.notification_id}")

app.add_faststream_worker("message_processor", faststream_app)

if __name__ == "__main__":
    app.run()

Real-World Scenarios

Example 12: E-commerce Order Processing

from datetime import timedelta
from pydantic import BaseModel
from temporalio import activity, workflow
from temporal_boost import BoostApp

app = BoostApp(name="ecommerce-example", use_pydantic=True)

class Order(BaseModel):
    order_id: str
    customer_id: str
    items: list[dict]
    total: float

class PaymentResult(BaseModel):
    transaction_id: str
    status: str

@activity.defn(name="validate_inventory")
async def validate_inventory(order: Order) -> dict:
    # Check inventory
    return {"valid": True, "items_available": True}

@activity.defn(name="process_payment")
async def process_payment(order: Order) -> PaymentResult:
    # Process payment
    return PaymentResult(
        transaction_id="tx_123",
        status="completed",
    )

@activity.defn(name="fulfill_order")
async def fulfill_order(order: Order) -> dict:
    # Fulfill order
    return {"fulfilled": True, "shipping_id": "ship_123"}

@activity.defn(name="send_notification")
async def send_notification(order_id: str, status: str) -> dict:
    # Send email notification
    return {"sent": True}

@workflow.defn(sandboxed=False, name="OrderProcessingWorkflow")
class OrderProcessingWorkflow:
    @workflow.run
    async def run(self, order: Order) -> dict:
        # Validate inventory
        validation = await workflow.execute_activity(
            validate_inventory,
            order,
            task_queue="inventory_queue",
            start_to_close_timeout=timedelta(minutes=5),
        )

        if not validation["valid"]:
            await workflow.execute_activity(
                send_notification,
                order.order_id,
                "failed",
                task_queue="notification_queue",
                start_to_close_timeout=timedelta(minutes=2),
            )
            return {"status": "failed", "reason": "inventory"}

        # Process payment
        payment = await workflow.execute_activity(
            process_payment,
            order,
            task_queue="payment_queue",
            start_to_close_timeout=timedelta(minutes=10),
        )

        if payment.status != "completed":
            return {"status": "failed", "reason": "payment"}

        # Fulfill order
        fulfillment = await workflow.execute_activity(
            fulfill_order,
            order,
            task_queue="fulfillment_queue",
            start_to_close_timeout=timedelta(minutes=30),
        )

        # Send confirmation
        await workflow.execute_activity(
            send_notification,
            order.order_id,
            "completed",
            task_queue="notification_queue",
            start_to_close_timeout=timedelta(minutes=2),
        )

        return {
            "status": "completed",
            "order_id": order.order_id,
            "payment": payment.dict(),
            "fulfillment": fulfillment,
        }

app.add_worker("inventory_worker", "inventory_queue", activities=[validate_inventory])
app.add_worker("payment_worker", "payment_queue", activities=[process_payment])
app.add_worker("fulfillment_worker", "fulfillment_queue", activities=[fulfill_order])
app.add_worker("notification_worker", "notification_queue", activities=[send_notification])
app.add_worker("order_workflow_worker", "workflow_queue", workflows=[OrderProcessingWorkflow])

if __name__ == "__main__":
    app.run()

Example 11: Data Processing Pipeline

from datetime import timedelta
from temporalio import activity, workflow
from temporal_boost import BoostApp

app = BoostApp(name="data-processing-example")

@activity.defn(name="extract_data")
async def extract_data(source: str) -> dict:
    # Extract data from source
    return {"data": [1, 2, 3], "source": source}

@activity.defn(name="transform_data")
async def transform_data(data: dict) -> dict:
    # Transform data
    return {"data": [x * 2 for x in data["data"]], "transformed": True}

@activity.defn(name="load_data")
async def load_data(data: dict, destination: str) -> dict:
    # Load data to destination
    return {"loaded": True, "destination": destination, "records": len(data["data"])}

@workflow.defn(sandboxed=False, name="ETLWorkflow")
class ETLWorkflow:
    @workflow.run
    async def run(self, source: str, destination: str) -> dict:
        # Extract
        extracted = await workflow.execute_activity(
            extract_data,
            source,
            task_queue="etl_queue",
            start_to_close_timeout=timedelta(minutes=10),
        )

        # Transform
        transformed = await workflow.execute_activity(
            transform_data,
            extracted,
            task_queue="etl_queue",
            start_to_close_timeout=timedelta(minutes=15),
        )

        # Load
        loaded = await workflow.execute_activity(
            load_data,
            transformed,
            destination,
            task_queue="etl_queue",
            start_to_close_timeout=timedelta(minutes=20),
        )

        return {
            "status": "completed",
            "source": source,
            "destination": destination,
            "records_processed": loaded["records"],
        }

app.add_worker(
    "etl_worker",
    "etl_queue",
    activities=[extract_data, transform_data, load_data],
    workflows=[ETLWorkflow],
)

if __name__ == "__main__":
    app.run()

These examples demonstrate common patterns and use cases. For more advanced patterns, see Advanced Usage.