@@ -487,16 +487,19 @@ func (c *Client) Close() error {
487
487
}
488
488
}
489
489
490
- for _ , cs := range c .coordinator .consumers {
491
- err := c .coordinator .RemoveConsumerById (cs .(* Consumer ).ID , Event {
492
- Command : CommandClose ,
493
- StreamName : cs .(* Consumer ).GetStreamName (),
494
- Name : cs .(* Consumer ).GetName (),
495
- Reason : SocketClosed ,
496
- Err : nil ,
497
- })
498
- if err != nil {
499
- logs .LogWarn ("error removing consumer: %s" , err )
490
+ for _ , cs := range c .coordinator .GetConsumers () {
491
+ if cs != nil {
492
+ err := c .coordinator .RemoveConsumerById (cs .(* Consumer ).ID , Event {
493
+ Command : CommandClose ,
494
+ StreamName : cs .(* Consumer ).GetStreamName (),
495
+ Name : cs .(* Consumer ).GetName (),
496
+ Reason : SocketClosed ,
497
+ Err : nil ,
498
+ })
499
+
500
+ if err != nil {
501
+ logs .LogWarn ("error removing consumer: %s" , err )
502
+ }
500
503
}
501
504
}
502
505
if c .getSocket ().isOpen () {
@@ -1019,10 +1022,30 @@ func (c *Client) declareSubscriber(streamName string,
1019
1022
}
1020
1023
}
1021
1024
1022
- case <- time .After (consumer .options .autoCommitStrategy .flushInterval ):
1023
- consumer .cacheStoreOffset ()
1025
+ case <- time .After (1_000 * time .Millisecond ):
1026
+ if consumer .options .autocommit && time .Since (consumer .getLastAutoCommitStored ()) >= consumer .options .autoCommitStrategy .flushInterval {
1027
+ consumer .cacheStoreOffset ()
1028
+ }
1029
+
1030
+ // This is a very edge case where the consumer is not active anymore
1031
+ // but the consumer is still in the list of consumers
1032
+ // It can happen during the reconnection with load-balancing
1033
+ // found this problem with a caos test where random killing the load-balancer and node where
1034
+ // the client should be connected
1035
+ if consumer .isZombie () {
1036
+ logs .LogWarn ("Detected zombie consumer for stream %s, closing" , streamName )
1037
+ consumer .close (Event {
1038
+ Command : CommandUnsubscribe ,
1039
+ StreamName : consumer .GetStreamName (),
1040
+ Name : consumer .GetName (),
1041
+ Reason : ZombieConsumer ,
1042
+ Err : nil ,
1043
+ })
1044
+ return
1045
+ }
1024
1046
}
1025
1047
}
1048
+
1026
1049
}()
1027
1050
return consumer , err .Err
1028
1051
}
0 commit comments