
Kafka Streams Tutorial: Real-Time Data Processing with Java

Learn Kafka Streams with hands-on examples. Build real-time data processing applications, windowing operations, and stream-table joins in this comprehensive tutorial.


Kafka Streams is a powerful library for building real-time data processing applications. In this tutorial you'll work up from a simple word count to windowed aggregations, joins, and a small real-time analytics pipeline, building applications that can scale horizontally to process millions of events per second.

What is Kafka Streams?

Kafka Streams is a client library for building applications and microservices that process data in real time. It provides a simple yet powerful API for stream processing operations such as filtering, transforming, aggregating, and joining data streams.

Key Features

  • Real-time Processing: Process data as it arrives
  • Fault Tolerance: Built-in fault tolerance and recovery
  • Scalability: Scale horizontally across multiple instances
  • Exactly-once Processing: Guarantee each record is processed exactly once
  • Integration: Seamlessly integrates with Kafka topics

Setting Up Your Development Environment

Prerequisites

  • Java 8 or higher (Java 11+ recommended; Java 8 support is deprecated in Kafka 3.x)
  • Apache Kafka 2.8+
  • Maven or Gradle
  • IDE (IntelliJ IDEA or Eclipse)

Maven Dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>3.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.4.0</version>
    </dependency>
</dependencies>

Basic Kafka Streams Application

Let’s start with a simple word count application:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Properties;

public class WordCountApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        
        // Create a stream from the input topic
        KStream<String, String> textLines = builder.stream("text-input");
        
        // Process the stream
        KTable<String, Long> wordCounts = textLines
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            .count();
        
        // Send results to the output topic; the counts are Longs, so we must
        // override the default String value serde when writing
        wordCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        
        // Add shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Understanding KStream and KTable

KStream

A KStream represents a stream of records where each record is an independent entity:

KStream<String, String> userEvents = builder.stream("user-events");

// Each record is processed independently
userEvents
    .filter((key, value) -> value.contains("purchase"))
    .mapValues(value -> "PROCESSED: " + value)
    .to("processed-events");

KTable

A KTable represents a changelog stream in which each record is an update for its key:

KTable<String, String> userProfiles = builder.table("user-profiles");

// Later records with the same key overwrite earlier ones
userProfiles
    .filter((key, value) -> value.contains("premium"))
    .toStream()
    .to("premium-users");

Windowing Operations

Windowing lets you group and aggregate records by time window:

Tumbling Windows

Fixed-size, non-overlapping windows:

KStream<String, String> orders = builder.stream("orders");

orders
    .groupByKey()
    // TimeWindows.of(...) is deprecated; ofSizeWithNoGrace is the current API
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count()
    .toStream()
    .map((windowedKey, count) -> new KeyValue<>(
        windowedKey.key() + "@" + windowedKey.window().start(),
        count.toString()
    ))
    .to("order-counts-5min");

Hopping Windows

Overlapping windows with fixed size and advance interval:

orders
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(10))
                           .advanceBy(Duration.ofMinutes(2)))
    .count()
    // Flatten the windowed key to a plain String before writing, so the
    // default String key serde can handle it; counts need a Long serde
    .toStream((windowedKey, count) -> windowedKey.key() + "@" + windowedKey.window().start())
    .to("order-counts-hopping", Produced.with(Serdes.String(), Serdes.Long()));

Session Windows

Windows based on activity periods:

orders
    .groupByKey()
    // SessionWindows.with(...) is deprecated; this is the current API
    .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
    .count()
    .toStream((windowedKey, count) -> windowedKey.key() + "@" + windowedKey.window().start())
    .to("order-counts-session", Produced.with(Serdes.String(), Serdes.Long()));

Joins

Stream-Table Join

Join a stream with a table:

KStream<String, String> orders = builder.stream("orders");
KTable<String, String> customers = builder.table("customers");

KStream<String, String> enrichedOrders = orders
    .leftJoin(customers, (order, customer) -> {
        if (customer != null) {
            return order + " | Customer: " + customer;
        }
        return order + " | Customer: Unknown";
    });

enrichedOrders.to("enriched-orders");

Stream-Stream Join

Join two streams:

KStream<String, String> orders = builder.stream("orders");
KStream<String, String> payments = builder.stream("payments");

KStream<String, String> orderPayments = orders
    .join(payments,
          (order, payment) -> order + " | Payment: " + payment,
          // Records whose timestamps differ by at most five minutes are joined;
          // JoinWindows.of(...) is deprecated in favor of this method
          JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)));

orderPayments.to("order-payments");
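
When one side of the join is small, slowly-changing reference data, a GlobalKTable join is often a better fit than a KTable join: the table is fully replicated to every instance, so the topics don't need to be co-partitioned. A sketch, where the product-catalog topic and the extractProductIdFromOrder helper are hypothetical:

GlobalKTable<String, String> products = builder.globalTable("product-catalog");

KStream<String, String> ordersWithProducts = orders.join(
    products,
    // Map each order to the catalog key to look up (hypothetical helper)
    (orderId, order) -> extractProductIdFromOrder(order),
    (order, product) -> order + " | Product: " + product
);

ordersWithProducts.to("orders-with-products");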

Real-World Example: E-commerce Analytics

Let’s build a real-time analytics system for an e-commerce platform:

public class EcommerceAnalytics {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ecommerce-analytics");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        
        // Process user events
        KStream<String, String> userEvents = builder.stream("user-events");
        
        // Per-user activity counts over one-minute tumbling windows
        userEvents
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .count()
            .toStream()
            .map((windowedKey, count) -> new KeyValue<>(
                windowedKey.key() + "@" + windowedKey.window().start(),
                count.toString()
            ))
            .to("user-activity-counts");
        
        // Top products by sales
        KStream<String, String> purchases = userEvents
            .filter((key, value) -> value.contains("purchase"));
        
        purchases
            .map((key, value) -> {
                // Extract product ID from purchase event
                String productId = extractProductId(value);
                return new KeyValue<>(productId, "1");
            })
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .count()
            .toStream()
            .map((windowedKey, count) -> new KeyValue<>(
                "top-products-" + windowedKey.window().start(),
                windowedKey.key() + ":" + count
            ))
            .to("top-products");
        
        // Revenue tracking
        purchases
            .map((key, value) -> {
                double amount = extractAmount(value);
                return new KeyValue<>("revenue", String.valueOf(amount));
            })
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
            .aggregate(
                () -> 0.0,
                (key, value, aggregate) -> aggregate + Double.parseDouble(value),
                // The aggregate is a Double, so the store needs an explicit value serde
                Materialized.<String, Double, WindowStore<Bytes, byte[]>>as("revenue-store")
                    .withValueSerde(Serdes.Double())
            )
            .toStream()
            .map((windowedKey, revenue) -> new KeyValue<>(
                "revenue-" + windowedKey.window().start(),
                String.valueOf(revenue)
            ))
            .to("revenue-stream");
        
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
    
    private static String extractProductId(String event) {
        // Stub for brevity; a real implementation would parse the event payload
        return "product-123";
    }

    private static double extractAmount(String event) {
        // Stub for brevity; a real implementation would parse the event payload
        return 99.99;
    }
}

Stateful Operations

State Stores

Kafka Streams provides built-in state stores for maintaining state:

// Create a state store for use with the Processor API; a store registered
// this way must not share a name with a store created via Materialized,
// or the topology will fail to build
StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores
    .keyValueStoreBuilder(
        Stores.persistentKeyValueStore("user-session-store"),
        Serdes.String(),
        Serdes.Long()
    );

builder.addStateStore(storeBuilder);

// DSL aggregations create and manage their own store through Materialized
KStream<String, String> userSessions = builder.stream("user-sessions");

userSessions
    .groupByKey()
    .aggregate(
        () -> 0L,
        (key, value, aggregate) -> aggregate + 1,
        // The aggregate is a Long, so give the store an explicit value serde
        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("session-count-store")
            .withValueSerde(Serdes.Long())
    )
    .toStream()
    .to("session-counts", Produced.with(Serdes.String(), Serdes.Long()));

Error Handling and Monitoring

Error Handling

KStream<String, String> input = builder.stream("input-topic");

input
    .mapValues(value -> {
        try {
            // processValue is a placeholder for your own parsing/transformation logic
            return processValue(value);
        } catch (Exception e) {
            // Log the failure (log is your application's logger, e.g. SLF4J)
            // and tag the record instead of crashing the stream thread
            log.error("Error processing value: " + value, e);
            return "ERROR: " + e.getMessage();
        }
    })
    .to("output-topic");

Monitoring

// Record more detailed metrics (at the cost of some overhead)
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
props.put(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, "30000");

// Handle uncaught exceptions; since Kafka 2.8 the handler decides whether to
// replace the failed thread, shut down the client, or shut down the application
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.setUncaughtExceptionHandler(exception -> {
    log.error("Uncaught exception in stream processing", exception);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});

Testing Kafka Streams Applications

Unit Testing

@Test
public void testWordCount() {
    // Requires the kafka-streams-test-utils artifact on the test classpath
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    // Disable record caching so every count update is emitted deterministically
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

    // Build the topology under test; it must write to the topic the test reads
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> input = builder.stream("input");
    input
        .flatMapValues(text -> Arrays.asList(text.split(" ")))
        .groupBy((key, word) -> word)
        .count()
        .toStream()
        .to("output", Produced.with(Serdes.String(), Serdes.Long()));

    TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), props);

    TestInputTopic<String, String> inputTopic = testDriver
        .createInputTopic("input", Serdes.String().serializer(), Serdes.String().serializer());

    TestOutputTopic<String, Long> outputTopic = testDriver
        .createOutputTopic("output", Serdes.String().deserializer(), Serdes.Long().deserializer());

    // Send test data
    inputTopic.pipeInput("key", "hello world hello");

    // The count table emits an update for every input word, so "hello"
    // appears twice: first with 1, then with 2
    assertThat(outputTopic.readKeyValue()).isEqualTo(new KeyValue<>("hello", 1L));
    assertThat(outputTopic.readKeyValue()).isEqualTo(new KeyValue<>("world", 1L));
    assertThat(outputTopic.readKeyValue()).isEqualTo(new KeyValue<>("hello", 2L));

    testDriver.close();
}

Best Practices

1. Choose the Right Windowing Strategy

  • Use tumbling windows for fixed-time aggregations
  • Use hopping windows for overlapping analysis
  • Use session windows for user behavior analysis

2. Optimize State Stores

// Configure the store backing an aggregation: explicit types and serdes,
// caching, and a changelog topic for fault tolerance
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("store-name")
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.Long())
    .withCachingEnabled()
    // withLoggingEnabled takes a map of changelog topic configs
    .withLoggingEnabled(Collections.emptyMap());

3. Configure Processing Guarantees

// Exactly-once semantics; EXACTLY_ONCE_V2 (brokers 2.5+) replaces the
// deprecated EXACTLY_ONCE constant
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

4. Monitor Performance

  • Track processing latency (see the metrics sketch below)
  • Monitor consumer lag
  • Set up alerts for failures
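
As a starting point for the first two items, a running KafkaStreams client exposes its built-in metrics programmatically; the group and name filters below are illustrative, not a complete list:

// Dump selected Kafka Streams metrics; every client exposes metrics()
// as a map from MetricName to its current value
streams.metrics().forEach((metricName, metric) -> {
    if (metricName.group().contains("stream-thread-metrics")
            && metricName.name().contains("latency")) {
        System.out.printf("%s = %s%n", metricName.name(), metric.metricValue());
    }
});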

Common Pitfalls

1. Memory Issues

  • Use appropriate window sizes and retention (see the sketch below)
  • Clean up old state stores
  • Monitor memory usage
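
For example, the record cache and windowed-store retention both bound memory and disk usage; the sizes below are placeholders to adapt, not recommendations:

// Cap the memory used for record caching across all stream threads (10 MB);
// from Kafka 3.4 on, STATESTORE_CACHE_MAX_BYTES_CONFIG is the preferred name
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024);

// Keep windowed state only as long as needed; retention must be at least
// window size plus grace period
Materialized<String, Long, WindowStore<Bytes, byte[]>> boundedStore =
    Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("bounded-window-store")
        .withValueSerde(Serdes.Long())
        .withRetention(Duration.ofHours(1));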

2. Ordering Issues

  • Understand partition-level ordering: Kafka only guarantees order within a partition
  • Use appropriate partitioning strategies (see the re-keying sketch below)
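
Records that must be processed in order need to share a key, so that they land in the same partition. A sketch of re-keying a stream; the order-events topic and the extractOrderId helper are hypothetical:

KStream<String, String> events = builder.stream("order-events");

events
    // Route all events for the same order to the same partition (hypothetical helper)
    .selectKey((key, value) -> extractOrderId(value))
    // selectKey marks the stream for repartitioning; repartition() makes it explicit
    .repartition(Repartitioned.<String, String>as("events-by-order-id")
        .withNumberOfPartitions(6))
    .to("ordered-events");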

3. State Management

  • Handle state store failures and rebalances (a state-listener sketch follows)
  • Implement proper cleanup logic
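
A small sketch covering both points: a state listener observes lifecycle transitions (for example into REBALANCING or ERROR), and cleanUp() wipes local state so it is rebuilt from the changelog topics; it is only safe to call before start() or after close():

KafkaStreams streams = new KafkaStreams(builder.build(), props);

// Observe lifecycle transitions, e.g. to alert when the app enters ERROR
streams.setStateListener((newState, oldState) ->
    System.out.printf("State change: %s -> %s%n", oldState, newState));

// Wipe local state so it is restored from the changelog topics;
// only safe before start() or after close()
streams.cleanUp();
streams.start();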

Next Steps

This tutorial covered the basics of Kafka Streams, but there’s much more to explore:

  • Advanced Windowing: Sliding windows, custom windowing
  • Complex Joins: Multi-stream joins, temporal joins
  • State Management: Custom state stores, state migration
  • Performance Tuning: Optimization techniques
  • Production Deployment: Monitoring, scaling, fault tolerance

Ready to master Kafka Streams and build production-ready streaming applications? Check out our comprehensive Apache Kafka Mastery Course that covers everything from fundamentals to advanced stream processing patterns.


This article is part of our Stream Processing series. Subscribe to get the latest Kafka and streaming insights delivered to your inbox.
