Skip to content

Advanced Usage

This guide covers advanced patterns, customization options, and techniques for power users of Temporal-boost.

Table of Contents

Custom Runtime Configuration

Configure Temporal runtime with custom telemetry and metrics:

from temporal_boost import BoostApp
from temporalio.runtime import LoggingConfig, PrometheusConfig, Runtime

# Create the application and register one worker on its own task queue.
app = BoostApp("advanced-app")
worker = app.add_worker("custom_worker", "custom_queue", activities=[...])

# Gather the runtime options in one mapping so they can be reviewed (or loaded
# from settings) in a single place, then apply them to the worker in one call.
runtime_options = {
    "prometheus_bind_address": "0.0.0.0:9090",
    "prometheus_counters_total_suffix": True,
    "prometheus_unit_suffix": True,
    "prometheus_durations_as_seconds": True,
    "global_tags": {"environment": "production", "service": "my-service"},
    "attach_service_name": True,
    "metric_prefix": "temporal_boost",
}
worker.configure_temporal_runtime(**runtime_options)

Custom Logging Configuration

import logging
import logging.config
from temporal_boost import BoostApp

# Formatters: a human-readable one for the console and a JSON one for files.
_FORMATTERS = {
    "detailed": {
        "format": "%(asctime)s [%(levelname)s] %(name)s: %(message)s",
        "datefmt": "%Y-%m-%d %H:%M:%S",
    },
    "json": {
        "format": "%(asctime)s %(name)s %(levelname)s %(message)s",
        "class": "pythonjsonlogger.jsonlogger.JsonFormatter",
    },
}

# Handlers: INFO and above to the console, DEBUG and above to a rotating file.
_HANDLERS = {
    "console": {
        "class": "logging.StreamHandler",
        "formatter": "detailed",
        "level": "INFO",
    },
    "file": {
        "class": "logging.handlers.RotatingFileHandler",
        "filename": "temporal.log",
        "maxBytes": 10 * 1024 * 1024,  # rotate after 10 MiB
        "backupCount": 5,
        "formatter": "json",
        "level": "DEBUG",
    },
}

# Complete logging configuration: the root logger feeds both handlers, and
# loggers that already exist are left enabled.
LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": _FORMATTERS,
    "handlers": _HANDLERS,
    "root": {
        "level": "INFO",
        "handlers": ["console", "file"],
    },
}

# Pass the logging mapping to the application through its logger_config argument.
app = BoostApp(logger_config=LOGGING_CONFIG)

Worker Customization

Per-Worker Configuration

Customize individual workers with specific settings:

# Concurrency limits for a worker that is expected to carry heavy load.
concurrency_limits = {
    "max_concurrent_activities": 1000,
    "max_concurrent_workflow_tasks": 500,
}
worker = app.add_worker(
    "high_throughput_worker",
    "high_throughput_queue",
    activities=[...],
    **concurrency_limits,
)

# Client settings for this worker only.
client_settings = {
    "target_host": "temporal.example.com:7233",
    "namespace": "production",
    "use_pydantic_data_converter": True,
}
worker.configure_temporal_client(**client_settings)

# Runtime settings for this worker only.
worker.configure_temporal_runtime(prometheus_bind_address="0.0.0.0:9091")

Worker Builder Pattern

Use builders directly for maximum control:

from temporal_boost.temporal.client import TemporalClientBuilder
from temporal_boost.temporal.runtime import TemporalRuntimeBuilder
from temporal_boost.temporal.worker import TemporalWorkerBuilder
from temporal_boost.workers.temporal import TemporalBoostWorker

# Build custom client: describes which Temporal server and namespace to use.
client_builder = TemporalClientBuilder(
    target_host="custom-host:7233",
    namespace="custom-namespace",
    use_pydantic_data_converter=True,
)

# Build custom runtime: metrics endpoint and tags.
runtime_builder = TemporalRuntimeBuilder(
    prometheus_bind_address="0.0.0.0:9090",
    global_tags={"custom": "tag"},
)

# Build custom worker: task queue and concurrency limits.
worker_builder = TemporalWorkerBuilder(
    task_queue="custom_queue",
    max_concurrent_activities=200,
    max_concurrent_workflow_tasks=100,
)

# Create worker
# NOTE: `await` is only valid inside an `async def`; run these lines from a coroutine.
client = await client_builder.build()
# NOTE(review): `runtime` is built here but never handed to the client or the
# worker builder in this snippet — presumably it must be attached before the
# client is built; confirm against the builder API.
runtime = runtime_builder.build()
# The worker builder needs the connected client before build() is called.
worker_builder.set_client(client)
worker = worker_builder.build()

Interceptors

Interceptors allow you to add cross-cutting concerns like logging, metrics, or authentication.

Creating an Interceptor

import logging
from typing import Any

from temporalio.worker import (
    ActivityInboundInterceptor,
    ExecuteActivityInput,
    ExecuteWorkflowInput,
    Interceptor,
    WorkflowInboundInterceptor,
    WorkflowInterceptorClassInput,
)

logger = logging.getLogger(__name__)


class LoggingActivityInterceptor(Interceptor):
    """Worker interceptor that adds logging around activity executions."""

    def intercept_activity(
        self, next: ActivityInboundInterceptor
    ) -> ActivityInboundInterceptor:
        """Wrap the next inbound interceptor in the chain with the logging one."""
        return LoggingActivityInboundInterceptor(next)


class LoggingActivityInboundInterceptor(ActivityInboundInterceptor):
    """Log the start, completion, and failure of each activity execution.

    The base class stores the next interceptor and forwards every other hook
    to it, so only ``execute_activity`` is overridden here.
    """

    async def execute_activity(self, input: ExecuteActivityInput) -> Any:
        """Run the activity through the rest of the chain, logging the outcome.

        Returns whatever the activity returns; any exception is logged with
        its traceback and re-raised unchanged.
        """
        logger.info(f"Executing activity: {input.fn}")
        try:
            result = await super().execute_activity(input)
        except Exception as e:
            # logger.exception keeps the traceback alongside the message.
            logger.exception(f"Activity failed: {input.fn}, error: {e}")
            raise
        logger.info(f"Activity completed: {input.fn}")
        return result


class LoggingWorkflowInterceptor(Interceptor):
    """Worker interceptor that adds logging around workflow executions."""

    def workflow_interceptor_class(
        self, input: WorkflowInterceptorClassInput
    ) -> type[WorkflowInboundInterceptor] | None:
        """Return the inbound interceptor *class*; the worker instantiates it per workflow run."""
        return LoggingWorkflowInboundInterceptor


class LoggingWorkflowInboundInterceptor(WorkflowInboundInterceptor):
    """Log each workflow execution before delegating to the next interceptor."""

    async def execute_workflow(self, input: ExecuteWorkflowInput) -> Any:
        """Log the workflow type, then run the workflow through the rest of the chain."""
        logger.info(f"Executing workflow: {input.type}")
        return await super().execute_workflow(input)

Using Interceptors

app = BoostApp("interceptor-example")

# Each entry is an instance of a temporalio.worker.Interceptor subclass.
# Interceptor takes no constructor arguments, so the activity and workflow
# interceptors are listed side by side instead of being wrapped in one object.
interceptors = [
    LoggingActivityInterceptor(),
    LoggingWorkflowInterceptor(),
]

worker = app.add_worker(
    "logged_worker",
    "logged_queue",
    activities=[my_activity],
    workflows=[my_workflow],
    interceptors=interceptors,
)

Custom Logging

Structured Logging with Context

import logging
import logging.config
from contextvars import ContextVar

request_id_var: ContextVar[str | None] = ContextVar("request_id", default=None)

class ContextualFormatter(logging.Formatter):
    def format(self, record):
        record.request_id = request_id_var.get()
        return super().format(record)

