본문 바로가기
Spring Integration for Beginners

실용적인 통합 시나리오- HTTP와 REST 통합

by Andrew's Akashic Records 2025. 1. 8.
728x90
3. 실용적인 통합 시나리오
3.1 HTTP와 REST 통합

Spring Integration for Backend Developers

HTTP와 REST 통합

Spring Integration에서 HTTP AdapterREST API를 사용하여 데이터를 통합하고 전송하는 방법을 소개합니다. 이 시나리오에서는 HTTP Inbound AdapterOutbound Gateway를 활용하여 RESTful API와의 데이터 전송을 구현합니다.

1. 요구사항

  1. HTTP 요청 수신:
    • 클라이언트로부터 데이터를 수신.
  2. RESTful API 호출:
    • 수신된 데이터를 외부 REST API로 전송.
  3. 응답 데이터 처리:
    • REST API의 응답 데이터를 처리 후 반환.

2. 시나리오

  1. Http Inbound로 전달 받은 블로그 postId로 블로그 정보를 가져와 내용을 수정후 다시 저장한다.
  2. URL "/receive"로 Http.inboundGateway을 지정한다.
  3. "/receive" URL에 Post 방식으로 호출되면 Request Body에 postId 값만 문자열로 전달 한다.
  4. GET "https://jsonplaceholder.typicode.com/posts/{postId} 로 부터 블로그 내용을 가져온다.
  5. 가저온 블로그 내용을 수정한다.
  6. PUT "https://jsonplaceholder.typicode.com/posts/{postId}로 수정된 내용을 저장한다.
  7. 클라이언트에게 수정된 블로그 내용을 전달한다.
  8. Http InboudGateway 말고 DiectChannel로 동일한 Flow을 수행할 수 있게 한다.

3. 구현

3.1 HTTP 요청 수신

Spring Integration의 HTTP Inbound Gateway를 사용하여 HTTP 요청을 수신합니다.

@Bean
public IntegrationFlow httpInboundFlow() {
    return IntegrationFlow.from(Http.inboundGateway("/receive")
                    .requestPayloadType(String.class)) // 요청 데이터를 문자열로 변환
            .enrichHeaders(h -> {
                // 헤더 추가
                h.headerFunction("postId", m -> m.getPayload());
            })
            .handle((payload, headers) -> {
                logger.info("========> Receive Handle Headers: {}", headers);
                logger.info("========> Receive Handle Payload: {}", payload);
                return payload;
            })
            .channel("http.get.rest.channel") // 데이터를 처리할 채널
            .get();
}


@Bean
public IntegrationFlow inboundFlow() {
    return IntegrationFlow.from("http.input.channel") // 요청 데이터를 문자열로 변환
            .handle((payload, headers) -> {
                logger.info("========> Receive Handle Headers: {}", headers);
                logger.info("========> Receive Handle Payload: {}", payload);
                return payload;
            })
            .channel("http.get.rest.channel") // 데이터를 처리할 채널
            .get();
}
  • 설명:
    • /receive 엔드포인트에서 HTTP 요청을 수신.
    • 요청 Payload를 문자열로 변환하여 Message.Header에 postId로 저장한다.
    • REST API 호출 채널를 지정한다. 
    • "inboundFlow" Bean은  DiectChannel로 동일한 Flow을 수행한다.

3.2 REST API 호출

Spring Integration의 HTTP Outbound Gateway를 사용하여 외부 REST API를 호출합니다.

    @Bean
    public IntegrationFlow restApiFlow() {
        return IntegrationFlow.from("http.get.rest.channel") // HTTP 요청 데이터 처리 채널
                .handle(Http.outboundGateway(message -> "https://jsonplaceholder.typicode.com/posts/" + message.getHeaders().getOrDefault("postId", "1"))
                        .httpMethod(HttpMethod.GET) // POST 메서드 사용
                        .expectedResponseType(String.class)) // 응답 데이터를 문자열로 처리
                .handle((payload, headers) -> {
                    logger.info("========> GET Rest Handle Headers: {}", headers);
                    logger.info("========> GET RestHandle Payload: {}", payload);
                    return payload;
                })
                .channel("http.process.rest.channel") // 응답 데이터를 전달할 채널
                .get();
    }

