mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-08-02 06:12:45 +00:00
only send new events to RS; add tests for /state_ids and /event (#1011)
* only send new events to RS; add tests for /state_ids and /event * Review comments: send in auth event order * Ignore order of state events for this test as RespState.Events is non-deterministic
This commit is contained in:
parent
1294852270
commit
3b98535dc5
3 changed files with 183 additions and 23 deletions
|
@ -309,9 +309,7 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVer
|
|||
// TODO: Attempt to fill in the gap using /get_missing_events
|
||||
|
||||
// Attempt to fetch the missing state using /state_ids and /events
|
||||
var respState *gomatrixserverlib.RespState
|
||||
var err error
|
||||
respState, err = t.lookupMissingStateViaStateIDs(e, roomVersion)
|
||||
respState, haveEventIDs, err := t.lookupMissingStateViaStateIDs(e, roomVersion)
|
||||
if err != nil {
|
||||
// Fallback to /state
|
||||
util.GetLogger(t.context).WithError(err).Warn("processEventWithMissingState failed to /state_ids, falling back to /state")
|
||||
|
@ -343,8 +341,9 @@ retryAllowedState:
|
|||
return err
|
||||
}
|
||||
|
||||
// pass the event along with the state to the roomserver
|
||||
return t.producer.SendEventWithState(t.context, respState, e.Headered(roomVersion))
|
||||
// pass the event along with the state to the roomserver using a background context so we don't
|
||||
// needlessly expire
|
||||
return t.producer.SendEventWithState(context.Background(), respState, e.Headered(roomVersion), haveEventIDs)
|
||||
}
|
||||
|
||||
func (t *txnReq) lookupMissingStateViaState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (
|
||||
|
@ -361,28 +360,30 @@ func (t *txnReq) lookupMissingStateViaState(e gomatrixserverlib.Event, roomVersi
|
|||
}
|
||||
|
||||
func (t *txnReq) lookupMissingStateViaStateIDs(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) (
|
||||
*gomatrixserverlib.RespState, error) {
|
||||
*gomatrixserverlib.RespState, map[string]bool, error) {
|
||||
|
||||
// fetch the state event IDs at the time of the event
|
||||
stateIDs, err := t.federation.LookupStateIDs(t.context, t.Origin, e.RoomID(), e.EventID())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// fetch as many as we can from the roomserver, do them as 2 calls rather than
|
||||
// 1 to try to reduce the number of parameters in the bulk query this will use
|
||||
haveEventMap := make(map[string]*gomatrixserverlib.HeaderedEvent, len(stateIDs.StateEventIDs))
|
||||
haveEventIDs := make(map[string]bool)
|
||||
for _, eventList := range [][]string{stateIDs.StateEventIDs, stateIDs.AuthEventIDs} {
|
||||
queryReq := api.QueryEventsByIDRequest{
|
||||
EventIDs: eventList,
|
||||
}
|
||||
var queryRes api.QueryEventsByIDResponse
|
||||
if err := t.rsAPI.QueryEventsByID(t.context, &queryReq, &queryRes); err != nil {
|
||||
return nil, err
|
||||
if err = t.rsAPI.QueryEventsByID(t.context, &queryReq, &queryRes); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
// allow indexing of current state by event ID
|
||||
for i := range queryRes.Events {
|
||||
haveEventMap[queryRes.Events[i].EventID()] = &queryRes.Events[i]
|
||||
haveEventIDs[queryRes.Events[i].EventID()] = true
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -404,26 +405,29 @@ func (t *txnReq) lookupMissingStateViaStateIDs(e gomatrixserverlib.Event, roomVe
|
|||
}).Info("Fetching missing state at event")
|
||||
|
||||
for missingEventID := range missing {
|
||||
txn, err := t.federation.GetEvent(t.context, t.Origin, missingEventID)
|
||||
var txn gomatrixserverlib.Transaction
|
||||
txn, err = t.federation.GetEvent(t.context, t.Origin, missingEventID)
|
||||
if err != nil {
|
||||
util.GetLogger(t.context).WithError(err).WithField("event_id", missingEventID).Warn("failed to get missing /event for event ID")
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
for _, pdu := range txn.PDUs {
|
||||
event, err := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion)
|
||||
var event gomatrixserverlib.Event
|
||||
event, err = gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion)
|
||||
if err != nil {
|
||||
util.GetLogger(t.context).WithError(err).Warnf("Transaction: Failed to parse event JSON of event %q", event.EventID())
|
||||
return nil, unmarshalError{err}
|
||||
return nil, nil, unmarshalError{err}
|
||||
}
|
||||
if err := gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil {
|
||||
if err = gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil {
|
||||
util.GetLogger(t.context).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID())
|
||||
return nil, verifySigError{event.EventID(), err}
|
||||
return nil, nil, verifySigError{event.EventID(), err}
|
||||
}
|
||||
h := event.Headered(roomVersion)
|
||||
haveEventMap[event.EventID()] = &h
|
||||
}
|
||||
}
|
||||
return t.createRespStateFromStateIDs(stateIDs, haveEventMap)
|
||||
resp, err := t.createRespStateFromStateIDs(stateIDs, haveEventMap)
|
||||
return resp, haveEventIDs, err
|
||||
}
|
||||
|
||||
func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStateIDs, haveEventMap map[string]*gomatrixserverlib.HeaderedEvent) (
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue