Source code for dioxide.lifecycle

"""Lifecycle management decorator for dioxide components.

The @lifecycle decorator enables opt-in lifecycle management for services and
adapters that need initialization and cleanup. It provides guaranteed startup
and shutdown ordering based on dependency relationships, making it ideal for
managing resources like database connections, caches, message queues, and other
infrastructure components.

In hexagonal architecture, lifecycle management is essential at the seams (adapters)
where your application connects to external systems. The @lifecycle decorator ensures
these connections are established before your application starts processing requests
and gracefully shut down when the application stops.

Key Features:
    - **Dependency-ordered initialization**: Components initialized in dependency order
    - **Reverse-order disposal**: Cleanup happens in reverse dependency order
    - **Async context manager**: Use ``async with container:`` for automatic lifecycle
    - **Type-safe validation**: Validates initialize() and dispose() methods at decoration time
    - **Rollback on failure**: If initialization fails, already-initialized components are cleaned up
    - **Works with @service and @adapter**: Composable with other dioxide decorators
    - **Order-independent**: Decorator order doesn't matter (both orderings work identically)
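
The decoration-time validation listed above can be sketched with the standard library alone. The helper name ``validate_lifecycle`` and the two sample classes are hypothetical and exist only for this illustration; the checks mirror the ones performed by ``lifecycle()`` at the bottom of this module.

```python
import inspect


def validate_lifecycle(cls: type) -> type:
    # Hypothetical helper mirroring the checks in lifecycle():
    # each method must exist and must be an async coroutine function.
    for name in ('initialize', 'dispose'):
        method = getattr(cls, name, None)
        if method is None:
            raise TypeError(f'{cls.__name__} must implement {name}() method')
        if not inspect.iscoroutinefunction(method):
            raise TypeError(f'{cls.__name__}.{name}() must be async')
    cls._dioxide_lifecycle = True
    return cls


class Good:
    async def initialize(self) -> None: ...
    async def dispose(self) -> None: ...


class SyncInit:
    def initialize(self) -> None: ...  # not async, so validation rejects it
    async def dispose(self) -> None: ...


validate_lifecycle(Good)  # passes and sets the marker attribute

try:
    validate_lifecycle(SyncInit)
except TypeError as exc:
    error_message = str(exc)
```

Because the check runs when the class body is decorated, a missing or synchronous method surfaces at import time rather than when the container starts.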

The lifecycle flow follows this pattern:

1. Container.start() or async with container:
   - Build dependency graph of all @lifecycle components
   - Sort topologically (dependencies before dependents)
   - Call initialize() on each component in order
   - If any initialize() fails, rollback by disposing already-initialized components

2. Application runs normally with all resources ready

3. Container.stop() or async context exit:
   - Call dispose() on all components in reverse order
   - Continue cleanup even if individual dispose() calls fail
   - Log disposal errors but don't raise (best-effort cleanup)
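
The flow above can be approximated in a few lines. This is a minimal sketch under stated assumptions, not dioxide's implementation: ``Component``, ``start``, ``stop``, and the dependency mapping are hypothetical names, and the ordering uses the standard library's ``graphlib.TopologicalSorter``.

```python
import asyncio
from graphlib import TopologicalSorter


class Component:
    # Minimal stand-in for a @lifecycle component; records events in a shared log.
    def __init__(self, name: str, log: list, fail: bool = False) -> None:
        self.name, self.log, self.fail = name, log, fail

    async def initialize(self) -> None:
        if self.fail:
            raise RuntimeError(f'{self.name} failed to initialize')
        self.log.append(f'init {self.name}')

    async def dispose(self) -> None:
        self.log.append(f'dispose {self.name}')


async def stop(started: list) -> None:
    # Dispose in reverse initialization order; keep going if one dispose fails.
    for component in reversed(started):
        try:
            await component.dispose()
        except Exception as exc:
            print(f'dispose failed for {component.name}: {exc}')


async def start(components: dict, deps: dict) -> list:
    # deps maps each name to the set of names it depends on;
    # static_order() yields dependencies before their dependents.
    order = [components[name] for name in TopologicalSorter(deps).static_order()]
    started = []
    try:
        for component in order:
            await component.initialize()
            started.append(component)
    except Exception:
        await stop(started)  # roll back whatever was already initialized
        raise
    return started


async def demo() -> list:
    log = []
    components = {name: Component(name, log) for name in ('db', 'service')}
    deps = {'db': set(), 'service': {'db'}}
    started = await start(components, deps)
    await stop(started)
    return log


events = asyncio.run(demo())
```

Keeping the list of successfully started components is what makes rollback and reverse-order disposal share the same code path.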

Basic Example:
    Database adapter with lifecycle management::

        from dioxide import adapter, Profile, lifecycle
        from sqlalchemy import text
        from sqlalchemy.ext.asyncio import create_async_engine

        @adapter.for_(DatabasePort, profile=Profile.PRODUCTION)
        @lifecycle
        class PostgresAdapter:
            def __init__(self, config: AppConfig):
                self.config = config
                self.engine = None

            async def initialize(self) -> None:
                \"\"\"Called automatically when container starts.\"\"\"
                self.engine = create_async_engine(self.config.database_url)
                # Test connection (raw SQL strings must be wrapped in text())
                async with self.engine.connect() as conn:
                    await conn.execute(text("SELECT 1"))
                print(f"Connected to {self.config.database_url}")

            async def dispose(self) -> None:
                \"\"\"Called automatically when container stops.\"\"\"
                if self.engine:
                    await self.engine.dispose()
                    print("Database connection closed")

            async def query(self, sql: str) -> list[dict]:
                async with self.engine.connect() as conn:
                    result = await conn.execute(text(sql))
                    # mappings() yields dict-like rows, matching the return type
                    return [dict(row) for row in result.mappings()]

Advanced Example:
    Multiple lifecycle components with dependencies::

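        import aioredis  # this example uses the aioredis 1.x API (create_redis_pool)
        from dioxide import adapter, service, lifecycle, Profile
        from sqlalchemy.ext.asyncio import create_async_engine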


        # Cache depends on nothing - initialized first
        @adapter.for_(CachePort, profile=Profile.PRODUCTION)
        @lifecycle
        class RedisCache:
            async def initialize(self) -> None:
                self.redis = await aioredis.create_redis_pool('redis://localhost')
                print('Redis connected')

            async def dispose(self) -> None:
                self.redis.close()
                await self.redis.wait_closed()
                print('Redis disconnected')


        # Database depends on nothing - no ordering constraint relative to the cache
        @adapter.for_(DatabasePort, profile=Profile.PRODUCTION)
        @lifecycle
        class PostgresAdapter:
            async def initialize(self) -> None:
                self.engine = create_async_engine('postgresql://...')
                print('Database connected')

            async def dispose(self) -> None:
                await self.engine.dispose()
                print('Database disconnected')


        # Service depends on cache and database - initialized last
        @service
        @lifecycle
        class UserService:
            def __init__(self, cache: CachePort, db: DatabasePort):
                self.cache = cache
                self.db = db

            async def initialize(self) -> None:
                # Warm up cache
                users = await self.db.query('SELECT * FROM users')
                for user in users:
                    await self.cache.set(f'user:{user.id}', user)
                print('UserService cache warmed')

            async def dispose(self) -> None:
                # Flush pending operations
                print('UserService cleanup complete')


        # Initialization order: RedisCache, PostgresAdapter, UserService
        # Disposal order: UserService, PostgresAdapter, RedisCache

Container Usage:
    Manual lifecycle control::

        from dioxide import Container, Profile

        container = Container()
        container.scan(profile=Profile.PRODUCTION)

        # Start all @lifecycle components
        await container.start()

        # Use services (all resources are initialized)
        user_service = container.resolve(UserService)
        users = await user_service.find_all()

        # Stop all @lifecycle components (reverse order)
        await container.stop()

    Async context manager (recommended)::

        from dioxide import Container, Profile

        container = Container()
        container.scan(profile=Profile.PRODUCTION)  # register components before starting

        async with container:
            # All @lifecycle components initialized here

            user_service = container.resolve(UserService)
            users = await user_service.find_all()

        # All @lifecycle components disposed here (even if exception raised)
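
The "even if exception raised" guarantee comes from the async context manager protocol itself. The sketch below shows one way a container could delegate to ``start()`` and ``stop()``; ``SketchContainer`` is a hypothetical stand-in and makes no claim about how dioxide's ``Container`` is written.

```python
import asyncio


class SketchContainer:
    # Hypothetical stand-in for a container; not dioxide's implementation.
    def __init__(self) -> None:
        self.events = []

    async def start(self) -> None:
        self.events.append('start')

    async def stop(self) -> None:
        self.events.append('stop')

    async def __aenter__(self) -> 'SketchContainer':
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Runs on normal exit and when the body raises. Returning None
        # lets any exception from the body propagate to the caller.
        await self.stop()


async def demo() -> list:
    container = SketchContainer()
    try:
        async with container:
            container.events.append('work')
            raise ValueError('boom')
    except ValueError:
        container.events.append('error seen by caller')
    return container.events


events = asyncio.run(demo())
```

In this sketch ``stop()`` runs before the caller's ``except`` clause, which is why cleanup does not depend on the caller handling the error.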

Testing with Lifecycle:
    Use fast fakes that don't need real resources::

        from dioxide import adapter, Container, Profile, lifecycle


        @adapter.for_(DatabasePort, profile=Profile.TEST)
        @lifecycle
        class FakeDatabaseAdapter:
            async def initialize(self) -> None:
                self.records = {}
                print('Fake database ready (no real connection)')

            async def dispose(self) -> None:
                self.records.clear()
                print('Fake database cleared')

            async def query(self, sql: str) -> list[dict]:
                # Fast in-memory queries
                return list(self.records.values())


        # Test container - uses fake adapters, no real infrastructure needed
        container = Container()
        container.scan(profile=Profile.TEST)

        async with container:
            # Fast initialization - no network calls

            service = container.resolve(UserService)
            await service.create_user('alice@example.com')

        # Fast cleanup - no network calls

Error Handling:
    Initialization failure with automatic rollback::

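        from dioxide import adapter, Container, Profile, lifecycle
        from sqlalchemy import text
        from sqlalchemy.ext.asyncio import create_async_engine


        @adapter.for_(DatabasePort, profile=Profile.PRODUCTION)
        @lifecycle
        class PostgresAdapter:
            async def initialize(self) -> None:
                self.engine = create_async_engine('postgresql://...')
                # If the connection fails, this raises an exception
                async with self.engine.connect() as conn:
                    await conn.execute(text('SELECT 1'))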


        container = Container()
        container.scan(profile=Profile.PRODUCTION)

        try:
            async with container:
                # If the database connection fails during start():
                # - initialize() raises an exception
                # - Container automatically calls dispose() on already-initialized components
                # - Exception propagates to the caller
                ...
        except Exception as e:
            print(f'Startup failed: {e}')
            # All initialized components have been cleaned up

Best Practices:
    - **Keep initialize() fast**: Avoid expensive operations, defer to first use if possible
    - **Make dispose() idempotent**: Safe to call multiple times, check if resources exist
    - **Don't raise in dispose()**: Log errors but continue cleanup (best-effort)
    - **Use for adapters, not services**: Services rarely need lifecycle (they're stateless logic)
    - **Test with fakes**: Use fast fake adapters in tests, no lifecycle overhead
    - **Connection pooling**: Initialize connection pools in initialize(), dispose in dispose()
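    - **Graceful degradation**: Decide per component whether a failed initialize() should
      abort startup (raise, which triggers rollback) or be handled inside initialize() so
      the application can run without that resource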
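
The advice on idempotent, non-raising ``dispose()`` can be illustrated as follows. This is a sketch under assumptions: ``FlakyResource`` and ``ResourceAdapter`` are hypothetical classes, and the failing ``close()`` stands in for any real client that may error during shutdown.

```python
import asyncio
import logging
from typing import Optional

logger = logging.getLogger(__name__)


class FlakyResource:
    # Hypothetical resource whose close() always fails.
    def __init__(self) -> None:
        self.close_calls = 0

    async def close(self) -> None:
        self.close_calls += 1
        raise ConnectionError('socket already gone')


class ResourceAdapter:
    def __init__(self) -> None:
        self.resource: Optional[FlakyResource] = None

    async def initialize(self) -> None:
        self.resource = FlakyResource()

    async def dispose(self) -> None:
        # Drop the reference first so a second call becomes a no-op.
        resource, self.resource = self.resource, None
        if resource is None:
            return  # never initialized, or already disposed
        try:
            await resource.close()
        except Exception:
            # Best-effort cleanup: log and let shutdown continue.
            logger.exception('Error while closing resource')


async def demo() -> int:
    adapter = ResourceAdapter()
    await adapter.initialize()
    resource = adapter.resource
    await adapter.dispose()
    await adapter.dispose()  # second call does nothing
    return resource.close_calls


close_calls = asyncio.run(demo())
```

Setting the attribute to ``None`` in ``__init__`` also keeps ``dispose()`` safe when ``initialize()`` never ran.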

Common Patterns:
    Database connection pool::

        @lifecycle
        class DatabaseAdapter:
            async def initialize(self) -> None:
                self.pool = await asyncpg.create_pool(...)

            async def dispose(self) -> None:
                if self.pool:
                    await self.pool.close()

    Message queue consumer::

        @lifecycle
        class MessageQueueAdapter:
            async def initialize(self) -> None:
                self.consumer = await create_consumer(...)
                await self.consumer.start()

            async def dispose(self) -> None:
                if self.consumer:
                    await self.consumer.stop()

    HTTP session::

        @lifecycle
        class HttpClientAdapter:
            async def initialize(self) -> None:
                self.session = aiohttp.ClientSession()

            async def dispose(self) -> None:
                if self.session:
                    await self.session.close()

Important - Async/Sync Relationship:
    Lifecycle methods (``initialize()``, ``dispose()``) are **async**, while
    ``container.resolve()`` is **sync**. This is intentional:

    - ``resolve()`` is fast and returns already-initialized instances
    - Lifecycle methods run once at ``container.start()`` / ``container.stop()``
    - Always call ``start()`` (or use ``async with container:``) before resolving
      components that have ``@lifecycle``

    See the :doc:`/guides/lifecycle-async-patterns` guide for detailed patterns.
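
The relationship can be pictured with a toy container whose ``resolve()`` is a plain lookup and whose ``start()`` awaits each ``initialize()``. ``TinyContainer`` and ``Pool`` are hypothetical, and the sketch makes no claim about what dioxide does when a component is resolved before ``start()``; it only shows why the ordering matters.

```python
import asyncio


class Pool:
    def __init__(self) -> None:
        self.ready = False

    async def initialize(self) -> None:
        await asyncio.sleep(0)  # placeholder for real async setup
        self.ready = True


class TinyContainer:
    # Hypothetical container: sync resolve(), async start().
    def __init__(self) -> None:
        self._instances = {Pool: Pool()}

    def resolve(self, cls):
        return self._instances[cls]  # plain lookup, nothing to await

    async def start(self) -> None:
        for instance in self._instances.values():
            await instance.initialize()


async def demo() -> tuple:
    container = TinyContainer()
    before = container.resolve(Pool).ready
    await container.start()
    after = container.resolve(Pool).ready
    return before, after


before_start, after_start = asyncio.run(demo())
```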

See Also:
    - :meth:`dioxide.container.Container.start` - Initialize lifecycle components
    - :meth:`dioxide.container.Container.stop` - Dispose lifecycle components
    - :class:`dioxide.adapter.adapter` - For marking boundary implementations
    - :class:`dioxide.services.service` - For core domain logic
    - :class:`dioxide.exceptions.CircularDependencyError` - Raised on circular dependencies
    - :doc:`/guides/lifecycle-async-patterns` - Async/sync patterns guide
"""

import inspect
from typing import TypeVar

T = TypeVar('T', bound=type)


def lifecycle(cls: T) -> T:
    """Mark a class for lifecycle management with initialization and cleanup.

    The @lifecycle decorator marks a service or adapter as requiring lifecycle
    management, which means it needs to be initialized before use and disposed
    of when the application shuts down. This is essential for managing resources
    like database connections, caches, message queues, and other infrastructure
    components that require setup and teardown.

    The decorator validates at decoration time that the decorated class
    implements the required async methods. This provides early error detection
    (at import time) rather than runtime failures.

    Required Methods:
        The decorated class MUST implement both of these async methods:

        - ``async def initialize(self) -> None``: Called once when the container
          starts (via ``container.start()`` or ``async with container:``). Use this
          to establish connections, load resources, warm caches, etc. This method
          is called in dependency order (dependencies are initialized before their
          dependents).

        - ``async def dispose(self) -> None``: Called once when the container stops
          (via ``container.stop()`` or when exiting the ``async with`` block). Use
          this to close connections, flush buffers, release resources, etc. This
          method is called in reverse dependency order (dependents are disposed
          before their dependencies). Should be idempotent and not raise exceptions.

    Decorator Composition:
        @lifecycle works with both @service and @adapter.for_() decorators.
        **Decorator order does not affect functionality** - both orderings work
        identically because dioxide decorators only add metadata attributes.

        For consistency, we **recommend** @lifecycle as the innermost decorator:

        - ``@service`` + ``@lifecycle`` - For stateful core logic (rare)
        - ``@adapter.for_()`` + ``@lifecycle`` - For infrastructure adapters (common)

        Both orders work::

            # Recommended (but both work identically)
            @adapter.for_(Port, profile=Profile.PRODUCTION)
            @lifecycle
            class MyAdapter:
                ...


            # Also works (not recommended for consistency)
            @lifecycle
            @adapter.for_(Port, profile=Profile.PRODUCTION)
            class MyAdapter:
                ...

    Args:
        cls: The class to mark for lifecycle management. Must implement both
            ``initialize()`` and ``dispose()`` methods as async coroutines.

    Returns:
        The decorated class with ``_dioxide_lifecycle = True`` attribute set.
        The class can be used normally and will be discovered by the container.

    Raises:
        TypeError: If the class does not implement ``initialize()`` method.
        TypeError: If ``initialize()`` is not an async coroutine function.
        TypeError: If the class does not implement ``dispose()`` method.
        TypeError: If ``dispose()`` is not an async coroutine function.

    Examples:
        Service with lifecycle (stateful core logic)::

            from dioxide import service, lifecycle


            @service
            @lifecycle
            class CacheWarmer:
                def __init__(self, db: DatabasePort):
                    self.db = db
                    self.cache = {}

                async def initialize(self) -> None:
                    # Load all users into memory cache
                    users = await self.db.query('SELECT * FROM users')
                    for user in users:
                        self.cache[user.id] = user
                    print(f'Cache warmed with {len(users)} users')

                async def dispose(self) -> None:
                    # Flush any pending writes
                    self.cache.clear()
                    print('Cache cleared')

        Adapter with lifecycle (infrastructure connection)::

            from dioxide import adapter, Profile, lifecycle
            from sqlalchemy import text
            from sqlalchemy.ext.asyncio import create_async_engine


            @adapter.for_(DatabasePort, profile=Profile.PRODUCTION)
            @lifecycle
            class PostgresAdapter:
                def __init__(self, config: AppConfig):
                    self.config = config
                    self.engine = None

                async def initialize(self) -> None:
                    # Establish database connection pool
                    self.engine = create_async_engine(
                        self.config.database_url,
                        pool_size=10,
                        max_overflow=20,
                    )

                    # Verify connection (raw SQL strings must be wrapped in text())
                    async with self.engine.connect() as conn:
                        await conn.execute(text('SELECT 1'))

                    print('Database connection established')

                async def dispose(self) -> None:
                    # Close all connections in pool
                    if self.engine:
                        await self.engine.dispose()
                        self.engine = None
                        print('Database connection closed')

                async def query(self, sql: str) -> list[dict]:
                    async with self.engine.connect() as conn:
                        result = await conn.execute(text(sql))
                        return [dict(row) for row in result.mappings()]

        Multiple lifecycle components with dependencies::

            # Database adapter (no dependencies) - initialized first
            @adapter.for_(DatabasePort, profile=Profile.PRODUCTION)
            @lifecycle
            class PostgresAdapter:
                async def initialize(self) -> None:
                    self.engine = create_async_engine(...)

                async def dispose(self) -> None:
                    await self.engine.dispose()


            # Service depends on database - initialized after database
            @service
            @lifecycle
            class UserRepository:
                def __init__(self, db: DatabasePort):
                    self.db = db
                    self.initialized = False

                async def initialize(self) -> None:
                    # Database is already initialized at this point
                    # Run migrations or setup
                    await self.db.query('CREATE TABLE IF NOT EXISTS users ...')
                    self.initialized = True

                async def dispose(self) -> None:
                    self.initialized = False


            # Container handles dependency order automatically:
            # 1. PostgresAdapter.initialize()
            # 2. UserRepository.initialize()
            # ... application runs ...
            # 1. UserRepository.dispose()
            # 2. PostgresAdapter.dispose()

        Validation errors at decoration time::

            @service
            @lifecycle
            class BrokenService:
                # Missing initialize() and dispose() methods
                pass


            # Raises TypeError: BrokenService must implement initialize() method


            @service
            @lifecycle
            class SyncService:
                def initialize(self) -> None:  # Not async!
                    pass

                async def dispose(self) -> None:
                    pass


            # Raises TypeError: SyncService.initialize() must be async

    Best Practices:
        - **Keep initialize() fast**: Avoid expensive operations, connection checks only
        - **Make dispose() idempotent**: Safe to call multiple times (check if resource exists)
        - **Don't raise in dispose()**: Log errors but continue cleanup (best-effort)
        - **Use for adapters**: Infrastructure components at the seams (databases, queues, etc.)
        - **Rare for services**: Core domain logic is usually stateless (no lifecycle needed)
        - **Consistent ordering**: For readability, use ``@adapter.for_() @lifecycle class ...``
          (though both orders work identically)

    See Also:
        - :meth:`dioxide.container.Container.start` - Initialize all lifecycle components
        - :meth:`dioxide.container.Container.stop` - Dispose all lifecycle components
        - :class:`dioxide.adapter.adapter` - For marking infrastructure adapters
        - :class:`dioxide.services.service` - For marking core domain services
        - :doc:`/guides/lifecycle-async-patterns` - Async/sync patterns guide
    """
    # Validate that initialize() method exists
    if not hasattr(cls, 'initialize'):
        msg = f'{cls.__name__} must implement initialize() method'
        raise TypeError(msg)

    # Validate that initialize() is async
    init_method = cls.initialize
    if not inspect.iscoroutinefunction(init_method):
        msg = f'{cls.__name__}.initialize() must be async'
        raise TypeError(msg)

    # Validate that dispose() method exists
    if not hasattr(cls, 'dispose'):
        msg = f'{cls.__name__} must implement dispose() method'
        raise TypeError(msg)

    # Validate that dispose() is async
    dispose_method = cls.dispose  # type: ignore[attr-defined]
    if not inspect.iscoroutinefunction(dispose_method):
        msg = f'{cls.__name__}.dispose() must be async'
        raise TypeError(msg)

    cls._dioxide_lifecycle = True  # type: ignore[attr-defined]
    return cls