Creating application
Base code example
This is the base code snippet to start working with the framework. Create a BoostApp
object, set configuration via environment variables, and run it.
import logging
from datetime import timedelta
from temporalio import activity, workflow
from temporal_boost import BoostApp
logging.basicConfig(level=logging.INFO)
app = BoostApp(
name="BoostApp example",
temporal_endpoint="localhost:7233",
temporal_namespace="default",
use_pydantic=True,
)
@activity.defn(name="my_activity")
async def my_activity(name: str) -> str:
return f"Hello, {name}!"
@workflow.defn(sandboxed=False, name="MyWorkflow")
class MyWorkflow:
@workflow.run
async def run(self, name: str) -> str:
return await workflow.execute_activity(
my_activity,
name,
task_queue="my_queue_1",
start_to_close_timeout=timedelta(minutes=1),
)
app.add_worker(
"worker_1",
"my_queue_1",
activities=[my_activity],
)
app.add_worker(
"worker_2",
"my_queue_2",
workflows=[MyWorkflow],
)
if __name__ == "__main__":
app.run()
Configuration via environment variables
All configuration is now handled via environment variables. You can set the following variables (see temporal_boost/temporal/config.py
for the full list):
TEMPORAL_TARGET_HOST
(default:localhost:7233
)TEMPORAL_NAMESPACE
(default:default
)TEMPORAL_TLS
(default:false
)TEMPORAL_API_KEY
(optional)TEMPORAL_IDENTITY
(optional)TEMPORAL_USE_PYDANTIC_DATA_CONVERTER
(default:false
)- Worker concurrency, Prometheus metrics, and more (see config.py)
Example:
export TEMPORAL_TARGET_HOST=temporal.example.com:7233
export TEMPORAL_NAMESPACE=production
export TEMPORAL_USE_PYDANTIC_DATA_CONVERTER=true
Adding Temporal workers
To add a worker to the app, use the add_worker
method:
def add_worker(
self,
worker_name: str,
task_queue: str,
workflows: list = [],
activities: list = [],
cron_schedule: str | None = None,
cron_runner: typing.Coroutine | None = None,
) -> None:
worker_name
: Unique name for the worker (do not use reserved names likeall
orinternal
).task_queue
: Task queue for activities and workflows.workflows
: List of workflow classes.activities
: List of activity functions.cron_schedule
: (Optional) CRON string for scheduled workflows.cron_runner
: (Optional) Workflow run method for CRON workers.
Examples
app.add_worker(
"worker_1",
"my_queue_1",
activities=[my_activity],
)
app.add_worker(
"worker_2",
"my_queue_2",
workflows=[MyWorkflow],
)
app.add_worker(
"worker_3",
"my_queue_3",
workflows=[MyWorkflow2],
activities=[my_activity2],
)
Adding CRON workers
To execute a workflow on a schedule, create a CRON worker:
app.add_worker(
"worker_4",
"task_q_4",
workflows=[MyWorkflow],
cron_runner=MyWorkflow.run,
cron_schedule="* * * * *"
)
cron_runner
is a coroutine (usually the workflow'srun
method) that will be started according to thecron_schedule
.
Adding ASGI workers
To add a FastAPI (or any ASGI) application as a worker:
from fastapi import FastAPI
fastapi_app = FastAPI(docs_url="/doc")
app.add_asgi_worker("asgi_worker", fastapi_app, "0.0.0.0", 8000)
You can specify the ASGI worker type ("uvicorn", "hypercorn", "granian") or use auto-detection:
The application will be run with the selected ASGI server in the appropriate async runtime.