Week 9 β Day 5: Security Architecture
System Design Mastery Series β Multi-Tenancy, Security, and Compliance Week
Preface
You're reviewing a pull request when you notice something alarming:
THE SECURITY INCIDENT
Pull Request #4521: Add new analytics integration
+++ config/analytics.py
+ ANALYTICS_API_KEY = "sk_live_a8f7g9h2j3k4l5m6n7o8p9"
+ DATABASE_URL = "postgres://admin:SuperSecret123@prod-db.example.com:5432/app"
Your Slack lights up:
Security Bot: π¨ ALERT: Credential detected in commit
Security Bot: Repository: backend-api
Security Bot: File: config/analytics.py
Security Bot: Detected: API key, Database password
You check the git history:
commit a1b2c3d (3 hours ago)
Author: junior.dev@company.com
Message: Add analytics integration
This has been in main for 3 hours.
Production deployed 2 hours ago.
The secret is now in:
βββ GitHub history (forever unless force-pushed)
βββ Docker image layers (pushed to registry)
βββ CI/CD logs (visible to team)
βββ Developer laptops (git pulled)
βββ Any forks of the repo
Even if you delete it now, it's been exposed.
Questions:
βββ How did this happen? (No secrets scanning in CI)
βββ Why could a dev access production DB password? (No separation)
βββ Why is there a password at all? (Should use IAM roles)
βββ How do we prevent this forever? (Security architecture)
Today, we'll build a security architecture that makes this class of mistake impossible.
Part I: Foundations
Chapter 1: Security Architecture Principles
1.1 Defense in Depth
Defense in depth means multiple layers of security controls, so that if one fails, others still protect the system.
DEFENSE IN DEPTH LAYERS
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β β
β LAYER 1: PERIMETER β
β βββ WAF (Web Application Firewall) β
β βββ DDoS protection β
β βββ Rate limiting β
β βββ IP allowlisting (for admin) β
β β
β LAYER 2: NETWORK β
β βββ VPC isolation β
β βββ Security groups β
β βββ Private subnets for databases β
β βββ Network ACLs β
β β
β LAYER 3: APPLICATION β
β βββ Authentication (who are you?) β
β βββ Authorization (what can you do?) β
β βββ Input validation β
β βββ Output encoding β
β β
β LAYER 4: DATA β
β βββ Encryption at rest β
β βββ Encryption in transit β
β βββ Field-level encryption β
β βββ Key management β
β β
β LAYER 5: MONITORING β
β βββ Audit logging β
β βββ Intrusion detection β
β βββ Anomaly detection β
β βββ Incident response β
β β
β Each layer assumes other layers might fail β
β β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
1.2 Zero Trust Architecture
Zero Trust means "never trust, always verify" - every request must be authenticated and authorized regardless of network location.
ZERO TRUST PRINCIPLES
TRADITIONAL (PERIMETER-BASED):
βββββββββββββββββββββββββββββ
βββββββββββββββββββββββββββββββββββββββββββ
β Corporate Network β
β βββββββ βββββββ βββββββ β
β β App βββββ DB βββββ API β TRUSTED β
β βββββββ βββββββ βββββββ β
β β
ββββββββββββββββββββ¬βββββββββββββββββββββββ
β
[Firewall]
β
UNTRUSTED
β
[Internet]
Problem: Once inside, attacker has free access
ZERO TRUST:
βββββββββββ
βββββββββββββββββββββββββββββββββββββββββββ
β β
β βββββββ βββββββ βββββββ β
β β App βββ?βββΆ β DB β βββ?βββ API β β
β ββββ¬βββ ββββ¬βββ ββββ¬βββ β
β β β β β
β βΌ βΌ βΌ β
β [Auth] [Auth] [Auth] β
β β
β Every connection authenticated β
β Every request authorized β
β No implicit trust β
β β
βββββββββββββββββββββββββββββββββββββββββββ
Key principles:
βββ Verify explicitly (every request)
βββ Use least privilege access
βββ Assume breach (design for compromise)
βββ Micro-segmentation (isolate everything)
1.3 Principle of Least Privilege
LEAST PRIVILEGE IN PRACTICE
β WRONG: Over-privileged access
Developer laptop:
βββ AWS Admin access
βββ Production database credentials
βββ All API keys
βββ Root access to servers
Problem: If laptop compromised, attacker has everything
β RIGHT: Minimal necessary access
Developer laptop:
βββ AWS access: Dev account only, read-only prod
βββ Database: Dev database only, no prod
βββ API keys: Test keys only
βββ Servers: No direct access, use bastion + MFA
Production services:
βββ App server: Can read DB, can't modify schema
βββ Worker: Can write to specific queues
βββ Analytics: Read-only replica access
βββ Each service: Only what it needs
IMPLEMENTING LEAST PRIVILEGE:
βββ Role-based access control (RBAC)
βββ Just-in-time access (temporary elevation)
βββ Regular access reviews (remove unused)
βββ Separate environments (dev/staging/prod)
βββ Service accounts per function
Chapter 2: Trust Boundaries and Threat Modeling
2.1 Identifying Trust Boundaries
TRUST BOUNDARY MAP
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β TRUST BOUNDARIES β
β β
β ββββββββββββββββββββ INTERNET (Untrusted) ββββββββββββββββββββ β
β β β
β [Boundary 1] β
β β β
β βββββββββββββββββββββββββββββ΄βββββββββββββββββββββββββββ β
β β DMZ / Edge β β
β β βββββββββββ βββββββββββ βββββββββββ β β
β β β CDN β β WAF β β ALB β β β
β β βββββββββββ βββββββββββ βββββββββββ β β
β βββββββββββββββββββββββββββββ¬βββββββββββββββββββββββββββ β
β β β
β [Boundary 2] β
β β β
β βββββββββββββββββββββββββββββ΄βββββββββββββββββββββββββββ β
β β Application Tier β β
β β βββββββββββ βββββββββββ βββββββββββ β β
β β β API β β Worker β β Admin β β β
β β βββββββββββ βββββββββββ βββββββββββ β β
β βββββββββββββββββββββββββββββ¬βββββββββββββββββββββββββββ β
β β β
β [Boundary 3] β
β β β
β βββββββββββββββββββββββββββββ΄βββββββββββββββββββββββββββ β
β β Data Tier β β
β β βββββββββββ βββββββββββ βββββββββββ β β
β β β DB β β Cache β β S3 β β β
β β βββββββββββ βββββββββββ βββββββββββ β β
β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β
β β
β RULES: β
β β’ Data crossing boundaries must be validated β
β β’ Each boundary requires authentication β
β β’ Encryption required at every boundary β
β β’ Log all cross-boundary access β
β β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
2.2 STRIDE Threat Modeling
STRIDE THREAT MODEL
For each component, consider:
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β THREAT β DESCRIPTION β MITIGATION β
βββββββββββββββββββΌβββββββββββββββββββββββββΌββββββββββββββββββββββββββββββ€
β Spoofing β Pretending to be β Strong authentication β
β β someone else β MFA, certificates β
βββββββββββββββββββΌβββββββββββββββββββββββββΌββββββββββββββββββββββββββββββ€
β Tampering β Modifying data or β Input validation β
β β code β Integrity checks, signing β
βββββββββββββββββββΌβββββββββββββββββββββββββΌββββββββββββββββββββββββββββββ€
β Repudiation β Denying actions β Audit logging β
β β taken β Non-repudiation controls β
βββββββββββββββββββΌβββββββββββββββββββββββββΌββββββββββββββββββββββββββββββ€
β Information β Exposing data to β Encryption β
β Disclosure β unauthorized parties β Access controls β
βββββββββββββββββββΌβββββββββββββββββββββββββΌββββββββββββββββββββββββββββββ€
β Denial of β Making system β Rate limiting β
β Service β unavailable β Redundancy, scaling β
βββββββββββββββββββΌβββββββββββββββββββββββββΌββββββββββββββββββββββββββββββ€
β Elevation of β Gaining unauthorized β Least privilege β
β Privilege β access β Input validation β
βββββββββββββββββββ΄βββββββββββββββββββββββββ΄ββββββββββββββββββββββββββββββ
EXAMPLE: API Endpoint Threat Model
Component: POST /api/users
βββ Spoofing: Attacker creates account as another user
β βββ Mitigation: Email verification, CAPTCHA
βββ Tampering: SQL injection in user data
β βββ Mitigation: Parameterized queries, validation
βββ Repudiation: User denies creating account
β βββ Mitigation: Audit log with IP, timestamp
βββ Information Disclosure: User data leaked
β βββ Mitigation: Field-level encryption, HTTPS
βββ DoS: Flood of account creation
β βββ Mitigation: Rate limiting, CAPTCHA
βββ Elevation: Create admin account
βββ Mitigation: No role in registration, admin approval
Chapter 3: Encryption Strategy
3.1 Encryption Layers
ENCRYPTION AT EVERY LAYER
LAYER 1: IN TRANSIT
ββββββββββββββββββββ
Client ββ[TLS 1.3]βββΆ Load Balancer ββ[mTLS]βββΆ Service ββ[TLS]βββΆ Database
Requirements:
βββ TLS 1.3 for external connections
βββ mTLS between services (mutual authentication)
βββ No plaintext internal traffic
βββ Certificate rotation automated
LAYER 2: AT REST
ββββββββββββββββ
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β β
β DATABASE: β
β βββ Transparent Data Encryption (TDE) β
β βββ Encrypted with AWS KMS key β
β βββ Automatic, no application changes β
β β
β FILE STORAGE (S3): β
β βββ Server-side encryption (SSE-KMS) β
β βββ Client-side encryption for sensitive files β
β βββ Bucket policy enforces encryption β
β β
β BACKUPS: β
β βββ Encrypted with separate key β
β βββ Key escrowed for disaster recovery β
β βββ Cross-region replicas also encrypted β
β β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
LAYER 3: APPLICATION-LEVEL (Field Encryption)
βββββββββββββββββββββββββββββββββββββββββββββ
Encrypt sensitive fields before storing:
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β β
β users table: β
β βββ id: 12345 (plaintext - for queries) β
β βββ email: user@example.com (plaintext - for login) β
β βββ ssn: ENC[AES256:abc123...] (encrypted) β
β βββ credit_card: ENC[AES256:def456...] (encrypted) β
β βββ medical_history: ENC[AES256:ghi789...] (encrypted) β
β β
β Benefits: β
β βββ Database admin can't read sensitive data β
β βββ Backup exposure doesn't leak PII β
β βββ Compliance (PCI, HIPAA) requirements met β
β β
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
3.2 Key Management Hierarchy
KEY MANAGEMENT HIERARCHY
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β β
β βββββββββββββββββββ β
β β Master Key β β
β β (HSM-backed) β β
β β Never leaves β β
β β HSM β β
β ββββββββββ¬βββββββββ β
β β β
β ββββββββββββββββΌβββββββββββββββ β
β β β β β
β βΌ βΌ βΌ β
β βββββββββββββ βββββββββββββ βββββββββββββ β
β β Tenant β β Tenant β β Service β β
β β Key A β β Key B β β Keys β β
β βββββββ¬ββββββ βββββββ¬ββββββ βββββββ¬ββββββ β
β β β β β
β βΌ βΌ βΌ β
β βββββββββββββ βββββββββββββ βββββββββββββ β
β βData Keys β βData Keys β βData Keys β β
β β(per-row) β β(per-row) β β(per-job) β β
β βββββββββββββ βββββββββββββ βββββββββββββ β
β β
β HIERARCHY BENEFITS: β
β βββ Master key rotation doesn't re-encrypt all data β
β βββ Tenant isolation: Tenant A can't decrypt Tenant B β
β βββ Key per data item: Compromised key limits blast radius β
β βββ HSM protection: Master key never exposed β
β β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Part II: Implementation
Chapter 4: Secrets Management
4.1 Secrets Management Service
# security/secrets.py
"""
Secrets management using AWS Secrets Manager / HashiCorp Vault.
Never store secrets in:
- Environment variables (visible in process lists)
- Config files (committed to git)
- Container images (visible in layers)
"""
from dataclasses import dataclass
from typing import Optional, Dict, Any
from datetime import datetime, timedelta
import json
import logging
logger = logging.getLogger(__name__)
@dataclass
class Secret:
"""A secret value with metadata."""
name: str
value: str
version: str
created_at: datetime
expires_at: Optional[datetime]
metadata: Dict[str, Any]
class SecretsManager:
"""
Centralized secrets management.
Features:
- Automatic rotation
- Audit logging
- Caching with TTL
- Tenant isolation
"""
def __init__(self, vault_client, cache, audit_logger):
self.vault = vault_client
self.cache = cache
self.audit = audit_logger
self._cache_ttl = 300 # 5 minutes
async def get_secret(
self,
secret_name: str,
tenant_id: Optional[str] = None
) -> Secret:
"""
Retrieve a secret.
Secrets are cached locally to reduce Vault calls,
but cache TTL ensures rotation takes effect.
"""
# Build full path
if tenant_id:
path = f"tenants/{tenant_id}/secrets/{secret_name}"
else:
path = f"global/secrets/{secret_name}"
# Check cache
cache_key = f"secret:{path}"
cached = await self.cache.get(cache_key)
if cached:
return Secret(**json.loads(cached))
# Fetch from Vault
try:
result = await self.vault.read(path)
secret = Secret(
name=secret_name,
value=result["data"]["value"],
version=result["metadata"]["version"],
created_at=datetime.fromisoformat(result["metadata"]["created_time"]),
expires_at=result["data"].get("expires_at"),
metadata=result["metadata"]
)
# Cache it
await self.cache.set(
cache_key,
json.dumps(secret.__dict__, default=str),
ttl=self._cache_ttl
)
# Audit log
await self.audit.log(
action="secret_accessed",
secret_name=secret_name,
tenant_id=tenant_id
)
return secret
except Exception as e:
logger.error(f"Failed to retrieve secret {secret_name}: {e}")
raise SecretNotFoundError(f"Secret not found: {secret_name}")
async def set_secret(
self,
secret_name: str,
value: str,
tenant_id: Optional[str] = None,
expires_in: Optional[timedelta] = None
) -> Secret:
"""
Store a secret.
"""
if tenant_id:
path = f"tenants/{tenant_id}/secrets/{secret_name}"
else:
path = f"global/secrets/{secret_name}"
data = {"value": value}
if expires_in:
data["expires_at"] = (datetime.utcnow() + expires_in).isoformat()
result = await self.vault.write(path, data)
# Invalidate cache
await self.cache.delete(f"secret:{path}")
# Audit log
await self.audit.log(
action="secret_updated",
secret_name=secret_name,
tenant_id=tenant_id
)
return await self.get_secret(secret_name, tenant_id)
async def rotate_secret(
self,
secret_name: str,
rotation_func,
tenant_id: Optional[str] = None
):
"""
Rotate a secret using provided rotation function.
rotation_func should:
1. Generate new secret value
2. Update external system (e.g., database password)
3. Return new value
"""
logger.info(f"Rotating secret: {secret_name}")
try:
# Generate new secret
new_value = await rotation_func()
# Store new version
await self.set_secret(secret_name, new_value, tenant_id)
# Audit log
await self.audit.log(
action="secret_rotated",
secret_name=secret_name,
tenant_id=tenant_id
)
logger.info(f"Secret rotated: {secret_name}")
except Exception as e:
logger.error(f"Secret rotation failed: {e}")
await self.audit.log(
action="secret_rotation_failed",
secret_name=secret_name,
tenant_id=tenant_id,
error=str(e)
)
raise
class SecretNotFoundError(Exception):
"""Raised when a secret is not found."""
pass
# Database credential rotation
async def rotate_database_password(db_admin_client, username: str):
"""
Rotate a database user's password.
This is called by SecretsManager.rotate_secret()
"""
import secrets
# Generate new password
new_password = secrets.token_urlsafe(32)
# Update in database
await db_admin_client.execute(
f"ALTER USER {username} WITH PASSWORD %s",
new_password
)
return new_password
4.2 Application Configuration Without Secrets
# security/config.py
"""
Application configuration that separates secrets from config.
Config: In code/environment (non-sensitive)
Secrets: In secrets manager (sensitive)
"""
from dataclasses import dataclass
from typing import Optional
import os
@dataclass
class DatabaseConfig:
"""Database configuration (secrets fetched separately)."""
host: str
port: int
database: str
ssl_mode: str = "require"
pool_size: int = 10
# Note: No password here!
@classmethod
def from_env(cls):
return cls(
host=os.getenv("DB_HOST", "localhost"),
port=int(os.getenv("DB_PORT", "5432")),
database=os.getenv("DB_NAME", "app"),
ssl_mode=os.getenv("DB_SSL_MODE", "require"),
pool_size=int(os.getenv("DB_POOL_SIZE", "10"))
)
@dataclass
class AppConfig:
"""Application configuration."""
environment: str
debug: bool
log_level: str
database: DatabaseConfig
# Secrets are NOT in config
# They're fetched at runtime from SecretsManager
@classmethod
def from_env(cls):
return cls(
environment=os.getenv("ENVIRONMENT", "development"),
debug=os.getenv("DEBUG", "false").lower() == "true",
log_level=os.getenv("LOG_LEVEL", "INFO"),
database=DatabaseConfig.from_env()
)
class SecureConnectionFactory:
"""
Creates database connections with secrets from vault.
Secrets are fetched at connection time, not startup time.
This allows rotation without restart.
"""
def __init__(self, config: DatabaseConfig, secrets_manager: SecretsManager):
self.config = config
self.secrets = secrets_manager
async def create_connection(self):
"""Create a database connection with current credentials."""
import asyncpg
# Fetch current password from secrets manager
secret = await self.secrets.get_secret("database/app_user_password")
return await asyncpg.connect(
host=self.config.host,
port=self.config.port,
database=self.config.database,
user="app_user",
password=secret.value, # From vault, not config
ssl=self.config.ssl_mode
)
async def create_pool(self):
"""Create a connection pool with credential refresh."""
import asyncpg
async def get_password():
secret = await self.secrets.get_secret("database/app_user_password")
return secret.value
return await asyncpg.create_pool(
host=self.config.host,
port=self.config.port,
database=self.config.database,
user="app_user",
password=await get_password(),
ssl=self.config.ssl_mode,
min_size=2,
max_size=self.config.pool_size
)
Chapter 5: Authentication and Authorization
5.1 Authentication Service
# security/authentication.py
"""
Authentication service with multiple methods.
"""
from dataclasses import dataclass
from typing import Optional, List
from datetime import datetime, timedelta
from enum import Enum
import jwt
import bcrypt
import secrets
import logging
logger = logging.getLogger(__name__)
class AuthMethod(Enum):
"""Supported authentication methods."""
PASSWORD = "password"
API_KEY = "api_key"
OAUTH = "oauth"
SAML = "saml"
MFA_TOTP = "mfa_totp"
@dataclass
class AuthenticatedUser:
"""Result of successful authentication."""
user_id: str
tenant_id: str
email: str
roles: List[str]
permissions: List[str]
auth_method: AuthMethod
mfa_verified: bool
session_id: str
@dataclass
class AuthToken:
"""JWT token with claims."""
access_token: str
refresh_token: str
expires_at: datetime
token_type: str = "Bearer"
class AuthenticationService:
"""
Handles user authentication.
Security features:
- Password hashing with bcrypt
- Rate limiting on failures
- MFA support
- Session management
- Audit logging
"""
def __init__(
self,
db,
cache,
secrets_manager,
audit_logger
):
self.db = db
self.cache = cache
self.secrets = secrets_manager
self.audit = audit_logger
async def authenticate_password(
self,
email: str,
password: str,
ip_address: str,
user_agent: str
) -> Optional[AuthenticatedUser]:
"""
Authenticate with email and password.
"""
# Check rate limit
if await self._is_rate_limited(email, ip_address):
await self.audit.log(
action="auth_rate_limited",
email=email,
ip_address=ip_address
)
raise AuthenticationError("Too many attempts. Try again later.")
# Find user
user = await self.db.fetchone(
"""
SELECT id, tenant_id, email, password_hash, roles,
mfa_enabled, status
FROM users
WHERE email = $1
""",
email.lower()
)
if not user:
await self._record_failed_attempt(email, ip_address)
await self.audit.log(
action="auth_failed",
reason="user_not_found",
email=email,
ip_address=ip_address
)
raise AuthenticationError("Invalid credentials")
# Check status
if user["status"] != "active":
await self.audit.log(
action="auth_failed",
reason="account_inactive",
user_id=user["id"]
)
raise AuthenticationError("Account is not active")
# Verify password
if not bcrypt.checkpw(
password.encode(),
user["password_hash"].encode()
):
await self._record_failed_attempt(email, ip_address)
await self.audit.log(
action="auth_failed",
reason="invalid_password",
user_id=user["id"],
ip_address=ip_address
)
raise AuthenticationError("Invalid credentials")
# Clear rate limit on success
await self._clear_failed_attempts(email, ip_address)
# Create session
session_id = secrets.token_urlsafe(32)
authenticated = AuthenticatedUser(
user_id=user["id"],
tenant_id=user["tenant_id"],
email=user["email"],
roles=user["roles"],
permissions=await self._get_permissions(user["roles"]),
auth_method=AuthMethod.PASSWORD,
mfa_verified=not user["mfa_enabled"], # False if MFA required
session_id=session_id
)
# Store session
await self._create_session(authenticated, ip_address, user_agent)
# Audit log
await self.audit.log(
action="auth_success",
user_id=user["id"],
method="password",
ip_address=ip_address,
mfa_required=user["mfa_enabled"]
)
return authenticated
async def verify_mfa(
self,
session_id: str,
totp_code: str
) -> AuthenticatedUser:
"""
Verify MFA TOTP code.
"""
# Get session
session = await self._get_session(session_id)
if not session:
raise AuthenticationError("Session not found")
if session["mfa_verified"]:
raise AuthenticationError("MFA already verified")
# Get user's TOTP secret
user = await self.db.fetchone(
"SELECT mfa_secret FROM users WHERE id = $1",
session["user_id"]
)
# Verify TOTP
import pyotp
totp = pyotp.TOTP(user["mfa_secret"])
if not totp.verify(totp_code, valid_window=1):
await self.audit.log(
action="mfa_failed",
user_id=session["user_id"]
)
raise AuthenticationError("Invalid MFA code")
# Update session
await self._update_session_mfa(session_id)
await self.audit.log(
action="mfa_success",
user_id=session["user_id"]
)
session["mfa_verified"] = True
return AuthenticatedUser(**session)
async def create_tokens(
self,
user: AuthenticatedUser
) -> AuthToken:
"""
Create JWT access and refresh tokens.
"""
# Get signing key from secrets
signing_key = await self.secrets.get_secret("jwt/signing_key")
now = datetime.utcnow()
access_expires = now + timedelta(minutes=15)
refresh_expires = now + timedelta(days=7)
# Access token (short-lived)
access_payload = {
"sub": user.user_id,
"tenant_id": user.tenant_id,
"email": user.email,
"roles": user.roles,
"session_id": user.session_id,
"type": "access",
"iat": now,
"exp": access_expires
}
access_token = jwt.encode(
access_payload,
signing_key.value,
algorithm="HS256"
)
# Refresh token (longer-lived, minimal claims)
refresh_payload = {
"sub": user.user_id,
"session_id": user.session_id,
"type": "refresh",
"iat": now,
"exp": refresh_expires
}
refresh_token = jwt.encode(
refresh_payload,
signing_key.value,
algorithm="HS256"
)
return AuthToken(
access_token=access_token,
refresh_token=refresh_token,
expires_at=access_expires
)
async def validate_token(self, token: str) -> AuthenticatedUser:
"""
Validate a JWT token.
"""
signing_key = await self.secrets.get_secret("jwt/signing_key")
try:
payload = jwt.decode(
token,
signing_key.value,
algorithms=["HS256"]
)
except jwt.ExpiredSignatureError:
raise AuthenticationError("Token expired")
except jwt.InvalidTokenError:
raise AuthenticationError("Invalid token")
# Verify session is still valid
session = await self._get_session(payload["session_id"])
if not session:
raise AuthenticationError("Session expired")
return AuthenticatedUser(
user_id=payload["sub"],
tenant_id=payload["tenant_id"],
email=payload["email"],
roles=payload["roles"],
permissions=await self._get_permissions(payload["roles"]),
auth_method=AuthMethod.PASSWORD,
mfa_verified=True,
session_id=payload["session_id"]
)
async def _is_rate_limited(self, email: str, ip_address: str) -> bool:
"""Check if login attempts are rate limited."""
key = f"auth_attempts:{email}:{ip_address}"
attempts = await self.cache.get(key)
return attempts and int(attempts) >= 5
async def _record_failed_attempt(self, email: str, ip_address: str):
"""Record a failed login attempt."""
key = f"auth_attempts:{email}:{ip_address}"
await self.cache.incr(key)
await self.cache.expire(key, 900) # 15 minutes
async def _clear_failed_attempts(self, email: str, ip_address: str):
"""Clear failed attempts after successful login."""
key = f"auth_attempts:{email}:{ip_address}"
await self.cache.delete(key)
async def _get_permissions(self, roles: List[str]) -> List[str]:
"""Get permissions for roles."""
permissions = set()
for role in roles:
role_perms = await self.cache.get(f"role_permissions:{role}")
if role_perms:
permissions.update(role_perms)
return list(permissions)
async def _create_session(
self,
user: AuthenticatedUser,
ip_address: str,
user_agent: str
):
"""Create a new session."""
await self.cache.setex(
f"session:{user.session_id}",
86400 * 7, # 7 days
{
"user_id": user.user_id,
"tenant_id": user.tenant_id,
"email": user.email,
"roles": user.roles,
"mfa_verified": user.mfa_verified,
"ip_address": ip_address,
"user_agent": user_agent,
"created_at": datetime.utcnow().isoformat()
}
)
async def _get_session(self, session_id: str) -> Optional[dict]:
"""Get session by ID."""
return await self.cache.get(f"session:{session_id}")
async def _update_session_mfa(self, session_id: str):
"""Mark session as MFA verified."""
session = await self._get_session(session_id)
if session:
session["mfa_verified"] = True
await self.cache.setex(
f"session:{session_id}",
86400 * 7,
session
)
class AuthenticationError(Exception):
"""Authentication failed."""
pass
5.2 Authorization Service
# security/authorization.py
"""
Authorization service implementing RBAC and ABAC.
"""
from dataclasses import dataclass
from typing import List, Optional, Dict, Any
from enum import Enum
import logging
logger = logging.getLogger(__name__)
class Permission(Enum):
"""System permissions."""
# User permissions
USER_READ = "user:read"
USER_WRITE = "user:write"
USER_DELETE = "user:delete"
# Resource permissions
RESOURCE_READ = "resource:read"
RESOURCE_WRITE = "resource:write"
RESOURCE_DELETE = "resource:delete"
# Admin permissions
ADMIN_ACCESS = "admin:access"
ADMIN_USERS = "admin:users"
ADMIN_BILLING = "admin:billing"
ADMIN_SETTINGS = "admin:settings"
# Tenant permissions
TENANT_MANAGE = "tenant:manage"
# Role definitions
ROLE_PERMISSIONS = {
"viewer": [
Permission.USER_READ,
Permission.RESOURCE_READ,
],
"editor": [
Permission.USER_READ,
Permission.RESOURCE_READ,
Permission.RESOURCE_WRITE,
],
"admin": [
Permission.USER_READ,
Permission.USER_WRITE,
Permission.RESOURCE_READ,
Permission.RESOURCE_WRITE,
Permission.RESOURCE_DELETE,
Permission.ADMIN_ACCESS,
Permission.ADMIN_USERS,
Permission.ADMIN_SETTINGS,
],
"owner": [
# All permissions
*[p for p in Permission],
],
}
@dataclass
class AuthorizationContext:
"""Context for authorization decisions."""
user_id: str
tenant_id: str
roles: List[str]
resource_id: Optional[str] = None
resource_type: Optional[str] = None
resource_owner_id: Optional[str] = None
resource_tenant_id: Optional[str] = None
class AuthorizationService:
"""
Handles authorization decisions.
Implements:
- Role-Based Access Control (RBAC)
- Attribute-Based Access Control (ABAC)
- Tenant isolation
"""
def __init__(self, db, cache, audit_logger):
self.db = db
self.cache = cache
self.audit = audit_logger
async def check_permission(
self,
context: AuthorizationContext,
required_permission: Permission
) -> bool:
"""
Check if user has a specific permission.
"""
# Get user's permissions from roles
user_permissions = set()
for role in context.roles:
role_perms = ROLE_PERMISSIONS.get(role, [])
user_permissions.update(role_perms)
has_permission = required_permission in user_permissions
# Audit log
await self.audit.log(
action="authorization_check",
user_id=context.user_id,
permission=required_permission.value,
granted=has_permission
)
return has_permission
async def check_resource_access(
self,
context: AuthorizationContext,
required_permission: Permission
) -> bool:
"""
Check if user can access a specific resource.
Enforces:
1. Tenant isolation (user can only access own tenant's resources)
2. Permission check
3. Resource-level policies
"""
# CRITICAL: Tenant isolation check
if context.resource_tenant_id and context.resource_tenant_id != context.tenant_id:
await self.audit.log(
action="authorization_denied",
reason="tenant_mismatch",
user_id=context.user_id,
user_tenant=context.tenant_id,
resource_tenant=context.resource_tenant_id
)
return False
# Check base permission
if not await self.check_permission(context, required_permission):
return False
# Check resource-specific policies
if context.resource_type:
policy_result = await self._check_resource_policy(
context, required_permission
)
if not policy_result:
return False
return True
async def _check_resource_policy(
self,
context: AuthorizationContext,
permission: Permission
) -> bool:
"""
Check resource-specific access policies.
For example: Users can only delete their own resources
"""
# Delete operations: Must be owner or admin
if permission in [Permission.RESOURCE_DELETE, Permission.USER_DELETE]:
is_owner = context.resource_owner_id == context.user_id
is_admin = "admin" in context.roles or "owner" in context.roles
if not (is_owner or is_admin):
await self.audit.log(
action="authorization_denied",
reason="not_owner_or_admin",
user_id=context.user_id,
resource_id=context.resource_id
)
return False
return True
async def get_accessible_resources(
self,
user_id: str,
tenant_id: str,
resource_type: str,
permission: Permission
) -> List[str]:
"""
Get list of resource IDs the user can access.
Used for filtering queries.
"""
# For read permission, return all tenant resources
if permission == Permission.RESOURCE_READ:
result = await self.db.fetch(
f"""
SELECT id FROM {resource_type}s
WHERE tenant_id = $1
""",
tenant_id
)
return [row["id"] for row in result]
# For write/delete, return owned resources + admin override
user = await self.db.fetchone(
"SELECT roles FROM users WHERE id = $1",
user_id
)
if "admin" in user["roles"] or "owner" in user["roles"]:
# Admins can access all tenant resources
result = await self.db.fetch(
f"SELECT id FROM {resource_type}s WHERE tenant_id = $1",
tenant_id
)
else:
# Regular users only their own
result = await self.db.fetch(
f"""
SELECT id FROM {resource_type}s
WHERE tenant_id = $1 AND owner_id = $2
""",
tenant_id, user_id
)
return [row["id"] for row in result]
def require_permission(permission: Permission):
"""
Decorator that enforces permission on endpoint.
"""
def decorator(func):
async def wrapper(*args, **kwargs):
# Get current user from context
request = kwargs.get("request")
user = request.state.user
context = AuthorizationContext(
user_id=user.user_id,
tenant_id=user.tenant_id,
roles=user.roles
)
auth_service = request.app.state.authorization
if not await auth_service.check_permission(context, permission):
raise PermissionDeniedError(
f"Permission denied: {permission.value}"
)
return await func(*args, **kwargs)
return wrapper
return decorator
class PermissionDeniedError(Exception):
"""User doesn't have required permission."""
pass
Chapter 6: Security Middleware and Validation
6.1 Security Middleware
# security/middleware.py
"""
Security middleware for request/response protection.
"""
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
import logging
import time
logger = logging.getLogger(__name__)
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
"""
Adds security headers to all responses.
"""
async def dispatch(self, request: Request, call_next):
response = await call_next(request)
# Prevent clickjacking
response.headers["X-Frame-Options"] = "DENY"
# Prevent MIME type sniffing
response.headers["X-Content-Type-Options"] = "nosniff"
# Enable XSS filter
response.headers["X-XSS-Protection"] = "1; mode=block"
# Content Security Policy
response.headers["Content-Security-Policy"] = (
"default-src 'self'; "
"script-src 'self'; "
"style-src 'self' 'unsafe-inline'; "
"img-src 'self' data: https:; "
"font-src 'self'; "
"connect-src 'self' https://api.example.com; "
"frame-ancestors 'none';"
)
# Strict Transport Security
response.headers["Strict-Transport-Security"] = (
"max-age=31536000; includeSubDomains; preload"
)
# Referrer Policy
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
# Permissions Policy
response.headers["Permissions-Policy"] = (
"accelerometer=(), camera=(), geolocation=(), "
"gyroscope=(), magnetometer=(), microphone=(), "
"payment=(), usb=()"
)
return response
class RequestLoggingMiddleware(BaseHTTPMiddleware):
"""
Logs all requests for security auditing.
"""
async def dispatch(self, request: Request, call_next):
start_time = time.time()
# Generate request ID
request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
# Log request
logger.info(
"Request started",
extra={
"request_id": request_id,
"method": request.method,
"path": request.url.path,
"client_ip": request.client.host,
"user_agent": request.headers.get("User-Agent"),
"tenant_id": getattr(request.state, "tenant_id", None),
"user_id": getattr(request.state, "user_id", None)
}
)
response = await call_next(request)
# Calculate duration
duration = time.time() - start_time
# Log response
logger.info(
"Request completed",
extra={
"request_id": request_id,
"status_code": response.status_code,
"duration_ms": round(duration * 1000, 2)
}
)
# Add request ID to response
response.headers["X-Request-ID"] = request_id
return response
class InputSanitizationMiddleware(BaseHTTPMiddleware):
"""
Sanitizes input to prevent injection attacks.
"""
# Patterns that might indicate attacks
SUSPICIOUS_PATTERNS = [
"<script",
"javascript:",
"onerror=",
"onclick=",
"UNION SELECT",
"DROP TABLE",
"'; --",
"${",
"{{",
]
async def dispatch(self, request: Request, call_next):
# Check query parameters
for key, value in request.query_params.items():
if self._is_suspicious(value):
logger.warning(
"Suspicious query parameter blocked",
extra={
"param": key,
"client_ip": request.client.host
}
)
return Response(
content="Bad request",
status_code=400
)
# For POST/PUT, check body
if request.method in ["POST", "PUT", "PATCH"]:
body = await request.body()
body_str = body.decode("utf-8", errors="ignore")
if self._is_suspicious(body_str):
logger.warning(
"Suspicious request body blocked",
extra={"client_ip": request.client.host}
)
return Response(
content="Bad request",
status_code=400
)
return await call_next(request)
def _is_suspicious(self, value: str) -> bool:
"""Check if value contains suspicious patterns."""
value_lower = value.lower()
for pattern in self.SUSPICIOUS_PATTERNS:
if pattern.lower() in value_lower:
return True
return False
6.2 Input Validation
# security/validation.py
"""
Input validation utilities.
"""
from pydantic import BaseModel, validator, EmailStr, constr
from typing import Optional, List
import re
import bleach
class SecureUserInput(BaseModel):
"""
Base model with security validations.
"""
class Config:
# Strip whitespace from strings
anystr_strip_whitespace = True
# Limit string length
max_anystr_length = 10000
@validator("*", pre=True)
def sanitize_strings(cls, v):
"""Sanitize string inputs."""
if isinstance(v, str):
# Remove null bytes
v = v.replace("\x00", "")
# Limit length
v = v[:10000]
return v
class CreateUserRequest(SecureUserInput):
"""Validated user creation request."""
email: EmailStr
name: constr(min_length=1, max_length=100)
password: constr(min_length=12, max_length=128)
@validator("name")
def validate_name(cls, v):
"""Validate name contains only allowed characters."""
if not re.match(r"^[\w\s\-'.]+$", v):
raise ValueError("Name contains invalid characters")
return v
@validator("password")
def validate_password(cls, v):
"""Validate password strength."""
if not re.search(r"[A-Z]", v):
raise ValueError("Password must contain uppercase letter")
if not re.search(r"[a-z]", v):
raise ValueError("Password must contain lowercase letter")
if not re.search(r"\d", v):
raise ValueError("Password must contain digit")
if not re.search(r"[!@#$%^&*(),.?\":{}|<>]", v):
raise ValueError("Password must contain special character")
return v
class ContentInput(SecureUserInput):
"""Validated content input (allows some HTML)."""
title: constr(min_length=1, max_length=200)
body: constr(min_length=1, max_length=50000)
@validator("title")
def sanitize_title(cls, v):
"""Strip all HTML from title."""
return bleach.clean(v, tags=[], strip=True)
@validator("body")
def sanitize_body(cls, v):
"""Allow safe HTML in body."""
allowed_tags = [
"p", "br", "strong", "em", "u", "a", "ul", "ol", "li",
"h1", "h2", "h3", "h4", "h5", "h6", "blockquote", "code", "pre"
]
allowed_attrs = {
"a": ["href", "title"],
}
return bleach.clean(
v,
tags=allowed_tags,
attributes=allowed_attrs,
strip=True
)
def validate_uuid(value: str) -> bool:
"""Validate UUID format."""
uuid_pattern = re.compile(
r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$',
re.IGNORECASE
)
return bool(uuid_pattern.match(value))
def validate_tenant_id(tenant_id: str) -> bool:
"""Validate tenant ID format and existence."""
if not tenant_id:
return False
if not re.match(r'^[a-z0-9_-]{3,50}$', tenant_id):
return False
return True
Part III: Real-World Application
Chapter 7: Case Studies
7.1 How Stripe Handles Security
STRIPE'S SECURITY ARCHITECTURE
Challenge:
βββ Process billions in payments
βββ PCI DSS Level 1 compliance
βββ Target for attackers
βββ Must be developer-friendly
Key Security Measures:
1. ENCRYPTION EVERYWHERE
βββ TLS 1.2+ required for all API calls
βββ Certificate pinning in SDKs
βββ All data encrypted at rest (AES-256)
βββ Card numbers encrypted with per-merchant keys
βββ HSMs for key management
2. TOKENIZATION
βββ Card numbers never hit merchant servers
βββ Stripe.js collects card details
βββ Token returned to merchant
βββ Token can only be used by that merchant
βββ Reduces merchant PCI scope
3. AUTHENTICATION
βββ API keys: Publishable (frontend) vs Secret (backend)
βββ Secret keys: Test vs Live modes
βββ Webhook signatures for verification
βββ OAuth for Connect platforms
4. MONITORING
βββ All API calls logged
βββ Radar for fraud detection
βββ Real-time anomaly detection
βββ Automatic blocking of suspicious activity
5. INFRASTRUCTURE
βββ Private data centers (not just cloud)
βββ Physical security controls
βββ Network segmentation
βββ Regular penetration testing
Lessons:
βββ Tokenization reduces scope of compliance
βββ Separate test and live credentials
βββ Make security invisible to developers
βββ Defense in depth at every layer
7.2 How Google Handles Zero Trust
GOOGLE'S BEYONDCORP (ZERO TRUST)
Background:
βββ Google was targeted by Operation Aurora (2009)
βββ Realized perimeter security insufficient
βββ Invented BeyondCorp (now industry standard)
Key Principles:
1. NO PRIVILEGED NETWORK
βββ Internal network same trust as internet
βββ No VPN for accessing internal apps
βββ All access through Access Proxy
βββ Location doesn't determine access
2. DEVICE TRUST
βββ All devices must be managed
βββ Device inventory maintained
βββ Device health checked continuously
βββ Unmanaged devices: limited access
3. USER TRUST
βββ Strong authentication (MFA required)
βββ Context-aware access decisions
βββ Session tokens, not passwords
βββ Continuous verification
4. ACCESS TIERS
βββ Level 1: Any authenticated user
βββ Level 2: Managed device required
βββ Level 3: Managed device + location
βββ Level 4: Full compliance required
βββ Access level per application
5. IMPLEMENTATION
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β β
β User Device β
β β β
β βΌ β
β βββββββββββββββ β
β β Access Proxy β βββ All access flows through here β
β ββββββββ¬βββββββ β
β β β
β ββββββββ΄βββββββ β
β β β β
β βΌ βΌ β
β ββββββββ βββββββββ β
β βDeviceβ βAccess β βββ Makes access decision β
β βTrust β βControlβ β
β ββββββββ βEngine β β
β βββββ¬ββββ β
β β β
β βΌ β
β ββββββββββββ β
β β Internal β β
β β App β β
β ββββββββββββ β
β β
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
Lessons:
βββ Network location is not a security boundary
βββ Every access decision needs context
βββ Device health is as important as user identity
βββ Continuous verification, not point-in-time
Chapter 8: Common Mistakes
8.1 Security Anti-Patterns
SECURITY MISTAKES
β MISTAKE 1: Secrets in Code
Wrong:
# config.py
DATABASE_PASSWORD = "SuperSecret123"
API_KEY = "sk_live_abc123"
# .env committed to git
DATABASE_URL=postgres://admin:password@db:5432/app
Problem:
Secrets in git history forever
Anyone with repo access has credentials
Can't rotate without code change
Right:
# config.py
DATABASE_HOST = os.getenv("DB_HOST")
# Password fetched from vault at runtime
# Use secrets manager
password = await secrets_manager.get_secret("db_password")
β MISTAKE 2: Trusting Frontend Validation
Wrong:
@app.post("/api/transfer")
async def transfer(amount: float, to_account: str):
# Frontend validated this, we're good!
await db.execute(
"UPDATE accounts SET balance = balance - $1 WHERE id = $2",
amount, current_user.account_id
)
Problem:
Attacker bypasses frontend
Negative amount = free money
No authorization check
Right:
@app.post("/api/transfer")
async def transfer(request: TransferRequest):
# Server-side validation
if request.amount <= 0:
raise ValidationError("Amount must be positive")
if request.amount > MAX_TRANSFER:
raise ValidationError("Amount exceeds limit")
# Check authorization
if not await can_transfer(current_user, request.to_account):
raise PermissionError("Not authorized")
# Check balance
balance = await get_balance(current_user.account_id)
if balance < request.amount:
raise ValidationError("Insufficient funds")
# Execute transfer
await execute_transfer(...)
β MISTAKE 3: Missing Tenant Isolation
Wrong:
@app.get("/api/documents/{doc_id}")
async def get_document(doc_id: str):
# Fetch document by ID
doc = await db.fetchone(
"SELECT * FROM documents WHERE id = $1",
doc_id
)
return doc
Problem:
Any user can access any document
Just guess document IDs
Complete data breach
Right:
@app.get("/api/documents/{doc_id}")
async def get_document(doc_id: str, user: User = Depends(get_current_user)):
# ALWAYS filter by tenant
doc = await db.fetchone(
"SELECT * FROM documents WHERE id = $1 AND tenant_id = $2",
doc_id, user.tenant_id
)
if not doc:
raise NotFoundError("Document not found")
return doc
β MISTAKE 4: Logging Sensitive Data
Wrong:
logger.info(f"User login: {email}, password: {password}")
logger.info(f"Payment processed: {credit_card_number}")
logger.info(f"API request: {request.headers}") # Contains auth token
Problem:
Passwords in logs!
PCI violation (card numbers)
Tokens can be stolen from logs
Right:
logger.info(f"User login attempt", extra={"email": email})
# Never log passwords
logger.info(f"Payment processed", extra={
"last_four": card[-4:],
"amount": amount
})
# Sanitize headers before logging
safe_headers = sanitize_headers(request.headers)
logger.info(f"API request", extra={"headers": safe_headers})
β MISTAKE 5: Overly Permissive CORS
Wrong:
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Any origin!
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
Problem:
Any website can make API calls
Combined with credentials = disaster
CSRF attacks possible
Right:
app.add_middleware(
CORSMiddleware,
allow_origins=[
"https://app.example.com",
"https://admin.example.com",
],
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["Authorization", "Content-Type"],
)
Part IV: Interview Preparation
Chapter 9: Interview Tips
9.1 Security Discussion Framework
DISCUSSING SECURITY IN INTERVIEWS
When security comes up:
1. START WITH THREAT MODEL
"First, let me identify what we're protecting against:
- External attackers (internet)
- Malicious users (authenticated but hostile)
- Internal threats (compromised employee)
- Data breaches (encryption focus)"
2. APPLY DEFENSE IN DEPTH
"I'd implement security at multiple layers:
- Network: VPC, security groups, WAF
- Application: Auth, authz, validation
- Data: Encryption at rest and in transit
- Monitoring: Audit logs, alerting"
3. DISCUSS SPECIFIC CONTROLS
"For authentication, I'd use:
- Password hashing with bcrypt (cost 12+)
- JWT tokens with short expiry
- MFA for sensitive operations
- Rate limiting on login"
4. ADDRESS SECRETS
"Secrets management is critical:
- Never in code or environment variables
- Use Vault or AWS Secrets Manager
- Rotate credentials automatically
- Separate dev/prod credentials"
5. MENTION COMPLIANCE
"Depending on the domain:
- PCI DSS for payments (tokenization)
- HIPAA for health (encryption, audit logs)
- SOC 2 for SaaS (access controls)
These drive specific requirements"
9.2 Key Phrases
SECURITY KEY PHRASES
On Defense in Depth:
"I design with defense in depth - assuming any single layer might
fail. Even if an attacker bypasses the firewall, they still face
application authentication, encryption, and monitoring. No single
point of failure."
On Zero Trust:
"I follow zero trust principles - never trust, always verify.
Network location doesn't grant access. Every request is authenticated
and authorized, whether it comes from inside or outside the network."
On Secrets Management:
"Secrets never go in code or environment variables. I use a secrets
manager like Vault, with automatic rotation. Applications fetch
secrets at runtime, so rotation doesn't require restarts."
On Authentication:
"For authentication, I'd use JWT tokens with short expiry (15 minutes)
and refresh tokens for longer sessions. Passwords are hashed with
bcrypt, never stored plaintext. MFA is required for admin access."
On Encryption:
"I implement encryption at multiple levels: TLS for transit, AES-256
for storage, and field-level encryption for sensitive data like SSNs.
Keys are managed through KMS with automatic rotation."
Chapter 10: Practice Problems
Problem 1: Secure API Design
Scenario: You're designing an API for a banking application that lets users view accounts and transfer money.
Questions:
- How do you authenticate users?
- How do you prevent unauthorized transfers?
- How do you protect against common attacks?
- OAuth 2.0 or JWT with MFA
- Transaction signing or step-up authentication
- Rate limiting, input validation, CSRF protection
- Audit logging for all transactions
- Amount limits and velocity checks
Problem 2: Multi-Tenant Security
Scenario: Your SaaS platform stores sensitive data for multiple customers. One customer is a competitor of another.
Questions:
- How do you ensure data isolation?
- What if an engineer needs to debug a customer issue?
- How do you handle encryption keys?
- Row-level security or schema separation
- Just-in-time access with audit trails
- Per-tenant encryption keys
- No cross-tenant queries possible
- Support access requires customer approval
Chapter 11: Sample Interview Dialogue
Interviewer: "How would you secure a multi-tenant SaaS application?"
You: "I'd approach this with defense in depth, focusing on several key areas.
First, network security:"
Internet β WAF β ALB β Security Groups β App
- WAF blocks common attacks (OWASP Top 10)
- ALB terminates TLS, requires 1.2+
- Security groups: Only ALB can reach app servers
- App servers in private subnet, no public IP
"Second, tenant isolation. This is critical for multi-tenant:"
Every database query:
SELECT * FROM data WHERE tenant_id = :current_tenant_id
Enforced at:
1. Application layer (middleware adds tenant filter)
2. Database layer (RLS policies)
3. API layer (tenant from JWT, not request)
Cross-tenant queries are impossible by design.
Interviewer: "What about authentication and secrets?"
You: "For authentication, I'd implement:
- Password hashing: bcrypt with cost factor 12
- JWT tokens: 15-minute access, 7-day refresh
- MFA: Required for admin, optional for users
- Session management: Server-side session store in Redis
- Rate limiting: 5 failed attempts = 15-minute lockout
For secrets:"
WRONG: RIGHT:
config.py: Vault:
DB_PASS = "secret" βββ secrets/
βββ database/password
.env: βββ api/stripe_key
API_KEY=sk_live_xxx βββ jwt/signing_key
Application fetches secrets at runtime:
password = await vault.get("database/password")
Rotation: Vault rotates, app gets new secret on next fetch
Interviewer: "How do you handle a security incident?"
You: "I'd have several layers of detection and response:
Detection:
- Audit logs for all access (who, what, when)
- Anomaly detection (unusual access patterns)
- Failed authentication alerts
- Data access monitoring
Response:
- Automated blocking of suspicious IPs
- Session revocation capability
- Incident response runbook
- Communication templates for affected customers
For example, if we detect unusual data access:
- Alert fires β On-call gets paged
- Immediate: Revoke affected sessions
- Investigate: Query audit logs for scope
- Contain: Block attacker access
- Remediate: Patch vulnerability
- Communicate: Notify affected customers
- Postmortem: Prevent recurrence"
Summary
DAY 5 KEY TAKEAWAYS
DEFENSE IN DEPTH:
βββ Perimeter (WAF, DDoS, rate limiting)
βββ Network (VPC, security groups, private subnets)
βββ Application (auth, authz, validation)
βββ Data (encryption at rest and transit)
βββ Monitoring (audit logs, alerting)
ZERO TRUST:
βββ Never trust, always verify
βββ Network location doesn't grant access
βββ Every request authenticated/authorized
βββ Assume breach, limit blast radius
βββ Micro-segmentation
SECRETS MANAGEMENT:
βββ Never in code or environment variables
βββ Use Vault/Secrets Manager
βββ Automatic rotation
βββ Separate dev/prod credentials
βββ Fetch at runtime, not startup
ENCRYPTION:
βββ In transit: TLS 1.2+ everywhere
βββ At rest: AES-256 for storage
βββ Field-level: Sensitive data (SSN, etc.)
βββ Key hierarchy: Master β Tenant β Data
βββ HSM for key protection
AUTHENTICATION:
βββ bcrypt for passwords (cost 12+)
βββ JWT with short expiry
βββ MFA for sensitive operations
βββ Rate limiting on login
βββ Session management
AUTHORIZATION:
βββ RBAC for permissions
βββ Tenant isolation always
βββ Least privilege
βββ Resource-level policies
βββ Audit all access
COMMON MISTAKES:
βββ Secrets in code
βββ Trusting frontend validation
βββ Missing tenant isolation
βββ Logging sensitive data
βββ Overly permissive CORS
DEFAULT SECURITY POSTURE:
βββ Deny by default
βββ Validate all input
βββ Encrypt everything
βββ Log everything
βββ Rotate credentials
βββ Assume compromise
Further Reading
Standards and Frameworks:
- OWASP Top 10: https://owasp.org/Top10/
- NIST Cybersecurity Framework: https://www.nist.gov/cyberframework
- CIS Controls: https://www.cisecurity.org/controls
Zero Trust:
- Google BeyondCorp: https://cloud.google.com/beyondcorp
- NIST Zero Trust Architecture: https://csrc.nist.gov/publications/detail/sp/800-207/final
Tools:
- HashiCorp Vault: https://www.vaultproject.io/
- AWS Secrets Manager: https://aws.amazon.com/secrets-manager/
End of Day 5: Security Architecture
This Week Complete! Next week: Production Readiness and Operational Excellence β SLOs, Observability, Deployment, and Incident Management.