기억을 지배하는 기록

실용적인 통합 시나리오- 데이터베이스와의 통합 본문

Spring Integration for Beginners

실용적인 통합 시나리오- 데이터베이스와의 통합

Andrew's Akashic Records 2025. 1. 10. 14:36
728x90
3. 실용적인 통합 시나리오
3.1 HTTP와 REST 통합
3.2 CSV 파일 데이터를 읽고, Splitter로 처리
3.3 데이터베이스와의 통합

Spring Integration for Backend Developers

데이터베이스와의 통합

주된 기능은 데이터 삽입, 변환, 저장, 그리고 주기적인 데이터 폴링 작업을 수행합니다.

1. DBProcessingConfig 클래스

DB와의 통합을 설정하는 주요 구성 클래스로, 데이터 흐름을 정의하고 JPA 기반 작업을 설정합니다.

  1. orderDBInboundFlow
    • 역할: 데이터가 입력 채널(db.order.input.channel)로 들어오면, 데이터를 처리한 후 저장 채널(db.order.save.channel)로 전달합니다.
    • 작업 흐름:
      1. AddItemOrderRequest를 ItemOrder 엔티티로 변환.
      2. 변환된 데이터를 db.order.save.channel로 전달.
  2. orderDBSaveFlow
    • 역할: 데이터를 저장하고, 저장 결과를 변환한 후 로그를 기록합니다.
    • 작업 흐름:
      1. DBOrderService의 joinItemOrder 메서드를 호출하여 ItemOrder 엔티티를 저장.
      2. 저장된 ItemOrder를 ItemOrderResponse로 변환.
      3. 결과를 로깅.
  3. jpaExecutor 및 jpaPollingChannelAdapter
    • 역할: 주기적으로 JPQL 쿼리를 실행하여 특정 조건(sendYn = 'N')에 맞는 데이터를 가져옵니다.
    • 설정:
      • setJpaQuery: JPQL 쿼리를 정의.
      • setDeleteAfterPoll: 데이터를 삭제하지 않고 유지.
      • setExpectSingleResult: 다중 결과를 반환하도록 설정.
  4. pollingFlow
    • 역할: JPA로 조회된 데이터를 변환 및 처리하여 출력 채널(db.order.output.channel)로 전달.
    • 작업 흐름:
      1. JPA로 데이터를 조회.
      2. 조회 결과(List<ItemOrder>)를 List<ItemOrderResponse>로 변환.
      3. 결과를 로그에 기록 후 출력 채널로 전달.

2. DBOrderGateway

Spring Integration Gateway 인터페이스로, 클라이언트와 Spring Integration 흐름 간의 통신을 제공합니다.

  • 기능:
    • sendAddOrderRequest: 클라이언트가 데이터를 입력 채널(db.order.input.channel)로 보낼 수 있는 엔트리 포인트 제공.
    • 요청 데이터를 ItemOrderResponse로 변환하여 반환.

3. DBOrderService

비즈니스 로직을 처리하는 서비스 계층으로, 데이터 저장 및 조회를 담당합니다.

  • 주요 메서드:
    • joinItemOrder: 입력 데이터를 저장.
    • getItemOrders: 모든 ItemOrder를 조회.
    • getItemOrder: 특정 ID로 ItemOrder를 조회.
    • deleteItemOrder: 특정 ID로 데이터를 삭제.
  • 특징:
    • @Transactional을 사용하여 데이터베이스 작업을 트랜잭션 단위로 처리.
    • 데이터 저장 후 ItemOrderResponse로 변환하여 반환.

4. DBProcessingConfigTest

테스트 클래스로, 설정된 통합 흐름을 검증합니다.

  • 테스트 흐름:
    1. DBOrderGateway를 통해 AddItemOrderRequest를 db.order.input.channel로 전송.
    2. 반환된 ItemOrderResponse가 기대한 결과인지 검증.
    3. 출력 채널(db.order.output.channel)에서 데이터를 수신.
    4. 결과를 JSON으로 변환하여 로그 출력.

코드의 주요 흐름

  1. 입력 데이터 흐름:
    • 클라이언트 → DBOrderGateway → db.order.input.channel → 데이터 변환 및 저장 → db.order.save.channel → 저장 결과 반환.
  2. 데이터 저장:
    • DBOrderService → joinItemOrder → 데이터베이스에 저장 → 저장 결과 반환.
  3. 폴링 및 처리:
    • JPA Polling → pollingFlow → db.order.output.channel로 결과 전송.
  4. 테스트 및 검증:
    • 데이터 전송 및 저장, 출력 검증 → 메시지를 JSON으로 변환하여 최종 확인.

