Akashic Records

실용적인 통합 시나리오- HTTP와 REST 통합 본문

Spring Integration for Beginners

실용적인 통합 시나리오- HTTP와 REST 통합

Andrew's Akashic Records 2025. 1. 8. 13:02
728x90
3. 실용적인 통합 시나리오
3.1 HTTP와 REST 통합

Spring Integration for Backend Developers

HTTP와 REST 통합

Spring Integration에서 HTTP AdapterREST API를 사용하여 데이터를 통합하고 전송하는 방법을 소개합니다. 이 시나리오에서는 HTTP Inbound AdapterOutbound Gateway를 활용하여 RESTful API와의 데이터 전송을 구현합니다.

1. 요구사항

  1. HTTP 요청 수신:
    • 클라이언트로부터 데이터를 수신.
  2. RESTful API 호출:
    • 수신된 데이터를 외부 REST API로 전송.
  3. 응답 데이터 처리:
    • REST API의 응답 데이터를 처리 후 반환.

2. 시나리오

  1. Http Inbound로 전달 받은 블로그 postId로 블로그 정보를 가져와 내용을 수정후 다시 저장한다.
  2. URL "/receive"로 Http.inboundGateway을 지정한다.
  3. "/receive" URL에 Post 방식으로 호출되면 Request Body에 postId 값만 문자열로 전달 한다.
  4. GET "https://jsonplaceholder.typicode.com/posts/{postId} 로 부터 블로그 내용을 가져온다.
  5. 가저온 블로그 내용을 수정한다.
  6. PUT "https://jsonplaceholder.typicode.com/posts/{postId}로 수정된 내용을 저장한다.
  7. 클라이언트에게 수정된 블로그 내용을 전달한다.
  8. Http InboudGateway 말고 DiectChannel로 동일한 Flow을 수행할 수 있게 한다.

3. 구현

3.1 HTTP 요청 수신

Spring Integration의 HTTP Inbound Gateway를 사용하여 HTTP 요청을 수신합니다.

@Bean
public IntegrationFlow httpInboundFlow() {
    return IntegrationFlow.from(Http.inboundGateway("/receive")
                    .requestPayloadType(String.class)) // 요청 데이터를 문자열로 변환
            .enrichHeaders(h -> {
                // 헤더 추가
                h.headerFunction("postId", m -> m.getPayload());
            })
            .handle((payload, headers) -> {
                logger.info("========> Receive Handle Headers: {}", headers);
                logger.info("========> Receive Handle Payload: {}", payload);
                return payload;
            })
            .channel("http.get.rest.channel") // 데이터를 처리할 채널
            .get();
}


@Bean
public IntegrationFlow inboundFlow() {
    return IntegrationFlow.from("http.input.channel") // 요청 데이터를 문자열로 변환
            .handle((payload, headers) -> {
                logger.info("========> Receive Handle Headers: {}", headers);
                logger.info("========> Receive Handle Payload: {}", payload);
                return payload;
            })
            .channel("http.get.rest.channel") // 데이터를 처리할 채널
            .get();
}
  • 설명:
    • /receive 엔드포인트에서 HTTP 요청을 수신.
    • 요청 Payload를 문자열로 변환하여 Message.Header에 postId로 저장한다.
    • REST API 호출 채널를 지정한다. 
    • "inboundFlow" Bean은  DiectChannel로 동일한 Flow을 수행한다.

3.2 REST API 호출

Spring Integration의 HTTP Outbound Gateway를 사용하여 외부 REST API를 호출합니다.

    @Bean
    public IntegrationFlow restApiFlow() {
        return IntegrationFlow.from("http.get.rest.channel") // HTTP 요청 데이터 처리 채널
                .handle(Http.outboundGateway(message -> "https://jsonplaceholder.typicode.com/posts/" + message.getHeaders().getOrDefault("postId", "1"))
                        .httpMethod(HttpMethod.GET) // POST 메서드 사용
                        .expectedResponseType(String.class)) // 응답 데이터를 문자열로 처리
                .handle((payload, headers) -> {
                    logger.info("========> GET Rest Handle Headers: {}", headers);
                    logger.info("========> GET RestHandle Payload: {}", payload);
                    return payload;
                })
                .channel("http.process.rest.channel") // 응답 데이터를 전달할 채널
                .get();
    }