# Logging configuration that routes every record through ContextualFormatter
# so that "%(request_id)s" is always resolvable.
logging_config = {
    "version": 1,
    # dictConfig disables all previously created loggers unless told otherwise;
    # keep module-level loggers (logging.getLogger(__name__)) working.
    "disable_existing_loggers": False,
    "formatters": {
        "contextual": {
            # "()" tells dictConfig to build the formatter by calling this factory.
            "()": ContextualFormatter,
            "format": "%(asctime)s [%(levelname)s] [%(request_id)s] %(message)s",
        },
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "contextual",
        },
    },
    "root": {
        "level": "INFO",
        "handlers": ["console"],
    },
}

app = BoostApp(logger_config=logging_config)

Activity Logging Decorator

from functools import wraps
import logging

logger = logging.getLogger(__name__)


def log_activity(func):
    """Decorate an async callable so its start, completion, and failure are logged.

    Args:
        func: The coroutine function to wrap.

    Returns:
        An async wrapper that keeps ``func``'s metadata (via ``functools.wraps``),
        returns ``func``'s result, and re-raises any exception after logging it
        together with its traceback.
    """

    @wraps(func)
    async def wrapper(*args, **kwargs):
        logger.info("Starting activity: %s", func.__name__)
        try:
            result = await func(*args, **kwargs)
        except Exception as e:
            # logger.exception records the traceback, which logger.error would drop.
            logger.exception("Failed activity: %s, error: %s", func.__name__, e)
            raise
        logger.info("Completed activity: %s", func.__name__)
        return result

    return wrapper

from temporalio import activity  # provides the @activity.defn decorator used below


# Decorator order matters: log_activity wraps the coroutine first, then
# activity.defn marks the wrapped callable as the activity "logged_activity".
@activity.defn(name="logged_activity")
@log_activity
async def my_activity(data: str) -> str:
    """Return *data* prefixed with ``"Processed: "``."""
    return f"Processed: {data}"

Multiple Clients

Multiple Temporal Clusters

app = BoostApp("multi-cluster")

# Connection settings for each cluster, kept apart from the worker wiring.
production_cluster = {
    "target_host": "prod.temporal.example.com:7233",
    "namespace": "production",
}
staging_cluster = {
    "target_host": "staging.temporal.example.com:7233",
    "namespace": "staging",
}

# Worker 1 talks to the production cluster.
worker1 = app.add_worker("prod_worker", "prod_queue", activities=[...])
worker1.configure_temporal_client(**production_cluster)

# Worker 2 talks to the staging cluster.
worker2 = app.add_worker("staging_worker", "staging_queue", activities=[...])
worker2.configure_temporal_client(**staging_cluster)

Worker Lifecycle

Custom Worker Shutdown

import signal
import sys

app = BoostApp("lifecycle-example")

def signal_handler(sig, frame):
    """Log the shutdown request and exit the process with status 0.

    Args:
        sig: Signal number delivered to the process (unused).
        frame: Stack frame that was interrupted (unused).
    """
    logger.info("Received shutdown signal")
    # Custom cleanup logic here
    # NOTE(review): sys.exit raises SystemExit from inside the handler; confirm
    # this does not cut short the framework's own graceful shutdown.
    sys.exit(0)

# Install the handler for Ctrl+C (SIGINT) and for termination requests (SIGTERM).
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

worker = app.add_worker("lifecycle_worker", "lifecycle_queue", activities=[...])

# Custom shutdown hook
# NOTE(review): nothing in this snippet registers or awaits custom_shutdown;
# it has to be wired in explicitly to run.
async def custom_shutdown():
    """Log that custom cleanup is running; placeholder for real cleanup logic."""
    logger.info("Performing custom cleanup")
    # Your cleanup logic

# Note: Temporal-boost handles graceful shutdown automatically

Error Handling Patterns

Activity Retry with Custom Logic

from datetime import timedelta

from temporalio import activity
from temporalio.common import RetryPolicy
from temporalio.exceptions import ApplicationError