코드의 장점

  1. 구조화된 통합 흐름: 입력, 변환, 저장, 출력이 명확히 정의됨.
  2. 재사용 가능한 구성 요소: 각 작업이 독립적으로 설정되어 유지보수 용이.
  3. Spring Integration의 강력한 기능 활용:
    • Gateway를 통해 통합 흐름과 클라이언트 간의 간단한 통신.
    • JPA Polling으로 주기적인 데이터 조회.
  4. 트랜잭션 보장: 저장 및 수정 작업에 @Transactional 사용.
  5. 테스트 가능성: 통합 테스트가 포함되어 흐름의 동작을 보장.
728x90

전체코드

1. build.gradle

// Database
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
implementation 'org.springframework.integration:spring-integration-jpa'
runtimeOnly 'com.h2database:h2'

 

2. application.yml

spring:
  application:
    name: eai

  jpa:
    show-sql: true
    properties:
      hibernate:
        format_sql: true
    defer-datasource-initialization: true

  datasource:
    url: jdbc:h2:mem:testdb
    username: sa

  h2:
    console:
      enabled: true

logging:
  level:
    org.springframework.integration: INFO
    kr.co.thekeytech.spring.eai: INFO

 

3. ItemOrder Entity

package kr.co.thekeytech.spring.eai.domain;

import jakarta.persistence.*;
import lombok.*;
import org.springframework.data.annotation.CreatedDate;
import org.springframework.data.annotation.LastModifiedDate;

import java.io.Serializable;
import java.time.LocalDateTime;

@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Getter
@Entity
public class ItemOrder implements Serializable {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "id", updatable = false)
    private Long id; // DB 테이블의 'id' 컬럼과 매칭

    @Column(name="customer_name", nullable = false)
    private String customerName;

    @Column(name="item_name", nullable = false)
    private String itemName;

    @Column(name="quantity", nullable = false)
    private int quantity;

    @Column(name="send_yn", nullable = false)
    private String sendYn = "N";

    @CreatedDate // 엔티티가 생성될 때 생성 시간 저장
    @Column(name ="created_at")
    private LocalDateTime createdAt;

    @LastModifiedDate // 엔티티가 수정될 때 수정 시간 저장
    @Column(name ="updated_at")
    private LocalDateTime updatedAt;

    @Builder
    public ItemOrder(String customerName, String itemName, int quantity) {
        this.customerName = customerName;
        this.itemName = itemName;
        this.quantity = quantity;
    }

    public void update(int quantity, String sendYn) {
        this.quantity = quantity;
        this.sendYn = sendYn;
    }
}

4. ItemOrder Repository

package kr.co.thekeytech.spring.eai.repository;

import kr.co.thekeytech.spring.eai.domain.ItemOrder;
import org.springframework.data.jpa.repository.JpaRepository;

import java.util.List;

public interface ItemOrderRepository extends JpaRepository<ItemOrder, Long> {
    List<ItemOrder> findBySendYn(String status);
}

5. ItemOrder Add Request DTO

package kr.co.thekeytech.spring.eai.dto;

import kr.co.thekeytech.spring.eai.domain.ItemOrder;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;

@NoArgsConstructor // 기본 생성자 추가
@AllArgsConstructor // 모든 필드 값을 파라미터로 받는 생성자 추가
@Getter
public class AddItemOrderRequest {

    private String customerName;
    private String itemName;
    private int quantity;

    public ItemOrder toEntity() { // 생성자를 사용해 객체 생성
        return ItemOrder.builder()
                .customerName(customerName)
                .itemName(itemName)
                .quantity(quantity)
                .build();
    }

    @Override
    public String toString() {
        return "AddItemOrderRequest{" +
                "customerName='" + customerName + '\'' +
                ", itemName='" + itemName + '\'' +
                ", quantity=" + quantity +
                '}';
    }
}

6. ItemOrder Response DTO

package kr.co.thekeytech.spring.eai.dto;

import kr.co.thekeytech.spring.eai.domain.ItemOrder;
import lombok.Getter;

@Getter
public class ItemOrderResponse {

