제휴사 측 이슈로 webClient 통신을 할 때 특정 전문에서 timeout으로 한 두 번 실패하는 경우가 잦았다.
제휴사 측에 이의를 제기하기에는 사소하게... 한 두 건...이라서 수정을 요구하기도 좀 그렇고
그렇다고 이런 몇 건가지고 itmeout시간을 늘리는 것도 좀 그렇고
이에 tiemout으로 실패했을 경우에는 한 번 retry를 하도록 설계해보자는 결론이 났다.
아래 항목들에 대해서 이야기해보겠다.
- 서킷브레이커
- webClient의 retry/backoff 기능
1. retry 로직설계
먼저 webClient에서 retryWhen을 사용하기로 했다.
val response = responseSpec
.retrieve
.bodyToMono(responseType)
.onErrorMap { e ->
if ( e is CancellationException ||
e is java.util.concurrent.TimeoutException ||
e is WebClientRequestException && (
e.cause is ReadTimeoutException ||
e.cause is TimeoutException
)) {
RetryableException(retryEnabled, e)
} else {
e
}
}
.retryWhen(
Retry.max(1)
.filter { it is RetryableException && it.retryEnabled }
.doBeforeRetry { // logging
}
.onRetryExhaustedThrow { _, signal ->
// logging
signal.failure()
}
)
1. onErrorMap을 사용해서 발생한 예외를 커스텀 예외로 치환해준다.
- reactor.core.publisher
- https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#onErrorMap-java.lang.Class-java.util.function.Function-

2. retryWhen을 사용하여 재시도해준다.
- reactor.core.publisher
- https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#retryWhen-reactor.util.retry.Retry-

