Skip to content

Commit 861bc85

Browse files
committed
fix(awssqs): include MessageGroupId in SQS SendMessage request
The MessageGroupId was being processed in templating but not actually used when sending messages to SQS. This fix ensures that when a MessageGroupId is specified in the notification template, it's properly included in the SendMessageInput, which is required for FIFO queues. - Add MessageGroupId to SendMessageInput when available - Add unit tests to verify MessageGroupId inclusion/exclusion behavior - Fix ensures proper FIFO queue message grouping functionality
1 parent c449507 commit 861bc85

File tree

2 files changed

+63
-1
lines changed

2 files changed

+63
-1
lines changed

pkg/services/awssqs.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,18 @@ func (s awsSqsService) Send(notif Notification, dest Destination) error {
6767
}
6868

6969
func (s awsSqsService) sendMessageInput(queueUrl *string, notif Notification) *sqs.SendMessageInput {
70-
return &sqs.SendMessageInput{
70+
input := &sqs.SendMessageInput{
7171
QueueUrl: queueUrl,
7272
MessageBody: aws.String(notif.Message),
7373
DelaySeconds: 10,
7474
}
75+
76+
// Add MessageGroupId if available (required for FIFO queues)
77+
if notif.AwsSqs != nil && notif.AwsSqs.MessageGroupId != "" {
78+
input.MessageGroupId = aws.String(notif.AwsSqs.MessageGroupId)
79+
}
80+
81+
return input
7582
}
7683
func (s awsSqsService) getQueueInput(dest Destination) *sqs.GetQueueUrlInput {
7784
result := &sqs.GetQueueUrlInput{}

pkg/services/awssqs_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,61 @@ func TestGetClientOptionsCustomEndpointUrl_AwsSqs(t *testing.T) {
213213
assert.Equal(t, 2, len(options))
214214
}
215215

216+
func TestSendMessageInput_WithMessageGroupId_AwsSqs(t *testing.T) {
217+
s := NewTypedAwsSqsService(AwsSqsOptions{})
218+
queueUrl := "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue.fifo"
219+
220+
notification := Notification{
221+
Message: "Hello",
222+
AwsSqs: &AwsSqsNotification{
223+
MessageGroupId: "test-group-id",
224+
},
225+
}
226+
227+
input := SendMessageInput(s, &queueUrl, notification)
228+
229+
assert.Equal(t, &queueUrl, input.QueueUrl)
230+
assert.Equal(t, "Hello", *input.MessageBody)
231+
assert.Equal(t, int32(10), input.DelaySeconds)
232+
assert.Equal(t, "test-group-id", *input.MessageGroupId)
233+
}
234+
235+
func TestSendMessageInput_WithoutMessageGroupId_AwsSqs(t *testing.T) {
236+
s := NewTypedAwsSqsService(AwsSqsOptions{})
237+
queueUrl := "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"
238+
239+
notification := Notification{
240+
Message: "Hello",
241+
AwsSqs: &AwsSqsNotification{
242+
MessageGroupId: "", // Empty string
243+
},
244+
}
245+
246+
input := SendMessageInput(s, &queueUrl, notification)
247+
248+
assert.Equal(t, &queueUrl, input.QueueUrl)
249+
assert.Equal(t, "Hello", *input.MessageBody)
250+
assert.Equal(t, int32(10), input.DelaySeconds)
251+
assert.Nil(t, input.MessageGroupId) // Should not be set
252+
}
253+
254+
func TestSendMessageInput_WithoutAwsSqsNotification_AwsSqs(t *testing.T) {
255+
s := NewTypedAwsSqsService(AwsSqsOptions{})
256+
queueUrl := "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"
257+
258+
notification := Notification{
259+
Message: "Hello",
260+
AwsSqs: nil, // No AWS SQS notification
261+
}
262+
263+
input := SendMessageInput(s, &queueUrl, notification)
264+
265+
assert.Equal(t, &queueUrl, input.QueueUrl)
266+
assert.Equal(t, "Hello", *input.MessageBody)
267+
assert.Equal(t, int32(10), input.DelaySeconds)
268+
assert.Nil(t, input.MessageGroupId) // Should not be set
269+
}
270+
216271
// Helpers
217272
var GetConfigOptions = (*awsSqsService).getConfigOptions
218273
var GetClientOptions = (*awsSqsService).getClientOptions

0 commit comments

Comments
 (0)