-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Open
Labels
Description
Describe the bug
So we have a Spring Boot web server.
In some requests, we use .awaitSingle()
on a library that internally uses Spring WebClient
which uses Reactor.
Earlier we deployed some faulty code that led to HTTP 400 which raised a WebClientResponseException
and Coroutines did not handle that well. It seems like the Coroutine just sat there and did nothing, as the requesters observed timeouts.
We're using Kotlin 2.2.0
and Coroutines 1.10.2
kotlinx.coroutines.CoroutinesInternalError: Fatal exception in coroutines machinery for CancellableContinuation(DispatchedContinuation[Dispatchers.Unconfined, Continuation at kotlinx.coroutines.reactor.MonoKt.awaitSingle(Mono.kt:77)@18c39e89]){Completed}@55e543ca. Please read KDoc to 'handleFatalException' method and report this incident to maintainers
at kotlinx.coroutines.DispatchedTask.handleFatalException$kotlinx_coroutines_core(DispatchedTask.kt:130)
at kotlinx.coroutines.DispatchedTaskKt.resumeUnconfined(DispatchedTask.kt:250)
at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:147)
at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:470)
at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl$kotlinx_coroutines_core(CancellableContinuationImpl.kt:504)
at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl$kotlinx_coroutines_core$default(CancellableContinuationImpl.kt:493)
at kotlinx.coroutines.CancellableContinuationImpl.resumeWith(CancellableContinuationImpl.kt:359)
at kotlinx.coroutines.reactor.MonoKt$awaitSingleOrNull$2$1.onError(Mono.kt:62)
at reactor.core.publisher.StrictSubscriber.onError(StrictSubscriber.java:106)
at reactor.core.publisher.MonoContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onError(MonoContextWriteRestoringThreadLocals.java:127)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:241)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:315)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2235)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onError(FluxOnAssembly.java:544)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:180)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:280)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204)
at reactor.core.publisher.FluxOnErrorReturn$ReturnSubscriber.onComplete(FluxOnErrorReturn.java:169)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:152)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onComplete(FluxContextWriteRestoringThreadLocals.java:149)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:275)
at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onComplete(FluxContextWriteRestoringThreadLocals.java:149)
at reactor.core.publisher.Operators.complete(Operators.java:137)
at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:179)
at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:145)
at reactor.core.publisher.FluxContextWriteRestoringThreadLocals.subscribe(FluxContextWriteRestoringThreadLocals.java:46)
at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:68)
at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:340)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:241)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:315)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:280)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204)
at reactor.core.publisher.FluxOnErrorReturn$ReturnSubscriber.onComplete(FluxOnErrorReturn.java:169)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:152)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onComplete(FluxContextWriteRestoringThreadLocals.java:149)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:275)
at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onComplete(FluxContextWriteRestoringThreadLocals.java:149)
at reactor.core.publisher.Operators.complete(Operators.java:137)
at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:179)
at reactor.netty.channel.FluxReceive.subscribe(FluxReceive.java:145)
at reactor.core.publisher.FluxContextWriteRestoringThreadLocals.subscribe(FluxContextWriteRestoringThreadLocals.java:46)
at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:68)
at reactor.netty.ByteBufFlux.subscribe(ByteBufFlux.java:340)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
at reactor.core.publisher.FluxOnErrorReturn$ReturnSubscriber.onNext(FluxOnErrorReturn.java:162)
at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:122)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
at reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty(Operators.java:2096)
at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:145)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onComplete(FluxContextWriteRestoringThreadLocals.java:149)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:275)
at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onComplete(FluxContextWriteRestoringThreadLocals.java:149)
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:413)
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:455)
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:509)
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:821)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:501)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:399)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Unknown Source)
Suppressed: kotlinx.coroutines.internal.DiagnosticCoroutineContextException: null
Caused by: org.springframework.web.reactive.function.client.WebClientResponseException$BadRequest: 400 Bad Request from POST
at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:321)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ 400 BAD_REQUEST from POST [DefaultWebClient]
Original Stack Trace:
at org.springframework.web.reactive.function.client.WebClientResponseException.create(WebClientResponseException.java:321)
at org.springframework.web.reactive.function.client.DefaultClientResponse.lambda$createException$1(DefaultClientResponse.java:214)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
at reactor.core.publisher.FluxOnErrorReturn$ReturnSubscriber.onNext(FluxOnErrorReturn.java:162)
at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:122)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
at reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty(Operators.java:2096)
at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:145)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onComplete(FluxContextWriteRestoringThreadLocals.java:149)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:275)
at reactor.core.publisher.FluxContextWriteRestoringThreadLocals$ContextWriteRestoringThreadLocalsSubscriber.onComplete(FluxContextWriteRestoringThreadLocals.java:149)
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:413)
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:455)
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:509)
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:821)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:501)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:399)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Unknown Source)
Suppressed: kotlinx.coroutines.internal.DiagnosticCoroutineContextException: null
Provide a Reproducer
Can't share code as it's internal. I may try to make smaller reproduction later.