Akashic Records

Service Activator와 Message Handler - Custom Message Handler 작성 본문

Spring Integration for Beginners

Service Activator와 Message Handler - Custom Message Handler 작성

Andrew's Akashic Records 2025. 1. 3. 13:54
728x90
2. Spring Integration 주요 컴포넌트 활용
2.1 Message Filter와 Router
2.2 Service Activator와 Message Handler

Spring Integration for Backend Developers

 

Custom Message Handler 작성

2.1 Message Handler란?

Spring Integration에서 Message Handler메시지를 소비하고, 비즈니스 로직을 수행하는 역할을 합니다. Spring Integration이 제공하는 기본 Service Activator 외에도, Custom Message Handler를 구현하여 특별한 요구사항에 맞는 메시지 처리를 정의할 수 있습니다.

2.2 Custom Message Handler 작성의 필요성

  • 특수한 비즈니스 로직: 기본 제공되는 Service Activator로 해결할 수 없는 맞춤형 로직 필요.
  • 고급 메시지 처리: 메시지의 Payload와 Headers를 조합하거나 외부 서비스와 복잡한 연동을 구현.
  • 재사용 가능성: 공통적인 메시지 처리 로직을 여러 플로우에서 재사용 가능.

2.3 Custom Message Handler 구현 방법

Spring Integration의 Custom Message Handler는 org.springframework.messaging.MessageHandler 인터페이스를 구현하여 작성합니다. 이 인터페이스는 handleMessage 메서드를 제공하며, 메시지 처리 로직을 정의할 수 있습니다.

2.4 예제: 주문 상태 처리

시나리오

  1. 주문 메시지를 받아 주문 상태를 업데이트.
  2. 처리된 메시지를 다음 단계로 전달.
  3. 에러가 발생하면 로깅 및 별도 처리.

1. Custom Message Handler 구현

package kr.co.thekeytech.spring.eai.handler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.stereotype.Component;

@Component
public class CustomMessageHandler implements MessageHandler {

    private static final Logger logger = LoggerFactory.getLogger(CustomMessageHandler.class);

    @Override
    public void handleMessage(Message<?> message) {
        try {
            // 메시지 Payload 및 Headers 읽기
            String payload = (String) message.getPayload();
            String orderId = message.getHeaders().get("orderId", String.class);

            // 비즈니스 로직 실행: 주문 상태 처리
            logger.info("============> Processing order [{}]: {}", orderId, payload);

            // 처리 완료 후 추가 작업 (예: DB 업데이트)
            String updatedStatus = "Processed Order: " + payload;
            logger.info("============> Order [{}] updated to: {}", orderId, updatedStatus);

        } catch (Exception e) {
            logger.error("Error processing order message: {}", message, e);
            throw e; // 에러 발생 시 상위 플로우로 전달
        }
    }
}

2. Spring Integration 플로우 설정

package kr.co.thekeytech.spring.eai.handler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;

@Configuration
public class CustomMessageHandlerConfig {

    private static final Logger logger = LoggerFactory.getLogger(CustomMessageHandlerConfig.class);

    @Bean
    public IntegrationFlow customMessageFlow(CustomMessageHandler customMessageHandler) {
        return IntegrationFlow.from("handler.order.input.channel")
                .handle((payload, headers) -> {
                    logger.info("========> Before Handle Headers: {}", headers);
                    logger.info("========> Before Handle Payload: {}", payload);
                    return payload;
                })
                .handle(customMessageHandler) // Custom Message Handler 호출
                .get();
    }
}

3. Gateway 설정

package kr.co.thekeytech.spring.eai.handler;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway
public interface CustomMessageGateway {
    @Gateway(requestChannel = "handler.order.input.channel")// inputChannel을 명시적으로 지정
    void sendMessage(String message, @Header("orderId") String orderId);
}

4. 테스트 케이스

