From ce5dfbebf98a55c0efe4f86ba30956916bf0b3ad Mon Sep 17 00:00:00 2001 From: Kegsay Date: Tue, 12 May 2020 16:24:28 +0100 Subject: [PATCH] Implement /get_missing_events (#1022) * WIP get_missing_events work * More WIP get_missing_events work * First working /get_missing_events implementation Flakey currently due to racing between /sync and /send * Final tweaks * Remove log lines * Linting * go mod tidy * Clamp min depth to 0 * sort events by depth because sytest makes me sad Specifically I think it's https://github.com/matrix-org/sytest/blob/4172585c2521ec6d640b4b580080276da1ab5353/lib/SyTest/Federation/Client.pm#L265 to blame here. --- federationapi/routing/join.go | 19 ++ federationapi/routing/routing.go | 22 ++ federationapi/routing/send.go | 505 ++++++++++++++++++++++------ federationapi/routing/send_test.go | 318 ++++++++++++------ roomserver/internal/input_events.go | 2 +- sytest-whitelist | 7 + 6 files changed, 675 insertions(+), 198 deletions(-) diff --git a/federationapi/routing/join.go b/federationapi/routing/join.go index ada89e4a..3533bbba 100644 --- a/federationapi/routing/join.go +++ b/federationapi/routing/join.go @@ -17,6 +17,7 @@ package routing import ( "fmt" "net/http" + "sort" "time" "github.com/matrix-org/dendrite/clientapi/jsonerror" @@ -275,6 +276,12 @@ func SendJoin( } } + // sort events deterministically by depth (lower is earlier) + // We also do this because sytest's basic federation server isn't good at using the correct + // state if these lists are randomised, resulting in flakey tests. :( + sort.Sort(eventsByDepth(stateAndAuthChainResponse.StateEvents)) + sort.Sort(eventsByDepth(stateAndAuthChainResponse.AuthChainEvents)) + // https://matrix.org/docs/spec/server_server/latest#put-matrix-federation-v1-send-join-roomid-eventid return util.JSONResponse{ Code: http.StatusOK, @@ -285,3 +292,15 @@ func SendJoin( }, } } + +type eventsByDepth []gomatrixserverlib.HeaderedEvent + +func (e eventsByDepth) Len() int { + return len(e) +} +func (e eventsByDepth) Swap(i, j int) { + e[i], e[j] = e[j], e[i] +} +func (e eventsByDepth) Less(i, j int) bool { + return e[i].Depth() < e[j].Depth() +} diff --git a/federationapi/routing/routing.go b/federationapi/routing/routing.go index a5b8ce24..67f3a957 100644 --- a/federationapi/routing/routing.go +++ b/federationapi/routing/routing.go @@ -226,6 +226,28 @@ func Setup( }, )).Methods(http.MethodGet) + v1fedmux.Handle("/send_join/{roomID}/{eventID}", common.MakeFedAPI( + "federation_send_join", cfg.Matrix.ServerName, keys, + func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse { + vars, err := common.URLDecodeMapValues(mux.Vars(httpReq)) + if err != nil { + return util.ErrorResponse(err) + } + roomID := vars["roomID"] + eventID := vars["eventID"] + res := SendJoin( + httpReq, request, cfg, rsAPI, producer, keys, roomID, eventID, + ) + return util.JSONResponse{ + Headers: res.Headers, + Code: res.Code, + JSON: []interface{}{ + res.Code, res.JSON, + }, + } + }, + )).Methods(http.MethodPut) + v2fedmux.Handle("/send_join/{roomID}/{eventID}", common.MakeFedAPI( "federation_send_join", cfg.Matrix.ServerName, keys, func(httpReq *http.Request, request *gomatrixserverlib.FederationRequest) util.JSONResponse { diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index e6f91d94..99022074 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -48,6 +48,8 @@ func Send( eduProducer: eduProducer, keys: keys, federation: federation, + haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent), + newEvents: make(map[string]bool), } var txnEvents struct { @@ -105,6 +107,11 @@ type txnReq struct { eduProducer *producers.EDUServerProducer keys gomatrixserverlib.JSONVerifier federation txnFederationClient + // local cache of events for auth checks, etc - this may include events + // which the roomserver is unaware of. + haveEvents map[string]*gomatrixserverlib.HeaderedEvent + // new events which the roomserver does not know about + newEvents map[string]bool } // A subset of FederationClient functionality that txn requires. Useful for testing. @@ -114,6 +121,8 @@ type txnFederationClient interface { ) LookupStateIDs(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, eventID string) (res gomatrixserverlib.RespStateIDs, err error) GetEvent(ctx context.Context, s gomatrixserverlib.ServerName, eventID string) (res gomatrixserverlib.Transaction, err error) + LookupMissingEvents(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, missing gomatrixserverlib.MissingEvents, + roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMissingEvents, err error) } func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { @@ -148,7 +157,7 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { // Process the events. for _, e := range pdus { - err := t.processEvent(e.Unwrap()) + err := t.processEvent(e.Unwrap(), true) if err != nil { // If the error is due to the event itself being bad then we skip // it and move onto the next event. We report an error so that the @@ -168,7 +177,9 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { switch err.(type) { case roomNotFoundError: case *gomatrixserverlib.NotAllowed: + case missingPrevEventsError: default: + util.GetLogger(t.context).Warnf("Processing %s failed: %s", e.EventID(), err) // Any other error should be the result of a temporary error in // our server so we should bail processing the transaction entirely. return nil, err @@ -197,12 +208,30 @@ type verifySigError struct { eventID string err error } +type missingPrevEventsError struct { + eventID string + err error +} func (e roomNotFoundError) Error() string { return fmt.Sprintf("room %q not found", e.roomID) } func (e unmarshalError) Error() string { return fmt.Sprintf("unable to parse event: %s", e.err) } func (e verifySigError) Error() string { return fmt.Sprintf("unable to verify signature of event %q: %s", e.eventID, e.err) } +func (e missingPrevEventsError) Error() string { + return fmt.Sprintf("unable to get prev_events for event %q: %s", e.eventID, e.err) +} + +func (t *txnReq) haveEventIDs() map[string]bool { + result := make(map[string]bool, len(t.haveEvents)) + for eventID := range t.haveEvents { + if t.newEvents[eventID] { + continue + } + result[eventID] = true + } + return result +} func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) { for _, e := range edus { @@ -227,7 +256,7 @@ func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) { } } -func (t *txnReq) processEvent(e gomatrixserverlib.Event) error { +func (t *txnReq) processEvent(e gomatrixserverlib.Event, isInboundTxn bool) error { prevEventIDs := e.PrevEventIDs() // Fetch the state needed to authenticate the event. @@ -253,21 +282,14 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error { } if !stateResp.PrevEventsExist { - return t.processEventWithMissingState(e, stateResp.RoomVersion) + return t.processEventWithMissingState(e, stateResp.RoomVersion, isInboundTxn) } // Check that the event is allowed by the state at the event. - var events []gomatrixserverlib.Event - for _, headeredEvent := range stateResp.StateEvents { - events = append(events, headeredEvent.Unwrap()) - } - if err := checkAllowedByState(e, events); err != nil { + if err := checkAllowedByState(e, gomatrixserverlib.UnwrapEventHeaders(stateResp.StateEvents)); err != nil { return err } - // TODO: Check that the roomserver has a copy of all of the auth_events. - // TODO: Check that the event is allowed by its auth_events. - // pass the event to the roomserver _, err := t.producer.SendEvents( t.context, @@ -291,7 +313,7 @@ func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserver return gomatrixserverlib.Allowed(e, &authUsingState) } -func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) error { +func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, isInboundTxn bool) error { // We are missing the previous events for this events. // This means that there is a gap in our view of the history of the // room. There two ways that we can handle such a gap: @@ -306,49 +328,315 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVer // event ids and then use /event to fetch the individual events. // However not all version of synapse support /state_ids so you may // need to fallback to /state. - // TODO: Attempt to fill in the gap using /get_missing_events - // Attempt to fetch the missing state using /state_ids and /events - respState, haveEventIDs, err := t.lookupMissingStateViaStateIDs(e, roomVersion) + // Attempt to fill in the gap using /get_missing_events + // This will either: + // - fill in the gap completely then process event `e` returning no backwards extremity + // - fail to fill in the gap and tell us to terminate the transaction err=not nil + // - fail to fill in the gap and tell us to fetch state at the new backwards extremity, and to not terminate the transaction + backwardsExtremity, err := t.getMissingEvents(e, roomVersion, isInboundTxn) if err != nil { - // Fallback to /state - util.GetLogger(t.context).WithError(err).Warn("processEventWithMissingState failed to /state_ids, falling back to /state") - respState, err = t.lookupMissingStateViaState(e, roomVersion) - if err != nil { - return err - } + return err + } + if backwardsExtremity == nil { + // we filled in the gap! + return nil } - // Check that the event is allowed by the state. -retryAllowedState: - if err := checkAllowedByState(e, respState.StateEvents); err != nil { - switch missing := err.(type) { - case gomatrixserverlib.MissingAuthEventError: - // An auth event was missing so let's look up that event over federation - for _, s := range respState.StateEvents { - if s.EventID() != missing.AuthEventID { - continue - } - err = t.processEventWithMissingState(s, roomVersion) - // If there was no error retrieving the event from federation then - // we assume that it succeeded, so retry the original state check - if err == nil { - goto retryAllowedState - } - } - default: + // at this point we know we're going to have a gap: we need to work out the room state at the new backwards extremity. + // security: we have to do state resolution on the new backwards extremity (TODO: WHY) + // Therefore, we cannot just query /state_ids with this event to get the state before. Instead, we need to query + // the state AFTER all the prev_events for this event, then mix in our current room state and apply state resolution + // to that to get the state before the event. + var states []*gomatrixserverlib.RespState + needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{*backwardsExtremity}).Tuples() + for _, prevEventID := range backwardsExtremity.PrevEventIDs() { + var prevState *gomatrixserverlib.RespState + prevState, err = t.lookupStateAfterEvent(roomVersion, backwardsExtremity.RoomID(), prevEventID, needed) + if err != nil { + util.GetLogger(t.context).WithError(err).Errorf("Failed to lookup state after prev_event: %s", prevEventID) + return err } + states = append(states, prevState) + } + // mix in the current room state + currState, err := t.lookupCurrentState(backwardsExtremity) + if err != nil { + util.GetLogger(t.context).WithError(err).Errorf("Failed to lookup current room state") + return err + } + states = append(states, currState) + resolvedState, err := t.resolveStatesAndCheck(roomVersion, states, backwardsExtremity) + if err != nil { + util.GetLogger(t.context).WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID()) return err } // pass the event along with the state to the roomserver using a background context so we don't // needlessly expire - return t.producer.SendEventWithState(context.Background(), respState, e.Headered(roomVersion), haveEventIDs) + return t.producer.SendEventWithState(context.Background(), resolvedState, e.Headered(roomVersion), t.haveEventIDs()) } -func (t *txnReq) lookupMissingStateViaState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) ( +// lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event) +// added into the mix. +func (t *txnReq) lookupStateAfterEvent(roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string, needed []gomatrixserverlib.StateKeyTuple) (*gomatrixserverlib.RespState, error) { + // try doing all this locally before we resort to querying federation + respState := t.lookupStateAfterEventLocally(roomID, eventID, needed) + if respState != nil { + return respState, nil + } + + respState, err := t.lookupStateBeforeEvent(roomVersion, roomID, eventID) + if err != nil { + return nil, err + } + + // fetch the event we're missing and add it to the pile + h, err := t.lookupEvent(roomVersion, eventID, false) + if err != nil { + return nil, err + } + t.haveEvents[h.EventID()] = h + if h.StateKey() != nil { + addedToState := false + for i := range respState.StateEvents { + se := respState.StateEvents[i] + if se.Type() == h.Type() && se.StateKeyEquals(*h.StateKey()) { + respState.StateEvents[i] = h.Unwrap() + addedToState = true + break + } + } + if !addedToState { + respState.StateEvents = append(respState.StateEvents, h.Unwrap()) + } + } + + return respState, nil +} + +func (t *txnReq) lookupStateAfterEventLocally(roomID, eventID string, needed []gomatrixserverlib.StateKeyTuple) *gomatrixserverlib.RespState { + var res api.QueryStateAfterEventsResponse + err := t.rsAPI.QueryStateAfterEvents(t.context, &api.QueryStateAfterEventsRequest{ + RoomID: roomID, + PrevEventIDs: []string{eventID}, + StateToFetch: needed, + }, &res) + if err != nil || !res.PrevEventsExist { + util.GetLogger(t.context).WithError(err).Warnf("failed to query state after %s locally", eventID) + return nil + } + for i, ev := range res.StateEvents { + t.haveEvents[ev.EventID()] = &res.StateEvents[i] + } + var authEvents []gomatrixserverlib.Event + missingAuthEvents := make(map[string]bool) + for _, ev := range res.StateEvents { + for _, ae := range ev.AuthEventIDs() { + aev, ok := t.haveEvents[ae] + if ok { + authEvents = append(authEvents, aev.Unwrap()) + } else { + missingAuthEvents[ae] = true + } + } + } + // QueryStateAfterEvents does not return the auth events, so fetch them now. We know the roomserver has them else it wouldn't + // have stored the event. + var missingEventList []string + for evID := range missingAuthEvents { + missingEventList = append(missingEventList, evID) + } + queryReq := api.QueryEventsByIDRequest{ + EventIDs: missingEventList, + } + util.GetLogger(t.context).Infof("Fetching missing auth events: %v", missingEventList) + var queryRes api.QueryEventsByIDResponse + if err = t.rsAPI.QueryEventsByID(t.context, &queryReq, &queryRes); err != nil { + return nil + } + for i := range queryRes.Events { + evID := queryRes.Events[i].EventID() + t.haveEvents[evID] = &queryRes.Events[i] + authEvents = append(authEvents, queryRes.Events[i].Unwrap()) + } + + evs := gomatrixserverlib.UnwrapEventHeaders(res.StateEvents) + return &gomatrixserverlib.RespState{ + StateEvents: evs, + AuthEvents: authEvents, + } +} + +func (t *txnReq) lookupCurrentState(newEvent *gomatrixserverlib.Event) (*gomatrixserverlib.RespState, error) { + // Ask the roomserver for information about this room + queryReq := api.QueryLatestEventsAndStateRequest{ + RoomID: newEvent.RoomID(), + StateToFetch: gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{*newEvent}).Tuples(), + } + var queryRes api.QueryLatestEventsAndStateResponse + if err := t.rsAPI.QueryLatestEventsAndState(t.context, &queryReq, &queryRes); err != nil { + return nil, fmt.Errorf("lookupCurrentState rsAPI.QueryLatestEventsAndState: %w", err) + } + evs := gomatrixserverlib.UnwrapEventHeaders(queryRes.StateEvents) + return &gomatrixserverlib.RespState{ + StateEvents: evs, + AuthEvents: evs, + }, nil +} + +// lookuptStateBeforeEvent returns the room state before the event e, which is just /state_ids and/or /state depending on what +// the server supports. +func (t *txnReq) lookupStateBeforeEvent(roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) ( respState *gomatrixserverlib.RespState, err error) { - state, err := t.federation.LookupState(t.context, t.Origin, e.RoomID(), e.EventID(), roomVersion) + + util.GetLogger(t.context).Infof("lookupStateBeforeEvent %s", eventID) + + // Attempt to fetch the missing state using /state_ids and /events + respState, err = t.lookupMissingStateViaStateIDs(roomID, eventID, roomVersion) + if err != nil { + // Fallback to /state + util.GetLogger(t.context).WithError(err).Warn("lookupStateBeforeEvent failed to /state_ids, falling back to /state") + respState, err = t.lookupMissingStateViaState(roomID, eventID, roomVersion) + } + return +} + +func (t *txnReq) resolveStatesAndCheck(roomVersion gomatrixserverlib.RoomVersion, states []*gomatrixserverlib.RespState, backwardsExtremity *gomatrixserverlib.Event) (*gomatrixserverlib.RespState, error) { + var authEventList []gomatrixserverlib.Event + var stateEventList []gomatrixserverlib.Event + for _, state := range states { + authEventList = append(authEventList, state.AuthEvents...) + stateEventList = append(stateEventList, state.StateEvents...) + } + resolvedStateEvents, err := gomatrixserverlib.ResolveConflicts(roomVersion, stateEventList, authEventList) + if err != nil { + return nil, err + } + // apply the current event +retryAllowedState: + if err = checkAllowedByState(*backwardsExtremity, resolvedStateEvents); err != nil { + switch missing := err.(type) { + case gomatrixserverlib.MissingAuthEventError: + h, err2 := t.lookupEvent(roomVersion, missing.AuthEventID, true) + if err2 != nil { + return nil, fmt.Errorf("missing auth event %s and failed to look it up: %w", missing.AuthEventID, err2) + } + util.GetLogger(t.context).Infof("fetched event %s", missing.AuthEventID) + resolvedStateEvents = append(resolvedStateEvents, h.Unwrap()) + goto retryAllowedState + default: + } + return nil, err + } + return &gomatrixserverlib.RespState{ + AuthEvents: authEventList, + StateEvents: resolvedStateEvents, + }, nil +} + +// getMissingEvents returns a nil backwardsExtremity if missing events were fetched and handled, else returns the new backwards extremity which we should +// begin from. Returns an error only if we should terminate the transaction which initiated /get_missing_events +// This function recursively calls txnReq.processEvent with the missing events, which will be processed before this function returns. +// This means that we may recursively call this function, as we spider back up prev_events to the min depth. +func (t *txnReq) getMissingEvents(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, isInboundTxn bool) (backwardsExtremity *gomatrixserverlib.Event, err error) { + if !isInboundTxn { + // we've recursed here, so just take a state snapshot please! + return &e, nil + } + logger := util.GetLogger(t.context).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID()) + needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{e}) + // query latest events (our trusted forward extremities) + req := api.QueryLatestEventsAndStateRequest{ + RoomID: e.RoomID(), + StateToFetch: needed.Tuples(), + } + var res api.QueryLatestEventsAndStateResponse + if err = t.rsAPI.QueryLatestEventsAndState(t.context, &req, &res); err != nil { + logger.WithError(err).Warn("Failed to query latest events") + return &e, nil + } + latestEvents := make([]string, len(res.LatestEvents)) + for i := range res.LatestEvents { + latestEvents[i] = res.LatestEvents[i].EventID + } + // this server just sent us an event for which we do not know its prev_events - ask that server for those prev_events. + minDepth := int(res.Depth) - 20 + if minDepth < 0 { + minDepth = 0 + } + missingResp, err := t.federation.LookupMissingEvents(t.context, t.Origin, e.RoomID(), gomatrixserverlib.MissingEvents{ + Limit: 20, + // synapse uses the min depth they've ever seen in that room + MinDepth: minDepth, + // The latest event IDs that the sender already has. These are skipped when retrieving the previous events of latest_events. + EarliestEvents: latestEvents, + // The event IDs to retrieve the previous events for. + LatestEvents: []string{e.EventID()}, + }, roomVersion) + + // security: how we handle failures depends on whether or not this event will become the new forward extremity for the room. + // There's 2 scenarios to consider: + // - Case A: We got pushed an event and are now fetching missing prev_events. (isInboundTxn=true) + // - Case B: We are fetching missing prev_events already and now fetching some more (isInboundTxn=false) + // In Case B, we know for sure that the event we are currently processing will not become the new forward extremity for the room, + // as it was called in response to an inbound txn which had it as a prev_event. + // In Case A, the event is a forward extremity, and could eventually become the _only_ forward extremity in the room. This is bad + // because it means we would trust the state at that event to be the state for the entire room, and allows rooms to be hijacked. + // https://github.com/matrix-org/synapse/pull/3456 + // https://github.com/matrix-org/synapse/blob/229eb81498b0fe1da81e9b5b333a0285acde9446/synapse/handlers/federation.py#L335 + // For now, we do not allow Case B, so reject the event. + if err != nil { + logger.WithError(err).Errorf( + "%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can", + t.Origin, + ) + return nil, missingPrevEventsError{ + eventID: e.EventID(), + err: err, + } + } + logger.Infof("get_missing_events returned %d events", len(missingResp.Events)) + + // topologically sort and sanity check that we are making forward progress + newEvents := gomatrixserverlib.ReverseTopologicalOrdering(missingResp.Events, gomatrixserverlib.TopologicalOrderByPrevEvents) + shouldHaveSomeEventIDs := e.PrevEventIDs() + hasPrevEvent := false +Event: + for _, pe := range shouldHaveSomeEventIDs { + for _, ev := range newEvents { + if ev.EventID() == pe { + hasPrevEvent = true + break Event + } + } + } + if !hasPrevEvent { + err = fmt.Errorf("called /get_missing_events but server %s didn't return any prev_events with IDs %v", t.Origin, shouldHaveSomeEventIDs) + logger.WithError(err).Errorf( + "%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can", + t.Origin, + ) + return nil, missingPrevEventsError{ + eventID: e.EventID(), + err: err, + } + } + // process the missing events then the event which started this whole thing + for _, ev := range append(newEvents, e) { + err := t.processEvent(ev, false) + if err != nil { + return nil, err + } + } + + // we processed everything! + return nil, nil +} + +func (t *txnReq) lookupMissingStateViaState(roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) ( + respState *gomatrixserverlib.RespState, err error) { + state, err := t.federation.LookupState(t.context, t.Origin, roomID, eventID, roomVersion) if err != nil { return nil, err } @@ -359,78 +647,64 @@ func (t *txnReq) lookupMissingStateViaState(e gomatrixserverlib.Event, roomVersi return &state, nil } -func (t *txnReq) lookupMissingStateViaStateIDs(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) ( - *gomatrixserverlib.RespState, map[string]bool, error) { - +func (t *txnReq) lookupMissingStateViaStateIDs(roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) ( + *gomatrixserverlib.RespState, error) { + util.GetLogger(t.context).Infof("lookupMissingStateViaStateIDs %s", eventID) // fetch the state event IDs at the time of the event - stateIDs, err := t.federation.LookupStateIDs(t.context, t.Origin, e.RoomID(), e.EventID()) + stateIDs, err := t.federation.LookupStateIDs(t.context, t.Origin, roomID, eventID) if err != nil { - return nil, nil, err + return nil, err } - - // fetch as many as we can from the roomserver, do them as 2 calls rather than - // 1 to try to reduce the number of parameters in the bulk query this will use - haveEventMap := make(map[string]*gomatrixserverlib.HeaderedEvent, len(stateIDs.StateEventIDs)) - haveEventIDs := make(map[string]bool) - for _, eventList := range [][]string{stateIDs.StateEventIDs, stateIDs.AuthEventIDs} { - queryReq := api.QueryEventsByIDRequest{ - EventIDs: eventList, - } - var queryRes api.QueryEventsByIDResponse - if err = t.rsAPI.QueryEventsByID(t.context, &queryReq, &queryRes); err != nil { - return nil, nil, err - } - // allow indexing of current state by event ID - for i := range queryRes.Events { - haveEventMap[queryRes.Events[i].EventID()] = &queryRes.Events[i] - haveEventIDs[queryRes.Events[i].EventID()] = true - } - } - // work out which auth/state IDs are missing wantIDs := append(stateIDs.StateEventIDs, stateIDs.AuthEventIDs...) missing := make(map[string]bool) + var missingEventList []string for _, sid := range wantIDs { - if _, ok := haveEventMap[sid]; !ok { - missing[sid] = true + if _, ok := t.haveEvents[sid]; !ok { + if !missing[sid] { + missing[sid] = true + missingEventList = append(missingEventList, sid) + } } } + + // fetch as many as we can from the roomserver + queryReq := api.QueryEventsByIDRequest{ + EventIDs: missingEventList, + } + var queryRes api.QueryEventsByIDResponse + if err = t.rsAPI.QueryEventsByID(t.context, &queryReq, &queryRes); err != nil { + return nil, err + } + for i := range queryRes.Events { + evID := queryRes.Events[i].EventID() + t.haveEvents[evID] = &queryRes.Events[i] + if missing[evID] { + delete(missing, evID) + } + } + util.GetLogger(t.context).WithFields(logrus.Fields{ "missing": len(missing), - "event_id": e.EventID(), - "room_id": e.RoomID(), - "already_have": len(haveEventMap), + "event_id": eventID, + "room_id": roomID, "total_state": len(stateIDs.StateEventIDs), "total_auth_events": len(stateIDs.AuthEventIDs), }).Info("Fetching missing state at event") for missingEventID := range missing { - var txn gomatrixserverlib.Transaction - txn, err = t.federation.GetEvent(t.context, t.Origin, missingEventID) + var h *gomatrixserverlib.HeaderedEvent + h, err = t.lookupEvent(roomVersion, missingEventID, false) if err != nil { - util.GetLogger(t.context).WithError(err).WithField("event_id", missingEventID).Warn("failed to get missing /event for event ID") - return nil, nil, err - } - for _, pdu := range txn.PDUs { - var event gomatrixserverlib.Event - event, err = gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion) - if err != nil { - util.GetLogger(t.context).WithError(err).Warnf("Transaction: Failed to parse event JSON of event %q", event.EventID()) - return nil, nil, unmarshalError{err} - } - if err = gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil { - util.GetLogger(t.context).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID()) - return nil, nil, verifySigError{event.EventID(), err} - } - h := event.Headered(roomVersion) - haveEventMap[event.EventID()] = &h + return nil, err } + t.haveEvents[h.EventID()] = h } - resp, err := t.createRespStateFromStateIDs(stateIDs, haveEventMap) - return resp, haveEventIDs, err + resp, err := t.createRespStateFromStateIDs(stateIDs) + return resp, err } -func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs, haveEventMap map[string]*gomatrixserverlib.HeaderedEvent) ( +func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs) ( *gomatrixserverlib.RespState, error) { // create a RespState response using the response to /state_ids as a guide respState := gomatrixserverlib.RespState{ @@ -439,22 +713,55 @@ func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStat } for i := range stateIDs.StateEventIDs { - ev, ok := haveEventMap[stateIDs.StateEventIDs[i]] + ev, ok := t.haveEvents[stateIDs.StateEventIDs[i]] if !ok { return nil, fmt.Errorf("missing state event %s", stateIDs.StateEventIDs[i]) } respState.StateEvents[i] = ev.Unwrap() } for i := range stateIDs.AuthEventIDs { - ev, ok := haveEventMap[stateIDs.AuthEventIDs[i]] + ev, ok := t.haveEvents[stateIDs.AuthEventIDs[i]] if !ok { return nil, fmt.Errorf("missing auth event %s", stateIDs.AuthEventIDs[i]) } respState.AuthEvents[i] = ev.Unwrap() } - // Check that the returned state is valid. - if err := respState.Check(t.context, t.keys); err != nil { - return nil, err - } + // We purposefully do not do auth checks on the returned events, as they will still + // be processed in the exact same way, just as a 'rejected' event + // TODO: Add a field to HeaderedEvent to indicate if the event is rejected. return &respState, nil } + +func (t *txnReq) lookupEvent(roomVersion gomatrixserverlib.RoomVersion, missingEventID string, localFirst bool) (*gomatrixserverlib.HeaderedEvent, error) { + if localFirst { + // fetch from the roomserver + queryReq := api.QueryEventsByIDRequest{ + EventIDs: []string{missingEventID}, + } + var queryRes api.QueryEventsByIDResponse + if err := t.rsAPI.QueryEventsByID(t.context, &queryReq, &queryRes); err != nil { + util.GetLogger(t.context).Warnf("Failed to query roomserver for missing event %s: %s - falling back to remote", missingEventID, err) + } else if len(queryRes.Events) == 1 { + return &queryRes.Events[0], nil + } + } + txn, err := t.federation.GetEvent(t.context, t.Origin, missingEventID) + if err != nil || len(txn.PDUs) == 0 { + util.GetLogger(t.context).WithError(err).WithField("event_id", missingEventID).Warn("failed to get missing /event for event ID") + return nil, err + } + pdu := txn.PDUs[0] + var event gomatrixserverlib.Event + event, err = gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion) + if err != nil { + util.GetLogger(t.context).WithError(err).Warnf("Transaction: Failed to parse event JSON of event %q", event.EventID()) + return nil, unmarshalError{err} + } + if err = gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil { + util.GetLogger(t.context).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID()) + return nil, verifySigError{event.EventID(), err} + } + h := event.Headered(roomVersion) + t.newEvents[h.EventID()] = true + return &h, nil +} diff --git a/federationapi/routing/send_test.go b/federationapi/routing/send_test.go index 89d28aa1..cb8aec6f 100644 --- a/federationapi/routing/send_test.go +++ b/federationapi/routing/send_test.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "reflect" - "sort" "testing" "time" @@ -79,9 +78,10 @@ func (p *testEDUProducer) InputTypingEvent( } type testRoomserverAPI struct { - inputRoomEvents []api.InputRoomEvent - queryStateAfterEvents func(*api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse - queryEventsByID func(req *api.QueryEventsByIDRequest) api.QueryEventsByIDResponse + inputRoomEvents []api.InputRoomEvent + queryStateAfterEvents func(*api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse + queryEventsByID func(req *api.QueryEventsByIDRequest) api.QueryEventsByIDResponse + queryLatestEventsAndState func(*api.QueryLatestEventsAndStateRequest) api.QueryLatestEventsAndStateResponse } func (t *testRoomserverAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSenderInternalAPI) {} @@ -92,6 +92,9 @@ func (t *testRoomserverAPI) InputRoomEvents( response *api.InputRoomEventsResponse, ) error { t.inputRoomEvents = append(t.inputRoomEvents, request.InputRoomEvents...) + for _, ire := range request.InputRoomEvents { + fmt.Println("InputRoomEvents: ", ire.Event.EventID()) + } return nil } @@ -117,6 +120,13 @@ func (t *testRoomserverAPI) QueryLatestEventsAndState( request *api.QueryLatestEventsAndStateRequest, response *api.QueryLatestEventsAndStateResponse, ) error { + r := t.queryLatestEventsAndState(request) + response.QueryLatestEventsAndStateRequest = *request + response.RoomExists = r.RoomExists + response.RoomVersion = testRoomVersion + response.LatestEvents = r.LatestEvents + response.StateEvents = r.StateEvents + response.Depth = r.Depth return nil } @@ -152,7 +162,7 @@ func (t *testRoomserverAPI) QueryMembershipForUser( request *api.QueryMembershipForUserRequest, response *api.QueryMembershipForUserResponse, ) error { - return nil + return fmt.Errorf("not implemented") } // Query a list of membership events for a room @@ -161,7 +171,7 @@ func (t *testRoomserverAPI) QueryMembershipsForRoom( request *api.QueryMembershipsForRoomRequest, response *api.QueryMembershipsForRoomResponse, ) error { - return nil + return fmt.Errorf("not implemented") } // Query a list of invite event senders for a user in a room. @@ -170,7 +180,7 @@ func (t *testRoomserverAPI) QueryInvitesForUser( request *api.QueryInvitesForUserRequest, response *api.QueryInvitesForUserResponse, ) error { - return nil + return fmt.Errorf("not implemented") } // Query whether a server is allowed to see an event @@ -179,7 +189,7 @@ func (t *testRoomserverAPI) QueryServerAllowedToSeeEvent( request *api.QueryServerAllowedToSeeEventRequest, response *api.QueryServerAllowedToSeeEventResponse, ) error { - return nil + return fmt.Errorf("not implemented") } // Query missing events for a room from roomserver @@ -188,7 +198,7 @@ func (t *testRoomserverAPI) QueryMissingEvents( request *api.QueryMissingEventsRequest, response *api.QueryMissingEventsResponse, ) error { - return nil + return fmt.Errorf("not implemented") } // Query to get state and auth chain for a (potentially hypothetical) event. @@ -199,7 +209,7 @@ func (t *testRoomserverAPI) QueryStateAndAuthChain( request *api.QueryStateAndAuthChainRequest, response *api.QueryStateAndAuthChainResponse, ) error { - return nil + return fmt.Errorf("not implemented") } // Query a given amount (or less) of events prior to a given set of events. @@ -208,7 +218,7 @@ func (t *testRoomserverAPI) QueryBackfill( request *api.QueryBackfillRequest, response *api.QueryBackfillResponse, ) error { - return nil + return fmt.Errorf("not implemented") } // Asks for the default room version as preferred by the server. @@ -217,7 +227,7 @@ func (t *testRoomserverAPI) QueryRoomVersionCapabilities( request *api.QueryRoomVersionCapabilitiesRequest, response *api.QueryRoomVersionCapabilitiesResponse, ) error { - return nil + return fmt.Errorf("not implemented") } // Asks for the room version for a given room. @@ -236,7 +246,7 @@ func (t *testRoomserverAPI) SetRoomAlias( req *api.SetRoomAliasRequest, response *api.SetRoomAliasResponse, ) error { - return nil + return fmt.Errorf("not implemented") } // Get the room ID for an alias @@ -245,7 +255,7 @@ func (t *testRoomserverAPI) GetRoomIDForAlias( req *api.GetRoomIDForAliasRequest, response *api.GetRoomIDForAliasResponse, ) error { - return nil + return fmt.Errorf("not implemented") } // Get all known aliases for a room ID @@ -254,7 +264,7 @@ func (t *testRoomserverAPI) GetAliasesForRoomID( req *api.GetAliasesForRoomIDRequest, response *api.GetAliasesForRoomIDResponse, ) error { - return nil + return fmt.Errorf("not implemented") } // Get the user ID of the creator of an alias @@ -263,7 +273,7 @@ func (t *testRoomserverAPI) GetCreatorIDForAlias( req *api.GetCreatorIDForAliasRequest, response *api.GetCreatorIDForAliasResponse, ) error { - return nil + return fmt.Errorf("not implemented") } // Remove a room alias @@ -272,18 +282,20 @@ func (t *testRoomserverAPI) RemoveRoomAlias( req *api.RemoveRoomAliasRequest, response *api.RemoveRoomAliasResponse, ) error { - return nil + return fmt.Errorf("not implemented") } type txnFedClient struct { - state map[string]gomatrixserverlib.RespState // event_id to response - stateIDs map[string]gomatrixserverlib.RespStateIDs // event_id to response - getEvent map[string]gomatrixserverlib.Transaction // event_id to response + state map[string]gomatrixserverlib.RespState // event_id to response + stateIDs map[string]gomatrixserverlib.RespStateIDs // event_id to response + getEvent map[string]gomatrixserverlib.Transaction // event_id to response + getMissingEvents func(gomatrixserverlib.MissingEvents) (res gomatrixserverlib.RespMissingEvents, err error) } func (c *txnFedClient) LookupState(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, eventID string, roomVersion gomatrixserverlib.RoomVersion) ( res gomatrixserverlib.RespState, err error, ) { + fmt.Println("testFederationClient.LookupState", eventID) r, ok := c.state[eventID] if !ok { err = fmt.Errorf("txnFedClient: no /state for event %s", eventID) @@ -293,6 +305,7 @@ func (c *txnFedClient) LookupState(ctx context.Context, s gomatrixserverlib.Serv return } func (c *txnFedClient) LookupStateIDs(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, eventID string) (res gomatrixserverlib.RespStateIDs, err error) { + fmt.Println("testFederationClient.LookupStateIDs", eventID) r, ok := c.stateIDs[eventID] if !ok { err = fmt.Errorf("txnFedClient: no /state_ids for event %s", eventID) @@ -302,6 +315,7 @@ func (c *txnFedClient) LookupStateIDs(ctx context.Context, s gomatrixserverlib.S return } func (c *txnFedClient) GetEvent(ctx context.Context, s gomatrixserverlib.ServerName, eventID string) (res gomatrixserverlib.Transaction, err error) { + fmt.Println("testFederationClient.GetEvent", eventID) r, ok := c.getEvent[eventID] if !ok { err = fmt.Errorf("txnFedClient: no /event for event ID %s", eventID) @@ -310,6 +324,10 @@ func (c *txnFedClient) GetEvent(ctx context.Context, s gomatrixserverlib.ServerN res = r return } +func (c *txnFedClient) LookupMissingEvents(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, missing gomatrixserverlib.MissingEvents, + roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMissingEvents, err error) { + return c.getMissingEvents(missing) +} func mustCreateTransaction(rsAPI api.RoomserverInternalAPI, fedClient txnFederationClient, pdus []json.RawMessage) *txnReq { t := &txnReq{ @@ -319,6 +337,8 @@ func mustCreateTransaction(rsAPI api.RoomserverInternalAPI, fedClient txnFederat eduProducer: producers.NewEDUServerProducer(&testEDUProducer{}), keys: &testNopJSONVerifier{}, federation: fedClient, + haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent), + newEvents: make(map[string]bool), } t.PDUs = pdus t.Origin = testOrigin @@ -368,6 +388,9 @@ NextTuple: } func assertInputRoomEvents(t *testing.T, got []api.InputRoomEvent, want []gomatrixserverlib.HeaderedEvent) { + for _, g := range got { + fmt.Println("GOT ", g.Event.EventID()) + } if len(got) != len(want) { t.Errorf("wrong number of InputRoomEvents: got %d want %d", len(got), len(want)) return @@ -424,32 +447,167 @@ func TestTransactionFailAuthChecks(t *testing.T) { assertInputRoomEvents(t, rsAPI.inputRoomEvents, nil) // expect no messages to be sent to the roomserver } -// The purpose of this test is to check that when there are missing prev_events that state is fetched via /state_ids -// and /event and not /state. It works by setting PrevEventsExist=false in the roomserver query response, resulting in -// a call to /state_ids which returns the whole room state. It should attempt to fetch as many of these events from the -// roomserver FIRST, resulting in a call to QueryEventsByID. However, this will be missing the m.room.power_levels event which -// should then be requested via /event. The net result is that the transaction should succeed and there should be 2 -// new events, first the m.room.power_levels event we were missing, then the transaction PDU. +// The purpose of this test is to make sure that when an event is received for which we do not know the prev_events, +// we request them from /get_missing_events. It works by setting PrevEventsExist=false in the roomserver query response, +// resulting in a call to /get_missing_events which returns the missing prev event. Both events should be processed in +// topological order and sent to the roomserver. +func TestTransactionFetchMissingPrevEvents(t *testing.T) { + haveEvent := testEvents[len(testEvents)-3] + prevEvent := testEvents[len(testEvents)-2] + inputEvent := testEvents[len(testEvents)-1] + + var rsAPI *testRoomserverAPI // ref here so we can refer to inputRoomEvents inside these functions + rsAPI = &testRoomserverAPI{ + queryStateAfterEvents: func(req *api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse { + // we expect this to be called three times: + // - first with input event to realise there's a gap + // - second with the prevEvent to realise there is no gap + // - third with the input event to realise there is no longer a gap + prevEventsExist := false + if len(req.PrevEventIDs) == 1 { + switch req.PrevEventIDs[0] { + case haveEvent.EventID(): + prevEventsExist = true + case prevEvent.EventID(): + // we only have this event if we've been send prevEvent + if len(rsAPI.inputRoomEvents) == 1 && rsAPI.inputRoomEvents[0].Event.EventID() == prevEvent.EventID() { + prevEventsExist = true + } + } + } + + return api.QueryStateAfterEventsResponse{ + PrevEventsExist: prevEventsExist, + RoomExists: true, + StateEvents: fromStateTuples(req.StateToFetch, nil), + } + }, + queryLatestEventsAndState: func(req *api.QueryLatestEventsAndStateRequest) api.QueryLatestEventsAndStateResponse { + return api.QueryLatestEventsAndStateResponse{ + RoomExists: true, + Depth: haveEvent.Depth(), + LatestEvents: []gomatrixserverlib.EventReference{ + haveEvent.EventReference(), + }, + StateEvents: fromStateTuples(req.StateToFetch, nil), + } + }, + } + + cli := &txnFedClient{ + getMissingEvents: func(missing gomatrixserverlib.MissingEvents) (res gomatrixserverlib.RespMissingEvents, err error) { + if !reflect.DeepEqual(missing.EarliestEvents, []string{haveEvent.EventID()}) { + t.Errorf("call to /get_missing_events wrong earliest events: got %v want %v", missing.EarliestEvents, haveEvent.EventID()) + } + if !reflect.DeepEqual(missing.LatestEvents, []string{inputEvent.EventID()}) { + t.Errorf("call to /get_missing_events wrong latest events: got %v want %v", missing.LatestEvents, inputEvent.EventID()) + } + return gomatrixserverlib.RespMissingEvents{ + Events: []gomatrixserverlib.Event{ + prevEvent.Unwrap(), + }, + }, nil + }, + } + + pdus := []json.RawMessage{ + inputEvent.JSON(), + } + txn := mustCreateTransaction(rsAPI, cli, pdus) + mustProcessTransaction(t, txn, nil) + assertInputRoomEvents(t, rsAPI.inputRoomEvents, []gomatrixserverlib.HeaderedEvent{prevEvent, inputEvent}) +} + +// The purpose of this test is to check that when there are missing prev_events and we still haven't been able to fill +// in the hole with /get_missing_events that the state BEFORE the events we want to persist is fetched via /state_ids +// and /event. It works by setting PrevEventsExist=false in the roomserver query response, resulting in +// a call to /get_missing_events which returns 1 out of the 2 events it needs to fill in the gap. Synapse and Dendrite +// both give up after 1x /get_missing_events call, relying on requesting the state AFTER the missing event in order to +// continue. The DAG looks something like: +// FE GME TXN +// A ---> B ---> C ---> D +// TXN=event in the txn, GME=response to /get_missing_events, FE=roomserver's forward extremity. Should result in: +// - /state_ids?event=B is requested, then /event/B to get the state AFTER B. B is a state event. +// - state resolution is done to check C is allowed. +// This results in B being sent as an outlier FIRST, then C,D. func TestTransactionFetchMissingStateByStateIDs(t *testing.T) { - missingStateEvent := testStateEvents[gomatrixserverlib.StateKeyTuple{ + eventA := testEvents[len(testEvents)-5] + // this is also len(testEvents)-4 + eventB := testStateEvents[gomatrixserverlib.StateKeyTuple{ EventType: gomatrixserverlib.MRoomPowerLevels, StateKey: "", }] - rsAPI := &testRoomserverAPI{ + eventC := testEvents[len(testEvents)-3] + eventD := testEvents[len(testEvents)-2] + fmt.Println("a:", eventA.EventID()) + fmt.Println("b:", eventB.EventID()) + fmt.Println("c:", eventC.EventID()) + fmt.Println("d:", eventD.EventID()) + var rsAPI *testRoomserverAPI + rsAPI = &testRoomserverAPI{ queryStateAfterEvents: func(req *api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse { + omitTuples := []gomatrixserverlib.StateKeyTuple{ + { + EventType: gomatrixserverlib.MRoomPowerLevels, + StateKey: "", + }, + } + askingForEvent := req.PrevEventIDs[0] + haveEventB := false + haveEventC := false + for _, ev := range rsAPI.inputRoomEvents { + switch ev.Event.EventID() { + case eventB.EventID(): + haveEventB = true + omitTuples = nil // include event B now + case eventC.EventID(): + haveEventC = true + } + } + prevEventExists := false + if askingForEvent == eventC.EventID() { + prevEventExists = haveEventC + } else if askingForEvent == eventB.EventID() { + prevEventExists = haveEventB + } + var stateEvents []gomatrixserverlib.HeaderedEvent + if prevEventExists { + stateEvents = fromStateTuples(req.StateToFetch, omitTuples) + } return api.QueryStateAfterEventsResponse{ - // setting this to false should trigger a call to /state_ids - PrevEventsExist: false, + PrevEventsExist: prevEventExists, RoomExists: true, - StateEvents: nil, + StateEvents: stateEvents, + } + }, + queryLatestEventsAndState: func(req *api.QueryLatestEventsAndStateRequest) api.QueryLatestEventsAndStateResponse { + omitTuples := []gomatrixserverlib.StateKeyTuple{ + {EventType: gomatrixserverlib.MRoomPowerLevels, StateKey: ""}, + } + return api.QueryLatestEventsAndStateResponse{ + RoomExists: true, + Depth: eventA.Depth(), + LatestEvents: []gomatrixserverlib.EventReference{ + eventA.EventReference(), + }, + StateEvents: fromStateTuples(req.StateToFetch, omitTuples), } }, queryEventsByID: func(req *api.QueryEventsByIDRequest) api.QueryEventsByIDResponse { var res api.QueryEventsByIDResponse + fmt.Println("queryEventsByID ", req.EventIDs) for _, wantEventID := range req.EventIDs { for _, ev := range testStateEvents { - // roomserver is missing the power levels event - if wantEventID == missingStateEvent.EventID() { + // roomserver is missing the power levels event unless it's been sent to us recently as an outlier + if wantEventID == eventB.EventID() { + fmt.Println("Asked for pl event") + for _, inEv := range rsAPI.inputRoomEvents { + fmt.Println("recv ", inEv.Event.EventID()) + if inEv.Event.EventID() == wantEventID { + res.Events = append(res.Events, inEv.Event) + break + } + } continue } if ev.EventID() == wantEventID { @@ -461,91 +619,55 @@ func TestTransactionFetchMissingStateByStateIDs(t *testing.T) { return res }, } - inputEvent := testEvents[len(testEvents)-1] + // /state_ids for event B returns every state event but B (it's the state before) + var authEventIDs []string var stateEventIDs []string for _, ev := range testStateEvents { + if ev.EventID() == eventB.EventID() { + continue + } + // state res checks what auth events you give it, and this isn't a valid auth event + if ev.Type() != gomatrixserverlib.MRoomHistoryVisibility { + authEventIDs = append(authEventIDs, ev.EventID()) + } stateEventIDs = append(stateEventIDs, ev.EventID()) } cli := &txnFedClient{ - // /state_ids returns all the state events stateIDs: map[string]gomatrixserverlib.RespStateIDs{ - inputEvent.EventID(): gomatrixserverlib.RespStateIDs{ + eventB.EventID(): { StateEventIDs: stateEventIDs, - AuthEventIDs: stateEventIDs, + AuthEventIDs: authEventIDs, }, }, - // /event for the missing state event returns it + // /event for event B returns it getEvent: map[string]gomatrixserverlib.Transaction{ - missingStateEvent.EventID(): gomatrixserverlib.Transaction{ + eventB.EventID(): { PDUs: []json.RawMessage{ - missingStateEvent.JSON(), + eventB.JSON(), }, }, }, - } - - pdus := []json.RawMessage{ - testData[len(testData)-1], // a message event - } - txn := mustCreateTransaction(rsAPI, cli, pdus) - mustProcessTransaction(t, txn, nil) - assertInputRoomEvents(t, rsAPI.inputRoomEvents, []gomatrixserverlib.HeaderedEvent{missingStateEvent, inputEvent}) -} - -// The purpose of this test is to check that when there are missing prev_events and /state_ids fails, that we fallback to -// calling /state which returns the entire room state at that event. It works by setting PrevEventsExist=false in the -// roomserver query response, resulting in a call to /state_ids which fails (unset). It should then fetch via /state. -func TestTransactionFetchMissingStateByFallbackState(t *testing.T) { - rsAPI := &testRoomserverAPI{ - queryStateAfterEvents: func(req *api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse { - return api.QueryStateAfterEventsResponse{ - // setting this to false should trigger a call to /state_ids - PrevEventsExist: false, - RoomExists: true, - StateEvents: nil, + // /get_missing_events should be done exactly once + getMissingEvents: func(missing gomatrixserverlib.MissingEvents) (res gomatrixserverlib.RespMissingEvents, err error) { + if !reflect.DeepEqual(missing.EarliestEvents, []string{eventA.EventID()}) { + t.Errorf("call to /get_missing_events wrong earliest events: got %v want %v", missing.EarliestEvents, eventA.EventID()) } - }, - } - inputEvent := testEvents[len(testEvents)-1] - // first 5 events are the state events, in auth event order. - stateEvents := testEvents[:5] - - cli := &txnFedClient{ - // /state_ids purposefully unset - stateIDs: nil, - // /state returns the state at that event (which is the current state) - state: map[string]gomatrixserverlib.RespState{ - inputEvent.EventID(): gomatrixserverlib.RespState{ - AuthEvents: gomatrixserverlib.UnwrapEventHeaders(stateEvents), - StateEvents: gomatrixserverlib.UnwrapEventHeaders(stateEvents), - }, + if !reflect.DeepEqual(missing.LatestEvents, []string{eventD.EventID()}) { + t.Errorf("call to /get_missing_events wrong latest events: got %v want %v", missing.LatestEvents, eventD.EventID()) + } + // just return event C, not event B so /state_ids logic kicks in as there will STILL be missing prev_events + return gomatrixserverlib.RespMissingEvents{ + Events: []gomatrixserverlib.Event{ + eventC.Unwrap(), + }, + }, nil }, } pdus := []json.RawMessage{ - testData[len(testData)-1], // a message event + eventD.JSON(), } txn := mustCreateTransaction(rsAPI, cli, pdus) mustProcessTransaction(t, txn, nil) - // the roomserver should get all state events and the new input event - // TODO: it should really be only giving the missing ones - got := rsAPI.inputRoomEvents - if len(got) != len(stateEvents)+1 { - t.Fatalf("wrong number of InputRoomEvents: got %d want %d", len(got), len(stateEvents)+1) - } - last := got[len(got)-1] - if last.Event.EventID() != inputEvent.EventID() { - t.Errorf("last event should be the input event but it wasn't. got %s want %s", last.Event.EventID(), inputEvent.EventID()) - } - gots := make([]string, len(stateEvents)) - wants := make([]string, len(stateEvents)) - for i := range stateEvents { - gots[i] = got[i].Event.EventID() - wants[i] = stateEvents[i].EventID() - } - sort.Strings(gots) - sort.Strings(wants) - if !reflect.DeepEqual(gots, wants) { - t.Errorf("state events returned mismatch, got (sorted): %+v want %+v", gots, wants) - } + assertInputRoomEvents(t, rsAPI.inputRoomEvents, []gomatrixserverlib.HeaderedEvent{eventB, eventC, eventD}) } diff --git a/roomserver/internal/input_events.go b/roomserver/internal/input_events.go index a0bfaa2e..f4d3bb8f 100644 --- a/roomserver/internal/input_events.go +++ b/roomserver/internal/input_events.go @@ -55,7 +55,7 @@ func processRoomEvent( // Check that the event passes authentication checks and work out the numeric IDs for the auth events. authEventNIDs, err := checkAuthEvents(ctx, db, headered, input.AuthEventIDs) if err != nil { - logrus.WithError(err).WithField("event_id", event.EventID()).Error("processRoomEvent.checkAuthEvents failed for event") + logrus.WithError(err).WithField("event_id", event.EventID()).WithField("auth_event_ids", input.AuthEventIDs).Error("processRoomEvent.checkAuthEvents failed for event") return } diff --git a/sytest-whitelist b/sytest-whitelist index 6e1b4f63..6dbc7ab2 100644 --- a/sytest-whitelist +++ b/sytest-whitelist @@ -264,4 +264,11 @@ User can invite local user to room with version 5 remote user can join room with version 5 User can invite remote user to room with version 5 Remote user can backfill in a room with version 5 +Inbound federation can receive v1 /send_join +Inbound federation can get state for a room +Inbound federation of state requires event_id as a mandatory paramater +Inbound federation can get state_ids for a room +Inbound federation of state_ids requires event_id as a mandatory paramater +Federation rejects inbound events where the prev_events cannot be found +Outbound federation requests missing prev_events and then asks for /state_ids and resolves the state Alternative server names do not cause a routing loop