diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 06b3c5a6..cde02348 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -74,10 +74,12 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) { if err := json.Unmarshal(msg.Data, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("roomserver output log: message parse failure") + _ = msg.Nak() return } if output.Type != api.OutputTypeNewRoomEvent { + _ = msg.Nak() return } @@ -87,7 +89,11 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) { // Send event to any relevant application services if err := s.filterRoomserverEvents(context.TODO(), events); err != nil { log.WithError(err).Errorf("roomserver output log: filter error") + _ = msg.Nak() + return } + + _ = msg.Ack() } // filterRoomserverEvents takes in events and decides whether any of them need diff --git a/federationsender/consumers/eduserver.go b/federationsender/consumers/eduserver.go index c29226eb..5fee5b48 100644 --- a/federationsender/consumers/eduserver.go +++ b/federationsender/consumers/eduserver.go @@ -81,6 +81,7 @@ func (t *OutputEDUConsumer) onSendToDeviceEvent(msg *nats.Msg) { var ote api.OutputSendToDeviceEvent if err := json.Unmarshal(msg.Data, &ote); err != nil { log.WithError(err).Errorf("eduserver output log: message parse failed (expected send-to-device)") + _ = msg.Nak() return } @@ -88,16 +89,19 @@ func (t *OutputEDUConsumer) onSendToDeviceEvent(msg *nats.Msg) { _, originServerName, err := gomatrixserverlib.SplitID('@', ote.Sender) if err != nil { log.WithError(err).WithField("user_id", ote.Sender).Error("Failed to extract domain from send-to-device sender") + _ = msg.Nak() return } if originServerName != t.ServerName { log.WithField("other_server", originServerName).Info("Suppressing send-to-device: originated elsewhere") + _ = msg.Nak() return } _, destServerName, err := gomatrixserverlib.SplitID('@', ote.UserID) if err != nil { log.WithError(err).WithField("user_id", ote.UserID).Error("Failed to extract domain from send-to-device destination") + _ = msg.Nak() return } @@ -118,6 +122,7 @@ func (t *OutputEDUConsumer) onSendToDeviceEvent(msg *nats.Msg) { } if edu.Content, err = json.Marshal(tdm); err != nil { log.WithError(err).Error("failed to marshal EDU JSON") + _ = msg.Nak() return } @@ -125,6 +130,8 @@ func (t *OutputEDUConsumer) onSendToDeviceEvent(msg *nats.Msg) { if err := t.queues.SendEDU(edu, t.ServerName, []gomatrixserverlib.ServerName{destServerName}); err != nil { log.WithError(err).Error("failed to send EDU") } + + _ = msg.Ack() } // onTypingEvent is called in response to a message received on the typing @@ -135,6 +142,7 @@ func (t *OutputEDUConsumer) onTypingEvent(msg *nats.Msg) { if err := json.Unmarshal(msg.Data, &ote); err != nil { // Skip this msg but continue processing messages. log.WithError(err).Errorf("eduserver output log: message parse failed (expected typing)") + _ = msg.Nak() return } @@ -142,6 +150,7 @@ func (t *OutputEDUConsumer) onTypingEvent(msg *nats.Msg) { _, typingServerName, err := gomatrixserverlib.SplitID('@', ote.Event.UserID) if err != nil { log.WithError(err).WithField("user_id", ote.Event.UserID).Error("Failed to extract domain from typing sender") + _ = msg.Nak() return } if typingServerName != t.ServerName { @@ -166,12 +175,15 @@ func (t *OutputEDUConsumer) onTypingEvent(msg *nats.Msg) { "typing": ote.Event.Typing, }); err != nil { log.WithError(err).Error("failed to marshal EDU JSON") + _ = msg.Nak() return } if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil { log.WithError(err).Error("failed to send EDU") } + + _ = msg.Ack() } // onReceiptEvent is called in response to a message received on the receipt @@ -182,6 +194,7 @@ func (t *OutputEDUConsumer) onReceiptEvent(msg *nats.Msg) { if err := json.Unmarshal(msg.Data, &receipt); err != nil { // Skip this msg but continue processing messages. log.WithError(err).Errorf("eduserver output log: message parse failed (expected receipt)") + _ = msg.Nak() return } @@ -189,9 +202,11 @@ func (t *OutputEDUConsumer) onReceiptEvent(msg *nats.Msg) { _, receiptServerName, err := gomatrixserverlib.SplitID('@', receipt.UserID) if err != nil { log.WithError(err).WithField("user_id", receipt.UserID).Error("failed to extract domain from receipt sender") + _ = msg.Nak() return } if receiptServerName != t.ServerName { + _ = msg.Nak() return // don't log, very spammy as it logs for each remote receipt } @@ -224,10 +239,13 @@ func (t *OutputEDUConsumer) onReceiptEvent(msg *nats.Msg) { } if edu.Content, err = json.Marshal(content); err != nil { log.WithError(err).Error("failed to marshal EDU JSON") + _ = msg.Nak() return } if err := t.queues.SendEDU(edu, t.ServerName, names); err != nil { log.WithError(err).Error("failed to send EDU") } + + _ = msg.Ack() } diff --git a/federationsender/consumers/roomserver.go b/federationsender/consumers/roomserver.go index a4b776d8..61db7e84 100644 --- a/federationsender/consumers/roomserver.go +++ b/federationsender/consumers/roomserver.go @@ -76,6 +76,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) { if err := json.Unmarshal(msg.Data, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("roomserver output log: message parse failure") + _ = msg.Nak() return } @@ -96,6 +97,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) { log.WithField("error", output.Type).Info( err.Error(), ) + _ = msg.Nak() default: // panic rather than continue with an inconsistent database log.WithFields(log.Fields{ @@ -108,6 +110,9 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) { } return } + + _ = msg.Ack() + case api.OutputTypeNewInboundPeek: if err := s.processInboundPeek(*output.NewInboundPeek); err != nil { log.WithFields(log.Fields{ @@ -116,10 +121,13 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) { }).Panicf("roomserver output log: remote peek event failure") return } + _ = msg.Ack() + default: log.WithField("type", output.Type).Debug( "roomserver output log: ignoring unknown output type", ) + _ = msg.Ack() return } } diff --git a/syncapi/consumers/clientapi.go b/syncapi/consumers/clientapi.go index b5ae32a0..9f80f060 100644 --- a/syncapi/consumers/clientapi.go +++ b/syncapi/consumers/clientapi.go @@ -74,6 +74,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *nats.Msg) { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("client API server output log: message parse failure") sentry.CaptureException(err) + _ = msg.Nak() return } @@ -96,4 +97,6 @@ func (s *OutputClientDataConsumer) onMessage(msg *nats.Msg) { s.stream.Advance(streamPos) s.notifier.OnNewAccountData(userID, types.StreamingToken{AccountDataPosition: streamPos}) + + _ = msg.Ack() } diff --git a/syncapi/consumers/eduserver_receipts.go b/syncapi/consumers/eduserver_receipts.go index f8aa7a5f..7b3e984a 100644 --- a/syncapi/consumers/eduserver_receipts.go +++ b/syncapi/consumers/eduserver_receipts.go @@ -70,6 +70,7 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *nats.Msg) { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("EDU server output log: message parse failure") sentry.CaptureException(err) + _ = msg.Nak() return } @@ -88,4 +89,6 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *nats.Msg) { s.stream.Advance(streamPos) s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos}) + + _ = msg.Ack() } diff --git a/syncapi/consumers/eduserver_sendtodevice.go b/syncapi/consumers/eduserver_sendtodevice.go index 54c13897..d8c43a85 100644 --- a/syncapi/consumers/eduserver_sendtodevice.go +++ b/syncapi/consumers/eduserver_sendtodevice.go @@ -74,15 +74,18 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *nats.Msg) { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("EDU server output log: message parse failure") sentry.CaptureException(err) + _ = msg.Nak() return } _, domain, err := gomatrixserverlib.SplitID('@', output.UserID) if err != nil { sentry.CaptureException(err) + _ = msg.Nak() return } if domain != s.serverName { + _ = msg.Nak() return } @@ -108,4 +111,6 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *nats.Msg) { []string{output.DeviceID}, types.StreamingToken{SendToDevicePosition: streamPos}, ) + + _ = msg.Ack() } diff --git a/syncapi/consumers/eduserver_typing.go b/syncapi/consumers/eduserver_typing.go index f6f63050..47c5da68 100644 --- a/syncapi/consumers/eduserver_typing.go +++ b/syncapi/consumers/eduserver_typing.go @@ -71,6 +71,7 @@ func (s *OutputTypingEventConsumer) onMessage(msg *nats.Msg) { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("EDU server output log: message parse failure") sentry.CaptureException(err) + _ = msg.Nak() return } @@ -94,4 +95,6 @@ func (s *OutputTypingEventConsumer) onMessage(msg *nats.Msg) { s.stream.Advance(typingPos) s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos}) + + _ = msg.Ack() } diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index fbe4408f..c6e1c23a 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -83,6 +83,7 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) { if err = json.Unmarshal(msg.Data, &output); err != nil { // If the message was invalid, log it and move on to the next message in the stream log.WithError(err).Errorf("roomserver output log: message parse failure") + _ = msg.Nak() return } @@ -115,10 +116,15 @@ func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) { log.WithField("type", output.Type).Debug( "roomserver output log: ignoring unknown output type", ) + _ = msg.Nak() } if err != nil { log.WithError(err).Error("roomserver output log: failed to process event") + _ = msg.Nak() + return } + + _ = msg.Ack() } func (s *OutputRoomEventConsumer) onRedactEvent(