Skip to content

Commit 05dbcd9

Browse files
authored
GH-3189: Properly handle async in ScatterGatherHandler (#10469)
* GH-3189: Properly handle `async` in `ScatterGatherHandler` Fixes: #3189 Despite supporting `async = true`, the `ScatterGatherHandler` does blocking in its `handleRequestMessage()` on the `gatherResultChannel.receive()` call. * Fix `ScatterGatherHandler` to handle an `async` mode via internal `Mono` for the reply object * Use pattern variable expressions for `ifs` in the `ScatterGatherHandler.doInit()` for the better readability * Extract `ScatterGatherHandler.replyFromGatherResult()` method to avoid code duplication * Document a new (fixed) functionality * Mention `Mono` reply in the `ScatterGatherHandler` Javadocs **Auto-cherry-pick to `6.5.x`**
1 parent 3811244 commit 05dbcd9

File tree

3 files changed

+63
-20
lines changed

3 files changed

+63
-20
lines changed

spring-integration-core/src/main/java/org/springframework/integration/scattergather/ScatterGatherHandler.java

Lines changed: 43 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@
1616

1717
package org.springframework.integration.scattergather;
1818

19+
import java.time.Duration;
20+
1921
import org.jspecify.annotations.Nullable;
22+
import reactor.core.publisher.Mono;
23+
import reactor.core.publisher.Sinks;
2024

2125
import org.springframework.aop.support.AopUtils;
2226
import org.springframework.beans.factory.BeanFactory;
@@ -32,6 +36,7 @@
3236
import org.springframework.integration.endpoint.PollingConsumer;
3337
import org.springframework.integration.endpoint.ReactiveStreamsConsumer;
3438
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
39+
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
3540
import org.springframework.integration.support.management.ManageableLifecycle;
3641
import org.springframework.messaging.Message;
3742
import org.springframework.messaging.MessageChannel;
@@ -48,6 +53,9 @@
4853
/**
4954
* The {@link MessageHandler} implementation for the
5055
* <a href="https://www.enterpriseintegrationpatterns.com/BroadcastAggregate.html">Scatter-Gather</a> EIP pattern.
56+
* <p>
57+
* When {@link #setAsync(boolean)} is {@code true}, the {@link ScatterGatherHandler} produces
58+
* a {@link Mono} as a reply based on the gather result.
5159
*
5260
* @author Artem Bilan
5361
* @author Abdul Zaheer
@@ -146,11 +154,11 @@ public Message<?> preSend(Message<?> message, MessageChannel channel) {
146154
}
147155

148156
});
149-
if (this.gatherChannel instanceof SubscribableChannel) {
150-
this.gatherEndpoint = new EventDrivenConsumer((SubscribableChannel) this.gatherChannel, this.gatherer);
157+
if (this.gatherChannel instanceof SubscribableChannel subscribableChannel) {
158+
this.gatherEndpoint = new EventDrivenConsumer(subscribableChannel, this.gatherer);
151159
}
152-
else if (this.gatherChannel instanceof PollableChannel) {
153-
this.gatherEndpoint = new PollingConsumer((PollableChannel) this.gatherChannel, this.gatherer);
160+
else if (this.gatherChannel instanceof PollableChannel pollableChannel) {
161+
this.gatherEndpoint = new PollingConsumer(pollableChannel, this.gatherer);
154162
((PollingConsumer) this.gatherEndpoint).setReceiveTimeout(this.gatherTimeout);
155163
}
156164
else if (this.gatherChannel instanceof ReactiveStreamsSubscribableChannel) {
@@ -191,7 +199,18 @@ private Message<?> enhanceScatterReplyMessage(Message<?> message) {
191199
@Override
192200
protected @Nullable Object handleRequestMessage(Message<?> requestMessage) {
193201
MessageHeaders requestMessageHeaders = requestMessage.getHeaders();
194-
PollableChannel gatherResultChannel = new QueueChannel();
202+
boolean async = isAsync();
203+
MessageChannel gatherResultChannel;
204+
Sinks.One<Message<?>> replyMono;
205+
206+
if (async) {
207+
replyMono = Sinks.one();
208+
gatherResultChannel = (message, timeout) -> replyMono.tryEmitValue(message).isSuccess();
209+
}
210+
else {
211+
replyMono = null;
212+
gatherResultChannel = new QueueChannel();
213+
}
195214

196215
Message<?> scatterMessage =
197216
getMessageBuilderFactory()
@@ -204,17 +223,28 @@ private Message<?> enhanceScatterReplyMessage(Message<?> message) {
204223

205224
this.messagingTemplate.send(this.scatterChannel, scatterMessage);
206225

207-
Message<?> gatherResult = gatherResultChannel.receive(this.gatherTimeout);
208-
if (gatherResult != null) {
209-
return getMessageBuilderFactory()
210-
.fromMessage(gatherResult)
211-
.removeHeaders(GATHER_RESULT_CHANNEL, ORIGINAL_ERROR_CHANNEL,
212-
MessageHeaders.REPLY_CHANNEL, MessageHeaders.ERROR_CHANNEL);
226+
if (replyMono != null) {
227+
return replyMono.asMono()
228+
.map(this::replyFromGatherResult)
229+
.timeout(Duration.ofMillis(this.gatherTimeout), Mono.empty());
230+
}
231+
else {
232+
Message<?> gatherResult = ((PollableChannel) gatherResultChannel).receive(this.gatherTimeout);
233+
if (gatherResult != null) {
234+
return replyFromGatherResult(gatherResult);
235+
}
213236
}
214237

215238
return null;
216239
}
217240

241+
private AbstractIntegrationMessageBuilder<?> replyFromGatherResult(Message<?> gatherResult) {
242+
return getMessageBuilderFactory()
243+
.fromMessage(gatherResult)
244+
.removeHeaders(GATHER_RESULT_CHANNEL, ORIGINAL_ERROR_CHANNEL,
245+
MessageHeaders.REPLY_CHANNEL, MessageHeaders.ERROR_CHANNEL);
246+
}
247+
218248
@Override
219249
public void start() {
220250
if (this.gatherEndpoint != null) {
@@ -240,8 +270,8 @@ private static void checkClass(Class<?> gathererClass, String className, String
240270
Assert.isAssignable(clazz, gathererClass,
241271
() -> "the '" + type + "' must be an " + className + " " + "instance");
242272
}
243-
catch (ClassNotFoundException e) {
244-
throw new IllegalStateException("The class for '" + className + "' cannot be loaded", e);
273+
catch (ClassNotFoundException ex) {
274+
throw new IllegalStateException("The class for '" + className + "' cannot be loaded", ex);
245275
}
246276
}
247277

spring-integration-core/src/test/java/org/springframework/integration/dsl/routers/RouterTests.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.dsl.routers;
1818

19+
import java.util.ArrayList;
1920
import java.util.Arrays;
2021
import java.util.List;
2122
import java.util.Map;
@@ -463,15 +464,21 @@ public void testRouterAsNonLastComponent() {
463464
@Test
464465
public void testScatterGather() {
465466
QueueChannel replyChannel = new QueueChannel();
466-
Message<String> request = MessageBuilder.withPayload("foo")
467+
Message<String> request = MessageBuilder.withPayload("test")
467468
.setReplyChannel(replyChannel)
468469
.build();
469470
this.scatterGatherFlowInput.send(request);
470471
Message<?> bestQuoteMessage = replyChannel.receive(10000);
471-
assertThat(bestQuoteMessage).isNotNull();
472-
Object payload = bestQuoteMessage.getPayload();
473-
assertThat(payload).isInstanceOf(List.class);
474-
assertThat(((List<?>) payload).size()).isGreaterThanOrEqualTo(1);
472+
assertThat(bestQuoteMessage)
473+
.extracting(Message::getPayload)
474+
.asInstanceOf(InstanceOfAssertFactories.LIST)
475+
.hasSizeGreaterThanOrEqualTo(1)
476+
.first()
477+
.asInstanceOf(InstanceOfAssertFactories.type(Message.class))
478+
.extracting(Message::getHeaders)
479+
.asInstanceOf(InstanceOfAssertFactories.MAP)
480+
.extractingByKey("gatherResultChannel")
481+
.isNotInstanceOf(PollableChannel.class);
475482
}
476483

477484
@Autowired
@@ -859,9 +866,11 @@ public IntegrationFlow scatterGatherFlow() {
859866
group.size() == 3 ||
860867
group.getMessages()
861868
.stream()
862-
.anyMatch(m -> (Double) m.getPayload() > 5)),
869+
.anyMatch(m -> (Double) m.getPayload() > 5))
870+
.outputProcessor(group -> new ArrayList<>(group.getMessages())),
863871
scatterGather -> scatterGather
864-
.gatherTimeout(10_000));
872+
.gatherTimeout(10_000)
873+
.async(true));
865874
}
866875

867876
@Bean

src/reference/antora/modules/ROOT/pages/scatter-gather.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,10 @@ Mutually exclusive with `scatter-channel` attribute.
154154
<13> The `<aggregator>` options.
155155
Required.
156156

157+
NOTE: Starting with version `6.5.3`, when a `ScatterGatherHandler` is configured for the `async = true` option, the request message handling thread is not blocked anymore waiting for a gather result on an internal `((PollableChannel) gatherResultChannel).receive(this.gatherTimeout)` operation.
158+
Instead, a `reactor.core.publisher.Mono` is returned as a reply object based on a gather result eventually produced from the `gatherResultChannel`.
159+
Such a `Mono` is handled then according to the xref:reactive-streams.adoc#reactive-reply-payload[Reactive Streams support] in the framework.
160+
157161
[[scatter-gather-error-handling]]
158162
== Error Handling
159163

0 commit comments

Comments
 (0)