Reactor(Mono, Flux)와 코루틴... 도대체 무슨 사이인데
Mono와 Flux는 Reactor에서 쓰이는 타입이다.
Spring webFlux에서 사용하는 것이 Reactor이기 때문에, webFlux에서는 주로 Mono나 Flux를 쓴다.
근데 우리 프로젝트는 코루틴을 쓴단 말이지. (코드에 suspend가 있으면 당신은 코루틴을 쓰고 있는 것이다)
그러면 코루틴은 뭔데.
코루틴도 비동기 처리에 쓰인다...
그럼 둘의 다른 점은 뭔데...
항목 | Kotlin Coroutines | Reactor (Mono/Flux) |
핵심 개념 | suspend를 통한 일시 정지 | 비동기 스트림 |
코드 스타일 | 명령형 (imperative) | 선언형 (declarative) |
철학 | 동기처럼 보이는 비동기 | 데이터 흐름 기반 처리 |
설계 목표 | 간결하고 읽기 쉬운 코드 | 복잡한 데이터 흐름 처리 |
- Reactor는 데이터 스트림을 다룬다는 점.
이것이 Flux다. Spring WebFlux에서 Flux<T> 타입은 0개 이상의 데이터를 비동기 스트림으로 방출한다.
코틀린은 그럼 데이터 스트림을 안 다루나? 다룰 수 있긴 함. Reactor의 Flux에 대응하는 것이 Kotlin의 Flow이다.
(코루틴은 일반적으로 단일 값이나 Flow를 다루므로, Flux에서 값을 꺼내려면 awaitFirst() 같은 브릿지 함수가 필요하다.) - 동기처럼 보이는 비동기 ⬅️WHY????? 이게 왜 중요한데
왜냐면...
동기코드는 읽기 쉽기 때문이다!!!!!!!!!!!!
java로 RestTemplate(동기임)을 쓰던 시절을 생각해보자...
그리고 js에서 쓰이던 async 코드(비동기임) 때문에 개고생하던 시절을...
그렇다... 동기코드는 알아보기가 쉽게 생겼음!!!!!!!!!!!!!!!
아래는 webFlux의 비동기 코드이다...
webClient.get()
.uri("/user")
.retrieve()
.bodyToMono(User::class.java)
.flatMap { user -> getOrders(user.id) }
.flatMap { orders -> process(orders) }
.subscribe()
.flatMap 지옥이 생기곤 함 (flatmap은 받아온 데이터를 처리할 수 있게 해주는 연산자다)
val user = webClient.get()
.uri("/user")
.retrieve()
.bodyToMono(User::class.java)
.awaitSingle()
val orders = getOrders(user.id)
val result = process(orders)
flatMap으로 처리하던 부분이 통신 부분에서 분리된!!! 것을 볼 수 있다.
얼마나 이해하기 쉽느뇨?
조금 더 코드의 비교를 통해 감을 잡아보자
Spring WebFlux + Kotlin
// 코루틴 방식 (Spring WebFlux + Kotlin)
@GetMapping("/user")
suspend fun getUser(): User {
return userService.getUser() // suspend fun
}
Spring WebFlux + Java
// Reactor 방식 (Spring WebFlux + Java)
@GetMapping("/user")
public Mono<User> getUser() {
return userService.getUser();
}
suspend가 들어가면 코루틴이다...
당연한 소리 왜 하냐고 하지 말어라 모를수도있지...
근데 우리 코드는 코루틴 쓰는데 갑자기 Mono도 등장함................
왜임? 왜 코루틴 쓰는데 mono가 있음?ㅠㅠ
그 이유는 바로 이 둘을 섞어서 쓸 수도 있기 떄문이다... 그걸 위한 함수를 제공함
변환 방향 | 방법 |
suspend → Mono | mono { ... } 블록 사용 |
Mono → suspend | awaitSingle(), awaitFirst() 등 확장 함수 사용 |
Flux → Flow (Kotlin) | .asFlow() |
Flow → Flux | .asFlux() |
예시 )
// Mono를 코루틴에서 사용
val user: User = userService.getUserMono().awaitSingle()
// suspend 함수를 Mono로 감싸기
val userMono: Mono<User> = mono {
userService.getUser() // suspend fun
}
그러면 awaitFirst()에 대해 좀 더 알아보자...
awaitFirst()는 Kotlin 코루틴에서 Flux<T>를 T로 변환할 때 사용하는 확장 함수다.
즉, Flux가 emit하는 첫 번째 값을 기다렸다가 꺼내는 용도이다.
Spring WebFlux에서 Flux<T> 타입은 0개 이상의 데이터를 비동기 스트림으로 방출한다.
하지만 코루틴은 일반적으로 단일 값이나 Flow를 다루므로, Flux에서 값을 꺼내려면 awaitFirst() 같은 브릿지 함수가 필요하다.
브릿지 함수
awaitFirst() | 첫 번째 값을 기다림. 값이 없으면 예외 발생 |
awaitFirstOrNull() | 첫 번째 값을 기다림. 값이 없으면 null 반환 |
awaitSingle() | 정확히 하나만 방출해야 함. 0개 또는 2개 이상이면 예외 |
awaitSingleOrNull() | 0개면 null, 2개 이상이면 예외 |
awaitLast() | 마지막 값을 기다림. 값이 없으면 예외 |
통신은 어떻게 동작하냐면.. WebClient를 쓰는데...
val res = webClient.post()
// 생략
.retrieve()
.bodyToMono() // 여기서 Mono로 바뀐다
.awaitFirst() // 여기서 코루틴으로 바뀐다
retrieve는 뭐냐면...
retrieve는 뭔데 코루틴 코드에서도, WebFlux 코드에서도 보이는걸까
WebClient에서 사용하는 코드이기 때문인데..
WebClient에서 응답을 받을 때에는 아래의 두 가지 메소드 중 하나를 선택해서 사용하시면 됩니다.
- retrieve() : ResponseEntity를 받아 디코딩
- exchange() : ClientResponse를 상태값 그리고 헤더와 함께
출처: https://gngsn.tistory.com/198
webClient는 비동기 HTTP 클라이언트다...
비동기 요청을 보내주는 녀석이라는 뜻이다...
거기에 .~~() 형식으로 함수를 체이닝해서 원하는대로 응답값을 볶아먹을 수 있다.
WebFlux는 Reactor 라이브러리를 사용하기 때문에
webClient에는 Reactor가 내장돼있다...
WebClient가 응답을 Mono로 주면, 그걸 기다렸다가 awaitSingle()로 꺼내쓸 수 있는 그런... 협업관계인 것이다
마지막으로 코루틴과 Reactor의 실행 방식의 차이점..
항목 | Kotlin Coroutines | Reactor |
실행 방식 | 코루틴이 suspend 지점에서 스레드를 차단하지 않고 대기 | 체이닝된 연산자들이 등록된 후, subscribe() 호출 시 실행 |
일시 정지 | 컴파일러가 suspend 지점 관리 | 콜백 체인을 통한 처리 |
시작 시점 | suspend 함수 호출 시 즉시 실행 | subscribe 전까지는 실제 실행되지 않음 (lazy) |