Himanshu Kukreja

Week 9, Day 5: Security Architecture

System Design Mastery Series: Multi-Tenancy, Security, and Compliance Week


Preface

You're reviewing a pull request when you notice something alarming:

THE SECURITY INCIDENT

Pull Request #4521: Add new analytics integration

+++ config/analytics.py
+ ANALYTICS_API_KEY = "sk_live_a8f7g9h2j3k4l5m6n7o8p9"
+ DATABASE_URL = "postgres://admin:SuperSecret123@prod-db.example.com:5432/app"

Your Slack lights up:

Security Bot: 🚨 ALERT: Credential detected in commit
Security Bot: Repository: backend-api
Security Bot: File: config/analytics.py
Security Bot: Detected: API key, Database password

You check the git history:

commit a1b2c3d (3 hours ago)
Author: junior.dev@company.com
Message: Add analytics integration

This has been in main for 3 hours.
Production deployed 2 hours ago.
The secret is now in:
├── GitHub history (forever unless force-pushed)
├── Docker image layers (pushed to registry)
├── CI/CD logs (visible to team)
├── Developer laptops (git pulled)
└── Any forks of the repo

Even if you delete it now, it's been exposed.

Questions:
├── How did this happen? (No secrets scanning in CI)
├── Why could a dev access production DB password? (No separation)
├── Why is there a password at all? (Should use IAM roles)
└── How do we prevent this forever? (Security architecture)

Today, we'll build a security architecture that makes this class of mistake impossible.
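
The first question above (no secrets scanning in CI) can be illustrated with a minimal sketch. The two regex rules and the function name `scan_added_lines` are assumptions made for this example; real scanners ship far larger rule sets and add entropy checks. The sketch only inspects lines that a diff adds.

```python
import re
from typing import List, Tuple

# Hypothetical rules for illustration only.
SECRET_PATTERNS = {
    "api_key": re.compile(r"sk_live_[A-Za-z0-9]{16,}"),
    "url_password": re.compile(r"[a-z][a-z0-9+.-]*://[^/\s:@]+:[^/\s:@]+@"),
}


def scan_added_lines(diff_text: str) -> List[Tuple[int, str]]:
    """Return (line_number, rule_name) for each added diff line that matches a rule."""
    findings = []
    for number, line in enumerate(diff_text.splitlines(), start=1):
        # Only inspect lines the commit adds; skip the "+++ file" header.
        if not line.startswith("+") or line.startswith("+++"):
            continue
        for rule_name, pattern in SECRET_PATTERNS.items():
            if pattern.search(line):
                findings.append((number, rule_name))
    return findings


diff = """+++ config/analytics.py
+ ANALYTICS_API_KEY = "sk_live_a8f7g9h2j3k4l5m6n7o8p9"
+ DATABASE_URL = "postgres://admin:SuperSecret123@prod-db.example.com:5432/app"
+ TIMEOUT_SECONDS = 30
"""
findings = scan_added_lines(diff)
for line_number, rule in findings:
    print(f"line {line_number}: possible {rule}")
```

A CI job built on this idea would fail the build whenever `findings` is non-empty, so the pull request could not merge with the credential in it.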


Part I: Foundations

Chapter 1: Security Architecture Principles

1.1 Defense in Depth

Defense in depth means multiple layers of security controls, so that if one fails, others still protect the system.

DEFENSE IN DEPTH LAYERS

┌────────────────────────────────────────────────────────────────────────┐
│                                                                        │
│  LAYER 1: PERIMETER                                                    │
│  ├── WAF (Web Application Firewall)                                    │
│  ├── DDoS protection                                                   │
│  ├── Rate limiting                                                     │
│  └── IP allowlisting (for admin)                                       │
│                                                                        │
│  LAYER 2: NETWORK                                                      │
│  ├── VPC isolation                                                     │
│  ├── Security groups                                                   │
│  ├── Private subnets for databases                                     │
│  └── Network ACLs                                                      │
│                                                                        │
│  LAYER 3: APPLICATION                                                  │
│  ├── Authentication (who are you?)                                     │
│  ├── Authorization (what can you do?)                                  │
│  ├── Input validation                                                  │
│  └── Output encoding                                                   │
│                                                                        │
│  LAYER 4: DATA                                                         │
│  ├── Encryption at rest                                                │
│  ├── Encryption in transit                                             │
│  ├── Field-level encryption                                            │
│  └── Key management                                                    │
│                                                                        │
│  LAYER 5: MONITORING                                                   │
│  ├── Audit logging                                                     │
│  ├── Intrusion detection                                               │
│  ├── Anomaly detection                                                 │
│  └── Incident response                                                 │
│                                                                        │
│  Each layer assumes other layers might fail                            │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

1.2 Zero Trust Architecture

Zero Trust means "never trust, always verify" - every request must be authenticated and authorized regardless of network location.

ZERO TRUST PRINCIPLES

TRADITIONAL (PERIMETER-BASED):
─────────────────────────────
┌─────────────────────────────────────────┐
│           Corporate Network             │
│  ┌─────┐   ┌─────┐   ┌─────┐            │
│  │ App │───│ DB  │───│ API │  TRUSTED   │
│  └─────┘   └─────┘   └─────┘            │
│                                         │
└──────────────────┬──────────────────────┘
                   │
              [Firewall]
                   │
              UNTRUSTED
                   │
              [Internet]

Problem: Once inside, attacker has free access


ZERO TRUST:
───────────
┌─────────────────────────────────────────┐
│                                         │
│  ┌─────┐       ┌─────┐       ┌─────┐    │
│  │ App │──?──▶ │ DB  │ ◀──?──│ API │    │
│  └──┬──┘       └──┬──┘       └──┬──┘    │
│     │             │             │       │
│     ▼             ▼             ▼       │
│  [Auth]        [Auth]        [Auth]     │
│                                         │
│  Every connection authenticated         │
│  Every request authorized               │
│  No implicit trust                      │
│                                         │
└─────────────────────────────────────────┘

Key principles:
├── Verify explicitly (every request)
├── Use least privilege access
├── Assume breach (design for compromise)
└── Micro-segmentation (isolate everything)
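
"Verify explicitly" can be sketched as a check that runs on every service-to-service call, regardless of where the caller sits on the network. The example below uses an HMAC signature plus a timestamp window. The `SERVICE_KEYS` table, the message layout, and the 300-second window are assumptions made for illustration; a production system would more likely rely on mTLS or signed tokens issued by an identity provider.

```python
import hashlib
import hmac
import time
from typing import Optional

# Hypothetical per-service keys; in practice these would come from a secrets manager.
SERVICE_KEYS = {"app": b"app-demo-key", "worker": b"worker-demo-key"}
MAX_SKEW_SECONDS = 300


def sign_request(service: str, method: str, path: str, timestamp: int) -> str:
    """Sign the caller identity, method, path, and timestamp with the caller's key."""
    message = f"{service}\n{method}\n{path}\n{timestamp}".encode()
    return hmac.new(SERVICE_KEYS[service], message, hashlib.sha256).hexdigest()


def verify_request(
    service: str,
    method: str,
    path: str,
    timestamp: int,
    signature: str,
    now: Optional[float] = None,
) -> bool:
    """Return True only if the caller is known, the request is fresh, and the signature matches."""
    now = time.time() if now is None else now
    if service not in SERVICE_KEYS:
        return False  # unknown caller: no implicit trust
    if abs(now - timestamp) > MAX_SKEW_SECONDS:
        return False  # stale or replayed request
    expected = sign_request(service, method, path, timestamp)
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(expected, signature)


signature = sign_request("app", "GET", "/orders", 1000)
print(verify_request("app", "GET", "/orders", 1000, signature, now=1010))
print(verify_request("app", "DELETE", "/orders", 1000, signature, now=1010))
```

The signature covers the method and path, so a captured signature for one request cannot be reused to authorize a different one.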

1.3 Principle of Least Privilege

LEAST PRIVILEGE IN PRACTICE

❌ WRONG: Over-privileged access

Developer laptop:
├── AWS Admin access
├── Production database credentials
├── All API keys
└── Root access to servers

Problem: If laptop compromised, attacker has everything


✓ RIGHT: Minimal necessary access

Developer laptop:
├── AWS access: Dev account only, read-only prod
├── Database: Dev database only, no prod
├── API keys: Test keys only
└── Servers: No direct access, use bastion + MFA

Production services:
├── App server: Can read DB, can't modify schema
├── Worker: Can write to specific queues
├── Analytics: Read-only replica access
└── Each service: Only what it needs


IMPLEMENTING LEAST PRIVILEGE:
├── Role-based access control (RBAC)
├── Just-in-time access (temporary elevation)
├── Regular access reviews (remove unused)
├── Separate environments (dev/staging/prod)
└── Service accounts per function
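
The first two items in that list (RBAC and just-in-time access) can be combined into one deny-by-default check. This is a minimal sketch: the role names, permission strings, and the `TemporaryGrant` type are hypothetical and not part of any particular framework.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional, Set

# Hypothetical role-to-permission mapping.
ROLE_PERMISSIONS: Dict[str, Set[str]] = {
    "app_server": {"db:read", "db:write"},
    "analytics": {"replica:read"},
    "developer": {"dev_db:read", "dev_db:write"},
}


@dataclass
class TemporaryGrant:
    """A just-in-time permission that expires on its own."""
    permission: str
    expires_at: datetime


@dataclass
class Principal:
    name: str
    roles: List[str]
    grants: List[TemporaryGrant] = field(default_factory=list)


def is_allowed(principal: Principal, permission: str, now: Optional[datetime] = None) -> bool:
    """Deny by default; allow only via a role or an unexpired temporary grant."""
    now = now or datetime.now(timezone.utc)
    for role in principal.roles:
        if permission in ROLE_PERMISSIONS.get(role, set()):
            return True
    return any(
        grant.permission == permission and grant.expires_at > now
        for grant in principal.grants
    )


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
dev = Principal(name="dev", roles=["developer"])
print(is_allowed(dev, "prod_db:read", now))   # no role grants this
dev.grants.append(TemporaryGrant("prod_db:read", now + timedelta(hours=1)))
print(is_allowed(dev, "prod_db:read", now))   # temporary elevation is active
```

Because grants carry their own expiry, elevated access disappears without anyone having to remember to revoke it.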

Chapter 2: Trust Boundaries and Threat Modeling

2.1 Identifying Trust Boundaries

TRUST BOUNDARY MAP

┌────────────────────────────────────────────────────────────────────────┐
│                          TRUST BOUNDARIES                              │
│                                                                        │
│  ════════════════════ INTERNET (Untrusted) ════════════════════        │
│                              │                                         │
│                         [Boundary 1]                                   │
│                              │                                         │
│  ┌───────────────────────────┴──────────────────────────┐              │
│  │                     DMZ / Edge                       │              │
│  │  ┌─────────┐   ┌─────────┐   ┌─────────┐             │              │
│  │  │   CDN   │   │   WAF   │   │   ALB   │             │              │
│  │  └─────────┘   └─────────┘   └─────────┘             │              │
│  └───────────────────────────┬──────────────────────────┘              │
│                              │                                         │
│                         [Boundary 2]                                   │
│                              │                                         │
│  ┌───────────────────────────┴──────────────────────────┐              │
│  │                  Application Tier                    │              │
│  │  ┌─────────┐   ┌─────────┐   ┌─────────┐             │              │
│  │  │   API   │   │ Worker  │   │  Admin  │             │              │
│  │  └─────────┘   └─────────┘   └─────────┘             │              │
│  └───────────────────────────┬──────────────────────────┘              │
│                              │                                         │
│                         [Boundary 3]                                   │
│                              │                                         │
│  ┌───────────────────────────┴──────────────────────────┐              │
│  │                     Data Tier                        │              │
│  │  ┌─────────┐   ┌─────────┐   ┌─────────┐             │              │
│  │  │   DB    │   │  Cache  │   │   S3    │             │              │
│  │  └─────────┘   └─────────┘   └─────────┘             │              │
│  └──────────────────────────────────────────────────────┘              │
│                                                                        │
│  RULES:                                                                │
│  • Data crossing boundaries must be validated                          │
│  • Each boundary requires authentication                               │
│  • Encryption required at every boundary                               │
│  • Log all cross-boundary access                                       │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

2.2 STRIDE Threat Modeling

STRIDE THREAT MODEL

For each component, consider:

┌─────────────────┬────────────────────────┬─────────────────────────────┐
│ THREAT          │ DESCRIPTION            │ MITIGATION                  │
├─────────────────┼────────────────────────┼─────────────────────────────┤
│ Spoofing        │ Pretending to be       │ Strong authentication       │
│                 │ someone else           │ MFA, certificates           │
├─────────────────┼────────────────────────┼─────────────────────────────┤
│ Tampering       │ Modifying data or      │ Input validation            │
│                 │ code                   │ Integrity checks, signing   │
├─────────────────┼────────────────────────┼─────────────────────────────┤
│ Repudiation     │ Denying actions        │ Audit logging               │
│                 │ taken                  │ Non-repudiation controls    │
├─────────────────┼────────────────────────┼─────────────────────────────┤
│ Information     │ Exposing data to       │ Encryption                  │
│ Disclosure      │ unauthorized parties   │ Access controls             │
├─────────────────┼────────────────────────┼─────────────────────────────┤
│ Denial of       │ Making system          │ Rate limiting               │
│ Service         │ unavailable            │ Redundancy, scaling         │
├─────────────────┼────────────────────────┼─────────────────────────────┤
│ Elevation of    │ Gaining unauthorized   │ Least privilege             │
│ Privilege       │ access                 │ Input validation            │
└─────────────────┴────────────────────────┴─────────────────────────────┘

EXAMPLE: API Endpoint Threat Model

Component: POST /api/users
├── Spoofing: Attacker creates account as another user
│   └── Mitigation: Email verification, CAPTCHA
├── Tampering: SQL injection in user data
│   └── Mitigation: Parameterized queries, validation
├── Repudiation: User denies creating account
│   └── Mitigation: Audit log with IP, timestamp
├── Information Disclosure: User data leaked
│   └── Mitigation: Field-level encryption, HTTPS
├── DoS: Flood of account creation
│   └── Mitigation: Rate limiting, CAPTCHA
└── Elevation: Create admin account
    └── Mitigation: No role in registration, admin approval
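
A threat model like the one above is easier to keep current when it lives next to the code as data. The sketch below records threats per STRIDE category and reports categories that still have no mitigation. The `Threat` type and the `uncovered_categories` helper are hypothetical names chosen for this example.

```python
from dataclasses import dataclass
from typing import List

STRIDE = [
    "Spoofing",
    "Tampering",
    "Repudiation",
    "Information Disclosure",
    "Denial of Service",
    "Elevation of Privilege",
]


@dataclass
class Threat:
    category: str
    description: str
    mitigations: List[str]


def uncovered_categories(threats: List[Threat]) -> List[str]:
    """Return STRIDE categories that have no threat with at least one mitigation."""
    covered = {t.category for t in threats if t.mitigations}
    return [category for category in STRIDE if category not in covered]


# A partial model for POST /api/users, deliberately left incomplete.
model = [
    Threat("Spoofing", "Attacker creates account as another user",
           ["Email verification", "CAPTCHA"]),
    Threat("Tampering", "SQL injection in user data",
           ["Parameterized queries", "Validation"]),
    Threat("Denial of Service", "Flood of account creation", []),
]

for category in uncovered_categories(model):
    print(f"No mitigation recorded for: {category}")
```

A review checklist or CI step could call `uncovered_categories` for each component and flag any that return a non-empty list.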

Chapter 3: Encryption Strategy

3.1 Encryption Layers

ENCRYPTION AT EVERY LAYER

LAYER 1: IN TRANSIT
────────────────────
Client ──[TLS 1.3]──▶ Load Balancer ──[mTLS]──▶ Service ──[TLS]──▶ Database

Requirements:
├── TLS 1.3 for external connections
├── mTLS between services (mutual authentication)
├── No plaintext internal traffic
└── Certificate rotation automated
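
The TLS 1.3 and mTLS requirements above can be expressed with Python's standard `ssl` module. This is a sketch under assumptions: the function names are illustrative, and the certificate paths in the comments are placeholders that a real deployment would replace with files issued by its internal CA.

```python
import ssl


def build_server_context(require_client_cert: bool = True) -> ssl.SSLContext:
    """Server-side context: TLS 1.3 minimum, optionally requiring a client certificate (mTLS)."""
    ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    ctx.minimum_version = ssl.TLSVersion.TLSv1_3
    if require_client_cert:
        # With CERT_REQUIRED the handshake fails unless the client presents
        # a certificate that chains to a trusted CA.
        ctx.verify_mode = ssl.CERT_REQUIRED
    # In a real service (placeholder paths):
    # ctx.load_cert_chain("service.crt", "service.key")
    # ctx.load_verify_locations("internal-ca.pem")
    return ctx


def build_client_context() -> ssl.SSLContext:
    """Client-side context: verifies the server certificate and hostname, TLS 1.3 minimum."""
    ctx = ssl.create_default_context()
    ctx.minimum_version = ssl.TLSVersion.TLSv1_3
    # For mTLS the client would also present its own certificate (placeholder paths):
    # ctx.load_cert_chain("client.crt", "client.key")
    return ctx


server_ctx = build_server_context()
client_ctx = build_client_context()
print(server_ctx.minimum_version, server_ctx.verify_mode)
print(client_ctx.minimum_version, client_ctx.check_hostname)
```

Pinning `minimum_version` in one helper keeps every service on the same protocol floor, so a single change raises it everywhere.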


LAYER 2: AT REST
────────────────
┌────────────────────────────────────────────────────────────────────────┐
│                                                                        │
│  DATABASE:                                                             │
│  ├── Transparent Data Encryption (TDE)                                 │
│  ├── Encrypted with AWS KMS key                                        │
│  └── Automatic, no application changes                                 │
│                                                                        │
│  FILE STORAGE (S3):                                                    │
│  ├── Server-side encryption (SSE-KMS)                                  │
│  ├── Client-side encryption for sensitive files                        │
│  └── Bucket policy enforces encryption                                 │
│                                                                        │
│  BACKUPS:                                                              │
│  ├── Encrypted with separate key                                       │
│  ├── Key escrowed for disaster recovery                                │
│  └── Cross-region replicas also encrypted                              │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘


LAYER 3: APPLICATION-LEVEL (Field Encryption)
─────────────────────────────────────────────
Encrypt sensitive fields before storing:

┌───────────────────────────────────────────────────────────────────────┐
│                                                                       │
│  users table:                                                         │
│  ├── id: 12345 (plaintext - for queries)                              │
│  ├── email: user@example.com (plaintext - for login)                  │
│  ├── ssn: ENC[AES256:abc123...] (encrypted)                           │
│  ├── credit_card: ENC[AES256:def456...] (encrypted)                   │
│  └── medical_history: ENC[AES256:ghi789...] (encrypted)               │
│                                                                       │
│  Benefits:                                                            │
│  ├── Database admin can't read sensitive data                         │
│  ├── Backup exposure doesn't leak PII                                 │
│  └── Compliance (PCI, HIPAA) requirements met                         │
│                                                                       │
└───────────────────────────────────────────────────────────────────────┘

3.2 Key Management Hierarchy

KEY MANAGEMENT HIERARCHY

┌────────────────────────────────────────────────────────────────────────┐
│                                                                        │
│                    ┌─────────────────┐                                 │
│                    │   Master Key    │                                 │
│                    │  (HSM-backed)   │                                 │
│                    │   Never leaves  │                                 │
│                    │      HSM        │                                 │
│                    └────────┬────────┘                                 │
│                             │                                          │
│              ┌──────────────┼──────────────┐                           │
│              │              │              │                           │
│              ▼              ▼              ▼                           │
│      ┌───────────┐  ┌───────────┐  ┌───────────┐                       │
│      │  Tenant   │  │  Tenant   │  │  Service  │                       │
│      │  Key A    │  │  Key B    │  │   Keys    │                       │
│      └─────┬─────┘  └─────┬─────┘  └─────┬─────┘                       │
│            │              │              │                             │
│            ▼              ▼              ▼                             │
│      ┌───────────┐  ┌───────────┐  ┌───────────┐                       │
│      │Data Keys  │  │Data Keys  │  │Data Keys  │                       │
│      │(per-row)  │  │(per-row)  │  │(per-job)  │                       │
│      └───────────┘  └───────────┘  └───────────┘                       │
│                                                                        │
│  HIERARCHY BENEFITS:                                                   │
│  ├── Master key rotation doesn't re-encrypt all data                   │
│  ├── Tenant isolation: Tenant A can't decrypt Tenant B                 │
│  ├── Key per data item: Compromised key limits blast radius            │
│  └── HSM protection: Master key never exposed                          │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘
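
The shape of this hierarchy (one root, a key per tenant, a key per data item) can be sketched with the standard library. Note the substitution: the example derives child keys with HKDF (RFC 5869) instead of wrapping randomly generated keys with a KMS, which is what the diagram implies. Derivation shows tenant isolation and a small blast radius, but unlike envelope encryption, rotating the root changes every derived key. The label formats (`tenant:`, `row:`) and the in-memory root key are assumptions made for illustration.

```python
import hashlib
import hmac
import os


def hkdf_sha256(key_material: bytes, info: bytes, length: int = 32) -> bytes:
    """HKDF (RFC 5869) with SHA-256 and an all-zero salt."""
    # Extract: condense the input key material into a pseudorandom key.
    prk = hmac.new(b"\x00" * 32, key_material, hashlib.sha256).digest()
    # Expand: stretch the pseudorandom key, binding the output to `info`.
    okm, block, counter = b"", b"", 1
    while len(okm) < length:
        block = hmac.new(prk, block + info + bytes([counter]), hashlib.sha256).digest()
        okm += block
        counter += 1
    return okm[:length]


def tenant_key(master_key: bytes, tenant_id: str) -> bytes:
    """Derive a tenant-level key from the root key."""
    return hkdf_sha256(master_key, f"tenant:{tenant_id}".encode())


def data_key(tenant_key_bytes: bytes, row_id: str) -> bytes:
    """Derive a per-row data key from a tenant key."""
    return hkdf_sha256(tenant_key_bytes, f"row:{row_id}".encode())


# Stand-in root key; a real master key would stay inside the HSM or KMS.
master = os.urandom(32)
key_a = tenant_key(master, "A")
key_b = tenant_key(master, "B")
row_key = data_key(key_a, "12345")
print(key_a != key_b, len(row_key))
```

Holding one row's data key reveals nothing about other rows or other tenants, because HKDF output cannot be inverted to recover the parent key.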

Part II: Implementation

Chapter 4: Secrets Management

4.1 Secrets Management Service

# security/secrets.py

"""
Secrets management using AWS Secrets Manager / HashiCorp Vault.

Never store secrets in:
- Environment variables (readable through process inspection, and easily leaked into crash dumps and debug logs)
- Config files (committed to git)
- Container images (visible in layers)
"""

from dataclasses import dataclass
from typing import Optional, Dict, Any
from datetime import datetime, timedelta
import json
import logging

logger = logging.getLogger(__name__)


@dataclass
class Secret:
    """A secret value with metadata."""
    name: str
    value: str
    version: str
    created_at: datetime
    expires_at: Optional[datetime]
    metadata: Dict[str, Any]


class SecretsManager:
    """
    Centralized secrets management.
    
    Features:
    - Automatic rotation
    - Audit logging
    - Caching with TTL
    - Tenant isolation
    """
    
    def __init__(self, vault_client, cache, audit_logger):
        self.vault = vault_client
        self.cache = cache
        self.audit = audit_logger
        self._cache_ttl = 300  # 5 minutes
    
    async def get_secret(
        self,
        secret_name: str,
        tenant_id: Optional[str] = None
    ) -> Secret:
        """
        Retrieve a secret.
        
        Secrets are cached locally to reduce Vault calls,
        but cache TTL ensures rotation takes effect.
        """
        # Build full path
        if tenant_id:
            path = f"tenants/{tenant_id}/secrets/{secret_name}"
        else:
            path = f"global/secrets/{secret_name}"
        
        # Check cache
        cache_key = f"secret:{path}"
        cached = await self.cache.get(cache_key)
        
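        if cached:
            fields = json.loads(cached)
            # The cache stores datetimes as strings; restore them so a
            # cached Secret has the same field types as a fresh one.
            fields["created_at"] = datetime.fromisoformat(fields["created_at"])
            if fields.get("expires_at"):
                fields["expires_at"] = datetime.fromisoformat(fields["expires_at"])
            return Secret(**fields)
        
        # Fetch from Vault
        try:
            result = await self.vault.read(path)
            
            # set_secret() stores expires_at as an ISO-8601 string
            raw_expiry = result["data"].get("expires_at")
            
            secret = Secret(
                name=secret_name,
                value=result["data"]["value"],
                version=result["metadata"]["version"],
                created_at=datetime.fromisoformat(result["metadata"]["created_time"]),
                expires_at=datetime.fromisoformat(raw_expiry) if raw_expiry else None,
                metadata=result["metadata"]
            )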
            
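            # Cache it. The cached entry contains the plaintext value, so the
            # cache should be in-process or encrypted, not a shared plaintext store.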
            await self.cache.set(
                cache_key,
                json.dumps(secret.__dict__, default=str),
                ttl=self._cache_ttl
            )
            
            # Audit log
            await self.audit.log(
                action="secret_accessed",
                secret_name=secret_name,
                tenant_id=tenant_id
            )
            
            return secret
            
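        except Exception as e:
            logger.error(f"Failed to retrieve secret {secret_name}: {e}")
            # Chain the original exception so the real cause (network error,
            # permission denied, missing key) is not lost.
            raise SecretNotFoundError(f"Secret not found: {secret_name}") from e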
    
    async def set_secret(
        self,
        secret_name: str,
        value: str,
        tenant_id: Optional[str] = None,
        expires_in: Optional[timedelta] = None
    ) -> Secret:
        """
        Store a secret.
        """
        if tenant_id:
            path = f"tenants/{tenant_id}/secrets/{secret_name}"
        else:
            path = f"global/secrets/{secret_name}"
        
        data = {"value": value}
        
        if expires_in:
            data["expires_at"] = (datetime.utcnow() + expires_in).isoformat()
        
        await self.vault.write(path, data)
        
        # Invalidate cache
        await self.cache.delete(f"secret:{path}")
        
        # Audit log
        await self.audit.log(
            action="secret_updated",
            secret_name=secret_name,
            tenant_id=tenant_id
        )
        
        return await self.get_secret(secret_name, tenant_id)
    
    async def rotate_secret(
        self,
        secret_name: str,
        rotation_func,
        tenant_id: Optional[str] = None
    ):
        """
        Rotate a secret using provided rotation function.
        
        rotation_func should:
        1. Generate new secret value
        2. Update external system (e.g., database password)
        3. Return new value
        """
        logger.info(f"Rotating secret: {secret_name}")
        
        try:
            # Generate new secret
            new_value = await rotation_func()
            
            # Store new version
            await self.set_secret(secret_name, new_value, tenant_id)
            
            # Audit log
            await self.audit.log(
                action="secret_rotated",
                secret_name=secret_name,
                tenant_id=tenant_id
            )
            
            logger.info(f"Secret rotated: {secret_name}")
            
        except Exception as e:
            logger.error(f"Secret rotation failed: {e}")
            await self.audit.log(
                action="secret_rotation_failed",
                secret_name=secret_name,
                tenant_id=tenant_id,
                error=str(e)
            )
            raise


class SecretNotFoundError(Exception):
    """Raised when a secret is not found."""
    pass


# Database credential rotation
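async def rotate_database_password(db_admin_client, username: str):
    """
    Rotate a database user's password.
    
    Passed to SecretsManager.rotate_secret() as rotation_func. Because
    rotate_secret() calls rotation_func() with no arguments, bind the
    arguments first, e.g. functools.partial(rotate_database_password,
    db_admin_client, "app_user").
    """
    import re
    import secrets
    
    # ALTER USER is a utility statement, and PostgreSQL does not accept
    # server-side bind parameters in it, so the username is validated
    # before being interpolated into the SQL text.
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", username):
        raise ValueError(f"Invalid database username: {username!r}")
    
    # token_urlsafe() only emits [A-Za-z0-9_-], so the password cannot
    # break out of the quoted literal below.
    new_password = secrets.token_urlsafe(32)
    
    # Update in database
    await db_admin_client.execute(
        f"ALTER USER {username} WITH PASSWORD '{new_password}'"
    )
    
    return new_password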

4.2 Application Configuration Without Secrets

# security/config.py

"""
Application configuration that separates secrets from config.

Config: In code/environment (non-sensitive)
Secrets: In secrets manager (sensitive)
"""

from dataclasses import dataclass
from typing import Optional
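import os

# Assumes the package layout implied by the file headers (security/secrets.py)
from security.secrets import SecretsManager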


@dataclass
class DatabaseConfig:
    """Database configuration (secrets fetched separately)."""
    host: str
    port: int
    database: str
    ssl_mode: str = "require"
    pool_size: int = 10
    
    # Note: No password here!
    
    @classmethod
    def from_env(cls):
        return cls(
            host=os.getenv("DB_HOST", "localhost"),
            port=int(os.getenv("DB_PORT", "5432")),
            database=os.getenv("DB_NAME", "app"),
            ssl_mode=os.getenv("DB_SSL_MODE", "require"),
            pool_size=int(os.getenv("DB_POOL_SIZE", "10"))
        )


@dataclass  
class AppConfig:
    """Application configuration."""
    environment: str
    debug: bool
    log_level: str
    
    database: DatabaseConfig
    
    # Secrets are NOT in config
    # They're fetched at runtime from SecretsManager
    
    @classmethod
    def from_env(cls):
        return cls(
            environment=os.getenv("ENVIRONMENT", "development"),
            debug=os.getenv("DEBUG", "false").lower() == "true",
            log_level=os.getenv("LOG_LEVEL", "INFO"),
            database=DatabaseConfig.from_env()
        )


class SecureConnectionFactory:
    """
    Creates database connections with secrets from vault.
    
    Secrets are fetched at connection time, not startup time.
    This allows rotation without restart.
    """
    
    def __init__(self, config: DatabaseConfig, secrets_manager: SecretsManager):
        self.config = config
        self.secrets = secrets_manager
    
    async def create_connection(self):
        """Create a database connection with current credentials."""
        import asyncpg
        
        # Fetch current password from secrets manager
        secret = await self.secrets.get_secret("database/app_user_password")
        
        return await asyncpg.connect(
            host=self.config.host,
            port=self.config.port,
            database=self.config.database,
            user="app_user",
            password=secret.value,  # From vault, not config
            ssl=self.config.ssl_mode
        )
    
    async def create_pool(self):
        """Create a connection pool with credential refresh."""
        import asyncpg
        
        async def get_password():
            secret = await self.secrets.get_secret("database/app_user_password")
            return secret.value
        
        return await asyncpg.create_pool(
            host=self.config.host,
            port=self.config.port,
            database=self.config.database,
            user="app_user",
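            # Pass the function itself: asyncpg calls it for each new
            # connection, so rotated credentials are picked up by the pool.
            password=get_password,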
            ssl=self.config.ssl_mode,
            min_size=2,
            max_size=self.config.pool_size
        )
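
Fetching the secret on every connection keeps rotation simple, but it also puts the secrets manager on the hot path. A common compromise is a short-lived cache in front of it. The following is a minimal sketch of that idea, not part of the series' SecretsManager: `CachedSecretReader`, its `fetch` callback, and the injectable `clock` are hypothetical names, and the sketch is synchronous for brevity.

```python
import time
from typing import Callable, Dict, Tuple


class CachedSecretReader:
    """Hypothetical helper: caches each secret value for a short TTL."""

    def __init__(
        self,
        fetch: Callable[[str], str],
        ttl_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._fetch = fetch      # e.g. a call into your secrets manager
        self._ttl = ttl_seconds
        self._clock = clock      # injectable so the demo can control time
        self._cache: Dict[str, Tuple[str, float]] = {}

    def get(self, name: str) -> str:
        now = self._clock()
        cached = self._cache.get(name)
        if cached is not None and now < cached[1]:
            return cached[0]     # still fresh, skip the vault round trip
        value = self._fetch(name)
        self._cache[name] = (value, now + self._ttl)
        return value


# Demo with an in-memory "vault" and a fake clock
vault = {"database/app_user_password": "old-password"}
fake_now = [0.0]
reader = CachedSecretReader(
    vault.__getitem__, ttl_seconds=60, clock=lambda: fake_now[0]
)

before_rotation = reader.get("database/app_user_password")
vault["database/app_user_password"] = "new-password"    # rotation happens
within_ttl = reader.get("database/app_user_password")   # served from cache
fake_now[0] = 61.0
after_ttl = reader.get("database/app_user_password")    # refetched
```

The TTL bounds how long a rotated-out credential can still be handed out, so it should be shorter than the overlap window during which the old password remains valid.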

Chapter 5: Authentication and Authorization

5.1 Authentication Service

# security/authentication.py

"""
Authentication service with multiple methods.
"""

from dataclasses import dataclass
from typing import Optional, List
from datetime import datetime, timedelta
from enum import Enum
import jwt
import bcrypt
import secrets
import logging

logger = logging.getLogger(__name__)


class AuthMethod(Enum):
    """Supported authentication methods."""
    PASSWORD = "password"
    API_KEY = "api_key"
    OAUTH = "oauth"
    SAML = "saml"
    MFA_TOTP = "mfa_totp"


@dataclass
class AuthenticatedUser:
    """Result of successful authentication."""
    user_id: str
    tenant_id: str
    email: str
    roles: List[str]
    permissions: List[str]
    auth_method: AuthMethod
    mfa_verified: bool
    session_id: str


@dataclass
class AuthToken:
    """JWT token with claims."""
    access_token: str
    refresh_token: str
    expires_at: datetime
    token_type: str = "Bearer"


class AuthenticationService:
    """
    Handles user authentication.
    
    Security features:
    - Password hashing with bcrypt
    - Rate limiting on failures
    - MFA support
    - Session management
    - Audit logging
    """
    
    def __init__(
        self,
        db,
        cache,
        secrets_manager,
        audit_logger
    ):
        self.db = db
        self.cache = cache
        self.secrets = secrets_manager
        self.audit = audit_logger
    
    async def authenticate_password(
        self,
        email: str,
        password: str,
        ip_address: str,
        user_agent: str
    ) -> Optional[AuthenticatedUser]:
        """
        Authenticate with email and password.
        """
        # Check rate limit
        if await self._is_rate_limited(email, ip_address):
            await self.audit.log(
                action="auth_rate_limited",
                email=email,
                ip_address=ip_address
            )
            raise AuthenticationError("Too many attempts. Try again later.")
        
        # Find user
        user = await self.db.fetchone(
            """
            SELECT id, tenant_id, email, password_hash, roles, 
                   mfa_enabled, status
            FROM users 
            WHERE email = $1
            """,
            email.lower()
        )
        
        if not user:
            await self._record_failed_attempt(email, ip_address)
            await self.audit.log(
                action="auth_failed",
                reason="user_not_found",
                email=email,
                ip_address=ip_address
            )
            raise AuthenticationError("Invalid credentials")
        
        # Verify password
        if not bcrypt.checkpw(
            password.encode(),
            user["password_hash"].encode()
        ):
            await self._record_failed_attempt(email, ip_address)
            await self.audit.log(
                action="auth_failed",
                reason="invalid_password",
                user_id=user["id"],
                ip_address=ip_address
            )
            raise AuthenticationError("Invalid credentials")
        
        # Check status only after the password is verified, so the error
        # message cannot be used to probe which accounts exist or are inactive
        if user["status"] != "active":
            await self.audit.log(
                action="auth_failed",
                reason="account_inactive",
                user_id=user["id"]
            )
            raise AuthenticationError("Account is not active")
        
        # Clear rate limit on success
        await self._clear_failed_attempts(email, ip_address)
        
        # Create session
        session_id = secrets.token_urlsafe(32)
        
        authenticated = AuthenticatedUser(
            user_id=user["id"],
            tenant_id=user["tenant_id"],
            email=user["email"],
            roles=user["roles"],
            permissions=await self._get_permissions(user["roles"]),
            auth_method=AuthMethod.PASSWORD,
            mfa_verified=not user["mfa_enabled"],  # False if MFA required
            session_id=session_id
        )
        
        # Store session
        await self._create_session(authenticated, ip_address, user_agent)
        
        # Audit log
        await self.audit.log(
            action="auth_success",
            user_id=user["id"],
            method="password",
            ip_address=ip_address,
            mfa_required=user["mfa_enabled"]
        )
        
        return authenticated
    
    async def verify_mfa(
        self,
        session_id: str,
        totp_code: str
    ) -> AuthenticatedUser:
        """
        Verify MFA TOTP code.
        """
        # Get session
        session = await self._get_session(session_id)
        
        if not session:
            raise AuthenticationError("Session not found")
        
        if session["mfa_verified"]:
            raise AuthenticationError("MFA already verified")
        
        # Get user's TOTP secret
        user = await self.db.fetchone(
            "SELECT mfa_secret FROM users WHERE id = $1",
            session["user_id"]
        )
        
        # Verify TOTP
        import pyotp
        totp = pyotp.TOTP(user["mfa_secret"])
        
        if not totp.verify(totp_code, valid_window=1):
            await self.audit.log(
                action="mfa_failed",
                user_id=session["user_id"]
            )
            raise AuthenticationError("Invalid MFA code")
        
        # Update session
        await self._update_session_mfa(session_id)
        
        await self.audit.log(
            action="mfa_success",
            user_id=session["user_id"]
        )
        
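        # The cached session also holds ip_address, user_agent, and created_at,
        # and lacks permissions, auth_method, and session_id, so build the
        # result explicitly instead of unpacking the session dict.
        return AuthenticatedUser(
            user_id=session["user_id"],
            tenant_id=session["tenant_id"],
            email=session["email"],
            roles=session["roles"],
            permissions=await self._get_permissions(session["roles"]),
            auth_method=AuthMethod.MFA_TOTP,
            mfa_verified=True,
            session_id=session_id
        )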
    
    async def create_tokens(
        self,
        user: AuthenticatedUser
    ) -> AuthToken:
        """
        Create JWT access and refresh tokens.
        """
        # Get signing key from secrets
        signing_key = await self.secrets.get_secret("jwt/signing_key")
        
        now = datetime.utcnow()
        access_expires = now + timedelta(minutes=15)
        refresh_expires = now + timedelta(days=7)
        
        # Access token (short-lived)
        access_payload = {
            "sub": user.user_id,
            "tenant_id": user.tenant_id,
            "email": user.email,
            "roles": user.roles,
            "session_id": user.session_id,
            "type": "access",
            "iat": now,
            "exp": access_expires
        }
        
        access_token = jwt.encode(
            access_payload,
            signing_key.value,
            algorithm="HS256"
        )
        
        # Refresh token (longer-lived, minimal claims)
        refresh_payload = {
            "sub": user.user_id,
            "session_id": user.session_id,
            "type": "refresh",
            "iat": now,
            "exp": refresh_expires
        }
        
        refresh_token = jwt.encode(
            refresh_payload,
            signing_key.value,
            algorithm="HS256"
        )
        
        return AuthToken(
            access_token=access_token,
            refresh_token=refresh_token,
            expires_at=access_expires
        )
    
    async def validate_token(self, token: str) -> AuthenticatedUser:
        """
        Validate a JWT token.
        """
        signing_key = await self.secrets.get_secret("jwt/signing_key")
        
        try:
            payload = jwt.decode(
                token,
                signing_key.value,
                algorithms=["HS256"]
            )
        except jwt.ExpiredSignatureError:
            raise AuthenticationError("Token expired")
        except jwt.InvalidTokenError:
            raise AuthenticationError("Invalid token")
        
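        # Only access tokens may authenticate requests. Refresh tokens are
        # signed with the same key, so reject them explicitly.
        if payload.get("type") != "access":
            raise AuthenticationError("Invalid token")
        
        # Verify session is still valid
        session = await self._get_session(payload["session_id"])
        
        if not session:
            raise AuthenticationError("Session expired")
        
        return AuthenticatedUser(
            user_id=payload["sub"],
            tenant_id=payload["tenant_id"],
            email=payload["email"],
            roles=payload["roles"],
            permissions=await self._get_permissions(payload["roles"]),
            auth_method=AuthMethod.PASSWORD,
            # Take MFA state from the session instead of assuming it
            mfa_verified=session["mfa_verified"],
            session_id=payload["session_id"]
        )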
    
    async def _is_rate_limited(self, email: str, ip_address: str) -> bool:
        """Check if login attempts are rate limited."""
        key = f"auth_attempts:{email}:{ip_address}"
        attempts = await self.cache.get(key)
        # Return a real bool (the original expression could return None)
        return attempts is not None and int(attempts) >= 5
    
    async def _record_failed_attempt(self, email: str, ip_address: str):
        """Record a failed login attempt."""
        key = f"auth_attempts:{email}:{ip_address}"
        await self.cache.incr(key)
        await self.cache.expire(key, 900)  # 15 minutes
    
    async def _clear_failed_attempts(self, email: str, ip_address: str):
        """Clear failed attempts after successful login."""
        key = f"auth_attempts:{email}:{ip_address}"
        await self.cache.delete(key)
    
    async def _get_permissions(self, roles: List[str]) -> List[str]:
        """Get permissions for roles."""
        permissions = set()
        
        for role in roles:
            role_perms = await self.cache.get(f"role_permissions:{role}")
            if role_perms:
                permissions.update(role_perms)
        
        return list(permissions)
    
    async def _create_session(
        self,
        user: AuthenticatedUser,
        ip_address: str,
        user_agent: str
    ):
        """Create a new session."""
        await self.cache.setex(
            f"session:{user.session_id}",
            86400 * 7,  # 7 days
            {
                "user_id": user.user_id,
                "tenant_id": user.tenant_id,
                "email": user.email,
                "roles": user.roles,
                "mfa_verified": user.mfa_verified,
                "ip_address": ip_address,
                "user_agent": user_agent,
                "created_at": datetime.utcnow().isoformat()
            }
        )
    
    async def _get_session(self, session_id: str) -> Optional[dict]:
        """Get session by ID."""
        return await self.cache.get(f"session:{session_id}")
    
    async def _update_session_mfa(self, session_id: str):
        """Mark session as MFA verified."""
        session = await self._get_session(session_id)
        if session:
            session["mfa_verified"] = True
            await self.cache.setex(
                f"session:{session_id}",
                86400 * 7,
                session
            )


class AuthenticationError(Exception):
    """Authentication failed."""
    pass
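
The `verify_mfa` method above delegates to `pyotp` with `valid_window=1`. To make clear what that check involves, here is a standard-library sketch of TOTP (RFC 6238, built on HOTP from RFC 4226). It is an illustration under assumptions, not pyotp's implementation: the function names are hypothetical, and the key is passed as raw bytes rather than the base32 string pyotp expects.

```python
import hashlib
import hmac
import struct


def hotp(key: bytes, counter: int, digits: int = 6) -> str:
    """HOTP (RFC 4226): HMAC-SHA1 over the counter, dynamically truncated."""
    digest = hmac.new(key, struct.pack(">Q", counter), hashlib.sha1).digest()
    offset = digest[-1] & 0x0F                      # low nibble picks the slice
    value = struct.unpack(">I", digest[offset:offset + 4])[0] & 0x7FFFFFFF
    return str(value % 10 ** digits).zfill(digits)


def verify_totp(
    key: bytes,
    code: str,
    now: float,
    step: int = 30,
    valid_window: int = 1,
    digits: int = 6,
) -> bool:
    """Accept the code for the current time step or a neighbouring one."""
    counter = int(now // step)
    for drift in range(-valid_window, valid_window + 1):
        if counter + drift < 0:
            continue                                # no steps before the epoch
        expected = hotp(key, counter + drift, digits)
        # Constant-time comparison avoids leaking how many digits matched
        if hmac.compare_digest(expected.encode(), code.encode()):
            return True
    return False


# RFC 4226 / RFC 6238 test key
rfc_key = b"12345678901234567890"
code_at_59s = hotp(rfc_key, 59 // 30)
```

With `valid_window=1` a code stays acceptable for up to one step on either side of the current one, which tolerates clock drift at the cost of a slightly larger guessing window. That trade-off is one reason MFA attempts should be rate limited just like password attempts.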

5.2 Authorization Service

# security/authorization.py

"""
Authorization service implementing RBAC and ABAC.
"""

from dataclasses import dataclass
from typing import List, Optional, Dict, Any
from enum import Enum
import logging

logger = logging.getLogger(__name__)


class Permission(Enum):
    """System permissions."""
    # User permissions
    USER_READ = "user:read"
    USER_WRITE = "user:write"
    USER_DELETE = "user:delete"
    
    # Resource permissions
    RESOURCE_READ = "resource:read"
    RESOURCE_WRITE = "resource:write"
    RESOURCE_DELETE = "resource:delete"
    
    # Admin permissions
    ADMIN_ACCESS = "admin:access"
    ADMIN_USERS = "admin:users"
    ADMIN_BILLING = "admin:billing"
    ADMIN_SETTINGS = "admin:settings"
    
    # Tenant permissions
    TENANT_MANAGE = "tenant:manage"


# Role definitions
ROLE_PERMISSIONS = {
    "viewer": [
        Permission.USER_READ,
        Permission.RESOURCE_READ,
    ],
    "editor": [
        Permission.USER_READ,
        Permission.RESOURCE_READ,
        Permission.RESOURCE_WRITE,
    ],
    "admin": [
        Permission.USER_READ,
        Permission.USER_WRITE,
        Permission.RESOURCE_READ,
        Permission.RESOURCE_WRITE,
        Permission.RESOURCE_DELETE,
        Permission.ADMIN_ACCESS,
        Permission.ADMIN_USERS,
        Permission.ADMIN_SETTINGS,
    ],
    "owner": [
        # All permissions
        *[p for p in Permission],
    ],
}


@dataclass
class AuthorizationContext:
    """Context for authorization decisions."""
    user_id: str
    tenant_id: str
    roles: List[str]
    resource_id: Optional[str] = None
    resource_type: Optional[str] = None
    resource_owner_id: Optional[str] = None
    resource_tenant_id: Optional[str] = None


class AuthorizationService:
    """
    Handles authorization decisions.
    
    Implements:
    - Role-Based Access Control (RBAC)
    - Attribute-Based Access Control (ABAC)
    - Tenant isolation
    """
    
    def __init__(self, db, cache, audit_logger):
        self.db = db
        self.cache = cache
        self.audit = audit_logger
    
    async def check_permission(
        self,
        context: AuthorizationContext,
        required_permission: Permission
    ) -> bool:
        """
        Check if user has a specific permission.
        """
        # Get user's permissions from roles
        user_permissions = set()
        
        for role in context.roles:
            role_perms = ROLE_PERMISSIONS.get(role, [])
            user_permissions.update(role_perms)
        
        has_permission = required_permission in user_permissions
        
        # Audit log
        await self.audit.log(
            action="authorization_check",
            user_id=context.user_id,
            permission=required_permission.value,
            granted=has_permission
        )
        
        return has_permission
    
    async def check_resource_access(
        self,
        context: AuthorizationContext,
        required_permission: Permission
    ) -> bool:
        """
        Check if user can access a specific resource.
        
        Enforces:
        1. Tenant isolation (user can only access own tenant's resources)
        2. Permission check
        3. Resource-level policies
        """
        # CRITICAL: Tenant isolation check. Fail closed: a missing
        # resource_tenant_id counts as a mismatch, not as a pass.
        if context.resource_tenant_id != context.tenant_id:
            await self.audit.log(
                action="authorization_denied",
                reason="tenant_mismatch",
                user_id=context.user_id,
                user_tenant=context.tenant_id,
                resource_tenant=context.resource_tenant_id
            )
            return False
        
        # Check base permission
        if not await self.check_permission(context, required_permission):
            return False
        
        # Check resource-specific policies
        if context.resource_type:
            policy_result = await self._check_resource_policy(
                context, required_permission
            )
            if not policy_result:
                return False
        
        return True
    
    async def _check_resource_policy(
        self,
        context: AuthorizationContext,
        permission: Permission
    ) -> bool:
        """
        Check resource-specific access policies.
        
        For example: Users can only delete their own resources
        """
        # Delete operations: Must be owner or admin
        if permission in [Permission.RESOURCE_DELETE, Permission.USER_DELETE]:
            is_owner = context.resource_owner_id == context.user_id
            is_admin = "admin" in context.roles or "owner" in context.roles
            
            if not (is_owner or is_admin):
                await self.audit.log(
                    action="authorization_denied",
                    reason="not_owner_or_admin",
                    user_id=context.user_id,
                    resource_id=context.resource_id
                )
                return False
        
        return True
    
    async def get_accessible_resources(
        self,
        user_id: str,
        tenant_id: str,
        resource_type: str,
        permission: Permission
    ) -> List[str]:
        """
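        Get list of resource IDs the user can access.
        
        Used for filtering queries.
        """
        # Table names cannot be sent as bind parameters, so resource_type is
        # checked against an allow-list before it is interpolated into SQL.
        # The entries below are illustrative; list your real resource types.
        allowed_types = {"document", "resource"}
        if resource_type not in allowed_types:
            raise ValueError(f"Unknown resource type: {resource_type!r}")
        
        # For read permission, return all tenant resources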
        if permission == Permission.RESOURCE_READ:
            result = await self.db.fetch(
                f"""
                SELECT id FROM {resource_type}s 
                WHERE tenant_id = $1
                """,
                tenant_id
            )
            return [row["id"] for row in result]
        
        # For write/delete, return owned resources + admin override
        user = await self.db.fetchone(
            "SELECT roles FROM users WHERE id = $1",
            user_id
        )
        
        if "admin" in user["roles"] or "owner" in user["roles"]:
            # Admins can access all tenant resources
            result = await self.db.fetch(
                f"SELECT id FROM {resource_type}s WHERE tenant_id = $1",
                tenant_id
            )
        else:
            # Regular users only their own
            result = await self.db.fetch(
                f"""
                SELECT id FROM {resource_type}s 
                WHERE tenant_id = $1 AND owner_id = $2
                """,
                tenant_id, user_id
            )
        
        return [row["id"] for row in result]


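def require_permission(permission: Permission):
    """
    Decorator that enforces permission on endpoint.
    
    The endpoint must accept a `request` keyword argument.
    """
    from functools import wraps
    
    def decorator(func):
        @wraps(func)  # keep the endpoint's signature visible to the framework
        async def wrapper(*args, **kwargs):
            # Get current user from context
            request = kwargs.get("request")
            user = request.state.user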
            
            context = AuthorizationContext(
                user_id=user.user_id,
                tenant_id=user.tenant_id,
                roles=user.roles
            )
            
            auth_service = request.app.state.authorization
            
            if not await auth_service.check_permission(context, permission):
                raise PermissionDeniedError(
                    f"Permission denied: {permission.value}"
                )
            
            return await func(*args, **kwargs)
        
        return wrapper
    return decorator


class PermissionDeniedError(Exception):
    """User doesn't have required permission."""
    pass
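
The order of checks in `check_resource_access` matters: tenant isolation first, then the role-based permission, then the resource-level policy. The sketch below restates that order as a pure function so it can be tested without a database or audit logger. The names (`AccessRequest`, `decide`, `ROLE_GRANTS`) are hypothetical, and the grants are simplified: unlike the role table above, this sketch gives editors the delete permission so that the ownership rule has something to decide.

```python
from dataclasses import dataclass
from typing import FrozenSet, Optional

# Simplified, illustrative grants (not the ROLE_PERMISSIONS table above)
ROLE_GRANTS = {
    "viewer": frozenset({"resource:read"}),
    "editor": frozenset({"resource:read", "resource:write", "resource:delete"}),
    "admin": frozenset({"resource:read", "resource:write", "resource:delete"}),
}


@dataclass(frozen=True)
class AccessRequest:
    user_id: str
    user_tenant: str
    roles: FrozenSet[str]
    permission: str
    resource_tenant: Optional[str]
    resource_owner: Optional[str]


def decide(req: AccessRequest) -> str:
    """Return "allow" or a "deny:<reason>" string."""
    # 1. Tenant isolation, failing closed when the resource tenant is unknown
    if req.resource_tenant != req.user_tenant:
        return "deny:tenant_mismatch"

    # 2. RBAC: union of the permissions granted by each role
    granted = set()
    for role in req.roles:
        granted |= ROLE_GRANTS.get(role, frozenset())
    if req.permission not in granted:
        return "deny:missing_permission"

    # 3. Resource policy: deletes require ownership unless the user is an admin
    if req.permission.endswith(":delete"):
        is_owner = req.resource_owner == req.user_id
        if not (is_owner or "admin" in req.roles):
            return "deny:not_owner_or_admin"

    return "allow"


editor_roles = frozenset({"editor"})
own_delete = decide(AccessRequest(
    "u1", "t1", editor_roles, "resource:delete", "t1", "u1"))
other_delete = decide(AccessRequest(
    "u1", "t1", editor_roles, "resource:delete", "t1", "u2"))
cross_tenant = decide(AccessRequest(
    "u1", "t1", frozenset({"admin"}), "resource:read", "t2", "u9"))
```

Returning a reason string rather than a bare boolean makes the decision easy to feed into the audit log.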

Chapter 6: Security Middleware and Validation

6.1 Security Middleware

# security/middleware.py

"""
Security middleware for request/response protection.
"""

from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
import logging
import time
import uuid

logger = logging.getLogger(__name__)


class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    """
    Adds security headers to all responses.
    """
    
    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)
        
        # Prevent clickjacking
        response.headers["X-Frame-Options"] = "DENY"
        
        # Prevent MIME type sniffing
        response.headers["X-Content-Type-Options"] = "nosniff"
        
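        # Disable the legacy XSS auditor. Modern browsers have removed it, and
        # "1; mode=block" could itself introduce vulnerabilities in older ones.
        # The Content-Security-Policy header is the actual XSS defense.
        response.headers["X-XSS-Protection"] = "0"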
        
        # Content Security Policy
        response.headers["Content-Security-Policy"] = (
            "default-src 'self'; "
            "script-src 'self'; "
            "style-src 'self' 'unsafe-inline'; "
            "img-src 'self' data: https:; "
            "font-src 'self'; "
            "connect-src 'self' https://api.example.com; "
            "frame-ancestors 'none';"
        )
        
        # Strict Transport Security
        response.headers["Strict-Transport-Security"] = (
            "max-age=31536000; includeSubDomains; preload"
        )
        
        # Referrer Policy
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        
        # Permissions Policy
        response.headers["Permissions-Policy"] = (
            "accelerometer=(), camera=(), geolocation=(), "
            "gyroscope=(), magnetometer=(), microphone=(), "
            "payment=(), usb=()"
        )
        
        return response


class RequestLoggingMiddleware(BaseHTTPMiddleware):
    """
    Logs all requests for security auditing.
    """
    
    async def dispatch(self, request: Request, call_next):
        start_time = time.time()
        
        # Generate request ID
        request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
        
        # Log request
        logger.info(
            "Request started",
            extra={
                "request_id": request_id,
                "method": request.method,
                "path": request.url.path,
                "client_ip": request.client.host,
                "user_agent": request.headers.get("User-Agent"),
                "tenant_id": getattr(request.state, "tenant_id", None),
                "user_id": getattr(request.state, "user_id", None)
            }
        )
        
        response = await call_next(request)
        
        # Calculate duration
        duration = time.time() - start_time
        
        # Log response
        logger.info(
            "Request completed",
            extra={
                "request_id": request_id,
                "status_code": response.status_code,
                "duration_ms": round(duration * 1000, 2)
            }
        )
        
        # Add request ID to response
        response.headers["X-Request-ID"] = request_id
        
        return response


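class InputSanitizationMiddleware(BaseHTTPMiddleware):
    """
    Rejects requests that contain common attack patterns.
    
    This is a coarse first filter, not a replacement for parameterized
    queries, output encoding, and schema validation. Substring blocklists
    are easy to evade and can also reject legitimate input.
    """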
    
    # Patterns that might indicate attacks
    SUSPICIOUS_PATTERNS = [
        "<script",
        "javascript:",
        "onerror=",
        "onclick=",
        "UNION SELECT",
        "DROP TABLE",
        "'; --",
        "${",
        "{{",
    ]
    
    async def dispatch(self, request: Request, call_next):
        # Check query parameters
        for key, value in request.query_params.items():
            if self._is_suspicious(value):
                logger.warning(
                    "Suspicious query parameter blocked",
                    extra={
                        "param": key,
                        "client_ip": request.client.host
                    }
                )
                return Response(
                    content="Bad request",
                    status_code=400
                )
        
        # For POST/PUT, check body
        if request.method in ["POST", "PUT", "PATCH"]:
            body = await request.body()
            body_str = body.decode("utf-8", errors="ignore")
            
            if self._is_suspicious(body_str):
                logger.warning(
                    "Suspicious request body blocked",
                    extra={"client_ip": request.client.host}
                )
                return Response(
                    content="Bad request",
                    status_code=400
                )
        
        return await call_next(request)
    
    def _is_suspicious(self, value: str) -> bool:
        """Check if value contains suspicious patterns."""
        value_lower = value.lower()
        
        for pattern in self.SUSPICIOUS_PATTERNS:
            if pattern.lower() in value_lower:
                return True
        
        return False
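
A pattern blocklist like the one in `InputSanitizationMiddleware` only catches payloads it already knows about. The sketch below, using the standard library's `sqlite3` as a stand-in database, shows a classic injection string that slips past a blocklist and is still harmless when the query is parameterized. The table, the shortened pattern list, and the helper names are assumptions made for the example.

```python
import sqlite3

# Shortened copy of the middleware's blocklist, for illustration only
SUSPICIOUS_PATTERNS = ["union select", "drop table", "'; --"]


def is_suspicious(value: str) -> bool:
    lowered = value.lower()
    return any(pattern in lowered for pattern in SUSPICIOUS_PATTERNS)


def find_user(conn: sqlite3.Connection, email: str) -> list:
    # The driver sends `email` as data, so it can never change the SQL text
    return conn.execute(
        "SELECT id, email FROM users WHERE email = ?", (email,)
    ).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT)")
conn.executemany(
    "INSERT INTO users (email) VALUES (?)",
    [("alice@example.com",), ("bob@example.com",)],
)
conn.commit()

payload = "x' OR '1'='1"
blocklist_missed_it = not is_suspicious(payload)   # no listed pattern matches
rows_for_payload = find_user(conn, payload)        # treated as a literal email
rows_for_alice = find_user(conn, "alice@example.com")
```

The blocklist can stay as an early-warning signal for logging, but the guarantee comes from the parameterized query.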

6.2 Input Validation

# security/validation.py

"""
Input validation utilities.
"""

# Written against the Pydantic v1 API (`validator`, `Config.anystr_*`)
from pydantic import BaseModel, validator, EmailStr, constr
from typing import Optional, List
import re
import bleach


class SecureUserInput(BaseModel):
    """
    Base model with security validations.
    """
    
    class Config:
        # Strip whitespace from strings
        anystr_strip_whitespace = True
        # Limit string length
        max_anystr_length = 10000
    
    @validator("*", pre=True)
    def sanitize_strings(cls, v):
        """Sanitize string inputs."""
        if isinstance(v, str):
            # Remove null bytes
            v = v.replace("\x00", "")
            # Limit length
            v = v[:10000]
        return v


class CreateUserRequest(SecureUserInput):
    """Validated user creation request."""
    
    email: EmailStr
    name: constr(min_length=1, max_length=100)
    password: constr(min_length=12, max_length=128)
    
    @validator("name")
    def validate_name(cls, v):
        """Validate name contains only allowed characters."""
        if not re.match(r"^[\w\s\-'.]+$", v):
            raise ValueError("Name contains invalid characters")
        return v
    
    @validator("password")
    def validate_password(cls, v):
        """Validate password strength."""
        if not re.search(r"[A-Z]", v):
            raise ValueError("Password must contain uppercase letter")
        if not re.search(r"[a-z]", v):
            raise ValueError("Password must contain lowercase letter")
        if not re.search(r"\d", v):
            raise ValueError("Password must contain digit")
        if not re.search(r"[!@#$%^&*(),.?\":{}|<>]", v):
            raise ValueError("Password must contain special character")
        return v


class ContentInput(SecureUserInput):
    """Validated content input (allows some HTML)."""
    
    title: constr(min_length=1, max_length=200)
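    # SecureUserInput caps every string at 10,000 characters, so a larger
    # limit here would never take effect unless that cap is raised as well
    body: constr(min_length=1, max_length=10000)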
    
    @validator("title")
    def sanitize_title(cls, v):
        """Strip all HTML from title."""
        return bleach.clean(v, tags=[], strip=True)
    
    @validator("body")
    def sanitize_body(cls, v):
        """Allow safe HTML in body."""
        allowed_tags = [
            "p", "br", "strong", "em", "u", "a", "ul", "ol", "li",
            "h1", "h2", "h3", "h4", "h5", "h6", "blockquote", "code", "pre"
        ]
        allowed_attrs = {
            "a": ["href", "title"],
        }
        return bleach.clean(
            v,
            tags=allowed_tags,
            attributes=allowed_attrs,
            strip=True
        )


def validate_uuid(value: str) -> bool:
    """Validate UUID format."""
    uuid_pattern = re.compile(
        r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$',
        re.IGNORECASE
    )
    # fullmatch, because `$` with match() also accepts a trailing newline
    return bool(uuid_pattern.fullmatch(value))


def validate_tenant_id(tenant_id: str) -> bool:
    """Validate tenant ID format (existence must be checked separately)."""
    if not tenant_id:
        return False
    return re.fullmatch(r'[a-z0-9_-]{3,50}', tenant_id) is not None
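
As an alternative to the regular expression in `validate_uuid`, the standard library's `uuid` module can do the parsing. The sketch below is one way to do that; `is_canonical_uuid` is a hypothetical name. Because `uuid.UUID` also accepts braces, a `urn:uuid:` prefix, and unhyphenated hex, the result is compared with the canonical form so that only the usual 8-4-4-4-12 layout passes.

```python
import uuid


def is_canonical_uuid(value: object) -> bool:
    """True only for hyphenated 8-4-4-4-12 UUID strings (any letter case)."""
    if not isinstance(value, str):
        return False
    try:
        parsed = uuid.UUID(value)
    except ValueError:
        return False
    # Reject the looser spellings that uuid.UUID tolerates
    return str(parsed) == value.lower()


sample = "550e8400-e29b-41d4-a716-446655440000"
accepts_canonical = is_canonical_uuid(sample)
rejects_braces = not is_canonical_uuid("{" + sample + "}")
rejects_newline = not is_canonical_uuid(sample + "\n")
```

Validating identifiers before they reach a query keeps malformed input out of logs and error messages as well as out of the database.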

Part III: Real-World Application

Chapter 7: Case Studies

7.1 How Stripe Handles Security

STRIPE'S SECURITY ARCHITECTURE

Challenge:
├── Process billions in payments
├── PCI DSS Level 1 compliance
├── Target for attackers
└── Must be developer-friendly

Key Security Measures:

1. ENCRYPTION EVERYWHERE
   ├── TLS 1.2+ required for all API calls
   ├── Certificate pinning in SDKs
   ├── All data encrypted at rest (AES-256)
   ├── Card numbers encrypted with per-merchant keys
   └── HSMs for key management

2. TOKENIZATION
   ├── Card numbers never hit merchant servers
   ├── Stripe.js collects card details
   ├── Token returned to merchant
   ├── Token can only be used by that merchant
   └── Reduces merchant PCI scope

3. AUTHENTICATION
   ├── API keys: Publishable (frontend) vs Secret (backend)
   ├── Secret keys: Test vs Live modes
   ├── Webhook signatures for verification
   └── OAuth for Connect platforms

4. MONITORING
   ├── All API calls logged
   ├── Radar for fraud detection
   ├── Real-time anomaly detection
   └── Automatic blocking of suspicious activity

5. INFRASTRUCTURE
   ├── Private data centers (not just cloud)
   ├── Physical security controls
   ├── Network segmentation
   └── Regular penetration testing

Lessons:
├── Tokenization reduces scope of compliance
├── Separate test and live credentials
├── Make security invisible to developers
└── Defense in depth at every layer
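
The "webhook signatures" item above can be sketched with the standard library. The example follows the general timestamp-plus-HMAC pattern that Stripe documents for its webhooks, but it is an illustration under assumptions rather than Stripe's SDK: the function names, the `t=...,v1=...` header parsing, and the five-minute tolerance are choices made for this sketch, so check the provider's documentation before relying on any of them.

```python
import hashlib
import hmac


def sign_webhook(secret: bytes, timestamp: int, payload: bytes) -> str:
    """HMAC-SHA256 over "<timestamp>.<payload>", hex encoded."""
    signed = str(timestamp).encode() + b"." + payload
    return hmac.new(secret, signed, hashlib.sha256).hexdigest()


def verify_webhook(
    secret: bytes,
    header: str,
    payload: bytes,
    now: int,
    tolerance_seconds: int = 300,
) -> bool:
    """Check a header of the form "t=<unix time>,v1=<hex signature>"."""
    parts = dict(
        item.split("=", 1) for item in header.split(",") if "=" in item
    )
    try:
        timestamp = int(parts["t"])
        received = parts["v1"]
    except (KeyError, ValueError):
        return False
    # Reject stale deliveries so a captured request cannot be replayed later
    if abs(now - timestamp) > tolerance_seconds:
        return False
    expected = sign_webhook(secret, timestamp, payload)
    # Constant-time comparison of the two signatures
    return hmac.compare_digest(expected.encode(), received.encode())


secret = b"whsec_example_only"
body = b'{"type": "payment.succeeded"}'
header = f"t=1700000000,v1={sign_webhook(secret, 1700000000, body)}"

valid = verify_webhook(secret, header, body, now=1700000060)
tampered = verify_webhook(secret, header, body + b" ", now=1700000060)
replayed = verify_webhook(secret, header, body, now=1700009999)
```

Signing the timestamp together with the body is what makes the replay check trustworthy: an attacker cannot refresh the timestamp without invalidating the signature.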

7.2 How Google Handles Zero Trust

GOOGLE'S BEYONDCORP (ZERO TRUST)

Background:
├── Google was targeted by Operation Aurora (2009)
├── Realized perimeter security was insufficient
└── Developed BeyondCorp, now a widely adopted zero-trust model

Key Principles:

1. NO PRIVILEGED NETWORK
   ├── Internal network same trust as internet
   ├── No VPN for accessing internal apps
   ├── All access through Access Proxy
   └── Location doesn't determine access

2. DEVICE TRUST
   ├── All devices must be managed
   ├── Device inventory maintained
   ├── Device health checked continuously
   └── Unmanaged devices: limited access

3. USER TRUST
   ├── Strong authentication (MFA required)
   ├── Context-aware access decisions
   ├── Session tokens, not passwords
   └── Continuous verification

4. ACCESS TIERS
   ├── Level 1: Any authenticated user
   ├── Level 2: Managed device required
   ├── Level 3: Managed device + location
   ├── Level 4: Full compliance required
   └── Access level per application

5. IMPLEMENTATION
   ┌────────────────────────────────────────────────────────────────┐
   │                                                                │
   │    User Device                                                 │
   │         │                                                      │
   │         ▼                                                      │
   │    ┌──────────────┐                                            │
   │    │ Access Proxy │ ◀── All access flows through here          │
   │    └──────┬───────┘                                            │
   │           │                                                    │
   │    ┌──────┴──────┐                                             │
   │    │             │                                             │
   │    ▼             ▼                                             │
   │ ┌──────┐    ┌───────┐                                          │
   │ │Device│    │Access │ ◀── Makes access decision                │
   │ │Trust │    │Control│                                          │
   │ └──────┘    │Engine │                                          │
   │             └───┬───┘                                          │
   │                 │                                              │
   │                 ▼                                              │
   │           ┌──────────┐                                         │
   │           │ Internal │                                         │
   │           │   App    │                                         │
   │           └──────────┘                                         │
   │                                                                │
   └────────────────────────────────────────────────────────────────┘

Lessons:
├── Network location is not a security boundary
├── Every access decision needs context
├── Device health is as important as user identity
└── Continuous verification, not point-in-time
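
The access tiers above can be read as a function from request context to a trust level, evaluated on every request rather than once at login. The sketch below is a hypothetical illustration of that idea, not Google's implementation: `AccessContext`, `access_tier`, and `is_allowed` are invented names, and mapping "full compliance" to a device health flag is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AccessContext:
    authenticated: bool
    mfa_verified: bool
    device_managed: bool
    device_healthy: bool      # assumed stand-in for "full compliance"
    trusted_location: bool


def access_tier(ctx: AccessContext) -> int:
    """Highest tier (0-4) the current context qualifies for."""
    if not (ctx.authenticated and ctx.mfa_verified):
        return 0              # no access without strong authentication
    if not ctx.device_managed:
        return 1              # Level 1: any authenticated user
    if not ctx.trusted_location:
        return 2              # Level 2: managed device
    if not ctx.device_healthy:
        return 3              # Level 3: managed device + location
    return 4                  # Level 4: full compliance


def is_allowed(ctx: AccessContext, required_tier: int) -> bool:
    # Each application declares the tier it requires
    return access_tier(ctx) >= required_tier


personal_laptop = AccessContext(True, True, False, False, False)
managed_remote = AccessContext(True, True, True, True, False)
managed_office = AccessContext(True, True, True, True, True)
```

Because the tier is recomputed from current signals, a device that falls out of compliance loses access on its next request without anyone revoking a session.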

Chapter 8: Common Mistakes

8.1 Security Anti-Patterns

SECURITY MISTAKES

❌ MISTAKE 1: Secrets in Code

Wrong:
  # config.py
  DATABASE_PASSWORD = "SuperSecret123"
  API_KEY = "sk_live_abc123"
  
  # .env committed to git
  DATABASE_URL=postgres://admin:password@db:5432/app

Problem:
  Secrets in git history forever
  Anyone with repo access has credentials
  Can't rotate without code change

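Right:
  # config.py
  import os

  # Non-secret settings (such as the host) can come from the environment
  DATABASE_HOST = os.getenv("DB_HOST")
  # The password is never stored in code; it is fetched from the vault at runtime
  
  # Inside an async function, using a secrets manager client
  password = await secrets_manager.get_secret("db_password")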
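
A CI or pre-commit check can catch this mistake before it reaches the repository. The following is a rough sketch of the idea only: the three patterns are hypothetical and far from complete, and real scanners ship much larger rule sets plus entropy checks.

```python
import re

# Hypothetical patterns for illustration; not a complete rule set.
SECRET_PATTERNS = {
    "live API key": re.compile(r"sk_live_[A-Za-z0-9]+"),
    "URL with embedded password": re.compile(r"[a-z][a-z0-9+]*://[^\s:/@]+:[^\s@]+@"),
    "hardcoded password": re.compile(
        r"password\s*=\s*[\"'][^\"']+[\"']", re.IGNORECASE
    ),
}


def scan_text(text: str) -> list[tuple[int, str]]:
    """Return (line_number, finding) pairs for lines that look like secrets."""
    findings = []
    for lineno, line in enumerate(text.splitlines(), start=1):
        for name, pattern in SECRET_PATTERNS.items():
            if pattern.search(line):
                findings.append((lineno, name))
    return findings
```

A CI job would run `scan_text` over the changed files and fail the build when the returned list is non-empty.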


❌ MISTAKE 2: Trusting Frontend Validation

Wrong:
  @app.post("/api/transfer")
  async def transfer(amount: float, to_account: str):
      # Frontend validated this, we're good!
      await db.execute(
          "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
          amount, current_user.account_id
      )

Problem:
  Attacker bypasses frontend
  Negative amount = free money
  No authorization check

Right:
  @app.post("/api/transfer")
  async def transfer(request: TransferRequest):
      # Server-side validation
      if request.amount <= 0:
          raise ValidationError("Amount must be positive")
      
      if request.amount > MAX_TRANSFER:
          raise ValidationError("Amount exceeds limit")
      
      # Check authorization
      if not await can_transfer(current_user, request.to_account):
          raise PermissionError("Not authorized")
      
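      # Check balance (repeat this check inside the transfer transaction,
      # otherwise two concurrent requests can both pass it)
      balance = await get_balance(current_user.account_id)
      if balance < request.amount:
          raise ValidationError("Insufficient funds")
      
      # Execute the debit and credit atomically, in a single transaction
      await execute_transfer(...)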
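
A separate balance check followed by an update leaves a gap in which two concurrent requests can both pass the check. One way to close it is to fold the check into the debit statement itself. The sketch below uses SQLite and integer cents purely for illustration; the `accounts` table layout and the `MAX_TRANSFER_CENTS` value are assumptions, not part of the original design.

```python
import sqlite3

MAX_TRANSFER_CENTS = 1_000_000  # hypothetical limit


def transfer(conn: sqlite3.Connection, from_id: str, to_id: str, amount_cents: int) -> bool:
    """Move money atomically; return True on success, False if funds are short."""
    if not isinstance(amount_cents, int) or amount_cents <= 0:
        raise ValueError("Amount must be a positive integer number of cents")
    if amount_cents > MAX_TRANSFER_CENTS:
        raise ValueError("Amount exceeds limit")

    with conn:  # commits on success, rolls back if an exception escapes
        # The balance check is part of the UPDATE, so it cannot go stale.
        debited = conn.execute(
            "UPDATE accounts SET balance = balance - ? "
            "WHERE id = ? AND balance >= ?",
            (amount_cents, from_id, amount_cents),
        )
        if debited.rowcount != 1:
            return False  # unknown source account or insufficient funds
        credited = conn.execute(
            "UPDATE accounts SET balance = balance + ? WHERE id = ?",
            (amount_cents, to_id),
        )
        if credited.rowcount != 1:
            # Raising here rolls back the debit as well.
            raise LookupError("Destination account not found")
    return True
```

Authorization (whether the caller owns `from_id`) still has to be checked before this function is called.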


❌ MISTAKE 3: Missing Tenant Isolation

Wrong:
  @app.get("/api/documents/{doc_id}")
  async def get_document(doc_id: str):
      # Fetch document by ID
      doc = await db.fetchone(
          "SELECT * FROM documents WHERE id = $1",
          doc_id
      )
      return doc

Problem:
  Any user can access any document
  Just guess document IDs
  Complete data breach

Right:
  @app.get("/api/documents/{doc_id}")
  async def get_document(doc_id: str, user: User = Depends(get_current_user)):
      # ALWAYS filter by tenant
      doc = await db.fetchone(
          "SELECT * FROM documents WHERE id = $1 AND tenant_id = $2",
          doc_id, user.tenant_id
      )
      
      if not doc:
          raise NotFoundError("Document not found")
      
      return doc
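
Remembering the tenant filter in every handler is error-prone, so one option is to bind the tenant once in a data-access object and make unscoped queries unavailable. This is a minimal sketch using SQLite; the class name `TenantScopedDocuments` and the `documents` table columns are assumptions for the example.

```python
import sqlite3


class TenantScopedDocuments:
    """Hypothetical data-access wrapper bound to a single tenant."""

    def __init__(self, conn: sqlite3.Connection, tenant_id: str):
        self._conn = conn
        # Taken from the authenticated session, never from request input.
        self._tenant_id = tenant_id

    def get(self, doc_id: str):
        """Return (id, title), or None if missing or owned by another tenant."""
        return self._conn.execute(
            "SELECT id, title FROM documents WHERE id = ? AND tenant_id = ?",
            (doc_id, self._tenant_id),
        ).fetchone()
```

Returning the same "not found" result for a missing document and for another tenant's document avoids confirming that a guessed ID exists.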


❌ MISTAKE 4: Logging Sensitive Data

Wrong:
  logger.info(f"User login: {email}, password: {password}")
  logger.info(f"Payment processed: {credit_card_number}")
  logger.info(f"API request: {request.headers}")  # Contains auth token

Problem:
  Passwords in logs!
  PCI violation (card numbers)
  Tokens can be stolen from logs

Right:
  logger.info("User login attempt", extra={"email": email})
  # Never log passwords
  
  logger.info("Payment processed", extra={
      "last_four": credit_card_number[-4:],
      "amount": amount
  })
  
  # Sanitize headers before logging
  safe_headers = sanitize_headers(request.headers)
  logger.info("API request", extra={"headers": safe_headers})
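
The `sanitize_headers` helper is not defined in the text. One possible shape for it, as a sketch, is shown here; the set of header names treated as sensitive is an assumption and should be extended for your own API.

```python
# Header names (lowercase) whose values must never reach the logs.
# This list is an assumption for the example, not an exhaustive one.
SENSITIVE_HEADERS = {
    "authorization",
    "proxy-authorization",
    "cookie",
    "set-cookie",
    "x-api-key",
}


def sanitize_headers(headers):
    """Return a copy of the headers with sensitive values replaced."""
    return {
        name: "[REDACTED]" if name.lower() in SENSITIVE_HEADERS else value
        for name, value in headers.items()
    }
```

The lookup is case-insensitive because HTTP header names are, while the original casing is kept in the output for readability.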


❌ MISTAKE 5: Overly Permissive CORS

Wrong:
  app.add_middleware(
      CORSMiddleware,
      allow_origins=["*"],  # Any origin!
      allow_credentials=True,
      allow_methods=["*"],
      allow_headers=["*"],
  )

Problem:
  Any website can call the API from a visitor's browser
  With credentials allowed, a hostile site can send the visitor's cookies and read the responses
  CSRF attacks possible

Right:
  app.add_middleware(
      CORSMiddleware,
      allow_origins=[
          "https://app.example.com",
          "https://admin.example.com",
      ],
      allow_credentials=True,
      allow_methods=["GET", "POST", "PUT", "DELETE"],
      allow_headers=["Authorization", "Content-Type"],
  )
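
Conceptually, an allowlist-based CORS policy does an exact-match check on the request's `Origin` header. The function below is a simplified sketch of that check, not the actual middleware implementation; `cors_headers` and `ALLOWED_ORIGINS` are names invented for the example.

```python
ALLOWED_ORIGINS = {"https://app.example.com", "https://admin.example.com"}


def cors_headers(request_origin):
    """Return CORS response headers for an allowed origin, or {} otherwise."""
    if request_origin not in ALLOWED_ORIGINS:
        # No CORS headers: the browser blocks the cross-origin read.
        return {}
    return {
        # Echo the origin only after an exact match against the allowlist.
        "Access-Control-Allow-Origin": request_origin,
        "Access-Control-Allow-Credentials": "true",
        # Tell caches that the response depends on the Origin header.
        "Vary": "Origin",
    }
```

Exact matching matters: prefix or substring checks would accept look-alike hosts such as `https://app.example.com.evil.io`.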

Part IV: Interview Preparation

Chapter 9: Interview Tips

9.1 Security Discussion Framework

DISCUSSING SECURITY IN INTERVIEWS

When security comes up:

1. START WITH THREAT MODEL
   "First, let me identify what we're protecting against:
    - External attackers (internet)
    - Malicious users (authenticated but hostile)
    - Internal threats (compromised employee)
    - Data breaches (encryption focus)"

2. APPLY DEFENSE IN DEPTH
   "I'd implement security at multiple layers:
    - Network: VPC, security groups, WAF
    - Application: Auth, authz, validation
    - Data: Encryption at rest and in transit
    - Monitoring: Audit logs, alerting"

3. DISCUSS SPECIFIC CONTROLS
   "For authentication, I'd use:
    - Password hashing with bcrypt (cost 12+)
    - JWT tokens with short expiry
    - MFA for sensitive operations
    - Rate limiting on login"

4. ADDRESS SECRETS
   "Secrets management is critical:
    - Never in code or environment variables
    - Use Vault or AWS Secrets Manager
    - Rotate credentials automatically
    - Separate dev/prod credentials"

5. MENTION COMPLIANCE
   "Depending on the domain:
    - PCI DSS for payments (tokenization)
    - HIPAA for health (encryption, audit logs)
    - SOC 2 for SaaS (access controls)
    These drive specific requirements"
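
If the interviewer asks how "rate limiting on login" would work, a small sketch helps. The version below keeps state in memory and locks an account after repeated failures; the class name, the default thresholds, and the injectable `clock` are illustrative assumptions, and a real deployment would keep the counters in a shared store.

```python
import time


class LoginRateLimiter:
    """Lock an account after too many failed logins (in-memory sketch)."""

    def __init__(self, max_failures=5, lockout_seconds=15 * 60, clock=time.monotonic):
        self.max_failures = max_failures
        self.lockout_seconds = lockout_seconds
        self.clock = clock
        self._failures = {}      # email -> consecutive failure count
        self._locked_until = {}  # email -> time at which the lock expires

    def is_locked(self, email):
        until = self._locked_until.get(email)
        if until is None:
            return False
        if self.clock() >= until:
            # Lock expired: forget it and start counting again.
            del self._locked_until[email]
            self._failures.pop(email, None)
            return False
        return True

    def record_failure(self, email):
        count = self._failures.get(email, 0) + 1
        self._failures[email] = count
        if count >= self.max_failures:
            self._locked_until[email] = self.clock() + self.lockout_seconds

    def record_success(self, email):
        self._failures.pop(email, None)
        self._locked_until.pop(email, None)
```

The login handler would call `is_locked` before checking the password, then `record_failure` or `record_success` depending on the outcome.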

9.2 Key Phrases

SECURITY KEY PHRASES

On Defense in Depth:
"I design with defense in depth - assuming any single layer might
fail. Even if an attacker bypasses the firewall, they still face
application authentication, encryption, and monitoring. No single
point of failure."

On Zero Trust:
"I follow zero trust principles - never trust, always verify.
Network location doesn't grant access. Every request is authenticated
and authorized, whether it comes from inside or outside the network."

On Secrets Management:
"Secrets never go in code or environment variables. I use a secrets
manager like Vault, with automatic rotation. Applications fetch
secrets at runtime, so rotation doesn't require restarts."

On Authentication:
"For authentication, I'd use JWT tokens with short expiry (15 minutes)
and refresh tokens for longer sessions. Passwords are hashed with
bcrypt, never stored plaintext. MFA is required for admin access."

On Encryption:
"I implement encryption at multiple levels: TLS for transit, AES-256
for storage, and field-level encryption for sensitive data like SSNs.
Keys are managed through KMS with automatic rotation."

Chapter 10: Practice Problems

Problem 1: Secure API Design

Scenario: You're designing an API for a banking application that lets users view accounts and transfer money.

Questions:

  1. How do you authenticate users?
  2. How do you prevent unauthorized transfers?
  3. How do you protect against common attacks?

Key points:

  • OAuth 2.0 or JWT with MFA
  • Transaction signing or step-up authentication
  • Rate limiting, input validation, CSRF protection
  • Audit logging for all transactions
  • Amount limits and velocity checks

Problem 2: Multi-Tenant Security

Scenario: Your SaaS platform stores sensitive data for multiple customers. One customer is a competitor of another.

Questions:

  1. How do you ensure data isolation?
  2. What if an engineer needs to debug a customer issue?
  3. How do you handle encryption keys?

Key points:

  • Row-level security or schema separation
  • Just-in-time access with audit trails
  • Per-tenant encryption keys
  • No cross-tenant queries possible
  • Support access requires customer approval
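
Just-in-time access with an audit trail can be sketched as a time-boxed grant that records every decision. The `JitAccess` class below is a hypothetical in-memory illustration; the event names and the one-hour default are assumptions, and a real system would persist grants and logs in tamper-resistant storage.

```python
import time


class JitAccess:
    """Time-boxed support access with an append-only audit trail (sketch)."""

    def __init__(self, clock=time.time):
        self._clock = clock
        self._grants = {}    # (engineer, tenant_id) -> expiry time
        self.audit_log = []  # (timestamp, event, engineer, tenant_id)

    def grant(self, engineer, tenant_id, approved_by_customer, ttl_seconds=3600):
        """Grant temporary access only when the customer has approved it."""
        if not approved_by_customer:
            self._record("denied", engineer, tenant_id)
            return False
        self._grants[(engineer, tenant_id)] = self._clock() + ttl_seconds
        self._record("granted", engineer, tenant_id)
        return True

    def can_access(self, engineer, tenant_id):
        """Check a grant and log the outcome, whether allowed or refused."""
        expires_at = self._grants.get((engineer, tenant_id))
        allowed = expires_at is not None and self._clock() < expires_at
        self._record("access_allowed" if allowed else "access_refused", engineer, tenant_id)
        return allowed

    def _record(self, event, engineer, tenant_id):
        self.audit_log.append((self._clock(), event, engineer, tenant_id))
```

Access expires on its own, so nobody has to remember to revoke it after the debugging session ends.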

Chapter 11: Sample Interview Dialogue

Interviewer: "How would you secure a multi-tenant SaaS application?"

You: "I'd approach this with defense in depth, focusing on several key areas.

First, network security:"

Internet → WAF → ALB → Security Groups → App

- WAF blocks common attacks (OWASP Top 10)
- ALB terminates TLS, requires 1.2+
- Security groups: Only ALB can reach app servers
- App servers in private subnet, no public IP

"Second, tenant isolation. This is critical for multi-tenant:"

Every database query:
  SELECT * FROM data WHERE tenant_id = :current_tenant_id

Enforced at:
1. Application layer (middleware adds tenant filter)
2. Database layer (RLS policies)
3. API layer (tenant from JWT, not request)

Cross-tenant queries are impossible by design.

Interviewer: "What about authentication and secrets?"

You: "For authentication, I'd implement:

  • Password hashing: bcrypt with cost factor 12
  • JWT tokens: 15-minute access, 7-day refresh
  • MFA: Required for admin, optional for users
  • Session management: Server-side session store in Redis
  • Rate limiting: 5 failed attempts = 15-minute lockout

For secrets:"

WRONG:                          RIGHT:
config.py:                      Vault:
  DB_PASS = "secret"             └── secrets/
                                      ├── database/password
.env:                                 ├── api/stripe_key
  API_KEY=sk_live_xxx                └── jwt/signing_key

Application fetches secrets at runtime:
  password = await vault.get("database/password")
  
Rotation: Vault rotates, app gets new secret on next fetch
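
Fetching on every use would put the secrets backend on the hot path, so a common compromise is a short-lived cache. The sketch below assumes a generic `fetch` callable standing in for the Vault client; the class name, the 300-second default, and the injectable `clock` are illustrative choices, not part of any Vault API.

```python
import time


class CachedSecrets:
    """Cache secrets briefly so rotation is picked up without a restart."""

    def __init__(self, fetch, ttl_seconds=300, clock=time.monotonic):
        self._fetch = fetch   # callable: path -> secret (hypothetical backend call)
        self._ttl = ttl_seconds
        self._clock = clock
        self._cache = {}      # path -> (value, expires_at)

    def get(self, path):
        entry = self._cache.get(path)
        if entry is not None and self._clock() < entry[1]:
            return entry[0]
        # Cache miss or expired entry: go back to the secrets backend.
        value = self._fetch(path)
        self._cache[path] = (value, self._clock() + self._ttl)
        return value
```

The TTL bounds how long the application keeps using an old credential after rotation, so it should be shorter than the overlap window in which both old and new credentials are valid.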

Interviewer: "How do you handle a security incident?"

You: "I'd have several layers of detection and response:

Detection:

  • Audit logs for all access (who, what, when)
  • Anomaly detection (unusual access patterns)
  • Failed authentication alerts
  • Data access monitoring

Response:

  • Automated blocking of suspicious IPs
  • Session revocation capability
  • Incident response runbook
  • Communication templates for affected customers

For example, if we detect unusual data access:

  1. Alert fires → On-call gets paged
  2. Immediate: Revoke affected sessions
  3. Investigate: Query audit logs for scope
  4. Contain: Block attacker access
  5. Remediate: Patch vulnerability
  6. Communicate: Notify affected customers
  7. Postmortem: Prevent recurrence"
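
The "query audit logs for scope" step can be illustrated with a small sketch. The `AuditEvent` fields follow the who/what/when idea from the dialogue, but the exact schema and the `incident_scope` helper are assumptions for the example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AuditEvent:
    timestamp: int  # when (Unix seconds)
    actor: str      # who
    action: str     # what
    resource: str
    tenant_id: str


def incident_scope(events, actor, since):
    """Group the resources an actor touched since a given time, by tenant."""
    scope = {}
    for event in events:
        if event.actor == actor and event.timestamp >= since:
            scope.setdefault(event.tenant_id, set()).add(event.resource)
    return scope
```

The keys of the result are the tenants to notify, and the values are the resources to review for each of them.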

Summary

DAY 5 KEY TAKEAWAYS

DEFENSE IN DEPTH:
├── Perimeter (WAF, DDoS, rate limiting)
├── Network (VPC, security groups, private subnets)
├── Application (auth, authz, validation)
├── Data (encryption at rest and transit)
└── Monitoring (audit logs, alerting)

ZERO TRUST:
├── Never trust, always verify
├── Network location doesn't grant access
├── Every request authenticated/authorized
├── Assume breach, limit blast radius
└── Micro-segmentation

SECRETS MANAGEMENT:
├── Never in code or environment variables
├── Use Vault/Secrets Manager
├── Automatic rotation
├── Separate dev/prod credentials
└── Fetch at runtime, not startup

ENCRYPTION:
├── In transit: TLS 1.2+ everywhere
├── At rest: AES-256 for storage
├── Field-level: Sensitive data (SSN, etc.)
├── Key hierarchy: Master → Tenant → Data
└── HSM for key protection

AUTHENTICATION:
├── bcrypt for passwords (cost 12+)
├── JWT with short expiry
├── MFA for sensitive operations
├── Rate limiting on login
└── Session management

AUTHORIZATION:
├── RBAC for permissions
├── Tenant isolation always
├── Least privilege
├── Resource-level policies
└── Audit all access

COMMON MISTAKES:
├── Secrets in code
├── Trusting frontend validation
├── Missing tenant isolation
├── Logging sensitive data
└── Overly permissive CORS

DEFAULT SECURITY POSTURE:
├── Deny by default
├── Validate all input
├── Encrypt everything
├── Log everything
├── Rotate credentials
└── Assume compromise


End of Day 5: Security Architecture

This week is complete! Next week: Production Readiness and Operational Excellence, covering SLOs, observability, deployment, and incident management.