    private final Long id; // DB 테이블의 'id' 컬럼과 매칭
    private final String customerName;
    private final String itemName;
    private final int quantity;
    private final String sendYn;

    public ItemOrderResponse(ItemOrder itemOrder) {
        this.id = itemOrder.getId();
        this.customerName = itemOrder.getCustomerName();
        this.itemName = itemOrder.getItemName();
        this.quantity = itemOrder.getQuantity();
        this.sendYn = itemOrder.getSendYn();
    }

    @Override
    public String toString() {
        return "ItemOrderResponse{" +
                "id=" + id +
                ", customerName='" + customerName + '\'' +
                ", itemName='" + itemName + '\'' +
                ", quantity=" + quantity +
                ", sendYn='" + sendYn + '\'' +
                '}';
    }
}

7. Gateway code

package kr.co.thekeytech.spring.eai.db;

import kr.co.thekeytech.spring.eai.dto.AddItemOrderRequest;
import kr.co.thekeytech.spring.eai.dto.ItemOrderResponse;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;

@MessagingGateway
public interface DBOrderGateway {
    @Gateway(requestChannel = "db.order.input.channel") // inputChannel을 명시적으로 지정
    ItemOrderResponse sendAddOrderRequest(AddItemOrderRequest addItemOrderRequest);
}

8. Java DSL Code

package kr.co.thekeytech.spring.eai.db;

import jakarta.persistence.EntityManagerFactory;
import kr.co.thekeytech.spring.eai.domain.ItemOrder;
import kr.co.thekeytech.spring.eai.dto.AddItemOrderRequest;
import kr.co.thekeytech.spring.eai.dto.ItemOrderResponse;
import kr.co.thekeytech.spring.eai.service.DBOrderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jpa.core.JpaExecutor;
import org.springframework.integration.jpa.inbound.JpaPollingChannelAdapter;

import java.util.List;

@Configuration
public class DBProcessingConfig {

    private static final Logger logger = LoggerFactory.getLogger(DBProcessingConfig.class);

    @Bean
    public IntegrationFlow orderDBInboundFlow() {
        return IntegrationFlow.from("db.order.input.channel")
                .handle((payload, headers) -> {
                    logger.info("========> Receive Handle Headers: {}", headers);
                    logger.info("========> Receive Handle Payload: {}", payload);
                    return payload;
                })
                .transform(AddItemOrderRequest.class, AddItemOrderRequest::toEntity)
                .channel("db.order.save.channel") // 데이터를 처리할 채널
                .get();
    }

    @Bean
    public IntegrationFlow orderDBSaveFlow(DBOrderService orderService) {
        return IntegrationFlow.from("db.order.save.channel")
                .handle(orderService, "joinItemOrder")
                .transform(ItemOrder.class, ItemOrderResponse::new)
                .handle((payload, headers) -> {
                    logger.info("========> DB Save Headers: {}", headers);
                    logger.info("========> DB Save Payload: {}", payload);
                    return payload;
                })
                .get();
    }

    @Bean
    public JpaExecutor jpaExecutor(EntityManagerFactory entityManagerFactory) {
        JpaExecutor jpaExecutor = new JpaExecutor(entityManagerFactory);
        jpaExecutor.setJpaQuery("SELECT o FROM ItemOrder o WHERE o.sendYn = 'N'"); // JPQL 쿼리
        jpaExecutor.setDeleteAfterPoll(false); // 데이터를 삭제하지 않음
        jpaExecutor.setExpectSingleResult(false); // 다수의 결과를 기대
        return jpaExecutor;
    }

    @Bean
    public JpaPollingChannelAdapter jpaPollingChannelAdapter(JpaExecutor jpaExecutor) {
        return new JpaPollingChannelAdapter(jpaExecutor);
    }

    @Bean
    public IntegrationFlow pollingFlow(JpaPollingChannelAdapter jpaPollingChannelAdapter) {
        return IntegrationFlow.from(jpaPollingChannelAdapter,
                        e -> e.poller(Pollers.fixedDelay(1000)))
                .transform(List.class, payload -> {
                    if (payload instanceof List<?> items && items.stream().allMatch(ItemOrder.class::isInstance)) {
                        return items.stream()
                                .map(item -> new ItemOrderResponse((ItemOrder) item))
                                .toList();
                    } else {
                        throw new IllegalArgumentException("Invalid payload type. Expected List<ItemOrder>");
                    }
                })
                .handle((payload, headers) -> {
                    logger.info("========> JpaPollingChannelAdapter Handle Headers: {}", headers);
                    logger.info("========> JpaPollingChannelAdapter Payload: {}", payload);

                    return payload;
                })
                .channel("db.order.output.channel")
                .get();
    }
}

