Akashic Records

실용적인 통합 시나리오- CSV 파일 데이터를 읽고, Splitter로 처리 본문

Spring Integration for Beginners

실용적인 통합 시나리오- CSV 파일 데이터를 읽고, Splitter로 처리

Andrew's Akashic Records 2025. 1. 9. 16:29
728x90
3. 실용적인 통합 시나리오
3.1 HTTP와 REST 통합
3.2 CSV 파일 데이터를 읽고, Splitter로 처리

Spring Integration for Backend Developers

CSV 파일 데이터를 읽고, Splitter로 처리

Spring Integration을 활용하여 CSV 파일을 처리하는 애플리케이션의 구성입니다. 주요 목적은 지정된 입력 디렉토리에서 CSV 파일을 읽고, 데이터를 처리한 후, 결과를 출력 디렉토리에 JSON 형식으로 저장하며, 원본 파일은 완료된 디렉토리로 이동합니다.

코드 구성 및 설명

1. 주요 디렉토리 설정

  • INPUT_DIRECTORY: CSV 파일을 감시할 디렉토리 경로.
  • OUTPUT_DIRECTORY: 처리된 파일을 저장할 디렉토리 경로.
  • COMPLETE_DIRECTORY: 완료된 원본 파일을 이동할 디렉토리 경로.

2. 파일의 총 줄 수 계산 (calculateLineCountHeader)

  • 파일의 총 줄 수를 계산하여 메시지 헤더(totalLines)에 추가합니다.
  • Spring Integration의 releaseStrategy에서 사용됩니다.

2.1. 파일 읽기 (fileReadingFlow)

  • 구성:
    • Files.inboundAdapter: 지정된 INPUT_DIRECTORY에서 CSV 파일을 감지합니다.
    • useWatchService(true): 파일 변경 이벤트 감지를 활성화합니다.
    • watchEvents: 파일 생성(CREATE) 및 삭제(DELETE) 이벤트를 감지합니다.
    • poller: 파일 감시 주기를 1초로 설정합니다.
  • 동작:
    • CSV 파일이 감지되면 파일의 총 줄 수를 계산(calculateLineCountHeader)하여 헤더에 추가합니다.
    • 감지된 파일은 csv.split.channel로 전달됩니다.
  • 로깅:
    • 감지된 파일의 헤더와 내용을 로그에 출력합니다.

2.2. 파일 분할 (fileSplitterFlow)

  • 구성:
    • FileSplitter: CSV 파일을 줄 단위로 분할합니다.
    • 분할된 데이터는 csv.aggregate.channel로 전달됩니다.
  • 동작:
    • 파일의 각 줄을 분할하고, 공백을 제거(trim)합니다.
    • 각 줄의 내용을 로그에 출력합니다.

2.3. 데이터 처리 및 집계 (dataAggregatorFlow)

  • 구성:
    • csvDataProcessor.processLine: 각 줄을 비즈니스 로직에 따라 처리합니다.
    • aggregate:
      • correlationStrategy:
        • 동일 파일의 줄을 하나의 그룹으로 묶습니다.
        • 기본적으로 해더의 "correlationId" 값을 사용하지만 없는경우  "cvs_split" 문자열를 상관관계 ID로 사용합니다.
      • releaseStrategy:
        • 파일의 모든 줄이 그룹에 도착하면 릴리스합니다(group.size() == totalLines).
      • outputProcessor:
        • 그룹화된 메시지들을 JSON 배열 형식으로 병합합니다.
    • csv.output.channel로 결과 전달.
  • 로깅:
    • 집계 완료 후 헤더와 병합된 데이터를 로그로 출력합니다.

2.4. 결과 파일 쓰기 (fileWritingFlow)

  • 구성:
    • Files.outboundAdapter: 결과 데이터를 OUTPUT_DIRECTORY에 JSON 파일로 저장합니다.
    • 파일 이름: processed_<타임스탬프>.json.
    • 디렉토리가 없으면 자동으로 생성(autoCreateDirectory(true)).
  • 동작:
    • 집계된 데이터가 JSON 형식으로 저장됩니다.

2.5. 원본 파일 이동 (fileMoveFlow)

  • 구성:
    • Files.outboundAdapter: 원본 CSV 파일을 COMPLETE_DIRECTORY로 이동합니다.
    • 이동 후 원본 파일 삭제(deleteSourceFiles(true)).
  • 동작:
    • csv.split.channel에서 처리된 파일을 이동합니다.
728x90

전체 코드

1. Java DSL Config 코드

package kr.co.thekeytech.spring.eai.csv;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.file.splitter.FileSplitter;

import java.io.File;

@Configuration
public class CsvFileIntegrationConfig {
    private static final String INPUT_DIRECTORY = "/spring-eai/csv/INPUT";
    private static final String OUTPUT_DIRECTORY = "/spring-eai/csv/OUTPUT";
    private static final String COMPLETE_DIRECTORY = "/spring-eai/csv/COMPLETE";

    private static final Logger logger = LoggerFactory.getLogger(CsvFileIntegrationConfig.class);


    private long calculateLineCountHeader(File file) {
        try {
            return java.nio.file.Files.lines(file.toPath()).count(); // 파일의 총 줄 수 계산
        } catch (Exception e) {
            throw new RuntimeException("Error calculating line count for file", e);
        }
    }