3.3 응답 데이터 처리

REST API의 응답 데이터를 가공하여 클라이언트로 반환합니다.

    @Bean
    public IntegrationFlow httpResponseFlow() {
        return IntegrationFlow.from("http.process.rest.channel")
                .transform(String.class, payload -> {
                    try {

                        logger.info("========> Process Payload: {}", payload);

                        // JSON 파싱
                        ObjectMapper objectMapper = new ObjectMapper();
                        JsonNode rootNode = objectMapper.readTree(payload);

                        // 특정 항목(body) 수정
                        ((ObjectNode) rootNode).put("body", "Updated Body Content");

                        // 수정된 JSON 문자열 반환
                        logger.info("========> Process newPayload: {}", objectMapper.writeValueAsString(rootNode));
                        return rootNode;
                    } catch (Exception e) {
                        throw new RuntimeException("Error processing JSON response", e);
                    }
                })
                .enrichHeaders(h -> h.header("Content-Type", "application/json"))
                .handle(Http.outboundGateway(message -> "https://jsonplaceholder.typicode.com/posts/" + message.getHeaders().getOrDefault("postId", "1"))
                        .httpMethod(HttpMethod.PUT)
                        .expectedResponseType(String.class)
                        .headerMapper(new DefaultHttpHeaderMapper())) // 기본 헤더 매핑) // 가공된 데이터를 반환
                .handle((payload, headers) -> {
                    logger.info("========> After Handle Headers: {}", headers);
                    logger.info("========> After Handle Payload: {}", payload);
                    return payload;
                })
                .route(Message.class, message -> "inline".equals(message.getHeaders().get("inputType")), mapping -> mapping
                        .subFlowMapping(true, subFlow -> subFlow
                                .channel("http.output.channel"))
                        .subFlowMapping(false, subuFlow -> subuFlow
                                .handle((paylod, headers) -> {
                                    logger.info("Call By HttpInput");
                                    return paylod;
                                })))
                .get();
    }
  • 설명:
    • 문자열의 paylod을 Json으로 변환하여 블로그의 Body 내용을 수정하는 transform
    • 수정된 블로그 내용을 REST API, PUT https://jsonplaceholder.typicode.com/posts/{postId}로 전달
    • 최초 시행이 Http InboundGateway로 진행되었는지, DiectChannel로 진행되었는지에 따라  Route로 지정

3.4 Java DSL 전체 코드

package kr.co.thekeytech.spring.eai.http;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.http.dsl.Http;
import org.springframework.integration.http.support.DefaultHttpHeaderMapper;
import org.springframework.messaging.Message;

@Configuration
public class HttpProcessingConfig {

    private static final Logger logger = LoggerFactory.getLogger(HttpProcessingConfig.class);

    @Bean
    public IntegrationFlow httpInboundFlow() {
        return IntegrationFlow.from(Http.inboundGateway("/receive")
                        .requestPayloadType(String.class)) // 요청 데이터를 문자열로 변환
                .enrichHeaders(h -> {
                    // 헤더 추가
                    h.headerFunction("postId", m -> m.getPayload());
                })
                .handle((payload, headers) -> {
                    logger.info("========> Receive Handle Headers: {}", headers);
                    logger.info("========> Receive Handle Payload: {}", payload);
                    return payload;
                })
                .channel("http.get.rest.channel") // 데이터를 처리할 채널
                .get();
    }

