Tutorial 3: Multi-Tier Application
This tutorial demonstrates a realistic multi-tier application with multiple ports, adapters, and services working together.
Real-World Application Structure
Most applications need multiple infrastructure dependencies:
Database - PostgreSQL in production, in-memory in tests
Cache - Redis in production, dict in tests
Email - SendGrid in production, fake in tests
External APIs - Real HTTP calls in production, fakes in tests
We’ll build a notification system that demonstrates these patterns.
Application Architecture
Our notification system has three layers:
Ports - Interfaces defining what we need
Adapters - Implementations for different environments
Services - Business logic orchestrating ports
┌─────────────────────────────────────────┐
│  NotificationService (Core)             │ ← Business Logic
│  - Orchestrates multiple ports          │
└──────────────┬──────────────────────────┘
               │ depends on
               ▼
┌─────────────────────────────────────────┐
│  Ports (Protocols - Interfaces)         │
│  - UserRepository                       │
│  - CachePort                            │
│  - EmailPort                            │
└──────────────┬──────────────────────────┘
               │ implemented by
               ▼
┌─────────────────────────────────────────┐
│  Adapters (Profile-specific)            │
│  Production: Postgres, Redis, SendGrid  │
│  Test: InMemory, Dict, Fake             │
└─────────────────────────────────────────┘
Step 1: Define Ports
Create interfaces for all infrastructure needs:
from typing import Protocol
from dataclasses import dataclass
from datetime import datetime


@dataclass
class User:
    """User domain model."""

    id: int
    email: str
    name: str
    last_notified_at: datetime | None = None


class UserRepository(Protocol):
    """Port for user data access."""

    async def find_by_id(self, user_id: int) -> User | None:
        """Find user by ID."""
        ...

    async def save(self, user: User) -> None:
        """Save or update user."""
        ...


class CachePort(Protocol):
    """Port for caching."""

    async def get(self, key: str) -> str | None:
        """Get cached value."""
        ...

    async def set(self, key: str, value: str, ttl_seconds: int) -> None:
        """Set cached value with TTL."""
        ...


class EmailPort(Protocol):
    """Port for email sending."""

    async def send(self, to: str, subject: str, body: str) -> None:
        """Send email."""
        ...
Step 2: Create Production Adapters
PostgreSQL User Repository
from dioxide import adapter, Profile, service
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

# AppConfig (database_url, redis_url, sendgrid_api_key) is assumed to be
# defined elsewhere in the application; this tutorial does not show it.


@service
class Database:
    """Database connection - shared across repositories."""

    def __init__(self, config: AppConfig):
        self.engine: AsyncEngine | None = None
        self.database_url = config.database_url

    async def initialize(self) -> None:
        """Initialize connection (lifecycle method)."""
        self.engine = create_async_engine(self.database_url)

    async def dispose(self) -> None:
        """Close connection (lifecycle method)."""
        if self.engine:
            await self.engine.dispose()


@adapter.for_(UserRepository, profile=Profile.PRODUCTION)
class PostgresUserRepository:
    """Production user repository using PostgreSQL."""

    def __init__(self, db: Database):
        self.db = db

    async def find_by_id(self, user_id: int) -> User | None:
        """Find user in PostgreSQL."""
        async with self.db.engine.begin() as conn:
            # SQLAlchemy requires raw SQL strings to be wrapped in text().
            result = await conn.execute(
                text("SELECT id, email, name, last_notified_at FROM users WHERE id = :id"),
                {"id": user_id},
            )
            row = result.fetchone()
            if row:
                return User(
                    id=row.id,
                    email=row.email,
                    name=row.name,
                    last_notified_at=row.last_notified_at,
                )
            return None

    async def save(self, user: User) -> None:
        """Save user to PostgreSQL (insert, or update on ID conflict)."""
        async with self.db.engine.begin() as conn:
            await conn.execute(
                text(
                    """
                    INSERT INTO users (id, email, name, last_notified_at)
                    VALUES (:id, :email, :name, :last_notified_at)
                    ON CONFLICT (id) DO UPDATE SET
                        email = EXCLUDED.email,
                        name = EXCLUDED.name,
                        last_notified_at = EXCLUDED.last_notified_at
                    """
                ),
                {
                    "id": user.id,
                    "email": user.email,
                    "name": user.name,
                    "last_notified_at": user.last_notified_at,
                },
            )
Redis Cache Adapter
import redis.asyncio as redis


@adapter.for_(CachePort, profile=Profile.PRODUCTION)
class RedisAdapter:
    """Production cache using Redis."""

    def __init__(self, config: AppConfig):
        self.redis: redis.Redis | None = None
        self.redis_url = config.redis_url

    async def initialize(self) -> None:
        """Connect to Redis (lifecycle method)."""
        self.redis = await redis.from_url(self.redis_url)

    async def dispose(self) -> None:
        """Close Redis connection (lifecycle method)."""
        if self.redis:
            await self.redis.close()

    async def get(self, key: str) -> str | None:
        """Get from Redis."""
        value = await self.redis.get(key)
        # Compare against None so an empty cached value is not treated as a miss.
        return value.decode() if value is not None else None

    async def set(self, key: str, value: str, ttl_seconds: int) -> None:
        """Set in Redis with TTL."""
        await self.redis.setex(key, ttl_seconds, value)
SendGrid Email Adapter
import httpx


@adapter.for_(EmailPort, profile=Profile.PRODUCTION)
class SendGridAdapter:
    """Production email using SendGrid."""

    def __init__(self, config: AppConfig):
        self.api_key = config.sendgrid_api_key

    async def send(self, to: str, subject: str, body: str) -> None:
        """Send email via SendGrid API."""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://api.sendgrid.com/v3/mail/send",
                headers={"Authorization": f"Bearer {self.api_key}"},
                json={
                    "personalizations": [{"to": [{"email": to}]}],
                    "from": {"email": "noreply@example.com"},
                    "subject": subject,
                    "content": [{"type": "text/plain", "value": body}],
                },
            )
            # Raise on 4xx/5xx so a failed send is not silently ignored.
            response.raise_for_status()
Step 3: Create Test Adapters (Fakes)
In-Memory User Repository
@adapter.for_(UserRepository, profile=Profile.TEST)
class InMemoryUserRepository:
    """Fast fake for testing - no database required."""

    def __init__(self):
        self.users: dict[int, User] = {}

    async def find_by_id(self, user_id: int) -> User | None:
        """Find user in memory."""
        return self.users.get(user_id)

    async def save(self, user: User) -> None:
        """Save user to memory."""
        self.users[user.id] = user

    def seed(self, *users: User) -> None:
        """Seed test data - only available in fakes!"""
        for user in users:
            self.users[user.id] = user
Dict Cache Adapter
@adapter.for_(CachePort, profile=Profile.TEST)
class DictCacheAdapter:
    """Fast fake cache using dict - no Redis required."""

    def __init__(self):
        self.cache: dict[str, str] = {}

    async def get(self, key: str) -> str | None:
        """Get from dict."""
        return self.cache.get(key)

    async def set(self, key: str, value: str, ttl_seconds: int) -> None:
        """Set in dict (TTL not enforced in tests)."""
        self.cache[key] = value
Fake Email Adapter
@adapter.for_(EmailPort, profile=Profile.TEST)
class FakeEmailAdapter:
    """Fast fake email - no network calls."""

    def __init__(self):
        self.sent_emails: list[dict] = []

    async def send(self, to: str, subject: str, body: str) -> None:
        """Record email instead of sending."""
        self.sent_emails.append({
            "to": to,
            "subject": subject,
            "body": body,
        })
Step 4: Implement Service Layer
The service layer orchestrates multiple ports:
from dioxide import service
from datetime import datetime, timedelta, UTC


@service
class NotificationService:
    """Core business logic - orchestrates multiple ports."""

    def __init__(
        self,
        users: UserRepository,
        cache: CachePort,
        email: EmailPort,
    ):
        """All dependencies injected via constructor."""
        self.users = users
        self.cache = cache
        self.email = email

    async def send_notification(self, user_id: int, message: str) -> bool:
        """Send notification with throttling and caching.

        Business rules:
        1. Don't send if user doesn't exist
        2. Don't send more than once per hour (throttling)
        3. Cache notification status for 5 minutes
        """
        # Check cache first
        cache_key = f"notification:{user_id}"
        cached = await self.cache.get(cache_key)
        if cached:
            print("⚡ Notification cached, skipping send")
            return False

        # Get user
        user = await self.users.find_by_id(user_id)
        if not user:
            print(f"❌ User {user_id} not found")
            return False

        # Throttle: Don't send if sent within past hour
        if user.last_notified_at:
            elapsed = datetime.now(UTC) - user.last_notified_at
            if elapsed < timedelta(hours=1):
                print(f"⏱️ Throttled: Last notified {elapsed.total_seconds():.0f}s ago")
                return False

        # Send notification
        await self.email.send(
            to=user.email,
            subject="New Notification",
            body=message,
        )

        # Update user
        user.last_notified_at = datetime.now(UTC)
        await self.users.save(user)

        # Cache for 5 minutes
        await self.cache.set(cache_key, "sent", ttl_seconds=300)
        print(f"✅ Notification sent to {user.email}")
        return True
Complete Example
Here’s a complete, runnable example:
"""
Multi-Tier Application Example

This example demonstrates:
- Multiple ports (database, cache, email)
- Multiple adapters per port (production, test)
- Service orchestrating multiple dependencies
- Testing with all fakes (no I/O)
"""
import asyncio
from typing import Protocol
from dataclasses import dataclass
from datetime import datetime, timedelta, UTC

from dioxide import adapter, service, Container, Profile


# ===== DOMAIN MODEL =====
@dataclass
class User:
    id: int
    email: str
    name: str
    last_notified_at: datetime | None = None


# ===== PORTS =====
class UserRepository(Protocol):
    async def find_by_id(self, user_id: int) -> User | None: ...
    async def save(self, user: User) -> None: ...


class CachePort(Protocol):
    async def get(self, key: str) -> str | None: ...
    async def set(self, key: str, value: str, ttl_seconds: int) -> None: ...


class EmailPort(Protocol):
    async def send(self, to: str, subject: str, body: str) -> None: ...


# ===== PRODUCTION ADAPTERS =====
@adapter.for_(UserRepository, profile=Profile.PRODUCTION)
class PostgresUserRepository:
    async def find_by_id(self, user_id: int) -> User | None:
        print(f"💾 [Postgres] Looking up user {user_id}")
        # Simulate database query
        return User(id=user_id, email=f"user{user_id}@example.com", name=f"User {user_id}")

    async def save(self, user: User) -> None:
        print(f"💾 [Postgres] Saving user {user.id}")


@adapter.for_(CachePort, profile=Profile.PRODUCTION)
class RedisAdapter:
    def __init__(self):
        self.cache = {}

    async def get(self, key: str) -> str | None:
        print(f"🔴 [Redis] GET {key}")
        return self.cache.get(key)

    async def set(self, key: str, value: str, ttl_seconds: int) -> None:
        print(f"🔴 [Redis] SET {key} (TTL: {ttl_seconds}s)")
        self.cache[key] = value


@adapter.for_(EmailPort, profile=Profile.PRODUCTION)
class SendGridAdapter:
    async def send(self, to: str, subject: str, body: str) -> None:
        print(f"📧 [SendGrid] Sending to {to}: {subject}")


# ===== TEST ADAPTERS (FAKES) =====
@adapter.for_(UserRepository, profile=Profile.TEST)
class InMemoryUserRepository:
    def __init__(self):
        self.users: dict[int, User] = {}

    async def find_by_id(self, user_id: int) -> User | None:
        return self.users.get(user_id)

    async def save(self, user: User) -> None:
        self.users[user.id] = user

    def seed(self, *users: User) -> None:
        for user in users:
            self.users[user.id] = user


@adapter.for_(CachePort, profile=Profile.TEST)
class DictCacheAdapter:
    def __init__(self):
        self.cache: dict[str, str] = {}

    async def get(self, key: str) -> str | None:
        return self.cache.get(key)

    async def set(self, key: str, value: str, ttl_seconds: int) -> None:
        self.cache[key] = value


@adapter.for_(EmailPort, profile=Profile.TEST)
class FakeEmailAdapter:
    def __init__(self):
        self.sent_emails: list[dict] = []

    async def send(self, to: str, subject: str, body: str) -> None:
        self.sent_emails.append({"to": to, "subject": subject, "body": body})


# ===== SERVICE =====
@service
class NotificationService:
    def __init__(self, users: UserRepository, cache: CachePort, email: EmailPort):
        self.users = users
        self.cache = cache
        self.email = email

    async def send_notification(self, user_id: int, message: str) -> bool:
        # Check cache
        cache_key = f"notification:{user_id}"
        if await self.cache.get(cache_key):
            print("⚡ Cached, skipping")
            return False

        # Get user
        user = await self.users.find_by_id(user_id)
        if not user:
            print("❌ User not found")
            return False

        # Throttle check
        if user.last_notified_at:
            elapsed = datetime.now(UTC) - user.last_notified_at
            if elapsed < timedelta(hours=1):
                print("⏱️ Throttled")
                return False

        # Send
        await self.email.send(user.email, "Notification", message)
        user.last_notified_at = datetime.now(UTC)
        await self.users.save(user)
        await self.cache.set(cache_key, "sent", 300)
        return True


# ===== USAGE =====
async def main():
    print("=" * 70)
    print("MULTI-TIER APPLICATION EXAMPLE")
    print("=" * 70)

    # Production
    print("\n🏭 PRODUCTION - PostgreSQL + Redis + SendGrid")
    print("-" * 70)
    prod_container = Container(profile=Profile.PRODUCTION)
    prod_service = prod_container.resolve(NotificationService)
    await prod_service.send_notification(1, "Hello from production!")

    # Test
    print("\n🧪 TEST - In-Memory + Dict + Fake")
    print("-" * 70)
    test_container = Container(profile=Profile.TEST)

    # Seed test data
    users = test_container.resolve(UserRepository)
    users.seed(User(id=1, email="alice@test.com", name="Alice"))

    test_service = test_container.resolve(NotificationService)
    result = await test_service.send_notification(1, "Test message")
    print(f"Result: {result}")

    # Verify
    fake_email = test_container.resolve(EmailPort)
    assert len(fake_email.sent_emails) == 1
    print("✅ Test passed: Email was sent")

    print("\n" + "=" * 70)
    print("KEY TAKEAWAYS:")
    print("✅ Service orchestrates multiple ports")
    print("✅ All dependencies auto-injected")
    print("✅ Profile switches all adapters at once")
    print("✅ Test fakes are fast and deterministic")
    print("=" * 70)


if __name__ == "__main__":
    asyncio.run(main())
Testing the Multi-Tier Application
With fakes, testing is straightforward:
from datetime import UTC, datetime

import pytest
from dioxide import Container, Profile

# Assumption: the domain model, ports, and service are importable from the
# scanned "myapp" package; adjust the import path to your project layout.
from myapp import EmailPort, NotificationService, User, UserRepository


@pytest.fixture
def container():
    c = Container()
    c.scan("myapp", profile=Profile.TEST)
    return c


@pytest.fixture
def notification_service(container):
    return container.resolve(NotificationService)


@pytest.fixture
def user_repo(container):
    return container.resolve(UserRepository)


@pytest.fixture
def fake_email(container):
    return container.resolve(EmailPort)


@pytest.mark.asyncio
async def test_sends_notification_to_existing_user(
    notification_service, user_repo, fake_email
):
    """Sends notification when user exists and not throttled."""
    # Arrange
    user_repo.seed(User(id=1, email="alice@test.com", name="Alice"))

    # Act
    result = await notification_service.send_notification(1, "Hello Alice!")

    # Assert
    assert result is True
    assert len(fake_email.sent_emails) == 1
    assert fake_email.sent_emails[0]["to"] == "alice@test.com"


@pytest.mark.asyncio
async def test_throttles_duplicate_notifications(
    notification_service, user_repo, fake_email
):
    """Does not send duplicate notification within throttle window."""
    # Arrange
    user = User(
        id=1,
        email="alice@test.com",
        name="Alice",
        last_notified_at=datetime.now(UTC),  # Just notified
    )
    user_repo.seed(user)

    # Act
    result = await notification_service.send_notification(1, "Hello again!")

    # Assert
    assert result is False  # Throttled
    assert len(fake_email.sent_emails) == 0  # No email sent
No mocks, no patches, no async magic - just fast, deterministic tests!
Key Concepts
Multiple Ports
Applications typically need multiple ports:
Data access - UserRepository, ProductRepository, etc.
Caching - CachePort
External services - EmailPort, PaymentPort, etc.
Time - ClockPort (for testable time-dependent logic)
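A ClockPort is mentioned here but not defined in this tutorial, so the following is only one possible shape for it. The sketch assumes a single now() method and uses plain Python without dioxide; SystemClock, FixedClock, and seconds_since are hypothetical names.

```python
from datetime import datetime, timedelta, timezone
from typing import Protocol


class ClockPort(Protocol):
    """Hypothetical port for reading the current time."""

    def now(self) -> datetime: ...


class SystemClock:
    """Production-style adapter: reads the real, timezone-aware UTC time."""

    def now(self) -> datetime:
        return datetime.now(timezone.utc)


class FixedClock:
    """Test fake: time is whatever the test sets it to."""

    def __init__(self, start: datetime) -> None:
        self.current = start

    def now(self) -> datetime:
        return self.current

    def advance(self, delta: timedelta) -> None:
        self.current += delta


def seconds_since(clock: ClockPort, earlier: datetime) -> float:
    """Time-dependent logic that asks the port instead of calling datetime.now()."""
    return (clock.now() - earlier).total_seconds()


start = datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)
clock = FixedClock(start)
clock.advance(timedelta(minutes=90))
elapsed = seconds_since(clock, start)
print(elapsed)
```

With a port like this, a throttling test could move time forward by an hour in one line instead of patching datetime or sleeping.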
Service Orchestration
Services orchestrate multiple ports to implement business logic:
@service
class OrderService:
    def __init__(
        self,
        orders: OrderRepository,
        payments: PaymentPort,
        email: EmailPort,
        inventory: InventoryPort,
    ):
        # All injected automatically
        pass

    async def place_order(self, order: Order):
        # 1. Reserve inventory
        # 2. Process payment
        # 3. Save order
        # 4. Send confirmation email
        pass
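The four numbered steps can be sketched with in-memory fakes to make the call order concrete. Everything here is hypothetical: the Order fields, the method names on each port (reserve, charge, save, send), and the fakes are assumptions for illustration, and the service is wired by hand rather than through dioxide.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Order:
    """Hypothetical order model; the field names are illustrative."""

    id: int
    customer_email: str
    sku: str
    quantity: int
    total_cents: int


# Four tiny fakes that append to one shared log so the call order is visible.
class FakeInventory:
    def __init__(self, log: list[str]) -> None:
        self.log = log

    async def reserve(self, sku: str, quantity: int) -> None:
        self.log.append("reserve")


class FakePayments:
    def __init__(self, log: list[str]) -> None:
        self.log = log

    async def charge(self, customer_email: str, amount_cents: int) -> None:
        self.log.append("charge")


class FakeOrders:
    def __init__(self, log: list[str]) -> None:
        self.log = log

    async def save(self, order: Order) -> None:
        self.log.append("save")


class FakeEmail:
    def __init__(self, log: list[str]) -> None:
        self.log = log

    async def send(self, to: str, subject: str, body: str) -> None:
        self.log.append("send")


class OrderService:
    def __init__(self, orders, payments, email, inventory) -> None:
        self.orders = orders
        self.payments = payments
        self.email = email
        self.inventory = inventory

    async def place_order(self, order: Order) -> None:
        await self.inventory.reserve(order.sku, order.quantity)  # 1. Reserve inventory
        await self.payments.charge(order.customer_email, order.total_cents)  # 2. Process payment
        await self.orders.save(order)  # 3. Save order
        await self.email.send(  # 4. Send confirmation email
            order.customer_email, "Order confirmed", f"Order {order.id} is confirmed."
        )


log: list[str] = []
order_service = OrderService(
    orders=FakeOrders(log),
    payments=FakePayments(log),
    email=FakeEmail(log),
    inventory=FakeInventory(log),
)
asyncio.run(order_service.place_order(Order(1, "alice@test.com", "SKU-1", 2, 1999)))
print(log)
```

This sketch has no failure handling. A real service would have to decide what happens if, for example, payment fails after inventory has been reserved.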
Profile Switching
Changing profile switches all adapters at once:
# Production: PostgreSQL + Redis + SendGrid
container.scan("myapp", profile=Profile.PRODUCTION)

# Test: In-Memory + Dict + Fake
container.scan("myapp", profile=Profile.TEST)

# Same service, different adapters!
service = container.resolve(NotificationService)
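Conceptually, profile switching is a lookup keyed by port and profile. The sketch below is a plain-Python illustration of that idea, not dioxide's implementation; the local Profile enum, REGISTRY, register, resolve, and both adapter classes are hypothetical stand-ins.

```python
from enum import Enum
from typing import Protocol


class Profile(Enum):
    """Local stand-in for a profile enum, defined here for illustration only."""

    PRODUCTION = "production"
    TEST = "test"


class EmailPort(Protocol):
    def send(self, to: str) -> str: ...


# Maps (port, profile) -> adapter class.
REGISTRY: dict[tuple[type, Profile], type] = {}


def register(port: type, profile: Profile):
    """Class decorator that records which adapter serves a port in a profile."""

    def decorator(cls: type) -> type:
        REGISTRY[(port, profile)] = cls
        return cls

    return decorator


@register(EmailPort, Profile.PRODUCTION)
class SmtpEmail:
    def send(self, to: str) -> str:
        return f"smtp -> {to}"


@register(EmailPort, Profile.TEST)
class FakeEmail:
    def send(self, to: str) -> str:
        return f"fake -> {to}"


def resolve(port: type, profile: Profile):
    """Instantiate the adapter registered for this port under this profile."""
    return REGISTRY[(port, profile)]()


prod_email = resolve(EmailPort, Profile.PRODUCTION)
test_email = resolve(EmailPort, Profile.TEST)
print(prod_email.send("alice@test.com"), "|", test_email.send("alice@test.com"))
```

The calling code asks only for EmailPort; which class it gets back depends entirely on the profile, which is why one setting can swap every adapter together.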
Test Data Seeding
Fakes can have helper methods for test setup:
@adapter.for_(UserRepository, profile=Profile.TEST)
class InMemoryUserRepository:
    def __init__(self):
        self.users: dict[int, User] = {}

    # find_by_id and save omitted here; see Step 3.

    def seed(self, *users: User) -> None:
        """Seed test data - only in fakes!"""
        for user in users:
            self.users[user.id] = user


# In tests
users = container.resolve(UserRepository)
users.seed(
    User(id=1, email="alice@test.com", name="Alice"),
    User(id=2, email="bob@test.com", name="Bob"),
)
Next Steps
This tutorial showed a multi-tier application without lifecycle management. In the next tutorial, we’ll add:
@lifecycle decorator for initialization and cleanup
Database connections that need startup/shutdown
Async context manager usage
Resource leak prevention
Continue to: Tutorial 4: Lifecycle Management