Use memberships to determine whether to reset latest events/state on room join (#1047)

* Track local/remote memberships, re-scope some input stuff

* Check if we're in the room already before resetting latest events/state

* Fix postgres, fix lint

* Review comments
This commit is contained in:
Neil Alexander 2020-05-20 18:03:06 +01:00 committed by GitHub
parent 6091bf044f
commit f2c07437fe
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 162 additions and 100 deletions

View file

@ -60,7 +60,7 @@ func (r *RoomserverInternalAPI) InputRoomEvents(
defer r.mutex.Unlock()
for i := range request.InputInviteEvents {
var loopback *api.InputRoomEvent
if loopback, err = processInviteEvent(ctx, r.DB, r, request.InputInviteEvents[i]); err != nil {
if loopback, err = r.processInviteEvent(ctx, r, request.InputInviteEvents[i]); err != nil {
return err
}
// The processInviteEvent function can optionally return a
@ -71,7 +71,7 @@ func (r *RoomserverInternalAPI) InputRoomEvents(
}
}
for i := range request.InputRoomEvents {
if response.EventID, err = processRoomEvent(ctx, r.DB, r, request.InputRoomEvents[i]); err != nil {
if response.EventID, err = r.processRoomEvent(ctx, request.InputRoomEvents[i]); err != nil {
return err
}
}

View file

@ -31,21 +31,13 @@ import (
log "github.com/sirupsen/logrus"
)
// OutputRoomEventWriter has the APIs needed to write an event to the output logs.
type OutputRoomEventWriter interface {
// Write a list of events for a room
WriteOutputEvents(roomID string, updates []api.OutputEvent) error
}
// processRoomEvent can only be called once at a time
//
// TODO(#375): This should be rewritten to allow concurrent calls. The
// difficulty is in ensuring that we correctly annotate events with the correct
// state deltas when sending to kafka streams
func processRoomEvent(
func (r *RoomserverInternalAPI) processRoomEvent(
ctx context.Context,
db storage.Database,
ow OutputRoomEventWriter,
input api.InputRoomEvent,
) (eventID string, err error) {
// Parse and validate the event JSON
@ -54,7 +46,7 @@ func processRoomEvent(
// Check that the event passes authentication checks and work out
// the numeric IDs for the auth events.
authEventNIDs, err := checkAuthEvents(ctx, db, headered, input.AuthEventIDs)
authEventNIDs, err := checkAuthEvents(ctx, r.DB, headered, input.AuthEventIDs)
if err != nil {
logrus.WithError(err).WithField("event_id", event.EventID()).WithField("auth_event_ids", input.AuthEventIDs).Error("processRoomEvent.checkAuthEvents failed for event")
return
@ -63,7 +55,7 @@ func processRoomEvent(
// If we don't have a transaction ID then get one.
if input.TransactionID != nil {
tdID := input.TransactionID
eventID, err = db.GetTransactionEventID(
eventID, err = r.DB.GetTransactionEventID(
ctx, tdID.TransactionID, tdID.SessionID, event.Sender(),
)
// On error OR event with the transaction already processed/processesing
@ -73,7 +65,7 @@ func processRoomEvent(
}
// Store the event.
roomNID, stateAtEvent, err := db.StoreEvent(ctx, event, input.TransactionID, authEventNIDs)
roomNID, stateAtEvent, err := r.DB.StoreEvent(ctx, event, input.TransactionID, authEventNIDs)
if err != nil {
return
}
@ -93,16 +85,14 @@ func processRoomEvent(
if stateAtEvent.BeforeStateSnapshotNID == 0 {
// We haven't calculated a state for this event yet.
// Lets calculate one.
err = calculateAndSetState(ctx, db, input, roomNID, &stateAtEvent, event)
err = r.calculateAndSetState(ctx, input, roomNID, &stateAtEvent, event)
if err != nil {
return
}
}
if err = updateLatestEvents(
if err = r.updateLatestEvents(
ctx, // context
db, // roomserver database
ow, // output event writer
roomNID, // room NID to update
stateAtEvent, // state at event (below)
event, // event
@ -116,29 +106,36 @@ func processRoomEvent(
return event.EventID(), nil
}
func calculateAndSetState(
func (r *RoomserverInternalAPI) calculateAndSetState(
ctx context.Context,
db storage.Database,
input api.InputRoomEvent,
roomNID types.RoomNID,
stateAtEvent *types.StateAtEvent,
event gomatrixserverlib.Event,
) error {
var err error
roomState := state.NewStateResolution(db)
roomState := state.NewStateResolution(r.DB)
if input.HasState {
// TODO: Check here if we think we're in the room already.
// Check here if we think we're in the room already.
stateAtEvent.Overwrite = true
var joinEventNIDs []types.EventNID
// Request join memberships only for local users only.
if joinEventNIDs, err = r.DB.GetMembershipEventNIDsForRoom(ctx, roomNID, true, true); err == nil {
// If we have no local users that are joined to the room then any state about
// the room that we have is quite possibly out of date. Therefore in that case
// we should overwrite it rather than merge it.
stateAtEvent.Overwrite = len(joinEventNIDs) == 0
}
// We've been told what the state at the event is so we don't need to calculate it.
// Check that those state events are in the database and store the state.
var entries []types.StateEntry
if entries, err = db.StateEntriesForEventIDs(ctx, input.StateEventIDs); err != nil {
if entries, err = r.DB.StateEntriesForEventIDs(ctx, input.StateEventIDs); err != nil {
return err
}
if stateAtEvent.BeforeStateSnapshotNID, err = db.AddState(ctx, roomNID, nil, entries); err != nil {
if stateAtEvent.BeforeStateSnapshotNID, err = r.DB.AddState(ctx, roomNID, nil, entries); err != nil {
return err
}
} else {
@ -149,12 +146,11 @@ func calculateAndSetState(
return err
}
}
return db.SetState(ctx, stateAtEvent.EventNID, stateAtEvent.BeforeStateSnapshotNID)
return r.DB.SetState(ctx, stateAtEvent.EventNID, stateAtEvent.BeforeStateSnapshotNID)
}
func processInviteEvent(
func (r *RoomserverInternalAPI) processInviteEvent(
ctx context.Context,
db storage.Database,
ow *RoomserverInternalAPI,
input api.InputInviteEvent,
) (*api.InputRoomEvent, error) {
@ -172,7 +168,10 @@ func processInviteEvent(
"target_user_id": targetUserID,
}).Info("processing invite event")
updater, err := db.MembershipUpdater(ctx, roomID, targetUserID, input.RoomVersion)
_, domain, _ := gomatrixserverlib.SplitID('@', targetUserID)
isTargetLocalUser := domain == r.Cfg.Matrix.ServerName
updater, err := r.DB.MembershipUpdater(ctx, roomID, targetUserID, isTargetLocalUser, input.RoomVersion)
if err != nil {
return nil, err
}
@ -239,7 +238,7 @@ func processInviteEvent(
// up from local data (which is most likely to be if the event came
// from the CS API). If we know about the room then we can insert
// the invite room state, if we don't then we just fail quietly.
if irs, ierr := buildInviteStrippedState(ctx, db, input); ierr == nil {
if irs, ierr := buildInviteStrippedState(ctx, r.DB, input); ierr == nil {
if err = event.SetUnsignedField("invite_room_state", irs); err != nil {
return nil, err
}

View file

@ -23,7 +23,6 @@ import (
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@ -46,17 +45,15 @@ import (
// 7 <----- latest
//
// Can only be called once at a time
func updateLatestEvents(
func (r *RoomserverInternalAPI) updateLatestEvents(
ctx context.Context,
db storage.Database,
ow OutputRoomEventWriter,
roomNID types.RoomNID,
stateAtEvent types.StateAtEvent,
event gomatrixserverlib.Event,
sendAsServer string,
transactionID *api.TransactionID,
) (err error) {
updater, err := db.GetLatestEventsForUpdate(ctx, roomNID)
updater, err := r.DB.GetLatestEventsForUpdate(ctx, roomNID)
if err != nil {
return
}
@ -70,9 +67,8 @@ func updateLatestEvents(
u := latestEventsUpdater{
ctx: ctx,
db: db,
api: r,
updater: updater,
ow: ow,
roomNID: roomNID,
stateAtEvent: stateAtEvent,
event: event,
@ -94,9 +90,8 @@ func updateLatestEvents(
// when there are so many variables to pass around.
type latestEventsUpdater struct {
ctx context.Context
db storage.Database
api *RoomserverInternalAPI
updater types.RoomRecentEventsUpdater
ow OutputRoomEventWriter
roomNID types.RoomNID
stateAtEvent types.StateAtEvent
event gomatrixserverlib.Event
@ -181,7 +176,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
// If we need to generate any output events then here's where we do it.
// TODO: Move this!
updates, err := updateMemberships(u.ctx, u.db, u.updater, u.removed, u.added)
updates, err := u.api.updateMemberships(u.ctx, u.updater, u.removed, u.added)
if err != nil {
return err
}
@ -200,7 +195,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
// send the event asynchronously but we would need to ensure that 1) the events are written to the log in
// the correct order, 2) that pending writes are resent across restarts. In order to avoid writing all the
// necessary bookkeeping we'll keep the event sending synchronous for now.
if err = u.ow.WriteOutputEvents(u.event.RoomID(), updates); err != nil {
if err = u.api.WriteOutputEvents(u.event.RoomID(), updates); err != nil {
return err
}
@ -213,7 +208,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
func (u *latestEventsUpdater) latestState() error {
var err error
roomState := state.NewStateResolution(u.db)
roomState := state.NewStateResolution(u.api.DB)
// Get a list of the current latest events.
latestStateAtEvents := make([]types.StateAtEvent, len(u.latest))
@ -303,7 +298,7 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
latestEventIDs[i] = u.latest[i].EventID
}
roomVersion, err := u.db.GetRoomVersionForRoom(u.ctx, u.event.RoomID())
roomVersion, err := u.api.DB.GetRoomVersionForRoom(u.ctx, u.event.RoomID())
if err != nil {
return nil, err
}
@ -329,7 +324,7 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
}
stateEventNIDs = stateEventNIDs[:util.SortAndUnique(eventNIDSorter(stateEventNIDs))]
eventIDMap, err := u.db.EventIDs(u.ctx, stateEventNIDs)
eventIDMap, err := u.api.DB.EventIDs(u.ctx, stateEventNIDs)
if err != nil {
return nil, err
}

View file

@ -19,7 +19,6 @@ import (
"fmt"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
)
@ -28,9 +27,8 @@ import (
// user affected by a change in the current state of the room.
// Returns a list of output events to write to the kafka log to inform the
// consumers about the invites added or retired by the change in current state.
func updateMemberships(
func (r *RoomserverInternalAPI) updateMemberships(
ctx context.Context,
db storage.Database,
updater types.RoomRecentEventsUpdater,
removed, added []types.StateEntry,
) ([]api.OutputEvent, error) {
@ -48,7 +46,7 @@ func updateMemberships(
// Load the event JSON so we can look up the "membership" key.
// TODO: Maybe add a membership key to the events table so we can load that
// key without having to load the entire event JSON?
events, err := db.Events(ctx, eventNIDs)
events, err := r.DB.Events(ctx, eventNIDs)
if err != nil {
return nil, err
}
@ -71,15 +69,16 @@ func updateMemberships(
ae = &ev.Event
}
}
if updates, err = updateMembership(updater, targetUserNID, re, ae, updates); err != nil {
if updates, err = r.updateMembership(updater, targetUserNID, re, ae, updates); err != nil {
return nil, err
}
}
return updates, nil
}
func updateMembership(
updater types.RoomRecentEventsUpdater, targetUserNID types.EventStateKeyNID,
func (r *RoomserverInternalAPI) updateMembership(
updater types.RoomRecentEventsUpdater,
targetUserNID types.EventStateKeyNID,
remove, add *gomatrixserverlib.Event,
updates []api.OutputEvent,
) ([]api.OutputEvent, error) {
@ -113,7 +112,7 @@ func updateMembership(
return updates, nil
}
mu, err := updater.MembershipUpdater(targetUserNID)
mu, err := updater.MembershipUpdater(targetUserNID, r.isLocalTarget(add))
if err != nil {
return nil, err
}
@ -132,6 +131,15 @@ func updateMembership(
}
}
func (r *RoomserverInternalAPI) isLocalTarget(event *gomatrixserverlib.Event) bool {
isTargetLocalUser := false
if statekey := event.StateKey(); statekey != nil {
_, domain, _ := gomatrixserverlib.SplitID('@', *statekey)
isTargetLocalUser = domain == r.Cfg.Matrix.ServerName
}
return isTargetLocalUser
}
func updateToInviteMembership(
mu types.MembershipUpdater, add *gomatrixserverlib.Event, updates []api.OutputEvent,
roomVersion gomatrixserverlib.RoomVersion,

View file

@ -267,7 +267,7 @@ func (r *RoomserverInternalAPI) QueryMembershipsForRoom(
var stateEntries []types.StateEntry
if stillInRoom {
var eventNIDs []types.EventNID
eventNIDs, err = r.DB.GetMembershipEventNIDsForRoom(ctx, roomNID, request.JoinedOnly)
eventNIDs, err = r.DB.GetMembershipEventNIDsForRoom(ctx, roomNID, request.JoinedOnly, false)
if err != nil {
return err
}
@ -591,7 +591,7 @@ func (r *RoomserverInternalAPI) isServerCurrentlyInRoom(ctx context.Context, ser
return false, err
}
eventNIDs, err := r.DB.GetMembershipEventNIDsForRoom(ctx, roomNID, true)
eventNIDs, err := r.DB.GetMembershipEventNIDsForRoom(ctx, roomNID, true, false)
if err != nil {
return false, err
}

View file

@ -297,7 +297,7 @@ func joinEventsFromHistoryVisibility(
if err != nil {
return nil, err
}
joinEventNIDs, err := db.GetMembershipEventNIDsForRoom(ctx, roomNID, true)
joinEventNIDs, err := db.GetMembershipEventNIDsForRoom(ctx, roomNID, true, false)
if err != nil {
return nil, err
}