mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-07-29 12:42:46 +00:00
Unmarshal events at the Dendrite level not GMSL level (#2164)
* Use new event json types in gmsl * Fix EventJSON to actually unmarshal events * Update GMSL * Bump GMSL and improve error messages * Send back the correct RespState * Update GMSL
This commit is contained in:
parent
cc688a9a38
commit
aa5c3b88de
15 changed files with 158 additions and 107 deletions
|
@ -51,7 +51,7 @@ func SendEventWithState(
|
|||
state *gomatrixserverlib.RespState, event *gomatrixserverlib.HeaderedEvent,
|
||||
origin gomatrixserverlib.ServerName, haveEventIDs map[string]bool, async bool,
|
||||
) error {
|
||||
outliers, err := state.Events()
|
||||
outliers, err := state.Events(event.RoomVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -68,9 +68,10 @@ func SendEventWithState(
|
|||
})
|
||||
}
|
||||
|
||||
stateEventIDs := make([]string, len(state.StateEvents))
|
||||
for i := range state.StateEvents {
|
||||
stateEventIDs[i] = state.StateEvents[i].EventID()
|
||||
stateEvents := state.StateEvents.UntrustedEvents(event.RoomVersion)
|
||||
stateEventIDs := make([]string, len(stateEvents))
|
||||
for i := range stateEvents {
|
||||
stateEventIDs[i] = stateEvents[i].EventID()
|
||||
}
|
||||
|
||||
ires = append(ires, InputRoomEvent{
|
||||
|
|
|
@ -438,7 +438,7 @@ func (r *Inputer) fetchAuthEvents(
|
|||
isRejected := false
|
||||
nextAuthEvent:
|
||||
for _, authEvent := range gomatrixserverlib.ReverseTopologicalOrdering(
|
||||
res.AuthEvents,
|
||||
res.AuthEvents.UntrustedEvents(event.RoomVersion),
|
||||
gomatrixserverlib.TopologicalOrderByAuthEvents,
|
||||
) {
|
||||
// If we already know about this event from the database then we don't
|
||||
|
|
|
@ -18,6 +18,11 @@ import (
|
|||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type parsedRespState struct {
|
||||
AuthEvents []*gomatrixserverlib.Event
|
||||
StateEvents []*gomatrixserverlib.Event
|
||||
}
|
||||
|
||||
type missingStateReq struct {
|
||||
origin gomatrixserverlib.ServerName
|
||||
db *shared.RoomUpdater
|
||||
|
@ -98,7 +103,7 @@ func (t *missingStateReq) processEventWithMissingState(
|
|||
// That's because the state will have been through state resolution once
|
||||
// already in QueryStateAfterEvent.
|
||||
trustworthy bool
|
||||
*gomatrixserverlib.RespState
|
||||
*parsedRespState
|
||||
}
|
||||
|
||||
// at this point we know we're going to have a gap: we need to work out the room state at the new backwards extremity.
|
||||
|
@ -125,7 +130,7 @@ func (t *missingStateReq) processEventWithMissingState(
|
|||
// 1. Ensures that the state is deduplicated fully for each state-key tuple
|
||||
// 2. Ensures that we pick the latest events from both sets, in the case that
|
||||
// one of the prev_events is quite a bit older than the others
|
||||
resolvedState := &gomatrixserverlib.RespState{}
|
||||
resolvedState := &parsedRespState{}
|
||||
switch len(states) {
|
||||
case 0:
|
||||
extremityIsCreate := backwardsExtremity.Type() == gomatrixserverlib.MRoomCreate && backwardsExtremity.StateKeyEquals("")
|
||||
|
@ -140,16 +145,16 @@ func (t *missingStateReq) processEventWithMissingState(
|
|||
// local state snapshot which will already have been through state res),
|
||||
// use it as-is. There's no point in resolving it again.
|
||||
if states[0].trustworthy {
|
||||
resolvedState = states[0].RespState
|
||||
resolvedState = states[0].parsedRespState
|
||||
break
|
||||
}
|
||||
// Otherwise, if it isn't trustworthy (came from federation), run it through
|
||||
// state resolution anyway for safety, in case there are duplicates.
|
||||
fallthrough
|
||||
default:
|
||||
respStates := make([]*gomatrixserverlib.RespState, len(states))
|
||||
respStates := make([]*parsedRespState, len(states))
|
||||
for i := range states {
|
||||
respStates[i] = states[i].RespState
|
||||
respStates[i] = states[i].parsedRespState
|
||||
}
|
||||
// There's more than one previous state - run them all through state res
|
||||
t.roomsMu.Lock(e.RoomID())
|
||||
|
@ -169,7 +174,7 @@ func (t *missingStateReq) processEventWithMissingState(
|
|||
t.hadEventsMutex.Unlock()
|
||||
|
||||
// Send outliers first so we can send the new backwards extremity without causing errors
|
||||
outliers, err := resolvedState.Events()
|
||||
outliers, err := gomatrixserverlib.OrderAuthAndStateEvents(resolvedState.AuthEvents, resolvedState.StateEvents, roomVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -239,7 +244,7 @@ func (t *missingStateReq) processEventWithMissingState(
|
|||
|
||||
// lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event)
|
||||
// added into the mix.
|
||||
func (t *missingStateReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (*gomatrixserverlib.RespState, bool, error) {
|
||||
func (t *missingStateReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (*parsedRespState, bool, error) {
|
||||
// try doing all this locally before we resort to querying federation
|
||||
respState := t.lookupStateAfterEventLocally(ctx, roomID, eventID)
|
||||
if respState != nil {
|
||||
|
@ -290,7 +295,7 @@ func (t *missingStateReq) cacheAndReturn(ev *gomatrixserverlib.HeaderedEvent) *g
|
|||
return ev
|
||||
}
|
||||
|
||||
func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, roomID, eventID string) *gomatrixserverlib.RespState {
|
||||
func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, roomID, eventID string) *parsedRespState {
|
||||
var res api.QueryStateAfterEventsResponse
|
||||
err := t.queryer.QueryStateAfterEvents(ctx, &api.QueryStateAfterEventsRequest{
|
||||
RoomID: roomID,
|
||||
|
@ -345,7 +350,7 @@ func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, room
|
|||
queryRes.Events = nil
|
||||
}
|
||||
|
||||
return &gomatrixserverlib.RespState{
|
||||
return &parsedRespState{
|
||||
StateEvents: gomatrixserverlib.UnwrapEventHeaders(stateEvents),
|
||||
AuthEvents: authEvents,
|
||||
}
|
||||
|
@ -354,13 +359,13 @@ func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, room
|
|||
// lookuptStateBeforeEvent returns the room state before the event e, which is just /state_ids and/or /state depending on what
|
||||
// the server supports.
|
||||
func (t *missingStateReq) lookupStateBeforeEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (
|
||||
*gomatrixserverlib.RespState, error) {
|
||||
*parsedRespState, error) {
|
||||
|
||||
// Attempt to fetch the missing state using /state_ids and /events
|
||||
return t.lookupMissingStateViaStateIDs(ctx, roomID, eventID, roomVersion)
|
||||
}
|
||||
|
||||
func (t *missingStateReq) resolveStatesAndCheck(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, states []*gomatrixserverlib.RespState, backwardsExtremity *gomatrixserverlib.Event) (*gomatrixserverlib.RespState, error) {
|
||||
func (t *missingStateReq) resolveStatesAndCheck(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, states []*parsedRespState, backwardsExtremity *gomatrixserverlib.Event) (*parsedRespState, error) {
|
||||
var authEventList []*gomatrixserverlib.Event
|
||||
var stateEventList []*gomatrixserverlib.Event
|
||||
for _, state := range states {
|
||||
|
@ -379,7 +384,7 @@ retryAllowedState:
|
|||
h, err2 := t.lookupEvent(ctx, roomVersion, backwardsExtremity.RoomID(), missing.AuthEventID, true)
|
||||
switch err2.(type) {
|
||||
case verifySigError:
|
||||
return &gomatrixserverlib.RespState{
|
||||
return &parsedRespState{
|
||||
AuthEvents: authEventList,
|
||||
StateEvents: resolvedStateEvents,
|
||||
}, nil
|
||||
|
@ -395,7 +400,7 @@ retryAllowedState:
|
|||
}
|
||||
return nil, err
|
||||
}
|
||||
return &gomatrixserverlib.RespState{
|
||||
return &parsedRespState{
|
||||
AuthEvents: authEventList,
|
||||
StateEvents: resolvedStateEvents,
|
||||
}, nil
|
||||
|
@ -452,12 +457,21 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve
|
|||
// Make sure events from the missingResp are using the cache - missing events
|
||||
// will be added and duplicates will be removed.
|
||||
logger.Debugf("get_missing_events returned %d events", len(missingResp.Events))
|
||||
for i, ev := range missingResp.Events {
|
||||
missingResp.Events[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap()
|
||||
missingEvents := make([]*gomatrixserverlib.Event, len(missingResp.Events))
|
||||
for i, evJSON := range missingResp.Events {
|
||||
ev, err := gomatrixserverlib.NewEventFromUntrustedJSON(evJSON, roomVersion)
|
||||
if err != nil {
|
||||
logger.WithError(err).WithField("event", string(evJSON)).Warn("NewEventFromUntrustedJSON: failed")
|
||||
return nil, false, missingPrevEventsError{
|
||||
eventID: e.EventID(),
|
||||
err: err,
|
||||
}
|
||||
}
|
||||
missingEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap()
|
||||
}
|
||||
|
||||
// topologically sort and sanity check that we are making forward progress
|
||||
newEvents = gomatrixserverlib.ReverseTopologicalOrdering(missingResp.Events, gomatrixserverlib.TopologicalOrderByPrevEvents)
|
||||
newEvents = gomatrixserverlib.ReverseTopologicalOrdering(missingEvents, gomatrixserverlib.TopologicalOrderByPrevEvents)
|
||||
shouldHaveSomeEventIDs := e.PrevEventIDs()
|
||||
hasPrevEvent := false
|
||||
Event:
|
||||
|
@ -498,29 +512,37 @@ Event:
|
|||
return newEvents, true, nil
|
||||
}
|
||||
|
||||
func (t *missingStateReq) lookupMissingStateViaState(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
|
||||
respState *gomatrixserverlib.RespState, err error) {
|
||||
func (t *missingStateReq) lookupMissingStateViaState(
|
||||
ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion,
|
||||
) (respState *parsedRespState, err error) {
|
||||
state, err := t.federation.LookupState(ctx, t.origin, roomID, eventID, roomVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Check that the returned state is valid.
|
||||
if err := state.Check(ctx, t.keys, nil); err != nil {
|
||||
if err := state.Check(ctx, roomVersion, t.keys, nil); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
parsedState := &parsedRespState{
|
||||
AuthEvents: make([]*gomatrixserverlib.Event, len(state.AuthEvents)),
|
||||
StateEvents: make([]*gomatrixserverlib.Event, len(state.StateEvents)),
|
||||
}
|
||||
// Cache the results of this state lookup and deduplicate anything we already
|
||||
// have in the cache, freeing up memory.
|
||||
for i, ev := range state.AuthEvents {
|
||||
state.AuthEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap()
|
||||
// We load these as trusted as we called state.Check before which loaded them as untrusted.
|
||||
for i, evJSON := range state.AuthEvents {
|
||||
ev, _ := gomatrixserverlib.NewEventFromTrustedJSON(evJSON, false, roomVersion)
|
||||
parsedState.AuthEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap()
|
||||
}
|
||||
for i, ev := range state.StateEvents {
|
||||
state.StateEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap()
|
||||
for i, evJSON := range state.StateEvents {
|
||||
ev, _ := gomatrixserverlib.NewEventFromTrustedJSON(evJSON, false, roomVersion)
|
||||
parsedState.StateEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap()
|
||||
}
|
||||
return &state, nil
|
||||
return parsedState, nil
|
||||
}
|
||||
|
||||
func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
|
||||
*gomatrixserverlib.RespState, error) {
|
||||
*parsedRespState, error) {
|
||||
util.GetLogger(ctx).WithField("room_id", roomID).Infof("lookupMissingStateViaStateIDs %s", eventID)
|
||||
// fetch the state event IDs at the time of the event
|
||||
stateIDs, err := t.federation.LookupStateIDs(ctx, t.origin, roomID, eventID)
|
||||
|
@ -652,13 +674,14 @@ func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roo
|
|||
return resp, err
|
||||
}
|
||||
|
||||
func (t *missingStateReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs) (
|
||||
*gomatrixserverlib.RespState, error) { // nolint:unparam
|
||||
func (t *missingStateReq) createRespStateFromStateIDs(
|
||||
stateIDs gomatrixserverlib.RespStateIDs,
|
||||
) (*parsedRespState, error) { // nolint:unparam
|
||||
t.haveEventsMutex.Lock()
|
||||
defer t.haveEventsMutex.Unlock()
|
||||
|
||||
// create a RespState response using the response to /state_ids as a guide
|
||||
respState := gomatrixserverlib.RespState{}
|
||||
respState := parsedRespState{}
|
||||
|
||||
for i := range stateIDs.StateEventIDs {
|
||||
ev, ok := t.haveEvents[stateIDs.StateEventIDs[i]]
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue