Himanshu Kukreja
0%
LearnSystem DesignWeek 9Tenant Isolation
Day 01

Week 9 — Day 1: Tenant Isolation Strategies

System Design Mastery Series — Multi-Tenancy, Security, and Compliance Week


Preface

You've built an amazing SaaS product. Users love it. Growth is exploding.

Then your first enterprise customer shows up:

THE ENTERPRISE REALITY

Sales team: "Great news! We just closed Acme Corp — $500K ARR!"

Acme Corp's security team sends their requirements:

┌────────────────────────────────────────────────────────────────────────┐
│                                                                        │
│  ACME CORP SECURITY REQUIREMENTS                                       │
│                                                                        │
│  1. "Our data must be completely isolated from other customers"        │
│                                                                        │
│  2. "We need audit logs of every access to our data"                   │
│                                                                        │
│  3. "Can you prove that your engineers can't see our data?"            │
│                                                                        │
│  4. "What happens if another customer's bug affects our instance?"     │
│                                                                        │
│  5. "We require SOC 2 Type II and annual penetration testing"          │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

You look at your architecture:

  Current state:
  ├── Single PostgreSQL database
  ├── All customers in same tables
  ├── WHERE tenant_id = ? everywhere (hopefully)
  ├── Shared Redis cache
  └── One Elasticsearch cluster

  Question: Can you prove isolation?
  Answer: Not really.

This is the multi-tenancy challenge.

Today, we'll learn to design systems where thousands of customers share infrastructure safely — from startups to enterprises.


Part I: Foundations

Chapter 1: What Is Multi-Tenancy?

1.1 The Simple Definition

Multi-tenancy is an architecture where a single instance of software serves multiple customers (tenants), with each tenant's data and configuration isolated from others.

MULTI-TENANCY VISUALIZATION

Single-Tenant (old world):
┌─────────────┐  ┌─────────────┐  ┌─────────────┐
│  Customer A │  │  Customer B │  │  Customer C │
│  ┌───────┐  │  │  ┌───────┐  │  │  ┌───────┐  │
│  │ App   │  │  │  │ App   │  │  │  │ App   │  │
│  │ Server│  │  │  │ Server│  │  │  │ Server│  │
│  └───────┘  │  │  └───────┘  │  │  └───────┘  │
│  ┌───────┐  │  │  ┌───────┐  │  │  ┌───────┐  │
│  │  DB   │  │  │  │  DB   │  │  │  │  DB   │  │
│  └───────┘  │  │  └───────┘  │  │  └───────┘  │
└─────────────┘  └─────────────┘  └─────────────┘
     $500/mo         $500/mo         $500/mo

Cost per customer: High
Isolation: Perfect
Scalability: Limited


Multi-Tenant (modern SaaS):
┌────────────────────────────────────────────────┐
│              Shared Infrastructure             │
│  ┌─────────────────────────────────────────┐   │
│  │            Application Servers          │   │
│  │    Tenant A │ Tenant B │ Tenant C       │   │
│  └─────────────────────────────────────────┘   │
│  ┌────────────────────────────────────────┐    │
│  │              Shared Database           │    │
│  │    ┌─────┐ ┌─────┐ ┌─────┐             │    │
│  │    │  A  │ │  B  │ │  C  │  (isolated) │    │
│  │    └─────┘ └─────┘ └─────┘             │    │
│  └────────────────────────────────────────┘    │
└────────────────────────────────────────────────┘
     $50/mo      $50/mo      $50/mo

Cost per customer: Low
Isolation: Depends on implementation
Scalability: High

1.2 Why Multi-Tenancy Matters

BUSINESS DRIVERS

Cost Efficiency:
├── Shared infrastructure = lower cost per customer
├── $50/month pricing impossible with dedicated infrastructure
├── Enables SMB and self-serve markets
└── Cloud-native economics

Operational Efficiency:
├── One codebase to maintain
├── One deployment to manage
├── Updates roll out to everyone
└── Simpler monitoring and debugging

Scalability:
├── Add customers without adding servers 1:1
├── Better resource utilization
├── Elastic scaling across tenants
└── Standard SaaS economics

THE CHALLENGE:
├── How do you prevent Tenant A from seeing Tenant B's data?
├── How do you prevent Tenant A from affecting Tenant B's performance?
├── How do you meet enterprise security requirements?
└── How do you handle different compliance needs per tenant?

1.3 The Isolation Spectrum

ISOLATION LEVELS

