Scoping Guide¶
This guide explains dioxide’s scoping system, a powerful primitive for isolating dependencies within bounded contexts. While commonly used for web request handling, scoping is a universal concept applicable to any bounded execution context.
What is Scoping?¶
Scoping creates an isolated dependency context where:
Scoped dependencies get fresh instances per scope
Singleton dependencies remain shared across all scopes
Lifecycle boundaries are respected (resources created in scope are disposed with scope)
Think of a scope as a “mini-container” that inherits from the parent container but has its own instances for scoped components.
Key insight: Singletons are shared; scoped instances are isolated per scope.
Why You Need Scoping¶
The Solution: Scoped Isolation¶
With scoping, each request gets its own context:
import uuid
from datetime import UTC, datetime
from typing import Protocol

from dioxide import adapter, Profile, Scope

class RequestContextPort(Protocol):
    user_id: str | None
    request_id: str
    start_time: datetime

@adapter.for_(RequestContextPort, profile=Profile.PRODUCTION, scope=Scope.REQUEST)
class RequestContext:
    def __init__(self):
        self.user_id = None
        self.request_id = str(uuid.uuid4())
        self.start_time = datetime.now(UTC)

# Now each scope gets a fresh RequestContext!
The Scoping Primitive¶
Dioxide provides a simple, universal scoping primitive:
from dioxide import Container, Profile

# Create and configure container
container = Container()
container.scan(profile=Profile.PRODUCTION)

# Create a scope for bounded work
async with container.create_scope() as scope:
    # Resolve dependencies within this scope
    context = scope.resolve(RequestContextPort)
    service = scope.resolve(UserService)
    # Use them...
    await service.process_request(context)
# Scope ends - scoped instances disposed
How create_scope() Works¶
Key behaviors:
create_scope() returns a ScopedContainer
Scoped components get fresh instances within the scope
Singletons resolve to the same instance as the parent container
When the scope exits, all scoped components are disposed (in reverse creation order)
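The reverse-order disposal described above can be illustrated with the standard library alone. The following is a minimal sketch, not dioxide's implementation: `ToyResource` and `run_scope` are hypothetical names, and `contextlib.AsyncExitStack` stands in for whatever bookkeeping the real `ScopedContainer` performs.

```python
import asyncio
from contextlib import AsyncExitStack

disposed: list[str] = []  # records the order in which dispose() runs


class ToyResource:
    """Hypothetical scoped component with an async dispose hook."""

    def __init__(self, name: str) -> None:
        self.name = name

    async def dispose(self) -> None:
        disposed.append(self.name)


async def run_scope() -> None:
    # AsyncExitStack runs its registered callbacks in LIFO order on exit,
    # and it does so even if the body of the block raises.
    async with AsyncExitStack() as stack:
        for name in ("config", "session", "audit"):
            resource = ToyResource(name)
            stack.push_async_callback(resource.dispose)


asyncio.run(run_scope())
print(disposed)  # ['audit', 'session', 'config']
```

Components created last are torn down first, so a component is never disposed while something created after it (and possibly depending on it) is still alive.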
When to Use REQUEST Scope¶
Use Scope.REQUEST For¶
Adapters that need per-request isolation:
# Database connections scoped to request
@adapter.for_(DatabaseSession, profile=Profile.PRODUCTION, scope=Scope.REQUEST)
@lifecycle
class PostgresSession:
    async def initialize(self):
        self.conn = await pool.acquire()

    async def dispose(self):
        # Hand the connection back to the pool (assumes an asyncpg-style pool)
        await pool.release(self.conn)

# Request-specific audit logging
@adapter.for_(AuditLogger, profile=Profile.PRODUCTION, scope=Scope.REQUEST)
class RequestAuditLogger:
    def __init__(self, context: RequestContextPort):
        self.request_id = context.request_id

    def log(self, event: str):
        logger.info(f"[{self.request_id}] {event}")

# User context per request
@adapter.for_(RequestContextPort, profile=Profile.PRODUCTION, scope=Scope.REQUEST)
class HttpRequestContext:
    def __init__(self):
        self.user_id = None
        self.request_id = str(uuid.uuid4())
Do NOT Use Scope.REQUEST For¶
Services (core business logic):
# WRONG: Services should be singleton!
@service(scope=Scope.REQUEST)  # Never do this!
class UserService:
    pass

# RIGHT: Services are always singleton
@service
class UserService:
    def __init__(self, db: DatabaseSession, audit: AuditLogger):
        self.db = db
        self.audit = audit
Why? Services contain business logic that doesn’t change per request. They should be singletons that receive request-scoped adapters via dependency injection.
Rule of thumb:
Adapters: May use Scope.REQUEST when they hold request-specific state
Services: Always Scope.SINGLETON (the default)
Non-Web Use Cases¶
Scoping is not just for web requests. The same primitive works for any bounded execution context.
Celery/Background Tasks¶
Each task gets isolated dependencies:
import asyncio

from celery import Celery
from dioxide import Container, Profile

app = Celery('tasks')
container = Container()
container.scan(profile=Profile.PRODUCTION)

async def _process_order(order_id: int, task_id: str):
    """Each task invocation gets its own scope."""
    async with container.create_scope() as scope:
        # Fresh instances for this task
        context = scope.resolve(TaskContextPort)
        context.task_id = task_id
        order_service = scope.resolve(OrderService)
        await order_service.process(order_id)
    # Task-scoped instances disposed

@app.task(bind=True)
def process_order(self, order_id: int):
    # Celery calls task functions synchronously, so drive the coroutine here
    asyncio.run(_process_order(order_id, self.request.id))
CLI Applications¶
Each command invocation gets its own scope:
import asyncio

import click
from dioxide import Container, Profile

container = Container()
container.scan(profile=Profile.PRODUCTION)

@click.group()
def cli():
    pass

async def _process_user(user_id: str):
    async with container.create_scope() as scope:
        context = scope.resolve(CommandContextPort)
        context.command = "process_user"
        context.args = {"user_id": user_id}
        service = scope.resolve(UserService)
        await service.process(user_id)

@cli.command()
@click.argument('user_id')
def process_user(user_id: str):
    """Each CLI command gets its own scope."""
    # click callbacks are synchronous, so run the coroutine explicitly
    asyncio.run(_process_user(user_id))
Batch Processing / Data Pipelines¶
Each batch gets isolated dependencies:
async def process_batch(items: list[Item]):
    """Each batch gets its own scope."""
    async with container.create_scope() as scope:
        # Fresh database connection for this batch
        db = scope.resolve(DatabaseSession)
        # Batch-specific metrics
        metrics = scope.resolve(BatchMetricsPort)
        metrics.batch_size = len(items)
        processor = scope.resolve(BatchProcessor)
        await processor.process_all(items)
    # Connection released, metrics flushed
Test Isolation¶
Each test gets a clean scope:
import pytest
from dioxide import Container, Profile

# Async fixtures and tests need an async pytest plugin
# (for example pytest-asyncio in auto mode).
@pytest.fixture
async def scope():
    """Fresh scope for each test - complete isolation."""
    container = Container()
    container.scan(profile=Profile.TEST)
    async with container.create_scope() as test_scope:
        yield test_scope
    # All test-scoped instances cleaned up

async def test_user_registration(scope):
    """Test runs in isolated scope."""
    # Fresh instances for this test only
    users = scope.resolve(UserRepository)
    email = scope.resolve(EmailPort)
    service = scope.resolve(UserService)
    await service.register("alice@example.com")
    assert len(email.sent_emails) == 1
    # Next test gets fresh empty fakes!
Scheduled Jobs¶
Each job run gets its own scope:
from apscheduler.schedulers.asyncio import AsyncIOScheduler

scheduler = AsyncIOScheduler()

@scheduler.scheduled_job('cron', hour=2)
async def nightly_cleanup():
    """Each scheduled run gets its own scope."""
    async with container.create_scope() as scope:
        context = scope.resolve(JobContextPort)
        context.job_name = "nightly_cleanup"
        context.run_id = str(uuid.uuid4())
        cleanup = scope.resolve(CleanupService)
        await cleanup.run()
Architecture: Scopes and the Dependency Graph¶
Understanding how scopes interact with the dependency graph is crucial.
Scope Inheritance¶
Scopes inherit from their parent container:
Resolution rules:
When resolving in a scope, check whether the component is Scope.REQUEST
If REQUEST: create/return scope-local instance
If SINGLETON: delegate to parent container
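The resolution rules above can be modelled in a few lines of plain Python. This is only a toy sketch under simplifying assumptions (zero-argument factories, no lifecycle hooks). `ToyContainer`, `ToyScope`, and `ToyScopeKind` are hypothetical stand-ins and do not reflect dioxide's internals.

```python
from enum import Enum
from typing import Any, Callable


class ToyScopeKind(Enum):
    SINGLETON = "singleton"
    REQUEST = "request"


class ToyContainer:
    """Hypothetical parent container that owns singleton instances."""

    def __init__(self) -> None:
        self._registry: dict[type, tuple[Callable[[], Any], ToyScopeKind]] = {}
        self._singletons: dict[type, Any] = {}

    def register(self, key: type, factory: Callable[[], Any], kind: ToyScopeKind) -> None:
        self._registry[key] = (factory, kind)

    def resolve(self, key: type) -> Any:
        factory, kind = self._registry[key]
        if kind is ToyScopeKind.REQUEST:
            raise RuntimeError(f"{key.__name__} needs an active scope")
        if key not in self._singletons:
            self._singletons[key] = factory()
        return self._singletons[key]

    def create_scope(self) -> "ToyScope":
        return ToyScope(self)


class ToyScope:
    """Hypothetical scope: local cache for REQUEST, delegation for SINGLETON."""

    def __init__(self, parent: ToyContainer) -> None:
        self._parent = parent
        self._local: dict[type, Any] = {}

    def resolve(self, key: type) -> Any:
        factory, kind = self._parent._registry[key]
        if kind is ToyScopeKind.REQUEST:
            # Create once per scope, then reuse within that scope.
            if key not in self._local:
                self._local[key] = factory()
            return self._local[key]
        # Singletons always come from the parent container.
        return self._parent.resolve(key)


class Settings:
    pass


class RequestState:
    pass


container = ToyContainer()
container.register(Settings, Settings, ToyScopeKind.SINGLETON)
container.register(RequestState, RequestState, ToyScopeKind.REQUEST)

scope_a, scope_b = container.create_scope(), container.create_scope()
print(scope_a.resolve(Settings) is scope_b.resolve(Settings))          # True
print(scope_a.resolve(RequestState) is scope_a.resolve(RequestState))  # True
print(scope_a.resolve(RequestState) is scope_b.resolve(RequestState))  # False
```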
Captive Dependencies (Anti-Pattern)¶
DANGER: A singleton must NEVER depend on a scoped component directly.
# WRONG: Captive dependency!
@service  # Singleton
class UserService:
    def __init__(self, session: DatabaseSession):  # REQUEST-scoped!
        self.session = session  # Captured forever!

# This singleton captures the first scope's session and uses it forever,
# even after that scope is disposed. This causes:
# - Stale connections
# - Connection pool exhaustion
# - Data corruption between requests
Dioxide detects captive dependencies and raises CaptiveDependencyError at scan time:
CaptiveDependencyError: Singleton 'UserService' cannot depend on request-scoped
'DatabaseSession'. Singletons outlive request scopes, which would capture a
stale instance.
Solutions:
1. Make UserService request-scoped (if it needs per-request state)
2. Inject a factory: Callable[[], DatabaseSession] instead
3. Inject the port and resolve within methods
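To make the scan-time check concrete, here is one way such a validation could be written: walk every singleton's constructor type hints and reject any that name a request-scoped type. This is a sketch of the general technique, not dioxide's actual detection code. `check_captive`, `ToyCaptiveDependencyError`, and the registry shape are all hypothetical.

```python
from enum import Enum
from typing import get_type_hints


class ToyScopeKind(Enum):
    SINGLETON = "singleton"
    REQUEST = "request"


class ToyCaptiveDependencyError(Exception):
    """Hypothetical stand-in for a captive-dependency error."""


class DatabaseSession:
    pass


class UserService:
    def __init__(self, session: DatabaseSession) -> None:
        self.session = session


def check_captive(registry: dict[type, ToyScopeKind]) -> None:
    """Raise if any singleton's constructor asks for a request-scoped type."""
    for component, kind in registry.items():
        if kind is not ToyScopeKind.SINGLETON:
            continue
        hints = get_type_hints(component.__init__)
        hints.pop("return", None)  # only constructor parameters matter
        for param, dependency in hints.items():
            if registry.get(dependency) is ToyScopeKind.REQUEST:
                raise ToyCaptiveDependencyError(
                    f"Singleton {component.__name__!r} cannot depend on "
                    f"request-scoped {dependency.__name__!r} (parameter {param!r})"
                )


registry = {
    UserService: ToyScopeKind.SINGLETON,
    DatabaseSession: ToyScopeKind.REQUEST,
}

try:
    check_captive(registry)
except ToyCaptiveDependencyError as exc:
    print(exc)
```

Because the check only needs the registry and type hints, it can run before any instance is created, which is what makes failing at scan time possible.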
Correct Pattern: Inject Ports, Resolve in Methods¶
@service
class UserService:
    def __init__(self, container: Container):
        self._container = container

    async def get_user(self, user_id: int) -> User:
        # Resolve scoped dependency when needed
        async with self._container.create_scope() as scope:
            session = scope.resolve(DatabaseSession)
            return await session.query(User).get(user_id)
Or, use the injected scope in web frameworks:
# FastAPI example - scope created per request by middleware
@app.get("/users/{user_id}")
async def get_user(
    user_id: int,
    scope: ScopedContainer = Depends(get_request_scope)
):
    session = scope.resolve(DatabaseSession)
    service = scope.resolve(UserService)
    return await service.get_user(user_id)
Lifecycle Management in Scopes¶
Scoped components can use @lifecycle for initialization and cleanup:
@adapter.for_(DatabaseSession, profile=Profile.PRODUCTION, scope=Scope.REQUEST)
@lifecycle
class PostgresSession:
    def __init__(self, config: DatabaseConfig):
        self.config = config
        self.conn = None

    async def initialize(self):
        """Called when scope creates this instance."""
        self.conn = await asyncpg.connect(self.config.database_url)

    async def dispose(self):
        """Called when scope ends."""
        if self.conn:
            await self.conn.close()

    async def query(self, sql: str) -> list:
        return await self.conn.fetch(sql)
Lifecycle Order in Scopes¶
Guarantees:
Initialize in dependency order (dependencies before dependents)
Dispose in reverse order (dependents before dependencies)
Disposal happens even if exceptions occur
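These ordering guarantees map naturally onto a topological sort. The sketch below uses `graphlib.TopologicalSorter` from the standard library on a hypothetical dependency graph. The component names are borrowed from this guide for illustration, while `run_scope` and the `events` log are made up and say nothing about how dioxide orders work internally.

```python
from graphlib import TopologicalSorter

# Hypothetical graph: each component maps to the components it depends on.
dependencies = {
    "UserService": {"DatabaseSession", "AuditLogger"},
    "AuditLogger": {"RequestContext"},
    "DatabaseSession": {"DatabaseConfig"},
    "RequestContext": set(),
    "DatabaseConfig": set(),
}

# static_order() yields every node after all of its dependencies.
init_order = list(TopologicalSorter(dependencies).static_order())

events: list[str] = []


def run_scope(work) -> None:
    started: list[str] = []
    try:
        for name in init_order:
            events.append(f"init {name}")
            started.append(name)
        work()
    finally:
        # Runs on success and on failure; dependents go before dependencies.
        for name in reversed(started):
            events.append(f"dispose {name}")


def failing_work() -> None:
    raise RuntimeError("request failed")


try:
    run_scope(failing_work)
except RuntimeError:
    pass

print(events)
```

The exact initialization order among unrelated components is not unique, but every dependency appears before its dependents, and disposal is the exact reverse even though the work raised.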
Error Messages and How to Fix Them¶
ScopeError: Cannot resolve request-scoped outside scope¶
ScopeError: Cannot resolve 'DatabaseSession' (Scope.REQUEST) outside of a scope.
Request-scoped components can only be resolved within an active scope.
Fix: Use container.create_scope() to create a scope first:
async with container.create_scope() as scope:
    session = scope.resolve(DatabaseSession)
Cause: Trying to resolve a REQUEST-scoped component directly from the container.
Fix: Create a scope first using container.create_scope().
CaptiveDependencyError: Singleton depends on scoped¶
CaptiveDependencyError: Singleton 'UserService' cannot depend on request-scoped
'DatabaseSession'. Singletons outlive request scopes, which would capture a
stale instance.
Cause: A singleton service has a REQUEST-scoped dependency in its constructor.
Fix options:
Make the service REQUEST-scoped (if appropriate)
Don’t inject the scoped dependency directly; resolve it within methods
Inject a factory function instead
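The factory option can be sketched without any framework: the long-lived service stores a zero-argument callable and calls it on every use, so it never holds on to a session from an earlier scope. In this illustration a `contextvars.ContextVar` plays the role of "the current scope". `ToySession`, `current_session`, and `handle_request` are hypothetical names, and this is not how dioxide wires factories.

```python
from contextvars import ContextVar
from typing import Callable


class ToySession:
    """Hypothetical per-request resource with a visible identity."""

    _counter = 0

    def __init__(self) -> None:
        ToySession._counter += 1
        self.id = ToySession._counter


# Holds the session that belongs to the request currently being handled.
current_session: ContextVar[ToySession] = ContextVar("current_session")


class UserService:
    """Long-lived service: keeps the factory, never a session."""

    def __init__(self, session_factory: Callable[[], ToySession]) -> None:
        self._session_factory = session_factory

    def session_id(self) -> int:
        # Look the session up at call time instead of capturing it.
        return self._session_factory().id


service = UserService(current_session.get)


def handle_request() -> int:
    token = current_session.set(ToySession())
    try:
        return service.session_id()
    finally:
        current_session.reset(token)


first, second = handle_request(), handle_request()
print(first != second)  # True
```

The same `UserService` instance serves both requests, yet each call sees the session that was active for its own request.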
CircularDependencyError in scopes¶
Circular dependencies are detected at scan time, before any scopes are created:
CircularDependencyError: Circular dependency detected:
UserService -> AuditLogger -> RequestContext -> UserService
Fix: Break the cycle by introducing an interface or restructuring dependencies.
Testing Request-Scoped Components¶
Pattern 1: Test-Scoped Fixtures¶
@pytest.fixture
async def scope():
    """Each test gets a fresh scope."""
    container = Container()
    container.scan(profile=Profile.TEST)
    async with container.create_scope() as test_scope:
        yield test_scope

async def test_database_session(scope):
    session = scope.resolve(DatabaseSession)
    # Session is scoped to this test
    result = await session.query("SELECT 1")
    assert result is not None
Pattern 2: Separate Scopes for Request Simulation¶
async def test_multiple_requests(container):
    """Simulate multiple requests, each with isolated scope."""
    container.scan(profile=Profile.TEST)

    # Simulate Request 1
    async with container.create_scope() as scope1:
        ctx1 = scope1.resolve(RequestContextPort)
        ctx1.user_id = "alice"
        request_id_1 = ctx1.request_id

    # Simulate Request 2
    async with container.create_scope() as scope2:
        ctx2 = scope2.resolve(RequestContextPort)
        ctx2.user_id = "bob"
        request_id_2 = ctx2.request_id

    # Verify isolation
    assert request_id_1 != request_id_2
Pattern 3: Override Scoped Dependencies¶
@pytest.fixture
def test_context():
    """Pre-configured test context."""
    ctx = TestRequestContext()
    ctx.user_id = "test-user"
    ctx.request_id = "test-request-123"
    return ctx

async def test_with_known_context(scope, test_context):
    # Override the scoped dependency
    scope.register_instance(RequestContextPort, test_context)
    service = scope.resolve(UserService)
    result = await service.current_user()
    assert result.id == "test-user"
FastAPI Integration Example¶
Here’s how scoping integrates with FastAPI:
from contextlib import asynccontextmanager

from fastapi import FastAPI, Depends, Request
from dioxide import Container, Profile, ScopedContainer

container = Container()

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan - initialize container."""
    container.scan(profile=Profile.PRODUCTION)
    async with container:
        yield

app = FastAPI(lifespan=lifespan)

# Middleware to create request scope
@app.middleware("http")
async def scope_middleware(request: Request, call_next):
    async with container.create_scope() as scope:
        request.state.scope = scope
        response = await call_next(request)
        return response

# Dependency to get current scope
def get_scope(request: Request) -> ScopedContainer:
    return request.state.scope

# Use in routes
@app.get("/users/{user_id}")
async def get_user(
    user_id: int,
    scope: ScopedContainer = Depends(get_scope)
):
    # Resolve request-scoped dependencies
    session = scope.resolve(DatabaseSession)
    service = scope.resolve(UserService)
    return await service.get_user(user_id)
Summary¶
Key concepts:
Scoping creates isolated dependency contexts within bounded execution
create_scope() returns a ScopedContainer for resolving scoped dependencies
Scope.REQUEST is for adapters that need per-execution isolation (not services)
Universal primitive - works for web, CLI, background tasks, batch processing, tests
Captive dependencies are detected at scan time (singleton -> scoped = error)
Lifecycle is respected: scoped components with @lifecycle are initialized/disposed with the scope
When to use Scope.REQUEST:
| Component Type | Scope | Example |
|---|---|---|
| Core services | SINGLETON | |
| Request context | REQUEST | |
| Database connections | REQUEST | |
| Audit/logging | REQUEST | |
| Test fakes | Either | Depends on test needs |
Decision flowchart:
Scoping in dioxide enables clean, isolated execution contexts while maintaining the simplicity of the dependency injection model. Use it whenever you need bounded, isolated dependencies - whether that’s web requests, background tasks, CLI commands, or test cases.