# Timeouts and retry policies are chosen by the caller, not by @activity.defn
# (which only names the activity). Pass these options when scheduling it:
#
#     await workflow.execute_activity(
#         retryable_activity, data, **RETRYABLE_ACTIVITY_OPTIONS
#     )
RETRYABLE_ACTIVITY_OPTIONS = {
    "start_to_close_timeout": timedelta(minutes=5),
    "retry_policy": RetryPolicy(
        initial_interval=timedelta(seconds=1),
        backoff_coefficient=2.0,
        maximum_interval=timedelta(seconds=60),
        maximum_attempts=3,
    ),
}


@activity.defn(name="retryable_activity")
async def retryable_activity(data: str) -> str:
    """Process *data*, separating retryable failures from permanent ones.

    Raises:
        TransientError: Propagated unchanged, so the retry policy applies.
        ApplicationError: Raised with ``non_retryable=True`` in place of a
            ``PermanentError``, which stops further attempts.
    """
    try:
        # Your logic here
        return process_data(data)
    except PermanentError as err:
        # Re-raising the original exception would still be retried; only an
        # explicitly non-retryable ApplicationError ends the retry loop.
        raise ApplicationError(
            str(err), type="PermanentError", non_retryable=True
        ) from err

Workflow Error Handling

from datetime import timedelta

from temporalio import workflow
from temporalio.exceptions import ActivityError, ApplicationError


@workflow.defn(sandboxed=False, name="ErrorHandlingWorkflow")
class ErrorHandlingWorkflow:
    """Run ``risky_activity`` and report success or failure as a status dict."""

    @workflow.run
    async def run(self, data: str) -> dict:
        """Execute the activity and convert a failure into a result payload.

        Returns:
            ``{"status": "success", "result": ...}`` when the activity succeeds,
            otherwise ``{"status": "failed", "error": ...}``.
        """
        try:
            result = await workflow.execute_activity(
                risky_activity,
                data,
                task_queue="error_queue",
                start_to_close_timeout=timedelta(minutes=5),
            )
        except ActivityError as e:
            # A failed activity always surfaces as ActivityError; the error the
            # activity itself raised is available as the cause. A separate
            # `except ApplicationError` clause would never match here.
            cause = e.cause
            if isinstance(cause, ApplicationError):
                # Application-specific error
                workflow.logger.error(f"Application error: {cause}")
            else:
                # Activity failed (timeout, cancellation, ...)
                workflow.logger.error(f"Activity failed: {e}")
            return {"status": "failed", "error": str(e)}
        return {"status": "success", "result": result}

Performance Optimization

Tuning Concurrency

# High-throughput worker: large execution slots and many concurrent pollers.
high_throughput_limits = {
    "max_concurrent_activities": 1000,
    "max_concurrent_workflow_tasks": 500,
    "max_concurrent_activity_task_polls": 50,
    "max_concurrent_workflow_task_polls": 50,
}
high_throughput_worker = app.add_worker(
    "high_throughput",
    "high_throughput_queue",
    activities=[...],
    **high_throughput_limits,
)

# Low-latency worker: smaller limits and a poll ratio that prefers sticky workflows.
low_latency_limits = {
    "max_concurrent_activities": 100,
    "max_concurrent_workflow_tasks": 50,
    "nonsticky_to_sticky_poll_ratio": 0.1,
}
low_latency_worker = app.add_worker(
    "low_latency",
    "low_latency_queue",
    activities=[...],
    **low_latency_limits,
)

Sticky Workflows

Sticky execution keeps a workflow's state cached in worker memory between tasks, which avoids replaying history and improves performance:

# High sticky ratio for better performance.
# The sticky/non-sticky poll balance is a worker option, so it is passed to
# add_worker(); no runtime configuration is involved.
worker = app.add_worker(
    "sticky_worker",
    "sticky_queue",
    workflows=[MyWorkflow],
    nonsticky_to_sticky_poll_ratio=0.1,  # 10% non-sticky, 90% sticky
)

Connection Pooling

The Temporal SDK handles connection pooling automatically, but you can reduce resource usage further by sharing a single runtime across workers:

# Multiple workers share the same runtime (default)
# For better resource usage, configure shared runtime

