Handle the case where we are missing state

This commit is contained in:
Mark Haines 2017-06-02 14:14:49 +01:00
parent a024e10054
commit ee429a56f4
6 changed files with 96 additions and 53 deletions

View file

@ -109,7 +109,7 @@ func main() {
log.Panicf("Failed to setup kafka producers(%s): %s", kafkaURIs, err) log.Panicf("Failed to setup kafka producers(%s): %s", kafkaURIs, err)
} }
routing.Setup(http.DefaultServeMux, cfg, queryAPI, roomserverProducer, keyRing) routing.Setup(http.DefaultServeMux, cfg, queryAPI, roomserverProducer, keyRing, federation)
log.Fatal(http.ListenAndServe(bindAddr, nil)) log.Fatal(http.ListenAndServe(bindAddr, nil))
} }

View file

@ -40,6 +40,7 @@ func Setup(
query api.RoomserverQueryAPI, query api.RoomserverQueryAPI,
producer *producers.RoomserverProducer, producer *producers.RoomserverProducer,
keys gomatrixserverlib.KeyRing, keys gomatrixserverlib.KeyRing,
federation *gomatrixserverlib.FederationClient,
) { ) {
apiMux := mux.NewRouter() apiMux := mux.NewRouter()
v2keysmux := apiMux.PathPrefix(pathPrefixV2Keys).Subrouter() v2keysmux := apiMux.PathPrefix(pathPrefixV2Keys).Subrouter()
@ -62,7 +63,7 @@ func Setup(
return writers.Send( return writers.Send(
req, gomatrixserverlib.TransactionID(vars["txnID"]), req, gomatrixserverlib.TransactionID(vars["txnID"]),
time.Now(), time.Now(),
cfg, query, producer, keys, cfg, query, producer, keys, federation,
) )
}, },
)) ))

View file