    @Bean
    public IntegrationFlow inboundFlow() {
        return IntegrationFlow.from("http.input.channel") // 요청 데이터를 문자열로 변환
                .handle((payload, headers) -> {
                    logger.info("========> Receive Handle Headers: {}", headers);
                    logger.info("========> Receive Handle Payload: {}", payload);
                    return payload;
                })
                .channel("http.get.rest.channel") // 데이터를 처리할 채널
                .get();
    }

    @Bean
    public IntegrationFlow restApiFlow() {
        return IntegrationFlow.from("http.get.rest.channel") // HTTP 요청 데이터 처리 채널
                .handle(Http.outboundGateway(message -> "https://jsonplaceholder.typicode.com/posts/" + message.getHeaders().getOrDefault("postId", "1"))
                        .httpMethod(HttpMethod.GET) // POST 메서드 사용
                        .expectedResponseType(String.class)) // 응답 데이터를 문자열로 처리
                .handle((payload, headers) -> {
                    logger.info("========> GET Rest Handle Headers: {}", headers);
                    logger.info("========> GET RestHandle Payload: {}", payload);
                    return payload;
                })
                .channel("http.process.rest.channel") // 응답 데이터를 전달할 채널
                .get();
    }

    @Bean
    public IntegrationFlow httpResponseFlow() {
        return IntegrationFlow.from("http.process.rest.channel")
                .transform(String.class, payload -> {
                    try {

                        logger.info("========> Process Payload: {}", payload);

                        // JSON 파싱
                        ObjectMapper objectMapper = new ObjectMapper();
                        JsonNode rootNode = objectMapper.readTree(payload);

                        // 특정 항목(body) 수정
                        ((ObjectNode) rootNode).put("body", "Updated Body Content");

                        // 수정된 JSON 문자열 반환
                        logger.info("========> Process newPayload: {}", objectMapper.writeValueAsString(rootNode));
                        return rootNode;
                    } catch (Exception e) {
                        throw new RuntimeException("Error processing JSON response", e);
                    }
                })
                .enrichHeaders(h -> h.header("Content-Type", "application/json"))
                .handle(Http.outboundGateway(message -> "https://jsonplaceholder.typicode.com/posts/" + message.getHeaders().getOrDefault("postId", "1"))
                        .httpMethod(HttpMethod.PUT)
                        .expectedResponseType(String.class)
                        .headerMapper(new DefaultHttpHeaderMapper())) // 기본 헤더 매핑) // 가공된 데이터를 반환
                .handle((payload, headers) -> {
                    logger.info("========> After Handle Headers: {}", headers);
                    logger.info("========> After Handle Payload: {}", payload);
                    return payload;
                })
                .route(Message.class, message -> "inline".equals(message.getHeaders().get("inputType")), mapping -> mapping
                        .subFlowMapping(true, subFlow -> subFlow
                                .channel("http.output.channel"))
                        .subFlowMapping(false, subuFlow -> subuFlow
                                .handle((paylod, headers) -> {
                                    logger.info("Call By HttpInput");
                                    return paylod;
                                })))
                .get();
    }
}

3.5 Gateway Code

package kr.co.thekeytech.spring.eai.http;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway
public interface HttpPostGateway {
    @Gateway(requestChannel = "http.input.channel") // inputChannel을 명시적으로 지정
    void sendMessage(String payload, @Header("postId") String postId, @Header("inputType") String inputType);
}
  • 설명:
    • DiectChannel로 사용될 Gateway
    • DSL 코드에서 route 조건으로 사용하기 위해 "inputType" Header을 지정한다.

3.6 테스트 코드

package kr.co.thekeytech.spring.eai.http;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.MvcResult;

import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

@SpringBootTest
@AutoConfigureMockMvc
class HttpProcessingConfigTest {
    @Autowired
    private HttpPostGateway httpPostGateway;

    @Autowired
    @Qualifier("http.output.channel")
    private MessageChannel httpOutputChannel;

    @Autowired
    private MockMvc mockMvc;

