Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
Tags
- 리눅스
- Database
- 시스템
- GPT-4's answer
- JVM
- write by chatGPT
- python
- 고전역학
- chatGPT's answer
- flet
- NIO
- 웹 크롤링
- 자바암호
- 유닉스
- oracle
- spring data jpa
- 자바네트워크
- 소프트웨어공학
- write by GPT-4
- spring integration
- 코틀린
- 역학
- 파이썬
- 자바
- 인프라
- Java
- android
- jpa
- kotlin
- 데이터베이스
Archives
- Today
- Total
Akashic Records
실용적인 통합 시나리오- HTTP와 REST 통합 본문
Spring Integration for Beginners
실용적인 통합 시나리오- HTTP와 REST 통합
Andrew's Akashic Records 2025. 1. 8. 13:02728x90
3. 실용적인 통합 시나리오
3.1 HTTP와 REST 통합
HTTP와 REST 통합
Spring Integration에서 HTTP Adapter와 REST API를 사용하여 데이터를 통합하고 전송하는 방법을 소개합니다. 이 시나리오에서는 HTTP Inbound Adapter와 Outbound Gateway를 활용하여 RESTful API와의 데이터 전송을 구현합니다.
1. 요구사항
- HTTP 요청 수신:
- 클라이언트로부터 데이터를 수신.
- RESTful API 호출:
- 수신된 데이터를 외부 REST API로 전송.
- 응답 데이터 처리:
- REST API의 응답 데이터를 처리 후 반환.
2. 시나리오
- Http Inbound로 전달 받은 블로그 postId로 블로그 정보를 가져와 내용을 수정후 다시 저장한다.
- URL "/receive"로 Http.inboundGateway을 지정한다.
- "/receive" URL에 Post 방식으로 호출되면 Request Body에 postId 값만 문자열로 전달 한다.
- GET "https://jsonplaceholder.typicode.com/posts/{postId} 로 부터 블로그 내용을 가져온다.
- 가저온 블로그 내용을 수정한다.
- PUT "https://jsonplaceholder.typicode.com/posts/{postId}로 수정된 내용을 저장한다.
- 클라이언트에게 수정된 블로그 내용을 전달한다.
- Http InboudGateway 말고 DiectChannel로 동일한 Flow을 수행할 수 있게 한다.
3. 구현
3.1 HTTP 요청 수신
Spring Integration의 HTTP Inbound Gateway를 사용하여 HTTP 요청을 수신합니다.
@Bean
public IntegrationFlow httpInboundFlow() {
return IntegrationFlow.from(Http.inboundGateway("/receive")
.requestPayloadType(String.class)) // 요청 데이터를 문자열로 변환
.enrichHeaders(h -> {
// 헤더 추가
h.headerFunction("postId", m -> m.getPayload());
})
.handle((payload, headers) -> {
logger.info("========> Receive Handle Headers: {}", headers);
logger.info("========> Receive Handle Payload: {}", payload);
return payload;
})
.channel("http.get.rest.channel") // 데이터를 처리할 채널
.get();
}
@Bean
public IntegrationFlow inboundFlow() {
return IntegrationFlow.from("http.input.channel") // 요청 데이터를 문자열로 변환
.handle((payload, headers) -> {
logger.info("========> Receive Handle Headers: {}", headers);
logger.info("========> Receive Handle Payload: {}", payload);
return payload;
})
.channel("http.get.rest.channel") // 데이터를 처리할 채널
.get();
}
- 설명:
- /receive 엔드포인트에서 HTTP 요청을 수신.
- 요청 Payload를 문자열로 변환하여 Message.Header에 postId로 저장한다.
- REST API 호출 채널를 지정한다.
- "inboundFlow" Bean은 DiectChannel로 동일한 Flow을 수행한다.
3.2 REST API 호출
Spring Integration의 HTTP Outbound Gateway를 사용하여 외부 REST API를 호출합니다.
@Bean
public IntegrationFlow restApiFlow() {
return IntegrationFlow.from("http.get.rest.channel") // HTTP 요청 데이터 처리 채널
.handle(Http.outboundGateway(message -> "https://jsonplaceholder.typicode.com/posts/" + message.getHeaders().getOrDefault("postId", "1"))
.httpMethod(HttpMethod.GET) // POST 메서드 사용
.expectedResponseType(String.class)) // 응답 데이터를 문자열로 처리
.handle((payload, headers) -> {
logger.info("========> GET Rest Handle Headers: {}", headers);
logger.info("========> GET RestHandle Payload: {}", payload);
return payload;
})
.channel("http.process.rest.channel") // 응답 데이터를 전달할 채널
.get();
}
- 설명:
- 외부 API URL: https://jsonplaceholder.typicode.com/posts/{postId}
- 요청 데이터를 GET 방식으로 요청.
- API의 응답 데이터를 문자열로 수신.
- 블로그 내용을 수정할 채널로 전달한다.
3.3 응답 데이터 처리
REST API의 응답 데이터를 가공하여 클라이언트로 반환합니다.
@Bean
public IntegrationFlow httpResponseFlow() {
return IntegrationFlow.from("http.process.rest.channel")
.transform(String.class, payload -> {
try {
logger.info("========> Process Payload: {}", payload);
// JSON 파싱
ObjectMapper objectMapper = new ObjectMapper();
JsonNode rootNode = objectMapper.readTree(payload);
// 특정 항목(body) 수정
((ObjectNode) rootNode).put("body", "Updated Body Content");
// 수정된 JSON 문자열 반환
logger.info("========> Process newPayload: {}", objectMapper.writeValueAsString(rootNode));
return rootNode;
} catch (Exception e) {
throw new RuntimeException("Error processing JSON response", e);
}
})
.enrichHeaders(h -> h.header("Content-Type", "application/json"))
.handle(Http.outboundGateway(message -> "https://jsonplaceholder.typicode.com/posts/" + message.getHeaders().getOrDefault("postId", "1"))
.httpMethod(HttpMethod.PUT)
.expectedResponseType(String.class)
.headerMapper(new DefaultHttpHeaderMapper())) // 기본 헤더 매핑) // 가공된 데이터를 반환
.handle((payload, headers) -> {
logger.info("========> After Handle Headers: {}", headers);
logger.info("========> After Handle Payload: {}", payload);
return payload;
})
.route(Message.class, message -> "inline".equals(message.getHeaders().get("inputType")), mapping -> mapping
.subFlowMapping(true, subFlow -> subFlow
.channel("http.output.channel"))
.subFlowMapping(false, subuFlow -> subuFlow
.handle((paylod, headers) -> {
logger.info("Call By HttpInput");
return paylod;
})))
.get();
}
- 설명:
- 문자열의 paylod을 Json으로 변환하여 블로그의 Body 내용을 수정하는 transform
- 수정된 블로그 내용을 REST API, PUT https://jsonplaceholder.typicode.com/posts/{postId}로 전달
- 최초 시행이 Http InboundGateway로 진행되었는지, DiectChannel로 진행되었는지에 따라 Route로 지정
3.4 Java DSL 전체 코드
package kr.co.thekeytech.spring.eai.http;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.http.dsl.Http;
import org.springframework.integration.http.support.DefaultHttpHeaderMapper;
import org.springframework.messaging.Message;
@Configuration
public class HttpProcessingConfig {
private static final Logger logger = LoggerFactory.getLogger(HttpProcessingConfig.class);
@Bean
public IntegrationFlow httpInboundFlow() {
return IntegrationFlow.from(Http.inboundGateway("/receive")
.requestPayloadType(String.class)) // 요청 데이터를 문자열로 변환
.enrichHeaders(h -> {
// 헤더 추가
h.headerFunction("postId", m -> m.getPayload());
})
.handle((payload, headers) -> {
logger.info("========> Receive Handle Headers: {}", headers);
logger.info("========> Receive Handle Payload: {}", payload);
return payload;
})
.channel("http.get.rest.channel") // 데이터를 처리할 채널
.get();
}
@Bean
public IntegrationFlow inboundFlow() {
return IntegrationFlow.from("http.input.channel") // 요청 데이터를 문자열로 변환
.handle((payload, headers) -> {
logger.info("========> Receive Handle Headers: {}", headers);
logger.info("========> Receive Handle Payload: {}", payload);
return payload;
})
.channel("http.get.rest.channel") // 데이터를 처리할 채널
.get();
}
@Bean
public IntegrationFlow restApiFlow() {
return IntegrationFlow.from("http.get.rest.channel") // HTTP 요청 데이터 처리 채널
.handle(Http.outboundGateway(message -> "https://jsonplaceholder.typicode.com/posts/" + message.getHeaders().getOrDefault("postId", "1"))
.httpMethod(HttpMethod.GET) // POST 메서드 사용
.expectedResponseType(String.class)) // 응답 데이터를 문자열로 처리
.handle((payload, headers) -> {
logger.info("========> GET Rest Handle Headers: {}", headers);
logger.info("========> GET RestHandle Payload: {}", payload);
return payload;
})
.channel("http.process.rest.channel") // 응답 데이터를 전달할 채널
.get();
}
@Bean
public IntegrationFlow httpResponseFlow() {
return IntegrationFlow.from("http.process.rest.channel")
.transform(String.class, payload -> {
try {
logger.info("========> Process Payload: {}", payload);
// JSON 파싱
ObjectMapper objectMapper = new ObjectMapper();
JsonNode rootNode = objectMapper.readTree(payload);
// 특정 항목(body) 수정
((ObjectNode) rootNode).put("body", "Updated Body Content");
// 수정된 JSON 문자열 반환
logger.info("========> Process newPayload: {}", objectMapper.writeValueAsString(rootNode));
return rootNode;
} catch (Exception e) {
throw new RuntimeException("Error processing JSON response", e);
}
})
.enrichHeaders(h -> h.header("Content-Type", "application/json"))
.handle(Http.outboundGateway(message -> "https://jsonplaceholder.typicode.com/posts/" + message.getHeaders().getOrDefault("postId", "1"))
.httpMethod(HttpMethod.PUT)
.expectedResponseType(String.class)
.headerMapper(new DefaultHttpHeaderMapper())) // 기본 헤더 매핑) // 가공된 데이터를 반환
.handle((payload, headers) -> {
logger.info("========> After Handle Headers: {}", headers);
logger.info("========> After Handle Payload: {}", payload);
return payload;
})
.route(Message.class, message -> "inline".equals(message.getHeaders().get("inputType")), mapping -> mapping
.subFlowMapping(true, subFlow -> subFlow
.channel("http.output.channel"))
.subFlowMapping(false, subuFlow -> subuFlow
.handle((paylod, headers) -> {
logger.info("Call By HttpInput");
return paylod;
})))
.get();
}
}
3.5 Gateway Code
package kr.co.thekeytech.spring.eai.http;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.handler.annotation.Header;
@MessagingGateway
public interface HttpPostGateway {
@Gateway(requestChannel = "http.input.channel") // inputChannel을 명시적으로 지정
void sendMessage(String payload, @Header("postId") String postId, @Header("inputType") String inputType);
}
- 설명:
- DiectChannel로 사용될 Gateway
- DSL 코드에서 route 조건으로 사용하기 위해 "inputType" Header을 지정한다.
3.6 테스트 코드
package kr.co.thekeytech.spring.eai.http;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.MvcResult;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
@SpringBootTest
@AutoConfigureMockMvc
class HttpProcessingConfigTest {
@Autowired
private HttpPostGateway httpPostGateway;
@Autowired
@Qualifier("http.output.channel")
private MessageChannel httpOutputChannel;
@Autowired
private MockMvc mockMvc;
@Test
void testFlow() {
QueueChannel outputQueue = new QueueChannel();
((DirectChannel) httpOutputChannel).subscribe(outputQueue::send);
// DiectChannel 사용
httpPostGateway.sendMessage("Http Post Api Test", "1", "inline");
// 결과 출력
Message<?> outputMessage = outputQueue.receive(1000);
assertThat(outputMessage).isNotNull();
System.out.println("## Final JSON: \n" + outputMessage.getPayload());
}
@Test
void testHttpInboundFlow() throws Exception {
// 요청 Payload
String requestBody = "1";
// POST 요청 실행
MvcResult result = mockMvc.perform(post("/receive")
.contentType("application/json")
.content(requestBody))
.andExpect(status().isOk()) // 200 OK 응답 확인
.andReturn();
// 결과 확인
System.out.println(result.getResponse().getContentAsString());
}
}
728x90
4. 실행결과
4.1 Http InboundGateway 결과
2025-01-08 12:50:58 - k.c.t.s.e.http.HttpProcessingConfig - ========> Receive Handle Headers: {http_requestMethod=POST, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@593db293, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@593db293, http_requestUrl=http://localhost/receive, postId=1, id=c0e8c1b6-fd7f-13a6-f4bb-cb8c8baec4e6, Content-Length=1, contentType=application/json;charset=UTF-8, timestamp=1736308258973}
2025-01-08 12:50:58 - k.c.t.s.e.http.HttpProcessingConfig - ========> Receive Handle Payload: 1
...
2025-01-08 12:51:00 - k.c.t.s.e.http.HttpProcessingConfig - ========> GET Rest Handle Headers: {http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@593db293, Server=cloudflare, Connection=keep-alive, Pragma=no-cache, postId=1, http_statusCode=200 OK, Date=1736308266000, Via=1.1 vegur, Accept-Ranges=bytes, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@593db293, Cache-Control=max-age=43200, Etag=W/"124-yiKdLzqO5gfBrJFrcdJ8Yq0LGnU", http_requestUrl=http://localhost/receive, Vary=Origin, Accept-Encoding, id=851499ad-a510-d356-e86c-4cb48c45a632, Content-Length=292, contentType=application/json;charset=utf-8, Age=26902, timestamp=1736308260021}
2025-01-08 12:51:00 - k.c.t.s.e.http.HttpProcessingConfig - ========> GET RestHandle Payload: {
"userId": 1,
"id": 1,
"title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
"body": "quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto"
}
...
2025-01-08 12:51:00 - k.c.t.s.e.http.HttpProcessingConfig - ========> Process Payload: {
"userId": 1,
"id": 1,
"title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
"body": "quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto"
}
2025-01-08 12:51:00 - k.c.t.s.e.http.HttpProcessingConfig - ========> Process newPayload: {"userId":1,"id":1,"title":"sunt aut facere repellat provident occaecati excepturi optio reprehenderit","body":"Updated Body Content"}
...
2025-01-08 12:51:00 - k.c.t.s.e.http.HttpProcessingConfig - ========> After Handle Headers: {http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@593db293, Server=cloudflare, Connection=keep-alive, Pragma=no-cache, postId=1, http_statusCode=200 OK, Date=1736308266000, Via=1.1 vegur, Accept-Ranges=bytes, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@593db293, Cache-Control=max-age=43200, Etag=W/"124-yiKdLzqO5gfBrJFrcdJ8Yq0LGnU", http_requestUrl=http://localhost/receive, Vary=Origin, Accept-Encoding, id=d937d263-e0a4-2304-be6a-e0a78f561f5e, Content-Length=292, contentType=application/json;charset=utf-8, Age=26902, Content-Type=application/json, timestamp=1736308260763}
2025-01-08 12:51:00 - k.c.t.s.e.http.HttpProcessingConfig - ========> After Handle Payload: {
"userId": 1,
"id": 1,
"title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
"body": "Updated Body Content"
}
...
2025-01-08 12:51:00 - k.c.t.s.e.http.HttpProcessingConfig - Call By HttpInput
...
{
"userId": 1,
"id": 1,
"title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
"body": "Updated Body Content"
}
4.2 DiectChannel 실행결과
2025-01-08 12:57:42 - k.c.t.s.e.http.HttpProcessingConfig - ========> Receive Handle Headers: {replyChannel=nullChannel, inputType=inline, errorChannel=, postId=1, id=04788349-8986-ad33-d0b6-58f6ec85e6aa, timestamp=1736308662807}
2025-01-08 12:57:42 - k.c.t.s.e.http.HttpProcessingConfig - ========> Receive Handle Payload: Http Post Api Test
...
2025-01-08 12:57:43 - k.c.t.s.e.http.HttpProcessingConfig - ========> GET Rest Handle Headers: {errorChannel=, Server=cloudflare, Connection=keep-alive, Pragma=no-cache, postId=1, http_statusCode=200 OK, Date=1736308670000, Via=1.1 vegur, Accept-Ranges=bytes, replyChannel=nullChannel, Cache-Control=max-age=43200, Etag=W/"124-yiKdLzqO5gfBrJFrcdJ8Yq0LGnU", Vary=Origin, Accept-Encoding, inputType=inline, id=f86989bc-57a8-4442-62c4-e0c281fd5753, Content-Length=292, contentType=application/json;charset=utf-8, Age=18619, timestamp=1736308663739}
2025-01-08 12:57:43 - k.c.t.s.e.http.HttpProcessingConfig - ========> GET RestHandle Payload: {
"userId": 1,
"id": 1,
"title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
"body": "quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto"
}
...
2025-01-08 12:57:43 - k.c.t.s.e.http.HttpProcessingConfig - ========> Process Payload: {
"userId": 1,
"id": 1,
"title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
"body": "quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto"
}
2025-01-08 12:57:43 - k.c.t.s.e.http.HttpProcessingConfig - ========> Process newPayload: {"userId":1,"id":1,"title":"sunt aut facere repellat provident occaecati excepturi optio reprehenderit","body":"Updated Body Content"}
...
2025-01-08 12:57:44 - k.c.t.s.e.http.HttpProcessingConfig - ========> After Handle Headers: {errorChannel=, Server=cloudflare, Connection=keep-alive, Pragma=no-cache, postId=1, http_statusCode=200 OK, Date=1736308670000, Via=1.1 vegur, Accept-Ranges=bytes, replyChannel=nullChannel, Cache-Control=max-age=43200, Etag=W/"124-yiKdLzqO5gfBrJFrcdJ8Yq0LGnU", Vary=Origin, Accept-Encoding, inputType=inline, id=487ef00d-4e69-b200-50dc-f7aa2f6e0495, Content-Length=292, contentType=application/json;charset=utf-8, Age=18619, Content-Type=application/json, timestamp=1736308664463}
2025-01-08 12:57:44 - k.c.t.s.e.http.HttpProcessingConfig - ========> After Handle Payload: {
"userId": 1,
"id": 1,
"title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
"body": "Updated Body Content"
}
...
## Final JSON:
{
"userId": 1,
"id": 1,
"title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
"body": "Updated Body Content"
}
5. 확장 가능성
- 데이터 검증:
- HTTP 요청 데이터의 유효성을 검증하여 잘못된 데이터를 차단.
- 다양한 HTTP 메서드 지원:
- GET, PUT, DELETE 등 다양한 메서드를 지원하는 플로우 추가.
- 에러 처리:
- 외부 API 호출 실패 시 에러 채널(errorChannel)로 메시지를 전달하고 복구 로직 추가.
- 다중 API 호출:
- 여러 REST API를 순차적으로 호출하거나 병렬 처리.
- 동적 API URL 설정:
- 요청 데이터에 따라 API URL을 동적으로 결정.
728x90
'Spring Integration for Beginners' 카테고리의 다른 글
Splitter와 Aggregator - 예제: XML 데이터 처리 (0) | 2025.01.07 |
---|---|
Splitter와 Aggregator - 메시지 분할(Split)과 집계(Aggregate) 이해 (0) | 2025.01.06 |
Service Activator와 Message Handler - Custom Message Handler 작성 (2) | 2025.01.03 |
Service Activator와 Message Handler - Service Activator를 사용한 비즈니스 로직 구현 (0) | 2025.01.02 |
Message Filter와 Router - 동적 라우팅 구현 (0) | 2024.12.27 |
Comments