Akashic Records

Splitter와 Aggregator - 메시지 분할(Split)과 집계(Aggregate) 이해 본문

Spring Integration for Beginners

Splitter와 Aggregator - 메시지 분할(Split)과 집계(Aggregate) 이해

Andrew's Akashic Records 2025. 1. 6. 13:47
728x90
2. Spring Integration 주요 컴포넌트 활용
2.1 Message Filter와 Router
2.2 Service Activator와 Message Handler
2.3 Splitter와 Aggregator

Spring Integration for Backend Developers

메시지 분할(Split)과 집계(Aggregate) 이해

Spring Integration에서 SplitterAggregator는 메시지를 분할하고 다시 결합하는 데 사용되는 강력한 컴포넌트입니다. 이 기능은 대규모 데이터 처리, 병렬 처리, 집계가 필요한 워크플로우에 매우 유용합니다.

1.1 Splitter란?

Splitter는 단일 메시지를 여러 개의 메시지로 나누는 데 사용됩니다.

사용 사례

  1. 파일 내용을 한 줄씩 처리.
  2. 리스트 데이터를 개별 항목으로 분할.
  3. 데이터 스트림을 여러 부분으로 나눠 병렬 처리.

Splitter 동작 방식

  1. 입력 메시지:
    • Payload가 리스트, 배열, 문자열, JSON, XML 등 반복 가능한 구조.
  2. 출력 메시지:
    • 입력 메시지의 각 항목을 개별 메시지로 분할.
  3. 헤더 유지:
    • 기본적으로 분할된 메시지는 원본 메시지의 헤더를 유지.

Splitter 예제

1. Splitter 구성

@Bean
public IntegrationFlow splitterFlow() {
    return IntegrationFlow.from("inputChannel")
            .split()
            .channel("splitOutputChannel")
            .get();
}
  • split(): Payload가 리스트, 배열, 문자열 등인 경우 자동으로 분할.

2. 메시지 예제

  • 입력 메시지:
  • { "payload": ["Task1", "Task2", "Task3"], "headers": {"source": "UserRequest"} }
  • 출력 메시지:
  • Task1, Task2, Task3

1.2 Aggregator란?

Aggregator는 여러 메시지를 집계하여 단일 메시지로 결합합니다.

사용 사례

  1. 병렬 처리된 결과를 다시 결합.
  2. 분할된 데이터의 최종 집계.
  3. 계산 결과의 합산.

Aggregator 동작 방식

  1. 입력 메시지:
    • 동일한 correlationKey를 가진 메시지.
  2. 집계 조건:
    • 일정 수의 메시지 도착.
    • 시간 초과.
    • 사용자 정의 조건.
  3. 출력 메시지:
    • 집계된 메시지 데이터를 단일 메시지로 결합.

Aggregator 예제

1. Aggregator 구성

@Bean
public IntegrationFlow aggregatorFlow() {
    return IntegrationFlow.from("splitOutputChannel")
            .aggregate()
            .channel("aggregatedOutputChannel")
            .get();
}
  • aggregate(): 동일한 correlationKey를 기준으로 메시지를 집계.

2. 메시지 예제

  • 입력 메시지:
  • Task1, Task2, Task3
  • 출력 메시지:
  • { "payload": ["Task1", "Task2", "Task3"], "headers": {"source": "UserRequest"} }

1.3 Splitter와 Aggregator를 활용한 메시지 플로우

예제: 주문 처리 시스템

  • 목표:
    1. 다중 상품 주문을 개별 상품 처리로 분할.
    2. 각 상품의 처리 결과를 집계하여 최종 처리 결과를 생성.

1. Splitter와 Aggregator 플로우 정의

package kr.co.thekeytech.spring.eai.spltagr;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;

@Configuration
public class OrderProcessingConfig {

    private static final Logger logger = LoggerFactory.getLogger(OrderProcessingConfig.class);

