Spring Kafka

A complete guide to Spring Kafka, the Spring project for integrating Apache Kafka into Java applications: learn producers, consumers, error handling, and transactions, and build event-driven microservices.

Table of Contents

  1. What is Spring Kafka?
  2. Why Use Spring Kafka?
  3. Apache Kafka Concepts
  4. Spring Kafka Architecture
  5. Project Setup
  6. Producer Configuration
  7. Consumer Configuration
  8. Error Handling
  9. Transactions
  10. Serialization and Deserialization
  11. Real-World Examples
  12. Best Practices
  13. Testing
  14. Advanced Concepts
  15. Conclusion

1. What is Spring Kafka?

Spring Kafka is a Spring Framework project that provides integration with Apache Kafka, a distributed streaming platform. It simplifies Kafka producer and consumer development by providing a high-level abstraction over the native Kafka client API, following Spring's familiar patterns and conventions.

Spring Kafka builds on top of Spring Framework and Spring Boot, providing:

  • KafkaTemplate for sending messages (producer)
  • @KafkaListener annotation for receiving messages (consumer)
  • Auto-configuration for Kafka producers and consumers
  • Transaction support for Kafka operations
  • Error handling and retry mechanisms
  • Testing utilities for Kafka integration
  • Integration with Spring Boot Actuator for monitoring

Spring Kafka abstracts away the complexity of the native Kafka client API while maintaining full access to Kafka's powerful features like partitioning, replication, and exactly-once semantics.
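
To give a first impression of how little code is involved, here is a minimal sketch of a producer and a consumer built on these abstractions. The topic name "greetings" and group id "greeting-consumers" are illustrative, and Spring Boot auto-configuration is assumed:

// Minimal sketch: sending and receiving String messages with Spring Boot auto-configuration.
// The topic name and group id below are illustrative, not fixed conventions.
@Service
public class GreetingProducer {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    public void sendGreeting(String message) {
        kafkaTemplate.send("greetings", message);
    }
}

@Component
public class GreetingConsumer {
    
    @KafkaListener(topics = "greetings", groupId = "greeting-consumers")
    public void listen(String message) {
        System.out.println("Received: " + message);
    }
}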

2. Why Use Spring Kafka?

  • Spring Integration: Seamlessly integrates with Spring Framework and Spring Boot.
  • Simplified API: High-level abstractions reduce boilerplate code.
  • Auto-Configuration: Spring Boot auto-configures Kafka producers and consumers.
  • Annotation-Based: Use @KafkaListener for declarative message consumption.
  • Transaction Support: Built-in support for Kafka transactions.
  • Error Handling: Comprehensive error handling and retry mechanisms.
  • Testing Support: Embedded Kafka for testing without external dependencies.
  • Monitoring: Integration with Spring Boot Actuator for metrics and health checks.
  • Reactive Support: Reactive producer and consumer templates built on Reactor Kafka, usable from Spring WebFlux applications.

3. Apache Kafka Concepts

Understanding core Kafka concepts is essential for working with Spring Kafka:

3.1 Topics and Partitions

  • Topic: A category or feed name to which messages are published.
  • Partition: Topics are divided into partitions for scalability and parallelism.
  • Offset: Unique identifier for each message within a partition.

3.2 Producers and Consumers

  • Producer: Publishes messages to Kafka topics.
  • Consumer: Subscribes to topics and processes messages.
  • Consumer Group: Group of consumers that work together to consume messages.

3.3 Brokers and Clusters

  • Broker: A Kafka server that stores and serves messages.
  • Cluster: Collection of brokers working together.
  • Replication: Copies of partitions across multiple brokers for fault tolerance.
Diagram: a Kafka cluster of three brokers hosts orders-topic (partitions 0-2) and users-topic (partitions 0-1). An Order Producer and a User Producer publish to these topics; two Order Consumers in consumer group order-processors and one User Consumer in group user-processors consume from them.
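
To make these concepts concrete, the sketch below declares a topic with multiple partitions and replicas as a Spring bean; Spring Kafka's KafkaAdmin (auto-configured by Spring Boot) creates it on the cluster at startup. The topic name and sizing are illustrative:

@Configuration
public class KafkaTopicConfig {
    
    // Illustrative sizing: 3 partitions for parallelism, replication factor 2 for fault tolerance
    @Bean
    public NewTopic ordersTopic() {
        return TopicBuilder.name("orders")
                .partitions(3)
                .replicas(2)
                .build();
    }
}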

4. Spring Kafka Architecture

Spring Kafka provides a layered architecture that abstracts Kafka's complexity:

4.1 Architecture Layers

  1. Application Layer: Your business logic and services.
  2. Spring Kafka Layer: KafkaTemplate, @KafkaListener, and configuration.
  3. Kafka Client Layer: Native Kafka producer and consumer APIs.
  4. Kafka Broker Layer: Kafka cluster and brokers.
Diagram: application-layer services, controllers, and business logic call KafkaTemplate and @KafkaListener (plus KafkaAdmin and error handlers) in the Spring Kafka layer; these delegate to the native KafkaProducer and KafkaConsumer, working with ProducerRecord and ConsumerRecord, in the Kafka client layer, which communicates with the cluster, topics, and partitions in the broker layer.

5. Project Setup

5.1 Maven Dependencies

<dependencies>
    <!-- Spring Kafka (the version can be omitted when Spring Boot's dependency management is used) -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>3.1.0</version>
    </dependency>
    
    <!-- JSON Serialization -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
    </dependency>
    
    <!-- Testing -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

5.2 Application Configuration

# application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 3
      properties:
        enable.idempotence: true
    consumer:
      group-id: my-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false
      properties:
        spring.json.trusted.packages: "*"
    listener:
      ack-mode: manual_immediate
      concurrency: 3
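
With this configuration, Spring Boot auto-configures the producer and consumer factories, a KafkaTemplate, and the listener container factory, so they can be injected directly. A minimal sketch of a Boot application that sends a message on startup (the Order type and its no-argument constructor are assumptions carried through the rest of this guide):

@SpringBootApplication
public class KafkaDemoApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }
    
    // The auto-configured KafkaTemplate picks up the serializers and acks settings from application.yml
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, Order> kafkaTemplate) {
        return args -> kafkaTemplate.send("orders", "order-1", new Order());
    }
}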

6. Producer Configuration

6.1 KafkaTemplate

KafkaTemplate is the primary interface for sending messages to Kafka topics.

@Service
public class OrderProducer {
    
    private static final String TOPIC = "orders";
    
    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;
    
    public void sendOrder(Order order) {
        kafkaTemplate.send(TOPIC, order.getId().toString(), order);
    }
    
    public void sendOrderWithCallback(Order order) {
        // Since Spring Kafka 3.0, send() returns a CompletableFuture
        CompletableFuture<SendResult<String, Order>> future = 
            kafkaTemplate.send(TOPIC, order.getId().toString(), order);
        
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                System.out.println("Sent message: " + result.getRecordMetadata());
            } else {
                System.err.println("Failed to send message: " + ex.getMessage());
            }
        });
    }
    
    public void sendOrderToPartition(Order order, int partition) {
        kafkaTemplate.send(TOPIC, partition, order.getId().toString(), order);
    }
    
    public void sendOrderWithTimestamp(Order order) {
        kafkaTemplate.send(TOPIC, null, System.currentTimeMillis(), 
                          order.getId().toString(), order);
    }
}

6.2 Producer Configuration

@Configuration
public class KafkaProducerConfig {
    
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    
    @Bean
    public ProducerFactory<String, Order> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
                       StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
                       JsonSerializer.class);
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        
        return new DefaultKafkaProducerFactory<>(configProps);
    }
    
    @Bean
    public KafkaTemplate<String, Order> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

6.3 Async Producer

@Service
public class OrderProducer {
    
    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;
    
    @Async
    public CompletableFuture<SendResult<String, Order>> sendOrderAsync(Order order) {
        // send() already returns a CompletableFuture in Spring Kafka 3.x
        return kafkaTemplate.send("orders", order.getId().toString(), order);
    }
}

7. Consumer Configuration

7.1 @KafkaListener

@KafkaListener provides a declarative way to consume messages from Kafka topics.

@Component
public class OrderConsumer {
    
    @KafkaListener(topics = "orders", groupId = "order-processors")
    public void consumeOrder(Order order) {
        System.out.println("Received order: " + order);
        // Process order
    }
    
    @KafkaListener(topics = "orders", groupId = "order-processors")
    public void consumeOrderWithHeaders(Order order, 
                                       @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                                       @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
                                       @Header(KafkaHeaders.OFFSET) long offset) {
        System.out.println("Received order from topic: " + topic + 
                          ", partition: " + partition + ", offset: " + offset);
        // Process order
    }
    
    @KafkaListener(topics = "orders", groupId = "order-processors",
                   containerFactory = "kafkaListenerContainerFactory")
    public void consumeOrderWithManualAck(Order order, Acknowledgment acknowledgment) {
        try {
            // Process order
            processOrder(order);
            acknowledgment.acknowledge();
        } catch (Exception e) {
            // Handle error, don't acknowledge
            System.err.println("Error processing order: " + e.getMessage());
        }
    }
    
    private void processOrder(Order order) {
        // Order processing logic
    }
}

7.2 Consumer Configuration

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    
    @Bean
    public ConsumerFactory<String, Order> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processors");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
                       StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
                       JsonDeserializer.class);
        configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        
        return new DefaultKafkaConsumerFactory<>(configProps);
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Order> 
           kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Order> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

7.3 Multiple Topics and Partitions

@Component
public class MultiTopicConsumer {
    
    @KafkaListener(topics = {"orders", "payments"}, groupId = "multi-topic-consumer")
    public void consumeFromMultipleTopics(ConsumerRecord<String, Object> record) {
        String topic = record.topic();
        Object value = record.value();
        
        if ("orders".equals(topic)) {
            processOrder((Order) value);
        } else if ("payments".equals(topic)) {
            processPayment((Payment) value);
        }
    }
    
    @KafkaListener(topicPartitions = @TopicPartition(
        topic = "orders",
        partitions = {"0", "1"}),
        groupId = "partitioned-consumer")
    public void consumeFromSpecificPartitions(Order order) {
        // Process orders from partitions 0 and 1
        processOrder(order);
    }
    
    @KafkaListener(topicPartitions = @TopicPartition(
        topic = "orders",
        partitionOffsets = {
            @PartitionOffset(partition = "2", initialOffset = "0"),
            @PartitionOffset(partition = "3", initialOffset = "100")
        }),
        groupId = "offset-consumer")
    public void consumeWithInitialOffset(Order order) {
        // Process orders starting from specific offsets
        processOrder(order);
    }
}

7.4 Batch Consumer

@Component
public class BatchOrderConsumer {
    
    @KafkaListener(topics = "orders", groupId = "batch-consumer",
                   containerFactory = "batchKafkaListenerContainerFactory")
    public void consumeBatch(List<ConsumerRecord<String, Order>> records,
                            Acknowledgment acknowledgment) {
        System.out.println("Received batch of " + records.size() + " orders");
        
        for (ConsumerRecord<String, Order> record : records) {
            processOrder(record.value());
        }
        
        acknowledgment.acknowledge();
    }
}

@Configuration
public class BatchConsumerConfig {
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Order> 
           batchKafkaListenerContainerFactory(ConsumerFactory<String, Order> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, Order> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);
        factory.setConcurrency(3);
        // Manual ack mode so the Acknowledgment parameter in the batch listener is populated
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

8. Error Handling

8.1 Error Handler

Spring Kafka provides comprehensive error handling mechanisms:

@Configuration
public class KafkaErrorHandlingConfig {
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Order> 
           kafkaListenerContainerFactory(ConsumerFactory<String, Order> consumerFactory,
                                         DeadLetterPublishingRecoverer recoverer) {
        ConcurrentKafkaListenerContainerFactory<String, Order> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        
        // Retry 3 times with a 1-second delay, then hand the record to the DLT recoverer
        factory.setCommonErrorHandler(new DefaultErrorHandler(
            recoverer,
            new FixedBackOff(1000L, 3L)
        ));
        
        return factory;
    }
    
    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
            KafkaTemplate<String, Object> kafkaTemplate) {
        // Route failed records to the orders-dlt topic, preserving the original partition
        return new DeadLetterPublishingRecoverer(kafkaTemplate,
            (record, ex) -> new TopicPartition("orders-dlt", record.partition()));
    }
}
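
Records that exhaust their retries are published to the dead letter topic, where a separate listener can inspect or reprocess them. A minimal sketch, assuming the orders-dlt topic configured above; the group id is illustrative:

@Component
public class OrderDltConsumer {
    
    // Consumes records that the DeadLetterPublishingRecoverer routed to orders-dlt;
    // the failure details travel in kafka_dlt-* record headers
    @KafkaListener(topics = "orders-dlt", groupId = "order-dlt-processors")
    public void handleDeadLetter(ConsumerRecord<String, Order> record) {
        System.err.println("Dead-lettered order " + record.key() + " at offset " + record.offset());
        // Alert, persist for manual review, or re-publish after fixing the cause
    }
}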

@Component
public class OrderConsumer {
    
    @KafkaListener(topics = "orders", groupId = "order-processors",
                   errorHandler = "orderErrorHandler")
    public void consumeOrder(Order order) {
        if (order.getAmount() < 0) {
            throw new IllegalArgumentException("Invalid order amount");
        }
        processOrder(order);
    }
}

// The errorHandler attribute on @KafkaListener refers to a KafkaListenerErrorHandler bean
@Component("orderErrorHandler")
public class OrderErrorHandler implements KafkaListenerErrorHandler {
    
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        System.err.println("Error processing message: " + exception.getMessage());
        // Custom error handling logic: log, transform, or rethrow to escalate to the container
        return null;
    }
}

8.2 Retry Mechanism

@Configuration
public class KafkaRetryConfig {
    
    @Bean
    public RetryTopicConfiguration retryTopicConfiguration(
            KafkaTemplate<String, Order> kafkaTemplate) {
        return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(1000, 2, 10000)
            .maxAttempts(4)
            .retryOn(IllegalArgumentException.class)
            .create(kafkaTemplate);
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Order> 
           kafkaListenerContainerFactory(ConsumerFactory<String, Order> consumerFactory,
                                         KafkaTemplate<String, Object> kafkaTemplate) {
        ConcurrentKafkaListenerContainerFactory<String, Order> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        
        // Retry with exponential backoff
        ExponentialBackOff backOff = new ExponentialBackOff();
        backOff.setInitialInterval(1000);
        backOff.setMultiplier(2);
        backOff.setMaxInterval(10000);
        
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate),
            backOff
        );
        errorHandler.setRetryListeners(new RetryListener() {
            @Override
            public void failedDelivery(ConsumerRecord<?, ?> record, 
                                     Exception ex, 
                                     int deliveryAttempt) {
                System.err.println("Failed delivery attempt " + deliveryAttempt + 
                                 " for record: " + record);
            }
        });
        
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}
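
The same non-blocking retry behavior can also be declared per listener with the @RetryableTopic annotation, which creates the retry and dead-letter topics automatically. A minimal sketch (attempt count, delays, and the group id are illustrative):

@Component
public class RetryableOrderConsumer {
    
    // Non-blocking retries: failed records are re-published to auto-created retry topics
    @RetryableTopic(
        attempts = "4",
        backoff = @Backoff(delay = 1000, multiplier = 2.0, maxDelay = 10000))
    @KafkaListener(topics = "orders", groupId = "retryable-order-processors")
    public void consumeOrder(Order order) {
        processOrder(order);
    }
    
    // Invoked once all retry attempts are exhausted
    @DltHandler
    public void handleDlt(Order order) {
        System.err.println("Order moved to DLT: " + order);
    }
    
    private void processOrder(Order order) {
        // Order processing logic
    }
}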

9. Transactions

Spring Kafka supports transactions for exactly-once semantics:

@Configuration
public class KafkaTransactionConfig {
    
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    
    @Bean
    public ProducerFactory<String, Order> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
                       StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
                       JsonSerializer.class);
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        
        DefaultKafkaProducerFactory<String, Order> factory = 
            new DefaultKafkaProducerFactory<>(configProps);
        // Setting a transaction-id prefix makes the factory (and its KafkaTemplate) transactional
        factory.setTransactionIdPrefix("order-tx-");
        return factory;
    }
    
    @Bean
    public KafkaTransactionManager<String, Order> kafkaTransactionManager(
            ProducerFactory<String, Order> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }
    
    @Bean
    public KafkaTemplate<String, Order> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

@Service
public class OrderService {
    
    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;
    
    @Autowired
    private OrderRepository orderRepository;
    
    @Transactional
    public void createOrder(Order order) {
        // Save to database
        orderRepository.save(order);
        
        // With a transactional producer factory, this send is synchronized with the
        // surrounding transaction: the Kafka transaction commits only if the database
        // transaction commits, otherwise it is aborted
        kafkaTemplate.send("orders", order.getId().toString(), order);
    }
    
    @Transactional(transactionManager = "kafkaTransactionManager")
    public void sendMultipleOrders(List<Order> orders) {
        for (Order order : orders) {
            kafkaTemplate.send("orders", order.getId().toString(), order);
        }
        // All messages are sent atomically
    }
}
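
When no surrounding Spring-managed transaction is needed, a local Kafka transaction can be executed directly on the template. A minimal sketch, assuming the transactional producer factory configured above:

@Service
public class OrderBatchSender {
    
    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;
    
    // Requires a transactional producer factory (a transaction-id prefix must be configured)
    public void sendBatchInLocalTransaction(List<Order> orders) {
        kafkaTemplate.executeInTransaction(operations -> {
            for (Order order : orders) {
                operations.send("orders", order.getId().toString(), order);
            }
            return true; // commit; an exception inside the callback aborts the transaction
        });
    }
}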

10. Serialization and Deserialization

10.1 JSON Serialization

@Configuration
public class KafkaSerializationConfig {
    
    @Bean
    public ProducerFactory<String, Order> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
                       StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
                       JsonSerializer.class);
        
        return new DefaultKafkaProducerFactory<>(configProps);
    }
    
    @Bean
    public ConsumerFactory<String, Order> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processors");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
                       StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
                       JsonDeserializer.class);
        configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.model");
        configProps.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Order.class);
        
        return new DefaultKafkaConsumerFactory<>(configProps);
    }
}

10.2 Custom Serializer

public class OrderSerializer implements Serializer<Order> {
    
    private ObjectMapper objectMapper = new ObjectMapper();
    
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Configuration if needed
    }
    
    @Override
    public byte[] serialize(String topic, Order order) {
        try {
            return objectMapper.writeValueAsBytes(order);
        } catch (Exception e) {
            throw new SerializationException("Error serializing Order", e);
        }
    }
    
    @Override
    public void close() {
        // Cleanup if needed
    }
}

public class OrderDeserializer implements Deserializer<Order> {
    
    private ObjectMapper objectMapper = new ObjectMapper();
    
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Configuration if needed
    }
    
    @Override
    public Order deserialize(String topic, byte[] data) {
        try {
            return objectMapper.readValue(data, Order.class);
        } catch (Exception e) {
            throw new SerializationException("Error deserializing Order", e);
        }
    }
    
    @Override
    public void close() {
        // Cleanup if needed
    }
}
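
The custom classes are wired in the same way as the built-in (de)serializers, by class in the producer and consumer properties. A minimal sketch, assuming the OrderSerializer and OrderDeserializer above are on the classpath (this would typically live inside a @Configuration class):

// Producer side: plug in the custom value serializer
Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, OrderSerializer.class);
ProducerFactory<String, Order> producerFactory = new DefaultKafkaProducerFactory<>(producerProps);

// Consumer side: plug in the custom value deserializer
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processors");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, OrderDeserializer.class);
ConsumerFactory<String, Order> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);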

11. Real-World Examples

11.1 Event-Driven Order Processing

Complete example of event-driven order processing:

// Order Service - Producer
@Service
public class OrderService {
    
    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;
    
    @Autowired
    private OrderRepository orderRepository;
    
    public Order createOrder(OrderRequest request) {
        Order order = new Order();
        order.setUserId(request.getUserId());
        order.setItems(request.getItems());
        order.setStatus("CREATED");
        
        order = orderRepository.save(order);
        
        // Publish order created event
        OrderEvent event = new OrderEvent();
        event.setEventType("ORDER_CREATED");
        event.setOrderId(order.getId());
        event.setOrder(order);
        event.setTimestamp(Instant.now());
        
        kafkaTemplate.send("order-events", order.getId().toString(), event);
        
        return order;
    }
}

// Payment Service - Consumer
@Component
public class PaymentService {
    
    @KafkaListener(topics = "order-events", groupId = "payment-processors",
                   filter = "orderEventFilter")
    public void processOrderCreated(OrderEvent event) {
        if ("ORDER_CREATED".equals(event.getEventType())) {
            processPayment(event.getOrder());
        }
    }
    
    @Bean
    public RecordFilterStrategy<String, OrderEvent> orderEventFilter() {
        // Return true to discard the record; everything except ORDER_CREATED is filtered out
        return record -> !"ORDER_CREATED".equals(record.value().getEventType());
    }
    
    private void processPayment(Order order) {
        // Process payment logic
        Payment payment = new Payment();
        payment.setOrderId(order.getId());
        payment.setAmount(order.getTotal());
        payment.setStatus("PROCESSING");
        
        // Save payment and publish event
        // ...
    }
}

// Notification Service - Consumer
@Component
public class NotificationService {
    
    @KafkaListener(topics = "order-events", groupId = "notification-processors")
    public void sendNotification(OrderEvent event) {
        if ("ORDER_CREATED".equals(event.getEventType())) {
            sendOrderConfirmation(event.getOrder());
        } else if ("ORDER_PAID".equals(event.getEventType())) {
            sendPaymentConfirmation(event.getOrder());
        }
    }
    
    private void sendOrderConfirmation(Order order) {
        // Send order confirmation email/SMS
    }
    
    private void sendPaymentConfirmation(Order order) {
        // Send payment confirmation email/SMS
    }
}

11.2 Kafka Streams Integration

// Dependencies
// <dependency>
//     <groupId>org.springframework.kafka</groupId>
//     <artifactId>spring-kafka</artifactId>
// </dependency>
// <dependency>
//     <groupId>org.apache.kafka</groupId>
//     <artifactId>kafka-streams</artifactId>
// </dependency>

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {
    
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-streams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
                 Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
                 Serdes.String().getClass());
        
        return new KafkaStreamsConfiguration(props);
    }
}

@Component
public class OrderStreamProcessor {
    
    @Autowired
    private StreamsBuilder streamsBuilder;
    
    @PostConstruct
    public void buildPipeline() {
        // Explicit serdes: the default serdes configured above are String/String
        JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);
        
        KStream<String, Order> orders = streamsBuilder.stream("orders",
            Consumed.with(Serdes.String(), orderSerde));
        
        // Filter high-value orders
        KStream<String, Order> highValueOrders = orders
            .filter((key, order) -> order.getTotal() > 1000);
        
        // Send to high-value orders topic
        highValueOrders.to("high-value-orders", Produced.with(Serdes.String(), orderSerde));
        
        // Count orders by status
        orders.groupBy((key, order) -> order.getStatus(),
                Grouped.with(Serdes.String(), orderSerde))
            .count()
            .toStream()
            .to("order-status-counts", Produced.with(Serdes.String(), Serdes.Long()));
    }
}

12. Best Practices

  • Idempotent Producers: Enable idempotence for exactly-once semantics.
  • Manual Offset Management: Use manual acknowledgment for better control.
  • Error Handling: Implement proper error handling and dead letter topics.
  • Consumer Groups: Use meaningful consumer group names.
  • Partitioning: Choose appropriate partition keys for even distribution.
  • Batch Processing: Use batch consumers for better throughput.
  • Monitoring: Monitor consumer lag and producer metrics.
  • Transactions: Use transactions for exactly-once processing.
  • Serialization: Use efficient serialization formats (JSON, Avro, Protobuf).
  • Testing: Use embedded Kafka for integration testing.

13. Testing

Spring Kafka provides testing utilities including embedded Kafka:

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"orders", "order-events"})
public class OrderServiceTest {
    
    @Autowired
    private OrderService orderService;
    
    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;
    
    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;
    
    @Test
    public void testOrderCreation() {
        OrderRequest request = new OrderRequest();
        request.setUserId(1L);
        request.setItems(Arrays.asList(new OrderItem()));
        
        Order order = orderService.createOrder(request);
        
        assertNotNull(order);
        assertNotNull(order.getId());
    }
    
    @Test
    public void testOrderEventPublishing() {
        OrderRequest request = new OrderRequest();
        request.setUserId(1L);
        
        orderService.createOrder(request);
        
        // Build a test consumer against the embedded broker to verify the published event
        Map<String, Object> consumerProps = 
            KafkaTestUtils.consumerProps("test-group", "true", embeddedKafkaBroker);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        consumerProps.put(JsonDeserializer.VALUE_DEFAULT_TYPE, OrderEvent.class);
        
        try (Consumer<String, OrderEvent> consumer = 
                 new DefaultKafkaConsumerFactory<String, OrderEvent>(consumerProps).createConsumer()) {
            embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, "order-events");
            
            ConsumerRecord<String, OrderEvent> record = 
                KafkaTestUtils.getSingleRecord(consumer, "order-events");
            
            assertNotNull(record);
            assertEquals("ORDER_CREATED", record.value().getEventType());
        }
    }
    
    @Test
    public void testOrderConsumer() {
        Order order = new Order();
        order.setId(1L);
        order.setStatus("CREATED");
        
        kafkaTemplate.send("orders", order.getId().toString(), order);
        
        // Wait for consumer to process
        // Verify order was processed
    }
}

14. Advanced Concepts

14.1 Exactly-Once Semantics

@Configuration
public class ExactlyOnceConfig {
    
    @Bean
    public ProducerFactory<String, Order> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        // Bootstrap servers and serializers as shown earlier, plus the exactly-once settings:
        configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        configProps.put(ProducerConfig.ACKS_CONFIG, "all");
        configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        configProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        
        return new DefaultKafkaProducerFactory<>(configProps);
    }
    
    @Bean
    public ConsumerFactory<String, Order> consumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        configProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        
        return new DefaultKafkaConsumerFactory<>(configProps);
    }
}

14.2 Schema Registry Integration

// Using Confluent Schema Registry with Avro
// Dependencies
// <dependency>
//     <groupId>io.confluent</groupId>
//     <artifactId>kafka-avro-serializer</artifactId>
// </dependency>

@Configuration
public class AvroConfig {
    
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    
    @Bean
    public ProducerFactory<String, Order> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
                       StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
                       KafkaAvroSerializer.class);
        configProps.put("schema.registry.url", "http://localhost:8081");
        
        return new DefaultKafkaProducerFactory<>(configProps);
    }
}
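
The consumer side mirrors this with the Confluent Avro deserializer; enabling specific.avro.reader deserializes records into the generated Order class rather than a GenericRecord. A sketch under the same assumptions (local Schema Registry, Avro-generated Order class):

    @Bean
    public ConsumerFactory<String, Order> avroConsumerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processors");
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
                       StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
                       KafkaAvroDeserializer.class);
        configProps.put("schema.registry.url", "http://localhost:8081");
        configProps.put("specific.avro.reader", "true"); // deserialize into the generated Order class
        
        return new DefaultKafkaConsumerFactory<>(configProps);
    }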

14.3 Reactive Kafka

// Dependencies
// <dependency>
//     <groupId>io.projectreactor.kafka</groupId>
//     <artifactId>reactor-kafka</artifactId>
// </dependency>

@Service
public class ReactiveOrderService {
    
    @Autowired
    private ReactiveKafkaProducerTemplate<String, Order> producerTemplate;
    
    @Autowired
    private ReactiveKafkaConsumerTemplate<String, Order> consumerTemplate;
    
    public Mono<Void> sendOrder(Order order) {
        return producerTemplate.send("orders", order.getId().toString(), order)
            .then();
    }
    
    // The subscribed topics are configured on the ReceiverOptions used to build the consumer template
    public Flux<Order> consumeOrders() {
        return consumerTemplate.receiveAutoAck()
            .map(ConsumerRecord::value)
            .doOnNext(this::processOrder);
    }
    
    private void processOrder(Order order) {
        // Order processing logic
    }
}
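
The reactive templates are not auto-configured; they are typically built from Reactor Kafka SenderOptions and ReceiverOptions, where the subscribed topics are fixed. A minimal sketch, assuming the same Order type and a local broker:

@Configuration
public class ReactiveKafkaConfig {
    
    @Bean
    public ReactiveKafkaProducerTemplate<String, Order> reactiveKafkaProducerTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new ReactiveKafkaProducerTemplate<>(SenderOptions.<String, Order>create(props));
    }
    
    @Bean
    public ReactiveKafkaConsumerTemplate<String, Order> reactiveKafkaConsumerTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-order-processors");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        
        // The subscription is fixed here rather than passed to receive()
        ReceiverOptions<String, Order> receiverOptions =
            ReceiverOptions.<String, Order>create(props)
                .subscription(Collections.singleton("orders"));
        return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
    }
}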

15. Conclusion

Spring Kafka provides a powerful and developer-friendly way to integrate Apache Kafka into Spring applications. It simplifies Kafka development while maintaining access to Kafka's advanced features.

Key takeaways:

  • Spring Kafka simplifies Kafka producer and consumer development
  • It provides comprehensive error handling and retry mechanisms
  • Transaction support enables exactly-once semantics
  • Testing utilities make it easy to test Kafka integration
  • It integrates seamlessly with the Spring ecosystem

Whether you're building event-driven microservices, real-time data pipelines, or stream processing applications, Spring Kafka provides the tools and abstractions you need to build robust, scalable Kafka-based solutions.
