Relax roomserver input transactional isolation (#2224)

* Don't force full transactional isolation on roomserver input

* Set succeeded

* Tweak `MissingAuthPrevEvents`
This commit is contained in:
Neil Alexander 2022-02-23 15:41:32 +00:00 committed by GitHub
parent b8a97b6ee0
commit fea8d152e7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 105 additions and 165 deletions

View file

@ -19,7 +19,6 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"
@ -40,19 +39,6 @@ import (
"github.com/tidwall/gjson"
)
type retryAction int
type commitAction int
const (
doNotRetry retryAction = iota
retryLater
)
const (
commitTransaction commitAction = iota
rollbackTransaction
)
var keyContentFields = map[string]string{
"m.room.join_rules": "join_rule",
"m.room.history_visibility": "history_visibility",
@ -117,8 +103,7 @@ func (r *Inputer) Start() error {
_ = msg.InProgress() // resets the acknowledgement wait timer
defer eventsInProgress.Delete(index)
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
action, err := r.processRoomEventUsingUpdater(r.ProcessContext.Context(), roomID, &inputRoomEvent)
if err != nil {
if err := r.processRoomEvent(r.ProcessContext.Context(), &inputRoomEvent); err != nil {
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
sentry.CaptureException(err)
}
@ -127,11 +112,8 @@ func (r *Inputer) Start() error {
"event_id": inputRoomEvent.Event.EventID(),
"type": inputRoomEvent.Event.Type(),
}).Warn("Roomserver failed to process async event")
}
switch action {
case retryLater:
_ = msg.Nak()
case doNotRetry:
_ = msg.Term()
} else {
_ = msg.Ack()
}
})
@ -153,37 +135,6 @@ func (r *Inputer) Start() error {
return err
}
// processRoomEventUsingUpdater opens up a room updater and tries to
// process the event. It returns whether or not we should positively
// or negatively acknowledge the event (i.e. for NATS) and an error
// if it occurred.
func (r *Inputer) processRoomEventUsingUpdater(
ctx context.Context,
roomID string,
inputRoomEvent *api.InputRoomEvent,
) (retryAction, error) {
roomInfo, err := r.DB.RoomInfo(ctx, roomID)
if err != nil {
return doNotRetry, fmt.Errorf("r.DB.RoomInfo: %w", err)
}
updater, err := r.DB.GetRoomUpdater(ctx, roomInfo)
if err != nil {
return retryLater, fmt.Errorf("r.DB.GetRoomUpdater: %w", err)
}
action, err := r.processRoomEvent(ctx, updater, inputRoomEvent)
switch action {
case commitTransaction:
if cerr := updater.Commit(); cerr != nil {
return retryLater, fmt.Errorf("updater.Commit: %w", cerr)
}
case rollbackTransaction:
if rerr := updater.Rollback(); rerr != nil {
return retryLater, fmt.Errorf("updater.Rollback: %w", rerr)
}
}
return doNotRetry, err
}
// InputRoomEvents implements api.RoomserverInternalAPI
func (r *Inputer) InputRoomEvents(
ctx context.Context,
@ -230,7 +181,7 @@ func (r *Inputer) InputRoomEvents(
worker.Act(nil, func() {
defer eventsInProgress.Delete(index)
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
_, err := r.processRoomEventUsingUpdater(ctx, roomID, &inputRoomEvent)
err := r.processRoomEvent(ctx, &inputRoomEvent)
if err != nil {
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
sentry.CaptureException(err)