본문 바로가기
Spring Integration for Beginners

Splitter와 Aggregator - 예제: XML 데이터 처리

by Andrew's Akashic Records 2025. 1. 7.
728x90
2. Spring Integration 주요 컴포넌트 활용
2.1 Message Filter와 Router
2.2 Service Activator와 Message Handler
2.3 Splitter와 Aggregator
2.4 예제: XML 데이터 처리

Spring Integration for Backend Developers

XML 데이터 처리 워크플로우

목표

  • 대규모 XML 파일을 개별 요소로 분할(Splitter).
  • 분할된 데이터를 처리(Processor).
  • 처리된 결과를 집계(Aggregator)하여 최종 XML을 생성.

1. 사용 시나리오

입력

  • 대규모 XML 파일:
    <?xml version="1.0" encoding="utf-8"?>
    <orders>
        <order id="1">
            <item>Item1</item>
            <quantity>10</quantity>
        </order>
        <order id="2">
            <item>Item2</item>
            <quantity>5</quantity>
        </order>
        <order id="3">
            <item>Item3</item>
            <quantity>15</quantity>
        </order>
    </orders>

출력

  • 처리된 결과를 포함하는 최종 XML 파일:
    <?xml version="1.0" encoding="utf-8"?>
    <processedOrders>
        <processedOrder id="1">
            <item>Item1</item>
            <quantity>10</quantity>
    		<status>Processed</status>
    	</processedOrder>
        <processedOrder id="2">
            <item>Item2</item>
            <quantity>5</quantity>
    	<status>Processed</status>
        </processedOrder>
        <processedOrder id="3">
            <item>Item3</item>
            <quantity>15</quantity>
    	<status>Processed</status>
        </processedOrder>
    </processedOrders>

2. Spring Integration 구성

2.1 Splitter 구현

Splitter는 XML 파일을 개별 <order> 태그로 분할합니다.

@Bean
public IntegrationFlow xmlSplitterFlow() {
    return IntegrationFlow.from("xml.order.split.channel")
            .handle((payload, headers) -> {
                logger.info("========> Before Handle Headers:{}", headers);
                logger.info("========> Before Handle Payload:{}{}", System.lineSeparator(), payload);
                return payload;
            })
            .split(new XPathMessageSplitter("/orders/order")) // XPath로 XML 분할
            .channel("xml.order.process.channel")
            .get();
}

2.2 Processor 구현

Processor는 분할된 각 <order> 데이터를 처리하여 <processedOrder>로 변환합니다.

package kr.co.thekeytech.spring.eai.spltagr;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class OrderItemService {

    private static final Logger logger = LoggerFactory.getLogger(OrderItemService.class);

    public String xmlProcessor(String orderXml) {
        logger.info("========> OrderItemService xmlProcessor orderXml: {}", orderXml);
        // 간단한 처리: <order>를 <processedOrder>로 변환

        // XML 선언문 제거
        String cleanedXml = orderXml.replaceFirst("<\\?xml.*\\?>", "");
        return cleanedXml.replace("<order", "<processedOrder")
                .replace("</order>", "<status>Processed</status></processedOrder>");
    }
}

2.3 Aggregator 구현

Aggregator는 처리된 결과를 집계하여 최종 XML 파일을 생성합니다.

@Bean
public IntegrationFlow xmlAggregatorFlow(OrderItemService orderItemService) {
    return IntegrationFlow.from("xml.order.process.channel")
            .handle(orderItemService, "xmlProcessor") // 개별 항목 처리
            .aggregate(a -> a.outputProcessor(group -> {
                StringBuilder finalXml = new StringBuilder("<?xml version=\"1.0\" encoding=\"utf-8\"?>"+System.lineSeparator());
                finalXml.append("<processedOrders>"+System.lineSeparator());
                group.getMessages().forEach(msg -> finalXml.append(msg.getPayload()+System.lineSeparator()));
                finalXml.append("</processedOrders>");
                return finalXml.toString();
            }))
            .handle((payload, headers) -> {
                logger.info("========> After Handle Headers:{}", headers);
                logger.info("========> After Handle Payload:{}{}", System.lineSeparator(), payload);
                return payload;
            })
            .channel("xml.order.output.channel") // 최종 결과 전달
            .get();
}

2.4 Gateway 구현

 

package kr.co.thekeytech.spring.eai.spltagr;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.handler.annotation.Header;

import java.util.List;

@MessagingGateway
public interface SpltagrGateway {
    @Gateway(requestChannel = "xml.order.split.channel")// inputChannel을 명시적으로 지정
    void sendMessagXml(String payload,  @Header("orderId") String orderId);
}

3. 테스트 코드

입력 메시지 전송

package kr.co.thekeytech.spring.eai.spltagr;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest
class XmlOrderProcessingConfigTest {

    @Autowired
    private SpltagrGateway spltagrGateway;

    @Autowired
    @Qualifier("xml.order.output.channel")
    private MessageChannel orderOutputFlowChannel;

    @Test
    void testSplitFlow() {
        QueueChannel outputQueue = new QueueChannel();
        ((DirectChannel) orderOutputFlowChannel).subscribe(outputQueue::send);

        String inputXml = """
        <?xml version="1.0" encoding="utf-8"?>
        <orders>
            <order id="1">
                <item>Item1</item>
                <quantity>10</quantity>
            </order>
            <order id="2">
                <item>Item2</item>
                <quantity>5</quantity>
            </order>
            <order id="3">
                <item>Item3</item>
                <quantity>15</quantity>
            </order>
        </orders>
        """;

        // 주문 메시지 생성 및 전송
        spltagrGateway.sendMessagXml(inputXml, "ID12345678");

        // 결과 출력
        Message<?> outputMessage = outputQueue.receive(1000);
        assertThat(outputMessage).isNotNull();
        System.out.println("## Final XML: \n" + outputMessage.getPayload());
    }
}

4. 결과

처리된 최종 XML

<?xml version="1.0" encoding="utf-8"?>
<processedOrders>
<processedOrder id="1">
        <item>Item1</item>
        <quantity>10</quantity>
    <status>Processed</status></processedOrder>
<processedOrder id="2">
        <item>Item2</item>
        <quantity>5</quantity>
    <status>Processed</status></processedOrder>
<processedOrder id="3">
        <item>Item3</item>
        <quantity>15</quantity>
    <status>Processed</status></processedOrder>
</processedOrders>
728x90

5. 주요 구성 요소 설명

1. XPathMessageSplitter

  • Splitter로서 XML 데이터를 XPath를 사용해 특정 노드로 분할.

2. Processor

  • 각 <order> 데이터를 <processedOrder>로 변환.
  • 간단한 문자열 변환을 사용했지만 실제로는 XML 라이브러리(e.g., JAXB, DOM)를 사용할 수 있음.

3. Aggregator

  • 처리된 메시지들을 결합하여 최종 XML을 생성.
  • 집계 조건:
    • 모든 메시지가 처리될 때까지 기다림.
    • StringBuilder를 사용하여 메시지를 조합.

728x90