3.3 응답 데이터 처리

REST API의 응답 데이터를 가공하여 클라이언트로 반환합니다.

    @Bean
    public IntegrationFlow httpResponseFlow() {
        return IntegrationFlow.from("http.process.rest.channel")
                .transform(String.class, payload -> {
                    try {

                        logger.info("========> Process Payload: {}", payload);

                        // JSON 파싱
                        ObjectMapper objectMapper = new ObjectMapper();
                        JsonNode rootNode = objectMapper.readTree(payload);

                        // 특정 항목(body) 수정
                        ((ObjectNode) rootNode).put("body", "Updated Body Content");

                        // 수정된 JSON 문자열 반환
                        logger.info("========> Process newPayload: {}", objectMapper.writeValueAsString(rootNode));
                        return rootNode;
                    } catch (Exception e) {
                        throw new RuntimeException("Error processing JSON response", e);
                    }
                })
                .enrichHeaders(h -> h.header("Content-Type", "application/json"))
                .handle(Http.outboundGateway(message -> "https://jsonplaceholder.typicode.com/posts/" + message.getHeaders().getOrDefault("postId", "1"))
                        .httpMethod(HttpMethod.PUT)
                        .expectedResponseType(String.class)
                        .headerMapper(new DefaultHttpHeaderMapper())) // 기본 헤더 매핑) // 가공된 데이터를 반환
                .handle((payload, headers) -> {
                    logger.info("========> After Handle Headers: {}", headers);
                    logger.info("========> After Handle Payload: {}", payload);
                    return payload;
                })
                .route(Message.class, message -> "inline".equals(message.getHeaders().get("inputType")), mapping -> mapping
                        .subFlowMapping(true, subFlow -> subFlow
                                .channel("http.output.channel"))
                        .subFlowMapping(false, subuFlow -> subuFlow
                                .handle((paylod, headers) -> {
                                    logger.info("Call By HttpInput");
                                    return paylod;
                                })))
                .get();
    }
  • 설명:
    • 문자열의 paylod을 Json으로 변환하여 블로그의 Body 내용을 수정하는 transform
    • 수정된 블로그 내용을 REST API, PUT https://jsonplaceholder.typicode.com/posts/{postId}로 전달
    • 최초 시행이 Http InboundGateway로 진행되었는지, DiectChannel로 진행되었는지에 따라  Route로 지정

3.4 Java DSL 전체 코드

package kr.co.thekeytech.spring.eai.http;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.http.dsl.Http;
import org.springframework.integration.http.support.DefaultHttpHeaderMapper;
import org.springframework.messaging.Message;

@Configuration
public class HttpProcessingConfig {

    private static final Logger logger = LoggerFactory.getLogger(HttpProcessingConfig.class);

    @Bean
    public IntegrationFlow httpInboundFlow() {
        return IntegrationFlow.from(Http.inboundGateway("/receive")
                        .requestPayloadType(String.class)) // 요청 데이터를 문자열로 변환
                .enrichHeaders(h -> {
                    // 헤더 추가
                    h.headerFunction("postId", m -> m.getPayload());
                })
                .handle((payload, headers) -> {
                    logger.info("========> Receive Handle Headers: {}", headers);
                    logger.info("========> Receive Handle Payload: {}", payload);
                    return payload;
                })
                .channel("http.get.rest.channel") // 데이터를 처리할 채널
                .get();
    }

    @Bean
    public IntegrationFlow inboundFlow() {
        return IntegrationFlow.from("http.input.channel") // 요청 데이터를 문자열로 변환
                .handle((payload, headers) -> {
                    logger.info("========> Receive Handle Headers: {}", headers);
                    logger.info("========> Receive Handle Payload: {}", payload);
                    return payload;
                })
                .channel("http.get.rest.channel") // 데이터를 처리할 채널
                .get();
    }

