diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 27809e54..7d58b80b 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -33,6 +33,7 @@ import ( "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" "github.com/tidwall/gjson" ) @@ -66,6 +67,8 @@ func (r *Inputer) workerForRoom(roomID string) *phony.Inbox { return inbox.(*phony.Inbox) } +var eventIDsQueued sync.Map + // onMessage is called when a new event arrives in the roomserver input stream. func (r *Inputer) Start() error { _, err := r.JetStream.Subscribe( @@ -80,6 +83,16 @@ func (r *Inputer) Start() error { _ = msg.Term() return } + + eventID := inputRoomEvent.Event.EventID() + _, existed := eventIDsQueued.LoadOrStore(eventID, struct{}{}) + if existed { + logrus.Errorf("XXX: We already received event ID %s", eventID) + _ = msg.Term() + return + } + defer eventIDsQueued.Delete(eventID) + roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() worker := r.workerForRoom(roomID) worker.Act(nil, func() {