    @Bean
    public IntegrationFlow orderProcessingFlow() {
        return IntegrationFlow.from("spltagr.order.input.channel")
                .handle((payload, headers) -> {
                    logger.info("========> Before Handle Headers: {}", headers);
                    logger.info("========> Before Handle Payload: {}", payload);
                    return payload;
                })
                .split() // 메시지 분할
                .handle("orderItemService", "processOrder") // 개별 항목 처리
                .aggregate() // 처리 결과 집계
                .handle((payload, headers) -> {
                    logger.info("========> After Handle Headers: {}", headers);
                    logger.info("========> After Handle Payload: {}", payload);
                    return payload;
                })
                .channel("spltagr.order.output.channel")
                .get();
    }
}

2. 서비스 클래스

package kr.co.thekeytech.spring.eai.spltagr;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class OrderItemService {

    private static final Logger logger = LoggerFactory.getLogger(OrderItemService.class);

    public String processOrder(String orderPayload) {
        logger.info("========> OrderItemService processOrder Item: {}", orderPayload);
        return "UpdatedOrder, " + orderPayload;
    }
}

3. Gateway 코드

package kr.co.thekeytech.spring.eai.spltagr;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.handler.annotation.Header;

import java.util.List;

@MessagingGateway
public interface SpltagrGateway {
    @Gateway(requestChannel = "spltagr.order.input.channel")// inputChannel을 명시적으로 지정
    void sendMessage(List<String> payload,  @Header("orderId") String orderId);
}

4. 메시지 전송

package kr.co.thekeytech.spring.eai.spltagr;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest
class OrderProcessingConfigTest {

    @Autowired
    private SpltagrGateway spltagrGateway;

    @Autowired
    @Qualifier("spltagr.order.output.channel")
    private MessageChannel orderOutputFlowChannel;

    @Test
    void testFlow() {
        QueueChannel outputQueue = new QueueChannel();
        ((DirectChannel) orderOutputFlowChannel).subscribe(outputQueue::send);

        // 주문 메시지 생성 및 전송
        spltagrGateway.sendMessage(List.of("Item 1", "Item 2", "Item 3"), "ID12345678");

        // 결과 출력
        Message<?> outputMessage = outputQueue.receive(1000);
        assertThat(outputMessage).isNotNull();
        System.out.println("## Final Flow Output: " + outputMessage.getPayload());
    }
}

5. 결과

2025-01-06 13:24:08 - k.c.t.s.e.s.OrderProcessingConfig - ========> Before Handle Headers: {replyChannel=nullChannel, errorChannel=, id=a85a4349-67e1-661d-8fe2-2d2fef8649f5, orderId=ID12345678, timestamp=1736137448798}
2025-01-06 13:24:08 - k.c.t.s.e.s.OrderProcessingConfig - ========> Before Handle Payload: [Item 1, Item 2, Item 3]

2025-01-06 13:24:08 - k.c.t.s.eai.spltagr.OrderItemService - ========> OrderItemService processOrder Item: Item 1
2025-01-06 13:24:08 - k.c.t.s.eai.spltagr.OrderItemService - ========> OrderItemService processOrder Item: Item 2
2025-01-06 13:24:08 - k.c.t.s.eai.spltagr.OrderItemService - ========> OrderItemService processOrder Item: Item 3

2025-01-06 13:24:08 - k.c.t.s.e.s.OrderProcessingConfig - ========> After Handle Headers: {replyChannel=nullChannel, sequenceNumber=3, errorChannel=, orderId=ID12345678, sequenceSize=3, correlationId=f483b8a6-eb61-71d0-53d8-78f00eb14ad5, id=43b72b6c-72d5-c4de-a7a1-eecd5018d700, timestamp=1736137448822}
2025-01-06 13:24:08 - k.c.t.s.e.s.OrderProcessingConfig - ========> After Handle Payload: [UpdatedOrder, Item 1, UpdatedOrder, Item 2, UpdatedOrder, Item 3]

## Final Flow Output: [UpdatedOrder, Item 1, UpdatedOrder, Item 2, UpdatedOrder, Item 3]

1.4 활용 사례

  1. 데이터 파이프라인:
    • 대량의 데이터를 처리 단위로 나누고 병렬로 처리한 결과를 집계.
  2. 파일 처리:
    • 파일의 각 줄을 개별 메시지로 분할하여 처리 후 결합.
  3. 복합 작업 관리:
    • 여러 작업의 결과를 취합하여 최종 보고서 생성.
728x90