@ -23,25 +23,31 @@ func Send(
query api.RoomserverQueryAPI, query api.RoomserverQueryAPI,
producer *producers.RoomserverProducer, producer *producers.RoomserverProducer,
keys gomatrixserverlib.KeyRing, keys gomatrixserverlib.KeyRing,
federation *gomatrixserverlib.FederationClient,
) util.JSONResponse { ) util.JSONResponse {
request, errResp := gomatrixserverlib.VerifyHTTPRequest(req, now, cfg.ServerName, keys) request, errResp := gomatrixserverlib.VerifyHTTPRequest(req, now, cfg.ServerName, keys)
if request == nil { if request == nil {
return errResp return errResp
} }
var content gomatrixserverlib.Transaction t := txnReq{
if err := json.Unmarshal(request.Content(), &content); err != nil { query: query,
producer: producer,
keys: keys,
federation: federation,
}
if err := json.Unmarshal(request.Content(), &t); err != nil {
return util.JSONResponse{ return util.JSONResponse{
Code: 400, Code: 400,
JSON: jsonerror.BadJSON("The request body could not be decoded into valid JSON. " + err.Error()), JSON: jsonerror.BadJSON("The request body could not be decoded into valid JSON. " + err.Error()),
} }
} }
content.Origin = request.Origin() t.Origin = request.Origin()
content.TransactionID = txnID t.TransactionID = txnID
content.Destination = cfg.ServerName t.Destination = cfg.ServerName
resp, err := processTransaction(content, query, producer, keys) resp, err := t.processTransaction()
if err != nil { if err != nil {
return httputil.LogThenError(req, err) return httputil.LogThenError(req, err)
} }
@ -52,21 +58,24 @@ func Send(
} }
} }
func processTransaction( type txnReq struct {
t gomatrixserverlib.Transaction, gomatrixserverlib.Transaction
query api.RoomserverQueryAPI, query api.RoomserverQueryAPI
producer *producers.RoomserverProducer, producer *producers.RoomserverProducer
keys gomatrixserverlib.KeyRing, keys gomatrixserverlib.KeyRing
) (*gomatrixserverlib.RespSend, error) { federation *gomatrixserverlib.FederationClient
}
func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
// Check the event signatures // Check the event signatures
if err := gomatrixserverlib.VerifyEventSignatures(t.PDUs, keys); err != nil { if err := gomatrixserverlib.VerifyEventSignatures(t.PDUs, t.keys); err != nil {
return nil, err return nil, err
} }
// Process the events. // Process the events.
results := map[string]gomatrixserverlib.PDUResult{} results := map[string]gomatrixserverlib.PDUResult{}
for _, e := range t.PDUs { for _, e := range t.PDUs {
err := processEvent(e, query, producer) err := t.processEvent(e)
if err != nil { if err != nil {
// If the error is due to the event itself being bad then we skip // If the error is due to the event itself being bad then we skip
// it and move onto the next event. We report an error so that the // it and move onto the next event. We report an error so that the
@ -106,11 +115,7 @@ type unknownRoomError string
func (e unknownRoomError) Error() string { return fmt.Sprintf("unknown room %q", e) } func (e unknownRoomError) Error() string { return fmt.Sprintf("unknown room %q", e) }
func processEvent( func (t *txnReq) processEvent(e gomatrixserverlib.Event) error {
e gomatrixserverlib.Event,
query api.RoomserverQueryAPI,
producer *producers.RoomserverProducer,
) error {
refs := e.PrevEvents() refs := e.PrevEvents()
prevEventIDs := make([]string, len(refs)) prevEventIDs := make([]string, len(refs))
for i := range refs { for i := range refs {
@ -125,7 +130,7 @@ func processEvent(
StateToFetch: needed.Tuples(), StateToFetch: needed.Tuples(),
} }
var stateResp api.QueryStateAfterEventsResponse var stateResp api.QueryStateAfterEventsResponse
if err := query.QueryStateAfterEvents(&stateReq, &stateResp); err != nil { if err := t.query.QueryStateAfterEvents(&stateReq, &stateResp); err != nil {
return err return err
} }
@ -140,33 +145,11 @@ func processEvent(
} }
if !stateResp.PrevEventsExist { if !stateResp.PrevEventsExist {
// We are missing the previous events for this events. return t.processEventWithMissingState(e)
// This means that there is a gap in our view of the history of the
// room. There two ways that we can handle such a gap:
// 1) We can fill in the gap using /get_missing_events
// 2) We can leave the gap and request the state of the room at
// this event from the remote server using either /state_ids
// or /state.
// Synapse will attempt to do 1 and if that fails or if the gap is
// too large then it will attempt 2.
// Synapse will use /state_ids if possible since ususally the state
// is largely unchanged and it is more efficient to fetch a list of
// event ids and then use /event to fetch the individual events.
// However not all version of synapse support /state_ids so you may
// need to fallback to /state.
// TODO: Attempt to fill in the gap using /get_missing_events
// TODO: Attempt to fetch the state using /state_ids and /events
// TODO: Attempt to fetch the state using /state
panic(fmt.Errorf("Receiving events with missing prev_events is no implemented"))
} }
// Check that the event is allowed by the state at the event. // Check that the event is allowed by the state at the event.
authUsingState := gomatrixserverlib.NewAuthEvents(nil) if err := checkAllowedByState(e, stateResp.StateEvents); err != nil {
for i := range stateResp.StateEvents {
authUsingState.AddEvent(&stateResp.StateEvents[i])
}
err := gomatrixserverlib.Allowed(e, &authUsingState)
if err != nil {
return err return err
} }
@ -174,9 +157,53 @@ func processEvent(
// TODO: Check that the event is allowed by its auth_events. // TODO: Check that the event is allowed by its auth_events.
// pass the event to the roomserver // pass the event to the roomserver
if err := producer.SendEvents([]gomatrixserverlib.Event{e}); err != nil { if err := t.producer.SendEvents([]gomatrixserverlib.Event{e}); err != nil {
return err return err
} }
return nil return nil
} }
func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserverlib.Event) error {
authUsingState := gomatrixserverlib.NewAuthEvents(nil)
for i := range stateEvents {
authUsingState.AddEvent(&stateEvents[i])
}
return gomatrixserverlib.Allowed(e, &authUsingState)
}
func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event) error {
// We are missing the previous events for this events.
// This means that there is a gap in our view of the history of the
// room. There two ways that we can handle such a gap:
// 1) We can fill in the gap using /get_missing_events
// 2) We can leave the gap and request the state of the room at
// this event from the remote server using either /state_ids
// or /state.
// Synapse will attempt to do 1 and if that fails or if the gap is
// too large then it will attempt 2.
// Synapse will use /state_ids if possible since ususally the state
// is largely unchanged and it is more efficient to fetch a list of
// event ids and then use /event to fetch the individual events.
// However not all version of synapse support /state_ids so you may
// need to fallback to /state.
// TODO: Attempt to fill in the gap using /get_missing_events
// TODO: Attempt to fetch the state using /state_ids and /events
state, err := t.federation.LookupState(t.Origin, e.RoomID(), e.EventID())
if err != nil {
return err
}
// Check that the returned state is valid.
if err := state.Check(t.keys); err != nil {
return err
}
// Check that the event is allowed by the state.
if err := checkAllowedByState(e, state.StateEvents); err != nil {
return err
}
// pass the event along with the state to the roomserver
if err := t.producer.SendEventWithState(state, e); err != nil {
return err
}
return nil
}

View file

@ -97,9 +97,12 @@ func (r *RoomserverQueryAPI) QueryStateAfterEvents(
prevStates, err := r.DB.StateAtEventIDs(request.PrevEventIDs) prevStates, err := r.DB.StateAtEventIDs(request.PrevEventIDs)
if err != nil { if err != nil {
// TODO: Check if the error was because we are missing events from the switch err.(type) {
// database or are missing state at events from the database. case types.MissingEventError:
return err return nil
default:
return err
}
} }
response.PrevEventsExist = true response.PrevEventsExist = true

View file

@ -194,7 +194,9 @@ func (s *eventStatements) bulkSelectStateEventByID(eventIDs []string) ([]types.S
// However it should be possible debug this by replaying queries or entries from the input kafka logs. // However it should be possible debug this by replaying queries or entries from the input kafka logs.
// If this turns out to be impossible and we do need the debug information here, it would be better // If this turns out to be impossible and we do need the debug information here, it would be better
// to do it as a separate query rather than slowing down/complicating the common case. // to do it as a separate query rather than slowing down/complicating the common case.
return nil, fmt.Errorf("storage: state event IDs missing from the database (%d != %d)", i, len(eventIDs)) return nil, types.MissingEventError(
fmt.Sprintf("storage: state event IDs missing from the database (%d != %d)", i, len(eventIDs)),
)
} }
return results, err return results, err
} }
@ -218,11 +220,15 @@ func (s *eventStatements) bulkSelectStateAtEventByID(eventIDs []string) ([]types
return nil, err return nil, err
} }
if result.BeforeStateSnapshotNID == 0 { if result.BeforeStateSnapshotNID == 0 {
return nil, fmt.Errorf("storage: missing state for event NID %d", result.EventNID) return nil, types.MissingEventError(
fmt.Sprintf("storage: missing state for event NID %d", result.EventNID),
)
} }
} }
if i != len(eventIDs) { if i != len(eventIDs) {
return nil, fmt.Errorf("storage: event IDs missing from the database (%d != %d)", i, len(eventIDs)) return nil, types.MissingEventError(
fmt.Sprintf("storage: event IDs missing from the database (%d != %d)", i, len(eventIDs)),
)
} }
return results, err return results, err
} }

View file

@ -168,3 +168,9 @@ type RoomRecentEventsUpdater interface {
// Rollback the transaction. // Rollback the transaction.
Rollback() error Rollback() error
} }
// A MissingEventError is an error that happened because the roomserver was
// missing requested events from its database.
type MissingEventError string
func (e MissingEventError) Error() string { return string(e) }