Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
Tags
- 파이썬
- JVM
- 자바암호
- chatGPT's answer
- 유닉스
- Database
- 시스템
- spring data jpa
- Spring boot
- 역학
- NIO
- oracle
- 리눅스
- kotlin
- 고전역학
- android
- 인프라
- 웹 크롤링
- write by GPT-4
- 데이터베이스
- 자바네트워크
- flet
- Java
- 소프트웨어공학
- GPT-4's answer
- 코틀린
- write by chatGPT
- jpa
- 자바
- python
Archives
- Today
- Total
Akashic Records
Spring Integration의 실용 사례 - 비동기 처리, email 본문
Spring Integration for Beginners
Spring Integration의 실용 사례 - 비동기 처리, email
Andrew's Akashic Records 2024. 12. 23. 09:55728x90
1. Spring Integration 소개
1.1 Spring Integration의 배경과 필요성
1.2 Spring Integration의 탄생과 역사
1.3 Spring Integration의 주요 특징
1.4 Spring Integration의 기본 개념
1.5 Spring Integration의 실용 사례
비동기 메시지 처리 및 큐 관리
Spring Integration은 비동기 메시지 처리와 메시지 큐 관리를 통해 높은 처리량과 확장성을 제공합니다. 이는 메시지가 큐(Queue)에 저장되어 처리가 필요한 시점에 소비자(Consumer)가 가져가 처리하도록 설계됩니다. 메시지 큐를 활용한 비동기 메시징은 병렬 처리, 성능 향상, 트래픽 완화 등에 효과적입니다.
구현 시나리오
목표
- 비동기 메시지 처리: 생산자(Producer)가 메시지를 큐에 저장하고, 소비자(Consumer)가 이를 처리.
- 메시지 큐 관리: 메시지가 순서대로 큐에 저장되고 소비자가 비동기적으로 처리.
- 처리 결과 로그 출력: 메시지 처리 과정을 콘솔에 출력.
구성 요소
- QueueChannel: 메시지를 저장하고 소비자가 가져갈 수 있도록 관리.
- 비동기 소비자: 큐에 저장된 메시지를 병렬로 처리.
코드 구현
package kr.co.thekeytech.spring.eai.asyncqueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.messaging.MessageChannel;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@Configuration
@EnableIntegration
public class AsyncQueueConfig {
private final Logger logger = LoggerFactory.getLogger(AsyncQueueConfig.class);
// 1. 메시지 큐 정의 (QueueChannel)
@Bean
public MessageChannel messageQueue() {
return new QueueChannel(10); // 큐의 용량 설정 (10개 메시지 제한)
}
// 2. 생산자 (Producer)
@Bean
public IntegrationFlow producerFlow() {
return IntegrationFlow.from("producerChannel") // 입력 채널
.channel(messageQueue()) // 메시지를 큐에 저장
.get();
}
// 3. 소비자 (Consumer)
@Bean
public IntegrationFlow consumerFlow() {
return IntegrationFlow.from(messageQueue()) // 큐에서 메시지를 읽음
.handle(message -> {
logger.info("<===== Processed Message: {}", message.getPayload());
// 메시지 처리 로직 추가 가능
})
.get();
}
// 4. 병렬 소비자 설정 (비동기 처리)
@Bean
public Executor executor() {
return Executors.newFixedThreadPool(3); // 3개의 스레드로 병렬 처리
}
}
코드 설명
- 메시지 큐 정의
- QueueChannel은 메시지를 저장하고 소비자가 이를 처리할 수 있도록 관리합니다.
- 큐의 용량(new QueueChannel(10))을 설정하여 메시지 버퍼링을 제한할 수 있습니다.
- 생산자 (Producer)
- producerChannel에서 메시지를 생성하고 큐(messageQueue())에 저장합니다.
- 소비자 (Consumer)
- messageQueue()에서 메시지를 가져와 처리합니다.
- handle() 메서드를 사용하여 메시지 처리 로직을 정의합니다.
- 병렬 처리
- Executors.newFixedThreadPool(3)을 사용하여 3개의 스레드로 병렬 처리합니다.
- 스레드 풀 크기를 조정하여 처리량을 제어할 수 있습니다.
테스트
메시지 생성 코드
package kr.co.thekeytech.spring.eai.asyncqueue;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
@SpringBootTest
class AsyncQueueConfigTest {
private final Logger logger = LoggerFactory.getLogger(AsyncQueueConfigTest.class);
@Autowired
private MessageChannel producerChannel;
@Test
void send() {
try {
for (int i = 1; i <= 15; i++) {
String message = "Message " + i;
producerChannel.send(MessageBuilder.withPayload(message).build());
logger.info("=====> Sent: {}", message);
}
} catch (Exception e) {
logger.error("send error",e);
}
}
}
실행 흐름
- 생산자
- producerChannel에 15개의 메시지를 생성합니다.
- 메시지는 QueueChannel로 전달되고 저장됩니다.
- 소비자
- 소비자는 비동기로 메시지를 가져와 처리합니다.
- 병렬 처리가 활성화되어 3개의 스레드가 동시에 메시지를 처리합니다.
- 출력 결과
- 메시지가 비동기로 처리되며 처리된 순서는 스레드에 따라 달라질 수 있습니다.
출력 예시:
Sent: Message 1
Sent: Message 2
...
Sent: Message 15
Processed Message: Message 1
Processed Message: Message 2
Processed Message: Message 3
Processed Message: Message 4
...
확장 및 응용
- 큐 용량 조정
- QueueChannel(10)에서 큐 용량을 설정하여 메시지 제한을 관리.
- 오버플로우 처리
- 큐 용량 초과 시 추가 메시지를 대기하거나 오류를 처리하는 로직 추가.
- 재시도 로직
- 처리 실패 메시지를 다시 큐에 추가하는 재시도 메커니즘 구현.
- 비동기 작업 응용
- 대규모 데이터 처리, 외부 시스템과의 비동기 통신, 비즈니스 이벤트 처리 등에 적용 가능.
728x90
실시간 모니터링과 알림 시스템
Spring Integration을 활용하면 실시간 모니터링과 알림 시스템을 효율적으로 구축할 수 있습니다. 이 시스템은 데이터의 흐름을 감지하고, 이상 징후나 특정 이벤트 발생 시 실시간으로 알림을 보낼 수 있도록 설계됩니다.
실시간 모니터링 및 알림 시스템 구성 시나리오
목표
- 특정 디렉터리에서 새로운 파일이 생성되면 이를 실시간으로 감지.
- 생성된 파일의 정보를 모니터링 시스템에 전달.
- 특정 조건에 따라 알림 시스템(예: 이메일, Slack 등)으로 알림을 보냄.
구성 요소
- 파일 모니터링: 파일 생성, 수정, 삭제 등의 이벤트를 감지.
- 조건 필터링: 특정 조건에 따라 이벤트를 필터링.
- 알림 전송: 이메일, 메시징 시스템(Slack, Teams 등)으로 알림.
구현 예제
의존성
Spring Boot 프로젝트에서 실행하려면 아래 의존성을 추가해야 합니다.
implementation 'org.springframework.integration:spring-integration-mail'
아래는 Spring Integration을 사용하여 실시간 모니터링과 알림 시스템을 구축하는 코드입니다.
package kr.co.thekeytech.spring.eai.monitor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.core.GenericTransformer;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.mail.dsl.Mail;
import java.io.File;
import java.util.Date;
@Configuration
public class MonitoringConfig {
// 1. 파일 모니터링: 특정 디렉터리 감시
@Bean
public IntegrationFlow fileMonitoringFlow() {
return IntegrationFlow.from(Files.inboundAdapter(new File("/monitoring-directory"))
.autoCreateDirectory(true) // 디렉터리 자동 생성
.preventDuplicates(true) // 중복 파일 감지 방지
.patternFilter("*.txt")) // .txt 파일만 감지
.transform(new FileToAlertTransformer()) // 파일 정보를 알림 메시지로 변환
.channel("alertChannel") // 알림 채널로 전달
.get();
}
// 2. 알림 처리: 이메일로 알림 전송
@Bean
public IntegrationFlow alertNotificationFlow() {
return IntegrationFlow.from("alertChannel")
.filter(String.class, payload -> payload.contains("ALERT")) // 조건 필터링
.enrichHeaders(Mail.headers()
.subject("Monitoring Alert").from("xxxxx@gmail.com").to("zzzzzz@thekeytech.co.kr"))
.handle(Mail.outboundAdapter("smtp.gmail.com")
.port(587)
.credentials("xxxxxx@gmail.com", "passwordhere")
.javaMailProperties(p -> {
p.put("mail.smtp.auth", "true"); // SMTP 인증 활성화
p.put("mail.smtp.starttls.enable", "true"); // STARTTLS 활성화
p.put("mail.smtp.starttls.required", "true"); // STARTTLS 필수 설정
})
)
.get();
}
// 3. 파일 정보를 알림 메시지로 변환하는 변환기
public static class FileToAlertTransformer implements GenericTransformer<File, String> {
@Override
public String transform(File file) {
return String.format("ALERT: New file detected - %s at %s",
file.getName(), new Date());
}
}
}
코드 설명
- 파일 모니터링
- Files.inboundAdapter를 사용하여 /monitoring-directory 디렉터리를 실시간으로 감시합니다.
- .txt 파일만 감지하며, 중복 파일 이벤트를 방지하기 위해 preventDuplicates(true)를 설정했습니다.
- 파일 정보를 알림 메시지로 변환
- GenericTransformer를 사용하여 감지된 파일 정보를 알림 메시지로 변환합니다.
- 메시지는 String 형식으로 alertChannel에 전달됩니다.
- 조건 필터링
- filter를 사용하여 메시지 내용에 "ALERT" 문자열이 포함된 경우에만 처리하도록 설정합니다.
- 알림 전송
- Mail.outboundAdapter를 사용하여 이메일로 알림을 전송합니다.
- SMTP 서버 정보(smtp.example.com)와 인증 정보를 사용하여 이메일을 발송합니다.
테스트
1. 파일 생성 테스트
- /monitoring-directory에 test.txt 파일 생성.
- 시스템이 파일을 감지하고 ALERT 메시지를 생성.
- 이메일을 통해 알림 전송.
알림 메시지 예시
ALERT: New file detected - test.txt at Fri Dec 20 14:30:00 UTC 2024
확장 가능성
- Slack 또는 Teams 알림
- 이메일 대신 Slack 또는 Microsoft Teams API를 사용하여 알림을 보낼 수 있습니다.
- Http.outboundGateway를 활용해 API 호출을 추가합니다.
- 조건 필터링 확장
- 파일 크기, 파일 이름 패턴 등 특정 조건에 따라 알림을 필터링합니다.
- 예: filter(File.class, file -> file.length() > 1024).
- 데이터베이스 로깅
- 감지된 이벤트를 데이터베이스에 기록하여 이력을 관리합니다.
- Jdbc.outboundAdapter를 활용.
- SMS 알림
- SMS API(예: Twilio)를 활용하여 중요한 알림을 전송.
728x90
'Spring Integration for Beginners' 카테고리의 다른 글
Spring Integration의 실용 사례 - File, RestAPI (1) | 2024.12.20 |
---|---|
Spring Integration의 기본 개념 (1) | 2024.12.19 |
Spring Integration의 주요 특징 (3) | 2024.12.18 |
Spring Integration의 탄생과 역사 (0) | 2024.12.17 |
Spring Integration의 배경과 필요성 (1) | 2024.12.16 |
Comments