Kafka Streams Tutorial: Real-Time Data Processing with Java
Kafka Streams is a Java library for building real-time data processing applications on top of Apache Kafka. In this tutorial, you'll learn how to build streaming applications that can scale to millions of events per second.
What is Kafka Streams?
Kafka Streams is a client library for building applications and microservices that process data in real-time. It provides a simple and powerful API for stream processing operations like filtering, transforming, aggregating, and joining data streams.
Key Features
- Real-time Processing: Process data as it arrives
- Fault Tolerance: Built-in fault tolerance and recovery
- Scalability: Scale horizontally across multiple instances
- Exactly-once Processing: Exactly-once semantics guarantee each record affects the results exactly once, even across failures
- Integration: Seamlessly integrates with Kafka topics
Setting Up Your Development Environment
Prerequisites
- Java 8 or higher
- Apache Kafka 2.8+
- Maven or Gradle
- IDE (IntelliJ IDEA or Eclipse)
Maven Dependencies
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
</dependencies>
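If you build with Gradle instead, the equivalent coordinates look like this (Kotlin DSL):
dependencies {
    implementation("org.apache.kafka:kafka-streams:3.4.0")
    implementation("org.apache.kafka:kafka-clients:3.4.0")
}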
Basic Kafka Streams Application
Let’s start with a simple word count application:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Arrays;
import java.util.Properties;
public class WordCountApp {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
// Create a stream from the input topic
KStream<String, String> textLines = builder.stream("text-input");
// Process the stream
KTable<String, Long> wordCounts = textLines
.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count();
// Send results to the output topic (counts are Longs, so the value serde must be overridden)
wordCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Add shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
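To feed the application, you can publish a few test records with the plain producer from kafka-clients; a minimal sketch, assuming the text-input topic already exists (or broker-side auto-creation is enabled):
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
    // The word count ignores keys, so null is fine here
    producer.send(new ProducerRecord<>("text-input", null, "hello kafka streams hello"));
}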
Understanding KStream and KTable
KStream
A KStream represents a stream of records where each record is an independent entity:
KStream<String, String> userEvents = builder.stream("user-events");
// Each record is processed independently
userEvents
.filter((key, value) -> value.contains("purchase"))
.mapValues(value -> "PROCESSED: " + value)
.to("processed-events");
KTable
A KTable represents a changelog stream where each record represents an update:
KTable<String, String> userProfiles = builder.table("user-profiles");
// Updates are merged based on key
userProfiles
.filter((key, value) -> value.contains("premium"))
.toStream()
.to("premium-users");
Windowing Operations
Windowing allows you to group records by time windows:
Tumbling Windows
Fixed-size, non-overlapping windows:
KStream<String, String> orders = builder.stream("orders");
orders
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count()
.toStream()
.map((windowedKey, count) -> new KeyValue<>(
windowedKey.key() + "@" + windowedKey.window().start(),
count.toString()
))
.to("order-counts-5min");
Hopping Windows
Overlapping windows with fixed size and advance interval:
orders
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10))
        .advanceBy(Duration.ofMinutes(2)))
    .count()
    .toStream()
    // Convert the windowed key to a plain String before writing out
    .map((windowedKey, count) -> new KeyValue<>(
        windowedKey.key() + "@" + windowedKey.window().start(),
        count.toString()
    ))
    .to("order-counts-hopping");
Session Windows
Dynamically sized windows that close after a configurable gap of inactivity:
orders
    .groupByKey()
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
    .count()
    .toStream()
    // Convert the windowed key to a plain String before writing out
    .map((windowedKey, count) -> new KeyValue<>(
        windowedKey.key() + "@" + windowedKey.window().start(),
        count.toString()
    ))
    .to("order-counts-session");
Stream-Table Joins
Stream-Table Join
Join a stream with a table to enrich each record with reference data. Note that the two input topics must be co-partitioned (same partition count, records keyed the same way):
KStream<String, String> orders = builder.stream("orders");
KTable<String, String> customers = builder.table("customers");
KStream<String, String> enrichedOrders = orders
.leftJoin(customers, (order, customer) -> {
if (customer != null) {
return order + " | Customer: " + customer;
}
return order + " | Customer: Unknown";
});
enrichedOrders.to("enriched-orders");
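If the topics cannot be co-partitioned, a GlobalKTable is an alternative to the KTable above: its contents are fully replicated to every instance, and the lookup key is derived per record. A minimal sketch, assuming the order stream is keyed by customer ID:
GlobalKTable<String, String> customersGlobal = builder.globalTable("customers");
KStream<String, String> enriched = orders.join(
    customersGlobal,
    // Derive the lookup key from each record; assumes the stream key is the customer ID
    (orderKey, order) -> orderKey,
    (order, customer) -> order + " | Customer: " + customer
);
enriched.to("enriched-orders-global");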
Stream-Stream Join
Join two streams:
KStream<String, String> orders = builder.stream("orders");
KStream<String, String> payments = builder.stream("payments");
KStream<String, String> orderPayments = orders
.join(payments,
(order, payment) -> order + " | Payment: " + payment,
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)));
orderPayments.to("order-payments");
Real-World Example: E-commerce Analytics
Let’s build a real-time analytics system for an e-commerce platform:
public class EcommerceAnalytics {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ecommerce-analytics");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
// Process user events
KStream<String, String> userEvents = builder.stream("user-events");
// Real-time user activity count
userEvents
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
.count()
.toStream()
.map((windowedKey, count) -> new KeyValue<>(
"user-activity-" + windowedKey.window().start(),
count.toString()
))
.to("user-activity-counts");
// Top products by sales
KStream<String, String> purchases = userEvents
.filter((key, value) -> value.contains("purchase"));
purchases
.map((key, value) -> {
// Extract product ID from purchase event
String productId = extractProductId(value);
return new KeyValue<>(productId, "1");
})
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count()
.toStream()
.map((windowedKey, count) -> new KeyValue<>(
"top-products-" + windowedKey.window().start(),
windowedKey.key() + ":" + count
))
.to("top-products");
// Revenue tracking
purchases
.map((key, value) -> {
double amount = extractAmount(value);
return new KeyValue<>("revenue", String.valueOf(amount));
})
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
.aggregate(
    () -> 0.0,
    (key, value, aggregate) -> aggregate + Double.parseDouble(value),
    // The running total is a Double, so the store needs an explicit value serde
    Materialized.<String, Double, WindowStore<Bytes, byte[]>>as("revenue-store")
        .withValueSerde(Serdes.Double())
)
.toStream()
.map((windowedKey, revenue) -> new KeyValue<>(
"revenue-" + windowedKey.window().start(),
String.valueOf(revenue)
))
.to("revenue-stream");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
private static String extractProductId(String event) {
// Implementation to extract product ID
return "product-123";
}
private static double extractAmount(String event) {
// Implementation to extract amount
return 99.99;
}
}
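The two extractor stubs above are placeholders. Purely as an illustration, assuming purchase events arrive as pipe-delimited strings such as "purchase|product-123|99.99" (real events would more likely be JSON), they could look like:
private static String extractProductId(String event) {
    // Assumes the format "type|productId|amount"
    String[] parts = event.split("\\|");
    return parts.length > 1 ? parts[1] : "unknown";
}
private static double extractAmount(String event) {
    String[] parts = event.split("\\|");
    return parts.length > 2 ? Double.parseDouble(parts[2]) : 0.0;
}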
Stateful Operations
State Stores
Kafka Streams provides built-in state stores for maintaining state:
// In the DSL, stateful operations create and manage their own state stores;
// Materialized names the store and pins its serdes
KStream<String, String> userSessions = builder.stream("user-sessions");
userSessions
    .groupByKey()
    .aggregate(
        () -> 0L,
        (key, value, aggregate) -> aggregate + 1,
        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("user-session-store")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Long())
    )
    .toStream()
    .to("session-counts", Produced.with(Serdes.String(), Serdes.Long()));
// For the low-level Processor API, build and register a store manually;
// its name must not collide with a store the DSL already manages
StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores
    .keyValueStoreBuilder(
        Stores.persistentKeyValueStore("custom-counter-store"),
        Serdes.String(),
        Serdes.Long()
    );
builder.addStateStore(storeBuilder);
Error Handling and Monitoring
Error Handling
KStream<String, String> input = builder.stream("input-topic");
input
.mapValues(value -> {
try {
return processValue(value);
} catch (Exception e) {
// Log the error and emit a marker value; production code would more
// likely route the failed record to a dead-letter topic
log.error("Error processing value: " + value, e);
return "ERROR: " + e.getMessage();
}
})
.to("output-topic");
Monitoring
// Add metrics
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
props.put(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, "30000");
// Replace a failed stream thread instead of letting the whole client die
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.setUncaughtExceptionHandler(exception -> {
    log.error("Uncaught exception in stream processing", exception);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
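The client also exposes its metrics programmatically, which is useful for wiring them into your own reporting; for example, dumping thread-level metrics:
// Print the current value of every stream-thread-level metric
streams.metrics().forEach((name, metric) -> {
    if ("stream-thread-metrics".equals(name.group())) {
        System.out.println(name.name() + " = " + metric.metricValue());
    }
});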
Testing Kafka Streams Applications
Unit Testing
The test below uses TopologyTestDriver from the kafka-streams-test-utils artifact, which drives a topology without a running broker (assertions use AssertJ):
@Test
public void testWordCount() {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    StreamsBuilder builder = new StreamsBuilder();
    // Build the topology under test
    KStream<String, String> input = builder.stream("input");
    KTable<String, Long> wordCounts = input
        .flatMapValues(text -> Arrays.asList(text.toLowerCase().split("\\W+")))
        .groupBy((key, word) -> word)
        .count();
    // The topology must actually write to the topic the test reads from
    wordCounts.toStream().to("output", Produced.with(Serdes.String(), Serdes.Long()));
    // Drive the topology
    TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props);
    TestInputTopic<String, String> inputTopic = testDriver
        .createInputTopic("input", Serdes.String().serializer(), Serdes.String().serializer());
    TestOutputTopic<String, Long> outputTopic = testDriver
        .createOutputTopic("output", Serdes.String().deserializer(), Serdes.Long().deserializer());
    // Send test data
    inputTopic.pipeInput("key", "hello world hello");
    // Every update flows downstream, so collapse the output to the final counts
    Map<String, Long> counts = outputTopic.readKeyValuesToMap();
    assertThat(counts).containsEntry("hello", 2L).containsEntry("world", 1L);
    testDriver.close();
}
Best Practices
1. Choose the Right Windowing Strategy
- Use tumbling windows for fixed-time aggregations
- Use hopping windows for overlapping analysis
- Use session windows for user behavior analysis
2. Optimize State Stores
// Name the store and configure serdes, caching, and changelog logging explicitly
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("store-name")
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.Long())
    .withCachingEnabled()
    .withLoggingEnabled(Collections.emptyMap()); // empty map keeps default changelog config
3. Configure Processing Guarantees
// EXACTLY_ONCE_V2 supersedes the deprecated EXACTLY_ONCE setting
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
4. Monitor Performance
- Track processing latency
- Monitor consumer lag
- Set up alerts for failures
Common Pitfalls
1. Memory Issues
- Use appropriate window sizes
- Clean up old state stores
- Monitor memory usage (see the config sketch below)
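Two relevant knobs, as a sketch (CACHE_MAX_BYTES_BUFFERING_CONFIG is deprecated in newer releases in favor of a state-store cache setting, but still works in 3.4):
// Bound the record cache used to buffer aggregation updates (10 MB here)
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);
// More stream threads increase parallelism but also memory footprint
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);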
2. Ordering Issues
- Understand partition-level ordering
- Use appropriate partitioning strategies
3. State Management
- Handle state store failures
- Implement proper cleanup logic
Next Steps
This tutorial covered the basics of Kafka Streams, but there’s much more to explore:
- Advanced Windowing: Sliding windows, custom windowing
- Complex Joins: Multi-stream joins, temporal joins
- State Management: Custom state stores, state migration
- Performance Tuning: Optimization techniques
- Production Deployment: Monitoring, scaling, fault tolerance
Ready to master Kafka Streams and build production-ready streaming applications? Check out our comprehensive Apache Kafka Mastery Course that covers everything from fundamentals to advanced stream processing patterns.
This article is part of our Stream Processing series. Subscribe to get the latest Kafka and streaming insights delivered to your inbox.