Akashic Records

Message Filter와 Router - 조건에 따른 메시지 필터링 본문

Spring Integration for Beginners

Message Filter와 Router - 조건에 따른 메시지 필터링

Andrew's Akashic Records 2024. 12. 26. 14:53
728x90
2. Spring Integration 주요 컴포넌트 활용
2.1 Message Filter와 Router

Spring Integration for Backend Developers

조건에 따른 메시지 필터링

1.1 Message Filter란?

Message Filter메시지 처리 과정에서 특정 조건에 따라 메시지를 선택적으로 전달하거나 폐기하는 역할을 합니다.

  • 필터링 조건: 메시지의 Payload, Header, 또는 기타 메타데이터를 기반으로 정의됩니다.
  • 주요 특징:
    • 조건에 부합하지 않는 메시지는 전달되지 않고 처리 과정에서 제거됩니다.
    • 처리 로직과 조건 정의를 명확히 분리할 수 있어 유연한 설계가 가능합니다.

Spring Integration에서 filter DSL 메서드를 사용하여 쉽게 구현할 수 있습니다.

1.2 Message Filter의 기본 구성

Message Filter는 다음과 같은 요소로 구성됩니다:

  1. 입력 채널: 필터링 대상 메시지가 전달되는 채널.
  2. 조건(Expression): 메시지를 필터링하는 기준.
  3. 출력 채널: 조건을 만족하는 메시지가 전달되는 채널.
  4. 제거 메시지 처리(Optional): 조건에 맞지 않아 제거된 메시지를 별도로 처리.

1.3 Spring Integration에서의 Message Filter 사용법

1.3.1 간단한 메시지 필터링

@Configuration
public class MessageFilterConfig {

    @Bean
    public IntegrationFlow filterExampleFlow() {
        return IntegrationFlow.from("inputChannel")
                // 조건에 따라 메시지 필터링
                .filter(String.class, payload -> payload.contains("important"))
                .channel("outputChannel")
                .get();
    }

    @Bean
    public MessageChannel inputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel outputChannel() {
        return new DirectChannel();
    }
}

 

설명:

  • 조건: 메시지 Payload에 "important" 문자열이 포함된 경우만 전달.
  • 결과: 조건에 맞지 않는 메시지는 전달되지 않음.

1.3.2 필터 조건 확장

  • 조건을 payload, headers, metadata를 조합하여 설정할 수 있습니다.
  • 예: 메시지의 Header 값이 특정 값일 때만 전달.
.filter(Message.class, message -> 
    "critical".equals(message.getHeaders().get("priority"))
)

 

1.4 조건에 맞지 않는 메시지 처리

조건에 맞지 않는 메시지를 별도로 처리하려면 discardChannel을 사용할 수 있습니다.

  • discardChannel: 필터 조건에 부합하지 않는 메시지를 전달할 채널.
@Bean
public IntegrationFlow filterWithDiscardFlow() {
    return IntegrationFlow.from("inputChannel")
            .filter(String.class, payload -> payload.length() > 5,
                    spec -> spec.discardChannel("discardChannel"))
            .channel("outputChannel")
            .get();
}

@Bean
public IntegrationFlow discardHandlerFlow() {
    return IntegrationFlow.from("discardChannel")
            .handle(message -> System.out.println("Discarded: " + message.getPayload()))
            .get();
}

설명:

  • 메시지 Payload 길이가 5 이하인 경우 discardChannel로 전달됩니다.
  • 조건에 맞지 않는 메시지는 discardChannel에서 별도로 처리됩니다.

1.5 메시지 필터의 실용 사례

1.5.1 주문 데이터 필터링

조건: 주문 금액이 100 이상인 데이터만 처리.

@Configuration
public class OrderFilterConfig {

    @Bean
    public IntegrationFlow orderFilterFlow() {
        return IntegrationFlow.from("orderInputChannel")
                .filter(Order.class, order -> order.getAmount() >= 100)
                .channel("validOrderChannel")
                .get();
    }

    @Bean
    public MessageChannel orderInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel validOrderChannel() {
        return new DirectChannel();
    }
}

// 주문 클래스
class Order {
    private String orderId;
    private double amount;

    // Getters and Setters
}

 

설명:

  • 주문 데이터 중 금액이 100 이상인 데이터만 validOrderChannel로 전달됩니다.
  • 조건에 맞지 않는 메시지는 처리되지 않습니다.

1.5.2 고객 데이터 필터링

조건: 고객 등급이 VIP인 데이터만 처리.

