Filter의 Flux<DataBuffer>에 대해 알아보자

 

exchange.request.body.map { dataBuffer -> ... }

 

  • exchange.request.body의 타입은:
    Flux<DataBuffer>
    즉, DataBuffer들을 비동기적으로 흘려보내는 스트림이다.
  • map { dataBuffer -> ... }에서 dataBuffer는 Flux가 하나씩 방출하는 요소(DataBuffer)다.

그럼 왜 dataBuffer가 사용 가능한가?

Flux<T>는 코틀린에서 말하면 (List<T>나 Sequence<T>와 비슷하게)
T 타입의 값들을 하나씩 처리할 수 있는 구조

  • exchange.request.body가 Flux<DataBuffer>이므로
  • .map { dataBuffer -> ... }는 그 안의 DataBuffer를 하나씩 꺼내서 처리하겠다는 의미
  • dataBuffer는 DataBuffer 타입 하나의 인스턴스를 의미함

비유로 이해하기

val list = listOf("apple", "banana", "cherry")
val upperList = list.map { fruit -> fruit.uppercase() }
  • 여기서 list.map { fruit -> ... }에서 fruit은 "apple", "banana", "cherry" 순서대로 들어옴
  • 마찬가지로 Flux<DataBuffer>.map { buffer -> ... }는 내부적으로 buffer들을 하나씩 처리

결국 map 안에 있는 파라미터는 Flux가 "하나씩 흘려보내는 요소"

 

exchange.request.body
    .map { buffer ->
        val bytes = ByteArray(buffer.readableByteCount())
        buffer.read(bytes)
        println("읽은 body 한 조각: ${String(bytes)}")
        bytes
    }

 

 

근데 request body는 flux같은 데이터 스트림이 아니라 1개의 json아닌가?

그러면 Flux가 아니라 Mono로 처리해야하는 거 아닌가?

 

 

내 생각에... 정답은
body가 하나만 오는 것 같음 맞다. 실제로 대부분 요청은 단일 chunk
그런데 Flux.map은 왜 되지? 타입이 Flux이기 때문. 방출 개수와 관계없이 map은 사용 가능
그럼 Mono처럼 써도 되는가? 실제 사용 목적에 따라 .single(), .next()로 변환 가능하지만, Flux 자체로 처리하는 것이 일반적

 

 

단일 chunk일 때 Mono로 처리하고 싶다면?

 
val monoBody = exchange.request.body.next() // Mono<DataBuffer>
val joined = DataBufferUtils.join(exchange.request.body) // Mono<DataBuffer>
 

→ 위처럼 Mono<DataBuffer>로 변환해서 처리할 수도 있다.

하지만 WebFilter에서는 ServerHttpRequest.body를 무조건 Flux<DataBuffer>로 받게 되어 있다.

즉, Mono처럼 다뤄도 실제 타입은 여전히 Flux<DataBuffer>임

 

 

 Flux의 try catch 적용

 

  • Mono/Flux는 지연 실행(lazy) 구조라서, 선언부(filter 메서드의 리턴 부분)에서 바로 실행되지 않고, 구독(subscribe)이 일어날 때 실행됨
  • try-catch는 동기 코드에만 적용되므로, 리액티브 스트림 내부 예외를 잡으려면 다르게 처리해야 함

 

1. onErrorResume 또는 doOnError 같은 리액티브 에러 처리 연산자 사용

return DataBufferUtils.join(exchange.request.body)
    .flatMap { dataBuffer ->
        // body 처리 코드
    }
    .doOnError { e ->
        println("에러 발생: ${e.message}")
    }
    .onErrorResume { e ->
        // 에러 발생 시 fallback 처리
        Mono.error(e)
    }

 

2. 리액티브 내부에서 예외 발생 가능성이 있는 부분에 try-catch를 쓸 땐, flatMap 내부에서만 써야 함

.flatMap { dataBuffer ->
    try {
        val bytes = ByteArray(dataBuffer.readableByteCount())
        dataBuffer.read(bytes)
        DataBufferUtils.release(dataBuffer)

        val bodyString = String(bytes, Charsets.UTF_8)
        println("요청 body: $bodyString")

        // 나머지 처리...
        chain.filter(mutatedExchange)

    } catch (e: Exception) {
        println("내부 예외: ${e.message}")
        Mono.error(e)
    }
}