Detect dupes from NATS

This commit is contained in:
Neil Alexander 2022-01-24 12:48:36 +00:00
parent 9ddb8749c1
commit 0308cebc6a
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -33,6 +33,7 @@ import (
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
) )
@ -66,6 +67,8 @@ func (r *Inputer) workerForRoom(roomID string) *phony.Inbox {
return inbox.(*phony.Inbox) return inbox.(*phony.Inbox)
} }
var eventIDsQueued sync.Map
// onMessage is called when a new event arrives in the roomserver input stream. // onMessage is called when a new event arrives in the roomserver input stream.
func (r *Inputer) Start() error { func (r *Inputer) Start() error {
_, err := r.JetStream.Subscribe( _, err := r.JetStream.Subscribe(
@ -80,6 +83,16 @@ func (r *Inputer) Start() error {
_ = msg.Term() _ = msg.Term()
return return
} }
eventID := inputRoomEvent.Event.EventID()
_, existed := eventIDsQueued.LoadOrStore(eventID, struct{}{})
if existed {
logrus.Errorf("XXX: We already received event ID %s", eventID)
_ = msg.Term()
return
}
defer eventIDsQueued.Delete(eventID)
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
worker := r.workerForRoom(roomID) worker := r.workerForRoom(roomID)
worker.Act(nil, func() { worker.Act(nil, func() {