Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
Tags
- JVM
- flet
- 파이썬
- 소프트웨어공학
- write by chatGPT
- write by GPT-4
- 자바네트워크
- Database
- 유닉스
- kotlin
- 자바
- 역학
- 인프라
- jpa
- 고전역학
- spring data jpa
- android
- spring integration
- 시스템
- 자바암호
- Java
- 웹 크롤링
- python
- oracle
- NIO
- 리눅스
- chatGPT's answer
- 데이터베이스
- 코틀린
- GPT-4's answer
Archives
- Today
- Total
Akashic Records
Splitter와 Aggregator - 메시지 분할(Split)과 집계(Aggregate) 이해 본문
Spring Integration for Beginners
Splitter와 Aggregator - 메시지 분할(Split)과 집계(Aggregate) 이해
Andrew's Akashic Records 2025. 1. 6. 13:47728x90
2. Spring Integration 주요 컴포넌트 활용
2.1 Message Filter와 Router
2.2 Service Activator와 Message Handler
2.3 Splitter와 Aggregator
메시지 분할(Split)과 집계(Aggregate) 이해
Spring Integration에서 Splitter와 Aggregator는 메시지를 분할하고 다시 결합하는 데 사용되는 강력한 컴포넌트입니다. 이 기능은 대규모 데이터 처리, 병렬 처리, 집계가 필요한 워크플로우에 매우 유용합니다.
1.1 Splitter란?
Splitter는 단일 메시지를 여러 개의 메시지로 나누는 데 사용됩니다.
사용 사례
- 파일 내용을 한 줄씩 처리.
- 리스트 데이터를 개별 항목으로 분할.
- 데이터 스트림을 여러 부분으로 나눠 병렬 처리.
Splitter 동작 방식
- 입력 메시지:
- Payload가 리스트, 배열, 문자열, JSON, XML 등 반복 가능한 구조.
- 출력 메시지:
- 입력 메시지의 각 항목을 개별 메시지로 분할.
- 헤더 유지:
- 기본적으로 분할된 메시지는 원본 메시지의 헤더를 유지.
Splitter 예제
1. Splitter 구성
@Bean
public IntegrationFlow splitterFlow() {
return IntegrationFlow.from("inputChannel")
.split()
.channel("splitOutputChannel")
.get();
}
- split(): Payload가 리스트, 배열, 문자열 등인 경우 자동으로 분할.
2. 메시지 예제
- 입력 메시지:
- { "payload": ["Task1", "Task2", "Task3"], "headers": {"source": "UserRequest"} }
- 출력 메시지:
- Task1, Task2, Task3
1.2 Aggregator란?
Aggregator는 여러 메시지를 집계하여 단일 메시지로 결합합니다.
사용 사례
- 병렬 처리된 결과를 다시 결합.
- 분할된 데이터의 최종 집계.
- 계산 결과의 합산.
Aggregator 동작 방식
- 입력 메시지:
- 동일한 correlationKey를 가진 메시지.
- 집계 조건:
- 일정 수의 메시지 도착.
- 시간 초과.
- 사용자 정의 조건.
- 출력 메시지:
- 집계된 메시지 데이터를 단일 메시지로 결합.
Aggregator 예제
1. Aggregator 구성
@Bean
public IntegrationFlow aggregatorFlow() {
return IntegrationFlow.from("splitOutputChannel")
.aggregate()
.channel("aggregatedOutputChannel")
.get();
}
- aggregate(): 동일한 correlationKey를 기준으로 메시지를 집계.
2. 메시지 예제
- 입력 메시지:
- Task1, Task2, Task3
- 출력 메시지:
- { "payload": ["Task1", "Task2", "Task3"], "headers": {"source": "UserRequest"} }
1.3 Splitter와 Aggregator를 활용한 메시지 플로우
예제: 주문 처리 시스템
- 목표:
- 다중 상품 주문을 개별 상품 처리로 분할.
- 각 상품의 처리 결과를 집계하여 최종 처리 결과를 생성.
1. Splitter와 Aggregator 플로우 정의
package kr.co.thekeytech.spring.eai.spltagr;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
@Configuration
public class OrderProcessingConfig {
private static final Logger logger = LoggerFactory.getLogger(OrderProcessingConfig.class);
@Bean
public IntegrationFlow orderProcessingFlow() {
return IntegrationFlow.from("spltagr.order.input.channel")
.handle((payload, headers) -> {
logger.info("========> Before Handle Headers: {}", headers);
logger.info("========> Before Handle Payload: {}", payload);
return payload;
})
.split() // 메시지 분할
.handle("orderItemService", "processOrder") // 개별 항목 처리
.aggregate() // 처리 결과 집계
.handle((payload, headers) -> {
logger.info("========> After Handle Headers: {}", headers);
logger.info("========> After Handle Payload: {}", payload);
return payload;
})
.channel("spltagr.order.output.channel")
.get();
}
}
2. 서비스 클래스
package kr.co.thekeytech.spring.eai.spltagr;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
@Service
public class OrderItemService {
private static final Logger logger = LoggerFactory.getLogger(OrderItemService.class);
public String processOrder(String orderPayload) {
logger.info("========> OrderItemService processOrder Item: {}", orderPayload);
return "UpdatedOrder, " + orderPayload;
}
}
3. Gateway 코드
package kr.co.thekeytech.spring.eai.spltagr;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.handler.annotation.Header;
import java.util.List;
@MessagingGateway
public interface SpltagrGateway {
@Gateway(requestChannel = "spltagr.order.input.channel")// inputChannel을 명시적으로 지정
void sendMessage(List<String> payload, @Header("orderId") String orderId);
}
4. 메시지 전송
package kr.co.thekeytech.spring.eai.spltagr;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest
class OrderProcessingConfigTest {
@Autowired
private SpltagrGateway spltagrGateway;
@Autowired
@Qualifier("spltagr.order.output.channel")
private MessageChannel orderOutputFlowChannel;
@Test
void testFlow() {
QueueChannel outputQueue = new QueueChannel();
((DirectChannel) orderOutputFlowChannel).subscribe(outputQueue::send);
// 주문 메시지 생성 및 전송
spltagrGateway.sendMessage(List.of("Item 1", "Item 2", "Item 3"), "ID12345678");
// 결과 출력
Message<?> outputMessage = outputQueue.receive(1000);
assertThat(outputMessage).isNotNull();
System.out.println("## Final Flow Output: " + outputMessage.getPayload());
}
}
5. 결과
2025-01-06 13:24:08 - k.c.t.s.e.s.OrderProcessingConfig - ========> Before Handle Headers: {replyChannel=nullChannel, errorChannel=, id=a85a4349-67e1-661d-8fe2-2d2fef8649f5, orderId=ID12345678, timestamp=1736137448798}
2025-01-06 13:24:08 - k.c.t.s.e.s.OrderProcessingConfig - ========> Before Handle Payload: [Item 1, Item 2, Item 3]
2025-01-06 13:24:08 - k.c.t.s.eai.spltagr.OrderItemService - ========> OrderItemService processOrder Item: Item 1
2025-01-06 13:24:08 - k.c.t.s.eai.spltagr.OrderItemService - ========> OrderItemService processOrder Item: Item 2
2025-01-06 13:24:08 - k.c.t.s.eai.spltagr.OrderItemService - ========> OrderItemService processOrder Item: Item 3
2025-01-06 13:24:08 - k.c.t.s.e.s.OrderProcessingConfig - ========> After Handle Headers: {replyChannel=nullChannel, sequenceNumber=3, errorChannel=, orderId=ID12345678, sequenceSize=3, correlationId=f483b8a6-eb61-71d0-53d8-78f00eb14ad5, id=43b72b6c-72d5-c4de-a7a1-eecd5018d700, timestamp=1736137448822}
2025-01-06 13:24:08 - k.c.t.s.e.s.OrderProcessingConfig - ========> After Handle Payload: [UpdatedOrder, Item 1, UpdatedOrder, Item 2, UpdatedOrder, Item 3]
## Final Flow Output: [UpdatedOrder, Item 1, UpdatedOrder, Item 2, UpdatedOrder, Item 3]
1.4 활용 사례
- 데이터 파이프라인:
- 대량의 데이터를 처리 단위로 나누고 병렬로 처리한 결과를 집계.
- 파일 처리:
- 파일의 각 줄을 개별 메시지로 분할하여 처리 후 결합.
- 복합 작업 관리:
- 여러 작업의 결과를 취합하여 최종 보고서 생성.
728x90
Splitter-Aggregator 패턴의 실제 활용
Splitter-Aggregator 패턴은 데이터를 작은 단위로 분할하고, 각 단위를 처리한 후 결과를 다시 결합하여 최종 결과를 생성하는 패턴입니다. 이 패턴은 병렬 처리, 데이터 집계, 대규모 처리 워크플로우에서 매우 유용하게 사용됩니다.
2.1 Splitter-Aggregator 패턴의 주요 구성 요소
- Splitter:
- 메시지를 작은 단위로 분할하여 병렬 처리.
- 입력 데이터가 리스트, 배열, 문자열, JSON, XML 등의 반복 가능한 구조일 때 사용.
- Processor:
- 분할된 메시지 각각에 대해 처리 로직을 실행.
- 예: 데이터 변환, 비즈니스 로직 수행.
- Aggregator:
- 분할된 메시지를 다시 결합하여 단일 메시지로 반환.
- 집계 조건:
- 모든 메시지가 처리될 때까지 기다림.
- 특정 시간 초과.
- 사용자 정의 조건.
2.2 Splitter-Aggregator 패턴 활용 사례
위 "예제: 주문 처리 시스템"을 활용 패턴으로 수정합니다.
1.1 Config 설정
Splitter는 주문 메시지를 각 상품으로 분할합니다.
package kr.co.thekeytech.spring.eai.spltagr;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
@Configuration
public class OrderProcessingConfig {
private static final Logger logger = LoggerFactory.getLogger(OrderProcessingConfig.class);
@Bean
public IntegrationFlow orderSplit() {
return IntegrationFlow.from("spltagr.order.split.channel")
.handle((payload, headers) -> {
logger.info("========> Before Handle Headers: {}", headers);
logger.info("========> Before Handle Payload: {}", payload);
return payload;
})
.split() // 메시지 분할
.channel("spltagr.order.process.channel")
.get();
}
@Bean
public IntegrationFlow orderProcess(OrderItemService orderItemService) {
return IntegrationFlow.from("spltagr.order.process.channel")
.handle(orderItemService, "processOrder") // 개별 항목 처리
.aggregate() // 처리 결과 집계
.transform(String.class, String::toUpperCase)
.handle((payload, headers) -> {
logger.info("========> After Handle Headers: {}", headers);
logger.info("========> After Handle Payload: {}", payload);
return payload;
})
.channel("spltagr.order.output.channel")
.get();
}
}
1.2 Gateway
package kr.co.thekeytech.spring.eai.spltagr;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.handler.annotation.Header;
import java.util.List;
@MessagingGateway
public interface SpltagrGateway {
@Gateway(requestChannel = "spltagr.order.split.channel")// inputChannel을 명시적으로 지정
void sendMessagSplit(List<String> payload, @Header("orderId") String orderId);
}
1.3 테스트 코드
package kr.co.thekeytech.spring.eai.spltagr;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest
class OrderProcessingConfigTest {
@Autowired
private SpltagrGateway spltagrGateway;
@Autowired
@Qualifier("spltagr.order.output.channel")
private MessageChannel orderOutputFlowChannel;
@Test
void testSplitFlow() {
QueueChannel outputQueue = new QueueChannel();
((DirectChannel) orderOutputFlowChannel).subscribe(outputQueue::send);
// 주문 메시지 생성 및 전송
spltagrGateway.sendMessagSplit(List.of("Item 1", "Item 2", "Item 3"), "ID12345678");
// 결과 출력
Message<?> outputMessage = outputQueue.receive(1000);
assertThat(outputMessage).isNotNull();
System.out.println("## Final Flow Output: " + outputMessage.getPayload());
}
}
Splitter-Aggregator 패턴은 Spring Integration에서 대규모 데이터 처리나 병렬 처리가 필요한 워크플로우에 적합한 솔루션을 제공합니다. Splitter로 데이터를 분할하고, 각 단위를 처리한 후 Aggregator로 집계하여 최종 결과를 생성할 수 있습니다.
728x90
'Spring Integration for Beginners' 카테고리의 다른 글
실용적인 통합 시나리오- HTTP와 REST 통합 (0) | 2025.01.08 |
---|---|
Splitter와 Aggregator - 예제: XML 데이터 처리 (0) | 2025.01.07 |
Service Activator와 Message Handler - Custom Message Handler 작성 (2) | 2025.01.03 |
Service Activator와 Message Handler - Service Activator를 사용한 비즈니스 로직 구현 (0) | 2025.01.02 |
Message Filter와 Router - 동적 라우팅 구현 (0) | 2024.12.27 |
Comments