dioxide.celery¶
Celery integration for dioxide dependency injection.
This module provides seamless integration between dioxide’s dependency injection container and Celery background tasks. It enables:
Single function setup:
configure_dioxide(app, profile=...)Task scoping: Automatic
ScopedContainerper task execution viascoped_taskLifecycle management: Container start/stop tied to Celery app configuration
- Quick Start:
Set up dioxide in your Celery app:
from celery import Celery from dioxide import Profile from dioxide.celery import configure_dioxide, scoped_task app = Celery('tasks') configure_dioxide(app, profile=Profile.PRODUCTION) @scoped_task(app) def process_order(scope, order_id: str) -> dict: service = scope.resolve(OrderService) return service.process(order_id) # Execute task result = process_order.delay('order-123')
- Task Scoping:
The integration creates a
ScopedContainerfor each task execution. This enables REQUEST-scoped components to be fresh for each task:from dioxide import service, Scope @service(scope=Scope.REQUEST) class TaskContext: def __init__(self): import uuid self.task_id = str(uuid.uuid4()) # In task handlers: @scoped_task(app) def my_task(scope) -> str: ctx = scope.resolve(TaskContext) # ctx.task_id is unique per task execution return ctx.task_id
- Lifecycle Management:
The integration handles container lifecycle automatically:
from dioxide import adapter, lifecycle, Profile @adapter.for_(DatabasePort, profile=Profile.PRODUCTION) @lifecycle class PostgresAdapter: async def initialize(self) -> None: self.engine = create_engine(...) print('Database connected') async def dispose(self) -> None: await self.engine.dispose() print('Database disconnected') # When configure_dioxide is called: container.scan() and start() # When task ends: scope.dispose() for REQUEST-scoped components
- Thread Safety:
Celery uses processes/threads for task execution. The integration creates a fresh scope per task execution, ensuring each task gets its own isolated REQUEST-scoped dependencies. SINGLETON-scoped components are shared across tasks within the same worker process.
See also
configure_dioxide()- The main setup functionscoped_task()- Decorator for task scopingdioxide.container.Container- The DI containerdioxide.container.ScopedContainer- Task-scoped container
Functions¶
|
Configure dioxide dependency injection for a Celery application. |
|
Decorator to create a Celery task with dioxide scoping. |
Module Contents¶
- dioxide.celery.configure_dioxide(app, profile=None, container=None, packages=None)[source]¶
Configure dioxide dependency injection for a Celery application.
This function sets up the integration between dioxide and Celery:
Scans for components in specified packages (or all registered)
Starts the container (initializing @lifecycle components)
Stores the container reference on the Celery app
- Parameters:
app (celery.Celery) – The Celery application instance
profile (dioxide.profile_enum.Profile | str | None) – Profile to scan with (e.g.,
Profile.PRODUCTION). Accepts either a Profile enum value or a string profile name.container (dioxide.container.Container | None) – Optional Container instance. If not provided, uses the global
dioxide.containersingleton.packages (list[str] | None) – Optional list of packages to scan for components. If not provided, scans all registered components.
- Raises:
ImportError – If Celery is not installed.
- Return type:
None
Example
Basic setup:
from celery import Celery from dioxide import Profile from dioxide.celery import configure_dioxide app = Celery('tasks') configure_dioxide(app, profile=Profile.PRODUCTION)
With custom container:
from dioxide import Container, Profile from dioxide.celery import configure_dioxide my_container = Container() app = Celery('tasks') configure_dioxide(app, profile=Profile.TEST, container=my_container)
With package scanning:
configure_dioxide( app, profile=Profile.PRODUCTION, packages=['myapp.services', 'myapp.adapters'], )
See also
scoped_task()- How to create scoped tasksdioxide.container.ScopedContainer- How scoping works
- dioxide.celery.scoped_task(app, **task_options)[source]¶
Decorator to create a Celery task with dioxide scoping.
Creates a Celery task that automatically receives a ScopedContainer as its first argument. The scope is created before task execution and disposed after completion (including on errors).
- Parameters:
app (celery.Celery) – The Celery application instance (must be configured with configure_dioxide)
**task_options (Any) – Additional options to pass to Celery’s @app.task decorator (e.g., name, bind, queue, etc.)
- Returns:
A decorator function that wraps the task with scoping.
- Return type:
collections.abc.Callable[[collections.abc.Callable[Ellipsis, T]], Any]
Example
Basic usage:
from dioxide.celery import configure_dioxide, scoped_task app = Celery('tasks') configure_dioxide(app, profile=Profile.PRODUCTION) @scoped_task(app) def process_order(scope, order_id: str) -> dict: service = scope.resolve(OrderService) return service.process(order_id) # Execute task result = process_order.delay('order-123')
With Celery task options:
@scoped_task(app, name='custom.task.name', queue='high-priority') def important_task(scope) -> None: pass
Async task:
@scoped_task(app) async def async_task(scope) -> str: ctx = scope.resolve(TaskContext) await asyncio.sleep(0.1) return ctx.task_id
Note
The scope is always injected as the FIRST argument
REQUEST-scoped components are fresh per task execution
SINGLETON-scoped components are shared across tasks in the same worker
@lifecycle components with REQUEST scope are disposed after task completion
See also
configure_dioxide()- Must be called firstdioxide.container.ScopedContainer- How scoping works