mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-04-11 22:33:40 +00:00
Ensure we use the correct list of events when excluding repeated state
This commit is contained in:
parent
71eebd6bc3
commit
3a1d14ceec
1 changed files with 15 additions and 24 deletions
|
@ -106,8 +106,8 @@ func Send(
|
||||||
eduAPI: eduAPI,
|
eduAPI: eduAPI,
|
||||||
keys: keys,
|
keys: keys,
|
||||||
federation: federation,
|
federation: federation,
|
||||||
|
hadEvents: make(map[string]bool),
|
||||||
haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent),
|
haveEvents: make(map[string]*gomatrixserverlib.HeaderedEvent),
|
||||||
newEvents: make(map[string]bool),
|
|
||||||
keyAPI: keyAPI,
|
keyAPI: keyAPI,
|
||||||
roomsMu: mu,
|
roomsMu: mu,
|
||||||
}
|
}
|
||||||
|
@ -167,13 +167,12 @@ type txnReq struct {
|
||||||
servers []gomatrixserverlib.ServerName
|
servers []gomatrixserverlib.ServerName
|
||||||
serversMutex sync.RWMutex
|
serversMutex sync.RWMutex
|
||||||
roomsMu *internal.MutexByRoom
|
roomsMu *internal.MutexByRoom
|
||||||
|
// a list of events from the auth and prev events which we already had
|
||||||
|
hadEvents map[string]bool
|
||||||
// local cache of events for auth checks, etc - this may include events
|
// local cache of events for auth checks, etc - this may include events
|
||||||
// which the roomserver is unaware of.
|
// which the roomserver is unaware of.
|
||||||
haveEvents map[string]*gomatrixserverlib.HeaderedEvent
|
haveEvents map[string]*gomatrixserverlib.HeaderedEvent
|
||||||
// new events which the roomserver does not know about
|
work string // metrics
|
||||||
newEvents map[string]bool
|
|
||||||
newEventsMutex sync.RWMutex
|
|
||||||
work string // metrics
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// A subset of FederationClient functionality that txn requires. Useful for testing.
|
// A subset of FederationClient functionality that txn requires. Useful for testing.
|
||||||
|
@ -340,19 +339,6 @@ func (e missingPrevEventsError) Error() string {
|
||||||
return fmt.Sprintf("unable to get prev_events for event %q: %s", e.eventID, e.err)
|
return fmt.Sprintf("unable to get prev_events for event %q: %s", e.eventID, e.err)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *txnReq) haveEventIDs() map[string]bool {
|
|
||||||
t.newEventsMutex.RLock()
|
|
||||||
defer t.newEventsMutex.RUnlock()
|
|
||||||
result := make(map[string]bool, len(t.haveEvents))
|
|
||||||
for eventID := range t.haveEvents {
|
|
||||||
if t.newEvents[eventID] {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
result[eventID] = true
|
|
||||||
}
|
|
||||||
return result
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *txnReq) processEDUs(ctx context.Context) {
|
func (t *txnReq) processEDUs(ctx context.Context) {
|
||||||
for _, e := range t.EDUs {
|
for _, e := range t.EDUs {
|
||||||
eduCountTotal.Inc()
|
eduCountTotal.Inc()
|
||||||
|
@ -517,6 +503,15 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e
|
||||||
return fmt.Errorf("t.rsAPI.QueryMissingAuthPrevEvents: %w", err)
|
return fmt.Errorf("t.rsAPI.QueryMissingAuthPrevEvents: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Prepare a map of all the events we already had before this point, so
|
||||||
|
// that we don't send them to the roomserver again.
|
||||||
|
for _, eventID := range append(e.AuthEventIDs(), e.PrevEventIDs()...) {
|
||||||
|
t.hadEvents[eventID] = true
|
||||||
|
}
|
||||||
|
for _, eventID := range append(stateResp.MissingAuthEventIDs, stateResp.MissingPrevEventIDs...) {
|
||||||
|
t.hadEvents[eventID] = false
|
||||||
|
}
|
||||||
|
|
||||||
if !stateResp.RoomExists {
|
if !stateResp.RoomExists {
|
||||||
// TODO: When synapse receives a message for a room it is not in it
|
// TODO: When synapse receives a message for a room it is not in it
|
||||||
// asks the remote server for the state of the room so that it can
|
// asks the remote server for the state of the room so that it can
|
||||||
|
@ -739,7 +734,7 @@ func (t *txnReq) processEventWithMissingState(
|
||||||
api.KindOld,
|
api.KindOld,
|
||||||
resolvedState,
|
resolvedState,
|
||||||
backwardsExtremity.Headered(roomVersion),
|
backwardsExtremity.Headered(roomVersion),
|
||||||
t.haveEventIDs(),
|
t.hadEvents,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("api.SendEventWithState: %w", err)
|
return fmt.Errorf("api.SendEventWithState: %w", err)
|
||||||
|
@ -1235,9 +1230,5 @@ func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.
|
||||||
util.GetLogger(ctx).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID())
|
util.GetLogger(ctx).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID())
|
||||||
return nil, verifySigError{event.EventID(), err}
|
return nil, verifySigError{event.EventID(), err}
|
||||||
}
|
}
|
||||||
h := t.cacheAndReturn(event.Headered(roomVersion))
|
return t.cacheAndReturn(event.Headered(roomVersion)), nil
|
||||||
t.newEventsMutex.Lock()
|
|
||||||
t.newEvents[h.EventID()] = true
|
|
||||||
t.newEventsMutex.Unlock()
|
|
||||||
return h, nil
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue