Complete guide to Spring Integration: the Spring framework for Enterprise Application Integration (EAI). Learn messaging patterns, channels, endpoints, and adapters to integrate disparate systems seamlessly.
1. What is Spring Integration?
Spring Integration is an extension of the Spring Framework that provides a lightweight messaging framework for Enterprise Application Integration (EAI). It enables you to build messaging-based applications using a simple, declarative model based on the Enterprise Integration Patterns (EIP) described in the book "Enterprise Integration Patterns" by Gregor Hohpe and Bobby Woolf.
It integrates disparate systems through messaging, so you can connect applications, services, and data sources without coupling them tightly to one another.
The framework provides a simple, consistent model for building integration solutions while maintaining Spring's core principles: dependency injection, aspect-oriented programming, and the ability to test components in isolation.
2. Why Use Spring Integration?
- Enterprise Integration Patterns: Implements proven patterns such as channels, routers, transformers, and filters.
- Loose Coupling: Decouples systems through messaging, reducing dependencies between components.
- Spring Ecosystem: Seamlessly integrates with Spring Framework, Spring Boot, and other Spring projects.
- Multiple Protocols: Supports various protocols including HTTP, JMS, FTP, SFTP, TCP, UDP, WebSocket, and more.
- Declarative Configuration: Configure integration flows using XML, Java configuration, or Spring Boot auto-configuration.
- Testing Support: Built-in testing framework for unit and integration testing of message flows.
- Error Handling: Comprehensive error handling and retry mechanisms for robust integration solutions.
- Monitoring: Integration with Spring Boot Actuator for monitoring and metrics.
3. Spring Integration Architecture
Spring Integration follows a messaging-based architecture where components communicate through messages sent over channels:
3.1 Architecture Layers
- Application Layer: Your business logic and services.
- Integration Layer: Spring Integration components (channels, endpoints, adapters).
- Transport Layer: External systems and protocols (HTTP, JMS, FTP, etc.).
This architecture allows you to:
- Decouple business logic from integration concerns
- Compose complex integration flows from simple components
- Test integration flows independently
- Switch between different transports without changing business logic
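The last point can be illustrated with a minimal plain-Java sketch. It deliberately uses no Spring Integration classes, and every name in it (`LayeringSketch`, `confirmOrder`, `endpoint`) is hypothetical. It only shows the shape of the idea: the business method stays the same while the transport-specific decoding in front of it changes.

```java
import java.util.function.Function;

// Hypothetical names throughout; this models the three layers, not the framework's API.
final class LayeringSketch {

    // Application layer: business logic that knows nothing about transports.
    static String confirmOrder(String orderId) {
        return "CONFIRMED:" + orderId;
    }

    // Integration layer: wires a transport-specific decoder to the business logic.
    static <T> Function<T, String> endpoint(Function<T, String> decode,
                                            Function<String, String> service) {
        return decode.andThen(service);
    }

    public static void main(String[] args) {
        // Transport layer stand-ins: an HTTP-like request body and a file-like CSV row.
        Function<String, String> fromHttpBody = body -> body.trim();
        Function<String[], String> fromCsvRow = row -> row[0];

        Function<String, String> httpEndpoint =
                endpoint(fromHttpBody, LayeringSketch::confirmOrder);
        Function<String[], String> fileEndpoint =
                endpoint(fromCsvRow, LayeringSketch::confirmOrder);

        // Both endpoints reuse the same, unchanged business method.
        System.out.println(httpEndpoint.apply(" 42 "));
        System.out.println(fileEndpoint.apply(new String[] {"43", "STANDARD"}));
    }
}
```

In Spring Integration the decoding role is played by adapters and transformers, and the wiring role by channels and endpoints.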
4. Core Concepts
4.1 Messages
A Message is the fundamental unit of data in Spring Integration. It consists of:
- Payload: The actual data being transferred (any Java object)
- Headers: Metadata about the message (ID, timestamp, correlation ID, etc.)
// Creating a message programmatically
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
Message<String> message = MessageBuilder
.withPayload("Hello, Spring Integration!")
.setHeader("messageId", "12345")
// The "id" and "timestamp" headers are generated by the framework and are read-only, so they are not set here
.setHeader("priority", 5)
.build();
// Accessing message components
String payload = message.getPayload();
Object messageId = message.getHeaders().get("messageId");
Object timestamp = message.getHeaders().get("timestamp");
4.1.1 Message Headers
Spring Integration provides several standard headers:
- ID: Unique identifier for the message
- TIMESTAMP: When the message was created
- CORRELATION_ID: Used for correlating related messages
- REPLY_CHANNEL: Channel for sending replies
- ERROR_CHANNEL: Channel for error messages
- SEQUENCE_NUMBER: Position in a sequence
- SEQUENCE_SIZE: Total size of the sequence
4.2 Channels
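A Channel is the pipe that connects message producers and consumers. It decouples components from one another and, depending on the channel type, delivers messages either synchronously on the sender's thread or asynchronously through a buffer or an executor.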
4.2.1 Channel Types
- DirectChannel: Synchronous, point-to-point channel
- QueueChannel: Asynchronous, buffered channel
- PublishSubscribeChannel: Broadcasts messages to all subscribers
- ExecutorChannel: Asynchronous channel using thread pool
- PriorityChannel: Orders messages by priority
- RendezvousChannel: Synchronous handoff between sender and receiver
// Java Configuration
@Configuration
@EnableIntegration
public class ChannelConfig {
@Bean
public DirectChannel inputChannel() {
return new DirectChannel();
}
@Bean
public QueueChannel queueChannel() {
return new QueueChannel(10); // Buffer size: 10
}
@Bean
public PublishSubscribeChannel pubSubChannel() {
return new PublishSubscribeChannel();
}
@Bean
public ExecutorChannel executorChannel() {
return new ExecutorChannel(Executors.newFixedThreadPool(5));
}
}
4.3 Endpoints
Endpoints are components that connect channels to application logic or external systems. They consume messages from input channels and produce messages to output channels.
4.3.1 Endpoint Types
- Service Activator: Invokes a method on a Spring bean
- Transformer: Transforms message payload or headers
- Router: Routes messages to different channels based on conditions
- Filter: Filters messages based on conditions
- Splitter: Splits one message into multiple messages
- Aggregator: Combines multiple messages into one
- Gateway: Provides a simple interface for sending/receiving messages
4.4 Message Flow
Messages flow through a Spring Integration application following this pattern:
- Message is created (by an adapter, gateway, or programmatically)
- Message is sent to an input channel
- Endpoint consumes the message from the channel
- Endpoint processes the message (transforms, routes, filters, etc.)
- Processed message is sent to an output channel
- Next endpoint consumes and processes, or message reaches final destination
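The flow above can be modelled in a few lines of plain Java. This is a sketch under simplifying assumptions, not how the framework is implemented: channels are plain in-memory queues, `Msg` and `runEndpoint` are hypothetical names, and everything runs on one thread.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.Function;

// Plain-Java model of the flow; every type here is hypothetical, not a Spring class.
final class FlowSketch {

    // A message is a payload plus a header map.
    static final class Msg {
        final Object payload;
        final Map<String, Object> headers;

        Msg(Object payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    // An endpoint takes one message from its input channel, applies its logic to the
    // payload, and puts the result on its output channel with the headers preserved.
    static void runEndpoint(Queue<Msg> in, Function<Object, Object> logic, Queue<Msg> out) {
        Msg received = in.remove();
        out.add(new Msg(logic.apply(received.payload), received.headers));
    }

    public static void main(String[] args) {
        Queue<Msg> inputChannel = new ArrayDeque<>();
        Queue<Msg> upperCaseChannel = new ArrayDeque<>();
        Queue<Msg> outputChannel = new ArrayDeque<>();

        // A message is created and sent to the input channel.
        Map<String, Object> headers = new HashMap<>();
        headers.put("source", "demo");
        inputChannel.add(new Msg("order-1", headers));

        // The first endpoint consumes, transforms, and forwards the message.
        runEndpoint(inputChannel, p -> p.toString().toUpperCase(), upperCaseChannel);
        // The next endpoint consumes the transformed message and processes it again.
        runEndpoint(upperCaseChannel, p -> "handled " + p, outputChannel);

        Msg result = outputChannel.remove();
        System.out.println(result.payload + " " + result.headers);
    }
}
```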
5. Channels in Detail
5.1 DirectChannel
DirectChannel is a point-to-point channel that delivers messages synchronously to a single subscriber. If multiple subscribers exist, it uses a round-robin load balancing strategy.
@Configuration
@EnableIntegration
public class DirectChannelConfig {
@Bean
public DirectChannel directChannel() {
// Round-robin is already the default strategy; it is passed explicitly here for illustration
return new DirectChannel(new RoundRobinLoadBalancingStrategy());
}
@ServiceActivator(inputChannel = "directChannel")
public String processMessage(String payload) {
return "Processed: " + payload;
}
}
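To see what "synchronous" and "round-robin" mean in practice, here is a small plain-Java model of such a channel. It is a sketch only: `RoundRobinChannelSketch` is a hypothetical class, payloads are plain strings, and it ignores failover and error handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical model of a synchronous point-to-point channel with round-robin dispatch.
final class RoundRobinChannelSketch {
    private final List<Consumer<String>> subscribers = new ArrayList<>();
    private final AtomicInteger next = new AtomicInteger();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Runs exactly one subscriber on the caller's thread, so send() returns
    // only after that handler has finished (or thrown).
    void send(String payload) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException("no subscribers");
        }
        int index = Math.floorMod(next.getAndIncrement(), subscribers.size());
        subscribers.get(index).accept(payload);
    }

    public static void main(String[] args) {
        RoundRobinChannelSketch channel = new RoundRobinChannelSketch();
        List<String> log = new ArrayList<>();
        channel.subscribe(p -> log.add("A:" + p));
        channel.subscribe(p -> log.add("B:" + p));
        channel.send("1");
        channel.send("2");
        channel.send("3");
        System.out.println(log); // subscribers take turns: A, B, A
    }
}
```

Because the handler runs on the sender's thread, an exception thrown by the handler reaches the sender, and a transaction started by the sender spans the handler as well.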
5.2 QueueChannel
QueueChannel provides asynchronous, buffered messaging. Messages are stored in a queue until a consumer is available.
@Bean
public QueueChannel queueChannel() {
// Unbounded queue
return new QueueChannel();
// Or bounded queue with capacity
// return new QueueChannel(100);
}
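// A QueueChannel is pollable, so its consumer needs a poller (or a default poller bean)
@ServiceActivator(inputChannel = "queueChannel", poller = @Poller(fixedDelay = "1000"))
public void processAsync(Message<String> message) {
// Runs on the poller's thread, not on the sender's thread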
System.out.println("Processing: " + message.getPayload());
}
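The buffering behaviour can be sketched with a bounded queue from the JDK. This is a simplified model with hypothetical names (`BufferedChannelSketch`, `send`, `receive`); it uses non-blocking calls only, whereas a real channel can also wait for a timeout.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for a bounded, buffered channel; not the framework's class.
final class BufferedChannelSketch {
    private final BlockingQueue<String> buffer;

    BufferedChannelSketch(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    // Non-blocking send: returns false instead of waiting when the buffer is full.
    boolean send(String payload) {
        return buffer.offer(payload);
    }

    // Non-blocking receive: returns null when nothing is buffered.
    String receive() {
        return buffer.poll();
    }

    public static void main(String[] args) {
        BufferedChannelSketch channel = new BufferedChannelSketch(2);
        System.out.println(channel.send("a"));
        System.out.println(channel.send("b"));
        System.out.println(channel.send("c"));  // the buffer is full at this point
        System.out.println(channel.receive());
        System.out.println(channel.receive());
        System.out.println(channel.receive());  // the buffer is empty at this point
    }
}
```

The sender returns as soon as the message is buffered, which is why the consumer side needs something that polls the channel.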
5.3 PublishSubscribeChannel
PublishSubscribeChannel broadcasts messages to all subscribers. Useful for event-driven architectures.
@Bean
public PublishSubscribeChannel pubSubChannel() {
return new PublishSubscribeChannel();
}
@ServiceActivator(inputChannel = "pubSubChannel")
public void subscriber1(String message) {
System.out.println("Subscriber 1: " + message);
}
@ServiceActivator(inputChannel = "pubSubChannel")
public void subscriber2(String message) {
System.out.println("Subscriber 2: " + message);
}
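The difference from a point-to-point channel is that every subscriber sees every message. A minimal plain-Java model, with the hypothetical class name `BroadcastChannelSketch` and no executor or error handling, might look like this:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a broadcasting channel; not the framework's class.
final class BroadcastChannelSketch {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Delivers the payload to every subscriber in subscription order
    // and returns how many subscribers received it.
    int send(String payload) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(payload);
        }
        return subscribers.size();
    }

    public static void main(String[] args) {
        BroadcastChannelSketch channel = new BroadcastChannelSketch();
        channel.subscribe(p -> System.out.println("Subscriber 1: " + p));
        channel.subscribe(p -> System.out.println("Subscriber 2: " + p));
        System.out.println("delivered to " + channel.send("order created"));
    }
}
```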
5.4 ExecutorChannel
ExecutorChannel provides asynchronous processing using a thread pool executor.
@Bean
public ExecutorChannel executorChannel() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("si-executor-");
executor.initialize();
return new ExecutorChannel(executor);
}
6. Endpoints in Detail
6.1 Service Activator
Service Activator invokes a method on a Spring bean to process messages.
@Component
public class OrderService {
@ServiceActivator(inputChannel = "orderChannel", outputChannel = "processedOrderChannel")
public Order processOrder(Order order) {
// Process order
order.setStatus("PROCESSED");
return order;
}
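// Alternative: access the full message, including headers.
// Use one variant per channel; two subscribers on the same DirectChannel would take turns receiving messages.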
@ServiceActivator(inputChannel = "orderChannel")
public void processOrderWithHeaders(Message<Order> message) {
Order order = message.getPayload();
String orderId = (String) message.getHeaders().get("orderId");
// Process order
}
}
6.2 Transformer
Transformer converts message payload or headers from one format to another.
@Component
public class OrderTransformer {
@Transformer(inputChannel = "stringChannel", outputChannel = "orderChannel")
public Order transformStringToOrder(String jsonString) {
ObjectMapper mapper = new ObjectMapper();
try {
return mapper.readValue(jsonString, Order.class);
} catch (Exception e) {
throw new MessageTransformationException("Failed to transform", e);
}
}
@Transformer(inputChannel = "orderChannel", outputChannel = "xmlChannel")
public String transformOrderToXml(Order order) {
// Convert Order to XML
return convertToXml(order);
}
}
6.3 Router
Router routes messages to different channels based on message content or headers.
@Component
public class OrderRouter {
@Router(inputChannel = "orderChannel")
public String routeOrder(Order order) {
switch (order.getType()) {
case "PREMIUM":
return "premiumOrderChannel";
case "STANDARD":
return "standardOrderChannel";
default:
return "defaultOrderChannel";
}
}
// Using header-based routing
@Router(inputChannel = "orderChannel")
public String routeByHeader(@Header("orderType") String orderType) {
return orderType + "OrderChannel";
}
// Multiple channels
@Router(inputChannel = "orderChannel")
public List<String> routeToMultipleChannels(Order order) {
List<String> channels = new ArrayList<>();
channels.add("orderProcessingChannel");
if (order.isUrgent()) {
channels.add("urgentOrderChannel");
}
return channels;
}
}
6.4 Filter
Filter allows or blocks messages based on conditions.
@Component
public class OrderFilter {
@Filter(inputChannel = "orderChannel", outputChannel = "validOrderChannel")
public boolean filterOrders(Order order) {
// Only allow orders with valid status
return order.getStatus() != null &&
!order.getStatus().equals("CANCELLED");
}
@Filter(inputChannel = "orderChannel", outputChannel = "validOrderChannel",
discardChannel = "invalidOrderChannel")
public boolean filterWithDiscard(Order order) {
return order.getAmount() > 0;
}
}
6.5 Splitter
Splitter breaks one message into multiple messages.
@Component
public class OrderSplitter {
@Splitter(inputChannel = "orderChannel", outputChannel = "orderItemChannel")
public List<OrderItem> splitOrder(Order order) {
return order.getItems();
}
// Using Iterator
@Splitter(inputChannel = "orderChannel", outputChannel = "orderItemChannel")
public Iterator<OrderItem> splitOrderIterator(Order order) {
return order.getItems().iterator();
}
}
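A splitter typically also records where each piece came from, using the correlation and sequence headers listed in section 4.1.1, so that a downstream aggregator can reassemble the group. The following plain-Java sketch models that bookkeeping; `SplitSketch` is a hypothetical class, and each resulting "message" is represented as a simple map rather than a real `Message`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of splitting with sequence bookkeeping; not the framework's class.
final class SplitSketch {

    // Produces one map per item, carrying the payload plus correlation details.
    static List<Map<String, Object>> split(String correlationId, List<String> items) {
        List<Map<String, Object>> parts = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            Map<String, Object> part = new LinkedHashMap<>();
            part.put("payload", items.get(i));
            part.put("correlationId", correlationId);   // identifies the original message
            part.put("sequenceNumber", i + 1);          // 1-based position of this piece
            part.put("sequenceSize", items.size());     // total number of pieces
            parts.add(part);
        }
        return parts;
    }

    public static void main(String[] args) {
        for (Map<String, Object> part : split("order-7", List.of("pen", "ink", "paper"))) {
            System.out.println(part);
        }
    }
}
```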
6.6 Aggregator
Aggregator combines multiple messages into a single message.
@Component
public class OrderAggregator {
@Aggregator(inputChannel = "orderItemChannel", outputChannel = "completeOrderChannel")
public Order aggregateOrders(List<Message<OrderItem>> messages) {
Order order = new Order();
List<OrderItem> items = new ArrayList<>();
for (Message<OrderItem> message : messages) {
items.add(message.getPayload());
}
order.setItems(items);
return order;
}
// Correlation strategy
@CorrelationStrategy
public String correlateBy(OrderItem item) {
return item.getOrderId();
}
// Release strategy
@ReleaseStrategy
public boolean release(List<Message<OrderItem>> messages) {
// Release once 10 items have arrived; a time-based release would need a group timeout on the aggregator instead
return messages.size() >= 10;
}
}
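The interplay of correlation and release can be hard to picture from annotations alone. The sketch below models it in plain Java under simplifying assumptions: groups live in an in-memory map, the release rule is a fixed group size, and `AggregatorSketch` with its methods is hypothetical rather than part of the framework.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of correlation plus size-based release; not the framework's class.
final class AggregatorSketch {
    private final int releaseSize;
    private final Map<String, List<String>> groups = new HashMap<>();

    AggregatorSketch(int releaseSize) {
        this.releaseSize = releaseSize;
    }

    // Adds an item to the group selected by the correlation key. When the group
    // satisfies the release rule, the group is removed and its contents returned.
    Optional<List<String>> accept(String correlationKey, String item) {
        List<String> group = groups.computeIfAbsent(correlationKey, k -> new ArrayList<>());
        group.add(item);
        if (group.size() >= releaseSize) {
            groups.remove(correlationKey);
            return Optional.of(group);
        }
        return Optional.empty();
    }

    // Number of groups still waiting for more items.
    int pendingGroups() {
        return groups.size();
    }

    public static void main(String[] args) {
        AggregatorSketch aggregator = new AggregatorSketch(2);
        System.out.println(aggregator.accept("order-1", "pen"));
        System.out.println(aggregator.accept("order-2", "ink"));
        System.out.println(aggregator.accept("order-1", "paper"));
        System.out.println("pending groups: " + aggregator.pendingGroups());
    }
}
```

Incomplete groups stay in memory until they are released, which is one reason stateful components are often paired with a persistent message store and a timeout.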
6.7 Gateway
Gateway provides a simple interface for sending and receiving messages without dealing with channels directly.
@MessagingGateway
public interface OrderGateway {
@Gateway(requestChannel = "orderChannel", replyChannel = "orderResponseChannel")
Order processOrder(Order order);
@Gateway(requestChannel = "orderChannel")
void sendOrder(Order order); // Fire and forget
@Gateway(requestChannel = "orderChannel",
requestTimeout = 5000,
replyTimeout = 10000)
Order processOrderWithTimeout(Order order);
}
// Usage
@Service
public class OrderClient { // named differently from the OrderService bean in section 6.1 to avoid a clash
@Autowired
private OrderGateway orderGateway;
public void processOrder(Order order) {
Order processed = orderGateway.processOrder(order);
// Use processed order
}
}
7. Adapters
Adapters connect Spring Integration to external systems and protocols. They handle the complexity of different transport mechanisms.
7.1 HTTP Adapter
HTTP adapters enable integration with REST APIs and web services.
// Inbound HTTP (receiving HTTP requests)
@Configuration
@EnableIntegration
public class HttpInboundConfig {
@Bean
public HttpRequestHandlingMessagingGateway httpInboundGateway() {
HttpRequestHandlingMessagingGateway gateway =
new HttpRequestHandlingMessagingGateway(true);
gateway.setRequestMapping(createRequestMapping());
gateway.setRequestChannel(httpRequestChannel());
gateway.setReplyChannel(httpResponseChannel());
return gateway;
}
@Bean
public RequestMapping createRequestMapping() {
// This is org.springframework.integration.http.inbound.RequestMapping, not the Spring MVC annotation
RequestMapping mapping = new RequestMapping();
mapping.setPathPatterns("/api/orders");
mapping.setMethods(HttpMethod.POST);
return mapping;
}
@Bean
public DirectChannel httpRequestChannel() {
return new DirectChannel();
}
@Bean
public DirectChannel httpResponseChannel() {
return new DirectChannel();
}
}
// Outbound HTTP (sending HTTP requests)
@Bean
@ServiceActivator(inputChannel = "httpOutboundChannel")
public HttpRequestExecutingMessageHandler httpOutboundGateway() {
HttpRequestExecutingMessageHandler handler =
new HttpRequestExecutingMessageHandler("http://external-api.com/orders");
handler.setHttpMethod(HttpMethod.POST);
handler.setExpectedResponseType(String.class);
// The response goes to the message's replyChannel header unless an output channel is set on the handler
return handler;
}
7.2 JMS Adapter
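JMS adapters integrate with JMS brokers such as ActiveMQ or IBM MQ. RabbitMQ speaks AMQP natively and is usually integrated through the separate spring-integration-amqp module instead.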
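// JMS Inbound (receiving from queue)
@Bean
public JmsMessageDrivenEndpoint jmsInboundAdapter(
ConnectionFactory connectionFactory) {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// Assumes the same queue name as the outbound example below
container.setDestinationName("orders.queue");
JmsMessageDrivenEndpoint adapter =
new JmsMessageDrivenEndpoint(container, new ChannelPublishingJmsMessageListener());
adapter.setOutputChannel(jmsInputChannel());
return adapter;
}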
// JMS Outbound (sending to queue)
@Bean
@ServiceActivator(inputChannel = "jmsOutboundChannel")
public JmsSendingMessageHandler jmsOutboundAdapter(
ConnectionFactory connectionFactory) {
JmsSendingMessageHandler handler =
new JmsSendingMessageHandler(
new JmsTemplate(connectionFactory));
handler.setDestinationName("orders.queue");
return handler;
}
7.3 File Adapter
File adapters read from and write to the file system.
// File Inbound (reading files)
@Bean
@InboundChannelAdapter(value = "fileInputChannel",
poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File("/input"));
source.setFilter(new SimplePatternFileListFilter("*.txt"));
return source;
}
// File Outbound (writing files)
@Bean
@ServiceActivator(inputChannel = "fileOutputChannel")
public FileWritingMessageHandler fileWritingMessageHandler() {
FileWritingMessageHandler handler =
new FileWritingMessageHandler(new File("/output"));
handler.setFileExistsMode(FileExistsMode.APPEND);
handler.setExpectReply(false);
return handler;
}
7.4 FTP/SFTP Adapter
FTP and SFTP adapters integrate with remote file systems.
// FTP Inbound
@Bean
@InboundChannelAdapter(value = "ftpInputChannel",
poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> ftpInboundAdapter() {
FtpInboundFileSynchronizer synchronizer =
new FtpInboundFileSynchronizer(createFtpSessionFactory());
synchronizer.setRemoteDirectory("/remote/orders");
synchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
FtpInboundFileSynchronizingMessageSource source =
new FtpInboundFileSynchronizingMessageSource(synchronizer);
source.setLocalDirectory(new File("/local/orders"));
source.setAutoCreateLocalDirectory(true);
return source;
}
@Bean
public DefaultFtpSessionFactory createFtpSessionFactory() {
DefaultFtpSessionFactory factory = new DefaultFtpSessionFactory();
factory.setHost("ftp.example.com");
factory.setPort(21);
factory.setUsername("user");
factory.setPassword("password");
return factory;
}
// FTP Outbound
@Bean
@ServiceActivator(inputChannel = "ftpOutputChannel")
public FtpMessageHandler ftpOutboundAdapter() {
FtpMessageHandler handler =
new FtpMessageHandler(createFtpSessionFactory());
handler.setRemoteDirectoryExpression(
new LiteralExpression("/remote/orders"));
return handler;
}
7.5 Database Adapter
Database adapters integrate with databases for polling and writing.
// Database Inbound (polling)
@Bean
@InboundChannelAdapter(value = "dbInputChannel",
poller = @Poller(fixedDelay = "5000"))
public MessageSource<Object> jdbcPollingChannelAdapter(DataSource dataSource) {
JdbcPollingChannelAdapter adapter =
new JdbcPollingChannelAdapter(dataSource, "SELECT * FROM orders WHERE status = 'PENDING'");
// The poll may return several rows, so :id expands to a list and needs IN (...)
adapter.setUpdateSql("UPDATE orders SET status = 'PROCESSING' WHERE id IN (:id)");
return adapter;
}
// Database Outbound (writing)
@Bean
@ServiceActivator(inputChannel = "dbOutputChannel")
public JdbcMessageHandler jdbcOutboundAdapter(DataSource dataSource) {
JdbcMessageHandler handler =
new JdbcMessageHandler(dataSource,
"INSERT INTO processed_orders (id, status) VALUES (:payload.id, :payload.status)");
return handler;
}
8. Project Setup
8.1 Maven Dependencies
<dependencies>
<!-- Spring Integration Core -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>6.2.0</version>
</dependency>
<!-- Spring Integration HTTP -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-http</artifactId>
<version>6.2.0</version>
</dependency>
<!-- Spring Integration JMS -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jms</artifactId>
<version>6.2.0</version>
</dependency>
<!-- Spring Integration File -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
<version>6.2.0</version>
</dependency>
<!-- Spring Integration FTP -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ftp</artifactId>
<version>6.2.0</version>
</dependency>
<!-- Spring Integration JDBC -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
<version>6.2.0</version>
</dependency>
<!-- Spring Boot Starter Integration -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
</dependencies>
8.2 Spring Boot Auto-Configuration
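Spring Boot auto-configures Spring Integration as soon as the starter dependency is on the classpath. The auto-configuration already enables integration support, so the @EnableIntegration annotation below is optional and shown only for explicitness: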
@SpringBootApplication
@EnableIntegration
public class IntegrationApplication {
public static void main(String[] args) {
SpringApplication.run(IntegrationApplication.class, args);
}
}
9. Configuration
9.1 Java Configuration
Spring Integration supports Java-based configuration using annotations and Java configuration classes.
@Configuration
@EnableIntegration
public class IntegrationConfig {
@Bean
public DirectChannel inputChannel() {
return new DirectChannel();
}
@Bean
public DirectChannel outputChannel() {
return new DirectChannel();
}
// Method-level service activator: the return value is sent to outputChannel
@ServiceActivator(inputChannel = "inputChannel", outputChannel = "outputChannel")
public String serviceActivator(String payload) {
System.out.println("Processing: " + payload);
return payload;
}
}
9.2 XML Configuration
Spring Integration also supports XML-based configuration for legacy applications.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd">
<int:channel id="inputChannel"/>
<int:channel id="outputChannel"/>
<int:service-activator
input-channel="inputChannel"
output-channel="outputChannel"
ref="orderService"
method="processOrder"/>
<bean id="orderService" class="com.example.OrderService"/>
</beans>
9.3 DSL Configuration
Spring Integration DSL provides a fluent API for configuring integration flows.
@Configuration
@EnableIntegration
public class IntegrationDSLConfig {
@Bean
public IntegrationFlow orderProcessingFlow() {
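// The IntegrationFlows factory has been deprecated since Spring Integration 6.0; use the IntegrationFlow factory methods
return IntegrationFlow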
.from("orderInputChannel")
.transform(Order.class, order -> {
order.setStatus("PROCESSING");
return order;
})
.filter(Order.class, order -> order.getAmount() > 0)
.route(Order.class,
order -> order.getType(),
mapping -> mapping
.subFlowMapping("PREMIUM", sf -> sf
.channel("premiumOrderChannel"))
.subFlowMapping("STANDARD", sf -> sf
.channel("standardOrderChannel")))
.get();
}
}
10. Real-World Examples
10.1 Order Processing Pipeline
A complete order processing pipeline that receives orders via HTTP, validates them, routes based on type, processes, and sends notifications.
@Configuration
@EnableIntegration
public class OrderProcessingFlow {
// HTTP Inbound Gateway
@Bean
public IntegrationFlow httpInboundFlow() {
return IntegrationFlow
.from(Http.inboundGateway("/api/orders")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(Order.class))
.channel("orderInputChannel")
.get();
}
// Order Validation
@Bean
public IntegrationFlow orderValidationFlow() {
return IntegrationFlow
.from("orderInputChannel")
.filter(Order.class, order ->
order.getItems() != null && !order.getItems().isEmpty(),
filter -> filter.discardChannel("invalidOrderChannel"))
.channel("validOrderChannel")
.get();
}
// Order Routing
@Bean
public IntegrationFlow orderRoutingFlow() {
return IntegrationFlow
.from("validOrderChannel")
.route(Order.class,
order -> order.getType(),
mapping -> mapping
.channelMapping("PREMIUM", "premiumOrderChannel")
.channelMapping("STANDARD", "standardOrderChannel")
.defaultOutputChannel("defaultOrderChannel"))
.get();
}
// Premium Order Processing
@Bean
public IntegrationFlow premiumOrderFlow() {
return IntegrationFlow
.from("premiumOrderChannel")
.transform(Order.class, order -> {
order.setPriority(1);
return order;
})
// Refers to the OrderService bean by its default bean name
.handle("orderService", "processPremiumOrder")
.channel("notificationChannel")
.get();
}
// Notification
@Bean
public IntegrationFlow notificationFlow() {
return IntegrationFlow
.from("notificationChannel")
// Refers to the NotificationService bean by its default bean name
.handle("notificationService", "sendNotification")
.get();
}
}
10.2 File Processing with Error Handling
A file processing flow that reads files, processes them, and handles errors gracefully.
@Configuration
@EnableIntegration
public class FileProcessingFlow {
@Bean
public IntegrationFlow fileProcessingFlow() {
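return IntegrationFlow
.from(Files.inboundAdapter(new File("/input"))
.patternFilter("*.csv")
.preventDuplicates(true),
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(Files.toStringTransformer())
.transform(String.class, this::parseCsv)
.split()
// Refers to the OrderService bean by its default bean name
.handle("orderService", "processOrder")
.aggregate()
// The file outbound adapter accepts File, String, byte[] or InputStream payloads,
// so the aggregated list is rendered as text first (toString() is a placeholder format)
.transform(Object.class, Object::toString)
.handle(Files.outboundAdapter(new File("/output"))
.fileExistsMode(FileExistsMode.APPEND))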
.get();
}
private List<Order> parseCsv(String csvContent) {
// Parse CSV and return list of orders
return parseCsvToOrders(csvContent);
}
@Bean
public IntegrationFlow errorHandlingFlow() {
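return IntegrationFlow
.from("errorChannel")
.log(LoggingHandler.Level.ERROR)
// Error messages carry a Throwable payload; convert it to text before writing it to a file
.transform(Throwable.class, Throwable::toString)
.handle(Files.outboundAdapter(new File("/errors"))
.fileNameGenerator(message ->
"error-" + System.currentTimeMillis() + ".txt"))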
.get();
}
}
11. Best Practices
- Use appropriate channels: Choose the right channel type for your use case (synchronous vs asynchronous).
- Keep endpoints focused: Each endpoint should have a single responsibility.
- Handle errors gracefully: Use error channels and retry mechanisms.
- Use transformers for format conversion: Don't mix business logic with format conversion.
- Leverage Spring Boot auto-configuration: Use Spring Boot starters when possible.
- Test integration flows: Use Spring Integration testing framework.
- Monitor message flows: Integrate with Spring Boot Actuator for monitoring.
- Use DSL for complex flows: DSL provides better readability for complex integration scenarios.
12. Testing
Spring Integration provides comprehensive testing support through the spring-integration-test module.
@SpringBootTest
@SpringIntegrationTest
public class OrderProcessingFlowTest {
@Autowired
private OrderGateway orderGateway;
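@Autowired
@Qualifier("orderInputChannel")
private MessageChannel orderInputChannel; // only used for sending, so MessageChannel is sufficient
// Assumes processedOrderChannel is declared as a QueueChannel so that the test can poll it
@Autowired
@Qualifier("processedOrderChannel")
private PollableChannel processedOrderChannel;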
@Test
public void testOrderProcessing() {
Order order = new Order();
order.setId("123");
order.setType("STANDARD");
orderGateway.processOrder(order);
Message<?> result = processedOrderChannel.receive(5000);
assertNotNull(result);
Order processed = (Order) result.getPayload();
assertEquals("PROCESSED", processed.getStatus());
}
@Test
public void testOrderRouting() {
Order premiumOrder = new Order();
premiumOrder.setType("PREMIUM");
orderInputChannel.send(MessageBuilder.withPayload(premiumOrder).build());
// Verify message was routed to premium channel
// ...
}
}
13. Advanced Concepts
13.1 Transaction Support
Spring Integration supports transactions for message processing.
@ServiceActivator(inputChannel = "orderChannel", outputChannel = "processedChannel")
@Transactional
public Order processOrder(Order order) {
// This method runs in a transaction
orderService.save(order);
return order;
}
13.2 Message Store
Message stores provide persistence for messages, useful for aggregators and other stateful components.
@Bean
public JdbcMessageStore messageStore(DataSource dataSource) {
// JdbcMessageStore also implements MessageGroupStore, which the aggregator requires
return new JdbcMessageStore(dataSource);
}
@Bean
public AggregatingMessageHandler aggregator(JdbcMessageStore messageStore) {
AggregatingMessageHandler handler =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor());
handler.setMessageStore(messageStore);
handler.setOutputChannel(outputChannel());
return handler;
}
13.3 Retry and Error Handling
Spring Integration provides retry and error handling mechanisms.
@Bean
public IntegrationFlow orderProcessingWithRetry() {
return IntegrationFlow
.from("orderChannel")
// Refers to the OrderService bean by its default bean name
.handle("orderService", "processOrder",
e -> e.advice(retryAdvice()))
.channel("processedChannel")
.get();
}
@Bean
public RequestHandlerRetryAdvice retryAdvice() {
RequestHandlerRetryAdvice retry = new RequestHandlerRetryAdvice();
retry.setRetryTemplate(retryTemplate());
return retry;
}
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate template = new RetryTemplate();
SimpleRetryPolicy policy = new SimpleRetryPolicy();
policy.setMaxAttempts(3);
template.setRetryPolicy(policy);
FixedBackOffPolicy backOff = new FixedBackOffPolicy();
backOff.setBackOffPeriod(1000);
template.setBackOffPolicy(backOff);
return template;
}
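What the retry template above does at run time can be sketched in plain Java: try the action, wait a fixed period after a failure, and give up after a maximum number of attempts (the first call counts as an attempt). `RetrySketch` is a hypothetical helper written for illustration; it is not the spring-retry implementation and it retries on any `RuntimeException`.

```java
import java.util.function.Supplier;

// Hypothetical helper illustrating fixed back-off retry; not the spring-retry implementation.
final class RetrySketch {

    // Calls the action up to maxAttempts times, sleeping backOffMillis between failures.
    // Rethrows the last failure once all attempts are used up.
    static <T> T retry(int maxAttempts, long backOffMillis, Supplier<T> action) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    sleep(backOffMillis);
                }
            }
        }
        throw last;
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during back-off", e);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = retry(3, 1000, () -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("temporary failure");
            }
            return "processed";
        });
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

When the attempts are exhausted the last exception propagates, which in a Spring Integration flow is the point at which an error channel or recovery callback would take over.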
14. Conclusion
Spring Integration provides a powerful framework for building enterprise integration solutions. It implements proven Enterprise Integration Patterns and integrates seamlessly with the Spring ecosystem.
Key takeaways:
- Spring Integration enables loose coupling through messaging
- It supports multiple protocols and transports
- Configuration can be done via Java, XML, or DSL
- Comprehensive testing and monitoring support
- Follows Spring's familiar patterns and principles
Whether you're integrating with REST APIs, message queues, file systems, or databases, Spring Integration provides the tools and patterns you need to build robust, maintainable integration solutions.