From ee429a56f4c6e6a4ae7a08ab5d71600168f1567b Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 2 Jun 2017 14:14:49 +0100 Subject: [PATCH] Handle the case where we are missing state --- .../dendrite-federation-api-server/main.go | 2 +- .../dendrite/federationapi/routing/routing.go | 3 +- .../dendrite/federationapi/writers/send.go | 117 +++++++++++------- .../dendrite/roomserver/query/query.go | 9 +- .../roomserver/storage/events_table.go | 12 +- .../dendrite/roomserver/types/types.go | 6 + 6 files changed, 96 insertions(+), 53 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go index 642388df..268eecbe 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-federation-api-server/main.go @@ -109,7 +109,7 @@ func main() { log.Panicf("Failed to setup kafka producers(%s): %s", kafkaURIs, err) } - routing.Setup(http.DefaultServeMux, cfg, queryAPI, roomserverProducer, keyRing) + routing.Setup(http.DefaultServeMux, cfg, queryAPI, roomserverProducer, keyRing, federation) log.Fatal(http.ListenAndServe(bindAddr, nil)) } diff --git a/src/github.com/matrix-org/dendrite/federationapi/routing/routing.go b/src/github.com/matrix-org/dendrite/federationapi/routing/routing.go index e9461135..0f20d011 100644 --- a/src/github.com/matrix-org/dendrite/federationapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/federationapi/routing/routing.go @@ -40,6 +40,7 @@ func Setup( query api.RoomserverQueryAPI, producer *producers.RoomserverProducer, keys gomatrixserverlib.KeyRing, + federation *gomatrixserverlib.FederationClient, ) { apiMux := mux.NewRouter() v2keysmux := apiMux.PathPrefix(pathPrefixV2Keys).Subrouter() @@ -62,7 +63,7 @@ func Setup( return writers.Send( req, gomatrixserverlib.TransactionID(vars["txnID"]), time.Now(), - cfg, query, producer, keys, + cfg, query, producer, keys, federation, ) }, )) diff --git a/src/github.com/matrix-org/dendrite/federationapi/writers/send.go b/src/github.com/matrix-org/dendrite/federationapi/writers/send.go index da6ed34d..89acd5b9 100644 --- a/src/github.com/matrix-org/dendrite/federationapi/writers/send.go +++ b/src/github.com/matrix-org/dendrite/federationapi/writers/send.go @@ -23,25 +23,31 @@ func Send( query api.RoomserverQueryAPI, producer *producers.RoomserverProducer, keys gomatrixserverlib.KeyRing, + federation *gomatrixserverlib.FederationClient, ) util.JSONResponse { request, errResp := gomatrixserverlib.VerifyHTTPRequest(req, now, cfg.ServerName, keys) if request == nil { return errResp } - var content gomatrixserverlib.Transaction - if err := json.Unmarshal(request.Content(), &content); err != nil { + t := txnReq{ + query: query, + producer: producer, + keys: keys, + federation: federation, + } + if err := json.Unmarshal(request.Content(), &t); err != nil { return util.JSONResponse{ Code: 400, JSON: jsonerror.BadJSON("The request body could not be decoded into valid JSON. " + err.Error()), } } - content.Origin = request.Origin() - content.TransactionID = txnID - content.Destination = cfg.ServerName + t.Origin = request.Origin() + t.TransactionID = txnID + t.Destination = cfg.ServerName - resp, err := processTransaction(content, query, producer, keys) + resp, err := t.processTransaction() if err != nil { return httputil.LogThenError(req, err) } @@ -52,21 +58,24 @@ func Send( } } -func processTransaction( - t gomatrixserverlib.Transaction, - query api.RoomserverQueryAPI, - producer *producers.RoomserverProducer, - keys gomatrixserverlib.KeyRing, -) (*gomatrixserverlib.RespSend, error) { +type txnReq struct { + gomatrixserverlib.Transaction + query api.RoomserverQueryAPI + producer *producers.RoomserverProducer + keys gomatrixserverlib.KeyRing + federation *gomatrixserverlib.FederationClient +} + +func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { // Check the event signatures - if err := gomatrixserverlib.VerifyEventSignatures(t.PDUs, keys); err != nil { + if err := gomatrixserverlib.VerifyEventSignatures(t.PDUs, t.keys); err != nil { return nil, err } // Process the events. results := map[string]gomatrixserverlib.PDUResult{} for _, e := range t.PDUs { - err := processEvent(e, query, producer) + err := t.processEvent(e) if err != nil { // If the error is due to the event itself being bad then we skip // it and move onto the next event. We report an error so that the @@ -106,11 +115,7 @@ type unknownRoomError string func (e unknownRoomError) Error() string { return fmt.Sprintf("unknown room %q", e) } -func processEvent( - e gomatrixserverlib.Event, - query api.RoomserverQueryAPI, - producer *producers.RoomserverProducer, -) error { +func (t *txnReq) processEvent(e gomatrixserverlib.Event) error { refs := e.PrevEvents() prevEventIDs := make([]string, len(refs)) for i := range refs { @@ -125,7 +130,7 @@ func processEvent( StateToFetch: needed.Tuples(), } var stateResp api.QueryStateAfterEventsResponse - if err := query.QueryStateAfterEvents(&stateReq, &stateResp); err != nil { + if err := t.query.QueryStateAfterEvents(&stateReq, &stateResp); err != nil { return err } @@ -140,33 +145,11 @@ func processEvent( } if !stateResp.PrevEventsExist { - // We are missing the previous events for this events. - // This means that there is a gap in our view of the history of the - // room. There two ways that we can handle such a gap: - // 1) We can fill in the gap using /get_missing_events - // 2) We can leave the gap and request the state of the room at - // this event from the remote server using either /state_ids - // or /state. - // Synapse will attempt to do 1 and if that fails or if the gap is - // too large then it will attempt 2. - // Synapse will use /state_ids if possible since ususally the state - // is largely unchanged and it is more efficient to fetch a list of - // event ids and then use /event to fetch the individual events. - // However not all version of synapse support /state_ids so you may - // need to fallback to /state. - // TODO: Attempt to fill in the gap using /get_missing_events - // TODO: Attempt to fetch the state using /state_ids and /events - // TODO: Attempt to fetch the state using /state - panic(fmt.Errorf("Receiving events with missing prev_events is no implemented")) + return t.processEventWithMissingState(e) } // Check that the event is allowed by the state at the event. - authUsingState := gomatrixserverlib.NewAuthEvents(nil) - for i := range stateResp.StateEvents { - authUsingState.AddEvent(&stateResp.StateEvents[i]) - } - err := gomatrixserverlib.Allowed(e, &authUsingState) - if err != nil { + if err := checkAllowedByState(e, stateResp.StateEvents); err != nil { return err } @@ -174,9 +157,53 @@ func processEvent( // TODO: Check that the event is allowed by its auth_events. // pass the event to the roomserver - if err := producer.SendEvents([]gomatrixserverlib.Event{e}); err != nil { + if err := t.producer.SendEvents([]gomatrixserverlib.Event{e}); err != nil { return err } return nil } + +func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserverlib.Event) error { + authUsingState := gomatrixserverlib.NewAuthEvents(nil) + for i := range stateEvents { + authUsingState.AddEvent(&stateEvents[i]) + } + return gomatrixserverlib.Allowed(e, &authUsingState) +} + +func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event) error { + // We are missing the previous events for this events. + // This means that there is a gap in our view of the history of the + // room. There two ways that we can handle such a gap: + // 1) We can fill in the gap using /get_missing_events + // 2) We can leave the gap and request the state of the room at + // this event from the remote server using either /state_ids + // or /state. + // Synapse will attempt to do 1 and if that fails or if the gap is + // too large then it will attempt 2. + // Synapse will use /state_ids if possible since ususally the state + // is largely unchanged and it is more efficient to fetch a list of + // event ids and then use /event to fetch the individual events. + // However not all version of synapse support /state_ids so you may + // need to fallback to /state. + // TODO: Attempt to fill in the gap using /get_missing_events + // TODO: Attempt to fetch the state using /state_ids and /events + state, err := t.federation.LookupState(t.Origin, e.RoomID(), e.EventID()) + if err != nil { + return err + } + // Check that the returned state is valid. + if err := state.Check(t.keys); err != nil { + return err + } + // Check that the event is allowed by the state. + if err := checkAllowedByState(e, state.StateEvents); err != nil { + return err + } + // pass the event along with the state to the roomserver + if err := t.producer.SendEventWithState(state, e); err != nil { + return err + } + return nil +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/query/query.go b/src/github.com/matrix-org/dendrite/roomserver/query/query.go index 6f236e93..1b1820f0 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/query/query.go +++ b/src/github.com/matrix-org/dendrite/roomserver/query/query.go @@ -97,9 +97,12 @@ func (r *RoomserverQueryAPI) QueryStateAfterEvents( prevStates, err := r.DB.StateAtEventIDs(request.PrevEventIDs) if err != nil { - // TODO: Check if the error was because we are missing events from the - // database or are missing state at events from the database. - return err + switch err.(type) { + case types.MissingEventError: + return nil + default: + return err + } } response.PrevEventsExist = true diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go index acaf43b6..abbfaac8 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go @@ -194,7 +194,9 @@ func (s *eventStatements) bulkSelectStateEventByID(eventIDs []string) ([]types.S // However it should be possible debug this by replaying queries or entries from the input kafka logs. // If this turns out to be impossible and we do need the debug information here, it would be better // to do it as a separate query rather than slowing down/complicating the common case. - return nil, fmt.Errorf("storage: state event IDs missing from the database (%d != %d)", i, len(eventIDs)) + return nil, types.MissingEventError( + fmt.Sprintf("storage: state event IDs missing from the database (%d != %d)", i, len(eventIDs)), + ) } return results, err } @@ -218,11 +220,15 @@ func (s *eventStatements) bulkSelectStateAtEventByID(eventIDs []string) ([]types return nil, err } if result.BeforeStateSnapshotNID == 0 { - return nil, fmt.Errorf("storage: missing state for event NID %d", result.EventNID) + return nil, types.MissingEventError( + fmt.Sprintf("storage: missing state for event NID %d", result.EventNID), + ) } } if i != len(eventIDs) { - return nil, fmt.Errorf("storage: event IDs missing from the database (%d != %d)", i, len(eventIDs)) + return nil, types.MissingEventError( + fmt.Sprintf("storage: event IDs missing from the database (%d != %d)", i, len(eventIDs)), + ) } return results, err } diff --git a/src/github.com/matrix-org/dendrite/roomserver/types/types.go b/src/github.com/matrix-org/dendrite/roomserver/types/types.go index f728696b..b255b64b 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/types/types.go +++ b/src/github.com/matrix-org/dendrite/roomserver/types/types.go @@ -168,3 +168,9 @@ type RoomRecentEventsUpdater interface { // Rollback the transaction. Rollback() error } + +// A MissingEventError is an error that happened because the roomserver was +// missing requested events from its database. +type MissingEventError string + +func (e MissingEventError) Error() string { return string(e) }