mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-07-29 12:42:46 +00:00
Full roomserver input transactional isolation (#2141)
* Add transaction to all database tables in roomserver, rename latest events updater to room updater, use room updater for all RS input * Better transaction management * Tweak order * Handle cases where the room does not exist * Other fixes * More tweaks * Fill some gaps * Fill in the gaps * good lord it gets worse * Don't roll back transactions when events rejected * Pass through errors properly * Fix bugs * Fix incorrect error check * Don't panic on nil txns * Tweaks * Hopefully fix panics for good in SQLite this time * Fix rollback * Minor bug fixes with latest event updater * Some review comments * Revert "Some review comments" This reverts commit 0caf8cf53e62c33f7b83c52e9df1d963871f751e. * Fix a couple of bugs * Clearer commit and rollback results * Remove unnecessary prepares
This commit is contained in:
parent
4d9f5b2e57
commit
eb352a5f6b
35 changed files with 867 additions and 499 deletions
|
@ -20,17 +20,22 @@ import (
|
|||
"sort"
|
||||
|
||||
"github.com/matrix-org/dendrite/roomserver/state"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
type checkForAuthAndSoftFailStorage interface {
|
||||
state.StateResolutionStorage
|
||||
StateEntriesForEventIDs(ctx context.Context, eventIDs []string) ([]types.StateEntry, error)
|
||||
RoomInfo(ctx context.Context, roomID string) (*types.RoomInfo, error)
|
||||
}
|
||||
|
||||
// CheckForSoftFail returns true if the event should be soft-failed
|
||||
// and false otherwise. The return error value should be checked before
|
||||
// the soft-fail bool.
|
||||
func CheckForSoftFail(
|
||||
ctx context.Context,
|
||||
db storage.Database,
|
||||
db checkForAuthAndSoftFailStorage,
|
||||
event *gomatrixserverlib.HeaderedEvent,
|
||||
stateEventIDs []string,
|
||||
) (bool, error) {
|
||||
|
@ -92,7 +97,7 @@ func CheckForSoftFail(
|
|||
// Returns the numeric IDs for the auth events.
|
||||
func CheckAuthEvents(
|
||||
ctx context.Context,
|
||||
db storage.Database,
|
||||
db checkForAuthAndSoftFailStorage,
|
||||
event *gomatrixserverlib.HeaderedEvent,
|
||||
authEventIDs []string,
|
||||
) ([]types.EventNID, error) {
|
||||
|
@ -193,7 +198,7 @@ func (ae *authEvents) lookupEvent(typeNID types.EventTypeNID, stateKey string) *
|
|||
// loadAuthEvents loads the events needed for authentication from the supplied room state.
|
||||
func loadAuthEvents(
|
||||
ctx context.Context,
|
||||
db storage.Database,
|
||||
db state.StateResolutionStorage,
|
||||
needed gomatrixserverlib.StateNeeded,
|
||||
state []types.StateEntry,
|
||||
) (result authEvents, err error) {
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -38,6 +39,19 @@ import (
|
|||
"github.com/tidwall/gjson"
|
||||
)
|
||||
|
||||
type retryAction int
|
||||
type commitAction int
|
||||
|
||||
const (
|
||||
doNotRetry retryAction = iota
|
||||
retryLater
|
||||
)
|
||||
|
||||
const (
|
||||
commitTransaction commitAction = iota
|
||||
rollbackTransaction
|
||||
)
|
||||
|
||||
var keyContentFields = map[string]string{
|
||||
"m.room.join_rules": "join_rule",
|
||||
"m.room.history_visibility": "history_visibility",
|
||||
|
@ -101,7 +115,8 @@ func (r *Inputer) Start() error {
|
|||
_ = msg.InProgress() // resets the acknowledgement wait timer
|
||||
defer eventsInProgress.Delete(index)
|
||||
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
|
||||
if err := r.processRoomEvent(context.Background(), &inputRoomEvent); err != nil {
|
||||
action, err := r.processRoomEventUsingUpdater(context.Background(), roomID, &inputRoomEvent)
|
||||
if err != nil {
|
||||
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
|
||||
sentry.CaptureException(err)
|
||||
}
|
||||
|
@ -111,7 +126,12 @@ func (r *Inputer) Start() error {
|
|||
"type": inputRoomEvent.Event.Type(),
|
||||
}).Warn("Roomserver failed to process async event")
|
||||
}
|
||||
_ = msg.Ack()
|
||||
switch action {
|
||||
case retryLater:
|
||||
_ = msg.Nak()
|
||||
case doNotRetry:
|
||||
_ = msg.Ack()
|
||||
}
|
||||
})
|
||||
},
|
||||
// NATS wants to acknowledge automatically by default when the message is
|
||||
|
@ -131,6 +151,37 @@ func (r *Inputer) Start() error {
|
|||
return err
|
||||
}
|
||||
|
||||
// processRoomEventUsingUpdater opens up a room updater and tries to
|
||||
// process the event. It returns whether or not we should positively
|
||||
// or negatively acknowledge the event (i.e. for NATS) and an error
|
||||
// if it occurred.
|
||||
func (r *Inputer) processRoomEventUsingUpdater(
|
||||
ctx context.Context,
|
||||
roomID string,
|
||||
inputRoomEvent *api.InputRoomEvent,
|
||||
) (retryAction, error) {
|
||||
roomInfo, err := r.DB.RoomInfo(ctx, roomID)
|
||||
if err != nil {
|
||||
return doNotRetry, fmt.Errorf("r.DB.RoomInfo: %w", err)
|
||||
}
|
||||
updater, err := r.DB.GetRoomUpdater(ctx, roomInfo)
|
||||
if err != nil {
|
||||
return retryLater, fmt.Errorf("r.DB.GetRoomUpdater: %w", err)
|
||||
}
|
||||
action, err := r.processRoomEvent(ctx, updater, inputRoomEvent)
|
||||
switch action {
|
||||
case commitTransaction:
|
||||
if cerr := updater.Commit(); cerr != nil {
|
||||
return retryLater, fmt.Errorf("updater.Commit: %w", cerr)
|
||||
}
|
||||
case rollbackTransaction:
|
||||
if rerr := updater.Rollback(); rerr != nil {
|
||||
return retryLater, fmt.Errorf("updater.Rollback: %w", rerr)
|
||||
}
|
||||
}
|
||||
return doNotRetry, err
|
||||
}
|
||||
|
||||
// InputRoomEvents implements api.RoomserverInternalAPI
|
||||
func (r *Inputer) InputRoomEvents(
|
||||
ctx context.Context,
|
||||
|
@ -177,7 +228,7 @@ func (r *Inputer) InputRoomEvents(
|
|||
worker.Act(nil, func() {
|
||||
defer eventsInProgress.Delete(index)
|
||||
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
|
||||
err := r.processRoomEvent(ctx, &inputRoomEvent)
|
||||
_, err := r.processRoomEventUsingUpdater(ctx, roomID, &inputRoomEvent)
|
||||
if err != nil {
|
||||
if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
|
||||
sentry.CaptureException(err)
|
||||
|
|
|
@ -29,6 +29,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/internal/helpers"
|
||||
"github.com/matrix-org/dendrite/roomserver/state"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/shared"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
|
@ -67,14 +68,15 @@ var processRoomEventDuration = prometheus.NewHistogramVec(
|
|||
// nolint:gocyclo
|
||||
func (r *Inputer) processRoomEvent(
|
||||
ctx context.Context,
|
||||
updater *shared.RoomUpdater,
|
||||
input *api.InputRoomEvent,
|
||||
) (err error) {
|
||||
) (commitAction, error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// Before we do anything, make sure the context hasn't expired for this pending task.
|
||||
// If it has then we'll give up straight away — it's probably a synchronous input
|
||||
// request and the caller has already given up, but the inbox task was still queued.
|
||||
return context.DeadlineExceeded
|
||||
return rollbackTransaction, context.DeadlineExceeded
|
||||
default:
|
||||
}
|
||||
|
||||
|
@ -107,7 +109,7 @@ func (r *Inputer) processRoomEvent(
|
|||
// if we have already got this event then do not process it again, if the input kind is an outlier.
|
||||
// Outliers contain no extra information which may warrant a re-processing.
|
||||
if input.Kind == api.KindOutlier {
|
||||
evs, err2 := r.DB.EventsFromIDs(ctx, []string{event.EventID()})
|
||||
evs, err2 := updater.EventsFromIDs(ctx, []string{event.EventID()})
|
||||
if err2 == nil && len(evs) == 1 {
|
||||
// check hash matches if we're on early room versions where the event ID was a random string
|
||||
idFormat, err2 := headered.RoomVersion.EventIDFormat()
|
||||
|
@ -116,11 +118,11 @@ func (r *Inputer) processRoomEvent(
|
|||
case gomatrixserverlib.EventIDFormatV1:
|
||||
if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) {
|
||||
logger.Debugf("Already processed event; ignoring")
|
||||
return nil
|
||||
return rollbackTransaction, nil
|
||||
}
|
||||
default:
|
||||
logger.Debugf("Already processed event; ignoring")
|
||||
return nil
|
||||
return rollbackTransaction, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -134,8 +136,8 @@ func (r *Inputer) processRoomEvent(
|
|||
AuthEventIDs: event.AuthEventIDs(),
|
||||
PrevEventIDs: event.PrevEventIDs(),
|
||||
}
|
||||
if err = r.Queryer.QueryMissingAuthPrevEvents(ctx, missingReq, missingRes); err != nil {
|
||||
return fmt.Errorf("r.Queryer.QueryMissingAuthPrevEvents: %w", err)
|
||||
if err := r.Queryer.QueryMissingAuthPrevEvents(ctx, missingReq, missingRes); err != nil {
|
||||
return rollbackTransaction, fmt.Errorf("r.Queryer.QueryMissingAuthPrevEvents: %w", err)
|
||||
}
|
||||
}
|
||||
missingAuth := len(missingRes.MissingAuthEventIDs) > 0
|
||||
|
@ -146,8 +148,8 @@ func (r *Inputer) processRoomEvent(
|
|||
RoomID: event.RoomID(),
|
||||
ExcludeSelf: true,
|
||||
}
|
||||
if err = r.FSAPI.QueryJoinedHostServerNamesInRoom(ctx, serverReq, serverRes); err != nil {
|
||||
return fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err)
|
||||
if err := r.FSAPI.QueryJoinedHostServerNamesInRoom(ctx, serverReq, serverRes); err != nil {
|
||||
return rollbackTransaction, fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err)
|
||||
}
|
||||
// Sort all of the servers into a map so that we can randomise
|
||||
// their order. Then make sure that the input origin and the
|
||||
|
@ -176,8 +178,8 @@ func (r *Inputer) processRoomEvent(
|
|||
isRejected := false
|
||||
authEvents := gomatrixserverlib.NewAuthEvents(nil)
|
||||
knownEvents := map[string]*types.Event{}
|
||||
if err = r.fetchAuthEvents(ctx, logger, headered, &authEvents, knownEvents, serverRes.ServerNames); err != nil {
|
||||
return fmt.Errorf("r.fetchAuthEvents: %w", err)
|
||||
if err := r.fetchAuthEvents(ctx, updater, logger, headered, &authEvents, knownEvents, serverRes.ServerNames); err != nil {
|
||||
return rollbackTransaction, fmt.Errorf("r.fetchAuthEvents: %w", err)
|
||||
}
|
||||
|
||||
// Check if the event is allowed by its auth events. If it isn't then
|
||||
|
@ -193,7 +195,7 @@ func (r *Inputer) processRoomEvent(
|
|||
authEventNIDs := make([]types.EventNID, 0, len(authEventIDs))
|
||||
for _, authEventID := range authEventIDs {
|
||||
if _, ok := knownEvents[authEventID]; !ok {
|
||||
return fmt.Errorf("missing auth event %s", authEventID)
|
||||
return rollbackTransaction, fmt.Errorf("missing auth event %s", authEventID)
|
||||
}
|
||||
authEventNIDs = append(authEventNIDs, knownEvents[authEventID].EventNID)
|
||||
}
|
||||
|
@ -202,7 +204,8 @@ func (r *Inputer) processRoomEvent(
|
|||
if input.Kind == api.KindNew {
|
||||
// Check that the event passes authentication checks based on the
|
||||
// current room state.
|
||||
softfail, err = helpers.CheckForSoftFail(ctx, r.DB, headered, input.StateEventIDs)
|
||||
var err error
|
||||
softfail, err = helpers.CheckForSoftFail(ctx, updater, headered, input.StateEventIDs)
|
||||
if err != nil {
|
||||
logger.WithError(err).Warn("Error authing soft-failed event")
|
||||
}
|
||||
|
@ -227,7 +230,7 @@ func (r *Inputer) processRoomEvent(
|
|||
origin: input.Origin,
|
||||
inputer: r,
|
||||
queryer: r.Queryer,
|
||||
db: r.DB,
|
||||
db: updater,
|
||||
federation: r.FSAPI,
|
||||
keys: r.KeyRing,
|
||||
roomsMu: internal.NewMutexByRoom(),
|
||||
|
@ -235,7 +238,7 @@ func (r *Inputer) processRoomEvent(
|
|||
hadEvents: map[string]bool{},
|
||||
haveEvents: map[string]*gomatrixserverlib.HeaderedEvent{},
|
||||
}
|
||||
if err = missingState.processEventWithMissingState(ctx, event, headered.RoomVersion); err != nil {
|
||||
if err := missingState.processEventWithMissingState(ctx, event, headered.RoomVersion); err != nil {
|
||||
isRejected = true
|
||||
rejectionErr = fmt.Errorf("missingState.processEventWithMissingState: %w", err)
|
||||
} else {
|
||||
|
@ -248,16 +251,16 @@ func (r *Inputer) processRoomEvent(
|
|||
}
|
||||
|
||||
// Store the event.
|
||||
_, _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected)
|
||||
_, _, stateAtEvent, redactionEvent, redactedEventID, err := updater.StoreEvent(ctx, event, authEventNIDs, isRejected)
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.DB.StoreEvent: %w", err)
|
||||
return rollbackTransaction, fmt.Errorf("updater.StoreEvent: %w", err)
|
||||
}
|
||||
|
||||
// if storing this event results in it being redacted then do so.
|
||||
if !isRejected && redactedEventID == event.EventID() {
|
||||
r, rerr := eventutil.RedactEvent(redactionEvent, event)
|
||||
if rerr != nil {
|
||||
return fmt.Errorf("eventutil.RedactEvent: %w", rerr)
|
||||
return rollbackTransaction, fmt.Errorf("eventutil.RedactEvent: %w", rerr)
|
||||
}
|
||||
event = r
|
||||
}
|
||||
|
@ -268,23 +271,23 @@ func (r *Inputer) processRoomEvent(
|
|||
if input.Kind == api.KindOutlier {
|
||||
logger.Debug("Stored outlier")
|
||||
hooks.Run(hooks.KindNewEventPersisted, headered)
|
||||
return nil
|
||||
return commitTransaction, nil
|
||||
}
|
||||
|
||||
roomInfo, err := r.DB.RoomInfo(ctx, event.RoomID())
|
||||
roomInfo, err := updater.RoomInfo(ctx, event.RoomID())
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.DB.RoomInfo: %w", err)
|
||||
return rollbackTransaction, fmt.Errorf("updater.RoomInfo: %w", err)
|
||||
}
|
||||
if roomInfo == nil {
|
||||
return fmt.Errorf("r.DB.RoomInfo missing for room %s", event.RoomID())
|
||||
return rollbackTransaction, fmt.Errorf("updater.RoomInfo missing for room %s", event.RoomID())
|
||||
}
|
||||
|
||||
if !missingPrev && stateAtEvent.BeforeStateSnapshotNID == 0 {
|
||||
// We haven't calculated a state for this event yet.
|
||||
// Lets calculate one.
|
||||
err = r.calculateAndSetState(ctx, input, roomInfo, &stateAtEvent, event, isRejected)
|
||||
err = r.calculateAndSetState(ctx, updater, input, roomInfo, &stateAtEvent, event, isRejected)
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.calculateAndSetState: %w", err)
|
||||
return rollbackTransaction, fmt.Errorf("r.calculateAndSetState: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -294,13 +297,14 @@ func (r *Inputer) processRoomEvent(
|
|||
"soft_fail": softfail,
|
||||
"missing_prev": missingPrev,
|
||||
}).Warn("Stored rejected event")
|
||||
return rejectionErr
|
||||
return commitTransaction, rejectionErr
|
||||
}
|
||||
|
||||
switch input.Kind {
|
||||
case api.KindNew:
|
||||
if err = r.updateLatestEvents(
|
||||
ctx, // context
|
||||
updater, // room updater
|
||||
roomInfo, // room info for the room being updated
|
||||
stateAtEvent, // state at event (below)
|
||||
event, // event
|
||||
|
@ -308,7 +312,7 @@ func (r *Inputer) processRoomEvent(
|
|||
input.TransactionID, // transaction ID
|
||||
input.HasState, // rewrites state?
|
||||
); err != nil {
|
||||
return fmt.Errorf("r.updateLatestEvents: %w", err)
|
||||
return rollbackTransaction, fmt.Errorf("r.updateLatestEvents: %w", err)
|
||||
}
|
||||
case api.KindOld:
|
||||
err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{
|
||||
|
@ -320,7 +324,7 @@ func (r *Inputer) processRoomEvent(
|
|||
},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.WriteOutputEvents (old): %w", err)
|
||||
return rollbackTransaction, fmt.Errorf("r.WriteOutputEvents (old): %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -339,14 +343,14 @@ func (r *Inputer) processRoomEvent(
|
|||
},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.WriteOutputEvents (redactions): %w", err)
|
||||
return rollbackTransaction, fmt.Errorf("r.WriteOutputEvents (redactions): %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Everything was OK — the latest events updater didn't error and
|
||||
// we've sent output events. Finally, generate a hook call.
|
||||
hooks.Run(hooks.KindNewEventPersisted, headered)
|
||||
return nil
|
||||
return commitTransaction, nil
|
||||
}
|
||||
|
||||
// fetchAuthEvents will check to see if any of the
|
||||
|
@ -358,6 +362,7 @@ func (r *Inputer) processRoomEvent(
|
|||
// they are now in the database.
|
||||
func (r *Inputer) fetchAuthEvents(
|
||||
ctx context.Context,
|
||||
updater *shared.RoomUpdater,
|
||||
logger *logrus.Entry,
|
||||
event *gomatrixserverlib.HeaderedEvent,
|
||||
auth *gomatrixserverlib.AuthEvents,
|
||||
|
@ -375,7 +380,7 @@ func (r *Inputer) fetchAuthEvents(
|
|||
}
|
||||
|
||||
for _, authEventID := range authEventIDs {
|
||||
authEvents, err := r.DB.EventsFromIDs(ctx, []string{authEventID})
|
||||
authEvents, err := updater.EventsFromIDs(ctx, []string{authEventID})
|
||||
if err != nil || len(authEvents) == 0 || authEvents[0].Event == nil {
|
||||
unknown[authEventID] = struct{}{}
|
||||
continue
|
||||
|
@ -454,9 +459,9 @@ func (r *Inputer) fetchAuthEvents(
|
|||
}
|
||||
|
||||
// Finally, store the event in the database.
|
||||
eventNID, _, _, _, _, err := r.DB.StoreEvent(ctx, authEvent, authEventNIDs, isRejected)
|
||||
eventNID, _, _, _, _, err := updater.StoreEvent(ctx, authEvent, authEventNIDs, isRejected)
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.DB.StoreEvent: %w", err)
|
||||
return fmt.Errorf("updater.StoreEvent: %w", err)
|
||||
}
|
||||
|
||||
// Now we know about this event, it was stored and the signatures were OK.
|
||||
|
@ -471,6 +476,7 @@ func (r *Inputer) fetchAuthEvents(
|
|||
|
||||
func (r *Inputer) calculateAndSetState(
|
||||
ctx context.Context,
|
||||
updater *shared.RoomUpdater,
|
||||
input *api.InputRoomEvent,
|
||||
roomInfo *types.RoomInfo,
|
||||
stateAtEvent *types.StateAtEvent,
|
||||
|
@ -478,14 +484,14 @@ func (r *Inputer) calculateAndSetState(
|
|||
isRejected bool,
|
||||
) error {
|
||||
var err error
|
||||
roomState := state.NewStateResolution(r.DB, roomInfo)
|
||||
roomState := state.NewStateResolution(updater, roomInfo)
|
||||
|
||||
if input.HasState {
|
||||
// Check here if we think we're in the room already.
|
||||
stateAtEvent.Overwrite = true
|
||||
var joinEventNIDs []types.EventNID
|
||||
// Request join memberships only for local users only.
|
||||
if joinEventNIDs, err = r.DB.GetMembershipEventNIDsForRoom(ctx, roomInfo.RoomNID, true, true); err == nil {
|
||||
if joinEventNIDs, err = updater.GetMembershipEventNIDsForRoom(ctx, roomInfo.RoomNID, true, true); err == nil {
|
||||
// If we have no local users that are joined to the room then any state about
|
||||
// the room that we have is quite possibly out of date. Therefore in that case
|
||||
// we should overwrite it rather than merge it.
|
||||
|
@ -495,13 +501,13 @@ func (r *Inputer) calculateAndSetState(
|
|||
// We've been told what the state at the event is so we don't need to calculate it.
|
||||
// Check that those state events are in the database and store the state.
|
||||
var entries []types.StateEntry
|
||||
if entries, err = r.DB.StateEntriesForEventIDs(ctx, input.StateEventIDs); err != nil {
|
||||
return fmt.Errorf("r.DB.StateEntriesForEventIDs: %w", err)
|
||||
if entries, err = updater.StateEntriesForEventIDs(ctx, input.StateEventIDs); err != nil {
|
||||
return fmt.Errorf("updater.StateEntriesForEventIDs: %w", err)
|
||||
}
|
||||
entries = types.DeduplicateStateEntries(entries)
|
||||
|
||||
if stateAtEvent.BeforeStateSnapshotNID, err = r.DB.AddState(ctx, roomInfo.RoomNID, nil, entries); err != nil {
|
||||
return fmt.Errorf("r.DB.AddState: %w", err)
|
||||
if stateAtEvent.BeforeStateSnapshotNID, err = updater.AddState(ctx, roomInfo.RoomNID, nil, entries); err != nil {
|
||||
return fmt.Errorf("updater.AddState: %w", err)
|
||||
}
|
||||
} else {
|
||||
stateAtEvent.Overwrite = false
|
||||
|
@ -512,7 +518,7 @@ func (r *Inputer) calculateAndSetState(
|
|||
}
|
||||
}
|
||||
|
||||
err = r.DB.SetState(ctx, stateAtEvent.EventNID, stateAtEvent.BeforeStateSnapshotNID)
|
||||
err = updater.SetState(ctx, stateAtEvent.EventNID, stateAtEvent.BeforeStateSnapshotNID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.DB.SetState: %w", err)
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/state"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/shared"
|
||||
|
@ -48,6 +47,7 @@ import (
|
|||
// Can only be called once at a time
|
||||
func (r *Inputer) updateLatestEvents(
|
||||
ctx context.Context,
|
||||
updater *shared.RoomUpdater,
|
||||
roomInfo *types.RoomInfo,
|
||||
stateAtEvent types.StateAtEvent,
|
||||
event *gomatrixserverlib.Event,
|
||||
|
@ -55,13 +55,6 @@ func (r *Inputer) updateLatestEvents(
|
|||
transactionID *api.TransactionID,
|
||||
rewritesState bool,
|
||||
) (err error) {
|
||||
updater, err := r.DB.GetLatestEventsForUpdate(ctx, *roomInfo)
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.DB.GetLatestEventsForUpdate: %w", err)
|
||||
}
|
||||
succeeded := false
|
||||
defer sqlutil.EndTransactionWithCheck(updater, &succeeded, &err)
|
||||
|
||||
u := latestEventsUpdater{
|
||||
ctx: ctx,
|
||||
api: r,
|
||||
|
@ -78,7 +71,6 @@ func (r *Inputer) updateLatestEvents(
|
|||
return fmt.Errorf("u.doUpdateLatestEvents: %w", err)
|
||||
}
|
||||
|
||||
succeeded = true
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -89,7 +81,7 @@ func (r *Inputer) updateLatestEvents(
|
|||
type latestEventsUpdater struct {
|
||||
ctx context.Context
|
||||
api *Inputer
|
||||
updater *shared.LatestEventsUpdater
|
||||
updater *shared.RoomUpdater
|
||||
roomInfo *types.RoomInfo
|
||||
stateAtEvent types.StateAtEvent
|
||||
event *gomatrixserverlib.Event
|
||||
|
@ -199,7 +191,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
|
|||
|
||||
func (u *latestEventsUpdater) latestState() error {
|
||||
var err error
|
||||
roomState := state.NewStateResolution(u.api.DB, u.roomInfo)
|
||||
roomState := state.NewStateResolution(u.updater, u.roomInfo)
|
||||
|
||||
// Work out if the state at the extremities has actually changed
|
||||
// or not. If they haven't then we won't bother doing all of the
|
||||
|
@ -413,7 +405,7 @@ func (u *latestEventsUpdater) extraEventsForIDs(roomVersion gomatrixserverlib.Ro
|
|||
if len(extraEventIDs) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
extraEvents, err := u.api.DB.EventsFromIDs(u.ctx, extraEventIDs)
|
||||
extraEvents, err := u.updater.EventsFromIDs(u.ctx, extraEventIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -436,7 +428,7 @@ func (u *latestEventsUpdater) stateEventMap() (map[types.EventNID]string, error)
|
|||
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
||||
}
|
||||
stateEventNIDs = stateEventNIDs[:util.SortAndUnique(eventNIDSorter(stateEventNIDs))]
|
||||
return u.api.DB.EventIDs(u.ctx, stateEventNIDs)
|
||||
return u.updater.EventIDs(u.ctx, stateEventNIDs)
|
||||
}
|
||||
|
||||
type eventNIDSorter []types.EventNID
|
||||
|
|
|
@ -31,7 +31,7 @@ import (
|
|||
// consumers about the invites added or retired by the change in current state.
|
||||
func (r *Inputer) updateMemberships(
|
||||
ctx context.Context,
|
||||
updater *shared.LatestEventsUpdater,
|
||||
updater *shared.RoomUpdater,
|
||||
removed, added []types.StateEntry,
|
||||
) ([]api.OutputEvent, error) {
|
||||
changes := membershipChanges(removed, added)
|
||||
|
@ -79,7 +79,7 @@ func (r *Inputer) updateMemberships(
|
|||
}
|
||||
|
||||
func (r *Inputer) updateMembership(
|
||||
updater *shared.LatestEventsUpdater,
|
||||
updater *shared.RoomUpdater,
|
||||
targetUserNID types.EventStateKeyNID,
|
||||
remove, add *gomatrixserverlib.Event,
|
||||
updates []api.OutputEvent,
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/internal/query"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/shared"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
@ -19,7 +19,7 @@ import (
|
|||
|
||||
type missingStateReq struct {
|
||||
origin gomatrixserverlib.ServerName
|
||||
db storage.Database
|
||||
db *shared.RoomUpdater
|
||||
inputer *Inputer
|
||||
queryer *query.Queryer
|
||||
keys gomatrixserverlib.JSONVerifier
|
||||
|
@ -78,7 +78,7 @@ func (t *missingStateReq) processEventWithMissingState(
|
|||
// we can just inject all the newEvents as new as we may have only missed 1 or 2 events and have filled
|
||||
// in the gap in the DAG
|
||||
for _, newEvent := range newEvents {
|
||||
err = t.inputer.processRoomEvent(ctx, &api.InputRoomEvent{
|
||||
_, err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{
|
||||
Kind: api.KindNew,
|
||||
Event: newEvent.Headered(roomVersion),
|
||||
Origin: t.origin,
|
||||
|
@ -187,7 +187,7 @@ func (t *missingStateReq) processEventWithMissingState(
|
|||
}
|
||||
// TODO: we could do this concurrently?
|
||||
for _, ire := range outlierRoomEvents {
|
||||
if err = t.inputer.processRoomEvent(ctx, &ire); err != nil {
|
||||
if _, err = t.inputer.processRoomEvent(ctx, t.db, &ire); err != nil {
|
||||
return fmt.Errorf("t.inputer.processRoomEvent[outlier]: %w", err)
|
||||
}
|
||||
}
|
||||
|
@ -200,7 +200,7 @@ func (t *missingStateReq) processEventWithMissingState(
|
|||
stateIDs = append(stateIDs, event.EventID())
|
||||
}
|
||||
|
||||
err = t.inputer.processRoomEvent(ctx, &api.InputRoomEvent{
|
||||
_, err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{
|
||||
Kind: api.KindOld,
|
||||
Event: backwardsExtremity.Headered(roomVersion),
|
||||
Origin: t.origin,
|
||||
|
@ -217,7 +217,7 @@ func (t *missingStateReq) processEventWithMissingState(
|
|||
// they will automatically fast-forward based on the room state at the
|
||||
// extremity in the last step.
|
||||
for _, newEvent := range newEvents {
|
||||
err = t.inputer.processRoomEvent(ctx, &api.InputRoomEvent{
|
||||
_, err = t.inputer.processRoomEvent(ctx, t.db, &api.InputRoomEvent{
|
||||
Kind: api.KindOld,
|
||||
Event: newEvent.Headered(roomVersion),
|
||||
Origin: t.origin,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue