Kafka Streams and Real-Time Processing
Master Kafka Streams: KStream vs KTable, windowing operations, stateful processing, joins, and building real-time analytics applications.
What is Kafka Streams?
Kafka Streams is a client library, shipped with Apache Kafka as a Java/Scala library, for building real-time stream processing applications on top of Kafka topics. Think of it as "Spark for Kafka" - but simpler and more integrated.
Key Benefits
- No separate cluster - runs in your application
- Exactly-once processing - built-in
- Fault tolerant - automatic recovery
- Scalable - add more instances
- Simple - just a library, not a framework
Core Abstractions: KStream vs KTable
Kafka Streams has two core abstractions, KStream and KTable, plus GlobalKTable for lookup data:
KStream: The Event Stream
# KStream - continuous stream of events
user-clicks → [click1, click2, click3, click4, click5, ...]
# Each record is independent
Record 1: {user: "alice", page: "/home", timestamp: 1609459200}
Record 2: {user: "bob", page: "/products", timestamp: 1609459201}
Record 3: {user: "alice", page: "/cart", timestamp: 1609459202} Use when: Processing individual events, real-time analytics, filtering
KTable: The State Table
# KTable - current state per key
user-profiles → {alice: {name: "Alice", age: 25}, bob: {name: "Bob", age: 30}}
# Updates overwrite previous values
Record 1: {user: "alice", name: "Alice", age: 25}
Record 2: {user: "alice", name: "Alice", age: 26} # Updates age
Record 3: {user: "bob", name: "Bob", age: 30} Use when: Maintaining state, lookups, aggregations
GlobalKTable: The Lookup Table
# GlobalKTable - replicated across all instances
product-catalog → {product-1: {name: "Laptop", price: 999}, ...}
# All instances have the same data
# Good for lookups, not for updates
Use when: Reference data, lookups, enrichment
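The pseudocode above shows the shape of the data. In the actual Java DSL (remember, Kafka Streams ships as a Java/Scala library), all three abstractions are created from a StreamsBuilder. A minimal sketch, assuming string keys and values and the topic names used above:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();

// KStream: every record is an independent event
KStream<String, String> clicks =
        builder.stream("user-clicks", Consumed.with(Serdes.String(), Serdes.String()));

// KTable: a changelog view where newer values per key overwrite older ones
KTable<String, String> profiles =
        builder.table("user-profiles", Consumed.with(Serdes.String(), Serdes.String()));

// GlobalKTable: fully replicated to every instance, good for lookups and enrichment
GlobalKTable<String, String> catalog =
        builder.globalTable("product-catalog", Consumed.with(Serdes.String(), Serdes.String()));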
Building Your First Streams App
Let's build a simple click analytics pipeline. The walkthrough uses Python-style pseudocode to keep the focus on the DSL; a sketch of the same topology in the real Java API follows the pseudocode.
# NOTE: Kafka Streams ships only as a Java/Scala library - there is no official Python API.
# The snippets in this lesson are pseudocode that mirrors the Java Streams DSL.
import json
# 1. Create a KStream from input topic
stream = KStream.builder() \
.stream("user-clicks") \
.map_values(lambda value: json.loads(value))
# 2. Filter for specific pages
home_clicks = stream.filter(lambda key, value: value["page"] == "/home")
# 3. Count clicks per user
click_counts = home_clicks \
.group_by_key() \
.count() \
.to_stream()
# 4. Write to output topic
click_counts.to("click-counts")
# 5. Start the application
streams = KafkaStreams(stream, config)
streams.start()
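For reference, here is roughly the same topology in the real Java DSL. This is a sketch rather than production code: it assumes string keys and JSON string values, and parsePage() is a hypothetical stand-in for whatever JSON parsing you use.
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class ClickCountApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "click-count-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // 1. Read the raw click events (JSON strings keyed by user id)
        KStream<String, String> clicks = builder.stream("user-clicks");

        // 2. Keep only clicks on the home page
        //    (parsePage() is a placeholder, not part of Kafka Streams)
        KStream<String, String> homeClicks =
                clicks.filter((user, json) -> "/home".equals(parsePage(json)));

        // 3. Count clicks per user and write the counts out
        KTable<String, Long> counts = homeClicks.groupByKey().count();
        counts.toStream().to("click-counts", Produced.with(Serdes.String(), Serdes.Long()));

        // 4. Build and start the application
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    private static String parsePage(String json) {
        // Placeholder: use a real JSON library (Jackson, Gson, ...) here
        return json.contains("\"/home\"") ? "/home" : "";
    }
}
Windowing Operations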
Time-based aggregations are the bread and butter of streaming:
1. Tumbling Windows
# Fixed-size, non-overlapping windows
# Window 1: [0-60s], Window 2: [60-120s], Window 3: [120-180s]
# 60-second tumbling windows
windowed_counts = stream \
.group_by_key() \
.windowed_by(TimeWindows.of(60000)) \
.count()
2. Hopping Windows
# Fixed-size, overlapping windows
# Window 1: [0-60s], Window 2: [30-90s], Window 3: [60-120s]
# 60-second windows advancing every 30 seconds
windowed_counts = stream \
.group_by_key() \
.windowed_by(TimeWindows.of(60000).advance_by(30000)) \
.count()
3. Session Windows
# Activity-based windows
# Window closes after 5 minutes of inactivity
# 5-minute inactivity gap
session_counts = stream \
.group_by_key() \
.windowed_by(SessionWindows.with(300000)) \
.count()
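In the Java DSL the three window types look like this. A sketch assuming a KStream<String, String> named clicks (for example the one from the Java sketch above); the ofSizeWithNoGrace / ofInactivityGapWithNoGrace builders are the Kafka 3.0+ names (older releases use TimeWindows.of and SessionWindows.with).
import java.time.Duration;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

KGroupedStream<String, String> grouped = clicks.groupByKey();

// Tumbling: fixed-size, non-overlapping 60-second windows
KTable<Windowed<String>, Long> tumbling =
        grouped.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(60))).count();

// Hopping: 60-second windows that advance every 30 seconds (overlapping)
KTable<Windowed<String>, Long> hopping =
        grouped.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(60))
                                      .advanceBy(Duration.ofSeconds(30))).count();

// Session: the window closes after 5 minutes of inactivity per key
KTable<Windowed<String>, Long> sessions =
        grouped.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5))).count();
Stateful Processing with RocksDB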
Kafka Streams uses RocksDB as its default local state store (in-memory stores are also available):
# State store configuration
state_store = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("user-sessions"),
Serdes.String(),
Serdes.String()
)
# Add state store to topology
builder.add_state_store(state_store)
# Use state store in processor
class SessionProcessor(Processor):
def process(self, key, value):
# Read from state (state_store is obtained from the processor context in init())
current_session = self.state_store.get(key)
# Update state
new_session = update_session(current_session, value)
self.state_store.put(key, new_session)
# Forward to next processor
self.context.forward(key, new_session)
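In Java this is done with the Processor API (shown here with the org.apache.kafka.streams.processor.api interfaces from recent releases). A sketch: the store name "user-sessions" matches the pseudocode above, and updateSession() is placeholder logic you would replace.
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

public class SessionProcessor implements Processor<String, String, String, String> {
    private ProcessorContext<String, String> context;
    private KeyValueStore<String, String> store;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
        // Look up the store by the name it was registered under
        this.store = context.getStateStore("user-sessions");
    }

    @Override
    public void process(Record<String, String> record) {
        String current = store.get(record.key());                  // read current session state
        String updated = updateSession(current, record.value());   // your own merge logic
        store.put(record.key(), updated);                          // write the new state back
        context.forward(record.withValue(updated));                // emit downstream
    }

    private String updateSession(String current, String event) {
        return current == null ? event : current + "," + event;    // placeholder logic
    }
}

// Wiring it into the topology (sketch):
// StoreBuilder<KeyValueStore<String, String>> sessionStore =
//         Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("user-sessions"),
//                                     Serdes.String(), Serdes.String());
// builder.addStateStore(sessionStore);
// clicks.process(SessionProcessor::new, "user-sessions");   // Kafka 3.3+ signature
Stream-Stream Joins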
Join two streams based on keys and time windows:
# Join clicks with purchases
clicks = KStream.builder().stream("clicks")
purchases = KStream.builder().stream("purchases")
# Join within 1 hour window
joined = clicks.join(
purchases,
lambda click, purchase: {
"user": click["user"],
"clicked_product": click["product"],
"purchased_product": purchase["product"],
"time_diff": purchase["timestamp"] - click["timestamp"]
},
JoinWindows.of(3600000) # 1 hour
)
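A Java sketch of the same join, assuming both topics are keyed by user id with string values and default String serdes; the ValueJoiner here just concatenates the two values where the pseudocode builds a richer object. Note that the two input topics must be co-partitioned (same key and partition count) for a stream-stream join.
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> clicks = builder.stream("clicks");
KStream<String, String> purchases = builder.stream("purchases");

// Join click and purchase events for the same key that occur within 1 hour of each other
KStream<String, String> joined = clicks.join(
        purchases,
        (click, purchase) -> "{click:" + click + ", purchase:" + purchase + "}",  // ValueJoiner
        JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(1)),
        StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()));    // serdes for the join stores
Stream-Table Joins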
Enrich stream data with a lookup table:
# Enrich clicks with user profiles
clicks = KStream.builder().stream("clicks")
user_profiles = KTable.builder().table("user-profiles")
# Left join (keep all clicks, even without profile)
enriched_clicks = clicks.left_join(
user_profiles,
lambda click, profile: {
"user": click["user"],
"page": click["page"],
"user_name": profile["name"] if profile else "Unknown",
"user_age": profile["age"] if profile else None
}
)
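The Java version of the left join, again as a sketch with string values and default String serdes; the ValueJoiner simply tags the click with the profile, falling back to "Unknown" when there is none. (A GlobalKTable join works similarly but does not require co-partitioning.)
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> clicks = builder.stream("clicks");
KTable<String, String> profiles = builder.table("user-profiles");

// Left join: every click is kept; profile is null when no profile exists for the key
KStream<String, String> enriched = clicks.leftJoin(
        profiles,
        (click, profile) -> click + " | profile=" + (profile == null ? "Unknown" : profile));
Exactly-Once Processing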
Kafka Streams provides exactly-once semantics out of the box:
Enable Exactly-Once
config = {
# Kafka 3.0+: use "exactly_once_v2" (requires brokers on 2.5 or newer);
# the older "exactly_once" value still works but is deprecated
"processing.guarantee": "exactly_once_v2"
}
# You do not need to set enable.idempotence or acks yourself - with exactly-once
# enabled, Kafka Streams configures its internal producers and consumers accordingly.
How it works:
- Transactional producers ensure exactly-once writes
- State stores are transactional
- Output topics are written transactionally
- Recovery replays from last committed state
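In the Java API this is a single setting; a minimal sketch assuming Kafka 3.0+, where EXACTLY_ONCE_V2 is the current value:
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "click-count-app");   // example id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
// One setting turns on transactions end-to-end; Streams configures the internal
// producers (idempotent, acks=all) and consumers (read_committed) itself.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);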
Error Handling and Dead Letter Queues
What happens when processing fails?
def safe_process(record):
try:
return process_record(record)
except Exception as e:
# Send to dead letter queue
dead_letter_producer.send("dead-letter-queue", {
"original_record": record,
"error": str(e),
"timestamp": time.time()
})
return None
# Apply error handling
processed = stream.map_values(safe_process).filter(lambda k, v: v is not None)
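A Java sketch of the same dead-letter pattern, assuming stream is a KStream<String, String>: wrap the risky step, send failures to a DLQ topic with a separate plain producer, and drop them from the main flow.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.kstream.KStream;

Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "kafka:9092");
producerProps.put("key.serializer", StringSerializer.class.getName());
producerProps.put("value.serializer", StringSerializer.class.getName());
KafkaProducer<String, String> deadLetterProducer = new KafkaProducer<>(producerProps);

KStream<String, String> processed = stream
        .mapValues(value -> {
            try {
                // Example "risky" step: parse the value as an integer and double it
                return String.valueOf(Integer.parseInt(value.trim()) * 2);
            } catch (Exception e) {
                deadLetterProducer.send(new ProducerRecord<>("dead-letter-queue",
                        value + " | error=" + e.getMessage()));
                return null;                        // mark as failed
            }
        })
        .filter((key, value) -> value != null);     // drop failed records
Keep in mind that records sent through this side producer are outside any Streams transaction; Kafka Streams also ships configurable deserialization and production exception handlers that cover many failure cases.
Monitoring and Metrics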
Kafka Streams exposes metrics for monitoring:
Key Metrics
- records-consumed-rate (consumer) - input throughput
- record-send-rate (producer) - output throughput
- process-rate / process-latency-avg - processing throughput and latency per task
- RocksDB state store metrics (e.g. total-sst-files-size) - state store size on disk
- task-created-rate - task churn, a sign of frequent rebalancing
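All of these are exposed over JMX and programmatically through KafkaStreams#metrics(); a minimal sketch, assuming streams is the running KafkaStreams instance from earlier:
import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

for (Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
    MetricName name = entry.getKey();
    // Stream-level groups include stream-thread-metrics, stream-task-metrics,
    // stream-processor-node-metrics and stream-state-metrics
    if (name.group().startsWith("stream-")) {
        System.out.printf("%s (%s) = %s%n", name.name(), name.group(), entry.getValue().metricValue());
    }
}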
Production Best Practices
Configuration
# Application config
application.id = my-streams-app
bootstrap.servers = kafka:9092
processing.guarantee = exactly_once_v2
# Keep state on a persistent volume, not in /tmp
state.dir = /var/lib/kafka-streams
# Performance tuning
num.stream.threads = 4
commit.interval.ms = 10000
cache.max.bytes.buffering = 10485760
Deployment
- Use Docker - Consistent environments
- Set resource limits - Prevent OOM kills
- Monitor state store size - Can grow large
- Plan for rebalancing - Graceful shutdowns
Real-World Example: Real-Time Analytics
Let's build a complete analytics pipeline:
# 1. Read user events
events = KStream.builder().stream("user-events")
# 2. Filter and transform
page_views = events \
.filter(lambda k, v: v["event_type"] == "page_view") \
.map_values(lambda v: {
"user_id": v["user_id"],
"page": v["page"],
"timestamp": v["timestamp"]
})
# 3. Count views per page (1-minute windows)
page_counts = page_views \
.group_by(lambda k, v: v["page"]) \
.windowed_by(TimeWindows.of(60000)) \
.count()
# 4. Calculate top pages
top_pages = page_counts \
.to_stream() \
.group_by(lambda k, v: "all") \
.aggregate(
initializer=lambda: {},
aggregator=lambda key, value, agg: update_top_pages(agg, value),
materialized_as="top-pages-store"
)
# 5. Output results
top_pages.to("top-pages") Key Takeaways
- KStream for events, KTable for state - choose the right abstraction
- Windowing is powerful - time-based aggregations are key
- State stores enable complex logic - but monitor their size
- Exactly-once is built-in - use it for critical applications
- Monitor and test thoroughly - streaming apps are complex
Next Steps
Ready to manage data schemas? Check out our next lesson on Schema Registry and Data Governance where we'll learn how to handle schema evolution in production.