Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
Tags
- 시스템
- spring integration
- 리눅스
- jpa
- 데이터베이스
- 자바
- 역학
- spring data jpa
- write by GPT-4
- JVM
- 파이썬
- NIO
- Java
- android
- 인프라
- 코틀린
- oracle
- chatGPT's answer
- Database
- 웹 크롤링
- 자바암호
- kotlin
- GPT-4's answer
- 고전역학
- python
- 자바네트워크
- 소프트웨어공학
- write by chatGPT
- flet
- 유닉스
Archives
- Today
- Total
기억을 지배하는 기록
실용적인 통합 시나리오- 데이터베이스와의 통합 본문
Spring Integration for Beginners
실용적인 통합 시나리오- 데이터베이스와의 통합
Andrew's Akashic Records 2025. 1. 10. 14:36728x90
3. 실용적인 통합 시나리오
3.1 HTTP와 REST 통합
3.2 CSV 파일 데이터를 읽고, Splitter로 처리
3.3 데이터베이스와의 통합
데이터베이스와의 통합
주된 기능은 데이터 삽입, 변환, 저장, 그리고 주기적인 데이터 폴링 작업을 수행합니다.
1. DBProcessingConfig 클래스
DB와의 통합을 설정하는 주요 구성 클래스로, 데이터 흐름을 정의하고 JPA 기반 작업을 설정합니다.
- orderDBInboundFlow
- 역할: 데이터가 입력 채널(db.order.input.channel)로 들어오면, 데이터를 처리한 후 저장 채널(db.order.save.channel)로 전달합니다.
- 작업 흐름:
- AddItemOrderRequest를 ItemOrder 엔티티로 변환.
- 변환된 데이터를 db.order.save.channel로 전달.
- orderDBSaveFlow
- 역할: 데이터를 저장하고, 저장 결과를 변환한 후 로그를 기록합니다.
- 작업 흐름:
- DBOrderService의 joinItemOrder 메서드를 호출하여 ItemOrder 엔티티를 저장.
- 저장된 ItemOrder를 ItemOrderResponse로 변환.
- 결과를 로깅.
- jpaExecutor 및 jpaPollingChannelAdapter
- 역할: 주기적으로 JPQL 쿼리를 실행하여 특정 조건(sendYn = 'N')에 맞는 데이터를 가져옵니다.
- 설정:
- setJpaQuery: JPQL 쿼리를 정의.
- setDeleteAfterPoll: 데이터를 삭제하지 않고 유지.
- setExpectSingleResult: 다중 결과를 반환하도록 설정.
- pollingFlow
- 역할: JPA로 조회된 데이터를 변환 및 처리하여 출력 채널(db.order.output.channel)로 전달.
- 작업 흐름:
- JPA로 데이터를 조회.
- 조회 결과(List<ItemOrder>)를 List<ItemOrderResponse>로 변환.
- 결과를 로그에 기록 후 출력 채널로 전달.
2. DBOrderGateway
Spring Integration Gateway 인터페이스로, 클라이언트와 Spring Integration 흐름 간의 통신을 제공합니다.
- 기능:
- sendAddOrderRequest: 클라이언트가 데이터를 입력 채널(db.order.input.channel)로 보낼 수 있는 엔트리 포인트 제공.
- 요청 데이터를 ItemOrderResponse로 변환하여 반환.
3. DBOrderService
비즈니스 로직을 처리하는 서비스 계층으로, 데이터 저장 및 조회를 담당합니다.
- 주요 메서드:
- joinItemOrder: 입력 데이터를 저장.
- getItemOrders: 모든 ItemOrder를 조회.
- getItemOrder: 특정 ID로 ItemOrder를 조회.
- deleteItemOrder: 특정 ID로 데이터를 삭제.
- 특징:
- @Transactional을 사용하여 데이터베이스 작업을 트랜잭션 단위로 처리.
- 데이터 저장 후 ItemOrderResponse로 변환하여 반환.
4. DBProcessingConfigTest
테스트 클래스로, 설정된 통합 흐름을 검증합니다.
- 테스트 흐름:
- DBOrderGateway를 통해 AddItemOrderRequest를 db.order.input.channel로 전송.
- 반환된 ItemOrderResponse가 기대한 결과인지 검증.
- 출력 채널(db.order.output.channel)에서 데이터를 수신.
- 결과를 JSON으로 변환하여 로그 출력.
코드의 주요 흐름
- 입력 데이터 흐름:
- 클라이언트 → DBOrderGateway → db.order.input.channel → 데이터 변환 및 저장 → db.order.save.channel → 저장 결과 반환.
- 데이터 저장:
- DBOrderService → joinItemOrder → 데이터베이스에 저장 → 저장 결과 반환.
- 폴링 및 처리:
- JPA Polling → pollingFlow → db.order.output.channel로 결과 전송.
- 테스트 및 검증:
- 데이터 전송 및 저장, 출력 검증 → 메시지를 JSON으로 변환하여 최종 확인.
코드의 장점
- 구조화된 통합 흐름: 입력, 변환, 저장, 출력이 명확히 정의됨.
- 재사용 가능한 구성 요소: 각 작업이 독립적으로 설정되어 유지보수 용이.
- Spring Integration의 강력한 기능 활용:
- Gateway를 통해 통합 흐름과 클라이언트 간의 간단한 통신.
- JPA Polling으로 주기적인 데이터 조회.
- 트랜잭션 보장: 저장 및 수정 작업에 @Transactional 사용.
- 테스트 가능성: 통합 테스트가 포함되어 흐름의 동작을 보장.
728x90
전체코드
1. build.gradle
// Database
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
implementation 'org.springframework.integration:spring-integration-jpa'
runtimeOnly 'com.h2database:h2'
2. application.yml
spring:
application:
name: eai
jpa:
show-sql: true
properties:
hibernate:
format_sql: true
defer-datasource-initialization: true
datasource:
url: jdbc:h2:mem:testdb
username: sa
h2:
console:
enabled: true
logging:
level:
org.springframework.integration: INFO
kr.co.thekeytech.spring.eai: INFO
3. ItemOrder Entity
package kr.co.thekeytech.spring.eai.domain;
import jakarta.persistence.*;
import lombok.*;
import org.springframework.data.annotation.CreatedDate;
import org.springframework.data.annotation.LastModifiedDate;
import java.io.Serializable;
import java.time.LocalDateTime;
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@Getter
@Entity
public class ItemOrder implements Serializable {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "id", updatable = false)
private Long id; // DB 테이블의 'id' 컬럼과 매칭
@Column(name="customer_name", nullable = false)
private String customerName;
@Column(name="item_name", nullable = false)
private String itemName;
@Column(name="quantity", nullable = false)
private int quantity;
@Column(name="send_yn", nullable = false)
private String sendYn = "N";
@CreatedDate // 엔티티가 생성될 때 생성 시간 저장
@Column(name ="created_at")
private LocalDateTime createdAt;
@LastModifiedDate // 엔티티가 수정될 때 수정 시간 저장
@Column(name ="updated_at")
private LocalDateTime updatedAt;
@Builder
public ItemOrder(String customerName, String itemName, int quantity) {
this.customerName = customerName;
this.itemName = itemName;
this.quantity = quantity;
}
public void update(int quantity, String sendYn) {
this.quantity = quantity;
this.sendYn = sendYn;
}
}
4. ItemOrder Repository
package kr.co.thekeytech.spring.eai.repository;
import kr.co.thekeytech.spring.eai.domain.ItemOrder;
import org.springframework.data.jpa.repository.JpaRepository;
import java.util.List;
public interface ItemOrderRepository extends JpaRepository<ItemOrder, Long> {
List<ItemOrder> findBySendYn(String status);
}
5. ItemOrder Add Request DTO
package kr.co.thekeytech.spring.eai.dto;
import kr.co.thekeytech.spring.eai.domain.ItemOrder;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
@NoArgsConstructor // 기본 생성자 추가
@AllArgsConstructor // 모든 필드 값을 파라미터로 받는 생성자 추가
@Getter
public class AddItemOrderRequest {
private String customerName;
private String itemName;
private int quantity;
public ItemOrder toEntity() { // 생성자를 사용해 객체 생성
return ItemOrder.builder()
.customerName(customerName)
.itemName(itemName)
.quantity(quantity)
.build();
}
@Override
public String toString() {
return "AddItemOrderRequest{" +
"customerName='" + customerName + '\'' +
", itemName='" + itemName + '\'' +
", quantity=" + quantity +
'}';
}
}
6. ItemOrder Response DTO
package kr.co.thekeytech.spring.eai.dto;
import kr.co.thekeytech.spring.eai.domain.ItemOrder;
import lombok.Getter;
@Getter
public class ItemOrderResponse {
private final Long id; // DB 테이블의 'id' 컬럼과 매칭
private final String customerName;
private final String itemName;
private final int quantity;
private final String sendYn;
public ItemOrderResponse(ItemOrder itemOrder) {
this.id = itemOrder.getId();
this.customerName = itemOrder.getCustomerName();
this.itemName = itemOrder.getItemName();
this.quantity = itemOrder.getQuantity();
this.sendYn = itemOrder.getSendYn();
}
@Override
public String toString() {
return "ItemOrderResponse{" +
"id=" + id +
", customerName='" + customerName + '\'' +
", itemName='" + itemName + '\'' +
", quantity=" + quantity +
", sendYn='" + sendYn + '\'' +
'}';
}
}
7. Gateway code
package kr.co.thekeytech.spring.eai.db;
import kr.co.thekeytech.spring.eai.dto.AddItemOrderRequest;
import kr.co.thekeytech.spring.eai.dto.ItemOrderResponse;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
@MessagingGateway
public interface DBOrderGateway {
@Gateway(requestChannel = "db.order.input.channel") // inputChannel을 명시적으로 지정
ItemOrderResponse sendAddOrderRequest(AddItemOrderRequest addItemOrderRequest);
}
8. Java DSL Code
package kr.co.thekeytech.spring.eai.db;
import jakarta.persistence.EntityManagerFactory;
import kr.co.thekeytech.spring.eai.domain.ItemOrder;
import kr.co.thekeytech.spring.eai.dto.AddItemOrderRequest;
import kr.co.thekeytech.spring.eai.dto.ItemOrderResponse;
import kr.co.thekeytech.spring.eai.service.DBOrderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.jpa.core.JpaExecutor;
import org.springframework.integration.jpa.inbound.JpaPollingChannelAdapter;
import java.util.List;
@Configuration
public class DBProcessingConfig {
private static final Logger logger = LoggerFactory.getLogger(DBProcessingConfig.class);
@Bean
public IntegrationFlow orderDBInboundFlow() {
return IntegrationFlow.from("db.order.input.channel")
.handle((payload, headers) -> {
logger.info("========> Receive Handle Headers: {}", headers);
logger.info("========> Receive Handle Payload: {}", payload);
return payload;
})
.transform(AddItemOrderRequest.class, AddItemOrderRequest::toEntity)
.channel("db.order.save.channel") // 데이터를 처리할 채널
.get();
}
@Bean
public IntegrationFlow orderDBSaveFlow(DBOrderService orderService) {
return IntegrationFlow.from("db.order.save.channel")
.handle(orderService, "joinItemOrder")
.transform(ItemOrder.class, ItemOrderResponse::new)
.handle((payload, headers) -> {
logger.info("========> DB Save Headers: {}", headers);
logger.info("========> DB Save Payload: {}", payload);
return payload;
})
.get();
}
@Bean
public JpaExecutor jpaExecutor(EntityManagerFactory entityManagerFactory) {
JpaExecutor jpaExecutor = new JpaExecutor(entityManagerFactory);
jpaExecutor.setJpaQuery("SELECT o FROM ItemOrder o WHERE o.sendYn = 'N'"); // JPQL 쿼리
jpaExecutor.setDeleteAfterPoll(false); // 데이터를 삭제하지 않음
jpaExecutor.setExpectSingleResult(false); // 다수의 결과를 기대
return jpaExecutor;
}
@Bean
public JpaPollingChannelAdapter jpaPollingChannelAdapter(JpaExecutor jpaExecutor) {
return new JpaPollingChannelAdapter(jpaExecutor);
}
@Bean
public IntegrationFlow pollingFlow(JpaPollingChannelAdapter jpaPollingChannelAdapter) {
return IntegrationFlow.from(jpaPollingChannelAdapter,
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(List.class, payload -> {
if (payload instanceof List<?> items && items.stream().allMatch(ItemOrder.class::isInstance)) {
return items.stream()
.map(item -> new ItemOrderResponse((ItemOrder) item))
.toList();
} else {
throw new IllegalArgumentException("Invalid payload type. Expected List<ItemOrder>");
}
})
.handle((payload, headers) -> {
logger.info("========> JpaPollingChannelAdapter Handle Headers: {}", headers);
logger.info("========> JpaPollingChannelAdapter Payload: {}", payload);
return payload;
})
.channel("db.order.output.channel")
.get();
}
}
9. Test Code
package kr.co.thekeytech.spring.eai.db;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import kr.co.thekeytech.spring.eai.dto.AddItemOrderRequest;
import kr.co.thekeytech.spring.eai.dto.ItemOrderResponse;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest
class DBProcessingConfigTest {
@Autowired
private DBOrderGateway dbOrderGateway;
@Autowired
@Qualifier("db.order.output.channel")
private MessageChannel dbOrderOutputChannel;
@Test
void testFlow() throws JsonProcessingException {
AddItemOrderRequest addItemOrderRequest =
new AddItemOrderRequest("Andrew", "Apple TV", 1);
ItemOrderResponse itemOrderResponse = dbOrderGateway.sendAddOrderRequest(addItemOrderRequest);
assertThat(itemOrderResponse).isNotNull();
assertThat(itemOrderResponse).isInstanceOf(ItemOrderResponse.class);
System.out.println("## Final Object: \n" + itemOrderResponse);
QueueChannel outputQueue = new QueueChannel();
((DirectChannel) dbOrderOutputChannel).subscribe(outputQueue::send);
// 결과 출력
Message<?> outputMessage = outputQueue.receive(10000);
assertThat(outputMessage).isNotNull();
// JSON 파싱
ObjectMapper objectMapper = new ObjectMapper();
String objectJson = objectMapper.writeValueAsString(outputMessage.getPayload());
System.out.println("## Final OutPut: \n" + objectJson);
}
}
10.출력 Log
2025-01-10 13:53:26 - k.c.t.s.eai.db.DBProcessingConfig - ========> Receive Handle Headers: {replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@d2161b6, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@d2161b6, id=60cbd921-204b-95f3-1968-9db4b83bcf68, timestamp=1736484806046}
2025-01-10 13:53:26 - k.c.t.s.eai.db.DBProcessingConfig - ========> Receive Handle Payload: AddItemOrderRequest{customerName='Andrew', itemName='Apple TV', quantity=1}
Hibernate:
insert
into
item_order
(created_at, customer_name, item_name, quantity, send_yn, updated_at, id)
values
(?, ?, ?, ?, ?, ?, default)
2025-01-10 13:53:26 - k.c.t.s.eai.db.DBProcessingConfig - ========> DB Save Headers: {replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@d2161b6, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@d2161b6, id=148288ec-5370-949a-74b7-60b9791d7d2a, timestamp=1736484806220}
2025-01-10 13:53:26 - k.c.t.s.eai.db.DBProcessingConfig - ========> DB Save Payload: ItemOrderResponse{id=1, customerName='Andrew', itemName='Apple TV', quantity=1, sendYn='N'}
## Final Object:
ItemOrderResponse{id=1, customerName='Andrew', itemName='Apple TV', quantity=1, sendYn='N'}
2025-01-10 13:53:26 - o.s.i.channel.DirectChannel - Channel 'eai.db.order.output.channel' has 1 subscriber(s).
Hibernate:
select
io1_0.id,
io1_0.created_at,
io1_0.customer_name,
io1_0.item_name,
io1_0.quantity,
io1_0.send_yn,
io1_0.updated_at
from
item_order io1_0
where
io1_0.send_yn='N'
2025-01-10 13:53:26 - k.c.t.s.eai.db.DBProcessingConfig - ========> JpaPollingChannelAdapter Handle Headers: {id=5d5329df-5b05-eca6-e13a-fc794ba7f295, timestamp=1736484806902}
2025-01-10 13:53:26 - k.c.t.s.eai.db.DBProcessingConfig - ========> JpaPollingChannelAdapter Payload: [ItemOrderResponse{id=1, customerName='Andrew', itemName='Apple TV', quantity=1, sendYn='N'}]
## Final OutPut:
[{"id":1,"customerName":"Andrew","itemName":"Apple TV","quantity":1,"sendYn":"N"}]
728x90
'Spring Integration for Beginners' 카테고리의 다른 글
주요 Spring Integration 어노테이션 (0) | 2025.01.15 |
---|---|
실용적인 통합 시나리오- CSV 파일 데이터를 읽고, Splitter로 처리 (0) | 2025.01.09 |
실용적인 통합 시나리오- HTTP와 REST 통합 (0) | 2025.01.08 |
Splitter와 Aggregator - 예제: XML 데이터 처리 (0) | 2025.01.07 |
Splitter와 Aggregator - 메시지 분할(Split)과 집계(Aggregate) 이해 (0) | 2025.01.06 |
Comments