diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 9c750923..892f09b0 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -106,8 +106,8 @@ func Send( eduAPI: eduAPI, keys: keys, federation: federation, + hadEvents: make(map[string]bool), haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent), - newEvents: make(map[string]bool), keyAPI: keyAPI, roomsMu: mu, } @@ -167,13 +167,12 @@ type txnReq struct { servers []gomatrixserverlib.ServerName serversMutex sync.RWMutex roomsMu *internal.MutexByRoom + // a list of events from the auth and prev events which we already had + hadEvents map[string]bool // local cache of events for auth checks, etc - this may include events // which the roomserver is unaware of. haveEvents map[string]*gomatrixserverlib.HeaderedEvent - // new events which the roomserver does not know about - newEvents map[string]bool - newEventsMutex sync.RWMutex - work string // metrics + work string // metrics } // A subset of FederationClient functionality that txn requires. Useful for testing. @@ -340,19 +339,6 @@ func (e missingPrevEventsError) Error() string { return fmt.Sprintf("unable to get prev_events for event %q: %s", e.eventID, e.err) } -func (t *txnReq) haveEventIDs() map[string]bool { - t.newEventsMutex.RLock() - defer t.newEventsMutex.RUnlock() - result := make(map[string]bool, len(t.haveEvents)) - for eventID := range t.haveEvents { - if t.newEvents[eventID] { - continue - } - result[eventID] = true - } - return result -} - func (t *txnReq) processEDUs(ctx context.Context) { for _, e := range t.EDUs { eduCountTotal.Inc() @@ -517,6 +503,15 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e return fmt.Errorf("t.rsAPI.QueryMissingAuthPrevEvents: %w", err) } + // Prepare a map of all the events we already had before this point, so + // that we don't send them to the roomserver again. + for _, eventID := range append(e.AuthEventIDs(), e.PrevEventIDs()...) { + t.hadEvents[eventID] = true + } + for _, eventID := range append(stateResp.MissingAuthEventIDs, stateResp.MissingPrevEventIDs...) { + t.hadEvents[eventID] = false + } + if !stateResp.RoomExists { // TODO: When synapse receives a message for a room it is not in it // asks the remote server for the state of the room so that it can @@ -739,7 +734,7 @@ func (t *txnReq) processEventWithMissingState( api.KindOld, resolvedState, backwardsExtremity.Headered(roomVersion), - t.haveEventIDs(), + t.hadEvents, ) if err != nil { return fmt.Errorf("api.SendEventWithState: %w", err) @@ -1235,9 +1230,5 @@ func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib. util.GetLogger(ctx).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID()) return nil, verifySigError{event.EventID(), err} } - h := t.cacheAndReturn(event.Headered(roomVersion)) - t.newEventsMutex.Lock() - t.newEvents[h.EventID()] = true - t.newEventsMutex.Unlock() - return h, nil + return t.cacheAndReturn(event.Headered(roomVersion)), nil }