Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
Tags
- 유닉스
- android
- 자바암호
- Database
- 파이썬
- Java
- 고전역학
- oracle
- 코틀린
- 인프라
- NIO
- JVM
- kotlin
- 소프트웨어공학
- chatGPT's answer
- write by chatGPT
- 자바
- write by GPT-4
- 자바네트워크
- 데이터베이스
- 역학
- jpa
- 리눅스
- python
- GPT-4's answer
- 시스템
- 웹 크롤링
- spring integration
- spring data jpa
- flet
Archives
- Today
- Total
Akashic Records
Service Activator와 Message Handler - Custom Message Handler 작성 본문
Spring Integration for Beginners
Service Activator와 Message Handler - Custom Message Handler 작성
Andrew's Akashic Records 2025. 1. 3. 13:54728x90
2. Spring Integration 주요 컴포넌트 활용
2.1 Message Filter와 Router
2.2 Service Activator와 Message Handler
Custom Message Handler 작성
2.1 Message Handler란?
Spring Integration에서 Message Handler는 메시지를 소비하고, 비즈니스 로직을 수행하는 역할을 합니다. Spring Integration이 제공하는 기본 Service Activator 외에도, Custom Message Handler를 구현하여 특별한 요구사항에 맞는 메시지 처리를 정의할 수 있습니다.
2.2 Custom Message Handler 작성의 필요성
- 특수한 비즈니스 로직: 기본 제공되는 Service Activator로 해결할 수 없는 맞춤형 로직 필요.
- 고급 메시지 처리: 메시지의 Payload와 Headers를 조합하거나 외부 서비스와 복잡한 연동을 구현.
- 재사용 가능성: 공통적인 메시지 처리 로직을 여러 플로우에서 재사용 가능.
2.3 Custom Message Handler 구현 방법
Spring Integration의 Custom Message Handler는 org.springframework.messaging.MessageHandler 인터페이스를 구현하여 작성합니다. 이 인터페이스는 handleMessage 메서드를 제공하며, 메시지 처리 로직을 정의할 수 있습니다.
2.4 예제: 주문 상태 처리
시나리오
- 주문 메시지를 받아 주문 상태를 업데이트.
- 처리된 메시지를 다음 단계로 전달.
- 에러가 발생하면 로깅 및 별도 처리.
1. Custom Message Handler 구현
package kr.co.thekeytech.spring.eai.handler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.stereotype.Component;
@Component
public class CustomMessageHandler implements MessageHandler {
private static final Logger logger = LoggerFactory.getLogger(CustomMessageHandler.class);
@Override
public void handleMessage(Message<?> message) {
try {
// 메시지 Payload 및 Headers 읽기
String payload = (String) message.getPayload();
String orderId = message.getHeaders().get("orderId", String.class);
// 비즈니스 로직 실행: 주문 상태 처리
logger.info("============> Processing order [{}]: {}", orderId, payload);
// 처리 완료 후 추가 작업 (예: DB 업데이트)
String updatedStatus = "Processed Order: " + payload;
logger.info("============> Order [{}] updated to: {}", orderId, updatedStatus);
} catch (Exception e) {
logger.error("Error processing order message: {}", message, e);
throw e; // 에러 발생 시 상위 플로우로 전달
}
}
}
2. Spring Integration 플로우 설정
package kr.co.thekeytech.spring.eai.handler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
@Configuration
public class CustomMessageHandlerConfig {
private static final Logger logger = LoggerFactory.getLogger(CustomMessageHandlerConfig.class);
@Bean
public IntegrationFlow customMessageFlow(CustomMessageHandler customMessageHandler) {
return IntegrationFlow.from("handler.order.input.channel")
.handle((payload, headers) -> {
logger.info("========> Before Handle Headers: {}", headers);
logger.info("========> Before Handle Payload: {}", payload);
return payload;
})
.handle(customMessageHandler) // Custom Message Handler 호출
.get();
}
}
3. Gateway 설정
package kr.co.thekeytech.spring.eai.handler;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.handler.annotation.Header;
@MessagingGateway
public interface CustomMessageGateway {
@Gateway(requestChannel = "handler.order.input.channel")// inputChannel을 명시적으로 지정
void sendMessage(String message, @Header("orderId") String orderId);
}
4. 테스트 케이스
package kr.co.thekeytech.spring.eai.handler;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class CustomMessageHandlerConfigTest {
@Autowired
private CustomMessageGateway cmGateway;
@Test
void testFlow() {
// 주문 메시지 생성 및 전송
cmGateway.sendMessage("Order details: ID12345", "ID12345");
// 출력은 없습니다.
}
}
5. 실행 결과
로깅 출력:
2025-01-03 13:42:37 - k.c.t.s.e.h.CustomMessageHandlerConfig - ========> Before Handle Headers: {replyChannel=nullChannel, errorChannel=, id=2de0af38-ea5b-d57b-2939-13c4b55da115, orderId=ID12345, timestamp=1735879357545}
2025-01-03 13:42:37 - k.c.t.s.e.h.CustomMessageHandlerConfig - ========> Before Handle Payload: Order details: ID12345
...
2025-01-03 13:42:37 - k.c.t.s.e.h.CustomMessageHandler - ============> Processing order [ID12345]: Order details: ID12345
2025-01-03 13:42:37 - k.c.t.s.e.h.CustomMessageHandler - ============> Order [ID12345] updated to: Processed Order: Order details: ID12345
728x90
Custom Message Handler 메시지의 단방향성
Spring Integration에서 handle 메서드로 MessageHandler를 사용하면 기본적으로 메시지 처리가 종료되며, 그 이후에 outputChannel을 설정하려고 하면 오류가 발생합니다.
.handle(customMessageHandler) // Custom Message Handler
.channel("outputChannel") // 이 부분에서 오류 발생
- customMessageHandler는 MessageHandler를 구현하며, 메시지를 처리하는 최종 엔드포인트로 간주됩니다.
- Spring Integration에서 MessageHandler는 메시지를 소비하고 종료되므로, 이후 channel을 설정할 수 없습니다.
해결 방법
1. 플로우를 분리
customMessageHandler 이후의 로직이 필요하다면, 이를 별도의 플로우로 분리하여 처리합니다.
@Bean
public IntegrationFlow customMessageFlow(CustomMessageHandler customMessageHandler) {
return IntegrationFlow.from("inputChannel")
.handle(customMessageHandler) // 메시지 처리 후 종료
.get();
}
@Bean
public IntegrationFlow outputProcessingFlow() {
return IntegrationFlow.from("outputChannel") // 별도 플로우에서 처리
.handle(message -> {
logger.info("Further processing: {}", message.getPayload());
})
.get();
}
2. Custom Message Handler를 Transformer로 변경
MessageHandler 대신 Transformer를 사용하면, 메시지를 처리한 후 반환값을 다음 플로우로 전달할 수 있습니다.
@Bean
public IntegrationFlow customMessageFlow(CustomMessageHandler customMessageHandler) {
return IntegrationFlow.from("inputChannel")
.transform(customMessageHandler) // MessageHandler 대신 Transformer로 사용
.channel("outputChannel") // 결과를 outputChannel로 전달
.get();
}
CustomMessageHandler 수정:
@Component
public class CustomMessageHandler implements GenericTransformer<Message<?>, Message<?>> {
@Override
public Message<?> transform(Message<?> message) {
// 메시지 처리 로직
String updatedPayload = "Processed: " + message.getPayload();
return MessageBuilder.withPayload(updatedPayload).copyHeaders(message.getHeaders()).build();
}
}
3. publishSubscribeChannel로 메시지 복제
메시지를 처리한 후 다른 채널로 복제하려면, publishSubscribeChannel을 사용하여 메시지를 여러 플로우로 분배할 수 있습니다.
@Bean
public IntegrationFlow customMessageFlow(CustomMessageHandler customMessageHandler) {
return IntegrationFlow.from("inputChannel")
.publishSubscribeChannel(subFlow -> subFlow
.subscribe(flow -> flow.handle(customMessageHandler))
.subscribe(flow -> flow.channel("outputChannel")))
.get();
}
728x90
'Spring Integration for Beginners' 카테고리의 다른 글
Splitter와 Aggregator - 예제: XML 데이터 처리 (0) | 2025.01.07 |
---|---|
Splitter와 Aggregator - 메시지 분할(Split)과 집계(Aggregate) 이해 (0) | 2025.01.06 |
Service Activator와 Message Handler - Service Activator를 사용한 비즈니스 로직 구현 (0) | 2025.01.02 |
Message Filter와 Router - 동적 라우팅 구현 (0) | 2024.12.27 |
Message Filter와 Router - 조건에 따른 메시지 필터링 (2) | 2024.12.26 |
Comments