Ensure we use the correct list of events when excluding repeated state

This commit is contained in:
Neil Alexander 2021-06-29 16:55:31 +01:00
parent 05ec6eb973
commit f5dff231ca
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -106,8 +106,8 @@ func Send(
eduAPI: eduAPI,
keys: keys,
federation: federation,
hadEvents: make(map[string]bool),
haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent),
newEvents: make(map[string]bool),
keyAPI: keyAPI,
roomsMu: mu,
}
@ -167,13 +167,12 @@ type txnReq struct {
servers []gomatrixserverlib.ServerName
serversMutex sync.RWMutex
roomsMu *internal.MutexByRoom
// a list of events from the auth and prev events which we already had
hadEvents map[string]bool
// local cache of events for auth checks, etc - this may include events
// which the roomserver is unaware of.
haveEvents map[string]*gomatrixserverlib.HeaderedEvent
// new events which the roomserver does not know about
newEvents map[string]bool
newEventsMutex sync.RWMutex
work string // metrics
work string // metrics
}
// A subset of FederationClient functionality that txn requires. Useful for testing.
@ -340,19 +339,6 @@ func (e missingPrevEventsError) Error() string {
return fmt.Sprintf("unable to get prev_events for event %q: %s", e.eventID, e.err)
}
func (t *txnReq) haveEventIDs() map[string]bool {
t.newEventsMutex.RLock()
defer t.newEventsMutex.RUnlock()
result := make(map[string]bool, len(t.haveEvents))
for eventID := range t.haveEvents {
if t.newEvents[eventID] {
continue
}
result[eventID] = true
}
return result
}
func (t *txnReq) processEDUs(ctx context.Context) {
for _, e := range t.EDUs {
eduCountTotal.Inc()
@ -522,6 +508,15 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e
return fmt.Errorf("t.rsAPI.QueryMissingAuthPrevEvents: %w", err)
}
// Prepare a map of all the events we already had before this point, so
// that we don't send them to the roomserver again.
for _, eventID := range append(e.AuthEventIDs(), e.PrevEventIDs()...) {
t.hadEvents[eventID] = true
}
for _, eventID := range append(stateResp.MissingAuthEventIDs, stateResp.MissingPrevEventIDs...) {
t.hadEvents[eventID] = false
}
if !stateResp.RoomExists {
// TODO: When synapse receives a message for a room it is not in it
// asks the remote server for the state of the room so that it can
@ -739,7 +734,7 @@ func (t *txnReq) processEventWithMissingState(
api.KindOld,
resolvedState,
backwardsExtremity.Headered(roomVersion),
nil,
t.hadEvents,
)
if err != nil {
return fmt.Errorf("api.SendEventWithState: %w", err)
@ -1235,9 +1230,5 @@ func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.
util.GetLogger(ctx).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID())
return nil, verifySigError{event.EventID(), err}
}
h := t.cacheAndReturn(event.Headered(roomVersion))
t.newEventsMutex.Lock()
t.newEvents[h.EventID()] = true
t.newEventsMutex.Unlock()
return h, nil
return t.cacheAndReturn(event.Headered(roomVersion)), nil
}