    @Bean
    public IntegrationFlow restApiFlow() {
        return IntegrationFlow.from("http.get.rest.channel") // HTTP 요청 데이터 처리 채널
                .handle(Http.outboundGateway(message -> "https://jsonplaceholder.typicode.com/posts/" + message.getHeaders().getOrDefault("postId", "1"))
                        .httpMethod(HttpMethod.GET) // POST 메서드 사용
                        .expectedResponseType(String.class)) // 응답 데이터를 문자열로 처리
                .handle((payload, headers) -> {
                    logger.info("========> GET Rest Handle Headers: {}", headers);
                    logger.info("========> GET RestHandle Payload: {}", payload);
                    return payload;
                })
                .channel("http.process.rest.channel") // 응답 데이터를 전달할 채널
                .get();
    }

    @Bean
    public IntegrationFlow httpResponseFlow() {
        return IntegrationFlow.from("http.process.rest.channel")
                .transform(String.class, payload -> {
                    try {

                        logger.info("========> Process Payload: {}", payload);

                        // JSON 파싱
                        ObjectMapper objectMapper = new ObjectMapper();
                        JsonNode rootNode = objectMapper.readTree(payload);

                        // 특정 항목(body) 수정
                        ((ObjectNode) rootNode).put("body", "Updated Body Content");

                        // 수정된 JSON 문자열 반환
                        logger.info("========> Process newPayload: {}", objectMapper.writeValueAsString(rootNode));
                        return rootNode;
                    } catch (Exception e) {
                        throw new RuntimeException("Error processing JSON response", e);
                    }
                })
                .enrichHeaders(h -> h.header("Content-Type", "application/json"))
                .handle(Http.outboundGateway(message -> "https://jsonplaceholder.typicode.com/posts/" + message.getHeaders().getOrDefault("postId", "1"))
                        .httpMethod(HttpMethod.PUT)
                        .expectedResponseType(String.class)
                        .headerMapper(new DefaultHttpHeaderMapper())) // 기본 헤더 매핑) // 가공된 데이터를 반환
                .handle((payload, headers) -> {
                    logger.info("========> After Handle Headers: {}", headers);
                    logger.info("========> After Handle Payload: {}", payload);
                    return payload;
                })
                .route(Message.class, message -> "inline".equals(message.getHeaders().get("inputType")), mapping -> mapping
                        .subFlowMapping(true, subFlow -> subFlow
                                .channel("http.output.channel"))
                        .subFlowMapping(false, subuFlow -> subuFlow
                                .handle((paylod, headers) -> {
                                    logger.info("Call By HttpInput");
                                    return paylod;
                                })))
                .get();
    }
}

3.5 Gateway Code

package kr.co.thekeytech.spring.eai.http;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway
public interface HttpPostGateway {
    @Gateway(requestChannel = "http.input.channel") // inputChannel을 명시적으로 지정
    void sendMessage(String payload, @Header("postId") String postId, @Header("inputType") String inputType);
}
  • 설명:
    • DiectChannel로 사용될 Gateway
    • DSL 코드에서 route 조건으로 사용하기 위해 "inputType" Header을 지정한다.

3.6 테스트 코드

package kr.co.thekeytech.spring.eai.http;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.MvcResult;

import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

@SpringBootTest
@AutoConfigureMockMvc
class HttpProcessingConfigTest {
    @Autowired
    private HttpPostGateway httpPostGateway;

    @Autowired
    @Qualifier("http.output.channel")
    private MessageChannel httpOutputChannel;

    @Autowired
    private MockMvc mockMvc;