9. Test Code

package kr.co.thekeytech.spring.eai.db;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import kr.co.thekeytech.spring.eai.dto.AddItemOrderRequest;
import kr.co.thekeytech.spring.eai.dto.ItemOrderResponse;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest
class DBProcessingConfigTest {

    @Autowired
    private DBOrderGateway dbOrderGateway;


    @Autowired
    @Qualifier("db.order.output.channel")
    private MessageChannel dbOrderOutputChannel;

    @Test
    void testFlow() throws JsonProcessingException {
        AddItemOrderRequest addItemOrderRequest =
                new AddItemOrderRequest("Andrew", "Apple TV", 1);
        ItemOrderResponse itemOrderResponse = dbOrderGateway.sendAddOrderRequest(addItemOrderRequest);

        assertThat(itemOrderResponse).isNotNull();
        assertThat(itemOrderResponse).isInstanceOf(ItemOrderResponse.class);

        System.out.println("## Final Object: \n" + itemOrderResponse);

        QueueChannel outputQueue = new QueueChannel();
        ((DirectChannel) dbOrderOutputChannel).subscribe(outputQueue::send);

        // 결과 출력
        Message<?> outputMessage = outputQueue.receive(10000);
        assertThat(outputMessage).isNotNull();


        // JSON 파싱
        ObjectMapper objectMapper = new ObjectMapper();
        String objectJson = objectMapper.writeValueAsString(outputMessage.getPayload());


        System.out.println("## Final OutPut: \n" + objectJson);

    }
}

10.출력 Log

2025-01-10 13:53:26 - k.c.t.s.eai.db.DBProcessingConfig - ========> Receive Handle Headers: {replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@d2161b6, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@d2161b6, id=60cbd921-204b-95f3-1968-9db4b83bcf68, timestamp=1736484806046}
2025-01-10 13:53:26 - k.c.t.s.eai.db.DBProcessingConfig - ========> Receive Handle Payload: AddItemOrderRequest{customerName='Andrew', itemName='Apple TV', quantity=1}
Hibernate: 
    insert 
    into
        item_order
        (created_at, customer_name, item_name, quantity, send_yn, updated_at, id) 
    values
        (?, ?, ?, ?, ?, ?, default)

2025-01-10 13:53:26 - k.c.t.s.eai.db.DBProcessingConfig - ========> DB Save Headers: {replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@d2161b6, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@d2161b6, id=148288ec-5370-949a-74b7-60b9791d7d2a, timestamp=1736484806220}
2025-01-10 13:53:26 - k.c.t.s.eai.db.DBProcessingConfig - ========> DB Save Payload: ItemOrderResponse{id=1, customerName='Andrew', itemName='Apple TV', quantity=1, sendYn='N'}
## Final Object: 
ItemOrderResponse{id=1, customerName='Andrew', itemName='Apple TV', quantity=1, sendYn='N'}
2025-01-10 13:53:26 - o.s.i.channel.DirectChannel - Channel 'eai.db.order.output.channel' has 1 subscriber(s).
Hibernate: 
    select
        io1_0.id,
        io1_0.created_at,
        io1_0.customer_name,
        io1_0.item_name,
        io1_0.quantity,
        io1_0.send_yn,
        io1_0.updated_at 
    from
        item_order io1_0 
    where
        io1_0.send_yn='N'
2025-01-10 13:53:26 - k.c.t.s.eai.db.DBProcessingConfig - ========> JpaPollingChannelAdapter Handle Headers: {id=5d5329df-5b05-eca6-e13a-fc794ba7f295, timestamp=1736484806902}
2025-01-10 13:53:26 - k.c.t.s.eai.db.DBProcessingConfig - ========> JpaPollingChannelAdapter Payload: [ItemOrderResponse{id=1, customerName='Andrew', itemName='Apple TV', quantity=1, sendYn='N'}]
## Final OutPut: 
[{"id":1,"customerName":"Andrew","itemName":"Apple TV","quantity":1,"sendYn":"N"}]

728x90
Comments