Week 9 — Day 1: Tenant Isolation Strategies
System Design Mastery Series — Multi-Tenancy, Security, and Compliance Week
Preface
You've built an amazing SaaS product. Users love it. Growth is exploding.
Then your first enterprise customer shows up:
THE ENTERPRISE REALITY
Sales team: "Great news! We just closed Acme Corp — $500K ARR!"
Acme Corp's security team sends their requirements:
┌────────────────────────────────────────────────────────────────────────┐
│ │
│ ACME CORP SECURITY REQUIREMENTS │
│ │
│ 1. "Our data must be completely isolated from other customers" │
│ │
│ 2. "We need audit logs of every access to our data" │
│ │
│ 3. "Can you prove that your engineers can't see our data?" │
│ │
│ 4. "What happens if another customer's bug affects our instance?" │
│ │
│ 5. "We require SOC 2 Type II and annual penetration testing" │
│ │
└────────────────────────────────────────────────────────────────────────┘
You look at your architecture:
Current state:
├── Single PostgreSQL database
├── All customers in same tables
├── WHERE tenant_id = ? everywhere (hopefully)
├── Shared Redis cache
└── One Elasticsearch cluster
Question: Can you prove isolation?
Answer: Not really.
This is the multi-tenancy challenge.
Today, we'll learn to design systems where thousands of customers share infrastructure safely — from startups to enterprises.
Part I: Foundations
Chapter 1: What Is Multi-Tenancy?
1.1 The Simple Definition
Multi-tenancy is an architecture where a single instance of software serves multiple customers (tenants), with each tenant's data and configuration isolated from others.
MULTI-TENANCY VISUALIZATION
Single-Tenant (old world):
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Customer A │ │ Customer B │ │ Customer C │
│ ┌───────┐ │ │ ┌───────┐ │ │ ┌───────┐ │
│ │ App │ │ │ │ App │ │ │ │ App │ │
│ │ Server│ │ │ │ Server│ │ │ │ Server│ │
│ └───────┘ │ │ └───────┘ │ │ └───────┘ │
│ ┌───────┐ │ │ ┌───────┐ │ │ ┌───────┐ │
│ │ DB │ │ │ │ DB │ │ │ │ DB │ │
│ └───────┘ │ │ └───────┘ │ │ └───────┘ │
└─────────────┘ └─────────────┘ └─────────────┘
$500/mo $500/mo $500/mo
Cost per customer: High
Isolation: Perfect
Scalability: Limited
Multi-Tenant (modern SaaS):
┌────────────────────────────────────────────────┐
│ Shared Infrastructure │
│ ┌─────────────────────────────────────────┐ │
│ │ Application Servers │ │
│ │ Tenant A │ Tenant B │ Tenant C │ │
│ └─────────────────────────────────────────┘ │
│ ┌────────────────────────────────────────┐ │
│ │ Shared Database │ │
│ │ ┌─────┐ ┌─────┐ ┌─────┐ │ │
│ │ │ A │ │ B │ │ C │ (isolated) │ │
│ │ └─────┘ └─────┘ └─────┘ │ │
│ └────────────────────────────────────────┘ │
└────────────────────────────────────────────────┘
$50/mo $50/mo $50/mo
Cost per customer: Low
Isolation: Depends on implementation
Scalability: High
1.2 Why Multi-Tenancy Matters
BUSINESS DRIVERS
Cost Efficiency:
├── Shared infrastructure = lower cost per customer
├── $50/month pricing impossible with dedicated infrastructure
├── Enables SMB and self-serve markets
└── Cloud-native economics
Operational Efficiency:
├── One codebase to maintain
├── One deployment to manage
├── Updates roll out to everyone
└── Simpler monitoring and debugging
Scalability:
├── Add customers without adding servers 1:1
├── Better resource utilization
├── Elastic scaling across tenants
└── Standard SaaS economics
THE CHALLENGE:
├── How do you prevent Tenant A from seeing Tenant B's data?
├── How do you prevent Tenant A from affecting Tenant B's performance?
├── How do you meet enterprise security requirements?
└── How do you handle different compliance needs per tenant?
1.3 The Isolation Spectrum
ISOLATION LEVELS
Level 0: NO ISOLATION (Don't do this)
─────────────────────────────────────
All data in same tables, no tenant identifier
└── Security: None
└── Use case: Never
Level 1: SHARED DATABASE, SHARED SCHEMA
─────────────────────────────────────────
Same database, same tables, tenant_id column
└── Isolation: Logical (application-enforced)
└── Security: Low-Medium
└── Use case: Startups, SMB SaaS
Level 2: SHARED DATABASE, SEPARATE SCHEMAS
──────────────────────────────────────────
Same database server, schema per tenant
└── Isolation: Logical (database-enforced)
└── Security: Medium
└── Use case: Growing SaaS, some enterprise
Level 3: SEPARATE DATABASES
───────────────────────────
Separate database per tenant (same server cluster)
└── Isolation: Database boundary (hardware is still shared)
└── Security: Medium-High
└── Use case: Enterprise SaaS
Level 4: SEPARATE INFRASTRUCTURE
────────────────────────────────
Dedicated servers, network isolation
└── Isolation: Physical
└── Security: High
└── Use case: Regulated industries, large enterprise
Level 5: SEPARATE CLOUD ACCOUNTS
────────────────────────────────
Different AWS/GCP accounts per tenant
└── Isolation: Complete
└── Security: Highest
└── Use case: Government, healthcare, finance
Chapter 2: Isolation Patterns Deep Dive
2.1 Pattern 1: Shared Tables with Tenant ID
SHARED TABLES PATTERN
Database Schema:
┌───────────────────────────────────────────────────────────────────────┐
│ │
│ users table: │
│ ┌────────────┬────────────┬────────────┬────────────┬──────────────┐ │
│ │ id │ tenant_id │ email │ name │ created_at │ │
│ ├────────────┼────────────┼────────────┼────────────┼──────────────┤ │
│ │ 1 │ acme │ a@acme.com │ Alice │ 2024-01-01 │ │
│ │ 2 │ acme │ b@acme.com │ Bob │ 2024-01-02 │ │
│ │ 3 │ globex │ c@globex.. │ Carol │ 2024-01-03 │ │
│ │ 4 │ initech │ d@inite... │ Dave │ 2024-01-04 │ │
│ └────────────┴────────────┴────────────┴────────────┴──────────────┘ │
│ │
│ Every table has tenant_id column │
│ Every query MUST include WHERE tenant_id = ? │
│ │
└───────────────────────────────────────────────────────────────────────┘
Pros:
├── Simplest to implement
├── Lowest cost (one database)
├── Easy cross-tenant operations (admin)
├── Simple backup/restore
└── Works well up to thousands of tenants
Cons:
├── Risk of data leakage (forgot WHERE clause)
├── No native database isolation
├── Noisy neighbor at database level
├── Hard to give tenant their own backup
└── Enterprise customers may reject this
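The first con, a forgotten WHERE clause, is easy to reproduce. The sketch below uses Python's built-in sqlite3 as a stand-in for PostgreSQL purely so it runs anywhere; `make_db`, `unscoped_users`, and `scoped_users` are hypothetical helper names, not part of any library.

```python
import sqlite3


def make_db() -> sqlite3.Connection:
    """Create an in-memory shared-table database holding three tenants."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE users ("
        "id INTEGER PRIMARY KEY, tenant_id TEXT NOT NULL, name TEXT NOT NULL)"
    )
    conn.executemany(
        "INSERT INTO users (tenant_id, name) VALUES (?, ?)",
        [("acme", "Alice"), ("acme", "Bob"), ("globex", "Carol"), ("initech", "Dave")],
    )
    return conn


def unscoped_users(conn: sqlite3.Connection) -> list:
    """The bug: without a tenant filter, every tenant's rows come back."""
    return [row[0] for row in conn.execute("SELECT name FROM users ORDER BY id")]


def scoped_users(conn: sqlite3.Connection, tenant_id: str) -> list:
    """The rule: the tenant filter is always present, as a bound parameter."""
    rows = conn.execute(
        "SELECT name FROM users WHERE tenant_id = ? ORDER BY id", (tenant_id,)
    )
    return [row[0] for row in rows]
```

Nothing in the database stops `unscoped_users` from running, which is why this pattern relies entirely on application discipline unless a database-level control is added.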
2.2 Pattern 2: Schema Per Tenant
SCHEMA PER TENANT PATTERN
Database Structure:
┌───────────────────────────────────────────────────────────────────────┐
│ │
│ PostgreSQL Database │
│ │ │
│ ├── Schema: tenant_acme │
│ │ ├── users │
│ │ ├── orders │
│ │ └── settings │
│ │ │
│ ├── Schema: tenant_globex │
│ │ ├── users │
│ │ ├── orders │
│ │ └── settings │
│ │ │
│ └── Schema: tenant_initech │
│ ├── users │
│ ├── orders │
│ └── settings │
│ │
│ Connection sets search_path per tenant: │
│ SET search_path TO tenant_acme; │
│ │
└───────────────────────────────────────────────────────────────────────┘
Pros:
├── Database-level isolation
├── Harder to accidentally query the wrong tenant (as long as search_path is set correctly)
├── Per-tenant backup possible
├── Schema migrations per tenant possible
├── Better security posture
└── PostgreSQL RLS can add extra layer
Cons:
├── Schema proliferation (10K tenants = 10K schemas)
├── Migrations more complex
├── Connection management harder
├── Cross-tenant queries need explicit schema
└── Some ORMs don't handle this well
2.3 Pattern 3: Database Per Tenant
DATABASE PER TENANT PATTERN
Infrastructure Structure:
┌───────────────────────────────────────────────────────────────────────┐
│ │
│ Database Cluster │
│ │ │
│ ├── Database: db_acme │
│ │ └── (all acme tables) │
│ │ │
│ ├── Database: db_globex │
│ │ └── (all globex tables) │
│ │ │
│ └── Database: db_initech │
│ └── (all initech tables) │
│ │
│ Each tenant gets dedicated connection string: │
│ acme: postgresql://cluster/db_acme │
│ globex: postgresql://cluster/db_globex │
│ │
└───────────────────────────────────────────────────────────────────────┘
Pros:
├── Strong isolation at database level
├── Independent backup/restore per tenant
├── Can put large tenants on dedicated hardware
├── Easier compliance (data residency)
├── Can offer "dedicated" tier at higher price
└── Clear resource accounting per tenant
Cons:
├── Connection pool per tenant
├── Higher operational complexity
├── Schema migrations across all databases
├── Monitoring complexity increases
└── Cost increases with tenant count
2.4 Pattern Comparison
| Aspect | Shared Tables | Schema/Tenant | DB/Tenant |
|---|---|---|---|
| Cost | Lowest | Low-Medium | Medium-High |
| Isolation | Application-enforced | Database-enforced (schema) | Separate database |
| Max Tenants | Millions | Thousands | Hundreds |
| Complexity | Low | Medium | High |
| Backup Granularity | All or nothing | Per schema | Per tenant |
| Compliance | Basic | Better | Best |
| Enterprise Ready | Rarely | Sometimes | Yes |
| Data Residency | Difficult | Possible | Easy |
Chapter 3: Tenant Context and Propagation
3.1 The Tenant Context Problem
TENANT CONTEXT CHALLENGE
Request Flow:
┌──────────┐     ┌──────────┐     ┌──────────┐     ┌──────────┐
│  Client  │────▶│   API    │────▶│ Service  │────▶│ Database │
│          │     │ Gateway  │     │          │     │          │
└──────────┘     └──────────┘     └──────────┘     └──────────┘
                      │                │                │
                      ▼                ▼                ▼
                 tenant_id?       tenant_id?       tenant_id?
Questions:
├── How does the API Gateway know which tenant?
├── How does the service know which tenant?
├── How does the database query know which tenant?
├── How do async jobs know which tenant?
└── How do background workers know which tenant?
Without consistent tenant context:
├── Queries might miss WHERE clause
├── Logs might not show tenant
├── Metrics not attributed correctly
├── Debugging becomes impossible
└── Security incidents undetectable
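For async jobs and background workers, one common answer is to carry the tenant ID inside the job payload and restore it before the handler runs. The following is a minimal sketch under that assumption: the list-based queue, `enqueue`, and `run_next` are hypothetical stand-ins for a real job system.

```python
import json
from contextvars import ContextVar
from typing import Callable, Dict, List, Optional

# Hypothetical context variable; a real application would reuse its own.
current_tenant_id: ContextVar[Optional[str]] = ContextVar(
    "current_tenant_id", default=None
)


def enqueue(queue: List[str], job_name: str, args: dict) -> None:
    """Serialize a job together with the tenant that scheduled it."""
    tenant_id = current_tenant_id.get()
    if tenant_id is None:
        raise RuntimeError("refusing to enqueue a job without tenant context")
    queue.append(json.dumps({"tenant_id": tenant_id, "job": job_name, "args": args}))


def run_next(queue: List[str], handlers: Dict[str, Callable]):
    """Worker side: restore the tenant context, run the handler, then reset."""
    payload = json.loads(queue.pop(0))
    token = current_tenant_id.set(payload["tenant_id"])
    try:
        return handlers[payload["job"]](**payload["args"])
    finally:
        # Reset so the next job on this worker never inherits this tenant.
        current_tenant_id.reset(token)
```

Refusing to enqueue without a tenant turns a silent attribution gap into an immediate, visible failure.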
3.2 Tenant Context Implementation
# tenant_isolation/context.py
"""
Tenant context management for multi-tenant applications.

The tenant context must flow through every layer:
- HTTP request → Service → Database → Cache → Queue → Background job
"""
from contextvars import ContextVar
from dataclasses import dataclass
from typing import Optional
import functools

# Context-local tenant state: contextvars keeps a separate value per thread
# and per asyncio task, so concurrent requests cannot see each other's tenant.
_tenant_context: ContextVar[Optional['TenantContext']] = ContextVar(
    'tenant_context',
    default=None
)


@dataclass(frozen=True)
class TenantContext:
    """
    Immutable tenant context that flows through the application.
    Frozen to prevent accidental modification.
    """
    tenant_id: str
    tenant_name: str
    plan: str  # 'free', 'pro', 'enterprise'
    region: str  # For data residency
    features: frozenset  # Enabled features
    # Security context
    isolation_level: str  # 'shared', 'schema', 'database'
    encryption_key_id: Optional[str] = None  # For tenant-specific encryption
def get_current_tenant() -> TenantContext:
    """
    Get the current tenant context.
    Raises TenantContextError if not in tenant context.
    """
    ctx = _tenant_context.get()
    if ctx is None:
        raise TenantContextError("No tenant context set. This is a bug!")
    return ctx


def get_current_tenant_id() -> str:
    """Convenience function to get just the tenant ID."""
    return get_current_tenant().tenant_id


def set_tenant_context(ctx: TenantContext) -> None:
    """Set the tenant context for the current execution."""
    _tenant_context.set(ctx)


def clear_tenant_context() -> None:
    """Clear the tenant context."""
    _tenant_context.set(None)


class TenantContextError(Exception):
    """Raised when tenant context is missing or invalid."""
# Context manager for tenant scope
class tenant_scope:
    """
    Context manager for executing code in a tenant's context.

    Usage:
        with tenant_scope(tenant_ctx):
            # All code here runs in tenant context
            users = user_service.get_users()  # Automatically filtered
    """
    def __init__(self, tenant: TenantContext):
        self.tenant = tenant
        self.previous = None

    def __enter__(self):
        # Remember the outer context so nested scopes restore it on exit
        self.previous = _tenant_context.get()
        _tenant_context.set(self.tenant)
        return self.tenant

    def __exit__(self, exc_type, exc_val, exc_tb):
        _tenant_context.set(self.previous)
        return False
# Decorator for tenant-required functions
def requires_tenant(func):
    """
    Decorator that ensures function runs within tenant context.

    Usage:
        @requires_tenant
        def get_user_data():
            tenant = get_current_tenant()
            # ...
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if _tenant_context.get() is None:
            raise TenantContextError(
                f"Function {func.__name__} requires tenant context"
            )
        return func(*args, **kwargs)
    return wrapper


# Async version
def requires_tenant_async(func):
    """Async version of requires_tenant decorator."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        if _tenant_context.get() is None:
            raise TenantContextError(
                f"Function {func.__name__} requires tenant context"
            )
        return await func(*args, **kwargs)
    return wrapper
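The reason a context variable is safe in an async server can be checked directly: each asyncio task works on its own copy of the context, so a value set while handling one request is invisible to the others. This is a small self-contained sketch; `_tenant` and `handle_request` are illustrative names rather than the module's API.

```python
import asyncio
from contextvars import ContextVar
from typing import List, Optional, Tuple

_tenant: ContextVar[Optional[str]] = ContextVar("tenant", default=None)


async def handle_request(tenant_id: str) -> Optional[str]:
    """Simulate one request: set the tenant, yield control, read it back."""
    _tenant.set(tenant_id)
    await asyncio.sleep(0)  # let the other simulated requests interleave
    return _tenant.get()


async def main() -> Tuple[List[Optional[str]], Optional[str]]:
    # gather() wraps each coroutine in a task, and each task gets a copy of
    # the current context, so the set() calls above do not leak between them.
    results = await asyncio.gather(
        *(handle_request(t) for t in ["acme", "globex", "initech"])
    )
    # The parent context was never modified by the child tasks.
    return list(results), _tenant.get()
```

A plain module-level global in place of the `ContextVar` would let the last writer win, and every simulated request would read the same tenant.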
3.3 Extracting Tenant from Requests
# tenant_isolation/middleware.py
"""
Middleware for extracting and setting tenant context from HTTP requests.
"""
import logging
from dataclasses import asdict
from typing import Optional

from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware

from tenant_isolation.context import TenantContext, tenant_scope

logger = logging.getLogger(__name__)


class TenantSuspendedError(Exception):
    """Raised by TenantService when the tenant exists but is not active."""


class TenantMiddleware(BaseHTTPMiddleware):
    """
    Middleware that extracts tenant context from requests.

    Tenant can be identified via:
    1. Subdomain: acme.myapp.com → tenant_id = 'acme'
    2. Header: X-Tenant-ID: acme
    3. JWT claim: token.tenant_id = 'acme'
    4. Query parameter: ?tenant_id=acme (for webhooks)
    """
    def __init__(self, app, tenant_service):
        super().__init__(app)
        self.tenant_service = tenant_service

    async def dispatch(self, request: Request, call_next):
        # Skip tenant resolution for public endpoints
        if self._is_public_endpoint(request.url.path):
            return await call_next(request)

        # Extract tenant identifier
        tenant_id = await self._extract_tenant_id(request)
        if not tenant_id:
            return self._error(400, "Tenant identification required")

        # Load tenant context from database/cache
        try:
            tenant_ctx = await self.tenant_service.get_tenant_context(tenant_id)
        except TenantSuspendedError:
            return self._error(403, "Tenant account is suspended")
        if not tenant_ctx:
            return self._error(404, f"Tenant '{tenant_id}' not found")

        # Set context for this request
        with tenant_scope(tenant_ctx):
            # Add tenant to request state for logging
            request.state.tenant_id = tenant_id
            # Process request
            response = await call_next(request)
            # Add tenant to response headers (for debugging)
            response.headers["X-Tenant-ID"] = tenant_id
            return response

    @staticmethod
    def _error(status_code: int, detail: str) -> JSONResponse:
        # An HTTPException raised inside BaseHTTPMiddleware is not seen by
        # FastAPI's exception handlers and surfaces as a 500, so the error
        # response is returned directly instead.
        return JSONResponse(status_code=status_code, content={"detail": detail})

    async def _extract_tenant_id(self, request: Request) -> Optional[str]:
        """
        Extract tenant ID from request using multiple strategies.

        The header and query parameter are client-controlled. For
        authenticated requests they should be checked against the
        tenant in the verified token before being trusted.
        """
        # Strategy 1: Subdomain. Assumes a two-label base domain such as
        # myapp.com, so the bare domain is not mistaken for a tenant.
        host = request.headers.get("host", "").split(":")[0]
        labels = host.split(".")
        if len(labels) >= 3 and labels[0] not in ("www", "api", "app"):
            return labels[0]

        # Strategy 2: Header
        tenant_header = request.headers.get("X-Tenant-ID")
        if tenant_header:
            return tenant_header

        # Strategy 3: JWT claim (if authenticated)
        if hasattr(request.state, "user"):
            return request.state.user.tenant_id

        # Strategy 4: Query parameter (for webhooks)
        tenant_param = request.query_params.get("tenant_id")
        if tenant_param:
            return tenant_param

        return None

    def _is_public_endpoint(self, path: str) -> bool:
        """Check if endpoint doesn't require tenant context."""
        public_paths = [
            "/health",
            "/metrics",
            "/api/auth/login",
            "/api/auth/register",
            "/api/tenants/create",  # Creating new tenant
        ]
        return any(path.startswith(p) for p in public_paths)


class TenantService:
    """
    Service for loading and caching tenant configuration.
    """
    def __init__(self, db, cache):
        self.db = db
        self.cache = cache

    async def get_tenant_context(self, tenant_id: str) -> Optional[TenantContext]:
        """
        Load tenant context, with caching.

        Returns None for an unknown tenant and raises TenantSuspendedError
        for an inactive one (TenantContext itself has no is_active field).
        """
        # Check cache first
        cache_key = f"tenant:{tenant_id}"
        cached = await self.cache.get(cache_key)
        if cached:
            # features is cached as a list; restore the frozenset
            return TenantContext(
                **{**cached, "features": frozenset(cached["features"])}
            )

        # Load from database
        tenant = await self.db.fetchone(
            """
            SELECT
                id, name, plan, region, features,
                isolation_level, encryption_key_id, is_active
            FROM tenants
            WHERE id = $1
            """,
            tenant_id
        )
        if not tenant:
            return None
        if not tenant["is_active"]:
            raise TenantSuspendedError(tenant_id)

        ctx = TenantContext(
            tenant_id=tenant["id"],
            tenant_name=tenant["name"],
            plan=tenant["plan"],
            region=tenant["region"],
            features=frozenset(tenant["features"]),
            isolation_level=tenant["isolation_level"],
            encryption_key_id=tenant["encryption_key_id"]
        )

        # Cache for 5 minutes (so a suspension can take that long to apply).
        # A frozenset is not JSON-serializable, so features is stored as a list.
        await self.cache.set(
            cache_key,
            {**asdict(ctx), "features": sorted(ctx.features)},
            ttl=300
        )
        return ctx
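Because the subdomain, header, and query parameter are all supplied by the client, a stricter resolution order is worth considering: treat them as hints and let the tenant from the verified token win, rejecting any mismatch. The sketch below assumes a base domain of `myapp.com` (taken from the subdomain example above); `subdomain_tenant`, `resolve_tenant_id`, and `TenantMismatchError` are hypothetical names.

```python
from typing import Optional

RESERVED_SUBDOMAINS = {"www", "api", "app"}


class TenantMismatchError(Exception):
    """The client-supplied tenant hint disagrees with the authenticated tenant."""


def subdomain_tenant(host: str, base_domain: str) -> Optional[str]:
    """Return the single label in front of base_domain, or None."""
    hostname = host.split(":", 1)[0].lower()
    suffix = "." + base_domain
    if not hostname.endswith(suffix):
        return None
    label = hostname[: -len(suffix)]
    if not label or "." in label or label in RESERVED_SUBDOMAINS:
        return None
    return label


def resolve_tenant_id(
    host: str,
    header_value: Optional[str],
    jwt_tenant: Optional[str],
    base_domain: str = "myapp.com",
) -> Optional[str]:
    """Prefer the authenticated tenant; hints may only agree with it."""
    hinted = subdomain_tenant(host, base_domain) or header_value
    if jwt_tenant is not None:
        if hinted is not None and hinted != jwt_tenant:
            raise TenantMismatchError(
                f"request targets {hinted!r} but token belongs to {jwt_tenant!r}"
            )
        return jwt_tenant
    # Unauthenticated request (for example a webhook): only the hint exists.
    return hinted
```

With this ordering, a user authenticated for one tenant cannot switch tenants by editing a header.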
Part II: Implementation
Chapter 4: Row-Level Security Implementation
4.1 Application-Level Enforcement
# tenant_isolation/repository.py
"""
Repository pattern with automatic tenant filtering.

All database access goes through repositories that automatically
apply tenant filtering.
"""
from typing import TypeVar, Generic, List, Optional
from dataclasses import dataclass
import logging

from tenant_isolation.context import get_current_tenant_id

logger = logging.getLogger(__name__)
T = TypeVar('T')


class TenantAwareRepository(Generic[T]):
    """
    Base repository that automatically filters by tenant.

    All queries include tenant_id filter.
    All inserts include tenant_id.
    """
    def __init__(self, db, table_name: str, model_class: type):
        self.db = db
        # table_name is interpolated into SQL: it must be a trusted constant
        self.table_name = table_name
        self.model_class = model_class
    async def find_by_id(self, id: str) -> Optional[T]:
        """
        Find record by ID within current tenant.
        """
        tenant_id = get_current_tenant_id()
        row = await self.db.fetchone(
            f"""
            SELECT * FROM {self.table_name}
            WHERE id = $1 AND tenant_id = $2
            """,
            id, tenant_id
        )
        if not row:
            return None
        return self.model_class(**row)

    async def find_all(
        self,
        filters: Optional[dict] = None,
        limit: int = 100,
        offset: int = 0
    ) -> List[T]:
        """
        Find all records within current tenant.
        """
        tenant_id = get_current_tenant_id()
        # Build query with tenant filter always first
        query = f"SELECT * FROM {self.table_name} WHERE tenant_id = $1"
        params = [tenant_id]
        param_idx = 2
        if filters:
            # Filter keys are interpolated as column names, so they must come
            # from trusted code, never directly from user input.
            for key, value in filters.items():
                query += f" AND {key} = ${param_idx}"
                params.append(value)
                param_idx += 1
        query += f" LIMIT ${param_idx} OFFSET ${param_idx + 1}"
        params.extend([limit, offset])
        rows = await self.db.fetch(query, *params)
        return [self.model_class(**row) for row in rows]
    async def create(self, data: dict) -> T:
        """
        Create record within current tenant.
        """
        tenant_id = get_current_tenant_id()
        # Always set tenant_id (on a copy, so the caller's dict is untouched)
        data = {**data, "tenant_id": tenant_id}
        columns = ", ".join(data.keys())
        placeholders = ", ".join(f"${i+1}" for i in range(len(data)))
        row = await self.db.fetchone(
            f"""
            INSERT INTO {self.table_name} ({columns})
            VALUES ({placeholders})
            RETURNING *
            """,
            *data.values()
        )
        return self.model_class(**row)

    async def update(self, id: str, data: dict) -> Optional[T]:
        """
        Update record within current tenant.
        """
        tenant_id = get_current_tenant_id()
        # Drop tenant_id from the update data (can't change tenant)
        data = {key: value for key, value in data.items() if key != "tenant_id"}
        if not data:
            # Nothing to update; an empty SET clause would be invalid SQL
            return await self.find_by_id(id)
        set_clauses = ", ".join(
            f"{key} = ${i+1}" for i, key in enumerate(data.keys())
        )
        row = await self.db.fetchone(
            f"""
            UPDATE {self.table_name}
            SET {set_clauses}
            WHERE id = ${len(data)+1} AND tenant_id = ${len(data)+2}
            RETURNING *
            """,
            *data.values(), id, tenant_id
        )
        if not row:
            return None
        return self.model_class(**row)
    async def delete(self, id: str) -> bool:
        """
        Delete record within current tenant.
        """
        tenant_id = get_current_tenant_id()
        result = await self.db.execute(
            f"""
            DELETE FROM {self.table_name}
            WHERE id = $1 AND tenant_id = $2
            """,
            id, tenant_id
        )
        # Assumes the db wrapper returns a cursor-like result. With raw
        # asyncpg, execute() returns a status string such as "DELETE 1".
        return result.rowcount > 0

    async def count(self, filters: Optional[dict] = None) -> int:
        """
        Count records within current tenant.
        """
        tenant_id = get_current_tenant_id()
        query = f"SELECT COUNT(*) FROM {self.table_name} WHERE tenant_id = $1"
        params = [tenant_id]
        if filters:
            # As in find_all, filter keys must be trusted column names
            for i, (key, value) in enumerate(filters.items()):
                query += f" AND {key} = ${i+2}"
                params.append(value)
        result = await self.db.fetchone(query, *params)
        return result["count"]
# Example usage
@dataclass
class User:
    id: str
    tenant_id: str
    email: str
    name: str
    role: str


class UserRepository(TenantAwareRepository[User]):
    def __init__(self, db):
        super().__init__(db, "users", User)

    async def find_by_email(self, email: str) -> Optional[User]:
        """Custom query - still tenant-scoped."""
        tenant_id = get_current_tenant_id()
        row = await self.db.fetchone(
            """
            SELECT * FROM users
            WHERE email = $1 AND tenant_id = $2
            """,
            email, tenant_id
        )
        if not row:
            return None
        return User(**row)
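The repository builds its WHERE clause by interpolating filter keys as column names, which is only safe when those keys are trusted. One way to enforce that is an explicit allowlist, sketched below as a standalone function; `build_tenant_query` and its `allowed_columns` parameter are hypothetical and not part of the repository class above.

```python
from typing import Any, Dict, Iterable, List, Tuple


def build_tenant_query(
    table: str,
    tenant_id: str,
    filters: Dict[str, Any],
    allowed_columns: Iterable[str],
) -> Tuple[str, List[Any]]:
    """Build a tenant-scoped SELECT, rejecting filter keys not on the allowlist."""
    unknown = set(filters) - set(allowed_columns)
    if unknown:
        raise ValueError(f"unsupported filter columns: {sorted(unknown)}")

    # table is assumed to be a trusted constant supplied by the repository.
    sql = f"SELECT * FROM {table} WHERE tenant_id = $1"
    params: List[Any] = [tenant_id]
    for column, value in filters.items():
        params.append(value)
        # Values are always bound as parameters; only vetted names are inlined.
        sql += f" AND {column} = ${len(params)}"
    return sql, params
```

A key such as `"1=1 OR tenant_id"` is rejected before any SQL is assembled, so the tenant filter cannot be widened through the filter dictionary.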
4.2 Database-Level Row Security (PostgreSQL RLS)
# tenant_isolation/database_rls.py
"""
PostgreSQL Row-Level Security setup for multi-tenant isolation.

RLS provides database-enforced tenant isolation - even if
application code has a bug, the database won't return wrong data.
"""
from tenant_isolation.context import TenantContextError, get_current_tenant_id

RLS_SETUP_SQL = """
-- =============================================================================
-- ROW-LEVEL SECURITY SETUP
-- =============================================================================
-- Enable RLS on tenant tables
ALTER TABLE users ENABLE ROW LEVEL SECURITY;
ALTER TABLE orders ENABLE ROW LEVEL SECURITY;
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;

-- Force RLS even for table owners (important!)
-- Superusers and roles with BYPASSRLS still bypass RLS, so the
-- application must not connect as one of those.
ALTER TABLE users FORCE ROW LEVEL SECURITY;
ALTER TABLE orders FORCE ROW LEVEL SECURITY;
ALTER TABLE documents FORCE ROW LEVEL SECURITY;

-- =============================================================================
-- TENANT CONTEXT FUNCTIONS
-- =============================================================================
-- Function to get current tenant from session variable.
-- Returns NULL when the variable is unset, so no row matches.
CREATE OR REPLACE FUNCTION current_tenant_id()
RETURNS TEXT AS $$
BEGIN
    RETURN current_setting('app.current_tenant', true);
END;
$$ LANGUAGE plpgsql STABLE;

-- =============================================================================
-- ROW-LEVEL SECURITY POLICIES
-- =============================================================================
-- Users table policies
CREATE POLICY tenant_isolation_users ON users
    FOR ALL
    TO application_role
    USING (tenant_id = current_tenant_id())
    WITH CHECK (tenant_id = current_tenant_id());

-- Orders table policies
CREATE POLICY tenant_isolation_orders ON orders
    FOR ALL
    TO application_role
    USING (tenant_id = current_tenant_id())
    WITH CHECK (tenant_id = current_tenant_id());

-- Documents table policies
CREATE POLICY tenant_isolation_documents ON documents
    FOR ALL
    TO application_role
    USING (tenant_id = current_tenant_id())
    WITH CHECK (tenant_id = current_tenant_id());

-- =============================================================================
-- ADMIN BYPASS POLICY (for support/admin operations)
-- =============================================================================
-- Admin role can see all data (use with extreme caution!)
CREATE POLICY admin_bypass_users ON users
    FOR ALL
    TO admin_role
    USING (true)
    WITH CHECK (true);
-- Note: Admin access should be heavily audited
"""
class RLSConnectionManager:
    """
    Manages database connections with RLS tenant context.

    Sets the tenant context on each connection before use.
    """
    def __init__(self, pool):
        self.pool = pool

    async def get_connection(self):
        """
        Get a connection with tenant context set.
        """
        # Resolve the tenant first so a missing context cannot leak a connection
        tenant_id = get_current_tenant_id()
        conn = await self.pool.acquire()
        # Set tenant context for RLS. The value is bound as a parameter
        # rather than formatted into the SQL string, which would allow
        # injection through the tenant ID.
        await conn.execute(
            "SELECT set_config('app.current_tenant', $1, false)",
            tenant_id
        )
        return conn

    async def release_connection(self, conn):
        """
        Release connection and clear tenant context.
        """
        # Clear tenant context before returning to pool
        await conn.execute("RESET app.current_tenant")
        await self.pool.release(conn)
class TenantAwareConnectionPool:
    """
    Connection pool that automatically sets tenant context.

    Usage:
        async with pool.connection() as conn:
            # tenant_id is automatically set
            result = await conn.fetch("SELECT * FROM users")
            # RLS filters to current tenant
    """
    def __init__(self, dsn: str):
        self.dsn = dsn
        self._pool = None

    async def initialize(self):
        """Create the connection pool."""
        import asyncpg
        self._pool = await asyncpg.create_pool(
            self.dsn,
            min_size=5,
            max_size=20,
            setup=self._setup_connection
        )

    async def _setup_connection(self, conn):
        """Called by asyncpg each time a connection is acquired from the pool."""
        # Connection starts with no tenant context
        pass

    def connection(self):
        """Get connection context manager with tenant set."""
        # Not a coroutine: "async with pool.connection()" needs the context
        # manager itself, not an awaitable that returns one.
        return TenantConnection(self._pool)
class TenantConnection:
    """Async context manager for tenant-scoped connections."""
    def __init__(self, pool):
        self.pool = pool
        self.conn = None

    async def __aenter__(self):
        self.conn = await self.pool.acquire()
        # Set tenant context. is_local=false keeps the setting for the
        # session; with is_local=true it would be discarded when this
        # statement's implicit transaction ends, before any query runs.
        try:
            tenant_id = get_current_tenant_id()
            await self.conn.execute(
                "SELECT set_config('app.current_tenant', $1, false)",
                tenant_id
            )
        except TenantContextError:
            # No tenant context - RLS matches no rows and rejects writes
            # This is intentional - forces proper context
            pass
        return self.conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.conn:
            try:
                # Clear tenant context
                await self.conn.execute(
                    "SELECT set_config('app.current_tenant', '', false)"
                )
            finally:
                # Always hand the connection back, even if the reset failed
                await self.pool.release(self.conn)
        return False
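The setup SQL repeats the same three statements for every tenant table, so a table added later is easy to miss. As a sketch, the statements could be generated from a list of table names; `rls_statements` is a hypothetical helper, and it assumes the `current_tenant_id()` function and `application_role` from the SQL above already exist.

```python
import re
from typing import List

# Conservative identifier check, since table names are inlined into DDL.
_IDENT = re.compile(r"[a-z_][a-z0-9_]*")


def rls_statements(tables: List[str], role: str = "application_role") -> List[str]:
    """Return the ENABLE/FORCE/POLICY statements for each tenant table."""
    statements: List[str] = []
    for table in tables:
        if not _IDENT.fullmatch(table):
            raise ValueError(f"unsafe table name: {table!r}")
        statements += [
            f"ALTER TABLE {table} ENABLE ROW LEVEL SECURITY",
            f"ALTER TABLE {table} FORCE ROW LEVEL SECURITY",
            f"CREATE POLICY tenant_isolation_{table} ON {table} FOR ALL TO {role} "
            "USING (tenant_id = current_tenant_id()) "
            "WITH CHECK (tenant_id = current_tenant_id())",
        ]
    return statements
```

Running the output as part of a migration keeps the policy list in step with the schema; the admin bypass policy is deliberately left out so that it stays an explicit, reviewed decision.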
Chapter 5: Schema-Per-Tenant Implementation
5.1 Dynamic Schema Management
# tenant_isolation/schema_manager.py
"""
Schema-per-tenant implementation.

Each tenant gets their own database schema with identical structure.
Provides stronger isolation than shared tables.
"""
import logging
import re
from typing import List

from tenant_isolation.context import get_current_tenant_id

logger = logging.getLogger(__name__)

# Identifiers cannot be bound as query parameters, so tenant IDs are limited
# to a safe alphabet before being interpolated into DDL. The exact pattern is
# an assumption; adjust it to the tenant ID format actually in use.
_TENANT_ID_RE = re.compile(r"[a-z0-9_]{1,40}")


class SchemaPerTenantManager:
    """
    Manages schema-per-tenant database architecture.
    """
    def __init__(self, admin_pool, schema_prefix: str = "tenant_"):
        self.admin_pool = admin_pool
        self.schema_prefix = schema_prefix

    def _schema_name(self, tenant_id: str) -> str:
        """Build the schema name, rejecting IDs that are unsafe to interpolate."""
        if not _TENANT_ID_RE.fullmatch(tenant_id):
            raise ValueError(f"Invalid tenant id: {tenant_id!r}")
        return f"{self.schema_prefix}{tenant_id}"

    async def create_tenant_schema(self, tenant_id: str):
        """
        Create a new schema for a tenant.
        Called during tenant onboarding.
        """
        schema_name = self._schema_name(tenant_id)
        async with self.admin_pool.acquire() as conn:
            # Create schema
            await conn.execute(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")
            # Create tables in schema
            await self._create_schema_tables(conn, schema_name)
            # Grant permissions
            await conn.execute(f"""
                GRANT USAGE ON SCHEMA {schema_name} TO app_user
            """)
            await conn.execute(f"""
                GRANT ALL ON ALL TABLES IN SCHEMA {schema_name} TO app_user
            """)
        logger.info(f"Created schema for tenant: {tenant_id}")

    async def _create_schema_tables(self, conn, schema_name: str):
        """Create standard tables in tenant schema."""
        await conn.execute(f"""
            CREATE TABLE {schema_name}.users (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                email VARCHAR(255) UNIQUE NOT NULL,
                name VARCHAR(255) NOT NULL,
                role VARCHAR(50) DEFAULT 'member',
                created_at TIMESTAMP DEFAULT NOW(),
                updated_at TIMESTAMP DEFAULT NOW()
            )
        """)
        await conn.execute(f"""
            CREATE TABLE {schema_name}.documents (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                title VARCHAR(500) NOT NULL,
                content TEXT,
                created_by UUID REFERENCES {schema_name}.users(id),
                created_at TIMESTAMP DEFAULT NOW(),
                updated_at TIMESTAMP DEFAULT NOW()
            )
        """)
        await conn.execute(f"""
            CREATE TABLE {schema_name}.settings (
                key VARCHAR(255) PRIMARY KEY,
                value JSONB,
                updated_at TIMESTAMP DEFAULT NOW()
            )
        """)

    async def delete_tenant_schema(self, tenant_id: str):
        """
        Delete a tenant's schema.
        Called during tenant offboarding. Requires confirmation!
        """
        schema_name = self._schema_name(tenant_id)
        async with self.admin_pool.acquire() as conn:
            # CASCADE drops all objects in schema
            await conn.execute(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")
        logger.info(f"Deleted schema for tenant: {tenant_id}")
    async def get_all_tenant_schemas(self) -> List[str]:
        """Get list of all tenant schemas."""
        # "_" is a single-character wildcard in LIKE, so escape it and bind
        # the pattern as a parameter instead of formatting it into the SQL.
        pattern = self.schema_prefix.replace("_", r"\_") + "%"
        async with self.admin_pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT schema_name
                FROM information_schema.schemata
                WHERE schema_name LIKE $1
                """,
                pattern
            )
        return [row["schema_name"] for row in rows]

    async def run_migration(self, migration_sql: str):
        """
        Run migration across all tenant schemas.

        Fails fast: schemas migrated before a failure stay migrated,
        so the migration must be safe to re-run.
        """
        schemas = await self.get_all_tenant_schemas()
        async with self.admin_pool.acquire() as conn:
            for schema in schemas:
                try:
                    # Set search path to tenant schema
                    await conn.execute(f"SET search_path TO {schema}")
                    # Run migration
                    await conn.execute(migration_sql)
                    logger.info(f"Migration complete for schema: {schema}")
                except Exception as e:
                    logger.error(f"Migration failed for {schema}: {e}")
                    raise
        logger.info(f"Migration complete for {len(schemas)} schemas")
class SchemaRoutingConnectionPool:
    """
    Connection pool that routes to tenant-specific schema.
    """
    def __init__(self, pool, schema_prefix: str = "tenant_"):
        self.pool = pool
        self.schema_prefix = schema_prefix

    def connection(self):
        """Get connection with schema routing."""
        # Not a coroutine, so it can be used as: async with pool.connection()
        return SchemaRoutedConnection(self.pool, self.schema_prefix)


class SchemaRoutedConnection:
    """Connection that automatically routes to tenant schema."""
    def __init__(self, pool, schema_prefix: str):
        self.pool = pool
        self.schema_prefix = schema_prefix
        self.conn = None

    async def __aenter__(self):
        import re
        # Resolve and check the tenant before taking a connection. The ID is
        # interpolated into SET search_path, so only a safe alphabet is
        # accepted (the exact pattern is an assumption).
        tenant_id = get_current_tenant_id()
        if not re.fullmatch(r"[a-z0-9_]{1,40}", tenant_id):
            raise ValueError(f"Invalid tenant id: {tenant_id!r}")
        schema_name = f"{self.schema_prefix}{tenant_id}"
        self.conn = await self.pool.acquire()
        # Set search path to tenant schema
        await self.conn.execute(f"SET search_path TO {schema_name}, public")
        return self.conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.conn:
            try:
                # Reset search path
                await self.conn.execute("SET search_path TO public")
            finally:
                await self.pool.release(self.conn)
        return False
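A fail-fast migration loop leaves some schemas migrated and others not, with no record of which. A possible variation, sketched here, runs each schema in its own transaction and collects the outcome. `migrate_schemas` and `MigrationReport` are hypothetical names; the connection is assumed to offer asyncpg-style `transaction()` and `execute()` methods.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class MigrationReport:
    succeeded: List[str] = field(default_factory=list)
    failed: Dict[str, str] = field(default_factory=dict)


async def migrate_schemas(conn, schemas: List[str], migration_sql: str) -> MigrationReport:
    """Apply migration_sql to every schema, one transaction per schema."""
    report = MigrationReport()
    for schema in schemas:
        if '"' in schema:
            # The name is quoted below, so an embedded quote would break out.
            report.failed[schema] = "refused: schema name contains a double quote"
            continue
        try:
            async with conn.transaction():
                # SET LOCAL lasts only until the transaction ends, so the
                # search_path never leaks into the next schema or the pool.
                await conn.execute(f'SET LOCAL search_path TO "{schema}"')
                await conn.execute(migration_sql)
            report.succeeded.append(schema)
        except Exception as exc:
            # The transaction rolled back; record the failure and continue.
            report.failed[schema] = str(exc)
    return report
```

Whether to continue past a failure or stop at the first one is a policy choice; the report makes either option auditable.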
Chapter 6: Database-Per-Tenant Implementation
6.1 Database Provisioning
# tenant_isolation/database_per_tenant.py
"""
Database-per-tenant implementation.

Each tenant gets their own database.
Provides strongest isolation but highest operational complexity.
"""
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
import logging

from tenant_isolation.context import get_current_tenant_id

logger = logging.getLogger(__name__)


@dataclass
class TenantDatabase:
    """Configuration for a tenant's database."""
    tenant_id: str
    host: str
    port: int
    database: str
    username: str
    # Should be encrypted/from secrets manager; kept out of repr() so it
    # does not end up in logs or tracebacks.
    password: str = field(repr=False)
    ssl_mode: str = "require"

    @property
    def connection_string(self) -> str:
        return (
            f"postgresql://{self.username}:{self.password}@"
            f"{self.host}:{self.port}/{self.database}"
            f"?sslmode={self.ssl_mode}"
        )


class DatabasePerTenantManager:
    """
    Manages database-per-tenant architecture.

    In practice, this often integrates with:
    - AWS RDS for database provisioning
    - Terraform for infrastructure
    - Secrets Manager for credentials
    """
    def __init__(self, db_cluster_endpoint: str, admin_credentials: dict):
        self.cluster_endpoint = db_cluster_endpoint
        self.admin_creds = admin_credentials
        self.tenant_pools: Dict[str, Any] = {}
    async def provision_tenant_database(
        self,
        tenant_id: str,
        tier: str = "standard"
    ) -> TenantDatabase:
        """
        Provision a new database for a tenant.

        In production, this might:
        - Call AWS RDS API to create database
        - Use Terraform to provision infrastructure
        - Set up replication, backups, etc.
        """
        import asyncpg
        import re
        import secrets

        # tenant_id is interpolated into identifiers below (identifiers cannot
        # be bound as parameters), so restrict it to a safe alphabet first.
        # The exact pattern is an assumption.
        if not re.fullmatch(r"[a-z0-9_]{1,40}", tenant_id):
            raise ValueError(f"Invalid tenant id: {tenant_id!r}")

        database_name = f"tenant_{tenant_id}"
        username = f"user_{tenant_id}"
        # token_urlsafe only emits URL-safe characters, so the password
        # cannot contain a quote and needs no escaping in the SQL or the DSN.
        password = secrets.token_urlsafe(32)

        # Connect as admin to create database
        admin_conn = await asyncpg.connect(
            host=self.cluster_endpoint,
            user=self.admin_creds["username"],
            password=self.admin_creds["password"],
            database="postgres"
        )
        try:
            # Create database
            await admin_conn.execute(f"""
                CREATE DATABASE {database_name}
                WITH ENCODING 'UTF8'
            """)
            # Create user
            await admin_conn.execute(f"""
                CREATE USER {username} WITH PASSWORD '{password}'
            """)
            # Grant permissions.
            # Note: on PostgreSQL 15+ this grant alone does not allow creating
            # tables in the public schema; the tenant user must also own the
            # database or be granted CREATE on schema public.
            await admin_conn.execute(f"""
                GRANT ALL PRIVILEGES ON DATABASE {database_name} TO {username}
            """)
            logger.info(f"Provisioned database for tenant: {tenant_id}")
        finally:
            await admin_conn.close()

        # Connect to new database to set up schema
        tenant_conn = await asyncpg.connect(
            host=self.cluster_endpoint,
            user=username,
            password=password,
            database=database_name
        )
        try:
            await self._create_tenant_schema(tenant_conn)
        finally:
            await tenant_conn.close()

        tenant_db = TenantDatabase(
            tenant_id=tenant_id,
            host=self.cluster_endpoint,
            port=5432,
            database=database_name,
            username=username,
            password=password
        )
        # Store credentials in secrets manager (not shown)
        await self._store_credentials(tenant_id, tenant_db)
        return tenant_db
    async def _create_tenant_schema(self, conn):
        """Create tables in tenant database."""
        await conn.execute("""
            CREATE TABLE users (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                email VARCHAR(255) UNIQUE NOT NULL,
                name VARCHAR(255) NOT NULL,
                role VARCHAR(50) DEFAULT 'member',
                created_at TIMESTAMP DEFAULT NOW()
            )
        """)
        await conn.execute("""
            CREATE TABLE documents (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                title VARCHAR(500) NOT NULL,
                content TEXT,
                created_by UUID REFERENCES users(id),
                created_at TIMESTAMP DEFAULT NOW()
            )
        """)

    async def _store_credentials(self, tenant_id: str, db: TenantDatabase):
        """Store credentials in secrets manager."""
        # In production: AWS Secrets Manager, HashiCorp Vault, etc.
        pass
async def get_tenant_pool(self, tenant_id: str):
"""
Get connection pool for a specific tenant.
Pools are created lazily and cached.
"""
import asyncpg
if tenant_id in self.tenant_pools:
return self.tenant_pools[tenant_id]
# Load tenant database config
tenant_db = await self._load_tenant_config(tenant_id)
if not tenant_db:
raise ValueError(f"No database configured for tenant: {tenant_id}")
# Create pool
pool = await asyncpg.create_pool(
tenant_db.connection_string,
min_size=2,
max_size=10
)
self.tenant_pools[tenant_id] = pool
return pool
async def _load_tenant_config(self, tenant_id: str) -> Optional[TenantDatabase]:
"""Load tenant database configuration."""
# In production: Load from secrets manager
pass
class TenantDatabaseRouter:
"""
Routes database connections to the correct tenant database.
"""
def __init__(self, db_manager: DatabasePerTenantManager):
self.db_manager = db_manager
async def connection(self):
"""Get connection to current tenant's database."""
tenant_id = get_current_tenant_id()
pool = await self.db_manager.get_tenant_pool(tenant_id)
return pool.acquire()
Chapter 7: Tenant-Aware Services Pattern
7.1 Complete Service Implementation
# tenant_isolation/service.py
"""
Complete example of a tenant-aware service.
This shows how all the pieces fit together:
- Middleware extracts tenant
- Service uses tenant-aware repository
- Caching is tenant-scoped
- Logging includes tenant context
"""
from fastapi import FastAPI, Depends, HTTPException
from typing import List, Optional
from dataclasses import dataclass
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
@dataclass
class CreateDocumentRequest:
title: str
content: str
@dataclass
class Document:
id: str
tenant_id: str
title: str
content: str
created_by: str
created_at: datetime
class DocumentService:
"""
Tenant-aware document service.
All operations are automatically scoped to current tenant.
"""
def __init__(self, document_repo, user_repo, cache, event_publisher, search_client):
self.documents = document_repo
self.users = user_repo
self.cache = cache
self.events = event_publisher
# Used by search_documents below
self.search_client = search_client
@requires_tenant_async
async def create_document(
self,
request: CreateDocumentRequest,
created_by_user_id: str
) -> Document:
"""
Create a new document in current tenant.
"""
tenant = get_current_tenant()
# Validate user exists in this tenant
user = await self.users.find_by_id(created_by_user_id)
if not user:
raise ValueError("User not found")
# Create document
document = await self.documents.create({
"title": request.title,
"content": request.content,
"created_by": created_by_user_id,
"created_at": datetime.utcnow()
})
# Invalidate cache
await self._invalidate_document_cache()
# Publish event
await self.events.publish(
topic="documents",
event={
"type": "document.created",
"tenant_id": tenant.tenant_id,
"document_id": document.id,
"created_by": created_by_user_id
}
)
logger.info(
"Document created",
extra={
"tenant_id": tenant.tenant_id,
"document_id": document.id,
"user_id": created_by_user_id
}
)
return document
@requires_tenant_async
async def get_document(self, document_id: str) -> Optional[Document]:
"""
Get document by ID within current tenant.
"""
tenant = get_current_tenant()
cache_key = f"tenant:{tenant.tenant_id}:doc:{document_id}"
# Check cache
cached = await self.cache.get(cache_key)
if cached:
return Document(**cached)
# Query database
document = await self.documents.find_by_id(document_id)
if document:
# Cache for 5 minutes
await self.cache.set(cache_key, document.__dict__, ttl=300)
return document
@requires_tenant_async
async def list_documents(
self,
limit: int = 50,
offset: int = 0
) -> List[Document]:
"""
List documents in current tenant.
"""
return await self.documents.find_all(
limit=limit,
offset=offset
)
@requires_tenant_async
async def search_documents(self, query: str) -> List[Document]:
"""
Search documents within current tenant.
Search index is tenant-scoped.
"""
tenant = get_current_tenant()
# Search in tenant-scoped index
results = await self.search_client.search(
index=f"documents_{tenant.tenant_id}",
query=query
)
return [Document(**r) for r in results]
@requires_tenant_async
async def delete_document(self, document_id: str) -> bool:
"""
Delete document within current tenant.
"""
tenant = get_current_tenant()
# Delete from database
deleted = await self.documents.delete(document_id)
if deleted:
# Invalidate cache
cache_key = f"tenant:{tenant.tenant_id}:doc:{document_id}"
await self.cache.delete(cache_key)
# Publish event
await self.events.publish(
topic="documents",
event={
"type": "document.deleted",
"tenant_id": tenant.tenant_id,
"document_id": document_id
}
)
return deleted
async def _invalidate_document_cache(self):
"""Invalidate document list caches for current tenant."""
tenant = get_current_tenant()
pattern = f"tenant:{tenant.tenant_id}:docs:*"
await self.cache.delete_pattern(pattern)
# FastAPI router with tenant middleware
app = FastAPI()
# Dependency to get document service
async def get_document_service() -> DocumentService:
# In production, use dependency injection
pass
@app.post("/api/documents")
async def create_document(
request: CreateDocumentRequest,
service: DocumentService = Depends(get_document_service)
):
"""
Create a new document.
Tenant context is set by middleware before this runs.
"""
# Get current user from auth context
user_id = get_current_user_id()
document = await service.create_document(request, user_id)
return {"id": document.id, "title": document.title}
@app.get("/api/documents/{document_id}")
async def get_document(
document_id: str,
service: DocumentService = Depends(get_document_service)
):
"""Get a document by ID."""
document = await service.get_document(document_id)
if not document:
raise HTTPException(status_code=404, detail="Document not found")
return document
@app.get("/api/documents")
async def list_documents(
limit: int = 50,
offset: int = 0,
service: DocumentService = Depends(get_document_service)
):
"""List documents in current tenant."""
documents = await service.list_documents(limit=limit, offset=offset)
return {"documents": documents, "count": len(documents)}
7.2 Tenant-Aware Event Processing
# tenant_isolation/events.py
"""
Tenant-aware event processing.
Events in multi-tenant systems must carry tenant context
so consumers can process them correctly.
"""
from dataclasses import dataclass
from typing import Dict, Any, Callable
from datetime import datetime
import json
import logging
logger = logging.getLogger(__name__)
@dataclass
class TenantEvent:
"""
Event with tenant context.
All events must include tenant_id for proper routing and processing.
"""
event_id: str
event_type: str
tenant_id: str
timestamp: datetime
payload: Dict[str, Any]
def to_json(self) -> str:
return json.dumps({
"event_id": self.event_id,
"event_type": self.event_type,
"tenant_id": self.tenant_id,
"timestamp": self.timestamp.isoformat(),
"payload": self.payload
})
@classmethod
def from_json(cls, data: str) -> 'TenantEvent':
d = json.loads(data)
return cls(
event_id=d["event_id"],
event_type=d["event_type"],
tenant_id=d["tenant_id"],
timestamp=datetime.fromisoformat(d["timestamp"]),
payload=d["payload"]
)
class TenantEventPublisher:
"""
Publishes events with tenant context.
"""
def __init__(self, kafka_producer):
self.producer = kafka_producer
async def publish(self, topic: str, event: Dict[str, Any]):
"""
Publish event with current tenant context.
"""
import uuid
tenant = get_current_tenant()
tenant_event = TenantEvent(
event_id=str(uuid.uuid4()),
event_type=event.get("type", "unknown"),
tenant_id=tenant.tenant_id,
timestamp=datetime.utcnow(),
payload=event
)
# Use tenant_id as partition key for ordering
await self.producer.send(
topic=topic,
key=tenant.tenant_id,
value=tenant_event.to_json()
)
class TenantEventConsumer:
"""
Consumes events and sets tenant context before processing.
"""
def __init__(self, kafka_consumer, tenant_service):
self.consumer = kafka_consumer
self.tenant_service = tenant_service
self.handlers: Dict[str, Callable] = {}
def register_handler(self, event_type: str, handler: Callable):
"""Register handler for event type."""
self.handlers[event_type] = handler
async def start(self):
"""Start consuming events."""
async for message in self.consumer:
await self._process_message(message)
async def _process_message(self, message):
"""Process a single message with tenant context."""
event = TenantEvent.from_json(message.value)
# Load tenant context
tenant_ctx = await self.tenant_service.get_tenant_context(
event.tenant_id
)
if not tenant_ctx:
logger.error(f"Unknown tenant: {event.tenant_id}")
return
# Set tenant context and process
with tenant_scope(tenant_ctx):
handler = self.handlers.get(event.event_type)
if handler:
try:
await handler(event)
except Exception as e:
logger.error(
f"Event processing failed: {e}",
extra={
"tenant_id": event.tenant_id,
"event_id": event.event_id,
"event_type": event.event_type
}
)
else:
logger.warning(f"No handler for event type: {event.event_type}")
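The publisher uses tenant_id as the message key so that all of a tenant's events land in one partition and keep their relative order. The toy model below illustrates that property. It uses CRC32 as a stand-in hash (Kafka's default partitioner for keyed messages uses murmur2), and partition_for and route are hypothetical helper names.

```python
import zlib
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def partition_for(key: str, num_partitions: int) -> int:
    """Map a key to a partition. CRC32 is a stand-in for Kafka's murmur2."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def route(
    events: Iterable[Tuple[str, int]], num_partitions: int
) -> Dict[int, List[Tuple[str, int]]]:
    """Append each (tenant_id, sequence) event to its key's partition."""
    partitions: Dict[int, List[Tuple[str, int]]] = defaultdict(list)
    for tenant_id, sequence in events:
        partitions[partition_for(tenant_id, num_partitions)].append(
            (tenant_id, sequence)
        )
    return partitions
```

The trade-off is that one very active tenant concentrates its traffic on a single partition, so large tenants may need a finer-grained key (for example tenant plus entity ID) when strict per-tenant ordering is not required.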
7.3 Tenant-Aware Background Jobs
# tenant_isolation/jobs.py
"""
Tenant-aware background job processing.
All background jobs must run within tenant context.
"""
from celery import Celery
from functools import wraps
celery_app = Celery('tasks')
def tenant_task(func):
"""
Decorator for tenant-aware Celery tasks.
Usage:
@celery_app.task
@tenant_task
def process_document(document_id):
# Runs in tenant context
doc = document_repo.find_by_id(document_id)
"""
@wraps(func)
def wrapper(tenant_id: str, *args, **kwargs):
# Load tenant context
tenant_ctx = load_tenant_sync(tenant_id)
if not tenant_ctx:
raise ValueError(f"Unknown tenant: {tenant_id}")
# Set context and run
with tenant_scope(tenant_ctx):
return func(*args, **kwargs)
return wrapper
# Example tasks
@celery_app.task
@tenant_task
def send_document_notification(document_id: str, user_ids: list):
"""
Send notifications about a document.
Note: callers pass tenant_id as the first argument; @tenant_task consumes it
and sets the tenant context before this function body runs.
"""
document = document_repo.find_by_id(document_id)
if not document:
return
for user_id in user_ids:
user = user_repo.find_by_id(user_id)
if user:
notification_service.send(
user=user,
template="document_shared",
context={"document": document}
)
@celery_app.task
@tenant_task
def generate_tenant_report():
"""
Generate usage report for current tenant.
"""
tenant = get_current_tenant()
stats = {
"tenant_id": tenant.tenant_id,
"user_count": user_repo.count(),
"document_count": document_repo.count(),
"storage_used": storage_service.get_usage(),
}
report_service.generate(stats)
# Scheduling tenant jobs
class TenantJobScheduler:
"""
Schedule jobs for all tenants or specific tenant.
"""
def __init__(self, tenant_service):
self.tenant_service = tenant_service
async def schedule_for_all_tenants(self, task, *args, **kwargs):
"""
Schedule a task to run for each tenant.
Useful for nightly batch jobs.
"""
tenants = await self.tenant_service.get_all_active_tenants()
for tenant in tenants:
# Include tenant_id as first argument
task.delay(tenant.tenant_id, *args, **kwargs)
async def schedule_for_tenant(self, tenant_id: str, task, *args, **kwargs):
"""Schedule a task for specific tenant."""
task.delay(tenant_id, *args, **kwargs)
Part III: Real-World Application
Chapter 7: Case Studies
7.1 Slack's Multi-Tenancy
SLACK'S ISOLATION ARCHITECTURE
Challenge:
├── Thousands of enterprise customers
├── Strict data isolation required
├── Real-time messaging at scale
└── Some customers need dedicated infrastructure
Solution: HYBRID APPROACH
Standard Tenants (Shared Infrastructure):
├── Logical isolation with workspace_id
├── All messages in shared Vitess cluster
├── Row-level filtering in application
└── Shared search infrastructure
Enterprise Grid (Enhanced Isolation):
├── Dedicated Vitess shards
├── Isolated search clusters
├── Customer-managed encryption keys
└── Can be in customer's preferred region
Key Innovations:
├── WORKSPACE AS TENANT BOUNDARY
│ └── Clear isolation unit
│
├── CHANNEL-BASED PARTITIONING
│ └── Data sharded by channel for performance
│
├── ENTERPRISE KEY MANAGEMENT (EKM)
│ ├── Customer controls encryption keys
│ └── Customer can revoke Slack's access to encrypted data
│
└── SLACK CONNECT
    ├── Controlled cross-tenant communication
    └── Explicit consent from both sides
Lesson: Offer different isolation levels at different price points
7.2 Salesforce Multi-Tenancy
SALESFORCE'S ARCHITECTURE
Scale:
├── 150,000+ customers
├── Shared infrastructure
├── Custom objects and fields per tenant
└── 20+ years of evolution
Architecture:
┌───────────────────────────────────────────────────────────────────────┐
│ SALESFORCE MULTI-TENANT │
│ │
│ Application Layer: │
│ ├── Stateless app servers │
│ ├── Tenant context from session │
│ └── All queries include OrgId │
│ │
│ Metadata Layer: │
│ ├── Custom objects stored as metadata │
│ ├── Fields defined in metadata tables │
│ └── Runtime schema assembly per tenant │
│ │
│ Database Layer: │
│ ├── Oracle RAC clusters │
│ ├── OrgId in every table │
│ ├── Massive indexes on OrgId │
│ └── Data and metadata tables separate │
│ │
└───────────────────────────────────────────────────────────────────────┘
Key Innovations:
GOVERNOR LIMITS:
├── CPU time per transaction: 10 seconds
├── SOQL queries per transaction: 100
├── Records retrieved: 50,000
├── API calls per day: Based on licenses
└── Purpose: Prevent noisy neighbors
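To make the idea of governor limits concrete, here is a small sketch of a per-transaction budget that counts queries and retrieved rows and aborts the transaction once a limit is crossed. The default numbers come from the list above. The class itself is illustrative and is not how Salesforce implements its limits.

```python
from dataclasses import dataclass


class GovernorLimitExceeded(RuntimeError):
    """Raised when a transaction exceeds its resource budget."""


@dataclass
class TransactionGovernor:
    max_queries: int = 100      # queries allowed per transaction
    max_rows: int = 50_000      # total rows retrievable per transaction
    queries: int = 0
    rows: int = 0

    def record_query(self, rows_returned: int) -> None:
        """Account for one executed query; raise if a limit is crossed."""
        self.queries += 1
        self.rows += rows_returned
        if self.queries > self.max_queries:
            raise GovernorLimitExceeded(
                f"Too many queries: {self.queries} > {self.max_queries}"
            )
        if self.rows > self.max_rows:
            raise GovernorLimitExceeded(
                f"Too many rows: {self.rows} > {self.max_rows}"
            )
```

A data access layer would create one governor per transaction and call record_query after each statement, so a runaway loop in one tenant's custom code fails fast instead of consuming shared capacity.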
METADATA-DRIVEN CUSTOMIZATION:
├── Custom objects don't create real tables
├── Pivot tables store custom field data
├── Runtime query assembly
└── Allows unlimited customization
TRUST.SALESFORCE.COM:
├── Real-time system status
├── Performance metrics per instance
├── Transparency about incidents
└── Builds customer trust
Lesson: Strict resource limits enable massive multi-tenancy
7.3 AWS Multi-Tenancy Patterns
AWS'S GUIDANCE ON MULTI-TENANCY
AWS categorizes into "Silo" vs "Pool" models:
SILO MODEL (Dedicated Resources):
├── Separate AWS accounts per tenant
├── VPC isolation
├── Dedicated RDS instances
├── Independent scaling
├── Maximum isolation
└── Higher cost, simpler compliance
POOL MODEL (Shared Resources):
├── Single AWS account
├── Shared infrastructure
├── Tenant_id filtering
├── More efficient resource use
├── Lower cost, more complex
└── Requires careful design
BRIDGE MODEL (Hybrid):
├── Shared compute (ECS/EKS)
├── Isolated data stores
├── Tenant-specific encryption keys
├── Balance of cost and isolation
└── Most common for SaaS
AWS Recommendations:
├── Use AWS Organizations for account management
├── Use Resource Access Manager for sharing
├── Use KMS with per-tenant keys
├── Use CloudWatch with tenant dimensions
└── Use IAM policies for access control
Chapter 8: Common Mistakes
8.1 Tenant Isolation Mistakes
MULTI-TENANCY ANTI-PATTERNS
❌ MISTAKE 1: Missing WHERE Clause
Wrong:
# Forgot tenant filter!
users = db.query("SELECT * FROM users WHERE role = 'admin'")
Problem:
Returns admins from ALL tenants
Data breach waiting to happen
Right:
# Always use tenant-aware repository
users = await user_repo.find_all(filters={"role": "admin"})
# Repository adds tenant_id automatically
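How a repository might add the tenant filter automatically can be sketched with the standard library's sqlite3 module. TenantScopedRepo, its constructor arguments, and the demo schema are assumptions for illustration; the point is that callers cannot omit or override the tenant predicate.

```python
import sqlite3
from typing import Any, Callable, Dict, List, Optional, Set, Tuple


class TenantScopedRepo:
    """Repository that always filters by the current tenant (illustrative)."""

    def __init__(self, conn: sqlite3.Connection, table: str,
                 filterable: Set[str], get_tenant_id: Callable[[], str]):
        self.conn = conn
        self.table = table              # trusted constant, never user input
        self.filterable = filterable    # allow-list of filter columns
        self.get_tenant_id = get_tenant_id

    def find_all(self, filters: Optional[Dict[str, Any]] = None) -> List[Tuple]:
        filters = filters or {}
        unknown = set(filters) - self.filterable
        if unknown:
            raise ValueError(f"Unsupported filter columns: {sorted(unknown)}")
        # The tenant predicate is always present and cannot be overridden.
        clauses = ["tenant_id = ?"]
        params: List[Any] = [self.get_tenant_id()]
        for column, value in filters.items():
            clauses.append(f"{column} = ?")
            params.append(value)
        sql = (f"SELECT id, tenant_id, role FROM {self.table} "
               f"WHERE {' AND '.join(clauses)} ORDER BY id")
        return self.conn.execute(sql, params).fetchall()


def build_demo_db() -> sqlite3.Connection:
    """In-memory database with users from two tenants."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER, tenant_id TEXT, role TEXT)")
    conn.executemany(
        "INSERT INTO users VALUES (?, ?, ?)",
        [(1, "acme", "admin"), (2, "acme", "member"), (3, "globex", "admin")],
    )
    return conn
```

Keeping tenant_id out of the allow-list means a caller who passes {"tenant_id": "globex"} gets an error rather than a second, conflicting predicate.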
❌ MISTAKE 2: Trusting Client-Provided Tenant ID
Wrong:
tenant_id = request.headers.get("X-Tenant-ID")
# Trust it blindly
Problem:
Attacker sets X-Tenant-ID: victim_tenant
Access other tenant's data
Right:
# Derive tenant from authenticated user
tenant_id = request.user.tenant_id
# Or validate header against user's allowed tenants
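For users who belong to several tenants, the header can still be used as a selector as long as it is checked against memberships taken from the authenticated session. A minimal sketch follows; resolve_tenant_id and TenantAccessDenied are hypothetical names, and user_tenant_ids is assumed to come from verified token claims or a server-side lookup.

```python
from typing import Iterable, Optional


class TenantAccessDenied(Exception):
    """Raised when the requested tenant is not allowed for this user."""


def resolve_tenant_id(
    user_tenant_ids: Iterable[str], requested: Optional[str]
) -> str:
    """Pick the tenant for a request from trusted memberships.

    user_tenant_ids: tenants the authenticated user belongs to (trusted).
    requested: value of a client-supplied selector such as X-Tenant-ID
               (untrusted), or None if absent.
    """
    allowed = list(user_tenant_ids)
    if not allowed:
        raise TenantAccessDenied("User has no tenant memberships")
    if requested is None:
        if len(allowed) == 1:
            return allowed[0]  # unambiguous: single-tenant user
        raise TenantAccessDenied("Tenant must be specified")
    if requested not in allowed:
        raise TenantAccessDenied("User is not a member of the requested tenant")
    return requested
```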
❌ MISTAKE 3: Shared Cache Without Tenant Prefix
Wrong:
cache_key = f"user:{user_id}"
cached_user = redis.get(cache_key)
Problem:
User ID 123 exists in both tenants
Returns wrong tenant's user
Right:
tenant_id = get_current_tenant_id()
cache_key = f"tenant:{tenant_id}:user:{user_id}"
cached_user = redis.get(cache_key)
❌ MISTAKE 4: Logging PII Without Tenant Context
Wrong:
logger.info(f"User {user.email} logged in")
Problem:
No way to filter logs by tenant
Can't provide tenant-specific audit logs
Cross-tenant data in same log stream
Right:
logger.info(
"User logged in",
extra={
"tenant_id": get_current_tenant_id(),
"user_id": user.id, # Not email!
}
)
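Passing tenant_id through extra on every call is easy to forget. One option is a logging filter that stamps the current tenant on every record. The sketch below assumes the tenant ID lives in a context variable named current_tenant_id, which is a hypothetical name, and uses "-" when no tenant is in scope.

```python
import contextvars
import io
import logging

# Hypothetical context variable; "-" marks records logged outside any tenant.
current_tenant_id: contextvars.ContextVar = contextvars.ContextVar(
    "current_tenant_id", default="-"
)


class TenantLogFilter(logging.Filter):
    """Attach the current tenant ID to every log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.tenant_id = current_tenant_id.get()
        return True  # never drop records, only annotate them


def build_logger(stream) -> logging.Logger:
    """Return a logger whose output lines always start with the tenant ID."""
    logger = logging.getLogger("tenant_demo")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    logger.handlers.clear()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("tenant=%(tenant_id)s %(message)s"))
    handler.addFilter(TenantLogFilter())
    logger.addHandler(handler)
    return logger


def demo() -> list:
    buffer = io.StringIO()
    log = build_logger(buffer)
    token = current_tenant_id.set("acme")
    log.info("User logged in")
    current_tenant_id.reset(token)
    log.info("Health check")
    return buffer.getvalue().splitlines()
```

The filter is attached to the handler rather than the logger so that records arriving from child loggers are annotated as well.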
❌ MISTAKE 5: Background Jobs Without Tenant Context
Wrong:
@celery.task
def send_welcome_email(user_id):
user = User.get(user_id) # Which tenant?!
send_email(user.email)
Problem:
Job runs without tenant context
Query might fail or return wrong data
Right:
@celery.task
def send_welcome_email(tenant_id, user_id):
with tenant_scope(load_tenant(tenant_id)):
user = user_repo.find_by_id(user_id)
send_email(user.email)
8.2 Security Checklist
MULTI-TENANT SECURITY CHECKLIST
Before Production:
TENANT IDENTIFICATION:
├── [ ] Tenant derived from authenticated session, not headers
├── [ ] Tenant ID validated on every request
├── [ ] Admin endpoints require additional auth
└── [ ] API keys are tenant-scoped
DATA ACCESS:
├── [ ] All queries include tenant filter
├── [ ] Database has RLS enabled (PostgreSQL)
├── [ ] Cache keys include tenant prefix
├── [ ] File storage paths include tenant
└── [ ] Search indexes are tenant-scoped
BACKGROUND PROCESSING:
├── [ ] Jobs include tenant context
├── [ ] Queues are tenant-aware
├── [ ] Scheduled tasks run in tenant scope
└── [ ] Webhooks validated before processing
LOGGING AND MONITORING:
├── [ ] All logs include tenant_id
├── [ ] Metrics tagged with tenant
├── [ ] Alerts can be filtered by tenant
└── [ ] No PII in logs (use IDs)
TESTING:
├── [ ] Tests run in isolated tenant
├── [ ] Cross-tenant access tests exist
├── [ ] Penetration testing for tenant isolation
└── [ ] Automated security scans
Part IV: Interview Preparation
Chapter 9: Interview Tips
9.1 When to Discuss Multi-Tenancy
MULTI-TENANCY INTERVIEW TRIGGERS
"Design a SaaS platform for..."
├── Immediately ask about tenant count
├── Ask about enterprise requirements
└── Discuss isolation levels
"Design a system that stores user data..."
├── Ask: "Multiple organizations?"
├── If yes, tenant isolation is critical
└── Discuss data segregation
"How would you handle multiple customers?"
├── This IS a multi-tenancy question
├── Discuss isolation spectrum
└── Propose appropriate level
"Security requirements for B2B..."
├── Multi-tenancy is implicit
├── Enterprise customers want isolation
└── Compliance often requires segregation
9.2 Key Phrases
DISCUSSING MULTI-TENANCY IN INTERVIEWS
Opening:
"For multi-tenancy, the key decision is the isolation level.
This is a spectrum from shared tables with row-level filtering
to completely dedicated infrastructure per tenant."
Shared Tables:
"For a startup with SMB customers, shared tables with tenant_id
filtering is cost-effective. I'd use PostgreSQL Row-Level
Security for database-enforced isolation, plus application-level
filtering as defense in depth."
Schema Per Tenant:
"For mid-market SaaS with compliance requirements, schema-per-tenant
provides stronger isolation. Each tenant gets their own schema,
so a bug can't accidentally expose cross-tenant data."
Database Per Tenant:
"For enterprise customers requiring strict isolation, database-per-tenant
makes sense. It enables per-tenant backups, independent scaling,
and meets compliance requirements. The trade-off is operational
complexity and cost."
Hybrid Approach:
"Many successful SaaS platforms use a tiered approach: shared tables
for self-serve customers, and dedicated resources for enterprise
customers who pay for isolation. Slack does this with their
Enterprise Grid offering."
Chapter 10: Practice Problems
Problem 1: E-commerce SaaS Platform
Setup: Design a multi-tenant e-commerce platform where merchants can set up online stores. You have 10,000 merchants, ranging from small shops (100 orders/month) to large retailers (100,000 orders/month).
Requirements:
- Each merchant's data must be isolated
- Large merchants shouldn't affect small merchants
- Some merchants need dedicated infrastructure
Questions:
- What isolation level would you choose?
- How do you handle the 1,000x variation in scale?
- How do you migrate a merchant from shared to dedicated?
Hints:
- Consider hybrid approach: shared for small, dedicated for large
- Shard by merchant_id for the shared tier
- Design migration path from shared to dedicated
- Use feature flags to control which tier a merchant uses
Problem 2: Healthcare SaaS
Setup: Design a multi-tenant EHR (Electronic Health Records) system for clinics. HIPAA compliance required.
Requirements:
- Strong data isolation (PHI)
- Audit logging of all access
- Some clinics part of same hospital network
Questions:
- Can you use shared tables for PHI?
- How do you handle clinic networks (parent-child tenants)?
- What's your audit logging strategy?
Hints:
- HIPAA does not mandate a specific isolation model, but many teams choose database-per-tenant or stronger for PHI because it simplifies audits
- Consider organization hierarchy in tenant model
- Audit logging at database level (triggers)
- Encryption with per-tenant keys
Chapter 11: Sample Interview Dialogue
Interviewer: "Design a project management SaaS like Asana. Multiple companies will use it."
You: "Great. This is a multi-tenant system. Let me start by understanding the scale and customer profile. How many organizations are we targeting? What's the range — startups to enterprises?"
Interviewer: "Let's say 50,000 organizations, from 5-person startups to 10,000-person enterprises."
You: "That range is important. For isolation, I'd propose a tiered approach:
Standard tier: Shared PostgreSQL with tenant_id in every table. I'd use Row-Level Security so the database enforces isolation. This works well for startups and SMBs — cost-effective and scales to tens of thousands of tenants.
Enterprise tier: For large organizations with compliance requirements, I'd offer schema-per-tenant or dedicated database options. This provides stronger isolation guarantees and enables features like customer-managed encryption keys.
Let me draw the data model..."
TENANT MODEL
tenants table:
├── id (UUID)
├── name
├── plan (standard, enterprise)
├── isolation_level (shared, schema, dedicated)
├── created_at
└── settings (JSONB)
For shared tier, all tables have:
├── tenant_id (indexed)
├── Composite indexes: (tenant_id, other_columns)
└── RLS policy: WHERE tenant_id = current_tenant()
Interviewer: "What if an enterprise customer runs a huge report that slows down the database?"
You: "Noisy neighbor problem — great question. Several defenses:
- Query timeouts: Limit query execution time per tenant
- Resource quotas: Track and limit database connections, CPU time
- Read replicas: Route heavy analytics to replicas
- For enterprise tier: Their dedicated database means they only affect themselves
For the shared tier, I'd also implement query analysis to flag expensive queries before they run."
Interviewer: "How do you handle tenant context across microservices?"
You: "Tenant context needs to propagate through the entire request path. I'd implement:
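- API Gateway: Authenticates the request, derives tenant from the JWT (or a subdomain checked against it), strips any client-supplied X-Tenant-ID, and sets its own X-Tenant-ID header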
- Service mesh: Propagates header automatically between services
- Each service: Middleware validates header, sets context variable
- Database layer: Uses context to set RLS variables
For async jobs, the message includes tenant_id, and the worker sets context before processing.
This ensures every database query, cache lookup, and log entry is tenant-scoped."
Chapter 12: Testing Multi-Tenant Systems
12.1 Isolation Testing
# tests/test_tenant_isolation.py
"""
Tests to verify tenant isolation.
These tests are CRITICAL - they verify that tenants can't see each other's data.
Run these in CI/CD and before every deployment.
"""
import pytest
from uuid import uuid4
class TestTenantIsolation:
"""
Test suite for verifying tenant isolation.
"""
@pytest.fixture
async def tenant_a(self, tenant_service):
"""Create test tenant A."""
return await tenant_service.create_tenant(
name="Test Tenant A",
plan="standard"
)
@pytest.fixture
async def tenant_b(self, tenant_service):
"""Create test tenant B."""
return await tenant_service.create_tenant(
name="Test Tenant B",
plan="standard"
)
async def test_tenant_cannot_see_other_tenant_data(
self,
tenant_a,
tenant_b,
document_service
):
"""
Verify that Tenant A cannot see Tenant B's documents.
"""
# Create document as Tenant A
with tenant_scope(tenant_a):
doc_a = await document_service.create_document(
CreateDocumentRequest(
title="Tenant A Secret",
content="This is confidential"
),
created_by_user_id="user_a"
)
# Try to access from Tenant B
with tenant_scope(tenant_b):
# Should not find Tenant A's document
doc = await document_service.get_document(doc_a.id)
assert doc is None, "Tenant B should not see Tenant A's document!"
# List should not include Tenant A's documents
docs = await document_service.list_documents()
doc_ids = [d.id for d in docs]
assert doc_a.id not in doc_ids, "Tenant A's document leaked to Tenant B!"
async def test_tenant_cannot_modify_other_tenant_data(
self,
tenant_a,
tenant_b,
document_repo
):
"""
Verify that Tenant B cannot modify Tenant A's documents.
"""
# Create document as Tenant A
with tenant_scope(tenant_a):
doc_a = await document_repo.create({
"title": "Original Title",
"content": "Original Content"
})
# Try to update from Tenant B
with tenant_scope(tenant_b):
result = await document_repo.update(
doc_a.id,
{"title": "Hacked Title"}
)
assert result is None, "Tenant B should not modify Tenant A's document!"
# Verify document unchanged
with tenant_scope(tenant_a):
doc = await document_repo.find_by_id(doc_a.id)
assert doc.title == "Original Title"
async def test_tenant_cannot_delete_other_tenant_data(
self,
tenant_a,
tenant_b,
document_repo
):
"""
Verify that Tenant B cannot delete Tenant A's documents.
"""
# Create document as Tenant A
with tenant_scope(tenant_a):
doc_a = await document_repo.create({
"title": "Important Document",
"content": "Do not delete"
})
# Try to delete from Tenant B
with tenant_scope(tenant_b):
deleted = await document_repo.delete(doc_a.id)
assert not deleted, "Tenant B should not delete Tenant A's document!"
# Verify document still exists
with tenant_scope(tenant_a):
doc = await document_repo.find_by_id(doc_a.id)
assert doc is not None
async def test_sql_injection_cannot_bypass_tenant(
self,
tenant_a,
tenant_b,
document_service
):
"""
Verify that an injection-style search string cannot bypass tenant isolation.
This exercises the tenant-scoped search path end to end.
"""
# Search as Tenant A with a malicious query string
with tenant_scope(tenant_a):
malicious_query = "'; SELECT * FROM documents; --"
# Every result must belong to Tenant A; Tenant B's documents
# must never appear, whatever the query string contains
docs = await document_service.search_documents(malicious_query)
for doc in docs:
assert doc.tenant_id == tenant_a.tenant_id
async def test_cache_isolation(
self,
tenant_a,
tenant_b,
document_service,
cache
):
"""
Verify that cached data is tenant-isolated.
"""
# Create and cache document as Tenant A
with tenant_scope(tenant_a):
doc_a = await document_service.create_document(
CreateDocumentRequest(title="Cached Doc", content="Secret"),
created_by_user_id="user_a"
)
# Access to populate cache
await document_service.get_document(doc_a.id)
# Verify cache key is tenant-scoped
cache_key_a = f"tenant:{tenant_a.tenant_id}:doc:{doc_a.id}"
assert await cache.exists(cache_key_a)
# Tenant B should not hit this cache
with tenant_scope(tenant_b):
cache_key_b = f"tenant:{tenant_b.tenant_id}:doc:{doc_a.id}"
assert not await cache.exists(cache_key_b)
class TestTenantContextPropagation:
"""
Test that tenant context flows correctly through the system.
"""
async def test_context_available_in_service(self, tenant_a, document_service):
"""Verify tenant context is available in service layer."""
with tenant_scope(tenant_a):
# Service should be able to get current tenant
tenant = get_current_tenant()
assert tenant.tenant_id == tenant_a.tenant_id
async def test_context_propagates_to_repository(self, tenant_a, document_repo):
"""Verify tenant context propagates to repository."""
with tenant_scope(tenant_a):
# Repository should automatically use current tenant
doc = await document_repo.create({"title": "Test", "content": "Test"})
assert doc.tenant_id == tenant_a.tenant_id
async def test_no_context_raises_error(self, document_service):
"""Verify operations fail without tenant context."""
# Outside tenant scope
with pytest.raises(TenantContextError):
await document_service.list_documents()
async def test_context_cleared_after_scope(self, tenant_a):
"""Verify context is cleared after scope exits."""
with tenant_scope(tenant_a):
assert get_current_tenant().tenant_id == tenant_a.tenant_id
# After scope, context should be cleared
with pytest.raises(TenantContextError):
get_current_tenant()
12.2 Load Testing Multi-Tenant Systems
# tests/load_test_tenants.py
"""
Load testing for multi-tenant systems.
Verify that:
1. Multiple tenants can operate concurrently
2. One tenant's load doesn't affect others
3. System handles tenant context correctly under load
"""
import asyncio
import httpx
from locust import HttpUser, task, between
class TenantLoadTest(HttpUser):
"""
Locust load test simulating multiple tenants.
"""
wait_time = between(1, 3)
def on_start(self):
"""Setup - assign this user to a random tenant."""
import random
self.tenant_id = f"tenant_{random.randint(1, 100)}"
self.headers = {"X-Tenant-ID": self.tenant_id}
@task(3)
def list_documents(self):
"""List documents for this tenant."""
self.client.get("/api/documents", headers=self.headers)
@task(2)
def create_document(self):
"""Create a document for this tenant."""
self.client.post(
"/api/documents",
json={"title": "Load Test Doc", "content": "Test content"},
headers=self.headers
)
@task(1)
def get_document(self):
"""Get a specific document."""
self.client.get("/api/documents/doc_1", headers=self.headers)
async def verify_isolation_under_load():
"""
Verify tenant isolation while system is under load.
Run this while load test is active.
"""
tenants = ["tenant_1", "tenant_2", "tenant_3"]
async def check_tenant(tenant_id):
"""Verify tenant only sees their own data."""
async with httpx.AsyncClient() as client:
response = await client.get(
"http://localhost:8000/api/documents",
headers={"X-Tenant-ID": tenant_id}
)
docs = response.json()["documents"]
for doc in docs:
assert doc["tenant_id"] == tenant_id, \
f"Tenant {tenant_id} saw document from {doc['tenant_id']}!"
# Check all tenants concurrently
await asyncio.gather(*[check_tenant(t) for t in tenants])
print("Isolation verified under load!")
Summary
DAY 1 KEY TAKEAWAYS
ISOLATION SPECTRUM:
├── Shared tables: Lowest cost, application-enforced
├── Row-level security: Database-enforced, same tables
├── Schema per tenant: Stronger isolation, same server
├── Database per tenant: Physical isolation, higher cost
└── Dedicated infrastructure: Complete isolation, highest cost
TENANT CONTEXT:
├── Extract from auth, not client headers
├── Propagate through all layers
├── Include in async jobs and messages
└── Tag all logs and metrics
ROW-LEVEL SECURITY:
├── PostgreSQL RLS is powerful
├── Defense in depth with application filtering
├── Set context per transaction (SET LOCAL) so pooled connections don't leak it
└── Test extensively
HYBRID APPROACH:
├── Different isolation for different tiers
├── Self-serve: Shared infrastructure
├── Enterprise: Dedicated resources
└── Migration path between tiers
DEFAULT CHOICE:
├── Start with shared tables + RLS
├── Add schema separation for compliance
├── Offer dedicated for enterprise deals
└── Never compromise on context propagation
Further Reading
Documentation:
- PostgreSQL Row-Level Security: https://www.postgresql.org/docs/current/ddl-rowsecurity.html
- AWS Multi-Tenant SaaS: https://docs.aws.amazon.com/wellarchitected/latest/saas-lens/
Engineering Blogs:
- Slack Engineering: "Scaling Slack's Database"
- Salesforce Architecture: "The Metadata-Driven Architecture"
Books:
- "Building Multi-Tenant SaaS Architectures" by Tod Golding (O'Reilly)
End of Day 1: Tenant Isolation Strategies
Tomorrow: Day 2 — Noisy Neighbor Prevention. We'll learn how to stop one tenant from ruining everyone else's experience with quotas, rate limiting, and fair scheduling.