    @Test
    void testFlow() {
        QueueChannel outputQueue = new QueueChannel();
        ((DirectChannel) httpOutputChannel).subscribe(outputQueue::send);

        // DiectChannel 사용
        httpPostGateway.sendMessage("Http Post Api Test", "1", "inline");

        // 결과 출력
        Message<?> outputMessage = outputQueue.receive(1000);
        assertThat(outputMessage).isNotNull();
        System.out.println("## Final JSON: \n" + outputMessage.getPayload());
    }

    @Test
    void testHttpInboundFlow() throws Exception {

        // 요청 Payload
        String requestBody = "1";

        // POST 요청 실행
        MvcResult result = mockMvc.perform(post("/receive")
                        .contentType("application/json")
                        .content(requestBody))
                .andExpect(status().isOk()) // 200 OK 응답 확인
                .andReturn();

        // 결과 확인
        System.out.println(result.getResponse().getContentAsString());
    }
}
728x90

4. 실행결과

4.1 Http InboundGateway 결과

2025-01-08 12:50:58 - k.c.t.s.e.http.HttpProcessingConfig - ========> Receive Handle Headers: {http_requestMethod=POST, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@593db293, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@593db293, http_requestUrl=http://localhost/receive, postId=1, id=c0e8c1b6-fd7f-13a6-f4bb-cb8c8baec4e6, Content-Length=1, contentType=application/json;charset=UTF-8, timestamp=1736308258973}
2025-01-08 12:50:58 - k.c.t.s.e.http.HttpProcessingConfig - ========> Receive Handle Payload: 1
...
2025-01-08 12:51:00 - k.c.t.s.e.http.HttpProcessingConfig - ========> GET Rest Handle Headers: {http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@593db293, Server=cloudflare, Connection=keep-alive, Pragma=no-cache, postId=1, http_statusCode=200 OK, Date=1736308266000, Via=1.1 vegur, Accept-Ranges=bytes, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@593db293, Cache-Control=max-age=43200, Etag=W/"124-yiKdLzqO5gfBrJFrcdJ8Yq0LGnU", http_requestUrl=http://localhost/receive, Vary=Origin, Accept-Encoding, id=851499ad-a510-d356-e86c-4cb48c45a632, Content-Length=292, contentType=application/json;charset=utf-8, Age=26902, timestamp=1736308260021}
2025-01-08 12:51:00 - k.c.t.s.e.http.HttpProcessingConfig - ========> GET RestHandle Payload: {
  "userId": 1,
  "id": 1,
  "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
  "body": "quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto"
}
...
2025-01-08 12:51:00 - k.c.t.s.e.http.HttpProcessingConfig - ========> Process Payload: {
  "userId": 1,
  "id": 1,
  "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
  "body": "quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto"
}
2025-01-08 12:51:00 - k.c.t.s.e.http.HttpProcessingConfig - ========> Process newPayload: {"userId":1,"id":1,"title":"sunt aut facere repellat provident occaecati excepturi optio reprehenderit","body":"Updated Body Content"}
...
2025-01-08 12:51:00 - k.c.t.s.e.http.HttpProcessingConfig - ========> After Handle Headers: {http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@593db293, Server=cloudflare, Connection=keep-alive, Pragma=no-cache, postId=1, http_statusCode=200 OK, Date=1736308266000, Via=1.1 vegur, Accept-Ranges=bytes, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@593db293, Cache-Control=max-age=43200, Etag=W/"124-yiKdLzqO5gfBrJFrcdJ8Yq0LGnU", http_requestUrl=http://localhost/receive, Vary=Origin, Accept-Encoding, id=d937d263-e0a4-2304-be6a-e0a78f561f5e, Content-Length=292, contentType=application/json;charset=utf-8, Age=26902, Content-Type=application/json, timestamp=1736308260763}
2025-01-08 12:51:00 - k.c.t.s.e.http.HttpProcessingConfig - ========> After Handle Payload: {
  "userId": 1,
  "id": 1,
  "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
  "body": "Updated Body Content"
}
...
2025-01-08 12:51:00 - k.c.t.s.e.http.HttpProcessingConfig - Call By HttpInput
...
{
  "userId": 1,
  "id": 1,
  "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
  "body": "Updated Body Content"
}

4.2 DiectChannel 실행결과

2025-01-08 12:57:42 - k.c.t.s.e.http.HttpProcessingConfig - ========> Receive Handle Headers: {replyChannel=nullChannel, inputType=inline, errorChannel=, postId=1, id=04788349-8986-ad33-d0b6-58f6ec85e6aa, timestamp=1736308662807}
2025-01-08 12:57:42 - k.c.t.s.e.http.HttpProcessingConfig - ========> Receive Handle Payload: Http Post Api Test
...
2025-01-08 12:57:43 - k.c.t.s.e.http.HttpProcessingConfig - ========> GET Rest Handle Headers: {errorChannel=, Server=cloudflare, Connection=keep-alive, Pragma=no-cache, postId=1, http_statusCode=200 OK, Date=1736308670000, Via=1.1 vegur, Accept-Ranges=bytes, replyChannel=nullChannel, Cache-Control=max-age=43200, Etag=W/"124-yiKdLzqO5gfBrJFrcdJ8Yq0LGnU", Vary=Origin, Accept-Encoding, inputType=inline, id=f86989bc-57a8-4442-62c4-e0c281fd5753, Content-Length=292, contentType=application/json;charset=utf-8, Age=18619, timestamp=1736308663739}
2025-01-08 12:57:43 - k.c.t.s.e.http.HttpProcessingConfig - ========> GET RestHandle Payload: {
  "userId": 1,
  "id": 1,
  "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
  "body": "quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto"
}
...
2025-01-08 12:57:43 - k.c.t.s.e.http.HttpProcessingConfig - ========> Process Payload: {
  "userId": 1,
  "id": 1,
  "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
  "body": "quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto"
}
2025-01-08 12:57:43 - k.c.t.s.e.http.HttpProcessingConfig - ========> Process newPayload: {"userId":1,"id":1,"title":"sunt aut facere repellat provident occaecati excepturi optio reprehenderit","body":"Updated Body Content"}
...
2025-01-08 12:57:44 - k.c.t.s.e.http.HttpProcessingConfig - ========> After Handle Headers: {errorChannel=, Server=cloudflare, Connection=keep-alive, Pragma=no-cache, postId=1, http_statusCode=200 OK, Date=1736308670000, Via=1.1 vegur, Accept-Ranges=bytes, replyChannel=nullChannel, Cache-Control=max-age=43200, Etag=W/"124-yiKdLzqO5gfBrJFrcdJ8Yq0LGnU", Vary=Origin, Accept-Encoding, inputType=inline, id=487ef00d-4e69-b200-50dc-f7aa2f6e0495, Content-Length=292, contentType=application/json;charset=utf-8, Age=18619, Content-Type=application/json, timestamp=1736308664463}
2025-01-08 12:57:44 - k.c.t.s.e.http.HttpProcessingConfig - ========> After Handle Payload: {
  "userId": 1,
  "id": 1,
  "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
  "body": "Updated Body Content"
}
...
## Final JSON: 
{
  "userId": 1,
  "id": 1,
  "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
  "body": "Updated Body Content"
}

 

5. 확장 가능성

  1. 데이터 검증:
    • HTTP 요청 데이터의 유효성을 검증하여 잘못된 데이터를 차단.
  2. 다양한 HTTP 메서드 지원:
    • GET, PUT, DELETE 등 다양한 메서드를 지원하는 플로우 추가.
  3. 에러 처리:
    • 외부 API 호출 실패 시 에러 채널(errorChannel)로 메시지를 전달하고 복구 로직 추가.
  4. 다중 API 호출:
    • 여러 REST API를 순차적으로 호출하거나 병렬 처리.
  5. 동적 API URL 설정:
    • 요청 데이터에 따라 API URL을 동적으로 결정.

728x90
Comments