Level 0: NO ISOLATION (Don't do this)
─────────────────────────────────────
All data in same tables, no tenant identifier
└── Security: None
└── Use case: Never

Level 1: SHARED DATABASE, SHARED SCHEMA
─────────────────────────────────────────
Same database, same tables, tenant_id column
└── Isolation: Logical (application-enforced)
└── Security: Low-Medium
└── Use case: Startups, SMB SaaS

Level 2: SHARED DATABASE, SEPARATE SCHEMAS
──────────────────────────────────────────
Same database server, schema per tenant
└── Isolation: Logical (database-enforced)
└── Security: Medium
└── Use case: Growing SaaS, some enterprise

Level 3: SEPARATE DATABASES
───────────────────────────
Database instance per tenant (same server cluster)
└── Isolation: Physical at database level
└── Security: Medium-High
└── Use case: Enterprise SaaS

Level 4: SEPARATE INFRASTRUCTURE
────────────────────────────────
Dedicated servers, network isolation
└── Isolation: Physical
└── Security: High
└── Use case: Regulated industries, large enterprise

Level 5: SEPARATE CLOUD ACCOUNTS
────────────────────────────────
Different AWS/GCP accounts per tenant
└── Isolation: Complete
└── Security: Highest
└── Use case: Government, healthcare, finance

Chapter 2: Isolation Patterns Deep Dive

2.1 Pattern 1: Shared Tables with Tenant ID

SHARED TABLES PATTERN

Database Schema:
┌───────────────────────────────────────────────────────────────────────┐
│                                                                       │
│  users table:                                                         │
│  ┌────────────┬────────────┬────────────┬────────────┬──────────────┐ │
│  │ id         │ tenant_id  │ email      │ name       │ created_at   │ │
│  ├────────────┼────────────┼────────────┼────────────┼──────────────┤ │
│  │ 1          │ acme       │ a@acme.com │ Alice      │ 2024-01-01   │ │
│  │ 2          │ acme       │ b@acme.com │ Bob        │ 2024-01-02   │ │
│  │ 3          │ globex     │ c@globex.. │ Carol      │ 2024-01-03   │ │
│  │ 4          │ initech    │ d@inite... │ Dave       │ 2024-01-04   │ │
│  └────────────┴────────────┴────────────┴────────────┴──────────────┘ │
│                                                                       │
│  Every table has tenant_id column                                     │
│  Every query MUST include WHERE tenant_id = ?                         │
│                                                                       │
└───────────────────────────────────────────────────────────────────────┘

Pros:
├── Simplest to implement
├── Lowest cost (one database)
├── Easy cross-tenant operations (admin)
├── Simple backup/restore
└── Works well up to thousands of tenants

Cons:
├── Risk of data leakage (forgot WHERE clause)
├── No native database isolation
├── Noisy neighbor at database level
├── Hard to give tenant their own backup
└── Enterprise customers may reject this

2.2 Pattern 2: Schema Per Tenant

SCHEMA PER TENANT PATTERN

Database Structure:
┌───────────────────────────────────────────────────────────────────────┐
│                                                                       │
│  PostgreSQL Database                                                  │
│  │                                                                    │
│  ├── Schema: tenant_acme                                              │
│  │   ├── users                                                        │
│  │   ├── orders                                                       │
│  │   └── settings                                                     │
│  │                                                                    │
│  ├── Schema: tenant_globex                                            │
│  │   ├── users                                                        │
│  │   ├── orders                                                       │
│  │   └── settings                                                     │
│  │                                                                    │
│  └── Schema: tenant_initech                                           │
│      ├── users                                                        │
│      ├── orders                                                       │
│      └── settings                                                     │
│                                                                       │
│  Connection sets search_path per tenant:                              │
│  SET search_path TO tenant_acme;                                      │
│                                                                       │
└───────────────────────────────────────────────────────────────────────┘

Pros:
├── Database-level isolation
├── Can't accidentally query wrong tenant
├── Per-tenant backup possible
├── Schema migrations per tenant possible
├── Better security posture
└── PostgreSQL RLS can add extra layer

Cons:
├── Schema proliferation (10K tenants = 10K schemas)
├── Migrations more complex
├── Connection management harder
├── Cross-tenant queries need explicit schema
└── Some ORMs don't handle this well

2.3 Pattern 3: Database Per Tenant

DATABASE PER TENANT PATTERN

Infrastructure Structure:
┌───────────────────────────────────────────────────────────────────────┐
│                                                                       │
│  Database Cluster                                                     │
│  │                                                                    │
│  ├── Database: db_acme                                                │
│  │   └── (all acme tables)                                            │
│  │                                                                    │
│  ├── Database: db_globex                                              │
│  │   └── (all globex tables)                                          │
│  │                                                                    │
│  └── Database: db_initech                                             │
│      └── (all initech tables)                                         │
│                                                                       │
│  Each tenant gets dedicated connection string:                        │
│  acme: postgresql://cluster/db_acme                                   │
│  globex: postgresql://cluster/db_globex                               │
│                                                                       │
└───────────────────────────────────────────────────────────────────────┘

Pros:
├── Strong isolation at database level
├── Independent backup/restore per tenant
├── Can put large tenants on dedicated hardware
├── Easier compliance (data residency)
├── Can offer "dedicated" tier at higher price
└── Clear resource accounting per tenant

Cons:
├── Connection pool per tenant
├── Higher operational complexity
├── Schema migrations across all databases
├── Monitoring complexity increases
└── Cost increases with tenant count

2.4 Pattern Comparison

Aspect Shared Tables Schema/Tenant DB/Tenant
Cost Lowest Low-Medium Medium-High
Isolation Application Database Physical
Max Tenants Millions Thousands Hundreds
Complexity Low Medium High
Backup Granularity All or nothing Per schema Per tenant
Compliance Basic Better Best
Enterprise Ready Rarely Sometimes Yes
Data Residency Difficult Possible Easy

Chapter 3: Tenant Context and Propagation

3.1 The Tenant Context Problem

TENANT CONTEXT CHALLENGE

Request Flow:
┌──────────┐     ┌──────────┐     ┌──────────┐     ┌──────────┐
│  Client  │────▶│   API    │────▶│ Service  │────▶│ Database │
│          │     │ Gateway  │     │          │     │          │
└──────────┘     └──────────┘     └──────────┘     └──────────┘
                      │                │                │
                      ▼                ▼                ▼
                 tenant_id?       tenant_id?       tenant_id?

Questions:
├── How does the API Gateway know which tenant?
├── How does the service know which tenant?
├── How does the database query know which tenant?
├── How do async jobs know which tenant?
├── How do background workers know which tenant?

Without consistent tenant context:
├── Queries might miss WHERE clause
├── Logs might not show tenant
├── Metrics not attributed correctly
├── Debugging becomes impossible
└── Security incidents undetectable

3.2 Tenant Context Implementation

# tenant_isolation/context.py

"""
Tenant context management for multi-tenant applications.

The tenant context must flow through every layer:
- HTTP request → Service → Database → Cache → Queue → Background job
"""

from contextvars import ContextVar
from dataclasses import dataclass
from typing import Optional
import functools


# Thread-safe tenant context using contextvars
_tenant_context: ContextVar[Optional['TenantContext']] = ContextVar(
    'tenant_context', 
    default=None
)


@dataclass(frozen=True)
class TenantContext:
    """
    Immutable tenant context that flows through the application.
    
    Frozen to prevent accidental modification.
    """
    tenant_id: str
    tenant_name: str
    plan: str  # 'free', 'pro', 'enterprise'
    region: str  # For data residency
    features: frozenset  # Enabled features
    
    # Security context
    isolation_level: str  # 'shared', 'schema', 'database'
    encryption_key_id: Optional[str] = None  # For tenant-specific encryption


def get_current_tenant() -> TenantContext:
    """
    Get the current tenant context.
    
    Raises TenantContextError if not in tenant context.
    """
    ctx = _tenant_context.get()
    if ctx is None:
        raise TenantContextError("No tenant context set. This is a bug!")
    return ctx


def get_current_tenant_id() -> str:
    """Convenience method to get just the tenant ID."""
    return get_current_tenant().tenant_id


def set_tenant_context(ctx: TenantContext) -> None:
    """Set the tenant context for the current execution."""
    _tenant_context.set(ctx)


def clear_tenant_context() -> None:
    """Clear the tenant context."""
    _tenant_context.set(None)


class TenantContextError(Exception):
    """Raised when tenant context is missing or invalid."""
    pass


# Context manager for tenant scope
class tenant_scope:
    """
    Context manager for executing code in a tenant's context.
    
    Usage:
        with tenant_scope(tenant_ctx):
            # All code here runs in tenant context
            users = user_service.get_users()  # Automatically filtered
    """
    
    def __init__(self, tenant: TenantContext):
        self.tenant = tenant
        self.previous = None
    
    def __enter__(self):
        self.previous = _tenant_context.get()
        _tenant_context.set(self.tenant)
        return self.tenant
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        _tenant_context.set(self.previous)
        return False


# Decorator for tenant-required functions
def requires_tenant(func):
    """
    Decorator that ensures function runs within tenant context.
    
    Usage:
        @requires_tenant
        def get_user_data():
            tenant = get_current_tenant()
            # ...
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        if _tenant_context.get() is None:
            raise TenantContextError(
                f"Function {func.__name__} requires tenant context"
            )
        return func(*args, **kwargs)
    return wrapper


# Async version
def requires_tenant_async(func):
    """Async version of requires_tenant decorator."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        if _tenant_context.get() is None:
            raise TenantContextError(
                f"Function {func.__name__} requires tenant context"
            )
        return await func(*args, **kwargs)
    return wrapper

3.3 Extracting Tenant from Requests

# tenant_isolation/middleware.py

"""
Middleware for extracting and setting tenant context from HTTP requests.
"""

from fastapi import Request, HTTPException
from starlette.middleware.base import BaseHTTPMiddleware
import logging

logger = logging.getLogger(__name__)


class TenantMiddleware(BaseHTTPMiddleware):
    """
    Middleware that extracts tenant context from requests.
    
    Tenant can be identified via:
    1. Subdomain: acme.myapp.com → tenant_id = 'acme'
    2. Header: X-Tenant-ID: acme
    3. JWT claim: token.tenant_id = 'acme'
    4. Path: /api/tenants/acme/users
    """
    
    def __init__(self, app, tenant_service):
        super().__init__(app)
        self.tenant_service = tenant_service
    
    async def dispatch(self, request: Request, call_next):
        # Skip tenant resolution for public endpoints
        if self._is_public_endpoint(request.url.path):
            return await call_next(request)
        
        # Extract tenant identifier
        tenant_id = await self._extract_tenant_id(request)
        
        if not tenant_id:
            raise HTTPException(
                status_code=400,
                detail="Tenant identification required"
            )
        
        # Load tenant context from database/cache
        tenant_ctx = await self.tenant_service.get_tenant_context(tenant_id)
        
        if not tenant_ctx:
            raise HTTPException(
                status_code=404,
                detail=f"Tenant '{tenant_id}' not found"
            )
        
        if not tenant_ctx.is_active:
            raise HTTPException(
                status_code=403,
                detail="Tenant account is suspended"
            )
        
        # Set context for this request
        with tenant_scope(tenant_ctx):
            # Add tenant to request state for logging
            request.state.tenant_id = tenant_id
            
            # Process request
            response = await call_next(request)
            
            # Add tenant to response headers (for debugging)
            response.headers["X-Tenant-ID"] = tenant_id
            
            return response
    
    async def _extract_tenant_id(self, request: Request) -> Optional[str]:
        """
        Extract tenant ID from request using multiple strategies.
        """
        
        # Strategy 1: Subdomain
        host = request.headers.get("host", "")
        if "." in host:
            subdomain = host.split(".")[0]
            if subdomain not in ["www", "api", "app"]:
                return subdomain
        
        # Strategy 2: Header
        tenant_header = request.headers.get("X-Tenant-ID")
        if tenant_header:
            return tenant_header
        
        # Strategy 3: JWT claim (if authenticated)
        if hasattr(request.state, "user"):
            return request.state.user.tenant_id
        
        # Strategy 4: Query parameter (for webhooks)
        tenant_param = request.query_params.get("tenant_id")
        if tenant_param:
            return tenant_param
        
        return None
    
    def _is_public_endpoint(self, path: str) -> bool:
        """Check if endpoint doesn't require tenant context."""
        public_paths = [
            "/health",
            "/metrics",
            "/api/auth/login",
            "/api/auth/register",
            "/api/tenants/create",  # Creating new tenant
        ]
        return any(path.startswith(p) for p in public_paths)


class TenantService:
    """
    Service for loading and caching tenant configuration.
    """
    
    def __init__(self, db, cache):
        self.db = db
        self.cache = cache
    
    async def get_tenant_context(self, tenant_id: str) -> Optional[TenantContext]:
        """
        Load tenant context, with caching.
        """
        
        # Check cache first
        cache_key = f"tenant:{tenant_id}"
        cached = await self.cache.get(cache_key)
        
        if cached:
            return TenantContext(**cached)
        
        # Load from database
        tenant = await self.db.fetchone(
            """
            SELECT 
                id, name, plan, region, features,
                isolation_level, encryption_key_id, is_active
            FROM tenants
            WHERE id = $1
            """,
            tenant_id
        )
        
        if not tenant:
            return None
        
        ctx = TenantContext(
            tenant_id=tenant["id"],
            tenant_name=tenant["name"],
            plan=tenant["plan"],
            region=tenant["region"],
            features=frozenset(tenant["features"]),
            isolation_level=tenant["isolation_level"],
            encryption_key_id=tenant["encryption_key_id"]
        )
        
        # Cache for 5 minutes
        await self.cache.set(cache_key, ctx.__dict__, ttl=300)
        
        return ctx

Part II: Implementation

Chapter 4: Row-Level Security Implementation

4.1 Application-Level Enforcement

# tenant_isolation/repository.py

"""
Repository pattern with automatic tenant filtering.

All database access goes through repositories that automatically
apply tenant filtering.
"""

from typing import TypeVar, Generic, List, Optional
from dataclasses import dataclass
import logging

logger = logging.getLogger(__name__)

T = TypeVar('T')


class TenantAwareRepository(Generic[T]):
    """
    Base repository that automatically filters by tenant.
    
    All queries include tenant_id filter.
    All inserts include tenant_id.
    """
    
    def __init__(self, db, table_name: str, model_class: type):
        self.db = db
        self.table_name = table_name
        self.model_class = model_class
    
    async def find_by_id(self, id: str) -> Optional[T]:
        """
        Find record by ID within current tenant.
        """
        tenant_id = get_current_tenant_id()
        
        row = await self.db.fetchone(
            f"""
            SELECT * FROM {self.table_name}
            WHERE id = $1 AND tenant_id = $2
            """,
            id, tenant_id
        )
        
        if not row:
            return None
        
        return self.model_class(**row)
    
    async def find_all(
        self,
        filters: dict = None,
        limit: int = 100,
        offset: int = 0
    ) -> List[T]:
        """
        Find all records within current tenant.
        """
        tenant_id = get_current_tenant_id()
        
        # Build query with tenant filter always first
        query = f"SELECT * FROM {self.table_name} WHERE tenant_id = $1"
        params = [tenant_id]
        param_idx = 2
        
        if filters:
            for key, value in filters.items():
                query += f" AND {key} = ${param_idx}"
                params.append(value)
                param_idx += 1
        
        query += f" LIMIT ${param_idx} OFFSET ${param_idx + 1}"
        params.extend([limit, offset])
        
        rows = await self.db.fetch(query, *params)
        return [self.model_class(**row) for row in rows]
    
    async def create(self, data: dict) -> T:
        """
        Create record within current tenant.
        """
        tenant_id = get_current_tenant_id()
        
        # Always set tenant_id
        data["tenant_id"] = tenant_id
        
        columns = ", ".join(data.keys())
        placeholders = ", ".join(f"${i+1}" for i in range(len(data)))
        
        row = await self.db.fetchone(
            f"""
            INSERT INTO {self.table_name} ({columns})
            VALUES ({placeholders})
            RETURNING *
            """,
            *data.values()
        )
        
        return self.model_class(**row)
    
    async def update(self, id: str, data: dict) -> Optional[T]:
        """
        Update record within current tenant.
        """
        tenant_id = get_current_tenant_id()
        
        # Remove tenant_id from update data (can't change tenant)
        data.pop("tenant_id", None)
        
        set_clauses = ", ".join(
            f"{key} = ${i+1}" for i, key in enumerate(data.keys())
        )
        
        row = await self.db.fetchone(
            f"""
            UPDATE {self.table_name}
            SET {set_clauses}
            WHERE id = ${len(data)+1} AND tenant_id = ${len(data)+2}
            RETURNING *
            """,
            *data.values(), id, tenant_id
        )
        
        if not row:
            return None
        
        return self.model_class(**row)
    
    async def delete(self, id: str) -> bool:
        """
        Delete record within current tenant.
        """
        tenant_id = get_current_tenant_id()
        
        result = await self.db.execute(
            f"""
            DELETE FROM {self.table_name}
            WHERE id = $1 AND tenant_id = $2
            """,
            id, tenant_id
        )
        
        return result.rowcount > 0
    
    async def count(self, filters: dict = None) -> int:
        """
        Count records within current tenant.
        """
        tenant_id = get_current_tenant_id()
        
        query = f"SELECT COUNT(*) FROM {self.table_name} WHERE tenant_id = $1"
        params = [tenant_id]
        
        if filters:
            for i, (key, value) in enumerate(filters.items()):
                query += f" AND {key} = ${i+2}"
                params.append(value)
        
        result = await self.db.fetchone(query, *params)
        return result["count"]


# Example usage
@dataclass
class User:
    id: str
    tenant_id: str
    email: str
    name: str
    role: str


class UserRepository(TenantAwareRepository[User]):
    def __init__(self, db):
        super().__init__(db, "users", User)
    
    async def find_by_email(self, email: str) -> Optional[User]:
        """Custom query - still tenant-scoped."""
        tenant_id = get_current_tenant_id()
        
        row = await self.db.fetchone(
            """
            SELECT * FROM users
            WHERE email = $1 AND tenant_id = $2
            """,
            email, tenant_id
        )
        
        if not row:
            return None
        
        return User(**row)

4.2 Database-Level Row Security (PostgreSQL RLS)

# tenant_isolation/database_rls.py

"""
PostgreSQL Row-Level Security setup for multi-tenant isolation.

RLS provides database-enforced tenant isolation - even if
application code has a bug, the database won't return wrong data.
"""

RLS_SETUP_SQL = """
-- =============================================================================
-- ROW-LEVEL SECURITY SETUP
-- =============================================================================

-- Enable RLS on tenant tables
ALTER TABLE users ENABLE ROW LEVEL SECURITY;
ALTER TABLE orders ENABLE ROW LEVEL SECURITY;
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;

-- Force RLS even for table owners (important!)
ALTER TABLE users FORCE ROW LEVEL SECURITY;
ALTER TABLE orders FORCE ROW LEVEL SECURITY;
ALTER TABLE documents FORCE ROW LEVEL SECURITY;


-- =============================================================================
-- TENANT CONTEXT FUNCTIONS
-- =============================================================================

-- Function to get current tenant from session variable
CREATE OR REPLACE FUNCTION current_tenant_id() 
RETURNS TEXT AS $$
BEGIN
    RETURN current_setting('app.current_tenant', true);
END;
$$ LANGUAGE plpgsql STABLE;


-- =============================================================================
-- ROW-LEVEL SECURITY POLICIES
-- =============================================================================

-- Users table policies
CREATE POLICY tenant_isolation_users ON users
    FOR ALL
    TO application_role
    USING (tenant_id = current_tenant_id())
    WITH CHECK (tenant_id = current_tenant_id());

-- Orders table policies  
CREATE POLICY tenant_isolation_orders ON orders
    FOR ALL
    TO application_role
    USING (tenant_id = current_tenant_id())
    WITH CHECK (tenant_id = current_tenant_id());

-- Documents table policies
CREATE POLICY tenant_isolation_documents ON documents
    FOR ALL
    TO application_role
    USING (tenant_id = current_tenant_id())
    WITH CHECK (tenant_id = current_tenant_id());


-- =============================================================================
-- ADMIN BYPASS POLICY (for support/admin operations)
-- =============================================================================

-- Admin role can see all data (use with extreme caution!)
CREATE POLICY admin_bypass_users ON users
    FOR ALL
    TO admin_role
    USING (true)
    WITH CHECK (true);

-- Note: Admin access should be heavily audited
"""


class RLSConnectionManager:
    """
    Manages database connections with RLS tenant context.
    
    Sets the tenant context on each connection before use.
    """
    
    def __init__(self, pool):
        self.pool = pool
    
    async def get_connection(self):
        """
        Get a connection with tenant context set.
        """
        conn = await self.pool.acquire()
        
        # Set tenant context for RLS
        tenant_id = get_current_tenant_id()
        await conn.execute(
            f"SET app.current_tenant = '{tenant_id}'"
        )
        
        return conn
    
    async def release_connection(self, conn):
        """
        Release connection and clear tenant context.
        """
        # Clear tenant context before returning to pool
        await conn.execute("RESET app.current_tenant")
        await self.pool.release(conn)


class TenantAwareConnectionPool:
    """
    Connection pool that automatically sets tenant context.
    
    Usage:
        async with pool.connection() as conn:
            # tenant_id is automatically set
            result = await conn.fetch("SELECT * FROM users")
            # RLS filters to current tenant
    """
    
    def __init__(self, dsn: str):
        self.dsn = dsn
        self._pool = None
    
    async def initialize(self):
        """Create the connection pool."""
        import asyncpg
        self._pool = await asyncpg.create_pool(
            self.dsn,
            min_size=5,
            max_size=20,
            setup=self._setup_connection
        )
    
    async def _setup_connection(self, conn):
        """Called for each new connection."""
        # Connection starts with no tenant context
        pass
    
    async def connection(self):
        """Get connection context manager with tenant set."""
        return TenantConnection(self._pool)


class TenantConnection:
    """Async context manager for tenant-scoped connections."""
    
    def __init__(self, pool):
        self.pool = pool
        self.conn = None
    
    async def __aenter__(self):
        self.conn = await self.pool.acquire()
        
        # Set tenant context
        try:
            tenant_id = get_current_tenant_id()
            await self.conn.execute(
                "SELECT set_config('app.current_tenant', $1, true)",
                tenant_id
            )
        except TenantContextError:
            # No tenant context - queries will fail RLS
            # This is intentional - forces proper context
            pass
        
        return self.conn
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.conn:
            # Clear tenant context
            await self.conn.execute(
                "SELECT set_config('app.current_tenant', '', true)"
            )
            await self.pool.release(self.conn)
        return False

Chapter 5: Schema-Per-Tenant Implementation

5.1 Dynamic Schema Management

# tenant_isolation/schema_manager.py

"""
Schema-per-tenant implementation.

Each tenant gets their own database schema with identical structure.
Provides stronger isolation than shared tables.
"""

import logging
from typing import List

logger = logging.getLogger(__name__)


class SchemaPerTenantManager:
    """
    Manages schema-per-tenant database architecture.
    """
    
    def __init__(self, admin_pool, schema_prefix: str = "tenant_"):
        self.admin_pool = admin_pool
        self.schema_prefix = schema_prefix
    
    async def create_tenant_schema(self, tenant_id: str):
        """
        Create a new schema for a tenant.
        
        Called during tenant onboarding.
        """
        schema_name = f"{self.schema_prefix}{tenant_id}"
        
        async with self.admin_pool.acquire() as conn:
            # Create schema
            await conn.execute(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")
            
            # Create tables in schema
            await self._create_schema_tables(conn, schema_name)
            
            # Grant permissions
            await conn.execute(f"""
                GRANT USAGE ON SCHEMA {schema_name} TO app_user
            """)
            await conn.execute(f"""
                GRANT ALL ON ALL TABLES IN SCHEMA {schema_name} TO app_user
            """)
            
            logger.info(f"Created schema for tenant: {tenant_id}")
    
    async def _create_schema_tables(self, conn, schema_name: str):
        """Create standard tables in tenant schema."""
        
        await conn.execute(f"""
            CREATE TABLE {schema_name}.users (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                email VARCHAR(255) UNIQUE NOT NULL,
                name VARCHAR(255) NOT NULL,
                role VARCHAR(50) DEFAULT 'member',
                created_at TIMESTAMP DEFAULT NOW(),
                updated_at TIMESTAMP DEFAULT NOW()
            )
        """)
        
        await conn.execute(f"""
            CREATE TABLE {schema_name}.documents (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                title VARCHAR(500) NOT NULL,
                content TEXT,
                created_by UUID REFERENCES {schema_name}.users(id),
                created_at TIMESTAMP DEFAULT NOW(),
                updated_at TIMESTAMP DEFAULT NOW()
            )
        """)
        
        await conn.execute(f"""
            CREATE TABLE {schema_name}.settings (
                key VARCHAR(255) PRIMARY KEY,
                value JSONB,
                updated_at TIMESTAMP DEFAULT NOW()
            )
        """)
    
    async def delete_tenant_schema(self, tenant_id: str):
        """
        Delete a tenant's schema.
        
        Called during tenant offboarding. Requires confirmation!
        """
        schema_name = f"{self.schema_prefix}{tenant_id}"
        
        async with self.admin_pool.acquire() as conn:
            # CASCADE drops all objects in schema
            await conn.execute(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")
            
            logger.info(f"Deleted schema for tenant: {tenant_id}")
    
    async def get_all_tenant_schemas(self) -> List[str]:
        """Get list of all tenant schemas."""
        
        async with self.admin_pool.acquire() as conn:
            rows = await conn.fetch(f"""
                SELECT schema_name 
                FROM information_schema.schemata
                WHERE schema_name LIKE '{self.schema_prefix}%'
            """)
            
            return [row["schema_name"] for row in rows]
    
    async def run_migration(self, migration_sql: str):
        """
        Run migration across all tenant schemas.
        """
        schemas = await self.get_all_tenant_schemas()
        
        async with self.admin_pool.acquire() as conn:
            for schema in schemas:
                try:
                    # Set search path to tenant schema
                    await conn.execute(f"SET search_path TO {schema}")
                    
                    # Run migration
                    await conn.execute(migration_sql)
                    
                    logger.info(f"Migration complete for schema: {schema}")
                    
                except Exception as e:
                    logger.error(f"Migration failed for {schema}: {e}")
                    raise
        
        logger.info(f"Migration complete for {len(schemas)} schemas")


class SchemaRoutingConnectionPool:
    """
    Connection pool that routes to tenant-specific schema.
    """
    
    def __init__(self, pool, schema_prefix: str = "tenant_"):
        self.pool = pool
        self.schema_prefix = schema_prefix
    
    async def connection(self):
        """Get connection with schema routing."""
        return SchemaRoutedConnection(self.pool, self.schema_prefix)


class SchemaRoutedConnection:
    """Connection that automatically routes to tenant schema."""
    
    def __init__(self, pool, schema_prefix: str):
        self.pool = pool
        self.schema_prefix = schema_prefix
        self.conn = None
    
    async def __aenter__(self):
        self.conn = await self.pool.acquire()
        
        # Set search path to tenant schema
        tenant_id = get_current_tenant_id()
        schema_name = f"{self.schema_prefix}{tenant_id}"
        
        await self.conn.execute(f"SET search_path TO {schema_name}, public")
        
        return self.conn
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.conn:
            # Reset search path
            await self.conn.execute("SET search_path TO public")
            await self.pool.release(self.conn)
        return False

Chapter 6: Database-Per-Tenant Implementation

6.1 Database Provisioning

# tenant_isolation/database_per_tenant.py

"""
Database-per-tenant implementation.

Each tenant gets their own database instance.
Provides strongest isolation but highest operational complexity.
"""

from dataclasses import dataclass
from typing import Dict, Optional
import logging

logger = logging.getLogger(__name__)


@dataclass
class TenantDatabase:
    """Configuration for a tenant's database."""
    tenant_id: str
    host: str
    port: int
    database: str
    username: str
    password: str  # Should be encrypted/from secrets manager
    ssl_mode: str = "require"
    
    @property
    def connection_string(self) -> str:
        return (
            f"postgresql://{self.username}:{self.password}@"
            f"{self.host}:{self.port}/{self.database}"
            f"?sslmode={self.ssl_mode}"
        )


class DatabasePerTenantManager:
    """
    Manages database-per-tenant architecture.
    
    In practice, this often integrates with:
    - AWS RDS for database provisioning
    - Terraform for infrastructure
    - Secrets Manager for credentials
    """
    
    def __init__(self, db_cluster_endpoint: str, admin_credentials: dict):
        self.cluster_endpoint = db_cluster_endpoint
        self.admin_creds = admin_credentials
        self.tenant_pools: Dict[str, any] = {}
    
    async def provision_tenant_database(
        self,
        tenant_id: str,
        tier: str = "standard"
    ) -> TenantDatabase:
        """
        Provision a new database for a tenant.
        
        In production, this might:
        - Call AWS RDS API to create database
        - Use Terraform to provision infrastructure
        - Set up replication, backups, etc.
        """
        
        import asyncpg
        import secrets
        
        database_name = f"tenant_{tenant_id}"
        username = f"user_{tenant_id}"
        password = secrets.token_urlsafe(32)
        
        # Connect as admin to create database
        admin_conn = await asyncpg.connect(
            host=self.cluster_endpoint,
            user=self.admin_creds["username"],
            password=self.admin_creds["password"],
            database="postgres"
        )
        
        try:
            # Create database
            await admin_conn.execute(f"""
                CREATE DATABASE {database_name}
                WITH ENCODING 'UTF8'
            """)
            
            # Create user
            await admin_conn.execute(f"""
                CREATE USER {username} WITH PASSWORD '{password}'
            """)
            
            # Grant permissions
            await admin_conn.execute(f"""
                GRANT ALL PRIVILEGES ON DATABASE {database_name} TO {username}
            """)
            
            logger.info(f"Provisioned database for tenant: {tenant_id}")
            
        finally:
            await admin_conn.close()
        
        # Connect to new database to set up schema
        tenant_conn = await asyncpg.connect(
            host=self.cluster_endpoint,
            user=username,
            password=password,
            database=database_name
        )
        
        try:
            await self._create_tenant_schema(tenant_conn)
        finally:
            await tenant_conn.close()
        
        tenant_db = TenantDatabase(
            tenant_id=tenant_id,
            host=self.cluster_endpoint,
            port=5432,
            database=database_name,
            username=username,
            password=password
        )
        
        # Store credentials in secrets manager (not shown)
        await self._store_credentials(tenant_id, tenant_db)
        
        return tenant_db
    
    async def _create_tenant_schema(self, conn):
        """Create tables in tenant database."""
        
        await conn.execute("""
            CREATE TABLE users (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                email VARCHAR(255) UNIQUE NOT NULL,
                name VARCHAR(255) NOT NULL,
                role VARCHAR(50) DEFAULT 'member',
                created_at TIMESTAMP DEFAULT NOW()
            )
        """)
        
        await conn.execute("""
            CREATE TABLE documents (
                id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
                title VARCHAR(500) NOT NULL,
                content TEXT,
                created_by UUID REFERENCES users(id),
                created_at TIMESTAMP DEFAULT NOW()
            )
        """)
    
    async def _store_credentials(self, tenant_id: str, db: TenantDatabase):
        """Store credentials in secrets manager."""
        # In production: AWS Secrets Manager, HashiCorp Vault, etc.
        pass
    
    async def get_tenant_pool(self, tenant_id: str):
        """
        Get connection pool for a specific tenant.
        
        Pools are created lazily and cached.
        """
        import asyncpg
        
        if tenant_id in self.tenant_pools:
            return self.tenant_pools[tenant_id]
        
        # Load tenant database config
        tenant_db = await self._load_tenant_config(tenant_id)
        
        if not tenant_db:
            raise ValueError(f"No database configured for tenant: {tenant_id}")
        
        # Create pool
        pool = await asyncpg.create_pool(
            tenant_db.connection_string,
            min_size=2,
            max_size=10
        )
        
        self.tenant_pools[tenant_id] = pool
        return pool
    
    async def _load_tenant_config(self, tenant_id: str) -> Optional[TenantDatabase]:
        """Load tenant database configuration."""
        # In production: Load from secrets manager
        pass


class TenantDatabaseRouter:
    """
    Routes database connections to the correct tenant database.
    """
    
    def __init__(self, db_manager: DatabasePerTenantManager):
        self.db_manager = db_manager
    
    async def connection(self):
        """Get connection to current tenant's database."""
        tenant_id = get_current_tenant_id()
        pool = await self.db_manager.get_tenant_pool(tenant_id)
        return pool.acquire()

Chapter 7: Tenant-Aware Services Pattern

7.1 Complete Service Implementation

# tenant_isolation/service.py

"""
Complete example of a tenant-aware service.

This shows how all the pieces fit together:
- Middleware extracts tenant
- Service uses tenant-aware repository
- Caching is tenant-scoped
- Logging includes tenant context
"""

from fastapi import FastAPI, Depends, HTTPException
from typing import List, Optional
from dataclasses import dataclass
from datetime import datetime
import logging

logger = logging.getLogger(__name__)


@dataclass
class CreateDocumentRequest:
    title: str
    content: str


@dataclass
class Document:
    id: str
    tenant_id: str
    title: str
    content: str
    created_by: str
    created_at: datetime


class DocumentService:
    """
    Tenant-aware document service.
    
    All operations are automatically scoped to current tenant.
    """
    
    def __init__(self, document_repo, user_repo, cache, event_publisher):
        self.documents = document_repo
        self.users = user_repo
        self.cache = cache
        self.events = event_publisher
    
    @requires_tenant_async
    async def create_document(
        self,
        request: CreateDocumentRequest,
        created_by_user_id: str
    ) -> Document:
        """
        Create a new document in current tenant.
        """
        tenant = get_current_tenant()
        
        # Validate user exists in this tenant
        user = await self.users.find_by_id(created_by_user_id)
        if not user:
            raise ValueError("User not found")
        
        # Create document
        document = await self.documents.create({
            "title": request.title,
            "content": request.content,
            "created_by": created_by_user_id,
            "created_at": datetime.utcnow()
        })
        
        # Invalidate cache
        await self._invalidate_document_cache()
        
        # Publish event
        await self.events.publish(
            topic="documents",
            event={
                "type": "document.created",
                "tenant_id": tenant.tenant_id,
                "document_id": document.id,
                "created_by": created_by_user_id
            }
        )
        
        logger.info(
            "Document created",
            extra={
                "tenant_id": tenant.tenant_id,
                "document_id": document.id,
                "user_id": created_by_user_id
            }
        )
        
        return document
    
    @requires_tenant_async
    async def get_document(self, document_id: str) -> Optional[Document]:
        """
        Get document by ID within current tenant.
        """
        tenant = get_current_tenant()
        cache_key = f"tenant:{tenant.tenant_id}:doc:{document_id}"
        
        # Check cache
        cached = await self.cache.get(cache_key)
        if cached:
            return Document(**cached)
        
        # Query database
        document = await self.documents.find_by_id(document_id)
        
        if document:
            # Cache for 5 minutes
            await self.cache.set(cache_key, document.__dict__, ttl=300)
        
        return document
    
    @requires_tenant_async
    async def list_documents(
        self,
        limit: int = 50,
        offset: int = 0
    ) -> List[Document]:
        """
        List documents in current tenant.
        """
        return await self.documents.find_all(
            limit=limit,
            offset=offset
        )
    
    @requires_tenant_async
    async def search_documents(self, query: str) -> List[Document]:
        """
        Search documents within current tenant.
        
        Search index is tenant-scoped.
        """
        tenant = get_current_tenant()
        
        # Search in tenant-scoped index
        results = await self.search_client.search(
            index=f"documents_{tenant.tenant_id}",
            query=query
        )
        
        return [Document(**r) for r in results]
    
    @requires_tenant_async
    async def delete_document(self, document_id: str) -> bool:
        """
        Delete document within current tenant.
        """
        tenant = get_current_tenant()
        
        # Delete from database
        deleted = await self.documents.delete(document_id)
        
        if deleted:
            # Invalidate cache
            cache_key = f"tenant:{tenant.tenant_id}:doc:{document_id}"
            await self.cache.delete(cache_key)
            
            # Publish event
            await self.events.publish(
                topic="documents",
                event={
                    "type": "document.deleted",
                    "tenant_id": tenant.tenant_id,
                    "document_id": document_id
                }
            )
        
        return deleted
    
    async def _invalidate_document_cache(self):
        """Invalidate document list caches for current tenant."""
        tenant = get_current_tenant()
        pattern = f"tenant:{tenant.tenant_id}:docs:*"
        await self.cache.delete_pattern(pattern)


# FastAPI router with tenant middleware

app = FastAPI()

# Dependency to get document service
async def get_document_service() -> DocumentService:
    # In production, use dependency injection
    pass


@app.post("/api/documents")
async def create_document(
    request: CreateDocumentRequest,
    service: DocumentService = Depends(get_document_service)
):
    """
    Create a new document.
    
    Tenant context is set by middleware before this runs.
    """
    # Get current user from auth context
    user_id = get_current_user_id()
    
    document = await service.create_document(request, user_id)
    
    return {"id": document.id, "title": document.title}


@app.get("/api/documents/{document_id}")
async def get_document(
    document_id: str,
    service: DocumentService = Depends(get_document_service)
):
    """Get a document by ID."""
    document = await service.get_document(document_id)
    
    if not document:
        raise HTTPException(status_code=404, detail="Document not found")
    
    return document


@app.get("/api/documents")
async def list_documents(
    limit: int = 50,
    offset: int = 0,
    service: DocumentService = Depends(get_document_service)
):
    """List documents in current tenant."""
    documents = await service.list_documents(limit=limit, offset=offset)
    return {"documents": documents, "count": len(documents)}

7.2 Tenant-Aware Event Processing

# tenant_isolation/events.py

"""
Tenant-aware event processing.

Events in multi-tenant systems must carry tenant context
so consumers can process them correctly.
"""

from dataclasses import dataclass
from typing import Dict, Any, Callable
from datetime import datetime
import json


@dataclass
class TenantEvent:
    """
    Event with tenant context.
    
    All events must include tenant_id for proper routing and processing.
    """
    event_id: str
    event_type: str
    tenant_id: str
    timestamp: datetime
    payload: Dict[str, Any]
    
    def to_json(self) -> str:
        return json.dumps({
            "event_id": self.event_id,
            "event_type": self.event_type,
            "tenant_id": self.tenant_id,
            "timestamp": self.timestamp.isoformat(),
            "payload": self.payload
        })
    
    @classmethod
    def from_json(cls, data: str) -> 'TenantEvent':
        d = json.loads(data)
        return cls(
            event_id=d["event_id"],
            event_type=d["event_type"],
            tenant_id=d["tenant_id"],
            timestamp=datetime.fromisoformat(d["timestamp"]),
            payload=d["payload"]
        )


class TenantEventPublisher:
    """
    Publishes events with tenant context.
    """
    
    def __init__(self, kafka_producer):
        self.producer = kafka_producer
    
    async def publish(self, topic: str, event: Dict[str, Any]):
        """
        Publish event with current tenant context.
        """
        import uuid
        
        tenant = get_current_tenant()
        
        tenant_event = TenantEvent(
            event_id=str(uuid.uuid4()),
            event_type=event.get("type", "unknown"),
            tenant_id=tenant.tenant_id,
            timestamp=datetime.utcnow(),
            payload=event
        )
        
        # Use tenant_id as partition key for ordering
        await self.producer.send(
            topic=topic,
            key=tenant.tenant_id,
            value=tenant_event.to_json()
        )


class TenantEventConsumer:
    """
    Consumes events and sets tenant context before processing.
    """
    
    def __init__(self, kafka_consumer, tenant_service):
        self.consumer = kafka_consumer
        self.tenant_service = tenant_service
        self.handlers: Dict[str, Callable] = {}
    
    def register_handler(self, event_type: str, handler: Callable):
        """Register handler for event type."""
        self.handlers[event_type] = handler
    
    async def start(self):
        """Start consuming events."""
        async for message in self.consumer:
            await self._process_message(message)
    
    async def _process_message(self, message):
        """Process a single message with tenant context."""
        
        event = TenantEvent.from_json(message.value)
        
        # Load tenant context
        tenant_ctx = await self.tenant_service.get_tenant_context(
            event.tenant_id
        )
        
        if not tenant_ctx:
            logger.error(f"Unknown tenant: {event.tenant_id}")
            return
        
        # Set tenant context and process
        with tenant_scope(tenant_ctx):
            handler = self.handlers.get(event.event_type)
            
            if handler:
                try:
                    await handler(event)
                except Exception as e:
                    logger.error(
                        f"Event processing failed: {e}",
                        extra={
                            "tenant_id": event.tenant_id,
                            "event_id": event.event_id,
                            "event_type": event.event_type
                        }
                    )
            else:
                logger.warning(f"No handler for event type: {event.event_type}")

7.3 Tenant-Aware Background Jobs

# tenant_isolation/jobs.py

"""
Tenant-aware background job processing.

All background jobs must run within tenant context.
"""

from celery import Celery
from functools import wraps


celery_app = Celery('tasks')


def tenant_task(func):
    """
    Decorator for tenant-aware Celery tasks.
    
    Usage:
        @celery_app.task
        @tenant_task
        def process_document(document_id):
            # Runs in tenant context
            doc = document_repo.find_by_id(document_id)
    """
    @wraps(func)
    def wrapper(tenant_id: str, *args, **kwargs):
        # Load tenant context
        tenant_ctx = load_tenant_sync(tenant_id)
        
        if not tenant_ctx:
            raise ValueError(f"Unknown tenant: {tenant_id}")
        
        # Set context and run
        with tenant_scope(tenant_ctx):
            return func(*args, **kwargs)
    
    return wrapper


# Example tasks

@celery_app.task
@tenant_task
def send_document_notification(document_id: str, user_ids: list):
    """
    Send notifications about a document.
    
    Note: tenant_id is automatically prepended by @tenant_task
    """
    document = document_repo.find_by_id(document_id)
    
    if not document:
        return
    
    for user_id in user_ids:
        user = user_repo.find_by_id(user_id)
        if user:
            notification_service.send(
                user=user,
                template="document_shared",
                context={"document": document}
            )


@celery_app.task
@tenant_task
def generate_tenant_report():
    """
    Generate usage report for current tenant.
    """
    tenant = get_current_tenant()
    
    stats = {
        "tenant_id": tenant.tenant_id,
        "user_count": user_repo.count(),
        "document_count": document_repo.count(),
        "storage_used": storage_service.get_usage(),
    }
    
    report_service.generate(stats)


# Scheduling tenant jobs
class TenantJobScheduler:
    """
    Schedule jobs for all tenants or specific tenant.
    """
    
    def __init__(self, tenant_service):
        self.tenant_service = tenant_service
    
    async def schedule_for_all_tenants(self, task, *args, **kwargs):
        """
        Schedule a task to run for each tenant.
        
        Useful for nightly batch jobs.
        """
        tenants = await self.tenant_service.get_all_active_tenants()
        
        for tenant in tenants:
            # Include tenant_id as first argument
            task.delay(tenant.tenant_id, *args, **kwargs)
    
    async def schedule_for_tenant(self, tenant_id: str, task, *args, **kwargs):
        """Schedule a task for specific tenant."""
        task.delay(tenant_id, *args, **kwargs)

Part III: Real-World Application

Chapter 7: Case Studies

7.1 Slack's Multi-Tenancy

SLACK'S ISOLATION ARCHITECTURE

Challenge:
├── Thousands of enterprise customers
├── Strict data isolation required
├── Real-time messaging at scale
├── Some customers need dedicated infrastructure

Solution: HYBRID APPROACH

Standard Tenants (Shared Infrastructure):
├── Logical isolation with workspace_id
├── All messages in shared Vitess cluster
├── Row-level filtering in application
├── Shared search infrastructure

Enterprise Grid (Enhanced Isolation):
├── Dedicated Vitess shards
├── Isolated search clusters
├── Customer-managed encryption keys
├── Can be in customer's preferred region

Key Innovations:
├── WORKSPACE AS TENANT BOUNDARY
│   └── Clear isolation unit
│
├── CHANNEL-BASED PARTITIONING
│   └── Data sharded by channel for performance
│
├── ENTERPRISE KEY MANAGEMENT (EKM)
│   └── Customer controls encryption keys
│   └── Slack can't read customer data
│
└── SLACK CONNECT
    └── Controlled cross-tenant communication
    └── Explicit consent from both sides

Lesson: Offer different isolation levels at different price points

7.2 Salesforce Multi-Tenancy

SALESFORCE'S ARCHITECTURE

Scale:
├── 150,000+ customers
├── Shared infrastructure
├── Custom objects and fields per tenant
├── 20+ years of evolution

Architecture:

┌───────────────────────────────────────────────────────────────────────┐
│                      SALESFORCE MULTI-TENANT                          │
│                                                                       │
│  Application Layer:                                                   │
│  ├── Stateless app servers                                            │
│  ├── Tenant context from session                                      │
│  └── All queries include OrgId                                        │
│                                                                       │
│  Metadata Layer:                                                      │
│  ├── Custom objects stored as metadata                                │
│  ├── Fields defined in metadata tables                                │
│  └── Runtime schema assembly per tenant                               │
│                                                                       │
│  Database Layer:                                                      │
│  ├── Oracle RAC clusters                                              │
│  ├── OrgId in every table                                             │
│  ├── Massive indexes on OrgId                                         │
│  └── Data and metadata tables separate                                │
│                                                                       │
└───────────────────────────────────────────────────────────────────────┘

Key Innovations:

GOVERNOR LIMITS:
├── CPU time per transaction: 10 seconds
├── SOQL queries per transaction: 100
├── Records retrieved: 50,000
├── API calls per day: Based on licenses
└── Purpose: Prevent noisy neighbors

METADATA-DRIVEN CUSTOMIZATION:
├── Custom objects don't create real tables
├── Pivot tables store custom field data
├── Runtime query assembly
└── Allows unlimited customization

TRUST.SALESFORCE.COM:
├── Real-time system status
├── Performance metrics per instance
├── Transparency about incidents
└── Builds customer trust

Lesson: Strict resource limits enable massive multi-tenancy

7.3 AWS Multi-Tenancy Patterns

AWS'S GUIDANCE ON MULTI-TENANCY

AWS categorizes into "Silo" vs "Pool" models:

SILO MODEL (Dedicated Resources):
├── Separate AWS accounts per tenant
├── VPC isolation
├── Dedicated RDS instances
├── Independent scaling
├── Maximum isolation
└── Higher cost, simpler compliance

POOL MODEL (Shared Resources):
├── Single AWS account
├── Shared infrastructure
├── Tenant_id filtering
├── More efficient resource use
├── Lower cost, more complex
└── Requires careful design

BRIDGE MODEL (Hybrid):
├── Shared compute (ECS/EKS)
├── Isolated data stores
├── Tenant-specific encryption keys
├── Balance of cost and isolation
└── Most common for SaaS

AWS Recommendations:
├── Use AWS Organizations for account management
├── Use Resource Access Manager for sharing
├── Use KMS with per-tenant keys
├── Use CloudWatch with tenant dimensions
└── Use IAM policies for access control

Chapter 8: Common Mistakes

8.1 Tenant Isolation Mistakes

MULTI-TENANCY ANTI-PATTERNS

❌ MISTAKE 1: Missing WHERE Clause

Wrong:
  # Forgot tenant filter!
  users = db.query("SELECT * FROM users WHERE role = 'admin'")
  
Problem:
  Returns admins from ALL tenants
  Data breach waiting to happen

Right:
  # Always use tenant-aware repository
  users = await user_repo.find_all(filters={"role": "admin"})
  # Repository adds tenant_id automatically


❌ MISTAKE 2: Trusting Client-Provided Tenant ID

Wrong:
  tenant_id = request.headers.get("X-Tenant-ID")
  # Trust it blindly
  
Problem:
  Attacker sets X-Tenant-ID: victim_tenant
  Access other tenant's data

Right:
  # Derive tenant from authenticated user
  tenant_id = request.user.tenant_id
  # Or validate header against user's allowed tenants


❌ MISTAKE 3: Shared Cache Without Tenant Prefix

Wrong:
  cache_key = f"user:{user_id}"
  cached_user = redis.get(cache_key)
  
Problem:
  User ID 123 exists in both tenants
  Returns wrong tenant's user

Right:
  tenant_id = get_current_tenant_id()
  cache_key = f"tenant:{tenant_id}:user:{user_id}"
  cached_user = redis.get(cache_key)


❌ MISTAKE 4: Logging PII Without Tenant Context

Wrong:
  logger.info(f"User {user.email} logged in")
  
Problem:
  No way to filter logs by tenant
  Can't provide tenant-specific audit logs
  Cross-tenant data in same log stream

Right:
  logger.info(
      "User logged in",
      extra={
          "tenant_id": get_current_tenant_id(),
          "user_id": user.id,  # Not email!
      }
  )


❌ MISTAKE 5: Background Jobs Without Tenant Context

Wrong:
  @celery.task
  def send_welcome_email(user_id):
      user = User.get(user_id)  # Which tenant?!
      send_email(user.email)
  
Problem:
  Job runs without tenant context
  Query might fail or return wrong data

Right:
  @celery.task
  def send_welcome_email(tenant_id, user_id):
      with tenant_scope(load_tenant(tenant_id)):
          user = user_repo.find_by_id(user_id)
          send_email(user.email)

8.2 Security Checklist

MULTI-TENANT SECURITY CHECKLIST

Before Production:

TENANT IDENTIFICATION:
├── [ ] Tenant derived from authenticated session, not headers
├── [ ] Tenant ID validated on every request
├── [ ] Admin endpoints require additional auth
└── [ ] API keys are tenant-scoped

DATA ACCESS:
├── [ ] All queries include tenant filter
├── [ ] Database has RLS enabled (PostgreSQL)
├── [ ] Cache keys include tenant prefix
├── [ ] File storage paths include tenant
└── [ ] Search indexes are tenant-scoped

BACKGROUND PROCESSING:
├── [ ] Jobs include tenant context
├── [ ] Queues are tenant-aware
├── [ ] Scheduled tasks run in tenant scope
└── [ ] Webhooks validated before processing

LOGGING AND MONITORING:
├── [ ] All logs include tenant_id
├── [ ] Metrics tagged with tenant
├── [ ] Alerts can be filtered by tenant
└── [ ] No PII in logs (use IDs)

TESTING:
├── [ ] Tests run in isolated tenant
├── [ ] Cross-tenant access tests exist
├── [ ] Penetration testing for tenant isolation
└── [ ] Automated security scans

Part IV: Interview Preparation

Chapter 9: Interview Tips

9.1 When to Discuss Multi-Tenancy

MULTI-TENANCY INTERVIEW TRIGGERS

"Design a SaaS platform for..."
├── Immediately ask about tenant count
├── Ask about enterprise requirements
└── Discuss isolation levels

"Design a system that stores user data..."
├── Ask: "Multiple organizations?"
├── If yes, tenant isolation is critical
└── Discuss data segregation

"How would you handle multiple customers?"
├── This IS a multi-tenancy question
├── Discuss isolation spectrum
└── Propose appropriate level

"Security requirements for B2B..."
├── Multi-tenancy is implicit
├── Enterprise customers want isolation
└── Compliance often requires segregation

9.2 Key Phrases

DISCUSSING MULTI-TENANCY IN INTERVIEWS

Opening:
"For multi-tenancy, the key decision is the isolation level.
This is a spectrum from shared tables with row-level filtering
to completely dedicated infrastructure per tenant."

Shared Tables:
"For a startup with SMB customers, shared tables with tenant_id
filtering is cost-effective. I'd use PostgreSQL Row-Level
Security for database-enforced isolation, plus application-level
filtering as defense in depth."

Schema Per Tenant:
"For mid-market SaaS with compliance requirements, schema-per-tenant
provides stronger isolation. Each tenant gets their own schema,
so a bug can't accidentally expose cross-tenant data."

Database Per Tenant:
"For enterprise customers requiring strict isolation, database-per-tenant
makes sense. It enables per-tenant backups, independent scaling,
and meets compliance requirements. The trade-off is operational
complexity and cost."

Hybrid Approach:
"Many successful SaaS platforms use a tiered approach: shared tables
for self-serve customers, and dedicated resources for enterprise
customers who pay for isolation. Slack does this with their
Enterprise Grid offering."

Chapter 10: Practice Problems

Problem 1: E-commerce SaaS Platform

Setup: Design a multi-tenant e-commerce platform where merchants can set up online stores. You have 10,000 merchants, ranging from small shops (100 orders/month) to large retailers (100,000 orders/month).

Requirements:

  • Each merchant's data must be isolated
  • Large merchants shouldn't affect small merchants
  • Some merchants need dedicated infrastructure

Questions:

  1. What isolation level would you choose?
  2. How do you handle the 10x variation in scale?
  3. How do you migrate a merchant from shared to dedicated?
  • Consider hybrid approach: shared for small, dedicated for large
  • Shard by merchant_id for the shared tier
  • Design migration path from shared to dedicated
  • Use feature flags to control which tier a merchant uses

Problem 2: Healthcare SaaS

Setup: Design a multi-tenant EHR (Electronic Health Records) system for clinics. HIPAA compliance required.

Requirements:

  • Strong data isolation (PHI)
  • Audit logging of all access
  • Some clinics part of same hospital network

Questions:

  1. Can you use shared tables for PHI?
  2. How do you handle clinic networks (parent-child tenants)?
  3. What's your audit logging strategy?
  • PHI typically requires database-per-tenant or stronger
  • Consider organization hierarchy in tenant model
  • Audit logging at database level (triggers)
  • Encryption with per-tenant keys

Chapter 11: Sample Interview Dialogue

Interviewer: "Design a project management SaaS like Asana. Multiple companies will use it."

You: "Great. This is a multi-tenant system. Let me start by understanding the scale and customer profile. How many organizations are we targeting? What's the range — startups to enterprises?"

Interviewer: "Let's say 50,000 organizations, from 5-person startups to 10,000-person enterprises."

You: "That range is important. For isolation, I'd propose a tiered approach:

Standard tier: Shared PostgreSQL with tenant_id in every table. I'd use Row-Level Security so the database enforces isolation. This works well for startups and SMBs — cost-effective and scales to tens of thousands of tenants.

Enterprise tier: For large organizations with compliance requirements, I'd offer schema-per-tenant or dedicated database options. This provides stronger isolation guarantees and enables features like customer-managed encryption keys.

Let me draw the data model..."

TENANT MODEL

tenants table:
├── id (UUID)
├── name
├── plan (standard, enterprise)
├── isolation_level (shared, schema, dedicated)
├── created_at
└── settings (JSONB)

For shared tier, all tables have:
├── tenant_id (indexed)
├── Composite indexes: (tenant_id, other_columns)
└── RLS policy: WHERE tenant_id = current_tenant()

Interviewer: "What if an enterprise customer runs a huge report that slows down the database?"

You: "Noisy neighbor problem — great question. Several defenses:

  1. Query timeouts: Limit query execution time per tenant
  2. Resource quotas: Track and limit database connections, CPU time
  3. Read replicas: Route heavy analytics to replicas
  4. For enterprise tier: Their dedicated database means they only affect themselves

For the shared tier, I'd also implement query analysis to flag expensive queries before they run."

Interviewer: "How do you handle tenant context across microservices?"

You: "Tenant context needs to propagate through the entire request path. I'd implement:

  1. API Gateway: Extracts tenant from subdomain or JWT, sets X-Tenant-ID header
  2. Service mesh: Propagates header automatically between services
  3. Each service: Middleware validates header, sets context variable
  4. Database layer: Uses context to set RLS variables

For async jobs, the message includes tenant_id, and the worker sets context before processing.

This ensures every database query, cache lookup, and log entry is tenant-scoped."


Chapter 12: Testing Multi-Tenant Systems

12.1 Isolation Testing

# tests/test_tenant_isolation.py

"""
Tests to verify tenant isolation.

These tests are CRITICAL - they verify that tenants can't see each other's data.
Run these in CI/CD and before every deployment.
"""

import pytest
from uuid import uuid4


class TestTenantIsolation:
    """
    Test suite for verifying tenant isolation.
    """
    
    @pytest.fixture
    async def tenant_a(self, tenant_service):
        """Create test tenant A."""
        return await tenant_service.create_tenant(
            name="Test Tenant A",
            plan="standard"
        )
    
    @pytest.fixture
    async def tenant_b(self, tenant_service):
        """Create test tenant B."""
        return await tenant_service.create_tenant(
            name="Test Tenant B",
            plan="standard"
        )
    
    async def test_tenant_cannot_see_other_tenant_data(
        self,
        tenant_a,
        tenant_b,
        document_service
    ):
        """
        Verify that Tenant A cannot see Tenant B's documents.
        """
        # Create document as Tenant A
        with tenant_scope(tenant_a):
            doc_a = await document_service.create_document(
                CreateDocumentRequest(
                    title="Tenant A Secret",
                    content="This is confidential"
                ),
                created_by_user_id="user_a"
            )
        
        # Try to access from Tenant B
        with tenant_scope(tenant_b):
            # Should not find Tenant A's document
            doc = await document_service.get_document(doc_a.id)
            assert doc is None, "Tenant B should not see Tenant A's document!"
            
            # List should not include Tenant A's documents
            docs = await document_service.list_documents()
            doc_ids = [d.id for d in docs]
            assert doc_a.id not in doc_ids, "Tenant A's document leaked to Tenant B!"
    
    async def test_tenant_cannot_modify_other_tenant_data(
        self,
        tenant_a,
        tenant_b,
        document_repo
    ):
        """
        Verify that Tenant A cannot modify Tenant B's documents.
        """
        # Create document as Tenant A
        with tenant_scope(tenant_a):
            doc_a = await document_repo.create({
                "title": "Original Title",
                "content": "Original Content"
            })
        
        # Try to update from Tenant B
        with tenant_scope(tenant_b):
            result = await document_repo.update(
                doc_a.id,
                {"title": "Hacked Title"}
            )
            assert result is None, "Tenant B should not modify Tenant A's document!"
        
        # Verify document unchanged
        with tenant_scope(tenant_a):
            doc = await document_repo.find_by_id(doc_a.id)
            assert doc.title == "Original Title"
    
    async def test_tenant_cannot_delete_other_tenant_data(
        self,
        tenant_a,
        tenant_b,
        document_repo
    ):
        """
        Verify that Tenant A cannot delete Tenant B's documents.
        """
        # Create document as Tenant A
        with tenant_scope(tenant_a):
            doc_a = await document_repo.create({
                "title": "Important Document",
                "content": "Do not delete"
            })
        
        # Try to delete from Tenant B
        with tenant_scope(tenant_b):
            deleted = await document_repo.delete(doc_a.id)
            assert not deleted, "Tenant B should not delete Tenant A's document!"
        
        # Verify document still exists
        with tenant_scope(tenant_a):
            doc = await document_repo.find_by_id(doc_a.id)
            assert doc is not None
    
    async def test_sql_injection_cannot_bypass_tenant(
        self,
        tenant_a,
        tenant_b,
        db_connection
    ):
        """
        Verify that SQL injection attempts cannot bypass tenant isolation.
        
        This tests the RLS policy directly.
        """
        # Create document as Tenant A
        with tenant_scope(tenant_a):
            # Attempt SQL injection in search query
            malicious_query = "'; SELECT * FROM documents; --"
            
            # This should not return Tenant B's documents
            # If RLS is working, it's impossible to bypass
            docs = await document_service.search_documents(malicious_query)
            
            for doc in docs:
                assert doc.tenant_id == tenant_a.tenant_id
    
    async def test_cache_isolation(
        self,
        tenant_a,
        tenant_b,
        document_service,
        cache
    ):
        """
        Verify that cached data is tenant-isolated.
        """
        # Create and cache document as Tenant A
        with tenant_scope(tenant_a):
            doc_a = await document_service.create_document(
                CreateDocumentRequest(title="Cached Doc", content="Secret"),
                created_by_user_id="user_a"
            )
            # Access to populate cache
            await document_service.get_document(doc_a.id)
        
        # Verify cache key is tenant-scoped
        cache_key_a = f"tenant:{tenant_a.tenant_id}:doc:{doc_a.id}"
        assert await cache.exists(cache_key_a)
        
        # Tenant B should not hit this cache
        with tenant_scope(tenant_b):
            cache_key_b = f"tenant:{tenant_b.tenant_id}:doc:{doc_a.id}"
            assert not await cache.exists(cache_key_b)


class TestTenantContextPropagation:
    """
    Test that tenant context flows correctly through the system.
    """
    
    async def test_context_available_in_service(self, tenant_a, document_service):
        """Verify tenant context is available in service layer."""
        with tenant_scope(tenant_a):
            # Service should be able to get current tenant
            tenant = get_current_tenant()
            assert tenant.tenant_id == tenant_a.tenant_id
    
    async def test_context_propagates_to_repository(self, tenant_a, document_repo):
        """Verify tenant context propagates to repository."""
        with tenant_scope(tenant_a):
            # Repository should automatically use current tenant
            doc = await document_repo.create({"title": "Test", "content": "Test"})
            assert doc.tenant_id == tenant_a.tenant_id
    
    async def test_no_context_raises_error(self, document_service):
        """Verify operations fail without tenant context."""
        # Outside tenant scope
        with pytest.raises(TenantContextError):
            await document_service.list_documents()
    
    async def test_context_cleared_after_scope(self, tenant_a):
        """Verify context is cleared after scope exits."""
        with tenant_scope(tenant_a):
            assert get_current_tenant().tenant_id == tenant_a.tenant_id
        
        # After scope, context should be cleared
        with pytest.raises(TenantContextError):
            get_current_tenant()

12.2 Load Testing Multi-Tenant Systems

# tests/load_test_tenants.py

"""
Load testing for multi-tenant systems.

Verify that:
1. Multiple tenants can operate concurrently
2. One tenant's load doesn't affect others
3. System handles tenant context correctly under load
"""

import asyncio
from locust import HttpUser, task, between


class TenantLoadTest(HttpUser):
    """
    Locust load test simulating multiple tenants.
    """
    wait_time = between(1, 3)
    
    def on_start(self):
        """Setup - assign this user to a random tenant."""
        import random
        self.tenant_id = f"tenant_{random.randint(1, 100)}"
        self.headers = {"X-Tenant-ID": self.tenant_id}
    
    @task(3)
    def list_documents(self):
        """List documents for this tenant."""
        self.client.get("/api/documents", headers=self.headers)
    
    @task(2)
    def create_document(self):
        """Create a document for this tenant."""
        self.client.post(
            "/api/documents",
            json={"title": "Load Test Doc", "content": "Test content"},
            headers=self.headers
        )
    
    @task(1)
    def get_document(self):
        """Get a specific document."""
        self.client.get(f"/api/documents/doc_1", headers=self.headers)


async def verify_isolation_under_load():
    """
    Verify tenant isolation while system is under load.
    
    Run this while load test is active.
    """
    tenants = ["tenant_1", "tenant_2", "tenant_3"]
    
    async def check_tenant(tenant_id):
        """Verify tenant only sees their own data."""
        async with httpx.AsyncClient() as client:
            response = await client.get(
                "http://localhost:8000/api/documents",
                headers={"X-Tenant-ID": tenant_id}
            )
            
            docs = response.json()["documents"]
            for doc in docs:
                assert doc["tenant_id"] == tenant_id, \
                    f"Tenant {tenant_id} saw document from {doc['tenant_id']}!"
    
    # Check all tenants concurrently
    await asyncio.gather(*[check_tenant(t) for t in tenants])
    print("Isolation verified under load!")

Summary

DAY 1 KEY TAKEAWAYS

ISOLATION SPECTRUM:
├── Shared tables: Lowest cost, application-enforced
├── Row-level security: Database-enforced, same tables
├── Schema per tenant: Stronger isolation, same server
├── Database per tenant: Physical isolation, higher cost
└── Dedicated infrastructure: Complete isolation, highest cost

TENANT CONTEXT:
├── Extract from auth, not client headers
├── Propagate through all layers
├── Include in async jobs and messages
├── Tag all logs and metrics

ROW-LEVEL SECURITY:
├── PostgreSQL RLS is powerful
├── Defense in depth with application filtering
├── Set context per connection
└── Test extensively

HYBRID APPROACH:
├── Different isolation for different tiers
├── Self-serve: Shared infrastructure
├── Enterprise: Dedicated resources
├── Migration path between tiers

DEFAULT CHOICE:
├── Start with shared tables + RLS
├── Add schema separation for compliance
├── Offer dedicated for enterprise deals
└── Never compromise on context propagation

Further Reading

Documentation:

Engineering Blogs:

  • Slack Engineering: "Scaling Slack's Database"
  • Salesforce Architecture: "The Metadata-Driven Architecture"

Books:

  • "Building Multi-Tenant SaaS Architectures" (AWS)

End of Day 1: Tenant Isolation Strategies

Tomorrow: Day 2 — Noisy Neighbor Prevention. We'll learn how to stop one tenant from ruining everyone else's experience with quotas, rate limiting, and fair scheduling.