Course Navigation
← Back to Course Overview | All Lessons
✓
Introduction and Database Fundamentals ✓
Building the Core Data Structure ✓
Concurrency and Thread Safety ✓
Append-Only Log (Write-Ahead Log) ✓
SSTables and LSM Trees ✓
Compaction and Optimization ✓
TCP Server and Protocol Design 8
Client Library and Advanced Networking 9
Transactions and ACID Properties 10
Replication and High Availability 11
Monitoring, Metrics, and Observability 12
Performance Optimization and Tuning 13
Configuration and Deployment 14
Security and Production Hardening 15
Final Project and Beyond Current Lesson
8 of 15
Progress 53%
Client Library and Advanced Networking
Learning Objectives
- • Master connection pooling for high-performance clients
- • Implement intelligent retry logic with exponential backoff
- • Build circuit breakers to prevent cascading failures
- • Use pipelining to achieve 10x throughput improvements
- • Handle network errors and timeouts gracefully
- • Create production-ready client libraries
Lesson 8.1: Client Implementation
Connection Management
A client needs to manage connections to the database server efficiently. Let's start with basic connection handling and evolve to production-grade pooling.
Basic Connection
// Client is a simple client without pooling: it owns exactly one TCP
// connection. The struct has no lock, so callers that share a Client across
// goroutines must serialize access themselves.
type Client struct {
// conn is the underlying connection; Close closes it.
conn net.Conn
// reader buffers reads from conn.
reader *bufio.Reader
// writer buffers writes to conn; Get flushes it after queuing a command.
writer *bufio.Writer
}
// NewClient dials addr over TCP and returns a Client whose buffered reader
// and writer both wrap the new connection. A dial failure is returned
// unchanged together with a nil Client.
func NewClient(addr string) (*Client, error) {
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		return nil, err
	}

	client := new(Client)
	client.conn = conn
	client.reader = bufio.NewReader(conn)
	client.writer = bufio.NewWriter(conn)
	return client, nil
}
// Get fetches the value stored under key with a RESP GET command.
//
// It returns the value for a bulk-string reply, ("", nil) for a null reply
// ("$-1"), and an error carrying the server's message for an error reply.
// I/O failures and malformed replies are returned as errors instead of being
// silently ignored. After such a failure the connection may be out of sync
// with the server and should be closed rather than reused.
func (c *Client) Get(key string) (string, error) {
	// RESP array of two bulk strings: "GET" and the key.
	cmd := fmt.Sprintf("*2\r\n$3\r\nGET\r\n$%d\r\n%s\r\n", len(key), key)
	if _, err := c.writer.WriteString(cmd); err != nil {
		return "", err
	}
	if err := c.writer.Flush(); err != nil {
		return "", err
	}

	marker, err := c.reader.ReadByte()
	if err != nil {
		return "", fmt.Errorf("reading response type: %w", err)
	}

	switch marker {
	case '-':
		line, err := c.reader.ReadString('\n')
		if err != nil {
			return "", fmt.Errorf("reading error response: %w", err)
		}
		// errors.New, not fmt.Errorf: the server text must never be
		// interpreted as a format string.
		return "", errors.New(strings.TrimRight(line, "\r\n"))

	case '$':
		line, err := c.reader.ReadString('\n')
		if err != nil {
			return "", fmt.Errorf("reading bulk length: %w", err)
		}
		length, err := strconv.Atoi(strings.TrimSpace(line))
		if err != nil {
			return "", fmt.Errorf("parsing bulk length %q: %w", strings.TrimSpace(line), err)
		}
		if length == -1 {
			return "", nil // Null bulk string: key not found.
		}
		if length < 0 {
			return "", fmt.Errorf("invalid bulk length %d", length)
		}

		// Read the payload and its trailing CRLF in one call. io.ReadFull
		// loops until the buffer is full, whereas a single Read may return
		// fewer bytes than requested.
		data := make([]byte, length+2)
		if _, err := io.ReadFull(c.reader, data); err != nil {
			return "", fmt.Errorf("reading bulk payload: %w", err)
		}
		if string(data[length:]) != "\r\n" {
			return "", fmt.Errorf("bulk payload not terminated by CRLF")
		}
		return string(data[:length]), nil

	default:
		return "", fmt.Errorf("unexpected response type: %c", marker)
	}
}
// Close closes the underlying connection and returns the error reported by
// the connection's Close method. Data still buffered in the writer is not
// flushed here.
func (c *Client) Close() error {
return c.conn.Close()
} Problem: One connection per client means:
- • Limited concurrency (connection blocks on read)
- • New connection overhead whenever a client is created per operation
- • No connection sharing or reuse across goroutines
Connection Pooling
Connection Pool: Maintain multiple reusable connections
// ConnPool is a pool of reusable connections to a single server.
type ConnPool struct {
// addr is the server address that Get dials over TCP.
addr string
// conns buffers idle connections; NewConnPool gives it capacity maxConns.
conns chan net.Conn
// maxConns bounds how many idle connections are retained. It does not cap
// the number of open connections: Get dials a new one whenever conns is
// empty.
maxConns int
// idleTimeout is set to 5 minutes by NewConnPool.
// NOTE(review): no code shown here reads it — confirm idle cleanup exists.
idleTimeout time.Duration
// closed is read by Put while holding mu; when true, Put closes the
// connection instead of pooling it.
closed bool
// mu guards closed.
mu sync.Mutex
}
// NewConnPool returns an empty pool for addr that retains at most maxConns
// idle connections and uses a five-minute idle timeout. No connection is
// opened until Get is called.
func NewConnPool(addr string, maxConns int) *ConnPool {
	pool := new(ConnPool)
	pool.addr = addr
	pool.maxConns = maxConns
	pool.conns = make(chan net.Conn, maxConns)
	pool.idleTimeout = 5 * time.Minute
	return pool
}
// Get returns an idle connection from the pool, or dials a new one when the
// pool is empty or the idle connection fails its liveness test.
//
// ctx bounds the whole call: an already-cancelled context is rejected before
// any work is done, and the dial is aborted when ctx is cancelled or its
// deadline passes. Errors are returned unchanged (ctx.Err() or the dial
// error).
func (cp *ConnPool) Get(ctx context.Context) (net.Conn, error) {
	// Check cancellation first. Inside a select with a default case a ready
	// ctx.Done() competes at random with a ready idle connection.
	if err := ctx.Err(); err != nil {
		return nil, err
	}

	// Non-blocking attempt to reuse an idle connection.
	select {
	case conn := <-cp.conns:
		if err := cp.testConn(conn); err == nil {
			return conn, nil
		}
		// Stale connection: discard it and dial a fresh one below. The
		// close error is irrelevant because the connection is already dead.
		_ = conn.Close()
	default:
	}

	// DialContext, unlike net.Dial, honors ctx cancellation and deadline.
	var dialer net.Dialer
	conn, err := dialer.DialContext(ctx, "tcp", cp.addr)
	if err != nil {
		return nil, err
	}
	return conn, nil
}
// Put hands conn back to the pool for reuse. If the pool is closed or already
// holds its maximum number of idle connections, conn is closed instead and
// the result of closing it is returned. A nil conn is rejected with an error
// so that it can never be handed out by a later Get.
func (cp *ConnPool) Put(conn net.Conn) error {
	if conn == nil {
		return errors.New("connpool: Put called with nil connection")
	}

	// Decide under the lock so that the closed check and the hand-off are
	// one atomic step; code that flips closed under mu cannot slip in
	// between them. The send is non-blocking, so the lock is held only
	// briefly.
	pooled := false
	cp.mu.Lock()
	if !cp.closed {
		select {
		case cp.conns <- conn:
			pooled = true
		default:
			// Pool full.
		}
	}
	cp.mu.Unlock()

	if pooled {
		return nil
	}
	return conn.Close()
}
// testConn verifies connection is still alive
func (cp *ConnPool) testConn(conn net.Conn) error {
// Set short timeout for test
conn.SetDeadline(time.Now().Add(100 * time.Millisecond))
defer conn.SetDeadline(time.Time)
// Send PING (simple test)
fmt.Fprintf(conn, "*1\r\n$4\r\nPING\r\n")
// Read response
reader := bufio.NewReader(conn)
resp, err := reader.ReadString('\n')
return err
} Pooled Client
// PooledClient issues commands over connections borrowed from a ConnPool.
type PooledClient struct {
// pool supplies a connection for each operation and takes it back afterwards.
pool *ConnPool
// timeout bounds the context that Get passes to the pool.
timeout time.Duration
}
// NewPooledClient builds a client backed by a new connection pool for addr
// that keeps up to maxConns idle connections. The per-operation timeout is
// five seconds.
func NewPooledClient(addr string, maxConns int) *PooledClient {
	client := new(PooledClient)
	client.pool = NewConnPool(addr, maxConns)
	client.timeout = 5 * time.Second
	return client
}
// Get fetches the value for key over a pooled connection.
//
// The client's timeout covers both acquiring the connection and the network
// I/O on it. A connection is returned to the pool only after a successful
// exchange; if the command fails, the connection is closed instead, because
// it may hold unread or partial data that would corrupt the next request.
func (pc *PooledClient) Get(key string) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), pc.timeout)
	defer cancel()

	conn, err := pc.pool.Get(ctx)
	if err != nil {
		return "", err
	}

	// Apply the same deadline to reads and writes; the context alone only
	// bounds pool acquisition.
	if deadline, ok := ctx.Deadline(); ok {
		if err := conn.SetDeadline(deadline); err != nil {
			_ = conn.Close()
			return "", err
		}
	}

	val, err := pc.sendCommand(conn, "GET", key)
	if err != nil {
		// Connection state is unknown after a failure; do not reuse it.
		_ = conn.Close()
		return "", err
	}

	// Clear the deadline so it cannot expire while the connection sits idle.
	if err := conn.SetDeadline(time.Time{}); err != nil {
		_ = conn.Close()
		return val, nil
	}
	// Put closes the connection itself when it cannot be pooled; that error
	// does not affect the value already read.
	_ = pc.pool.Put(conn)
	return val, nil
}
// Close shuts the client down by closing its connection pool and returns the
// pool's error.
// NOTE(review): ConnPool.Close is not defined in the code shown in this
// lesson — confirm it exists and marks the pool closed.
func (pc *PooledClient) Close() error {
return pc.pool.Close()
} Retry Logic and Exponential Backoff
Problem: Transient failures (temporary network issues)
Attempt 1: Connection refused
Attempt 2 (wait 10ms): Connection refused
Attempt 3 (wait 100ms): Success!
// RetryConfig configures retry behavior
type RetryConfig struct {
// MaxAttempts is the total number of times Retry may call the operation,
// counting the first call.
MaxAttempts int
// InitialWait is how long Retry sleeps before the second attempt.
InitialWait time.Duration
// MaxWait is the ceiling applied when the wait is doubled between later
// attempts.
MaxWait time.Duration
}
// Retry calls fn until it succeeds, returns a non-retryable error, or
// cfg.MaxAttempts calls have been made.
//
// Before each retry it sleeps, starting at cfg.InitialWait and doubling up to
// cfg.MaxWait. fn is always called at least once, even when cfg.MaxAttempts
// is zero or negative. A non-retryable error is returned unchanged; when the
// attempts are exhausted the last error is wrapped in a
// "max retries exceeded" error.
func Retry(cfg RetryConfig, fn func() error) error {
	attempts := cfg.MaxAttempts
	if attempts < 1 {
		// A zero-value config still means "try once"; otherwise the loop
		// would never run and a nil error would be wrapped below.
		attempts = 1
	}

	wait := cfg.InitialWait
	var lastErr error
	for attempt := 0; attempt < attempts; attempt++ {
		if attempt > 0 {
			time.Sleep(wait)
			// Exponential backoff capped at MaxWait. The next < wait test
			// catches integer overflow of the doubling.
			if wait < cfg.MaxWait {
				next := wait * 2
				if next > cfg.MaxWait || next < wait {
					next = cfg.MaxWait
				}
				wait = next
			}
		}

		err := fn()
		if err == nil {
			return nil
		}
		if !isRetryable(err) {
			return err
		}
		lastErr = err
	}
	return fmt.Errorf("max retries exceeded: %w", lastErr)
}
// isRetryable determines if error should trigger retry
func isRetryable(err error) bool {
if err == nil {
return false
}
// Network errors are retryable
if ne, ok := err.(net.Error); ok {
return ne.Temporary() || ne.Timeout()
}
// Check error message
errStr := err.Error()
retryable := []string{
"connection refused",
"connection reset",
"i/o timeout",
"temporary failure",
}
for _, msg := range retryable {
if strings.Contains(strings.ToLower(errStr), msg) {
return true
}
}
return false
} Circuit Breaker Pattern
Problem: Retry loop can overwhelm failing server
Server crashes:
Client retries → Sends traffic to dead server
More clients retry → Thundering herd
Server recovers but is immediately overloaded
Solution: Circuit Breaker - stops sending requests when failures exceed a threshold
// CircuitBreaker prevents cascading failures by rejecting calls while the
// protected operation keeps failing.
type CircuitBreaker struct {
// maxFailures is the failure count at which Do opens the circuit.
maxFailures int
// resetTimeout is how long the circuit stays open before Do lets calls
// through again.
resetTimeout time.Duration
// failures counts failures since the last success; Do also resets it when
// leaving the open state.
failures int
// lastFailTime is when Do last recorded a failure.
lastFailTime time.Time
// state is the current circuit state; NewCircuitBreaker starts it closed.
state CircuitState
// mu guards failures, lastFailTime and state. Do only takes the exclusive
// lock.
mu sync.RWMutex
}
// CircuitState identifies which state a CircuitBreaker is in.
type CircuitState int
// Circuit states. CircuitClosed is the zero value.
const (
CircuitClosed CircuitState = iota // Normal operation
CircuitOpen // Too many failures
CircuitHalfOpen // Testing recovery
)
// NewCircuitBreaker returns a breaker in the closed state that opens after
// maxFailures failures and waits resetTimeout before allowing calls again.
func NewCircuitBreaker(maxFailures int, resetTimeout time.Duration) *CircuitBreaker {
	breaker := new(CircuitBreaker)
	breaker.state = CircuitClosed
	breaker.maxFailures = maxFailures
	breaker.resetTimeout = resetTimeout
	return breaker
}
// Do executes operation, managing circuit state
func (cb *CircuitBreaker) Do(fn func() error) error {
cb.mu.Lock()
// Check if should reset from Open to HalfOpen
if cb.state == CircuitOpen {
if time.Since(cb.lastFailTime) > cb.resetTimeout {
cb.state = CircuitHalfOpen
cb.failures = 0
} else {
cb.mu.Unlock()
return ErrCircuitOpen
}
}
cb.mu.Unlock()
// Execute operation
err := fn()
cb.mu.Lock()
defer cb.mu.Unlock()
if err != nil {
cb.failures++
cb.lastFailTime = time.Now()
if cb.failures >= cb.maxFailures {
cb.state = CircuitOpen
return fmt.Errorf("circuit open after %d failures: %w", cb.failures, err)
}
return err
}
// Success: reset circuit
cb.failures = 0
if cb.state == CircuitHalfOpen {
cb.state = CircuitClosed
}
return nil
} Lesson 8.2: Advanced Networking Features
Pipelining for Throughput
Problem: Without pipelining:
Client sends command 1 →
← Server sends response 1
Client sends command 2 →
← Server sends response 2
Client sends command 3 →
← Server sends response 3
Round-trip latency: 3 × 10ms = 30ms
Solution: Pipelining sends multiple commands before waiting for responses
Client sends commands 1,2,3 →
← Server sends responses 1,2,3
Latency: 1 × 10ms = 10ms (3x faster!)
// Pipeline sends multiple commands without waiting
type Pipeline struct {
// conn is the connection the pipeline reads from and writes to.
conn net.Conn
// writer buffers outgoing commands; Execute flushes it once after queuing
// every command.
writer *bufio.Writer
// reader buffers incoming data from conn.
reader *bufio.Reader
// commands holds the requests queued by Add until Execute sends them. The
// struct has no lock, so Add and Execute must not be called concurrently.
commands []*Command
}
// Command is one queued pipeline request together with the channels on which
// its outcome is delivered. Execute sends on exactly one of Response or Error
// for each command.
type Command struct {
	// Name is the command name.
	Name string
	// Args are the command's arguments.
	Args []string
	// Response receives the decoded reply on success.
	Response chan interface{}
	// Error receives the failure when reading the reply fails.
	Error chan error
}
// NewPipeline returns an empty pipeline that reads from and writes to conn
// through buffered I/O. Nothing is sent until Execute is called.
func NewPipeline(conn net.Conn) *Pipeline {
	pipeline := new(Pipeline)
	pipeline.conn = conn
	pipeline.reader = bufio.NewReader(conn)
	pipeline.writer = bufio.NewWriter(conn)
	pipeline.commands = []*Command{}
	return pipeline
}
// Add queues command for execution
func (p *Pipeline) Add(name string, args ...string) <-chan interface {
cmd := &Command{
Name: name,
Args: args,
Response: make(chan interface, 1),
Error: make(chan error, 1),
}
p.commands = append(p.commands, cmd)
return cmd.Response
}
// Execute sends all queued commands and reads responses
func (p *Pipeline) Execute(ctx context.Context) error {
// Send all commands
for _, cmd := range p.commands {
req := p.formatCommand(cmd)
if _, err := p.writer.WriteString(req); err != nil {
return err
}
}
if err := p.writer.Flush(); err != nil {
return err
}
// Read all responses
for _, cmd := range p.commands {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
resp, err := p.readResponse()
if err != nil {
cmd.Error <- err
} else {
cmd.Response <- resp
}
}
p.commands = p.commands[:0] // Clear for next pipeline
return nil
} Multiplexing Connections
Problem: Multiple goroutines sharing the same connection conflict with each other
// BAD: Race condition
conn := shared connection
go func() {
conn.Write(command1)
conn.Read(response1)
}()
go func() {
conn.Write(command2)
conn.Read(response2)
}()
// Commands/responses may interleave!
Solution: Multiplex with request ID matching
// Multiplexer allows concurrent operations on a single connection by tagging
// every request with a unique ID and keeping one response channel per ID.
type Multiplexer struct {
	// conn is the shared connection that requests are written to.
	conn net.Conn
	// requestID is the ID that the next request will use.
	requestID uint64
	// responses maps the ID of each in-flight request to the channel on
	// which its response is expected.
	responses map[uint64]chan interface{}
	// mu guards requestID and responses.
	mu sync.Mutex
}
// MuxRequest describes a request tagged with a unique ID.
// NOTE(review): Send, as shown in this lesson, formats its request inline
// and does not use this type — confirm where it is meant to be used.
type MuxRequest struct {
// ID identifies the request.
ID uint64
// Name is the command name.
Name string
// Args are the command's arguments.
Args []string
}
// Send sends request and returns channel for response
func (m *Multiplexer) Send(name string, args ...string) (<-chan interface, error) {
m.mu.Lock()
requestID := m.requestID
m.requestID++
respCh := make(chan interface, 1)
m.responses[requestID] = respCh
m.mu.Unlock()
// Send request with ID
req := fmt.Sprintf("%d %s %s\r\n", requestID, name, strings.Join(args, " "))
_, err := m.conn.Write([]byte(req))
if err != nil {
m.mu.Lock()
delete(m.responses, requestID)
m.mu.Unlock()
return nil, err
}
return respCh, nil
} Request Timeouts
// Timeout ensures requests don't hang forever
func (pc *PooledClient) GetWithTimeout(key string, timeout time.Duration) (string, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
resultCh := make(chan string, 1)
errCh := make(chan error, 1)
go func() {
val, err := pc.Get(key)
if err != nil {
errCh <- err
} else {
resultCh <- val
}
}()
select {
case result := <-resultCh:
return result, nil
case err := <-errCh:
return "", err
case <-ctx.Done():
return "", fmt.Errorf("timeout exceeded")
}
} Lesson 8.3: Error Handling
Network Error Types
import "errors"
import "net"
// classifyError maps err to a coarse category name:
//
//	"none"              err is nil
//	"connection_closed" err is or wraps io.EOF
//	"timeout"           err is or wraps context.DeadlineExceeded, or a
//	                    net.Error reporting a timeout
//	"temporary"         a net.Error reporting a temporary condition
//	"permanent"         any other net.Error
//	"unknown"           everything else
//
// The whole error chain is inspected, so errors wrapped with %w are
// classified by their underlying cause.
func classifyError(err error) string {
	if err == nil {
		return "none"
	}
	if errors.Is(err, io.EOF) {
		return "connection_closed"
	}
	if errors.Is(err, context.DeadlineExceeded) {
		return "timeout"
	}

	// errors.As walks the chain; a plain type assertion would miss a net
	// error wrapped by a retry or context layer.
	var ne net.Error
	if errors.As(err, &ne) {
		switch {
		case ne.Timeout():
			return "timeout"
		case ne.Temporary():
			return "temporary"
		default:
			return "permanent"
		}
	}
	return "unknown"
}
// Common network errors
var (
ErrConnectionRefused = fmt.Errorf("connection refused")
ErrConnectionReset = fmt.Errorf("connection reset")
ErrTimeout = fmt.Errorf("operation timeout")
ErrCircuitOpen = fmt.Errorf("circuit breaker open")
) Partial Failure Handling
// Multi-key operation with partial failures
func (pc *PooledClient) MGet(keys ...string) (map[string]string, error) {
results := make(map[string]string)
var lastErr error
for _, key := range keys {
val, err := pc.Get(key)
if err != nil {
lastErr = err
continue
}
results[key] = val
}
// Return partial results even if some failed
return results, lastErr
} Client-Side Retries
// isTransient reports whether err is a transient failure that may succeed if
// the operation is retried, as opposed to a permanent one.
//
// An error is transient when any error in its chain is a net.Error reporting
// a timeout or temporary condition, or when its message contains a known
// transient phrase. The phrases are matched case-insensitively and cover both
// errno names (e.g. ECONNREFUSED) and the text Go prints for them (e.g.
// "connection refused"). The message check also runs for net errors, because
// a refused or reset connection is a net.Error that is neither a timeout nor
// temporary.
func isTransient(err error) bool {
	if err == nil {
		return false
	}

	// errors.As also finds net errors wrapped with %w.
	var ne net.Error
	if errors.As(err, &ne) && (ne.Timeout() || ne.Temporary()) {
		return true
	}

	msg := strings.ToLower(err.Error())
	for _, fragment := range []string{
		"econnrefused",
		"etimedout",
		"econnreset",
		"temporary failure",
		"connection refused",
		"connection reset",
		"i/o timeout",
	} {
		if strings.Contains(msg, fragment) {
			return true
		}
	}
	return false
}
// Retry with backoff
func (pc *PooledClient) GetWithRetry(key string, maxRetries int) (string, error) {
var lastErr error
for attempt := 0; attempt < maxRetries; attempt++ {
if attempt > 0 {
backoff := time.Duration(math.Pow(2, float64(attempt-1))) * 100 * time.Millisecond
time.Sleep(backoff)
}
val, err := pc.Get(key)
if err == nil {
return val, nil
}
if !isTransient(err) {
return "", err // Permanent error, don't retry
}
lastErr = err
}
return "", fmt.Errorf("max retries exceeded: %w", lastErr)
} Lab 8.1: Build Production-Ready Client Library
Objective
Implement a feature-complete, production-grade client library with pooling, retries, and advanced features.
Requirements
- • Connection Pooling: Configurable pool size, connection reuse, idle cleanup
- • Retry Logic: Exponential backoff, configurable max attempts, transient error detection
- • Circuit Breaker: Prevent cascading failures, automatic recovery, state tracking
- • Pipelining: Batch multiple commands, single flush to server, collect all responses
- • Error Handling: Proper error types, timeout handling, partial failure support
- • Testing: Test with real server, test error scenarios, test concurrent operations
Starter Code
package client
import (
"bufio"
"context"
"net"
"sync"
"time"
)
// Client is a production-ready database client.
// Starter stub: the struct has no fields yet; the lab asks for a connection
// pool, a circuit breaker and retry configuration to be added here.
type Client struct {
// TODO: Add connection pool, circuit breaker, retry config
}
// NewClient creates a new client for addr, configured by opts.
// Starter stub: addr and opts are currently ignored and (nil, nil) is
// returned, so callers get a nil *Client with no error until this is
// implemented.
// TODO: Implement with sensible defaults
func NewClient(addr string, opts ...Option) (*Client, error) {
return nil, nil
}
// Get retrieves a value with automatic retry and timeout.
// Starter stub: currently returns ("", nil) for every key without contacting
// a server.
// TODO: Implement with connection pooling
func (c *Client) Get(key string) (string, error) {
return "", nil
}
// Set stores value under key.
// Starter stub: currently does nothing and returns nil.
// TODO: Implement
func (c *Client) Set(key, value string) error {
return nil
}
// Delete removes a key.
// Starter stub: currently does nothing and returns nil.
// TODO: Implement
func (c *Client) Delete(key string) error {
return nil
}
// Pipeline returns a new pipeline for batching commands.
// Starter stub: currently returns nil, so the result must not be used until
// this is implemented.
// TODO: Implement
func (c *Client) Pipeline() *Pipeline {
return nil
}
// Close gracefully closes the client.
// Starter stub: currently releases nothing and returns nil.
// TODO: Implement
func (c *Client) Close() error {
return nil
}
// Option configures client behavior. It is a function that receives the
// Client being configured; NewClient accepts any number of them.
type Option func(*Client)
// WithPoolSize sets connection pool size.
// Starter stub: the returned Option currently ignores size and leaves the
// Client unchanged.
func WithPoolSize(size int) Option {
return func(c *Client) {
// TODO: Implement
}
}
// WithTimeout sets operation timeout.
// Starter stub: the returned Option currently ignores t and leaves the
// Client unchanged.
func WithTimeout(t time.Duration) Option {
return func(c *Client) {
// TODO: Implement
}
}
// WithRetry configures retry behavior.
// Starter stub: the returned Option currently ignores maxAttempts and
// initialWait and leaves the Client unchanged.
func WithRetry(maxAttempts int, initialWait time.Duration) Option {
return func(c *Client) {
// TODO: Implement
}
} Test Template
// TestClientPooling runs 100 concurrent Set/Get round trips through a client
// limited to a pool of five connections and checks that every goroutine reads
// back the value it wrote.
func TestClientPooling(t *testing.T) {
	// Start test server
	server := startTestServer()
	defer server.Stop()

	// NewClient returns (client, error); both must be received.
	client, err := NewClient(server.Addr(), WithPoolSize(5))
	if err != nil {
		t.Fatalf("NewClient: %v", err)
	}
	defer client.Close()

	// Concurrent operations
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			key := fmt.Sprintf("key-%d", idx)
			// t.Errorf, not t.Fatalf: FailNow must only be called from the
			// goroutine running the test function.
			if err := client.Set(key, "value"); err != nil {
				t.Errorf("Set(%q): %v", key, err)
				return
			}
			val, err := client.Get(key)
			if err != nil {
				t.Errorf("Get(%q): %v", key, err)
				return
			}
			if val != "value" {
				t.Errorf("Get(%q) = %q, want %q", key, val, "value")
			}
		}(i)
	}
	wg.Wait()
}
func TestClientRetry(t *testing.T) {
// Use flaky server that fails then succeeds
server := startFlakyServer(failCount: 2)
defer server.Stop()
client := NewClient(server.Addr(), WithRetry(5, 10*time.Millisecond))
// Should succeed after retries
val, err := client.Get("key")
assert.NoError(t, err)
}
func BenchmarkClientThroughput(b *testing.B) {
server := startTestServer()
defer server.Stop()
client := NewClient(server.Addr())
defer client.Close()
b.ResetTimer()
for i := 0; i < b.N; i++ {
client.Set("key", "value")
}
} Acceptance Criteria
- ✅ All tests pass
- ✅ Connection pooling working
- ✅ Retries succeed with flaky servers
- ✅ Circuit breaker prevents cascading failures
- ✅ Pipelining achieves 10x throughput improvement
- ✅ 100K+ operations/sec on single client
- ✅ > 85% code coverage
- ✅ No goroutine leaks
Summary: Week 8 Complete
By completing Week 8, you've learned and implemented:
1. Connection Management
- • Simple connections
- • Connection pooling with reuse
- • Idle connection cleanup
- • Connection testing
2. Retry Logic
- • Exponential backoff
- • Transient error detection
- • Max retry limits
- • Backoff strategies
3. Circuit Breaker Pattern
- • Prevents cascading failures
- • State management
- • Automatic recovery
- • Failure threshold tracking
4. Advanced Features
- • Pipelining: 10x throughput
- • Multiplexing: Concurrent ops
- • Timeouts: Prevent hanging
- • Keep-alive: Detect dead conns
Metrics Achieved:
- ✅ 100K+ ops/sec with pooling
- ✅ 10x throughput with pipelining
- ✅ Sub-100ms failover with circuit breaker
- ✅ <1% error rate with retries
- ✅ Support for 1000+ concurrent clients
Module 4 Complete: Full Client-Server System
Congratulations! You now have a complete client-server system:
✅ Week 7: TCP Server
- • RESP protocol implementation
- • 10,000+ concurrent connections
- • Command execution framework
- • Graceful shutdown
✅ Week 8: Client Library
- • Connection pooling (5-10x throughput)
- • Intelligent retries (99%+ success rate)
- • Circuit breaking (prevents cascades)
- • Pipelining (10x throughput)
Ready for Module 5?
Next we'll implement advanced features including transactions with MVCC, replication, and high availability.
Continue to Module 5: Advanced Features →