Splitter-Aggregator 패턴의 실제 활용

Splitter-Aggregator 패턴은 데이터를 작은 단위로 분할하고, 각 단위를 처리한 후 결과를 다시 결합하여 최종 결과를 생성하는 패턴입니다. 이 패턴은 병렬 처리, 데이터 집계, 대규모 처리 워크플로우에서 매우 유용하게 사용됩니다.

2.1 Splitter-Aggregator 패턴의 주요 구성 요소

  1. Splitter:
    • 메시지를 작은 단위로 분할하여 병렬 처리.
    • 입력 데이터가 리스트, 배열, 문자열, JSON, XML 등의 반복 가능한 구조일 때 사용.
  2. Processor:
    • 분할된 메시지 각각에 대해 처리 로직을 실행.
    • 예: 데이터 변환, 비즈니스 로직 수행.
  3. Aggregator:
    • 분할된 메시지를 다시 결합하여 단일 메시지로 반환.
    • 집계 조건:
      • 모든 메시지가 처리될 때까지 기다림.
      • 특정 시간 초과.
      • 사용자 정의 조건.

2.2 Splitter-Aggregator 패턴 활용 사례

위 "예제: 주문 처리 시스템"을 활용 패턴으로 수정합니다.

1.1 Config 설정

Splitter는 주문 메시지를 각 상품으로 분할합니다.

package kr.co.thekeytech.spring.eai.spltagr;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;

@Configuration
public class OrderProcessingConfig {

    private static final Logger logger = LoggerFactory.getLogger(OrderProcessingConfig.class);


    @Bean
    public IntegrationFlow orderSplit() {
        return IntegrationFlow.from("spltagr.order.split.channel")
                .handle((payload, headers) -> {
                    logger.info("========> Before Handle Headers: {}", headers);
                    logger.info("========> Before Handle Payload: {}", payload);
                    return payload;
                })
                .split() // 메시지 분할
                .channel("spltagr.order.process.channel")
                .get();
    }

    @Bean
    public IntegrationFlow orderProcess(OrderItemService orderItemService) {
        return IntegrationFlow.from("spltagr.order.process.channel")
                .handle(orderItemService, "processOrder") // 개별 항목 처리
                .aggregate() // 처리 결과 집계
                .transform(String.class, String::toUpperCase)
                .handle((payload, headers) -> {
                    logger.info("========> After Handle Headers: {}", headers);
                    logger.info("========> After Handle Payload: {}", payload);
                    return payload;
                })
                .channel("spltagr.order.output.channel")
                .get();
    }
}

1.2 Gateway

package kr.co.thekeytech.spring.eai.spltagr;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.handler.annotation.Header;

import java.util.List;

@MessagingGateway
public interface SpltagrGateway {
    @Gateway(requestChannel = "spltagr.order.split.channel")// inputChannel을 명시적으로 지정
    void sendMessagSplit(List<String> payload,  @Header("orderId") String orderId);
}

1.3 테스트 코드

 

package kr.co.thekeytech.spring.eai.spltagr;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest
class OrderProcessingConfigTest {

    @Autowired
    private SpltagrGateway spltagrGateway;

    @Autowired
    @Qualifier("spltagr.order.output.channel")
    private MessageChannel orderOutputFlowChannel;

    @Test
    void testSplitFlow() {
        QueueChannel outputQueue = new QueueChannel();
        ((DirectChannel) orderOutputFlowChannel).subscribe(outputQueue::send);

        // 주문 메시지 생성 및 전송
        spltagrGateway.sendMessagSplit(List.of("Item 1", "Item 2", "Item 3"), "ID12345678");

        // 결과 출력
        Message<?> outputMessage = outputQueue.receive(1000);
        assertThat(outputMessage).isNotNull();
        System.out.println("## Final Flow Output: " + outputMessage.getPayload());
    }
}

 

Splitter-Aggregator 패턴은 Spring Integration에서 대규모 데이터 처리나 병렬 처리가 필요한 워크플로우에 적합한 솔루션을 제공합니다. Splitter로 데이터를 분할하고, 각 단위를 처리한 후 Aggregator로 집계하여 최종 결과를 생성할 수 있습니다.

728x90
Comments