Transactions and ACID Properties
Learning Objectives
- Master ACID properties and their implementation
- Understand isolation levels and their trade-offs
- Implement Multi-Version Concurrency Control (MVCC)
- Build snapshot isolation for repeatable reads
- Detect and prevent transaction conflicts
- Implement deadlock detection and prevention
Lesson 9.1: Transaction Fundamentals
ACID Properties in Key-Value Stores
ACID guarantees are fundamental to databases. Let's understand each property and how to implement them in our key-value store.
Atomicity
All or nothing: a transaction either completes fully or is rolled back fully.
Transfer $100 from Account A to Account B:
Step 1: Deduct $100 from A
Step 2: Add $100 to B
Both steps happen or neither does.
If the system crashes between the steps, both are rolled back.
Consistency
Data always moves from one valid state to another.
Constraint: Balance >= 0
Before: A=$50, B=$50
Transaction: Transfer $100 from A to B (would leave A at -$50, violating the constraint)
Result: Transaction rejected (consistency preserved)
Isolation
Concurrent transactions don't interfere with each other.
Transaction 1: Read balance of A
Transaction 2: Modify balance of A
Without isolation:
T1 reads $100
T2 changes it to $50 and commits
T1 keeps using the stale $100 (inconsistency!)
With isolation:
T1 reads $100
T2 waits, or T1 keeps reading from its snapshot
T1 sees a consistent view
Durability
Committed data survives crashes.
Transaction: Transfer $100, committed
Power failure 1 second later
System restarts
Data still shows the transfer: durable!
Isolation Levels
Each isolation level strikes a different balance between guarantees and performance:
Read Uncommitted (Dirty Reads Allowed)
Transaction A: Read X=100
Transaction B: Modify X to 50 (not committed)
Transaction A: Sees X=50 (DIRTY READ!)
Transaction B: Rollback
Transaction A: Used a value that was never committed (consistency violation!)
Isolation Level: Lowest | Consistency: Lowest | Performance: Highest
Read Committed (No Dirty Reads)
Transaction A: Read X=100
Transaction B: Modify X to 50, COMMIT
Transaction A: Read X again = 50 (NON-REPEATABLE READ)
The same transaction sees different values!
Isolation Level: Medium | Consistency: Medium | Performance: Good
Repeatable Read (No Non-Repeatable Reads)
Transaction A: Read X=100
Transaction B: Modify X to 50, COMMIT
Transaction A: Read X again = 100 (REPEATABLE!)
But phantom records are still possible:
Transaction A: Range query (0-100)
Transaction B: Insert record at 75, COMMIT
Transaction A: Range query again = includes the new record (PHANTOM!)
Isolation Level: High | Consistency: High | Performance: Moderate
Serializable (Highest Isolation)
Transactions behave as if they had run one at a time, in some serial order.
Transaction A: Read/Write X, Y
Transaction B: Read/Write X, Y
The outcome must match running one fully before the other; the engine may block or abort a transaction to ensure this.
SLOWEST but SAFEST.
Isolation Level: Highest | Consistency: Highest | Performance: Lowest
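The first three levels differ mainly in which version a read returns. The Go sketch below models a single key X as a list of committed versions plus at most one uncommitted write, and implements three read rules: dirty reads (Read Uncommitted), latest-committed reads (Read Committed), and snapshot reads, which is how MVCC engines typically provide repeatable reads. It is a single-threaded toy with hypothetical names (store, readSnapshot, and so on); phantoms and Serializable are not modeled.

```go
package main

import "fmt"

// version is one committed value of a single key X, tagged with the commit
// timestamp it received.
type version struct {
	commitTS int
	value    int
}

// store keeps every committed version of X plus at most one uncommitted
// write, so the read rules of different isolation levels can be compared.
type store struct {
	history []version // committed versions, oldest first
	pending *int      // uncommitted write by some other transaction, if any
	lastTS  int       // timestamp of the latest commit
}

func (s *store) write(v int) { s.pending = &v }

func (s *store) commit() {
	s.lastTS++
	s.history = append(s.history, version{commitTS: s.lastTS, value: *s.pending})
	s.pending = nil
}

func (s *store) rollback() { s.pending = nil }

// readUncommitted may return another transaction's uncommitted write (dirty read).
func (s *store) readUncommitted() int {
	if s.pending != nil {
		return *s.pending
	}
	return s.readCommitted()
}

// readCommitted returns the latest committed value at the moment of the read.
func (s *store) readCommitted() int {
	return s.history[len(s.history)-1].value
}

// readSnapshot returns the newest value committed at or before snapshotTS,
// which the reading transaction fixes once, when it starts.
func (s *store) readSnapshot(snapshotTS int) int {
	for i := len(s.history) - 1; i >= 0; i-- {
		if s.history[i].commitTS <= snapshotTS {
			return s.history[i].value
		}
	}
	panic("no version visible at this snapshot")
}

func main() {
	s := &store{}
	s.write(100)
	s.commit() // X=100 committed at ts 1

	// Dirty read: B has written 50 but not committed.
	s.write(50)
	fmt.Println("read uncommitted sees:", s.readUncommitted()) // 50
	fmt.Println("read committed sees:", s.readCommitted())     // 100
	s.rollback()

	// A starts (snapshot = ts 1) and reads X; then B commits X=50; A reads again.
	snapA := s.lastTS
	rc1, snap1 := s.readCommitted(), s.readSnapshot(snapA)
	s.write(50)
	s.commit()
	rc2, snap2 := s.readCommitted(), s.readSnapshot(snapA)
	fmt.Printf("read committed: %d then %d\n", rc1, rc2)    // 100 then 50
	fmt.Printf("snapshot read: %d then %d\n", snap1, snap2) // 100 then 100
}
```

The only difference between the last two rules is whether the cutoff for "visible" is fixed at transaction start (snapshot) or re-evaluated on every read (read committed).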
Multi-Version Concurrency Control (MVCC)
Problem: Locking reduces concurrency
Write lock held on a row for the whole transaction:
T1: Lock row → Read → Process → Update (long operation)
T2: Wants to read the same row → BLOCKED for the entire duration!
Solution: MVCC. Keeping multiple versions lets readers and writers proceed concurrently.
Instead of locking, maintain multiple versions:
Row "user:1" versions:
v1 (tx_id=100): {"name": "alice", "age": 30}
v2 (tx_id=105): {"name": "alice", "age": 31}
v3 (tx_id=110): {"name": "alice", "age": 32}
Transaction A (started when v2 was the latest version):
Reads: Always sees v2 (consistent snapshot)
Transaction B (started when v3 was the latest version):
Modifies to v4: {"name": "alice", "age": 33}
Both proceed concurrently!
Version Chain
Every row has a version chain:
Key: user:1
v1 → v2 → v3 → v4 → ...
Each version is tagged with:
- Transaction ID (who created it)
- Commit version/timestamp
Visibility to a given transaction is derived by comparing a version's commit version with that transaction's snapshot.
Readers: Follow the chain to find the newest visible version
Writers: Append a new version
Snapshot Isolation
Snapshot Isolation: Each transaction operates on a consistent snapshot
Global state at time T0:
user:1 = alice
user:2 = bob
user:3 = charlie
latest committed txn_id = 110
Transaction starts at T0:
Snapshot: {user:1=alice, user:2=bob, user:3=charlie}
All reads return values from this snapshot
Modifications create new versions
At T1, global state changes (other transactions commit):
user:1 = alice_new
user:2 = bob_new
latest committed txn_id = 115
The transaction still reads its snapshot values:
Repeatable reads guaranteed!
But new transactions see the new values
Lesson 9.2: Implementation - Core Transaction Engine
Transaction Structure
package transaction
import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)
// TransactionID uniquely identifies a transaction
type TransactionID uint64
// VersionNumber tracks versions for MVCC
type VersionNumber uint64
// Transaction represents an active transaction
type Transaction struct {
	ID           TransactionID
	StartTime    time.Time
	StartVersion VersionNumber
	Status       TxnStatus
	ReadSet      map[string]VersionNumber // key → version read
	WriteSet     map[string][]byte        // key → new value
	DeleteSet    map[string]bool          // keys marked for deletion
	mu           sync.RWMutex
}
type TxnStatus int
const (
	TxnPending TxnStatus = iota
	TxnCommitted
	TxnRolledback
	TxnAborted // Due to conflict
)
func (t *Transaction) String() string {
	return fmt.Sprintf("Txn[id=%d, status=%d, startVer=%d]", t.ID, t.Status, t.StartVersion)
}
// VersionedValue stores one version of a key
type VersionedValue struct {
	Version   VersionNumber
	Value     []byte
	CreatedBy TransactionID
	CreatedAt time.Time
	IsDeleted bool
}
// VersionChain manages all versions of a key
type VersionChain struct {
	Key      string
	Versions []*VersionedValue // ordered oldest → newest
	mu       sync.RWMutex
}
// GetVisibleVersion returns the version visible to the transaction's snapshot
func (vc *VersionChain) GetVisibleVersion(txn *Transaction) (*VersionedValue, error) {
	vc.mu.RLock()
	defer vc.mu.RUnlock()
	// Walk backwards through versions (newest first)
	for i := len(vc.Versions) - 1; i >= 0; i-- {
		v := vc.Versions[i]
		// Version must have been committed at or before the transaction's snapshot
		if v.Version <= txn.StartVersion {
			if !v.IsDeleted {
				return v, nil
			}
			// Key was deleted
			return nil, fmt.Errorf("key deleted at version %d", v.Version)
		}
	}
	return nil, fmt.Errorf("key not found")
}
// AppendVersion adds a new version
func (vc *VersionChain) AppendVersion(v *VersionedValue) {
	vc.mu.Lock()
	defer vc.mu.Unlock()
	vc.Versions = append(vc.Versions, v)
}
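// CleanupOldVersions removes versions that no active or future snapshot can see.
// oldestActiveVersion is the smallest StartVersion among active transactions
// (or the latest committed version if none are active). The newest version at
// or below it must be kept, because that snapshot still reads it.
func (vc *VersionChain) CleanupOldVersions(oldestActiveVersion VersionNumber) int {
	vc.mu.Lock()
	defer vc.mu.Unlock()
	// Find the newest version still visible to the oldest snapshot
	cut := 0
	for i, v := range vc.Versions {
		if v.Version <= oldestActiveVersion {
			cut = i
		}
	}
	// Shift the survivors to the front and clear the tail so the GC can reclaim it
	n := copy(vc.Versions, vc.Versions[cut:])
	for i := n; i < len(vc.Versions); i++ {
		vc.Versions[i] = nil
	}
	vc.Versions = vc.Versions[:n]
	return cut
}
Transaction Manager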
// TransactionManager coordinates transactions
type TransactionManager struct {
	nextTxnID         uint64
	nextVersionNumber VersionNumber // latest committed version
	activeTxns        map[TransactionID]*Transaction
	versionChains     map[string]*VersionChain
	mu                sync.RWMutex
	conflictDetector  *ConflictDetector
	deadlockDetector  *DeadlockDetector
}
func NewTransactionManager() *TransactionManager {
	return &TransactionManager{
		activeTxns:       make(map[TransactionID]*Transaction),
		versionChains:    make(map[string]*VersionChain),
		conflictDetector: NewConflictDetector(),
		deadlockDetector: NewDeadlockDetector(),
	}
}
// BeginTxn starts a new transaction and captures its snapshot
func (tm *TransactionManager) BeginTxn(ctx context.Context) (*Transaction, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	tm.mu.Lock()
	defer tm.mu.Unlock()
	txnID := TransactionID(atomic.AddUint64(&tm.nextTxnID, 1))
	txn := &Transaction{
		ID:           txnID,
		StartTime:    time.Now(),
		StartVersion: VersionNumber(atomic.LoadUint64((*uint64)(&tm.nextVersionNumber))),
		Status:       TxnPending,
		ReadSet:      make(map[string]VersionNumber),
		WriteSet:     make(map[string][]byte),
		DeleteSet:    make(map[string]bool),
	}
	tm.activeTxns[txnID] = txn
	return txn, nil
}
// Get retrieves a value as of the transaction's snapshot
func (tm *TransactionManager) Get(txn *Transaction, key string) ([]byte, error) {
	if txn.Status != TxnPending {
		return nil, fmt.Errorf("transaction not pending")
	}
	// Look up the chain before taking txn.mu, so locks are always acquired
	// in the same order as CommitTxn (tm.mu, then txn.mu)
	tm.mu.RLock()
	chain, exists := tm.versionChains[key]
	tm.mu.RUnlock()
	txn.mu.Lock()
	defer txn.mu.Unlock()
	// Check write buffer first (read-your-writes)
	if val, ok := txn.WriteSet[key]; ok {
		return val, nil
	}
	// Check delete buffer
	if txn.DeleteSet[key] {
		return nil, fmt.Errorf("key marked for deletion")
	}
	if !exists {
		return nil, fmt.Errorf("key not found")
	}
	// Get the version visible to this snapshot
	version, err := chain.GetVisibleVersion(txn)
	if err != nil {
		return nil, err
	}
	// Record the read for commit-time validation
	txn.ReadSet[key] = version.Version
	return version.Value, nil
}
// Put buffers a write; nothing is visible to other transactions until commit
func (tm *TransactionManager) Put(txn *Transaction, key string, value []byte) error {
	if txn.Status != TxnPending {
		return fmt.Errorf("transaction not pending")
	}
	txn.mu.Lock()
	defer txn.mu.Unlock()
	// Remove from delete set if present
	delete(txn.DeleteSet, key)
	// Add a copy to the write set, so later changes to the caller's slice don't leak in.
	// Writes stay in this private buffer until commit, so rollback only has to
	// discard the buffer; no undo log is needed.
	txn.WriteSet[key] = append([]byte(nil), value...)
	return nil
}
// Delete marks key for deletion
func (tm *TransactionManager) Delete(txn *Transaction, key string) error {
	if txn.Status != TxnPending {
		return fmt.Errorf("transaction not pending")
	}
	txn.mu.Lock()
	defer txn.mu.Unlock()
	// Remove from write set if present
	delete(txn.WriteSet, key)
	// Add to delete set
	txn.DeleteSet[key] = true
	return nil
}
// CommitTxn validates the transaction and applies its buffered writes
func (tm *TransactionManager) CommitTxn(txn *Transaction) error {
	tm.mu.Lock()
	defer tm.mu.Unlock()
	txn.mu.Lock()
	defer txn.mu.Unlock()
	if txn.Status != TxnPending {
		return fmt.Errorf("transaction not pending: %v", txn.Status)
	}
	// Validate against versions committed after this transaction's snapshot.
	// tm.mu is held, so no other commit can interleave (first committer wins).
	if err := tm.conflictDetector.CheckConflict(txn, tm.latestVersion); err != nil {
		txn.Status = TxnAborted
		delete(tm.activeTxns, txn.ID)
		return fmt.Errorf("conflict detected: %w", err)
	}
	// Commit never waits on another transaction in this optimistic design, so
	// there is no wait-for edge to check here; DeadlockDetector is meant for
	// lock-based paths that call RecordWait.
	if len(txn.WriteSet) > 0 || len(txn.DeleteSet) > 0 {
		// One new version number covers all writes of this transaction
		newVersion := VersionNumber(atomic.AddUint64((*uint64)(&tm.nextVersionNumber), 1))
		now := time.Now()
		// Apply writes
		for key, value := range txn.WriteSet {
			tm.getOrCreateChain(key).AppendVersion(&VersionedValue{
				Version:   newVersion,
				Value:     value,
				CreatedBy: txn.ID,
				CreatedAt: now,
			})
		}
		// Apply deletes as tombstone versions
		for key := range txn.DeleteSet {
			tm.getOrCreateChain(key).AppendVersion(&VersionedValue{
				Version:   newVersion,
				CreatedBy: txn.ID,
				CreatedAt: now,
				IsDeleted: true,
			})
		}
	}
	txn.Status = TxnCommitted
	delete(tm.activeTxns, txn.ID)
	return nil
}
// RollbackTxn discards the transaction's buffered changes
func (tm *TransactionManager) RollbackTxn(txn *Transaction) error {
	tm.mu.Lock()
	defer tm.mu.Unlock()
	txn.mu.Lock()
	defer txn.mu.Unlock()
	if txn.Status != TxnPending {
		return fmt.Errorf("transaction not pending: %v", txn.Status)
	}
	// Clear write/delete/read buffers; nothing was applied to the store yet
	txn.WriteSet = make(map[string][]byte)
	txn.DeleteSet = make(map[string]bool)
	txn.ReadSet = make(map[string]VersionNumber)
	txn.Status = TxnRolledback
	delete(tm.activeTxns, txn.ID)
	return nil
}
// latestVersion returns the newest committed version of key, or 0 if none.
// Caller must hold tm.mu.
func (tm *TransactionManager) latestVersion(key string) VersionNumber {
	chain, ok := tm.versionChains[key]
	if !ok {
		return 0
	}
	chain.mu.RLock()
	defer chain.mu.RUnlock()
	if len(chain.Versions) == 0 {
		return 0
	}
	return chain.Versions[len(chain.Versions)-1].Version
}
// getOrCreateChain returns the chain for key, creating it if needed.
// Caller must hold tm.mu for writing.
func (tm *TransactionManager) getOrCreateChain(key string) *VersionChain {
	if chain, ok := tm.versionChains[key]; ok {
		return chain
	}
	chain := &VersionChain{Key: key, Versions: make([]*VersionedValue, 0)}
	tm.versionChains[key] = chain
	return chain
}
Conflict Detection
// ConflictDetector validates a committing transaction against versions that
// other transactions committed after its snapshot was taken
type ConflictDetector struct{}
func NewConflictDetector() *ConflictDetector {
	return &ConflictDetector{}
}
// CheckConflict reports a conflict when a key this transaction wrote or read
// has a committed version newer than what the transaction saw. latest returns
// the newest committed version of a key (0 if none). The caller must hold the
// lock that serializes commits, and txn.mu.
func (cd *ConflictDetector) CheckConflict(txn *Transaction, latest func(key string) VersionNumber) error {
	// Write-write: another transaction committed a key we also write or delete
	for key := range txn.WriteSet {
		if v := latest(key); v > txn.StartVersion {
			return fmt.Errorf("write-write conflict on key %s (committed at version %d)", key, v)
		}
	}
	for key := range txn.DeleteSet {
		if v := latest(key); v > txn.StartVersion {
			return fmt.Errorf("write-write conflict on key %s (committed at version %d)", key, v)
		}
	}
	// Read-only transactions read one consistent snapshot and need no further checks
	if len(txn.WriteSet) == 0 && len(txn.DeleteSet) == 0 {
		return nil
	}
	// Read-write: a value we read was overwritten after our snapshot, so our
	// writes may be based on stale data. The write-read direction (we write a
	// key another transaction read) is caught when that other transaction
	// commits and runs this same check.
	for key, readVer := range txn.ReadSet {
		if v := latest(key); v > readVer {
			return fmt.Errorf("read-write conflict on key %s (read version %d, now %d)", key, readVer, v)
		}
	}
	return nil
}
Deadlock Detection
// DeadlockDetector detects circular waits between transactions
type DeadlockDetector struct {
	waitGraph map[TransactionID]map[TransactionID]bool // txn A → txns it waits for
	mu        sync.RWMutex
}
func NewDeadlockDetector() *DeadlockDetector {
	return &DeadlockDetector{
		waitGraph: make(map[TransactionID]map[TransactionID]bool),
	}
}
// RecordWait records that txnA waits for txnB; it returns an error if the
// new edge closes a cycle in the wait-for graph
func (dd *DeadlockDetector) RecordWait(txnA, txnB TransactionID) error {
	dd.mu.Lock()
	defer dd.mu.Unlock()
	if dd.waitGraph[txnA] == nil {
		dd.waitGraph[txnA] = make(map[TransactionID]bool)
	}
	dd.waitGraph[txnA][txnB] = true
	// Check for cycle
	if dd.hasCycle(txnA) {
		// Drop the edge: the caller is expected to abort txnA instead of waiting
		delete(dd.waitGraph[txnA], txnB)
		return fmt.Errorf("deadlock detected: cycle involving txn %d", txnA)
	}
	return nil
}
// hasCycle detects a cycle reachable from start using DFS
func (dd *DeadlockDetector) hasCycle(start TransactionID) bool {
	visited := make(map[TransactionID]bool)
	recStack := make(map[TransactionID]bool)
	return dd.dfs(start, visited, recStack)
}
func (dd *DeadlockDetector) dfs(node TransactionID, visited, recStack map[TransactionID]bool) bool {
	visited[node] = true
	recStack[node] = true
	for neighbor := range dd.waitGraph[node] {
		if !visited[neighbor] {
			if dd.dfs(neighbor, visited, recStack) {
				return true
			}
		} else if recStack[neighbor] {
			return true // Back edge = cycle
		}
	}
	recStack[node] = false
	return false
}
Lesson 9.3: Usage Examples and Complete Workflows
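The examples below call parseInt, formatInt, and generateID, which the lesson does not define. One possible version is sketched here, assuming balances and counters are stored as decimal text and order IDs only need to be random and unique in practice; the names match the examples, but the bodies are hypothetical.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"strconv"
)

// parseInt decodes an integer stored as decimal text. A nil or empty value
// (for example, a key that does not exist yet) is treated as 0.
func parseInt(b []byte) int {
	if len(b) == 0 {
		return 0
	}
	n, err := strconv.Atoi(string(b))
	if err != nil {
		// Only formatInt writes these keys, so malformed data indicates a bug.
		panic(fmt.Sprintf("parseInt: malformed value %q: %v", b, err))
	}
	return n
}

// formatInt encodes an integer as decimal text.
func formatInt(n int) []byte {
	return strconv.AppendInt(nil, int64(n), 10)
}

// generateID returns a random 16-character hex identifier.
func generateID() string {
	var b [8]byte
	if _, err := rand.Read(b[:]); err != nil {
		panic(err)
	}
	return hex.EncodeToString(b[:])
}

func main() {
	fmt.Println(parseInt(nil), parseInt(formatInt(42))) // 0 42
	fmt.Println(len(generateID()))                      // 16
}
```

Panicking on malformed values keeps the single-return signatures the examples expect; a production store would more likely return an error from parseInt.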
Example 1: Simple Transaction
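// Transfer money between accounts
func TransferMoney(tm *TransactionManager, fromKey, toKey string, amount int) error {
	if amount <= 0 || fromKey == toKey {
		return fmt.Errorf("invalid transfer")
	}
	// Begin transaction
	txn, err := tm.BeginTxn(context.Background())
	if err != nil {
		return err
	}
	// Roll back on any early return; skipped once the transaction has committed or aborted
	defer func() {
		if txn.Status == TxnPending {
			tm.RollbackTxn(txn)
		}
	}()
	// Read balances from the transaction's snapshot
	fromBal, err := tm.Get(txn, fromKey)
	if err != nil {
		return err
	}
	toBal, err := tm.Get(txn, toKey)
	if err != nil {
		return err
	}
	fromVal := parseInt(fromBal)
	toVal := parseInt(toBal)
	// Validate the invariant (balance >= 0) before writing anything
	if fromVal < amount {
		return fmt.Errorf("insufficient funds")
	}
	// Update balances (buffered until commit)
	tm.Put(txn, fromKey, formatInt(fromVal-amount))
	tm.Put(txn, toKey, formatInt(toVal+amount))
	// Commit; on a conflict CommitTxn aborts the transaction itself
	return tm.CommitTxn(txn)
}
Example 2: Read-Modify-Write with Retry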
func IncrementCounter(tm *TransactionManager, key string, maxRetries int) error {
	for attempt := 0; attempt < maxRetries; attempt++ {
		txn, err := tm.BeginTxn(context.Background())
		if err != nil {
			return err
		}
		// Read current value; a missing key yields nil, which parseInt is assumed to treat as 0
		val, _ := tm.Get(txn, key)
		current := parseInt(val)
		// Increment (buffered)
		tm.Put(txn, key, formatInt(current+1))
		// Try to commit
		err = tm.CommitTxn(txn)
		if err == nil {
			return nil
		}
		// On conflict the transaction was already aborted; retry with a fresh snapshot
		if strings.Contains(err.Error(), "conflict") {
			continue
		}
		return err
	}
	return fmt.Errorf("max retries exceeded")
}
Example 3: Multi-Step Transaction
func ProcessOrder(tm *TransactionManager, orderedKeys []string) error {
	txn, err := tm.BeginTxn(context.Background())
	if err != nil {
		return err
	}
	// Roll back on any early return that leaves the transaction pending
	defer func() {
		if txn.Status == TxnPending {
			tm.RollbackTxn(txn)
		}
	}()
	// Step 1: Reserve inventory
	for _, key := range orderedKeys {
		qty, err := tm.Get(txn, key)
		if err != nil {
			return fmt.Errorf("read %s: %w", key, err)
		}
		current := parseInt(qty)
		if current < 1 {
			return fmt.Errorf("out of stock: %s", key)
		}
		tm.Put(txn, key, formatInt(current-1))
	}
	// Step 2: Create order record
	orderID := generateID()
	tm.Put(txn, "order:"+orderID, []byte("pending"))
	// Step 3: Commit all changes atomically
	return tm.CommitTxn(txn)
}
Lab 9.1: Implement Complete MVCC Transaction Engine
Objective
Build a production-ready transaction system with MVCC, conflict detection, and deadlock prevention.
Requirements
- Transaction Support: Begin/commit/rollback operations, snapshot isolation
- MVCC Implementation: Multiple versions per key, version chains with visibility rules
- Conflict Detection: Write-write, write-read, read-write conflicts
- Deadlock Prevention: Wait-for graph, cycle detection
- Testing: Basic read/write transactions, snapshot isolation verification
- Benchmarks: Transaction throughput, conflict rate impact, memory overhead
Starter Code
package transaction
import (
	"context"
	"sync"
	"time"
	// Add "fmt" and other imports as your implementation needs them
)
// Basic types (same as in Lesson 9.2)
type TransactionID uint64
type VersionNumber uint64
type TxnStatus int
const (
	TxnPending TxnStatus = iota
	TxnCommitted
	TxnRolledback
	TxnAborted
)
// Transaction represents a database transaction
type Transaction struct {
	ID           TransactionID
	StartTime    time.Time
	StartVersion VersionNumber
	Status       TxnStatus
	// TODO: Add read set, write set, delete set
	mu sync.RWMutex
}
// TransactionManager coordinates transactions
type TransactionManager struct {
	// TODO: Add fields for transaction tracking, version management
}
// NewTransactionManager creates a manager (the tests call it)
// TODO: Initialize your fields
func NewTransactionManager() *TransactionManager {
	return &TransactionManager{}
}
// BeginTxn starts a new transaction
// TODO: Implement with snapshot capture
func (tm *TransactionManager) BeginTxn(ctx context.Context) (*Transaction, error) {
	return nil, nil
}
// Get reads a value within the transaction
// TODO: Implement with snapshot isolation
func (tm *TransactionManager) Get(txn *Transaction, key string) ([]byte, error) {
	return nil, nil
}
// Put buffers a write in the transaction
// TODO: Implement write buffering
func (tm *TransactionManager) Put(txn *Transaction, key string, value []byte) error {
	return nil
}
// Delete buffers a delete in the transaction
// TODO: Implement delete buffering
func (tm *TransactionManager) Delete(txn *Transaction, key string) error {
	return nil
}
// CommitTxn validates and applies writes
// TODO: Implement conflict detection and commit
func (tm *TransactionManager) CommitTxn(txn *Transaction) error {
	return nil
}
// RollbackTxn discards the transaction
// TODO: Implement cleanup
func (tm *TransactionManager) RollbackTxn(txn *Transaction) error {
	return nil
}
Test Template
package transaction
import (
	"context"
	"testing"
	"github.com/stretchr/testify/assert"
)
func TestSnapshotIsolation(t *testing.T) {
	tm := NewTransactionManager()
	// Setup: initial value
	txn0, _ := tm.BeginTxn(context.Background())
	tm.Put(txn0, "key", []byte("initial"))
	assert.NoError(t, tm.CommitTxn(txn0))
	// Transaction A: Read value
	txnA, _ := tm.BeginTxn(context.Background())
	v1, _ := tm.Get(txnA, "key")
	assert.Equal(t, "initial", string(v1))
	// Transaction B: Modify and commit
	txnB, _ := tm.BeginTxn(context.Background())
	tm.Put(txnB, "key", []byte("modified"))
	assert.NoError(t, tm.CommitTxn(txnB))
	// Transaction A: Still sees initial (snapshot isolation!)
	v2, _ := tm.Get(txnA, "key")
	assert.Equal(t, "initial", string(v2), "snapshot isolation violated")
	// Commit A (read-only, so no conflict)
	assert.NoError(t, tm.CommitTxn(txnA))
}
func TestWriteWriteConflict(t *testing.T) {
	tm := NewTransactionManager()
	// Setup: initial value
	txn0, _ := tm.BeginTxn(context.Background())
	tm.Put(txn0, "key", []byte("initial"))
	assert.NoError(t, tm.CommitTxn(txn0))
	txnA, _ := tm.BeginTxn(context.Background())
	txnB, _ := tm.BeginTxn(context.Background())
	// Both modify the same key
	tm.Put(txnA, "key", []byte("valueA"))
	tm.Put(txnB, "key", []byte("valueB"))
	// First commit succeeds
	assert.NoError(t, tm.CommitTxn(txnA))
	// Second fails: txnA committed a newer version after txnB's snapshot
	err := tm.CommitTxn(txnB)
	assert.Error(t, err, "expected write-write conflict")
}
Acceptance Criteria
- ✅ Snapshot isolation verified
- ✅ Conflict detection preventing anomalies
- ✅ MVCC with version chains working
- ✅ Concurrent transactions coexist
- ✅ Write-write conflicts detected
- ✅ Read-write conflicts detected
- ✅ Deadlock prevention working
- ✅ Garbage collection cleaning old versions
- ✅ > 80% code coverage
- ✅ 10K+ transactions/sec throughput
- ✅ < 5% abort rate for typical workloads
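The criteria above include garbage collection of old versions, but the engine in Lesson 9.2 never decides when a version is safe to drop. A minimal sketch, assuming versions are kept oldest first as in VersionChain: compute a horizon from the StartVersion values of active transactions, then keep only the newest version at or below it plus everything newer. gcHorizon and prune are hypothetical helpers; in the engine, the start versions would be gathered from activeTxns while holding tm.mu.

```go
package main

import "fmt"

// VersionNumber mirrors the lesson's commit version type.
type VersionNumber uint64

// version is a simplified committed version of one key.
type version struct {
	Version VersionNumber
	Value   string
}

// gcHorizon returns the oldest snapshot any transaction can still read:
// the smallest StartVersion among active transactions, or the latest
// committed version when none are active (new transactions start there).
func gcHorizon(activeStarts []VersionNumber, latest VersionNumber) VersionNumber {
	horizon := latest
	for _, s := range activeStarts {
		if s < horizon {
			horizon = s
		}
	}
	return horizon
}

// prune keeps the newest version at or below horizon (the oldest snapshot
// still reads it) and everything newer. chain must be ordered oldest first.
// The survivors are copied so the dropped versions can be garbage-collected.
func prune(chain []version, horizon VersionNumber) []version {
	cut := 0
	for i, v := range chain {
		if v.Version <= horizon {
			cut = i
		}
	}
	return append([]version(nil), chain[cut:]...)
}

func main() {
	chain := []version{{1, "a"}, {3, "b"}, {7, "c"}, {9, "d"}}
	h := gcHorizon([]VersionNumber{8, 12}, 12)
	fmt.Println(h, prune(chain, h))               // 8 [{7 c} {9 d}]
	fmt.Println(prune(chain, gcHorizon(nil, 12))) // [{9 d}]
}
```

Dropping versions strictly below the horizon (rather than keeping the one the oldest snapshot reads) would silently make that value disappear for a still-running transaction, which is why prune keeps one version at or below the horizon.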
Summary: Week 9 Complete
By completing Week 9, you've learned and implemented:
1. ACID Properties
- Atomicity: All or nothing
- Consistency: Valid state
- Isolation: Concurrent transactions don't interfere
- Durability: Crash survival
2. Isolation Levels
- Read Uncommitted → Serializable
- Performance vs consistency trade-offs
- Dirty reads, non-repeatable reads
- Phantom reads prevention
3. MVCC Architecture
- Multiple versions for concurrent access
- Version chains with visibility
- Snapshot isolation implementation
- Garbage collection of old versions
4. Conflict Detection
- Write-write conflicts
- Write-read conflicts
- Read-write conflicts
- Automatic abort on conflict
Metrics Achieved:
- ✅ Snapshot isolation guarantees
- ✅ Concurrent transactions coexist
- ✅ Automatic conflict detection
- ✅ Deadlock prevention
- ✅ 10K+ transactions/sec throughput
- ✅ <5% abort rate for typical workloads
Ready for Week 10?
Next week we'll implement replication and high availability with leader-follower architecture, automatic failover, and consistency models.