More tweaks

This commit is contained in:
Neil Alexander 2022-01-24 13:35:04 +00:00
parent bcda9729e4
commit da78cd909c
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
2 changed files with 2 additions and 12 deletions

View file

@ -276,6 +276,7 @@ NextPDU:
} }
} }
/*
func fromStateTuples(tuples []gomatrixserverlib.StateKeyTuple, omitTuples []gomatrixserverlib.StateKeyTuple) (result []*gomatrixserverlib.HeaderedEvent) { func fromStateTuples(tuples []gomatrixserverlib.StateKeyTuple, omitTuples []gomatrixserverlib.StateKeyTuple) (result []*gomatrixserverlib.HeaderedEvent) {
NextTuple: NextTuple:
for _, t := range tuples { for _, t := range tuples {
@ -291,6 +292,7 @@ NextTuple:
} }
return return
} }
*/
func assertInputRoomEvents(t *testing.T, got []api.InputRoomEvent, want []*gomatrixserverlib.HeaderedEvent) { func assertInputRoomEvents(t *testing.T, got []api.InputRoomEvent, want []*gomatrixserverlib.HeaderedEvent) {
for _, g := range got { for _, g := range got {

View file

@ -33,7 +33,6 @@ import (
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson" "github.com/tidwall/gjson"
) )
@ -67,8 +66,6 @@ func (r *Inputer) workerForRoom(roomID string) *phony.Inbox {
return inbox.(*phony.Inbox) return inbox.(*phony.Inbox)
} }
var eventIDsQueued sync.Map
// onMessage is called when a new event arrives in the roomserver input stream. // onMessage is called when a new event arrives in the roomserver input stream.
func (r *Inputer) Start() error { func (r *Inputer) Start() error {
_, err := r.JetStream.Subscribe( _, err := r.JetStream.Subscribe(
@ -84,19 +81,10 @@ func (r *Inputer) Start() error {
return return
} }
eventID := inputRoomEvent.Event.EventID()
_, existed := eventIDsQueued.LoadOrStore(eventID, struct{}{})
if existed {
logrus.Errorf("XXX: We already received event ID %s", eventID)
_ = msg.Term()
return
}
_ = msg.InProgress() _ = msg.InProgress()
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc() roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
r.workerForRoom(roomID).Act(nil, func() { r.workerForRoom(roomID).Act(nil, func() {
_ = msg.InProgress() // resets the acknowledgement wait timer _ = msg.InProgress() // resets the acknowledgement wait timer
defer eventIDsQueued.Delete(eventID)
ctx, cancel := context.WithTimeout(context.Background(), MaximumProcessingTime) ctx, cancel := context.WithTimeout(context.Background(), MaximumProcessingTime)
defer cancel() defer cancel()
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec() defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()