package kr.co.thekeytech.spring.eai.handler;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class CustomMessageHandlerConfigTest {

    @Autowired
    private CustomMessageGateway cmGateway;


    @Test
    void testFlow() {

        // 주문 메시지 생성 및 전송
        cmGateway.sendMessage("Order details: ID12345", "ID12345");
        // 출력은 없습니다.
    }
}

 

5. 실행 결과

로깅 출력:

2025-01-03 13:42:37 - k.c.t.s.e.h.CustomMessageHandlerConfig - ========> Before Handle Headers: {replyChannel=nullChannel, errorChannel=, id=2de0af38-ea5b-d57b-2939-13c4b55da115, orderId=ID12345, timestamp=1735879357545}
2025-01-03 13:42:37 - k.c.t.s.e.h.CustomMessageHandlerConfig - ========> Before Handle Payload: Order details: ID12345
...
2025-01-03 13:42:37 - k.c.t.s.e.h.CustomMessageHandler - ============> Processing order [ID12345]: Order details: ID12345
2025-01-03 13:42:37 - k.c.t.s.e.h.CustomMessageHandler - ============> Order [ID12345] updated to: Processed Order: Order details: ID12345
  1.  
728x90

Custom Message Handler 메시지의 단방향성

 

Spring Integration에서 handle 메서드로 MessageHandler를 사용하면 기본적으로 메시지 처리가 종료되며, 그 이후에 outputChannel을 설정하려고 하면 오류가 발생합니다.

.handle(customMessageHandler) // Custom Message Handler
.channel("outputChannel")     // 이 부분에서 오류 발생
  • customMessageHandler는 MessageHandler를 구현하며, 메시지를 처리하는 최종 엔드포인트로 간주됩니다.
  • Spring Integration에서 MessageHandler는 메시지를 소비하고 종료되므로, 이후 channel을 설정할 수 없습니다.

해결 방법

1. 플로우를 분리

customMessageHandler 이후의 로직이 필요하다면, 이를 별도의 플로우로 분리하여 처리합니다.

@Bean
public IntegrationFlow customMessageFlow(CustomMessageHandler customMessageHandler) {
    return IntegrationFlow.from("inputChannel")
            .handle(customMessageHandler) // 메시지 처리 후 종료
            .get();
}

@Bean
public IntegrationFlow outputProcessingFlow() {
    return IntegrationFlow.from("outputChannel") // 별도 플로우에서 처리
            .handle(message -> {
                logger.info("Further processing: {}", message.getPayload());
            })
            .get();
}

2. Custom Message Handler를 Transformer로 변경

MessageHandler 대신 Transformer를 사용하면, 메시지를 처리한 후 반환값을 다음 플로우로 전달할 수 있습니다.

@Bean
public IntegrationFlow customMessageFlow(CustomMessageHandler customMessageHandler) {
    return IntegrationFlow.from("inputChannel")
            .transform(customMessageHandler) // MessageHandler 대신 Transformer로 사용
            .channel("outputChannel")        // 결과를 outputChannel로 전달
            .get();
}

 

CustomMessageHandler 수정:

@Component
public class CustomMessageHandler implements GenericTransformer<Message<?>, Message<?>> {

    @Override
    public Message<?> transform(Message<?> message) {
        // 메시지 처리 로직
        String updatedPayload = "Processed: " + message.getPayload();
        return MessageBuilder.withPayload(updatedPayload).copyHeaders(message.getHeaders()).build();
    }
}

3. publishSubscribeChannel로 메시지 복제

메시지를 처리한 후 다른 채널로 복제하려면, publishSubscribeChannel을 사용하여 메시지를 여러 플로우로 분배할 수 있습니다.

@Bean
public IntegrationFlow customMessageFlow(CustomMessageHandler customMessageHandler) {
    return IntegrationFlow.from("inputChannel")
            .publishSubscribeChannel(subFlow -> subFlow
                .subscribe(flow -> flow.handle(customMessageHandler))
                .subscribe(flow -> flow.channel("outputChannel")))
            .get();
}

728x90
Comments