mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-08-03 06:32:47 +00:00
Merge branch 'master' into kegan/history-vis
This commit is contained in:
commit
7b712865c6
65 changed files with 1457 additions and 245 deletions
|
@ -271,5 +271,6 @@ func (r *RoomserverInternalAPI) sendUpdatedAliasesEvent(
|
|||
var inputRes api.InputRoomEventsResponse
|
||||
|
||||
// Send the request
|
||||
return r.InputRoomEvents(ctx, &inputReq, &inputRes)
|
||||
r.InputRoomEvents(ctx, &inputReq, &inputRes)
|
||||
return inputRes.Err()
|
||||
}
|
||||
|
|
|
@ -16,13 +16,78 @@ package helpers
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
|
||||
"github.com/matrix-org/dendrite/roomserver/state"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
// CheckForSoftFail returns true if the event should be soft-failed
|
||||
// and false otherwise. The return error value should be checked before
|
||||
// the soft-fail bool.
|
||||
func CheckForSoftFail(
|
||||
ctx context.Context,
|
||||
db storage.Database,
|
||||
event gomatrixserverlib.HeaderedEvent,
|
||||
stateEventIDs []string,
|
||||
) (bool, error) {
|
||||
rewritesState := len(stateEventIDs) > 1
|
||||
|
||||
var authStateEntries []types.StateEntry
|
||||
var err error
|
||||
if rewritesState {
|
||||
authStateEntries, err = db.StateEntriesForEventIDs(ctx, stateEventIDs)
|
||||
if err != nil {
|
||||
return true, fmt.Errorf("StateEntriesForEventIDs failed: %w", err)
|
||||
}
|
||||
} else {
|
||||
// Work out if the room exists.
|
||||
var roomInfo *types.RoomInfo
|
||||
roomInfo, err = db.RoomInfo(ctx, event.RoomID())
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("db.RoomNID: %w", err)
|
||||
}
|
||||
if roomInfo == nil || roomInfo.IsStub {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Then get the state entries for the current state snapshot.
|
||||
// We'll use this to check if the event is allowed right now.
|
||||
roomState := state.NewStateResolution(db, *roomInfo)
|
||||
authStateEntries, err = roomState.LoadStateAtSnapshot(ctx, roomInfo.StateSnapshotNID)
|
||||
if err != nil {
|
||||
return true, fmt.Errorf("roomState.LoadStateAtSnapshot: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// As a special case, it's possible that the room will have no
|
||||
// state because we haven't received a m.room.create event yet.
|
||||
// If we're now processing the first create event then never
|
||||
// soft-fail it.
|
||||
if len(authStateEntries) == 0 && event.Type() == gomatrixserverlib.MRoomCreate {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Work out which of the state events we actually need.
|
||||
stateNeeded := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{event.Unwrap()})
|
||||
|
||||
// Load the actual auth events from the database.
|
||||
authEvents, err := loadAuthEvents(ctx, db, stateNeeded, authStateEntries)
|
||||
if err != nil {
|
||||
return true, fmt.Errorf("loadAuthEvents: %w", err)
|
||||
}
|
||||
|
||||
// Check if the event is allowed.
|
||||
if err = gomatrixserverlib.Allowed(event.Event, &authEvents); err != nil {
|
||||
// return true, nil
|
||||
return true, fmt.Errorf("gomatrixserverlib.Allowed: %w", err)
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// CheckAuthEvents checks that the event passes authentication checks
|
||||
// Returns the numeric IDs for the auth events.
|
||||
func CheckAuthEvents(
|
||||
|
@ -36,7 +101,7 @@ func CheckAuthEvents(
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// TODO: check for duplicate state keys here.
|
||||
authStateEntries = types.DeduplicateStateEntries(authStateEntries)
|
||||
|
||||
// Work out which of the state events we actually need.
|
||||
stateNeeded := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{event.Unwrap()})
|
||||
|
|
|
@ -110,7 +110,7 @@ func (r *Inputer) InputRoomEvents(
|
|||
ctx context.Context,
|
||||
request *api.InputRoomEventsRequest,
|
||||
response *api.InputRoomEventsResponse,
|
||||
) error {
|
||||
) {
|
||||
// Create a wait group. Each task that we dispatch will call Done on
|
||||
// this wait group so that we know when all of our events have been
|
||||
// processed.
|
||||
|
@ -156,8 +156,10 @@ func (r *Inputer) InputRoomEvents(
|
|||
// that back to the caller.
|
||||
for _, task := range tasks {
|
||||
if task.err != nil {
|
||||
return task.err
|
||||
response.ErrMsg = task.err.Error()
|
||||
_, rejected := task.err.(*gomatrixserverlib.NotAllowed)
|
||||
response.NotAllowed = rejected
|
||||
return
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -46,10 +46,25 @@ func (r *Inputer) processRoomEvent(
|
|||
|
||||
// Check that the event passes authentication checks and work out
|
||||
// the numeric IDs for the auth events.
|
||||
authEventNIDs, err := helpers.CheckAuthEvents(ctx, r.DB, headered, input.AuthEventIDs)
|
||||
if err != nil {
|
||||
logrus.WithError(err).WithField("event_id", event.EventID()).WithField("auth_event_ids", input.AuthEventIDs).Error("processRoomEvent.checkAuthEvents failed for event")
|
||||
return
|
||||
isRejected := false
|
||||
authEventNIDs, rejectionErr := helpers.CheckAuthEvents(ctx, r.DB, headered, input.AuthEventIDs)
|
||||
if rejectionErr != nil {
|
||||
logrus.WithError(rejectionErr).WithField("event_id", event.EventID()).WithField("auth_event_ids", input.AuthEventIDs).Error("processRoomEvent.checkAuthEvents failed for event, rejecting event")
|
||||
isRejected = true
|
||||
}
|
||||
|
||||
var softfail bool
|
||||
if input.Kind == api.KindBackfill || input.Kind == api.KindNew {
|
||||
// Check that the event passes authentication checks based on the
|
||||
// current room state.
|
||||
softfail, err = helpers.CheckForSoftFail(ctx, r.DB, headered, input.StateEventIDs)
|
||||
if err != nil {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"event_id": event.EventID(),
|
||||
"type": event.Type(),
|
||||
"room": event.RoomID(),
|
||||
}).WithError(err).Info("Error authing soft-failed event")
|
||||
}
|
||||
}
|
||||
|
||||
// If we don't have a transaction ID then get one.
|
||||
|
@ -65,12 +80,13 @@ func (r *Inputer) processRoomEvent(
|
|||
}
|
||||
|
||||
// Store the event.
|
||||
_, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, input.TransactionID, authEventNIDs)
|
||||
_, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, input.TransactionID, authEventNIDs, isRejected)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("r.DB.StoreEvent: %w", err)
|
||||
}
|
||||
|
||||
// if storing this event results in it being redacted then do so.
|
||||
if redactedEventID == event.EventID() {
|
||||
if !isRejected && redactedEventID == event.EventID() {
|
||||
r, rerr := eventutil.RedactEvent(redactionEvent, &event)
|
||||
if rerr != nil {
|
||||
return "", fmt.Errorf("eventutil.RedactEvent: %w", rerr)
|
||||
|
@ -86,7 +102,8 @@ func (r *Inputer) processRoomEvent(
|
|||
"event_id": event.EventID(),
|
||||
"type": event.Type(),
|
||||
"room": event.RoomID(),
|
||||
}).Info("Stored outlier")
|
||||
"sender": event.Sender(),
|
||||
}).Debug("Stored outlier")
|
||||
return event.EventID(), nil
|
||||
}
|
||||
|
||||
|
@ -101,12 +118,33 @@ func (r *Inputer) processRoomEvent(
|
|||
if stateAtEvent.BeforeStateSnapshotNID == 0 {
|
||||
// We haven't calculated a state for this event yet.
|
||||
// Lets calculate one.
|
||||
err = r.calculateAndSetState(ctx, input, *roomInfo, &stateAtEvent, event)
|
||||
err = r.calculateAndSetState(ctx, input, *roomInfo, &stateAtEvent, event, isRejected)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("r.calculateAndSetState: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// We stop here if the event is rejected: We've stored it but won't update forward extremities or notify anyone about it.
|
||||
if isRejected || softfail {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"event_id": event.EventID(),
|
||||
"type": event.Type(),
|
||||
"room": event.RoomID(),
|
||||
"soft_fail": softfail,
|
||||
"sender": event.Sender(),
|
||||
}).Debug("Stored rejected event")
|
||||
return event.EventID(), rejectionErr
|
||||
}
|
||||
|
||||
if input.Kind == api.KindRewrite {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"event_id": event.EventID(),
|
||||
"type": event.Type(),
|
||||
"room": event.RoomID(),
|
||||
}).Debug("Stored rewrite")
|
||||
return event.EventID(), nil
|
||||
}
|
||||
|
||||
if err = r.updateLatestEvents(
|
||||
ctx, // context
|
||||
roomInfo, // room info for the room being updated
|
||||
|
@ -114,6 +152,7 @@ func (r *Inputer) processRoomEvent(
|
|||
event, // event
|
||||
input.SendAsServer, // send as server
|
||||
input.TransactionID, // transaction ID
|
||||
input.HasState, // rewrites state?
|
||||
); err != nil {
|
||||
return "", fmt.Errorf("r.updateLatestEvents: %w", err)
|
||||
}
|
||||
|
@ -147,11 +186,12 @@ func (r *Inputer) calculateAndSetState(
|
|||
roomInfo types.RoomInfo,
|
||||
stateAtEvent *types.StateAtEvent,
|
||||
event gomatrixserverlib.Event,
|
||||
isRejected bool,
|
||||
) error {
|
||||
var err error
|
||||
roomState := state.NewStateResolution(r.DB, roomInfo)
|
||||
|
||||
if input.HasState {
|
||||
if input.HasState && !isRejected {
|
||||
// Check here if we think we're in the room already.
|
||||
stateAtEvent.Overwrite = true
|
||||
var joinEventNIDs []types.EventNID
|
||||
|
@ -167,19 +207,25 @@ func (r *Inputer) calculateAndSetState(
|
|||
// Check that those state events are in the database and store the state.
|
||||
var entries []types.StateEntry
|
||||
if entries, err = r.DB.StateEntriesForEventIDs(ctx, input.StateEventIDs); err != nil {
|
||||
return err
|
||||
return fmt.Errorf("r.DB.StateEntriesForEventIDs: %w", err)
|
||||
}
|
||||
entries = types.DeduplicateStateEntries(entries)
|
||||
|
||||
if stateAtEvent.BeforeStateSnapshotNID, err = r.DB.AddState(ctx, roomInfo.RoomNID, nil, entries); err != nil {
|
||||
return err
|
||||
return fmt.Errorf("r.DB.AddState: %w", err)
|
||||
}
|
||||
} else {
|
||||
stateAtEvent.Overwrite = false
|
||||
|
||||
// We haven't been told what the state at the event is so we need to calculate it from the prev_events
|
||||
if stateAtEvent.BeforeStateSnapshotNID, err = roomState.CalculateAndStoreStateBeforeEvent(ctx, event); err != nil {
|
||||
return err
|
||||
if stateAtEvent.BeforeStateSnapshotNID, err = roomState.CalculateAndStoreStateBeforeEvent(ctx, event, isRejected); err != nil {
|
||||
return fmt.Errorf("roomState.CalculateAndStoreStateBeforeEvent: %w", err)
|
||||
}
|
||||
}
|
||||
return r.DB.SetState(ctx, stateAtEvent.EventNID, stateAtEvent.BeforeStateSnapshotNID)
|
||||
|
||||
err = r.DB.SetState(ctx, stateAtEvent.EventNID, stateAtEvent.BeforeStateSnapshotNID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.DB.SetState: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -54,6 +54,7 @@ func (r *Inputer) updateLatestEvents(
|
|||
event gomatrixserverlib.Event,
|
||||
sendAsServer string,
|
||||
transactionID *api.TransactionID,
|
||||
rewritesState bool,
|
||||
) (err error) {
|
||||
updater, err := r.DB.GetLatestEventsForUpdate(ctx, *roomInfo)
|
||||
if err != nil {
|
||||
|
@ -71,6 +72,7 @@ func (r *Inputer) updateLatestEvents(
|
|||
event: event,
|
||||
sendAsServer: sendAsServer,
|
||||
transactionID: transactionID,
|
||||
rewritesState: rewritesState,
|
||||
}
|
||||
|
||||
if err = u.doUpdateLatestEvents(); err != nil {
|
||||
|
@ -93,6 +95,7 @@ type latestEventsUpdater struct {
|
|||
stateAtEvent types.StateAtEvent
|
||||
event gomatrixserverlib.Event
|
||||
transactionID *api.TransactionID
|
||||
rewritesState bool
|
||||
// Which server to send this event as.
|
||||
sendAsServer string
|
||||
// The eventID of the event that was processed before this one.
|
||||
|
@ -178,7 +181,8 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
|
|||
return fmt.Errorf("u.api.updateMemberships: %w", err)
|
||||
}
|
||||
|
||||
update, err := u.makeOutputNewRoomEvent()
|
||||
var update *api.OutputEvent
|
||||
update, err = u.makeOutputNewRoomEvent()
|
||||
if err != nil {
|
||||
return fmt.Errorf("u.makeOutputNewRoomEvent: %w", err)
|
||||
}
|
||||
|
@ -305,6 +309,7 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
|
|||
|
||||
ore := api.OutputNewRoomEvent{
|
||||
Event: u.event.Headered(u.roomInfo.RoomVersion),
|
||||
RewritesState: u.rewritesState,
|
||||
LastSentEventID: u.lastEventIDSent,
|
||||
LatestEventIDs: latestEventIDs,
|
||||
TransactionID: u.transactionID,
|
||||
|
@ -337,6 +342,11 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
|
|||
return nil, fmt.Errorf("failed to load add_state_events from db: %w", err)
|
||||
}
|
||||
}
|
||||
// State is rewritten if the input room event HasState and we actually produced a delta on state events.
|
||||
// Without this check, /get_missing_events which produce events with associated (but not complete) state
|
||||
// will incorrectly purge the room and set it to no state. TODO: This is likely flakey, as if /gme produced
|
||||
// a state conflict res which just so happens to include 2+ events we might purge the room state downstream.
|
||||
ore.RewritesState = len(ore.AddsStateEventIDs) > 1
|
||||
|
||||
return &api.OutputEvent{
|
||||
Type: api.OutputTypeNewRoomEvent,
|
||||
|
|
|
@ -547,7 +547,7 @@ func persistEvents(ctx context.Context, db storage.Database, events []gomatrixse
|
|||
var stateAtEvent types.StateAtEvent
|
||||
var redactedEventID string
|
||||
var redactionEvent *gomatrixserverlib.Event
|
||||
roomNID, stateAtEvent, redactionEvent, redactedEventID, err = db.StoreEvent(ctx, ev.Unwrap(), nil, authNids)
|
||||
roomNID, stateAtEvent, redactionEvent, redactedEventID, err = db.StoreEvent(ctx, ev.Unwrap(), nil, authNids, false)
|
||||
if err != nil {
|
||||
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to persist event")
|
||||
continue
|
||||
|
|
|
@ -183,7 +183,8 @@ func (r *Inviter) PerformInvite(
|
|||
},
|
||||
}
|
||||
inputRes := &api.InputRoomEventsResponse{}
|
||||
if err = r.Inputer.InputRoomEvents(context.Background(), inputReq, inputRes); err != nil {
|
||||
r.Inputer.InputRoomEvents(context.Background(), inputReq, inputRes)
|
||||
if err = inputRes.Err(); err != nil {
|
||||
return nil, fmt.Errorf("r.InputRoomEvents: %w", err)
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -183,33 +183,33 @@ func (r *Joiner) performJoinRoomByID(
|
|||
return "", fmt.Errorf("eb.SetContent: %w", err)
|
||||
}
|
||||
|
||||
// First work out if this is in response to an existing invite
|
||||
// from a federated server. If it is then we avoid the situation
|
||||
// where we might think we know about a room in the following
|
||||
// section but don't know the latest state as all of our users
|
||||
// have left.
|
||||
// Force a federated join if we aren't in the room and we've been
|
||||
// given some server names to try joining by.
|
||||
serverInRoom, _ := helpers.IsServerCurrentlyInRoom(ctx, r.DB, r.ServerName, req.RoomIDOrAlias)
|
||||
forceFederatedJoin := len(req.ServerNames) > 0 && !serverInRoom
|
||||
|
||||
// Force a federated join if we're dealing with a pending invite
|
||||
// and we aren't in the room.
|
||||
isInvitePending, inviteSender, _, err := helpers.IsInvitePending(ctx, r.DB, req.RoomIDOrAlias, req.UserID)
|
||||
if err == nil && isInvitePending && !serverInRoom {
|
||||
// Check if there's an invite pending.
|
||||
if err == nil && isInvitePending {
|
||||
_, inviterDomain, ierr := gomatrixserverlib.SplitID('@', inviteSender)
|
||||
if ierr != nil {
|
||||
return "", fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
|
||||
}
|
||||
|
||||
// Check that the domain isn't ours. If it's local then we don't
|
||||
// need to do anything as our own copy of the room state will be
|
||||
// up-to-date.
|
||||
// If we were invited by someone from another server then we can
|
||||
// assume they are in the room so we can join via them.
|
||||
if inviterDomain != r.Cfg.Matrix.ServerName {
|
||||
// Add the server of the person who invited us to the server list,
|
||||
// as they should be a fairly good bet.
|
||||
req.ServerNames = append(req.ServerNames, inviterDomain)
|
||||
|
||||
// Perform a federated room join.
|
||||
return req.RoomIDOrAlias, r.performFederatedJoinRoomByID(ctx, req)
|
||||
forceFederatedJoin = true
|
||||
}
|
||||
}
|
||||
|
||||
// If we should do a forced federated join then do that.
|
||||
if forceFederatedJoin {
|
||||
return req.RoomIDOrAlias, r.performFederatedJoinRoomByID(ctx, req)
|
||||
}
|
||||
|
||||
// Try to construct an actual join event from the template.
|
||||
// If this succeeds then it is a sign that the room already exists
|
||||
// locally on the homeserver.
|
||||
|
@ -247,7 +247,8 @@ func (r *Joiner) performJoinRoomByID(
|
|||
},
|
||||
}
|
||||
inputRes := api.InputRoomEventsResponse{}
|
||||
if err = r.Inputer.InputRoomEvents(ctx, &inputReq, &inputRes); err != nil {
|
||||
r.Inputer.InputRoomEvents(ctx, &inputReq, &inputRes)
|
||||
if err = inputRes.Err(); err != nil {
|
||||
var notAllowed *gomatrixserverlib.NotAllowed
|
||||
if errors.As(err, ¬Allowed) {
|
||||
return "", &api.PerformError{
|
||||
|
|
|
@ -139,7 +139,8 @@ func (r *Leaver) performLeaveRoomByID(
|
|||
},
|
||||
}
|
||||
inputRes := api.InputRoomEventsResponse{}
|
||||
if err = r.Inputer.InputRoomEvents(ctx, &inputReq, &inputRes); err != nil {
|
||||
r.Inputer.InputRoomEvents(ctx, &inputReq, &inputRes)
|
||||
if err = inputRes.Err(); err != nil {
|
||||
return nil, fmt.Errorf("r.InputRoomEvents: %w", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -70,6 +70,7 @@ func (r *Queryer) QueryStateAfterEvents(
|
|||
if err != nil {
|
||||
switch err.(type) {
|
||||
case types.MissingEventError:
|
||||
util.GetLogger(ctx).Errorf("QueryStateAfterEvents: MissingEventError: %s", err)
|
||||
return nil
|
||||
default:
|
||||
return err
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue