Spring Integration

Complete guide to Spring Integration: the Spring framework for Enterprise Application Integration (EAI). Learn messaging patterns, channels, endpoints, and adapters to integrate disparate systems seamlessly.

1. What is Spring Integration?

Spring Integration is an extension of the Spring Framework that provides a lightweight messaging framework for Enterprise Application Integration (EAI). It enables you to build messaging-based applications using a simple, declarative model based on the Enterprise Integration Patterns (EIP) described in the book "Enterprise Integration Patterns" by Gregor Hohpe and Bobby Woolf.

Spring Integration extends the Spring programming model to support these patterns. Components exchange messages over channels instead of calling each other directly, so you can connect applications, services, and data sources without coupling them tightly.

The framework provides a simple, consistent model for building integration solutions while maintaining Spring's core principles: dependency injection, aspect-oriented programming, and the ability to test components in isolation.

2. Why Use Spring Integration?

  • Enterprise Integration Patterns: Implements proven patterns such as channels, routers, transformers, and filters.
  • Loose Coupling: Decouples systems through messaging, reducing dependencies between components.
  • Spring Ecosystem: Seamlessly integrates with Spring Framework, Spring Boot, and other Spring projects.
  • Multiple Protocols: Supports various protocols including HTTP, JMS, FTP, SFTP, TCP, UDP, WebSocket, and more.
  • Declarative Configuration: Configure integration flows using XML, annotated Java configuration, or the Java DSL, with Spring Boot auto-configuration supplying the infrastructure.
  • Testing Support: Built-in testing framework for unit and integration testing of message flows.
  • Error Handling: Comprehensive error handling and retry mechanisms for robust integration solutions.
  • Monitoring: Integration with Spring Boot Actuator for monitoring and metrics.

3. Spring Integration Architecture

Spring Integration follows a messaging-based architecture where components communicate through messages sent over channels:

3.1 Architecture Layers

  1. Application Layer: Your business logic and services.
  2. Integration Layer: Spring Integration components (channels, endpoints, adapters).
  3. Transport Layer: External systems and protocols (HTTP, JMS, FTP, etc.).

This architecture allows you to:

  • Decouple business logic from integration concerns
  • Compose complex integration flows from simple components
  • Test integration flows independently
  • Switch between different transports without changing business logic
graph TB
    subgraph "Application Layer"
        A[Business Services]
        B[Domain Models]
    end
    subgraph "Integration Layer"
        C[Message Channels]
        D[Message Endpoints]
        E[Message Transformers]
        F[Message Routers]
        G[Message Filters]
    end
    subgraph "Transport Layer"
        H[HTTP Adapter]
        I[JMS Adapter]
        J[FTP Adapter]
        K[File Adapter]
        L[Database Adapter]
    end
    subgraph "External Systems"
        M[REST API]
        N[Message Queue]
        O[File System]
        P[Database]
    end
    A --> C
    B --> C
    C --> D
    D --> E
    E --> F
    F --> G
    G --> C
    D --> H
    D --> I
    D --> J
    D --> K
    D --> L
    H --> M
    I --> N
    J --> O
    K --> O
    L --> P
    style A fill:#e1f5ff,stroke:#0273bd,stroke-width:3px
    style C fill:#fff4e1,stroke:#f57c00,stroke-width:2px
    style D fill:#fff4e1,stroke:#f57c00,stroke-width:2px
    style E fill:#fff4e1,stroke:#f57c00,stroke-width:2px
    style F fill:#fff4e1,stroke:#f57c00,stroke-width:2px
    style G fill:#fff4e1,stroke:#f57c00,stroke-width:2px

4. Core Concepts

4.1 Messages

A Message is the fundamental unit of data in Spring Integration. It consists of:

  • Payload: The actual data being transferred (any Java object)
  • Headers: Metadata about the message (ID, timestamp, correlation ID, etc.)
// Creating a message programmatically
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

Message<String> message = MessageBuilder
    .withPayload("Hello, Spring Integration!")
    .setHeader("messageId", "12345")
    // "id" and "timestamp" are read-only headers assigned by the framework,
    // so a custom timestamp needs its own header name
    .setHeader("receivedAt", System.currentTimeMillis())
    .setHeader("priority", 5)
    .build();

// Accessing message components
String payload = message.getPayload();
Object messageId = message.getHeaders().get("messageId");
Object receivedAt = message.getHeaders().get("receivedAt");
Long timestamp = message.getHeaders().getTimestamp(); // framework-assigned creation time

4.1.1 Message Headers

Spring Integration provides several standard headers:

  • ID: Unique identifier for the message (assigned by the framework, read-only)
  • TIMESTAMP: When the message was created (assigned by the framework, read-only)
  • CORRELATION_ID: Used for correlating related messages
  • REPLY_CHANNEL: Channel for sending replies
  • ERROR_CHANNEL: Channel for error messages
  • SEQUENCE_NUMBER: Position in a sequence
  • SEQUENCE_SIZE: Total size of the sequence
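
The correlation and sequence headers work together: when one message is split into parts, every part can carry the same correlation ID, its own position, and the total count, so a downstream component can tell when the whole group has arrived. The following is a plain-Java model of that idea, not Spring Integration code. The class `SequenceHeadersSketch`, its methods, and the use of a `Map` as a stand-in for a message are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SequenceHeadersSketch {

    // Turns a batch into one map per part; each map models a message
    // ("payload" entry) together with its correlation and sequence headers.
    static List<Map<String, Object>> split(String correlationId, List<String> parts) {
        List<Map<String, Object>> messages = new ArrayList<>();
        for (int i = 0; i < parts.size(); i++) {
            Map<String, Object> message = new HashMap<>();
            message.put("payload", parts.get(i));
            message.put("correlationId", correlationId);
            message.put("sequenceNumber", i + 1);      // 1-based position
            message.put("sequenceSize", parts.size()); // total number of parts
            messages.add(message);
        }
        return messages;
    }

    // A group is complete once as many parts arrived as sequenceSize announces.
    static boolean isComplete(List<Map<String, Object>> received) {
        if (received.isEmpty()) {
            return false;
        }
        int expected = (Integer) received.get(0).get("sequenceSize");
        return received.size() == expected;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> parts = split("order-42", Arrays.asList("a", "b", "c"));
        System.out.println(isComplete(parts.subList(0, 2))); // false: one part is missing
        System.out.println(isComplete(parts));               // true: all three parts arrived
    }
}
```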

4.2 Channels

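A Channel is the pipe that connects message producers and consumers. It decouples the two sides, and depending on the channel type it delivers messages either synchronously in the sender's thread or asynchronously through a buffer or thread pool.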

4.2.1 Channel Types

  • DirectChannel: Synchronous, point-to-point channel
  • QueueChannel: Asynchronous, buffered channel
  • PublishSubscribeChannel: Broadcasts messages to all subscribers
  • ExecutorChannel: Asynchronous channel using thread pool
  • PriorityChannel: Orders messages by priority
  • RendezvousChannel: Synchronous handoff between sender and receiver
// Java Configuration
@Configuration
@EnableIntegration
public class ChannelConfig {
    
    @Bean
    public DirectChannel inputChannel() {
        return new DirectChannel();
    }
    
    @Bean
    public QueueChannel queueChannel() {
        return new QueueChannel(10); // Buffer size: 10
    }
    
    @Bean
    public PublishSubscribeChannel pubSubChannel() {
        return new PublishSubscribeChannel();
    }
    
    @Bean
    public ExecutorChannel executorChannel() {
        return new ExecutorChannel(Executors.newFixedThreadPool(5));
    }
}

4.3 Endpoints

Endpoints are components that connect channels to application logic or external systems. They consume messages from input channels and produce messages to output channels.

4.3.1 Endpoint Types

  • Service Activator: Invokes a method on a Spring bean
  • Transformer: Transforms message payload or headers
  • Router: Routes messages to different channels based on conditions
  • Filter: Filters messages based on conditions
  • Splitter: Splits one message into multiple messages
  • Aggregator: Combines multiple messages into one
  • Gateway: Provides a simple interface for sending/receiving messages

4.4 Message Flow

Messages flow through a Spring Integration application following this pattern:

  1. Message is created (by an adapter, gateway, or programmatically)
  2. Message is sent to an input channel
  3. Endpoint consumes the message from the channel
  4. Endpoint processes the message (transforms, routes, filters, etc.)
  5. Processed message is sent to an output channel
  6. Next endpoint consumes and processes, or message reaches final destination
sequenceDiagram
    participant Source as Message Source
    participant Channel1 as Input Channel
    participant Endpoint as Message Endpoint
    participant Channel2 as Output Channel
    participant Target as Message Target
    Source->>Channel1: Send Message
    Channel1->>Endpoint: Deliver Message
    Endpoint->>Endpoint: Process Message
    Endpoint->>Channel2: Send Processed Message
    Channel2->>Target: Deliver Message
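
The six steps above can be modeled in a few lines of plain Java. This is a simplified sketch of the idea rather than Spring Integration's API: `SimpleMessage`, `SimpleChannel`, and `MessageFlowSketch` are names invented for illustration, and the channel here delivers synchronously, in the sender's thread, to every subscriber it has.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for a message: a payload plus a copy of its headers.
final class SimpleMessage {
    final Object payload;
    final Map<String, Object> headers;

    SimpleMessage(Object payload, Map<String, Object> headers) {
        this.payload = payload;
        this.headers = new HashMap<>(headers);
    }
}

// Hypothetical stand-in for a channel: passes each message to its
// subscribers in the sender's thread.
final class SimpleChannel {
    private final List<Consumer<SimpleMessage>> subscribers = new ArrayList<>();

    void subscribe(Consumer<SimpleMessage> subscriber) {
        subscribers.add(subscriber);
    }

    void send(SimpleMessage message) {
        for (Consumer<SimpleMessage> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }
}

class MessageFlowSketch {

    // Wires input -> endpoint -> output and returns what reached the target.
    static List<Object> run(String... payloads) {
        SimpleChannel input = new SimpleChannel();
        SimpleChannel output = new SimpleChannel();
        List<Object> received = new ArrayList<>();

        // Steps 3-5: the endpoint consumes from the input channel, processes
        // the payload, and sends the result to the output channel
        input.subscribe(message -> output.send(
                new SimpleMessage(message.payload.toString().toUpperCase(), message.headers)));

        // Step 6: the final destination collects what arrives
        output.subscribe(message -> received.add(message.payload));

        // Steps 1-2: create each message and send it to the input channel
        for (String payload : payloads) {
            input.send(new SimpleMessage(payload, new HashMap<>()));
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run("order-1", "order-2")); // prints [ORDER-1, ORDER-2]
    }
}
```

Because the endpoint only knows its two channels, swapping the source or the target does not touch the processing step, which is the decoupling the numbered list describes.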

5. Channels in Detail

5.1 DirectChannel

DirectChannel is a point-to-point channel that delivers messages synchronously to a single subscriber. If multiple subscribers exist, it uses a round-robin load balancing strategy.

@Configuration
@EnableIntegration
public class DirectChannelConfig {
    
    @Bean
    public DirectChannel directChannel() {
        // Round-robin is already the default strategy; passing it explicitly
        // just makes the choice visible
        return new DirectChannel(new RoundRobinLoadBalancingStrategy());
    }
    
    @ServiceActivator(inputChannel = "directChannel")
    public String processMessage(String payload) {
        return "Processed: " + payload;
    }
}
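
To make the dispatch behavior concrete, here is a plain-Java model of round-robin, point-to-point delivery: each message goes to exactly one subscriber, and consecutive messages rotate through the subscriber list. The class and method names are illustrative assumptions, and the sketch does not claim to mirror how DirectChannel is implemented internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class RoundRobinSketch {
    private final List<Consumer<String>> subscribers = new ArrayList<>();
    private final AtomicInteger next = new AtomicInteger();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Delivers the payload to exactly one subscriber, in the caller's thread.
    void send(String payload) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException("no subscribers");
        }
        int index = Math.floorMod(next.getAndIncrement(), subscribers.size());
        subscribers.get(index).accept(payload);
    }

    // Two subscribers, four messages: delivery alternates between A and B.
    static List<String> demo() {
        RoundRobinSketch channel = new RoundRobinSketch();
        List<String> log = new ArrayList<>();
        channel.subscribe(payload -> log.add("A:" + payload));
        channel.subscribe(payload -> log.add("B:" + payload));
        for (int i = 1; i <= 4; i++) {
            channel.send("m" + i);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [A:m1, B:m2, A:m3, B:m4]
    }
}
```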

5.2 QueueChannel

QueueChannel provides asynchronous, buffered messaging. Messages are stored in a queue until a consumer is available.

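@Bean
public QueueChannel queueChannel() {
    // Unbounded queue; pass a capacity, e.g. new QueueChannel(100), to bound it
    return new QueueChannel();
}

// A QueueChannel is pollable, so the consumer needs a poller
// (or a default poller defined elsewhere in the application)
@ServiceActivator(inputChannel = "queueChannel",
                  poller = @Poller(fixedDelay = "1000"))
public void processAsync(Message<String> message) {
    // Runs on the poller's thread, not on the sender's thread
    System.out.println("Processing: " + message.getPayload());
}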
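
The buffering semantics can be illustrated with the JDK's own `BlockingQueue`, the kind of structure a queue-backed channel relies on. This is a sketch under assumptions: `BoundedBufferSketch` and its methods are hypothetical names, and the code shows only the queue behavior (a full buffer rejects new entries, and the consumer polls with a timeout), not Spring Integration's channel API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedBufferSketch {

    // Offers 'count' items to a queue with the given capacity and returns
    // {accepted, rejected}. Nothing consumes, so the queue fills up.
    static int[] fill(int capacity, int count) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        int accepted = 0;
        int rejected = 0;
        for (int i = 0; i < count; i++) {
            // offer() returns false immediately when the queue is full
            if (queue.offer("msg-" + i)) {
                accepted++;
            } else {
                rejected++;
            }
        }
        return new int[] {accepted, rejected};
    }

    // Producer and consumer run on different threads and meet only at the queue.
    static String handOff() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        Thread producer = new Thread(() -> queue.offer("hello"));
        producer.start();
        try {
            // Wait up to one second for a message, like a receive with a timeout
            String received = queue.poll(1, TimeUnit.SECONDS);
            producer.join();
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        int[] result = fill(2, 5);
        System.out.println(result[0] + " accepted, " + result[1] + " rejected");
        System.out.println(handOff());
    }
}
```

With a capacity of 2 and five offers, two entries are accepted and three are rejected, which is why a bounded channel needs a decision about what the sender does when the buffer is full.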

5.3 PublishSubscribeChannel

PublishSubscribeChannel broadcasts messages to all subscribers. Useful for event-driven architectures.

@Bean
public PublishSubscribeChannel pubSubChannel() {
    return new PublishSubscribeChannel();
}

@ServiceActivator(inputChannel = "pubSubChannel")
public void subscriber1(String message) {
    System.out.println("Subscriber 1: " + message);
}

@ServiceActivator(inputChannel = "pubSubChannel")
public void subscriber2(String message) {
    System.out.println("Subscriber 2: " + message);
}

5.4 ExecutorChannel

ExecutorChannel provides asynchronous processing using a thread pool executor.

@Bean
public ExecutorChannel executorChannel() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(5);
    executor.setMaxPoolSize(10);
    executor.setQueueCapacity(100);
    executor.setThreadNamePrefix("si-executor-");
    executor.initialize();
    
    return new ExecutorChannel(executor);
}

6. Endpoints in Detail

6.1 Service Activator

Service Activator invokes a method on a Spring bean to process messages.

@Component
public class OrderService {
    
    @ServiceActivator(inputChannel = "orderChannel", outputChannel = "processedOrderChannel")
    public Order processOrder(Order order) {
        // Process order
        order.setStatus("PROCESSED");
        return order;
    }
    
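    // Alternative signature: access the full message, including headers.
    // Use it instead of the method above, not alongside it: two subscribers
    // on the same DirectChannel would receive messages alternately.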
    @ServiceActivator(inputChannel = "orderChannel")
    public void processOrderWithHeaders(Message<Order> message) {
        Order order = message.getPayload();
        String orderId = (String) message.getHeaders().get("orderId");
        // Process order
    }
}

6.2 Transformer

Transformer converts message payload or headers from one format to another.

@Component
public class OrderTransformer {
    
    // ObjectMapper is thread-safe once configured, so one shared instance is enough
    private final ObjectMapper mapper = new ObjectMapper();

    @Transformer(inputChannel = "stringChannel", outputChannel = "orderChannel")
    public Order transformStringToOrder(String jsonString) {
        try {
            return mapper.readValue(jsonString, Order.class);
        } catch (JsonProcessingException e) {
            throw new MessageTransformationException("Failed to parse order JSON", e);
        }
    }
    
    @Transformer(inputChannel = "orderChannel", outputChannel = "xmlChannel")
    public String transformOrderToXml(Order order) {
        // Convert Order to XML (convertToXml is a placeholder for your own marshalling code)
        return convertToXml(order);
    }
}

6.3 Router

Router routes messages to different channels based on message content or headers.

@Component
public class OrderRouter {
    
    @Router(inputChannel = "orderChannel")
    public String routeOrder(Order order) {
        switch (order.getType()) {
            case "PREMIUM":
                return "premiumOrderChannel";
            case "STANDARD":
                return "standardOrderChannel";
            default:
                return "defaultOrderChannel";
        }
    }
    
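    // Alternative: header-based routing. Keep only one router per input channel;
    // several subscribers on one DirectChannel would take turns receiving messages.
    @Router(inputChannel = "orderChannel")
    public String routeByHeader(@Header("orderType") String orderType) {
        // "PREMIUM" -> "premiumOrderChannel", matching the channel names above
        return orderType.toLowerCase() + "OrderChannel";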
    }
    
    // Alternative: send one message to several channels by returning a list of channel names
    @Router(inputChannel = "orderChannel")
    public List<String> routeToMultipleChannels(Order order) {
        List<String> channels = new ArrayList<>();
        channels.add("orderProcessingChannel");
        if (order.isUrgent()) {
            channels.add("urgentOrderChannel");
        }
        return channels;
    }
}

6.4 Filter

Filter allows or blocks messages based on conditions.

@Component
public class OrderFilter {
    
    @Filter(inputChannel = "orderChannel", outputChannel = "validOrderChannel")
    public boolean filterOrders(Order order) {
        // Only allow orders with valid status
        return order.getStatus() != null && 
               !order.getStatus().equals("CANCELLED");
    }
    
    // Alternative: rejected messages go to a discard channel instead of being dropped silently
    @Filter(inputChannel = "orderChannel", outputChannel = "validOrderChannel", 
            discardChannel = "invalidOrderChannel")
    public boolean filterWithDiscard(Order order) {
        return order.getAmount() > 0;
    }
}

6.5 Splitter

Splitter breaks one message into multiple messages.

@Component
public class OrderSplitter {
    
    @Splitter(inputChannel = "orderChannel", outputChannel = "orderItemChannel")
    public List<OrderItem> splitOrder(Order order) {
        return order.getItems();
    }
    
    // Alternative: return an Iterator so items can be produced lazily
    @Splitter(inputChannel = "orderChannel", outputChannel = "orderItemChannel")
    public Iterator<OrderItem> splitOrderIterator(Order order) {
        return order.getItems().iterator();
    }
}

6.6 Aggregator

Aggregator combines multiple messages into a single message.

@Component
public class OrderAggregator {
    
    @Aggregator(inputChannel = "orderItemChannel", outputChannel = "completeOrderChannel")
    public Order aggregateOrders(List<Message<OrderItem>> messages) {
        Order order = new Order();
        List<OrderItem> items = new ArrayList<>();
        
        for (Message<OrderItem> message : messages) {
            items.add(message.getPayload());
        }
        
        order.setItems(items);
        return order;
    }
    
    // Correlation strategy
    @CorrelationStrategy
    public String correlateBy(OrderItem item) {
        return item.getOrderId();
    }
    
    // Release strategy
    @ReleaseStrategy
    public boolean release(List<Message<OrderItem>> messages) {
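        // Release once 10 items have arrived. A time-based release (for example
        // "after 5 seconds") cannot be expressed here; it requires a group timeout
        // configured on the aggregator itself.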
        return messages.size() >= 10;
    }
}
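
The interplay of the correlation strategy and the release strategy is easier to see in a stripped-down model. The sketch below is plain Java and only an illustration: `AggregatorSketch` and its methods are hypothetical, groups are kept in an in-memory map, and release is purely size-based.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class AggregatorSketch<T> {
    private final Map<String, List<T>> groups = new HashMap<>();
    private final Function<T, String> correlationStrategy;
    private final int releaseSize;

    AggregatorSketch(Function<T, String> correlationStrategy, int releaseSize) {
        this.correlationStrategy = correlationStrategy;
        this.releaseSize = releaseSize;
    }

    // Adds the item to its group. Returns the completed group once the
    // release condition holds, otherwise null.
    List<T> accept(T item) {
        String key = correlationStrategy.apply(item);
        List<T> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(item);
        if (group.size() >= releaseSize) {
            groups.remove(key); // the group is done; forget it
            return group;
        }
        return null;
    }

    static List<String> demo() {
        // Items look like "orderId:sku"; correlate on the part before the colon
        AggregatorSketch<String> aggregator = new AggregatorSketch<String>(
                item -> item.substring(0, item.indexOf(':')), 2);
        List<String> released = new ArrayList<>();
        for (String item : new String[] {"o1:apple", "o2:pear", "o1:plum", "o2:fig"}) {
            List<String> group = aggregator.accept(item);
            if (group != null) {
                released.add(String.join("+", group));
            }
        }
        return released;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [o1:apple+o1:plum, o2:pear+o2:fig]
    }
}
```

Items from the two orders arrive interleaved, yet each group is released on its own as soon as it reaches the configured size.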

6.7 Gateway

Gateway provides a simple interface for sending and receiving messages without dealing with channels directly.

@MessagingGateway
public interface OrderGateway {
    
    @Gateway(requestChannel = "orderChannel", replyChannel = "orderResponseChannel")
    Order processOrder(Order order);
    
    @Gateway(requestChannel = "orderChannel")
    void sendOrder(Order order); // Fire and forget
    
    @Gateway(requestChannel = "orderChannel", 
             requestTimeout = 5000, 
             replyTimeout = 10000)
    Order processOrderWithTimeout(Order order);
}

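// Usage (named OrderClient here to avoid clashing with the OrderService bean shown earlier)
@Service
public class OrderClient {
    
    private final OrderGateway orderGateway;
    
    public OrderClient(OrderGateway orderGateway) {
        this.orderGateway = orderGateway;
    }
    
    public void processOrder(Order order) {
        // Blocks until a reply arrives or the reply timeout elapses
        Order processed = orderGateway.processOrder(order);
        // Use processed order
    }
}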

7. Adapters

Adapters connect Spring Integration to external systems and protocols. They handle the complexity of different transport mechanisms.

7.1 HTTP Adapter

HTTP adapters enable integration with REST APIs and web services.

// Inbound HTTP (receiving HTTP requests)
@Configuration
@EnableIntegration
public class HttpInboundConfig {
    
    @Bean
    public HttpRequestHandlingMessagingGateway httpInboundGateway() {
        HttpRequestHandlingMessagingGateway gateway = 
            new HttpRequestHandlingMessagingGateway(true);
        gateway.setRequestMapping(createRequestMapping());
        gateway.setRequestChannel(httpRequestChannel());
        gateway.setReplyChannel(httpResponseChannel());
        return gateway;
    }
    
    @Bean
    public RequestMapping createRequestMapping() {
        RequestMapping mapping = new RequestMapping();
        mapping.setPathPatterns("/api/orders");
        mapping.setMethods(HttpMethod.POST);
        return mapping;
    }
    
    @Bean
    public DirectChannel httpRequestChannel() {
        return new DirectChannel();
    }
    
    @Bean
    public DirectChannel httpResponseChannel() {
        return new DirectChannel();
    }
}

// Outbound HTTP (sending HTTP requests)
// @Bean registers the handler; @ServiceActivator subscribes it to the channel
@Bean
@ServiceActivator(inputChannel = "httpOutboundChannel")
public HttpRequestExecutingMessageHandler httpOutboundGateway() {
    HttpRequestExecutingMessageHandler handler = 
        new HttpRequestExecutingMessageHandler("http://external-api.com/orders");
    handler.setHttpMethod(HttpMethod.POST);
    handler.setExpectedResponseType(String.class);
    return handler;
}

7.2 JMS Adapter

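JMS adapters integrate with JMS-compliant brokers such as ActiveMQ or IBM MQ. RabbitMQ is an AMQP broker and is normally connected through Spring Integration's separate AMQP adapters instead.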

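// JMS Inbound (receiving from queue)
@Bean
public JmsMessageDrivenEndpoint jmsInboundAdapter(
        ConnectionFactory connectionFactory) {
    // The listener container receives from the JMS destination
    DefaultMessageListenerContainer container = 
        new DefaultMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setDestinationName("orders.queue");
    
    // The listener converts each JMS message and publishes it to the channel
    ChannelPublishingJmsMessageListener listener = 
        new ChannelPublishingJmsMessageListener();
    
    JmsMessageDrivenEndpoint adapter = 
        new JmsMessageDrivenEndpoint(container, listener);
    adapter.setOutputChannelName("jmsInputChannel");
    return adapter;
}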

// JMS Outbound (sending to queue)
@Bean
@ServiceActivator(inputChannel = "jmsOutboundChannel")
public JmsSendingMessageHandler jmsOutboundAdapter(
        ConnectionFactory connectionFactory) {
    JmsSendingMessageHandler handler = 
        new JmsSendingMessageHandler(
            new JmsTemplate(connectionFactory));
    handler.setDestinationName("orders.queue");
    return handler;
}

7.3 File Adapter

File adapters read from and write to the file system.

// File Inbound (reading files)
@Bean
@InboundChannelAdapter(value = "fileInputChannel", 
                      poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
    FileReadingMessageSource source = new FileReadingMessageSource();
    source.setDirectory(new File("/input"));
    source.setFilter(new SimplePatternFileListFilter("*.txt"));
    return source;
}

// File Outbound (writing files)
@Bean
@ServiceActivator(inputChannel = "fileOutputChannel")
public FileWritingMessageHandler fileWritingMessageHandler() {
    FileWritingMessageHandler handler = 
        new FileWritingMessageHandler(new File("/output"));
    handler.setFileExistsMode(FileExistsMode.APPEND);
    handler.setExpectReply(false);
    return handler;
}

7.4 FTP/SFTP Adapter

FTP and SFTP adapters integrate with remote file systems.

// FTP Inbound
@Bean
@InboundChannelAdapter(value = "ftpInputChannel", 
                      poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> ftpInboundAdapter() {
    FtpInboundFileSynchronizer synchronizer = 
        new FtpInboundFileSynchronizer(createFtpSessionFactory());
    synchronizer.setRemoteDirectory("/remote/orders");
    synchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
    
    FtpInboundFileSynchronizingMessageSource source = 
        new FtpInboundFileSynchronizingMessageSource(synchronizer);
    source.setLocalDirectory(new File("/local/orders"));
    source.setAutoCreateLocalDirectory(true);
    return source;
}

@Bean
public DefaultFtpSessionFactory createFtpSessionFactory() {
    DefaultFtpSessionFactory factory = new DefaultFtpSessionFactory();
    factory.setHost("ftp.example.com");
    factory.setPort(21);
    factory.setUsername("user");
    factory.setPassword("password");
    return factory;
}

// FTP Outbound
@Bean
@ServiceActivator(inputChannel = "ftpOutputChannel")
public FtpMessageHandler ftpOutboundAdapter() {
    FtpMessageHandler handler = 
        new FtpMessageHandler(createFtpSessionFactory());
    handler.setRemoteDirectoryExpression(
        new LiteralExpression("/remote/orders"));
    return handler;
}

7.5 Database Adapter

Database adapters integrate with databases for polling and writing.

// Database Inbound (polling)
@Bean
@InboundChannelAdapter(value = "dbInputChannel", 
                      poller = @Poller(fixedDelay = "5000"))
public MessageSource<Object> jdbcPollingChannelAdapter(DataSource dataSource) {
    JdbcPollingChannelAdapter adapter = 
        new JdbcPollingChannelAdapter(dataSource, "SELECT * FROM orders WHERE status = 'PENDING'");
    // :id is bound to the ids of all rows returned by the poll, hence IN (...)
    adapter.setUpdateSql("UPDATE orders SET status = 'PROCESSING' WHERE id IN (:id)");
    return adapter;
}

// Database Outbound (writing)
@Bean
@ServiceActivator(inputChannel = "dbOutputChannel")
public JdbcMessageHandler jdbcOutboundAdapter(DataSource dataSource) {
    JdbcMessageHandler handler = 
        new JdbcMessageHandler(dataSource, 
            "INSERT INTO processed_orders (id, status) VALUES (:payload.id, :payload.status)");
    return handler;
}

8. Project Setup

8.1 Maven Dependencies

<dependencies>
    <!-- Spring Integration Core -->
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-core</artifactId>
        <version>6.2.0</version>
    </dependency>
    
    <!-- Spring Integration HTTP -->
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-http</artifactId>
        <version>6.2.0</version>
    </dependency>
    
    <!-- Spring Integration JMS -->
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-jms</artifactId>
        <version>6.2.0</version>
    </dependency>
    
    <!-- Spring Integration File -->
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-file</artifactId>
        <version>6.2.0</version>
    </dependency>
    
    <!-- Spring Integration FTP -->
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-ftp</artifactId>
        <version>6.2.0</version>
    </dependency>
    
    <!-- Spring Integration JDBC -->
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-jdbc</artifactId>
        <version>6.2.0</version>
    </dependency>
    
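    <!-- Spring Boot Starter Integration (version comes from the Spring Boot parent or BOM, which can also manage the versions of the modules above) -->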
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
</dependencies>

8.2 Spring Boot Auto-Configuration

Spring Boot provides auto-configuration for Spring Integration. Add the starter dependency and Boot enables integration support for you, so the @EnableIntegration annotation shown below is optional:

@SpringBootApplication
@EnableIntegration
public class IntegrationApplication {
    public static void main(String[] args) {
        SpringApplication.run(IntegrationApplication.class, args);
    }
}

9. Configuration

9.1 Java Configuration

Spring Integration supports Java-based configuration using annotations and Java configuration classes.

@Configuration
@EnableIntegration
public class IntegrationConfig {
    
    @Bean
    public DirectChannel inputChannel() {
        return new DirectChannel();
    }
    
    @Bean
    public DirectChannel outputChannel() {
        return new DirectChannel();
    }
    
    // Receives from inputChannel; the return value is sent to outputChannel.
    // (A MessageHandler lambda returns nothing, so it could not feed an output channel.)
    @ServiceActivator(inputChannel = "inputChannel", outputChannel = "outputChannel")
    public String process(String payload) {
        System.out.println("Processing: " + payload);
        return payload;
    }
}

9.2 XML Configuration

Spring Integration also supports XML-based configuration for legacy applications.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           http://www.springframework.org/schema/integration/spring-integration.xsd">
    
    <int:channel id="inputChannel"/>
    <int:channel id="outputChannel"/>
    
    <int:service-activator 
        input-channel="inputChannel" 
        output-channel="outputChannel"
        ref="orderService" 
        method="processOrder"/>
    
    <bean id="orderService" class="com.example.OrderService"/>
</beans>

9.3 DSL Configuration

Spring Integration DSL provides a fluent API for configuring integration flows.

@Configuration
@EnableIntegration
public class IntegrationDSLConfig {
    
    @Bean
    public IntegrationFlow orderProcessingFlow() {
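        // IntegrationFlow.from(...) replaces the older IntegrationFlows factory,
        // which is no longer available in the 6.2 line used in this guide
        return IntegrationFlow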
            .from("orderInputChannel")
            .transform(Order.class, order -> {
                order.setStatus("PROCESSING");
                return order;
            })
            .filter(Order.class, order -> order.getAmount() > 0)
            .route(Order.class, 
                order -> order.getType(),
                mapping -> mapping
                    .subFlowMapping("PREMIUM", sf -> sf
                        .channel("premiumOrderChannel"))
                    .subFlowMapping("STANDARD", sf -> sf
                        .channel("standardOrderChannel")))
            .get();
    }
}

10. Real-World Examples

10.1 Order Processing Pipeline

A complete order processing pipeline that receives orders via HTTP, validates them, routes based on type, processes, and sends notifications.

@Configuration
@EnableIntegration
public class OrderProcessingFlow {
    
    // HTTP Inbound Gateway
    @Bean
    public IntegrationFlow httpInboundFlow() {
        return IntegrationFlow
            .from(Http.inboundGateway("/api/orders")
                .requestMapping(m -> m.methods(HttpMethod.POST))
                .requestPayloadType(Order.class))
            .channel("orderInputChannel")
            .get();
    }
    
    // Order Validation
    @Bean
    public IntegrationFlow orderValidationFlow() {
        return IntegrationFlow
            .from("orderInputChannel")
            .filter(Order.class, order -> 
                order.getItems() != null && !order.getItems().isEmpty(),
                filter -> filter.discardChannel("invalidOrderChannel"))
            .channel("validOrderChannel")
            .get();
    }
    
    // Order Routing
    @Bean
    public IntegrationFlow orderRoutingFlow() {
        return IntegrationFlow
            .from("validOrderChannel")
            .route(Order.class, 
                order -> order.getType(),
                mapping -> mapping
                    .channelMapping("PREMIUM", "premiumOrderChannel")
                    .channelMapping("STANDARD", "standardOrderChannel")
                    .defaultOutputChannel("defaultOrderChannel"))
            .get();
    }
    
    // Premium Order Processing
    @Bean
    public IntegrationFlow premiumOrderFlow(OrderService orderService) {
        return IntegrationFlow
            .from("premiumOrderChannel")
            .transform(Order.class, order -> {
                order.setPriority(1);
                return order;
            })
            // Pass the bean instance (or its bean name) rather than the class literal
            .handle(orderService, "processPremiumOrder")
            .channel("notificationChannel")
            .get();
    }
    
    // Notification
    @Bean
    public IntegrationFlow notificationFlow(NotificationService notificationService) {
        return IntegrationFlow
            .from("notificationChannel")
            .handle(notificationService, "sendNotification")
            .get();
    }
}

10.2 File Processing with Error Handling

A file processing flow that reads files, processes them, and handles errors gracefully.

@Configuration
@EnableIntegration
public class FileProcessingFlow {
    
    @Bean
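    public IntegrationFlow fileProcessingFlow(OrderService orderService) {
        return IntegrationFlow
            .from(Files.inboundAdapter(new File("/input"))
                .patternFilter("*.csv")
                .preventDuplicates(true),
                e -> e.poller(Pollers.fixedDelay(1000)))
            .transform(Files.toStringTransformer())
            .transform(String.class, this::parseCsv)
            .split()
            .handle(orderService, "processOrder")
            .aggregate()
            // The file adapter writes String, byte[], File, or InputStream payloads,
            // so turn the aggregated list into text first (toString() is a stand-in
            // for real serialization)
            .transform(Object.class, Object::toString)
            .handle(Files.outboundAdapter(new File("/output"))
                .fileExistsMode(FileExistsMode.APPEND))
            .get();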
    }
    
    private List<Order> parseCsv(String csvContent) {
        // Parse CSV and return list of orders
        return parseCsvToOrders(csvContent);
    }
    
    @Bean
    public IntegrationFlow errorHandlingFlow() {
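        return IntegrationFlow
            .from("errorChannel")
            .log(LoggingHandler.Level.ERROR)
            // The error message payload is an exception; the file adapter needs text
            .transform(Throwable.class, Throwable::toString)
            .handle(Files.outboundAdapter(new File("/errors"))
                .fileNameGenerator(message -> 
                    "error-" + System.currentTimeMillis() + ".txt"))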
            .get();
    }
}

11. Best Practices

  • Use appropriate channels: Choose the right channel type for your use case (synchronous vs asynchronous).
  • Keep endpoints focused: Each endpoint should have a single responsibility.
  • Handle errors gracefully: Use error channels and retry mechanisms.
  • Use transformers for format conversion: Don't mix business logic with format conversion.
  • Leverage Spring Boot auto-configuration: Use Spring Boot starters when possible.
  • Test integration flows: Use Spring Integration testing framework.
  • Monitor message flows: Integrate with Spring Boot Actuator for monitoring.
  • Use DSL for complex flows: DSL provides better readability for complex integration scenarios.

12. Testing

Spring Integration provides comprehensive testing support through the spring-integration-test module.

@SpringBootTest
@SpringIntegrationTest
public class OrderProcessingFlowTest {
    
    @Autowired
    private OrderGateway orderGateway;
    
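    // Messages are only sent to this channel, so MessageChannel is sufficient
    @Autowired
    @Qualifier("orderInputChannel")
    private MessageChannel orderInputChannel;
    
    // receive(...) below requires a pollable channel, for example a
    // QueueChannel defined in the test configuration
    @Autowired
    @Qualifier("processedOrderChannel")
    private PollableChannel processedOrderChannel;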
    
    @Test
    public void testOrderProcessing() {
        Order order = new Order();
        order.setId("123");
        order.setType("STANDARD");
        
        // Fire and forget: the result is read from processedOrderChannel below
        orderGateway.sendOrder(order);
        
        Message<?> result = processedOrderChannel.receive(5000);
        assertNotNull(result);
        Order processed = (Order) result.getPayload();
        assertEquals("PROCESSED", processed.getStatus());
    }
    
    @Test
    public void testOrderRouting() {
        Order premiumOrder = new Order();
        premiumOrder.setType("PREMIUM");
        
        orderInputChannel.send(MessageBuilder.withPayload(premiumOrder).build());
        
        // Verify message was routed to premium channel
        // ...
    }
}

13. Advanced Concepts

13.1 Transaction Support

Spring Integration supports transactions for message processing.

@ServiceActivator(inputChannel = "orderChannel", outputChannel = "processedChannel")
@Transactional
public Order processOrder(Order order) {
    // This method runs in a transaction
    orderService.save(order);
    return order;
}

13.2 Message Store

Message stores provide persistence for messages, useful for aggregators and other stateful components.

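@Bean
public JdbcMessageStore messageStore(DataSource dataSource) {
    return new JdbcMessageStore(dataSource);
}

// The input channel name is illustrative; use the channel that feeds your aggregator
@Bean
@ServiceActivator(inputChannel = "orderItemChannel")
public AggregatingMessageHandler aggregator(JdbcMessageStore messageStore) {
    AggregatingMessageHandler handler = 
        new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor());
    // Message groups are kept in the database instead of in memory
    handler.setMessageStore(messageStore);
    handler.setOutputChannelName("outputChannel");
    return handler;
}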

13.3 Retry and Error Handling

Spring Integration provides retry and error handling mechanisms.

@Bean
public IntegrationFlow orderProcessingWithRetry(OrderService orderService) {
    return IntegrationFlow
        .from("orderChannel")
        // Pass the bean instance rather than the class literal
        .handle(orderService, "processOrder",
            e -> e.advice(retryAdvice()))
        .channel("processedChannel")
        .get();
}

@Bean
public RequestHandlerRetryAdvice retryAdvice() {
    RequestHandlerRetryAdvice retry = new RequestHandlerRetryAdvice();
    retry.setRetryTemplate(retryTemplate());
    return retry;
}

@Bean
public RetryTemplate retryTemplate() {
    RetryTemplate template = new RetryTemplate();
    SimpleRetryPolicy policy = new SimpleRetryPolicy();
    policy.setMaxAttempts(3);
    template.setRetryPolicy(policy);
    
    FixedBackOffPolicy backOff = new FixedBackOffPolicy();
    backOff.setBackOffPeriod(1000);
    template.setBackOffPolicy(backOff);
    
    return template;
}
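
What the retry template above does can be pictured as a simple loop: call the operation, and on failure wait for a fixed period and try again until the attempt limit is reached. The following plain-Java sketch models that policy (3 attempts, fixed back-off). `RetrySketch` and its methods are hypothetical names, and the code is a conceptual model, not the Spring Retry implementation.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class RetrySketch {

    // Calls the task up to maxAttempts times, pausing backOffMillis after
    // each failed attempt except the last. Rethrows the last failure.
    static <T> T withRetry(Supplier<T> task, int maxAttempts, long backOffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    pause(backOffMillis);
                }
            }
        }
        throw last;
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during back-off", e);
        }
    }

    // A task that fails twice and succeeds on the third call.
    static String demo() {
        AtomicInteger calls = new AtomicInteger();
        String result = withRetry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("transient failure");
            }
            return "ok";
        }, 3, 10);
        return result + " after " + calls.get() + " attempts";
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints ok after 3 attempts
    }
}
```

If the task still fails on the final attempt, the last exception propagates to the caller, which in a message flow is the point where an error channel would take over.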

14. Conclusion

Spring Integration provides a powerful framework for building enterprise integration solutions. It implements proven Enterprise Integration Patterns and integrates seamlessly with the Spring ecosystem.

Key takeaways:

  • Spring Integration enables loose coupling through messaging
  • It supports multiple protocols and transports
  • Configuration can be done via Java, XML, or DSL
  • Comprehensive testing and monitoring support
  • Follows Spring's familiar patterns and principles

Whether you're integrating with REST APIs, message queues, file systems, or databases, Spring Integration provides the tools and patterns you need to build robust, maintainable integration solutions.
