Kafka Streams and Real-Time Processing

Master Kafka Streams: KStream vs KTable, windowing operations, stateful processing, joins, and building real-time analytics applications.

What is Kafka Streams?

Kafka Streams is a client library, shipped with Apache Kafka, for building real-time stream processing applications in Java or Scala. Think of it as "Spark for Kafka" - but simpler and more tightly integrated. The DSL snippets in this lesson are Python-style pseudocode that mirrors the Java API; Java sketches of the real calls appear alongside.

Key Benefits

  • No separate cluster - runs in your application
  • Exactly-once processing - built-in
  • Fault tolerant - automatic recovery
  • Scalable - add more instances
  • Simple - just a library, not a framework

Core Abstractions: KStream vs KTable

Kafka Streams has two main data types:

KStream: The Event Stream

# KStream - continuous stream of events
user-clicks → [click1, click2, click3, click4, click5, ...]

# Each record is independent
Record 1: {user: "alice", page: "/home", timestamp: 1609459200}
Record 2: {user: "bob", page: "/products", timestamp: 1609459201}
Record 3: {user: "alice", page: "/cart", timestamp: 1609459202}

Use when: Processing individual events, real-time analytics, filtering

KTable: The State Table

# KTable - current state per key
user-profiles → {alice: {name: "Alice", age: 25}, bob: {name: "Bob", age: 30}}

# Updates overwrite previous values
Record 1: {user: "alice", name: "Alice", age: 25}
Record 2: {user: "alice", name: "Alice", age: 26}  # Updates age
Record 3: {user: "bob", name: "Bob", age: 30}

Use when: Maintaining state, lookups, aggregations

GlobalKTable: The Lookup Table

# GlobalKTable - replicated across all instances
product-catalog → {product-1: {name: "Laptop", price: 999}, ...}

# All instances have the same data
# Good for lookups, not for updates

Use when: Reference data, lookups, enrichment
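
For reference, this is roughly how the three abstractions above are declared with the real Java DSL (a minimal sketch assuming Kafka Streams 3.x and default serdes configured elsewhere; topic names match the examples above):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class Abstractions {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // KStream: every record is an independent event
        KStream<String, String> clicks = builder.stream("user-clicks");

        // KTable: latest value per key, backed by a local state store
        KTable<String, String> profiles = builder.table("user-profiles");

        // GlobalKTable: full copy of the topic on every instance - read-only reference data
        GlobalKTable<String, String> catalog = builder.globalTable("product-catalog");
    }
}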

Building Your First Streams App

Let's build a simple click analytics pipeline:

# Pseudocode mirroring the Java Streams DSL (Kafka Streams has no official Python API)
import json

# 0. Application config
config = {
    "application.id": "click-analytics",
    "bootstrap.servers": "kafka:9092",
}

# 1. Create a KStream from the input topic
builder = StreamsBuilder()
stream = builder.stream("user-clicks") \
    .map_values(lambda value: json.loads(value))

# 2. Filter for specific pages
home_clicks = stream.filter(lambda key, value: value["page"] == "/home")

# 3. Count clicks per user
click_counts = home_clicks \
    .group_by_key() \
    .count() \
    .to_stream()

# 4. Write to the output topic
click_counts.to("click-counts")

# 5. Build the topology and start the application
streams = KafkaStreams(builder.build(), config)
streams.start()
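
For comparison, here is a minimal sketch of the same pipeline in the actual Java DSL (assuming Kafka Streams 3.x; to stay self-contained it matches the page with a plain string check instead of a JSON serde):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class ClickCounts {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "click-analytics");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // 1. Read raw click events (key = user id, value = JSON string)
        KStream<String, String> clicks = builder.stream("user-clicks");

        // 2. Keep only home-page clicks (string match stands in for real JSON parsing)
        KStream<String, String> homeClicks =
            clicks.filter((user, value) -> value.contains("\"/home\""));

        // 3. Count clicks per user and 4. write the counts out
        homeClicks.groupByKey()
                  .count()
                  .toStream()
                  .to("click-counts", Produced.with(Serdes.String(), Serdes.Long()));

        // 5. Start the application and shut it down cleanly on exit
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}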

Windowing Operations

Time-based aggregations are the bread and butter of streaming:

1. Tumbling Windows

# Fixed-size, non-overlapping windows
# Window 1: [0-60s], Window 2: [60-120s], Window 3: [120-180s]

windowed_counts = stream \
    .group_by_key() \
    .windowed_by(TimeWindows.of(60000)) \
    .count()  # counts per key per 60-second window

2. Hopping Windows

# Fixed-size, overlapping windows
# Window 1: [0-60s], Window 2: [30-90s], Window 3: [60-120s]

windowed_counts = stream \
    .group_by_key() \
    .windowed_by(TimeWindows.of(60000).advance_by(30000)) \
    .count()  # 60s windows that advance every 30s

3. Session Windows

# Activity-based windows
# Window closes after 5 minutes of inactivity

session_counts = stream \
    .group_by_key() \
    .windowed_by(SessionWindows.with_inactivity_gap(300000)) \
    .count()  # window closes after 5 minutes of inactivity
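
In the Java DSL the three window types above are defined with Duration-based factory methods. A minimal sketch, assuming Kafka Streams 3.0+ where the ...WithNoGrace variants are the recommended factories:

import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;

public class WindowExamples {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KGroupedStream<String, String> grouped =
            builder.<String, String>stream("user-clicks").groupByKey();

        // Tumbling: fixed 60s windows, no overlap
        grouped.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(60)))
               .count();

        // Hopping: 60s windows that advance every 30s (overlapping)
        grouped.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(60))
                                      .advanceBy(Duration.ofSeconds(30)))
               .count();

        // Session: a window closes after 5 minutes of inactivity per key
        grouped.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5)))
               .count();
    }
}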

Stateful Processing with RocksDB

Kafka Streams uses RocksDB by default for local state storage, backed by changelog topics in Kafka so state can be rebuilt after a failure:

# State store configuration
state_store = Stores.keyValueStoreBuilder(
    Stores.persistentKeyValueStore("user-sessions"),
    Serdes.String(),
    Serdes.String()
)

# Register the state store with the topology
builder.add_state_store(state_store)

# Use the state store in a custom processor
class SessionProcessor(Processor):
    def init(self, context):
        # The store is handed to the processor via its context
        self.context = context
        self.state_store = context.get_state_store("user-sessions")

    def process(self, key, value):
        # Read the current session from local state
        current_session = self.state_store.get(key)

        # Update state
        new_session = update_session(current_session, value)  # user-defined merge logic
        self.state_store.put(key, new_session)

        # Forward to the next processor
        self.context.forward(key, new_session)
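
For the real thing, the Processor API in Kafka Streams 3.3+ (org.apache.kafka.streams.processor.api) hands the store to the processor in init() and lets process() be chained like any other DSL step. A sketch, with the session-merge logic reduced to a placeholder string concatenation:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class SessionTopology {

    static class SessionProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;
        private KeyValueStore<String, String> store;

        @Override
        public void init(ProcessorContext<String, String> context) {
            this.context = context;
            // The store registered under this name is available to the processor
            this.store = context.getStateStore("user-sessions");
        }

        @Override
        public void process(Record<String, String> record) {
            // Read current state, update it, write it back
            String current = store.get(record.key());
            String updated = (current == null ? "" : current + "|") + record.value(); // placeholder merge
            store.put(record.key(), updated);

            // Forward the updated session downstream
            context.forward(record.withValue(updated));
        }
    }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Register the persistent store with the topology
        builder.addStateStore(Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("user-sessions"),
            Serdes.String(), Serdes.String()));

        KStream<String, String> events = builder.stream("user-events");

        // Connect the processor to the store by name
        events.process(SessionProcessor::new, "user-sessions")
              .to("user-sessions-out");
    }
}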

Stream-Stream Joins

Join two streams based on keys and time windows:

# Join clicks with purchases
clicks = KStream.builder().stream("clicks")
purchases = KStream.builder().stream("purchases")

# Join within 1 hour window
joined = clicks.join(
    purchases,
    lambda click, purchase: {
        "user": click["user"],
        "clicked_product": click["product"],
        "purchased_product": purchase["product"],
        "time_diff": purchase["timestamp"] - click["timestamp"]
    },
    JoinWindows.of(3600000)  # 1 hour
)
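
The same join in the Java DSL, where the window is a Duration (a sketch with plain String values and a simple concatenated result; assuming Kafka Streams 3.x):

import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

public class ClickPurchaseJoin {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> clicks = builder.stream("clicks");
        KStream<String, String> purchases = builder.stream("purchases");

        // Records join when keys match and their timestamps are within 1 hour of each other
        KStream<String, String> joined = clicks.join(
            purchases,
            (click, purchase) -> "{\"click\":" + click + ",\"purchase\":" + purchase + "}",
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)));

        joined.to("click-purchase-correlations");
    }
}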

Stream-Table Joins

Enrich stream data with a lookup table:

# Enrich clicks with user profiles
clicks = KStream.builder().stream("clicks")
user_profiles = KTable.builder().table("user-profiles")

# Left join (keep all clicks, even without profile)
enriched_clicks = clicks.left_join(
    user_profiles,
    lambda click, profile: {
        "user": click["user"],
        "page": click["page"],
        "user_name": profile["name"] if profile else "Unknown",
        "user_age": profile["age"] if profile else None
    }
)
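
A Java sketch of the same enrichment (String values, Kafka Streams 3.x; the ValueJoiner receives null for the profile when the key is missing from the table):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class ClickEnrichment {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> clicks = builder.stream("clicks");
        KTable<String, String> profiles = builder.table("user-profiles");

        // Left join: every click goes through, profile may be null if the user is unknown
        KStream<String, String> enriched = clicks.leftJoin(
            profiles,
            (click, profile) -> profile == null
                ? click + " (profile: unknown)"
                : click + " (profile: " + profile + ")");

        enriched.to("enriched-clicks");
    }
}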

Exactly-Once Processing

Kafka Streams provides exactly-once semantics with a single configuration change:

Enable Exactly-Once

config = {
    # "exactly_once_v2" (Kafka Streams 3.0+, brokers 2.5+) replaces the deprecated "exactly_once"
    "processing.guarantee": "exactly_once_v2",
    # Idempotent producers and acks=all are configured automatically under EOS
}
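
In Java, the same guarantee is a single StreamsConfig setting (sketch assuming Kafka Streams 3.x, where EXACTLY_ONCE_V2 is the constant to use):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class EosConfig {
    static Properties buildConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "click-analytics");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        // Requires brokers 2.5+; replaces the deprecated "exactly_once" value
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        return props;
    }
}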

How it works:

  1. Transactional producers ensure atomic, exactly-once writes
  2. State store updates are captured in changelog topics written within the same transaction
  3. Output topics and consumer offsets are committed together
  4. On failure, state is restored from the changelogs and processing resumes from the last committed offsets

Error Handling and Dead Letter Queues

What happens when processing fails?

import time

# A plain producer used only for failed records (separate from the Streams topology)
dead_letter_producer = KafkaProducer(
    bootstrap_servers="kafka:9092",
    value_serializer=lambda v: json.dumps(v).encode()
)

def safe_process(record):
    try:
        return process_record(record)  # your business logic
    except Exception as e:
        # Send the failure, with context, to the dead letter queue
        dead_letter_producer.send("dead-letter-queue", {
            "original_record": record,
            "error": str(e),
            "timestamp": time.time()
        })
        return None

# Drop failed records from the main flow after routing them to the DLQ
processed = stream.map_values(safe_process).filter(lambda k, v: v is not None)
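
In the Java DSL the same pattern can be expressed inside the topology: wrap the risky call, keep the original value next to any error, and split the stream. A sketch assuming Java 17+ and Kafka Streams 3.x; Outcome and processRecord are hypothetical stand-ins for your own types and logic:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class DlqTopology {

    // Hypothetical wrapper: either a result or the original value plus an error message
    record Outcome(String result, String original, String error) {
        boolean failed() { return error != null; }
    }

    // Hypothetical business logic that may throw on malformed input
    static String processRecord(String value) {
        if (value == null || value.isBlank()) throw new IllegalArgumentException("empty record");
        return value.toUpperCase();
    }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("user-events");

        // Wrap the risky call so one bad record cannot crash the whole task
        KStream<String, Outcome> handled = input.mapValues(value -> {
            try {
                return new Outcome(processRecord(value), value, null);
            } catch (Exception e) {
                return new Outcome(null, value, e.getMessage());
            }
        });

        // Failures go to the dead-letter topic with enough context to replay them later
        handled.filter((key, outcome) -> outcome.failed())
               .mapValues(o -> "{\"original\":\"" + o.original() + "\",\"error\":\"" + o.error() + "\"}")
               .to("dead-letter-queue");

        // Successes continue down the normal path
        handled.filterNot((key, outcome) -> outcome.failed())
               .mapValues(Outcome::result)
               .to("processed-events");
    }
}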

Monitoring and Metrics

Kafka Streams exposes metrics for monitoring:

Key Metrics

  • records-consumed-rate - Input throughput
  • records-produced-rate - Output throughput
  • process-latency-avg - Processing time
  • state-store-size - State store size
  • task-created-rate - Rebalancing frequency
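
They are exposed via JMX and can also be read programmatically from the running application. A small sketch using KafkaStreams.metrics() (Java 11+; the "stream-thread-metrics" group name assumes a recent Kafka Streams release):

import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public class MetricsDump {
    // Print thread-level metrics from a running KafkaStreams instance
    static void dumpThreadMetrics(KafkaStreams streams) {
        for (var entry : streams.metrics().entrySet()) {
            MetricName name = entry.getKey();
            Metric metric = entry.getValue();
            if (name.group().equals("stream-thread-metrics")) {
                System.out.printf("%s = %s%n", name.name(), metric.metricValue());
            }
        }
    }
}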

Production Best Practices

Configuration

# Application config
application.id = my-streams-app
bootstrap.servers = kafka:9092
processing.guarantee = exactly_once_v2

# Keep state on a persistent disk, not /tmp, so it survives restarts
state.dir = /var/lib/kafka-streams

# Performance tuning
num.stream.threads = 4
# Larger commit intervals favor throughput over end-to-end latency
commit.interval.ms = 10000
# 10 MB record cache per instance
cache.max.bytes.buffering = 10485760

Deployment

  • Use Docker - Consistent environments
  • Set resource limits - Prevent OOM kills
  • Monitor state store size - Can grow large
  • Plan for rebalancing - Graceful shutdowns

Real-World Example: Real-Time Analytics

Let's build a complete analytics pipeline:

# 1. Read user events
events = KStream.builder().stream("user-events")

# 2. Filter and transform
page_views = events \
    .filter(lambda k, v: v["event_type"] == "page_view") \
    .map_values(lambda v: {
        "user_id": v["user_id"],
        "page": v["page"],
        "timestamp": v["timestamp"]
    })

# 3. Count views per page (1-minute windows)
page_counts = page_views \
    .group_by(lambda k, v: v["page"]) \
    .windowed_by(TimeWindows.of(60000)) \
    .count()

# 4. Calculate top pages
top_pages = page_counts \
    .to_stream() \
    .group_by(lambda k, v: "all") \
    .aggregate(
        initializer=lambda: {},
        aggregator=lambda key, value, agg: update_top_pages(agg, value),
        materialized_as="top-pages-store"
    )

# 5. Output results
top_pages.to("top-pages")
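
A condensed Java sketch of steps 1-3 (Kafka Streams 3.x; extractPage is a hypothetical helper standing in for a proper JSON serde, and the top-N aggregation of step 4 would follow the same aggregate pattern with a custom serde for the aggregate type):

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class PageViewAnalytics {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // 1. Read raw events (value = JSON string)
        KStream<String, String> events = builder.stream("user-events");

        // 2. Keep only page views; a real app would use a JSON serde instead of contains()
        KStream<String, String> pageViews =
            events.filter((key, value) -> value.contains("\"page_view\""));

        // 3. Re-key by page and count views in 1-minute tumbling windows
        KTable<Windowed<String>, Long> pageCounts = pageViews
            .groupBy((key, value) -> extractPage(value),
                     Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .count();

        // Emit one record per window update, keyed by "page@windowStart"
        pageCounts.toStream()
                  .map((windowedKey, count) -> KeyValue.pair(
                      windowedKey.key() + "@" + windowedKey.window().start(), count.toString()))
                  .to("page-view-counts");
    }

    // Hypothetical helper: pull the page field out of the JSON value (assumes the field is present)
    static String extractPage(String json) {
        int i = json.indexOf("\"page\":\"") + 8;
        return json.substring(i, json.indexOf('"', i));
    }
}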

Key Takeaways

  1. KStream for events, KTable for state - choose the right abstraction
  2. Windowing is powerful - time-based aggregations are key
  3. State stores enable complex logic - but monitor their size
  4. Exactly-once is built-in - use it for critical applications
  5. Monitor and test thoroughly - streaming apps are complex

Next Steps

Ready to manage data schemas? Check out our next lesson on Schema Registry and Data Governance where we'll learn how to handle schema evolution in production.