Message Filter와 Router - 조건에 따른 메시지 필터링 본문
Spring Integration for Beginners
Message Filter와 Router - 조건에 따른 메시지 필터링
2024. 12. 26. 14:53
2. Spring Integration 주요 컴포넌트 활용
2.1 Message Filter와 Router
조건에 따른 메시지 필터링
1.1 Message Filter란?
Message Filter는 메시지 처리 과정에서 특정 조건에 따라 메시지를 선택적으로 전달하거나 폐기하는 역할을 합니다.
- 필터링 조건: 메시지의 Payload, Header, 또는 기타 메타데이터를 기반으로 정의됩니다.
- 주요 특징:
- 조건에 부합하지 않는 메시지는 전달되지 않고 처리 과정에서 제거됩니다.
- 처리 로직과 조건 정의를 명확히 분리할 수 있어 유연한 설계가 가능합니다.
Spring Integration에서 filter DSL 메서드를 사용하여 쉽게 구현할 수 있습니다.
1.2 Message Filter의 기본 구성
Message Filter는 다음과 같은 요소로 구성됩니다:
- 입력 채널: 필터링 대상 메시지가 전달되는 채널.
- 조건(Expression): 메시지를 필터링하는 기준.
- 출력 채널: 조건을 만족하는 메시지가 전달되는 채널.
- 제거 메시지 처리(Optional): 조건에 맞지 않아 제거된 메시지를 별도로 처리.
1.3 Spring Integration에서의 Message Filter 사용법
1.3.1 간단한 메시지 필터링
public class MessageFilterConfig {
public IntegrationFlow filterExampleFlow() {
return IntegrationFlow.from("inputChannel")
// 조건에 따라 메시지 필터링
.filter(String.class, payload -> payload.contains("important"))
public MessageChannel inputChannel() {
return new DirectChannel();
public MessageChannel outputChannel() {
return new DirectChannel();
- 조건: 메시지 Payload에 "important" 문자열이 포함된 경우만 전달.
- 결과: 조건에 맞지 않는 메시지는 전달되지 않음.
1.3.2 필터 조건 확장
- 조건을 payload, headers, metadata를 조합하여 설정할 수 있습니다.
- 예: 메시지의 Header 값이 특정 값일 때만 전달.
.filter(Message.class, message ->
1.4 조건에 맞지 않는 메시지 처리
조건에 맞지 않는 메시지를 별도로 처리하려면 discardChannel을 사용할 수 있습니다.
- discardChannel: 필터 조건에 부합하지 않는 메시지를 전달할 채널.
public IntegrationFlow filterWithDiscardFlow() {
return IntegrationFlow.from("inputChannel")
.filter(String.class, payload -> payload.length() > 5,
spec -> spec.discardChannel("discardChannel"))
public IntegrationFlow discardHandlerFlow() {
return IntegrationFlow.from("discardChannel")
.handle(message -> System.out.println("Discarded: " + message.getPayload()))
- 메시지 Payload 길이가 5 이하인 경우 discardChannel로 전달됩니다.
- 조건에 맞지 않는 메시지는 discardChannel에서 별도로 처리됩니다.
1.5 메시지 필터의 실용 사례
1.5.1 주문 데이터 필터링
조건: 주문 금액이 100 이상인 데이터만 처리.
public class OrderFilterConfig {
public IntegrationFlow orderFilterFlow() {
return IntegrationFlow.from("orderInputChannel")
.filter(Order.class, order -> order.getAmount() >= 100)
public MessageChannel orderInputChannel() {
return new DirectChannel();
public MessageChannel validOrderChannel() {
return new DirectChannel();
// 주문 클래스
class Order {
private String orderId;
private double amount;
// Getters and Setters
- 주문 데이터 중 금액이 100 이상인 데이터만 validOrderChannel로 전달됩니다.
- 조건에 맞지 않는 메시지는 처리되지 않습니다.
1.5.2 고객 데이터 필터링
조건: 고객 등급이 VIP인 데이터만 처리.
public class CustomerFilterConfig {
public IntegrationFlow customerFilterFlow() {
return IntegrationFlow.from("customerInputChannel")
.filter(Customer.class, customer -> "VIP".equals(customer.getGrade()))
public MessageChannel customerInputChannel() {
return new DirectChannel();
public MessageChannel vipCustomerChannel() {
return new DirectChannel();
// 고객 클래스
class Customer {
private String name;
private String grade;
// Getters and Setters
1.6 장점 및 활용 방안
- 유연성: 조건 기반 메시지 처리를 간편하게 구현.
- 비즈니스 로직 분리: 필터링 조건과 실제 처리 로직을 분리하여 유지보수성 향상.
- 불필요한 메시지 제거: 메시지 큐의 부하를 줄이고, 중요한 데이터에 집중.
활용 방안
- 알림 시스템: 우선순위가 높은 알림만 처리.
- 데이터 필터링: 대량의 데이터 중 필요한 데이터만 추출.
- 실시간 로그 처리: 특정 패턴이 포함된 로그만 분석.
1.7 Filter Sample Code & Test Code
Filter 설정
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
public class FilterStudyConfig {
// 1. Filter Flow 설정
public IntegrationFlow filterFlow() {
return IntegrationFlow.from("inputFilterChannel")
// 조건: 메시지 내용이 "VALID" 포함
.filter(String.class, payload -> payload.startsWith("VALID"),
spec -> spec.discardChannel("discardFilterChannel")) // 부적합 메시지 처리 채널
// 조건: 메시지 헤더 'level'가 HIGH인 경우만 처리
.filter(Message.class, m -> "HIGH".equals(m.getHeaders().get("level")),
spec -> spec.discardChannel("discardFilterChannel"))
.channel("outputFilterChannel") // 적합한 메시지를 출력 채널로 전달
// 2. 메시지 채널 정의
public MessageChannel inputFilterChannel() {
return new DirectChannel();
public MessageChannel outputFilterChannel() {
return new DirectChannel();
public MessageChannel discardFilterChannel() {
return new DirectChannel();
Gateway 설정
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.handler.annotation.Header;
public interface FilterStudyGateway {
@Gateway(requestChannel = "inputFilterChannel") // inputChannel을 명시적으로 지정
void sendMessage(String message, @Header("level") String level);
Test 코드
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import static org.assertj.core.api.Assertions.assertThat;
class FilterStudyConfigTest {
private MessageChannel inputFilterChannel;
private MessageChannel outputFilterChannel;
private MessageChannel discardFilterChannel;
private FilterStudyGateway filterStudyGateway;
void testFilterFlow() {
// 1. Output 및 Discard 채널 설정
QueueChannel outputQueue = new QueueChannel();
QueueChannel discardQueue = new QueueChannel();
((DirectChannel) outputFilterChannel).subscribe(outputQueue::send);
((DirectChannel) discardFilterChannel).subscribe(discardQueue::send);
// 2. 메시지 전송
filterStudyGateway.sendMessage("VALID MESSAGE", "HIGH");
filterStudyGateway.sendMessage("VALID MESSAGE", "LOW");
filterStudyGateway.sendMessage("INVALID MESSAGE", "HIGH");
// 3. 결과 검증
Message<?> outputMessage = outputQueue.receive(1000);
assertThat(outputMessage.getPayload()).isEqualTo("VALID MESSAGE");
Message<?> discardMessage1 = discardQueue.receive(1000);
assertThat(discardMessage1.getPayload()).isEqualTo("VALID MESSAGE");
Message<?> discardMessage2 = discardQueue.receive(1000);
assertThat(discardMessage2.getPayload()).isEqualTo("INVALID MESSAGE");