# Describe the runtime once: a single metrics endpoint.
runtime_builder = TemporalRuntimeBuilder(
    prometheus_bind_address="0.0.0.0:9090",
)

# NOTE(review): `runtime` is built here but not passed to any worker or client
# in this snippet; confirm how (or whether) an explicit runtime must be attached.
runtime = runtime_builder.build()

# Use same runtime for multiple workers
worker1 = app.add_worker("worker1", "queue1", activities=[...])
# `...` is a placeholder for your client settings (for example target_host, namespace).
worker1.configure_temporal_client(...)
# Runtime is shared automatically

Activity Result Caching

For expensive activities that can be cached:

from functools import lru_cache
from temporalio import activity

# Cache at activity level (use with caution): results live in the memory of the
# worker process, are not shared between workers, and are lost on restart.
# lru_cache must wrap a *synchronous* helper; applied to an ``async def`` it
# would cache the one-shot coroutine object rather than its result.
@lru_cache(maxsize=1024)
def _cached_expensive_operation(key: str) -> str:
    """Compute the value for *key* once and reuse it for repeated keys."""
    return expensive_operation(key)


@activity.defn(name="cached_activity")
async def cached_activity(key: str) -> str:
    """Return the (possibly cached) result of the expensive operation for *key*."""
    return _cached_expensive_operation(key)

# Or implement caching in workflow
@workflow.defn(sandboxed=False, name="CachedWorkflow")
class CachedWorkflow:
    """Workflow that remembers activity results per key on the workflow instance."""

    def __init__(self):
        # Starts empty for every new workflow instance.
        self._cache: dict[str, str] = {}

    @workflow.run
    async def run(self, key: str) -> str:
        """Return the value for *key*, running the activity only on a cache miss."""
        # NOTE(review): the cache is only filled inside this method, so the
        # lookup can hit only if run() executes more than once on the same
        # instance — confirm this matches the intended usage.
        if key not in self._cache:
            self._cache[key] = await workflow.execute_activity(
                cached_activity,
                key,
                task_queue="cache_queue",
                start_to_close_timeout=timedelta(minutes=5),
            )
        return self._cache[key]

Advanced Patterns

Workflow Versioning

@workflow.defn(sandboxed=False, name="VersionedWorkflow")
class VersionedWorkflow:
    @workflow.run
    async def run(self, data: dict) -> dict:
        version = data.get("version", 1)

        if version == 1:
            return await self._run_v1(data)
        elif version == 2:
            return await self._run_v2(data)
        else:
            raise ValueError(f"Unsupported version: {version}")

    async def _run_v1(self, data: dict) -> dict:
        # V1 logic
        pass

    async def _run_v2(self, data: dict) -> dict:
        # V2 logic
        pass

Custom Data Converter

from collections.abc import Sequence

from temporalio.api.common.v1 import Payload
from temporalio.converter import (
    DataConverter,
    DefaultPayloadConverter,
    PayloadCodec,
)
from temporal_boost.temporal.client import TemporalClientBuilder


class CustomPayloadCodec(PayloadCodec):
    """Codec hook applied to payloads after conversion (encode) and before it (decode).

    Both methods are coroutines because the SDK awaits them.
    """

    async def encode(self, payloads: Sequence[Payload]) -> list[Payload]:
        """Return the payloads to send; currently a pass-through copy."""
        # Custom encoding logic
        return list(payloads)

    async def decode(self, payloads: Sequence[Payload]) -> list[Payload]:
        """Return the payloads to hand to the converter; currently a pass-through copy."""
        # Custom decoding logic
        return list(payloads)


# DataConverter takes the payload converter *class* (not an instance) plus an
# optional codec instance.
custom_converter = DataConverter(
    payload_converter_class=DefaultPayloadConverter,
    payload_codec=CustomPayloadCodec(),
)

# Forward the converter to the client through the builder's extra keyword arguments.
client_builder = TemporalClientBuilder()
client_builder.set_kwargs(data_converter=custom_converter)

These advanced patterns provide powerful customization options for complex use cases. For more examples, see Examples.