    @Bean
    public IntegrationFlow fileReadingFlow() {
        return IntegrationFlow.from(Files.inboundAdapter(new File(INPUT_DIRECTORY))
                                .autoCreateDirectory(true)
                                .patternFilter("*.csv")
                                .useWatchService(true) // WatchService 활성화
                                .watchEvents(
                                        FileReadingMessageSource.WatchEventType.CREATE
                                        ,FileReadingMessageSource.WatchEventType.DELETE), // 파일 생성 이벤트만 감지,
                        e -> e.poller(Pollers.fixedDelay(1000)))
                .handle((payload, headers) -> {
                    logger.info("========> File InboundAdapter Handle Headers: {}", headers);
                    logger.info("========> File InboundAdapter Payload: {}", payload);
                    return payload;
                })
                .enrichHeaders(h -> {
                    h.headerFunction("totalLines", m -> calculateLineCountHeader((File) m.getPayload()));
                }) // 총 줄 수를 헤더에 추가
                .channel(c -> c.publishSubscribe("csv.split.channel"))
                .get();
    }

    @Bean
    public IntegrationFlow fileSplitterFlow() {
        return IntegrationFlow.from("csv.split.channel")
                .split(new FileSplitter())
                .transform(String.class, String::trim)
                .handle((payload, headers) -> {
                    logger.info("========> Split Channel Handle Headers: {}", headers);
                    logger.info("========> Split Channel Payload: {}", payload);
                    return payload;
                })
                .channel("csv.aggregate.channel")
                .get();
    }

    @Bean
    public IntegrationFlow dataAggregatorFlow(CsvDataProcessor csvDataProcessor) {
        return IntegrationFlow.from("csv.aggregate.channel")
                .transform(csvDataProcessor, "processLine")
                .aggregate(a -> a
                        .correlationStrategy(m -> m.getHeaders().getOrDefault("correlationId", "cvs_split"))
                        .releaseStrategy(group -> {
                            long totalLines = (long) group.getOne().getHeaders().get("totalLines");
                            return group.size() == totalLines; // 모든 줄이 그룹에 들어오면 릴리스
                        }) // 그룹 크기와 상관없이 즉시 릴리스
                        .outputProcessor(group -> {
                            StringBuilder mergedData = new StringBuilder("[");
                            group.getMessages().forEach(msg -> mergedData.append(msg.getPayload()).append(","));
                            mergedData.append("]");
                            return mergedData.toString().replace(",]", "]");
                        }))
                .handle((payload, headers) -> {
                    logger.info("========> Aggregate After Handle Headers: {}", headers);
                    logger.info("========> Aggregate After Payload: {}", payload);
                    return payload;
                })
                .channel("csv.output.channel")
                .get();
    }

    @Bean
    public IntegrationFlow fileWritingFlow() {
        return IntegrationFlow.from("csv.output.channel")
                .handle(Files.outboundAdapter(new File(OUTPUT_DIRECTORY))
                        .fileNameGenerator(message -> "processed_" + System.currentTimeMillis() + ".json")
                        .autoCreateDirectory(true))
                .get();
    }

    @Bean
    public IntegrationFlow fileMoveFlow() {
        return IntegrationFlow.from("csv.split.channel")
                .handle(Files.outboundAdapter(new File(COMPLETE_DIRECTORY))
                        .autoCreateDirectory(true)
                        .deleteSourceFiles(true))
                .get();
    }
}

2. Service 코드

package kr.co.thekeytech.spring.eai.csv;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class CsvDataProcessor {
    private static final Logger logger = LoggerFactory.getLogger(CsvDataProcessor.class);

    public String processLine(String line) {
        // 예: CSV 행 데이터를 JSON으로 변환
        String[] parts = line.split(",");

        String jsonString = String.format("{\"name\": \"%s\", \"age\": \"%s\"}", parts[0], parts[1]);
        logger.info("==========> CSVDataProcessor processLine:{}", jsonString);
        return jsonString;
    }
}

 

전체 흐름

  1. 입력 감지 (fileReadingFlow):
    • INPUT_DIRECTORY에서 CSV 파일을 감지하고 csv.split.channel로 전달.
  2. 파일 분할 (fileSplitterFlow):
    • 파일을 줄 단위로 분할하고, 공백을 제거하여 csv.aggregate.channel로 전달.
  3. 데이터 처리 및 집계 (dataAggregatorFlow):
    • 각 줄을 처리 후 병합하여 JSON 배열 형식으로 변환.
    • 결과를 csv.output.channel로 전달.
  4. 결과 파일 저장 (fileWritingFlow):
    • 결과를 JSON 파일로 저장.
  5. 원본 파일 이동 및 삭제 (fileMoveFlow):
    • 처리된 CSV 파일을 COMPLETE_DIRECTORY로 이동.

주요 특징

  1. WatchService를 활용한 실시간 파일 감지:
    • 파일 생성 및 삭제를 실시간으로 감지.
  2. 유연한 데이터 처리 및 집계:
    • 각 줄을 비즈니스 로직으로 처리(csvDataProcessor.processLine).
    • 집계된 데이터를 JSON 배열로 병합.
  3. 안정적인 파일 처리:
    • 입력/출력/완료 디렉토리를 명확히 구분하여 데이터 손실 방지.
    • 원본 파일을 이동 후 삭제.
  4. 로깅 및 디버깅:
    • 각 단계의 헤더와 데이터를 로그로 출력하여 문제 해결 용이.

테스트 시나리오

  1. 입력 파일:
    • INPUT_DIRECTORY에 sample.csv 파일 생성.
    • 내용:
      John,25
      Jane,30
      Doe,40
  2. 실행 결과:
    • OUTPUT_DIRECTORY에 processed_<timestamp>.json 파일 생성.
    • 내용:
      [
          {"name": "John", "age": "25"},
          {"name": "Jane", "age": "30"},
          {"name": "Doe", "age": "40"}
      ]
  3. 원본 파일:
    • COMPLETE_DIRECTORY로 이동되며 삭제됨.

728x90
Comments