diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go index 8e3d7d86..f1f6169d 100644 --- a/federationapi/routing/send_test.go +++ b/federationapi/routing/send_test.go @@ -276,6 +276,7 @@ NextPDU: } } +/* func fromStateTuples(tuples []gomatrixserverlib.StateKeyTuple, omitTuples []gomatrixserverlib.StateKeyTuple) (result []*gomatrixserverlib.HeaderedEvent) { NextTuple: for _, t := range tuples { @@ -291,6 +292,7 @@ NextTuple: } return } +*/ func assertInputRoomEvents(t *testing.T, got []api.InputRoomEvent, want []*gomatrixserverlib.HeaderedEvent) { for _, g := range got { diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 6090e14d..c6abbae1 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -33,7 +33,6 @@ import ( "github.com/matrix-org/gomatrixserverlib" "github.com/nats-io/nats.go" "github.com/prometheus/client_golang/prometheus" - "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" "github.com/tidwall/gjson" ) @@ -67,8 +66,6 @@ func (r *Inputer) workerForRoom(roomID string) *phony.Inbox { return inbox.(*phony.Inbox) } -var eventIDsQueued sync.Map - // onMessage is called when a new event arrives in the roomserver input stream. func (r *Inputer) Start() error { _, err := r.JetStream.Subscribe( @@ -84,19 +81,10 @@ func (r *Inputer) Start() error { return } - eventID := inputRoomEvent.Event.EventID() - _, existed := eventIDsQueued.LoadOrStore(eventID, struct{}{}) - if existed { - logrus.Errorf("XXX: We already received event ID %s", eventID) - _ = msg.Term() - return - } - _ = msg.InProgress() roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() r.workerForRoom(roomID).Act(nil, func() { _ = msg.InProgress() // resets the acknowledgement wait timer - defer eventIDsQueued.Delete(eventID) ctx, cancel := context.WithTimeout(context.Background(), MaximumProcessingTime) defer cancel() defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()