Akashic Records

Splitter와 Aggregator - 예제: XML 데이터 처리 본문

Spring Integration for Beginners

Splitter와 Aggregator - 예제: XML 데이터 처리

Andrew's Akashic Records 2025. 1. 7. 10:34
728x90
2. Spring Integration 주요 컴포넌트 활용
2.1 Message Filter와 Router
2.2 Service Activator와 Message Handler
2.3 Splitter와 Aggregator
2.4 예제: XML 데이터 처리

Spring Integration for Backend Developers

XML 데이터 처리 워크플로우

목표

  • 대규모 XML 파일을 개별 요소로 분할(Splitter).
  • 분할된 데이터를 처리(Processor).
  • 처리된 결과를 집계(Aggregator)하여 최종 XML을 생성.

1. 사용 시나리오

입력

  • 대규모 XML 파일:
    <?xml version="1.0" encoding="utf-8"?>
    <orders>
        <order id="1">
            <item>Item1</item>
            <quantity>10</quantity>
        </order>
        <order id="2">
            <item>Item2</item>
            <quantity>5</quantity>
        </order>
        <order id="3">
            <item>Item3</item>
            <quantity>15</quantity>
        </order>
    </orders>

출력

  • 처리된 결과를 포함하는 최종 XML 파일:
    <?xml version="1.0" encoding="utf-8"?>
    <processedOrders>
        <processedOrder id="1">
            <item>Item1</item>
            <quantity>10</quantity>
    		<status>Processed</status>
    	</processedOrder>
        <processedOrder id="2">
            <item>Item2</item>
            <quantity>5</quantity>
    	<status>Processed</status>
        </processedOrder>
        <processedOrder id="3">
            <item>Item3</item>
            <quantity>15</quantity>
    	<status>Processed</status>
        </processedOrder>
    </processedOrders>

2. Spring Integration 구성

2.1 Splitter 구현

Splitter는 XML 파일을 개별 <order> 태그로 분할합니다.

@Bean
public IntegrationFlow xmlSplitterFlow() {
    return IntegrationFlow.from("xml.order.split.channel")
            .handle((payload, headers) -> {
                logger.info("========> Before Handle Headers:{}", headers);
                logger.info("========> Before Handle Payload:{}{}", System.lineSeparator(), payload);
                return payload;
            })
            .split(new XPathMessageSplitter("/orders/order")) // XPath로 XML 분할
            .channel("xml.order.process.channel")
            .get();
}

2.2 Processor 구현

Processor는 분할된 각 <order> 데이터를 처리하여 <processedOrder>로 변환합니다.

package kr.co.thekeytech.spring.eai.spltagr;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class OrderItemService {

    private static final Logger logger = LoggerFactory.getLogger(OrderItemService.class);

    public String xmlProcessor(String orderXml) {
        logger.info("========> OrderItemService xmlProcessor orderXml: {}", orderXml);
        // 간단한 처리: <order>를 <processedOrder>로 변환

        // XML 선언문 제거
        String cleanedXml = orderXml.replaceFirst("<\\?xml.*\\?>", "");
        return cleanedXml.replace("<order", "<processedOrder")
                .replace("</order>", "<status>Processed</status></processedOrder>");
    }
}

2.3 Aggregator 구현

Aggregator는 처리된 결과를 집계하여 최종 XML 파일을 생성합니다.

@Bean
public IntegrationFlow xmlAggregatorFlow(OrderItemService orderItemService) {
    return IntegrationFlow.from("xml.order.process.channel")
            .handle(orderItemService, "xmlProcessor") // 개별 항목 처리
            .aggregate(a -> a.outputProcessor(group -> {
                StringBuilder finalXml = new StringBuilder("<?xml version=\"1.0\" encoding=\"utf-8\"?>"+System.lineSeparator());
                finalXml.append("<processedOrders>"+System.lineSeparator());
                group.getMessages().forEach(msg -> finalXml.append(msg.getPayload()+System.lineSeparator()));
                finalXml.append("</processedOrders>");
                return finalXml.toString();
            }))
            .handle((payload, headers) -> {
                logger.info("========> After Handle Headers:{}", headers);
                logger.info("========> After Handle Payload:{}{}", System.lineSeparator(), payload);
                return payload;
            })
            .channel("xml.order.output.channel") // 최종 결과 전달
            .get();
}

2.4 Gateway 구현

 

package kr.co.thekeytech.spring.eai.spltagr;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.handler.annotation.Header;

import java.util.List;

@MessagingGateway
public interface SpltagrGateway {
    @Gateway(requestChannel = "xml.order.split.channel")// inputChannel을 명시적으로 지정
    void sendMessagXml(String payload,  @Header("orderId") String orderId);
}

3. 테스트 코드

입력 메시지 전송

package kr.co.thekeytech.spring.eai.spltagr;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest
class XmlOrderProcessingConfigTest {

    @Autowired
    private SpltagrGateway spltagrGateway;

    @Autowired
    @Qualifier("xml.order.output.channel")
    private MessageChannel orderOutputFlowChannel;

    @Test
    void testSplitFlow() {
        QueueChannel outputQueue = new QueueChannel();
        ((DirectChannel) orderOutputFlowChannel).subscribe(outputQueue::send);

        String inputXml = """
        <?xml version="1.0" encoding="utf-8"?>
        <orders>
            <order id="1">
                <item>Item1</item>
                <quantity>10</quantity>
            </order>
            <order id="2">
                <item>Item2</item>
                <quantity>5</quantity>
            </order>
            <order id="3">
                <item>Item3</item>
                <quantity>15</quantity>
            </order>
        </orders>
        """;

        // 주문 메시지 생성 및 전송
        spltagrGateway.sendMessagXml(inputXml, "ID12345678");

        // 결과 출력
        Message<?> outputMessage = outputQueue.receive(1000);
        assertThat(outputMessage).isNotNull();
        System.out.println("## Final XML: \n" + outputMessage.getPayload());
    }
}

4. 결과

처리된 최종 XML

<?xml version="1.0" encoding="utf-8"?>
<processedOrders>
<processedOrder id="1">
        <item>Item1</item>
        <quantity>10</quantity>
    <status>Processed</status></processedOrder>
<processedOrder id="2">
        <item>Item2</item>
        <quantity>5</quantity>
    <status>Processed</status></processedOrder>
<processedOrder id="3">
        <item>Item3</item>
        <quantity>15</quantity>
    <status>Processed</status></processedOrder>
</processedOrders>
728x90

5. 주요 구성 요소 설명

1. XPathMessageSplitter

  • Splitter로서 XML 데이터를 XPath를 사용해 특정 노드로 분할.

2. Processor

  • 각 <order> 데이터를 <processedOrder>로 변환.
  • 간단한 문자열 변환을 사용했지만 실제로는 XML 라이브러리(e.g., JAXB, DOM)를 사용할 수 있음.

3. Aggregator

  • 처리된 메시지들을 결합하여 최종 XML을 생성.
  • 집계 조건:
    • 모든 메시지가 처리될 때까지 기다림.
    • StringBuilder를 사용하여 메시지를 조합.

728x90
Comments