@Configuration
public class CustomerFilterConfig {

    @Bean
    public IntegrationFlow customerFilterFlow() {
        return IntegrationFlow.from("customerInputChannel")
                .filter(Customer.class, customer -> "VIP".equals(customer.getGrade()))
                .channel("vipCustomerChannel")
                .get();
    }

    @Bean
    public MessageChannel customerInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel vipCustomerChannel() {
        return new DirectChannel();
    }
}

// 고객 클래스
class Customer {
    private String name;
    private String grade;

    // Getters and Setters
}

1.6 장점 및 활용 방안

장점

  1. 유연성: 조건 기반 메시지 처리를 간편하게 구현.
  2. 비즈니스 로직 분리: 필터링 조건과 실제 처리 로직을 분리하여 유지보수성 향상.
  3. 불필요한 메시지 제거: 메시지 큐의 부하를 줄이고, 중요한 데이터에 집중.

활용 방안

  • 알림 시스템: 우선순위가 높은 알림만 처리.
  • 데이터 필터링: 대량의 데이터 중 필요한 데이터만 추출.
  • 실시간 로그 처리: 특정 패턴이 포함된 로그만 분석.
728x90

1.7 Filter Sample Code & Test Code

Filter 설정

package kr.co.thekeytech.spring.eai.filter;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

@Configuration
public class FilterStudyConfig {

    // 1. Filter Flow 설정
    @Bean
    public IntegrationFlow filterFlow() {
        return IntegrationFlow.from("inputFilterChannel")
                // 조건: 메시지 내용이 "VALID" 포함
                .filter(String.class, payload -> payload.startsWith("VALID"),
                        spec -> spec.discardChannel("discardFilterChannel")) // 부적합 메시지 처리 채널
                // 조건: 메시지 헤더 'level'가 HIGH인 경우만 처리
                .filter(Message.class, m -> "HIGH".equals(m.getHeaders().get("level")),
                        spec -> spec.discardChannel("discardFilterChannel"))
                .channel("outputFilterChannel") // 적합한 메시지를 출력 채널로 전달
                .get();
    }

    // 2. 메시지 채널 정의
    @Bean
    public MessageChannel inputFilterChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel outputFilterChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel discardFilterChannel() {
        return new DirectChannel();
    }
}

 

Gateway 설정

package kr.co.thekeytech.spring.eai.filter;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway
public interface FilterStudyGateway {
    @Gateway(requestChannel = "inputFilterChannel") // inputChannel을 명시적으로 지정
    void sendMessage(String message, @Header("level") String level);
}

 

Test 코드

package kr.co.thekeytech.spring.eai.filter;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest
class FilterStudyConfigTest {

    @Autowired
    private MessageChannel inputFilterChannel;

    @Autowired
    private MessageChannel outputFilterChannel;

    @Autowired
    private MessageChannel discardFilterChannel;
    @Autowired
    private FilterStudyGateway filterStudyGateway;

    @Test
    void testFilterFlow() {
        // 1. Output 및 Discard 채널 설정
        QueueChannel outputQueue = new QueueChannel();
        QueueChannel discardQueue = new QueueChannel();

        ((DirectChannel) outputFilterChannel).subscribe(outputQueue::send);
        ((DirectChannel) discardFilterChannel).subscribe(discardQueue::send);

        // 2. 메시지 전송
        filterStudyGateway.sendMessage("VALID MESSAGE", "HIGH");
        filterStudyGateway.sendMessage("VALID MESSAGE", "LOW");
        filterStudyGateway.sendMessage("INVALID MESSAGE", "HIGH");


        // 3. 결과 검증
        Message<?> outputMessage = outputQueue.receive(1000);
        assertThat(outputMessage).isNotNull();
        assertThat(outputMessage.getPayload()).isEqualTo("VALID MESSAGE");
        assertThat(outputMessage.getHeaders().get("level")).isEqualTo("HIGH");

        Message<?> discardMessage1 = discardQueue.receive(1000);
        assertThat(discardMessage1).isNotNull();
        assertThat(discardMessage1.getPayload()).isEqualTo("VALID MESSAGE");
        assertThat(discardMessage1.getHeaders().get("level")).isEqualTo("LOW");

        Message<?> discardMessage2 = discardQueue.receive(1000);
        assertThat(discardMessage2).isNotNull();
        assertThat(discardMessage2.getPayload()).isEqualTo("INVALID MESSAGE");
        assertThat(discardMessage2.getHeaders().get("level")).isEqualTo("HIGH");
    }
}

728x90
Comments