    @Test
    void testFlow() {
        QueueChannel outputQueue = new QueueChannel();
        ((DirectChannel) httpOutputChannel).subscribe(outputQueue::send);

        // DiectChannel 사용
        httpPostGateway.sendMessage("Http Post Api Test", "1", "inline");

        // 결과 출력
        Message<?> outputMessage = outputQueue.receive(1000);
        assertThat(outputMessage).isNotNull();
        System.out.println("## Final JSON: \n" + outputMessage.getPayload());
    }

    @Test
    void testHttpInboundFlow() throws Exception {

        // 요청 Payload
        String requestBody = "1";

        // POST 요청 실행
        MvcResult result = mockMvc.perform(post("/receive")
                        .contentType("application/json")
                        .content(requestBody))
                .andExpect(status().isOk()) // 200 OK 응답 확인
                .andReturn();

        // 결과 확인
        System.out.println(result.getResponse().getContentAsString());
    }
}
728x90

4. 실행결과

4.1 Http InboundGateway 결과

2025-01-08 12:50:58 - k.c.t.s.e.http.HttpProcessingConfig - ========> Receive Handle Headers: {http_requestMethod=POST, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@593db293, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@593db293, http_requestUrl=http://localhost/receive, postId=1, id=c0e8c1b6-fd7f-13a6-f4bb-cb8c8baec4e6, Content-Length=1, contentType=application/json;charset=UTF-8, timestamp=1736308258973}
2025-01-08 12:50:58 - k.c.t.s.e.http.HttpProcessingConfig - ========> Receive Handle Payload: 1
...
2025-01-08 12:51:00 - k.c.t.s.e.http.HttpProcessingConfig - ========> GET Rest Handle Headers: {http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@593db293, Server=cloudflare, Connection=keep-alive, Pragma=no-cache, postId=1, http_statusCode=200 OK, Date=1736308266000, Via=1.1 vegur, Accept-Ranges=bytes, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@593db293, Cache-Control=max-age=43200, Etag=W/"124-yiKdLzqO5gfBrJFrcdJ8Yq0LGnU", http_requestUrl=http://localhost/receive, Vary=Origin, Accept-Encoding, id=851499ad-a510-d356-e86c-4cb48c45a632, Content-Length=292, contentType=application/json;charset=utf-8, Age=26902, timestamp=1736308260021}
2025-01-08 12:51:00 - k.c.t.s.e.http.HttpProcessingConfig - ========> GET RestHandle Payload: {
  "userId": 1,
  "id": 1,
  "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
  "body": "quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto"
}
...
2025-01-08 12:51:00 - k.c.t.s.e.http.HttpProcessingConfig - ========> Process Payload: {
  "userId": 1,
  "id": 1,
  "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
  "body": "quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto"
}
2025-01-08 12:51:00 - k.c.t.s.e.http.HttpProcessingConfig - ========> Process newPayload: {"userId":1,"id":1,"title":"sunt aut facere repellat provident occaecati excepturi optio reprehenderit","body":"Updated Body Content"}
...
2025-01-08 12:51:00 - k.c.t.s.e.http.HttpProcessingConfig - ========> After Handle Headers: {http_requestMethod=POST, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@593db293, Server=cloudflare, Connection=keep-alive, Pragma=no-cache, postId=1, http_statusCode=200 OK, Date=1736308266000, Via=1.1 vegur, Accept-Ranges=bytes, replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@593db293, Cache-Control=max-age=43200, Etag=W/"124-yiKdLzqO5gfBrJFrcdJ8Yq0LGnU", http_requestUrl=http://localhost/receive, Vary=Origin, Accept-Encoding, id=d937d263-e0a4-2304-be6a-e0a78f561f5e, Content-Length=292, contentType=application/json;charset=utf-8, Age=26902, Content-Type=application/json, timestamp=1736308260763}
2025-01-08 12:51:00 - k.c.t.s.e.http.HttpProcessingConfig - ========> After Handle Payload: {
  "userId": 1,
  "id": 1,
  "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
  "body": "Updated Body Content"
}
...
2025-01-08 12:51:00 - k.c.t.s.e.http.HttpProcessingConfig - Call By HttpInput
...
{
  "userId": 1,
  "id": 1,
  "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
  "body": "Updated Body Content"
}

4.2 DiectChannel 실행결과

2025-01-08 12:57:42 - k.c.t.s.e.http.HttpProcessingConfig - ========> Receive Handle Headers: {replyChannel=nullChannel, inputType=inline, errorChannel=, postId=1, id=04788349-8986-ad33-d0b6-58f6ec85e6aa, timestamp=1736308662807}
2025-01-08 12:57:42 - k.c.t.s.e.http.HttpProcessingConfig - ========> Receive Handle Payload: Http Post Api Test
...
2025-01-08 12:57:43 - k.c.t.s.e.http.HttpProcessingConfig - ========> GET Rest Handle Headers: {errorChannel=, Server=cloudflare, Connection=keep-alive, Pragma=no-cache, postId=1, http_statusCode=200 OK, Date=1736308670000, Via=1.1 vegur, Accept-Ranges=bytes, replyChannel=nullChannel, Cache-Control=max-age=43200, Etag=W/"124-yiKdLzqO5gfBrJFrcdJ8Yq0LGnU", Vary=Origin, Accept-Encoding, inputType=inline, id=f86989bc-57a8-4442-62c4-e0c281fd5753, Content-Length=292, contentType=application/json;charset=utf-8, Age=18619, timestamp=1736308663739}
2025-01-08 12:57:43 - k.c.t.s.e.http.HttpProcessingConfig - ========> GET RestHandle Payload: {
  "userId": 1,
  "id": 1,
  "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
  "body": "quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto"
}
...
2025-01-08 12:57:43 - k.c.t.s.e.http.HttpProcessingConfig - ========> Process Payload: {
  "userId": 1,
  "id": 1,
  "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
  "body": "quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto"
}
2025-01-08 12:57:43 - k.c.t.s.e.http.HttpProcessingConfig - ========> Process newPayload: {"userId":1,"id":1,"title":"sunt aut facere repellat provident occaecati excepturi optio reprehenderit","body":"Updated Body Content"}
...
2025-01-08 12:57:44 - k.c.t.s.e.http.HttpProcessingConfig - ========> After Handle Headers: {errorChannel=, Server=cloudflare, Connection=keep-alive, Pragma=no-cache, postId=1, http_statusCode=200 OK, Date=1736308670000, Via=1.1 vegur, Accept-Ranges=bytes, replyChannel=nullChannel, Cache-Control=max-age=43200, Etag=W/"124-yiKdLzqO5gfBrJFrcdJ8Yq0LGnU", Vary=Origin, Accept-Encoding, inputType=inline, id=487ef00d-4e69-b200-50dc-f7aa2f6e0495, Content-Length=292, contentType=application/json;charset=utf-8, Age=18619, Content-Type=application/json, timestamp=1736308664463}
2025-01-08 12:57:44 - k.c.t.s.e.http.HttpProcessingConfig - ========> After Handle Payload: {
  "userId": 1,
  "id": 1,
  "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
  "body": "Updated Body Content"
}
...
## Final JSON: 
{
  "userId": 1,
  "id": 1,
  "title": "sunt aut facere repellat provident occaecati excepturi optio reprehenderit",
  "body": "Updated Body Content"
}

 

5. 확장 가능성

  1. 데이터 검증:
    • HTTP 요청 데이터의 유효성을 검증하여 잘못된 데이터를 차단.
  2. 다양한 HTTP 메서드 지원:
    • GET, PUT, DELETE 등 다양한 메서드를 지원하는 플로우 추가.
  3. 에러 처리:
    • 외부 API 호출 실패 시 에러 채널(errorChannel)로 메시지를 전달하고 복구 로직 추가.
  4. 다중 API 호출:
    • 여러 REST API를 순차적으로 호출하거나 병렬 처리.
  5. 동적 API URL 설정:
    • 요청 데이터에 따라 API URL을 동적으로 결정.

728x90