패키지가 Reacotr인게 보이는데 리액터가 왜 나오냐면
리액터(Reactor)는 Spring WebFlux의 리액티브 엔진을 일컫는다. webFlux에서 사용하는 엔진이라서 그렇다... 엔진이란 뭘까?
2-1. Retry 클래스를 사용한다.
1회만 재시도하도록 설정했다. max(1)
- .retry(long n)
- 실패 시 즉시 n번 재시도.
- .retryWhen(Retry retrySpec) : Retry 클래스를 사용해서 조건, 지연, backoff등의 정책을 설정할 수 있다.
.retryWhen(Retry.max(3)) // 최대 3회 재시도
.retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(2))) // 고정 지연
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))) // backoff(exponential)
Retey 클래스에 대해서도 알아보자.
- Retry.max(long n)
- 성공 여부와 관계없이 누적 실패 횟수 기준으로 계산된다
- 에러가 발생하면 카운트가 1 증가
- n을 초과하면 마지막 에러를 그대로 전파한다.
- Retry.maxInARow(long n)
- 연속으로 실패한 횟수만을 기준으로 재시도를 제한한다. (중간에 성공 한 번 하면 실패 카운터 초기화)
n이 3일 경우...
실패 (1)
실패 (2)
성공 → 카운터 초기화
실패 (1)
실패 (2)
실패 (3)
실패 (4) → 예외 전파 - 간헐적 오류는 허용하고 연속 장애가 발생하는 경우에만 차단하고 싶을 때 사용한다
- 연속으로 실패한 횟수만을 기준으로 재시도를 제한한다. (중간에 성공 한 번 하면 실패 카운터 초기화)
- Retry.backoff(long n, Duration d)
- 재시도 횟수가 들어날수록 대기시간이 점점 증가한다
- 첫 retry는 d, 다음 retry는 d * 2 ... (이후 계속 증가)
- 즉시 재시도하면 더 위험한 상황 (결제, 인증, 외부 API 등)
그런데 Retry 클래스에 .filter를 붙일 수 있는 걸 보고 좀 신기했다. 그래서 찾아봄... 왜 .filiter를 붙일 수 있는지? 그러면 .map도 붙일 수 있는지?(아니겠지만...)
Retry는 설정용 DSL 객체이기 때문에 가능하다. 무슨 말이냐? 재시도 전략을 설정해놓는 객체라는 것이다. 재시도 가능한지를 판단하는 규칙 묶음이라는 것.... (사실 아직도 DSL이 뭔지 잘 모르겠음...;; 걍 빌더 패턴 쓰는 것들의 무언가라고 생각하고 있음..)
Retry는 체이닝 문법을 사용할 뿐... filter가 붙었으니 다른 리액티브 연산자, 스트림 연산자인 map도 쓸 수 있나?? 그건 아니라는 것이다.
Retry.backoff(...)
.filter(...)
.maxAttempts(...)
.doBeforeRetry(...)
실제로는 Retry 객체를 하나 만들고 filter 조건을 추가한 새 Retry를 반환하는 구조이다.
Retry는 언제 실행될까?
- upstream에서 에러 발생
- 해당 에러를 Retry에게 전달
- Retry가 판단
- 재시도가 가능 >>> 지연 후 재구독
- 불가능 >>> 에러 전파
그리고 retryWhen이 붙는 위치도 궁금했는데...
.awaitFirst() 뒤에는 붙일 수가 없더라.
이유는 터미널 연산자. 코루틴에서 값을 꺼내는 연산자. 리액티브 체인을 끝내버리는 연산자라서이다.
upstream이란?
지금 보고 있는 연산자보다 먼저 실행되는 쪽(위쪽)에 있는 스트림 단계
반대는 downstream(아래쪽)
upstream에서 발생한 신호(값/에러)가 내려온다고 표현한다.
[ upstream ] ---> 데이터 / 에러 ---> [ downstream ]
upstream error
downstream consumer
upstream source
같은 표현을 씀.
코드를 다시 보자
val response =
responseSpec
.retrieve()
.bodyToMono(responseType)
.awaitFirst()
| 단계 | 타입 |
| bodyToMono(...) | Mono<T> |
| awaitFirst() | T |
awaitFirst()가 호출되는 순간 Mono<T>가 T로 변환된다!!
retryWhen은 Mono 클래스에 속하는(?) 함수(?)이므로(?) awaitFirst 뒤에 붙을 수는 없는 것~ Mono / Flux 전용 연산자라는 것이다.
그래서 리액티브 타입(Mono, Flux)이 살아있을 때에만 retryWhen을 사용할 수 있는 것임.
awaitFirst()는 사실 이런 모양인데
mono.subscribe() 요게 뭔 뜻이냐면
첫 값을 받을 때까지 코루틴을 suspend하고
그 뒤에 값을 반환한단거다... 두둥~
리액티브 세계의 철칙이라는 게 있는데
연산자 체이닝을 하고 마지막에 구독(subscribe)을 하는 구조라는 것.
Mono랑 Flux는 파이프라인이다.
awaitFirst()는 파이프라인을 구독하고나서 block(suspend)해버리는 것...
비슷한 애들로는 .block() .subscribe()가 있음.
근데 얘네들이 terminal 연산자인지 내가 어케 아냔말임 <ㅇ> doc에도 터 미 널 연 산 자 이런식으로 안 적혀있는데
suspend fun <T> Publisher<T>.awaitFirst(): T
이 모습을 보면 awaitFirst가 Publisher 클래스(Mono/Flux의 부모되시겠다)의 확장함수라는 걸 알 수 있고
suspend함수임을 알 수 있따! 이게 뭘 의미하냐면... 바로바로 ~~~ awaitFirst가 terminal 연산자라는거다. 왜...?
왜냐면
suspend가 실행되면 구독이 끝나기 때문(terminal)이다.
리액티브 파이프라인은 lazy한데, 이건 구독이 일어나기 전까지는 아무일도 안 생긴다는 의미다.
- mono.susbscribe()
- mono.awaitFirst() // suspend하면서 subscribe 발생
- mono.block() // blocking하면서 subscribe 발생
이 코드가 작성되기 전까진 아무 일도 안 생김. 저것이 문장의 마침표 같은 거라는 것~~
이걸 리액티브 스트림을 콜드에서 실행 상태로 바뀐다고 한다.
예를들어
webClient.get()
.retrieve()
.bodyToMono<String>()
.map { println("map!") }
이 코드를 실행하면 아무런 일이 일어나지 않지만
webClient.get()
.retrieve()
.bodyToMono<String>()
.map { println("map!") }
.awaitFirst()
이 코드는 실행하면 map!이 찍히는 걸 볼 수 있을 것이다..
리액티브 연산자랑 터미널 연산자가 좀 햇갈리기 시작해서 또 찾아봤다.
| Reactive Operator | Terminal (awaitFirst) | |
| 결과 타입 | Mono/Flux | 실제 값 T |
| subscribe 시점 | 아니라도 됨 | 바로 발생 |
| 체이닝 가능 여부 | 가능 | 불가 |
| 예 | map, filter, retryWhen, flatMap, onErrorMap, timeout, doOnNext ... | |
| 정의 위치 | Reactor API | Kotlin Coroutine Extension (awaitFirst()는 Reactor Kotlin 확장 라이브러리 함수다. 그래서 Reactor 공식 Javadoc에는 없음) |
3. onRetryExhaustedThrow를 사용하여 retry가 종료된 시점(에러 전파 지점)에 로그 찍고 에러 전파하기
이 retry가 종료된 시점을 Mono는 모르고 Retry만 알고 있다. 왜?
Mono는 에러가 났다는 사실만알고 그 에러가 몇 번째 시도에서 났는지는 모른다.
retry 횟수,상태를 관리하는 주체가 Retry이기 때문.
Mono는 본질적으로 신호 전달자여서 onNext(값을전달), onComplete(성공하면), onError(에러나면) 기본적으로 이 세가지만 다운스트림으로 전달한다. mono입장에서는 '에러가 발생함!!!!'만 알 수 있다.
그래서 .doOnError는 적합하지 않다고함...
signal.failure()은 retry를 끝나게 만든 마지막 예외이고 이 블록은 딱 한 번만 실행된다! (retry중에는 실행되지 않음)
- signal.totalRetries() // 전체 retry 횟수
- signal.totalRetriesInARow() // 연속 retry 횟수
- signal.failure() // 마지막 Throwable
만약 retry 성공 후 로그를 찍고 싶다면 이건 또 Retry가 아니라 Mono쪽 책임이다.
.doOnSuccess {} 에서 로그 찍어주시면 되시겠다...
2. test code 작성
timeout 자체를 테스트하는 방법은 다양하다...
본인 상황에 맞는 걸 잘 골라다 써야한다.
- MockWebServer.TruncationBuffer().timeout()
- httpHeader까지는 도착하고 body를 읽다가 타임아웃이 난 케이스에 사용한다.
- 관련 포스팅 : https://tistory-pencilcase.tistory.com/480
- JUnit5의 assertionTimeout
- 특정 코드 블록이 제시간안에 종료됐는지를 검증하고 싶을 때 사용한다.
- 관련 포스팅 : https://tistory-pencilcase.tistory.com/479
- WebTestClient()
- 관련 포스팅 : https://tistory-pencilcase.tistory.com/478
- WebTestClient()와 WebClient()를 spyk하는 것의 차이점에 대해서 정리해보자면...
- WebTestClient()는 실제로 webFlux서버를 띄워서(또는 Mock server를 띄워서) HTTP 통신을 수행한다.
즉, 작성한 api나 로직이 '실 제 로' 작동되는지를 확인할 때 사용한다.
컨트롤러, 필터, 서비스를 모두 실제로 탄다. (통합테스트) - WetClient()를 spyk할 경우는, 실제로 HTTP 통신을 하지 않고, 내부 동작을 가짜로 설정해주는 것(mock, spy)이다. 테스트 대상이 내 비즈니스 로직임. (단위테스트)
- spyk에 대한 관련 포스팅.... https://tistory-pencilcase.tistory.com/481
하여 MockWebServer를 사용하여 가상의 서버를 하나 띄워주고,
WebClient를 사용하여 실제로 retryWhen()을 찌르는지 테스트해보기로 했다.
http 응답은 mockResponse를 사용해서 NO_RESPONSE, 응답없음 상태를 가정하여 httpClient에 지정된 timeout을 유도했다.
class StepVerifierTest {
private lateinit var webClientSpy: WebClient
private lateinit var adaptorApiClient: AdaptorApiClient
private lateinit var mockWebServer: MockWebServer
@BeforeEach
fun setUp() {
MockKAnnotations.init(this)
mockWebServer = MockWebServer()
mockWebServer.start()
val httpClient = HttpClient.create().responseTimeout(Duration.ofSeconds(2))
webClientSpy = spyk(WebClient
.builder()
.baseUrl(mockWebServer.url("/").toString())
.clientConnector(ReactorClientHttpConnector(httpClient))
.build()
)
apiClient = ApiClient(webClientSpy)
}
@AfterEach
fun tearDown() {
mockWebServer.shutdown()
}
@Test
fun `aa`(): Unit = runBlocking {
mockWebServer.enqueue(
MockResponse().setSocketPolicy(SocketPolicy.NO_RESPONSE)
)
mockWebServer.enqueue(
MockResponse()
.setResponseCode(200)
.setHeader("Content-Type", "application/json") // 설정 안해주면 octet-stream이 리턴됨
.setBody("{\"body\":\"OK\"}")
)
val result = apiClient.bypass(
baseUrl = mockWebServer.url("/").toString(),
endpoint = "",
method = HttpMethod.POST,
requestBody = "",
requestParams = mapOf("" to ""),
headers = mapOf("readOnly" to "true"),
responseType = Any::class.java
)
assertThat(result.toString()).isEqualTo("{body=OK}")
}
}
mockWebServer 를 사용해서 가짜 HTTP 응답을 만들어준다. (MockResponse 부분)
enqueue
mockWebServer.enqueue(response)
// 내부적으로는 이런 구조
MockWebServer
└── Queue<MockResponse>
├── response1
├── response2
└── ...
1️⃣ 클라이언트(WebClient)가 HTTP 요청을 보냄
2️⃣ MockWebServer가 요청을 받음
3️⃣ 큐에서 맨 앞의 MockResponse 하나를 꺼냄 (FIFO)
4️⃣ 그걸 실제 HTTP 응답처럼 직렬화해서 반환
webClient입장에서는 실제 서버인지 mock 서버인지, url만 보는 입장에서 구분할 수 없다.
- MockResponse는 WebClient의 mock이 아니다
- WebClient의 동작을 가로채는 것도 아니다
- 오직 서버 역할만 한다
WebClient 내부 로직 테스트 → spyk / mock
HTTP 통신 흐름 테스트 → MockWebServer
이런 식으로 쪼개서 생각하면 이해하기 좋다.
MockWebServer는 지정된 자신의 url로 요청을 보내야 받을 수 있다.
http://<host>:<port>/
http://127.0.0.1:{mockwebServerPort}/...
localhost:8080이 아니다~!
webClient의 baseUrl을 로컬호스트로 지정해버리면 아래와 같은 에러를 볼 수 있게된다...
WebClientRequestException:
Connection refused: no further information: localhost/127.0.0.1:8080
이 에러는 TCP 커넥션을 시도했다가 8080포트에 리스닝하고 있는 프로세스가 아무것도 없어서 발생된 예외다.
MockWebServer는 실행될 때 랜덤으로 포트를 잡는다.
mockWebServer.start() // 이 때 http://127.0.0.1:52554같은 임의의 주소가 만들어짐
mockWebServer.url("/") // 저 주소를 가져오는 방법임
mockWebServer.url("/")은 무엇을 반환하는가?
- MockWebServer가 바인딩한 소켓 주소
- http://<host>:<port>/
이 때 host는 JVM이 인식하는 로컬 머신의 host name이다. - mockWebServer.url("/").toString()을 했을 때 희안한 값이 나오더라도 놀라지마시라..그것이 당신 컴퓨터의 host name인거니까... (C:\Windows\System32\drivers\etc\hosts 이 경로에서 확인이 가능하다)
MockResponse().setSocketPolicy(SocketPolicy.NO_RESPONSE)
- 요청은 정상적으로 읽고
- 응답은 아예 안 보냄(헤더/바디 다)
- 연결(socket은 열린 상태로 유지)
- 얘가 타임아웃을 발생시키는 건 아님.
말 그대로 서버이기 때문에
걍 답이 없는 서버가 되는 것 뿐...
조용한 서버..
영원히... - 타임아웃을 발생시키려면 webClient에서 설정해주어야한다. httpClient를 사용해서
assertThat과 stepVerifiier
stepVerifier은 Publisher<T>라서 (Mono/Flux)값만 받을 수 있다. 예를 들면 Mono<String>, Flux<Int> 같은...
만약 webClient가 리턴하는 모양새가 String, Any, Map, LinkedHashMap(json)이라면 stepVerifier는 사용할 수 없다.
webClientRequestException과 ReadTimeoutException
org.springframework.web.reactive.function.client.WebClientRequestException
Caused by: io.netty.handler.timeout.ReadTimeoutException
caused by......
WebClientException이 발생은 했는데
cause는 ReadTimeoutException이라는거다
이러면 이제 예외 트리거 어딘가에 ReadTimeoutException이 있다는 거고, onErrorMap이 받는 객체는 다른거다.
WebClientRequestException
└── cause: ReadTimeoutException
왜 이딴 모양새가 생기냐면
WebClient는 발생한 예외를 모두 WebClientRequestException 으로 래핑한다.
.onErrorMap에는 이 타입만 도달하는것이다 따쉬!!!!
그래서 timeout 예외를 잡고 싶다면.... .onErrorMap에서
.onErrorMap { e ->
if (
e is CancellationException ||
e is TimeoutException ||
e is WebClientRequestException && (
e.cause is ReadTimeoutException ||
e.cause is TimeoutException
)
) {
RetryableException(retryEnabled, e)
} else {
e
}
}
이런 모양을 잡아줘야한다.
그럼 이제 왜 CancellationException이랑 TimeoutException은 cause로 안 들어가는지가 궁금해지는데... 지피티한테 물어봄.
| 예외 | 발생 주체 | 감싸짐 여부 |
| --------------------------------------- | ----------------------- | -------------------------------- |
| CancellationException | kotlinx.coroutines | ❌ 감싸지 않음 |
| TimeoutException (java.util.concurrent) | Reactor / 사용자 코드 | ❌ 감싸지 않음 |
| ReadTimeoutException | Netty | ✅ WebClientRequestException으로 래핑 |
CancellationException은 코루틴이 취소됐다는 프로토콜이다.
리액터나 웹클라이언트가 래핑하게되면 취소 전파가 깨지기 때문에 래핑하지 않는다.
java.util.concurrent.TimeoutException 역시
Mono.timeout()이나 Flux.timeout()이나 block(Duration) 같은 리액터 연산자가 직접 던진다.
ReadTimeoutException은
Netty I/O 스레드에서 발생했고 네트워크 요청 실패의 하위 원인인 셈이라고 한다.
[Coroutine]
└─ CancellationException ← 그대로 전달
[Reactor Operator]
└─ TimeoutException ← 그대로 전달
[Netty I/O]
└─ ReadTimeoutException
└─ WebClientRequestException ← 감싸짐
https://blog.naver.com/orez_log/222638364251
https://github.com/reactor/reactor-netty
https://sangpire.tistory.com/entry/StepVerifier-%EC%82%AC%EC%9A%A9-%EB%B0%A9%EB%B2%95
'┝ framework > ┎ Spring' 카테고리의 다른 글
| spring security >>> access denied (0) | 2026.01.14 |
|---|---|
| [docker newbie] spring boot 서버를 도커 컨테이너 안에서 실행하기 (feat 자동 빌드) (0) | 2026.01.12 |
| MockWebServer.class의 TruncationBuffer의 timeout()함수 (1) | 2025.12.26 |
| timeout 관련 assertion (0) | 2025.12.26 |
| [testcode] timeout을 발생하는 테스트코드 (0) | 2025.12.26 |