mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-08-02 06:12:45 +00:00
Ensure only one transaction is used for RS input per room (#2178)
* Ensure the input API only uses a single transaction * Remove more of the dead query API call * Tidy up * Fix tests hopefully * Don't do unnecessary work for rooms that don't exist * Improve error, fix another case where transaction wasn't used properly * Add a unit test for checking single transaction on RS input API * Fix logic oops when deciding whether to use a transaction in storeEvent
This commit is contained in:
parent
a4e7d471af
commit
5106cc807c
13 changed files with 211 additions and 214 deletions
|
@ -10,7 +10,7 @@ import (
|
|||
fedapi "github.com/matrix-org/dendrite/federationapi/api"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/internal/query"
|
||||
"github.com/matrix-org/dendrite/roomserver/state"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/shared"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
@ -27,14 +27,13 @@ type missingStateReq struct {
|
|||
origin gomatrixserverlib.ServerName
|
||||
db *shared.RoomUpdater
|
||||
inputer *Inputer
|
||||
queryer *query.Queryer
|
||||
keys gomatrixserverlib.JSONVerifier
|
||||
federation fedapi.FederationInternalAPI
|
||||
roomsMu *internal.MutexByRoom
|
||||
servers []gomatrixserverlib.ServerName
|
||||
hadEvents map[string]bool
|
||||
hadEventsMutex sync.Mutex
|
||||
haveEvents map[string]*gomatrixserverlib.HeaderedEvent
|
||||
haveEvents map[string]*gomatrixserverlib.Event
|
||||
haveEventsMutex sync.Mutex
|
||||
}
|
||||
|
||||
|
@ -326,20 +325,20 @@ func (t *missingStateReq) lookupStateAfterEvent(ctx context.Context, roomVersion
|
|||
for i := range respState.StateEvents {
|
||||
se := respState.StateEvents[i]
|
||||
if se.Type() == h.Type() && se.StateKeyEquals(*h.StateKey()) {
|
||||
respState.StateEvents[i] = h.Unwrap()
|
||||
respState.StateEvents[i] = h
|
||||
addedToState = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !addedToState {
|
||||
respState.StateEvents = append(respState.StateEvents, h.Unwrap())
|
||||
respState.StateEvents = append(respState.StateEvents, h)
|
||||
}
|
||||
}
|
||||
|
||||
return respState, false, nil
|
||||
}
|
||||
|
||||
func (t *missingStateReq) cacheAndReturn(ev *gomatrixserverlib.HeaderedEvent) *gomatrixserverlib.HeaderedEvent {
|
||||
func (t *missingStateReq) cacheAndReturn(ev *gomatrixserverlib.Event) *gomatrixserverlib.Event {
|
||||
t.haveEventsMutex.Lock()
|
||||
defer t.haveEventsMutex.Unlock()
|
||||
if cached, exists := t.haveEvents[ev.EventID()]; exists {
|
||||
|
@ -350,32 +349,49 @@ func (t *missingStateReq) cacheAndReturn(ev *gomatrixserverlib.HeaderedEvent) *g
|
|||
}
|
||||
|
||||
func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, roomID, eventID string) *parsedRespState {
|
||||
var res api.QueryStateAfterEventsResponse
|
||||
err := t.queryer.QueryStateAfterEvents(ctx, &api.QueryStateAfterEventsRequest{
|
||||
RoomID: roomID,
|
||||
PrevEventIDs: []string{eventID},
|
||||
}, &res)
|
||||
if err != nil || !res.PrevEventsExist {
|
||||
util.GetLogger(ctx).WithField("room_id", roomID).WithError(err).Warnf("failed to query state after %s locally, prev exists=%v", eventID, res.PrevEventsExist)
|
||||
var res parsedRespState
|
||||
roomInfo, err := t.db.RoomInfo(ctx, roomID)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
stateEvents := make([]*gomatrixserverlib.HeaderedEvent, len(res.StateEvents))
|
||||
for i, ev := range res.StateEvents {
|
||||
roomState := state.NewStateResolution(t.db, roomInfo)
|
||||
stateAtEvents, err := t.db.StateAtEventIDs(ctx, []string{eventID})
|
||||
if err != nil {
|
||||
util.GetLogger(ctx).WithField("room_id", roomID).WithError(err).Warnf("failed to get state after %s locally", eventID)
|
||||
return nil
|
||||
}
|
||||
stateEntries, err := roomState.LoadCombinedStateAfterEvents(ctx, stateAtEvents)
|
||||
if err != nil {
|
||||
util.GetLogger(ctx).WithField("room_id", roomID).WithError(err).Warnf("failed to load combined state after %s locally", eventID)
|
||||
return nil
|
||||
}
|
||||
stateEventNIDs := make([]types.EventNID, 0, len(stateEntries))
|
||||
for _, entry := range stateEntries {
|
||||
stateEventNIDs = append(stateEventNIDs, entry.EventNID)
|
||||
}
|
||||
stateEvents, err := t.db.Events(ctx, stateEventNIDs)
|
||||
if err != nil {
|
||||
util.GetLogger(ctx).WithField("room_id", roomID).WithError(err).Warnf("failed to load state events locally")
|
||||
return nil
|
||||
}
|
||||
res.StateEvents = make([]*gomatrixserverlib.Event, 0, len(stateEvents))
|
||||
for _, ev := range stateEvents {
|
||||
// set the event from the haveEvents cache - this means we will share pointers with other prev_event branches for this
|
||||
// processEvent request, which is better for memory.
|
||||
stateEvents[i] = t.cacheAndReturn(ev)
|
||||
res.StateEvents = append(res.StateEvents, t.cacheAndReturn(ev.Event))
|
||||
t.hadEvent(ev.EventID())
|
||||
}
|
||||
// we should never access res.StateEvents again so we delete it here to make GC faster
|
||||
res.StateEvents = nil
|
||||
|
||||
var authEvents []*gomatrixserverlib.Event
|
||||
// encourage GC
|
||||
stateEvents, stateEventNIDs, stateEntries, stateAtEvents = nil, nil, nil, nil // nolint:ineffassign
|
||||
|
||||
missingAuthEvents := map[string]bool{}
|
||||
res.AuthEvents = make([]*gomatrixserverlib.Event, 0, len(stateEvents)*3)
|
||||
for _, ev := range stateEvents {
|
||||
t.haveEventsMutex.Lock()
|
||||
for _, ae := range ev.AuthEventIDs() {
|
||||
if aev, ok := t.haveEvents[ae]; ok {
|
||||
authEvents = append(authEvents, aev.Unwrap())
|
||||
res.AuthEvents = append(res.AuthEvents, aev)
|
||||
} else {
|
||||
missingAuthEvents[ae] = true
|
||||
}
|
||||
|
@ -389,25 +405,18 @@ func (t *missingStateReq) lookupStateAfterEventLocally(ctx context.Context, room
|
|||
for evID := range missingAuthEvents {
|
||||
missingEventList = append(missingEventList, evID)
|
||||
}
|
||||
queryReq := api.QueryEventsByIDRequest{
|
||||
EventIDs: missingEventList,
|
||||
}
|
||||
util.GetLogger(ctx).WithField("count", len(missingEventList)).Debugf("Fetching missing auth events")
|
||||
var queryRes api.QueryEventsByIDResponse
|
||||
if err = t.queryer.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil {
|
||||
events, err := t.db.EventsFromIDs(ctx, missingEventList)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
for i, ev := range queryRes.Events {
|
||||
authEvents = append(authEvents, t.cacheAndReturn(queryRes.Events[i]).Unwrap())
|
||||
for i, ev := range events {
|
||||
res.AuthEvents = append(res.AuthEvents, t.cacheAndReturn(events[i].Event))
|
||||
t.hadEvent(ev.EventID())
|
||||
}
|
||||
queryRes.Events = nil
|
||||
}
|
||||
|
||||
return &parsedRespState{
|
||||
StateEvents: gomatrixserverlib.UnwrapEventHeaders(stateEvents),
|
||||
AuthEvents: authEvents,
|
||||
}
|
||||
return &res
|
||||
}
|
||||
|
||||
// lookuptStateBeforeEvent returns the room state before the event e, which is just /state_ids and/or /state depending on what
|
||||
|
@ -448,7 +457,7 @@ retryAllowedState:
|
|||
return nil, fmt.Errorf("missing auth event %s and failed to look it up: %w", missing.AuthEventID, err2)
|
||||
}
|
||||
util.GetLogger(ctx).Tracef("fetched event %s", missing.AuthEventID)
|
||||
resolvedStateEvents = append(resolvedStateEvents, h.Unwrap())
|
||||
resolvedStateEvents = append(resolvedStateEvents, h)
|
||||
goto retryAllowedState
|
||||
default:
|
||||
}
|
||||
|
@ -513,7 +522,7 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve
|
|||
logger.Debugf("get_missing_events returned %d events", len(missingResp.Events))
|
||||
missingEvents := make([]*gomatrixserverlib.Event, 0, len(missingResp.Events))
|
||||
for _, ev := range missingResp.Events.UntrustedEvents(roomVersion) {
|
||||
missingEvents = append(missingEvents, t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap())
|
||||
missingEvents = append(missingEvents, t.cacheAndReturn(ev))
|
||||
}
|
||||
|
||||
// topologically sort and sanity check that we are making forward progress
|
||||
|
@ -602,11 +611,11 @@ func (t *missingStateReq) lookupMissingStateViaState(
|
|||
// We load these as trusted as we called state.Check before which loaded them as untrusted.
|
||||
for i, evJSON := range state.AuthEvents {
|
||||
ev, _ := gomatrixserverlib.NewEventFromTrustedJSON(evJSON, false, roomVersion)
|
||||
parsedState.AuthEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap()
|
||||
parsedState.AuthEvents[i] = t.cacheAndReturn(ev)
|
||||
}
|
||||
for i, evJSON := range state.StateEvents {
|
||||
ev, _ := gomatrixserverlib.NewEventFromTrustedJSON(evJSON, false, roomVersion)
|
||||
parsedState.StateEvents[i] = t.cacheAndReturn(ev.Headered(roomVersion)).Unwrap()
|
||||
parsedState.StateEvents[i] = t.cacheAndReturn(ev)
|
||||
}
|
||||
return parsedState, nil
|
||||
}
|
||||
|
@ -634,23 +643,22 @@ func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roo
|
|||
}
|
||||
t.haveEventsMutex.Unlock()
|
||||
|
||||
// fetch as many as we can from the roomserver
|
||||
queryReq := api.QueryEventsByIDRequest{
|
||||
EventIDs: missingEventList,
|
||||
events, err := t.db.EventsFromIDs(ctx, missingEventList)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("t.db.EventsFromIDs: %w", err)
|
||||
}
|
||||
var queryRes api.QueryEventsByIDResponse
|
||||
if err = t.queryer.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i, ev := range queryRes.Events {
|
||||
queryRes.Events[i] = t.cacheAndReturn(queryRes.Events[i])
|
||||
|
||||
for i, ev := range events {
|
||||
events[i].Event = t.cacheAndReturn(events[i].Event)
|
||||
t.hadEvent(ev.EventID())
|
||||
evID := queryRes.Events[i].EventID()
|
||||
evID := events[i].EventID()
|
||||
if missing[evID] {
|
||||
delete(missing, evID)
|
||||
}
|
||||
}
|
||||
queryRes.Events = nil // allow it to be GCed
|
||||
|
||||
// encourage GC
|
||||
events = nil // nolint:ineffassign
|
||||
|
||||
concurrentRequests := 8
|
||||
missingCount := len(missing)
|
||||
|
@ -704,7 +712,7 @@ func (t *missingStateReq) lookupMissingStateViaStateIDs(ctx context.Context, roo
|
|||
|
||||
// Define what we'll do in order to fetch the missing event ID.
|
||||
fetch := func(missingEventID string) {
|
||||
var h *gomatrixserverlib.HeaderedEvent
|
||||
var h *gomatrixserverlib.Event
|
||||
h, err = t.lookupEvent(ctx, roomVersion, roomID, missingEventID, false)
|
||||
switch err.(type) {
|
||||
case verifySigError:
|
||||
|
@ -759,7 +767,7 @@ func (t *missingStateReq) createRespStateFromStateIDs(
|
|||
logrus.Tracef("Missing state event in createRespStateFromStateIDs: %s", stateIDs.StateEventIDs[i])
|
||||
continue
|
||||
}
|
||||
respState.StateEvents = append(respState.StateEvents, ev.Unwrap())
|
||||
respState.StateEvents = append(respState.StateEvents, ev)
|
||||
}
|
||||
for i := range stateIDs.AuthEventIDs {
|
||||
ev, ok := t.haveEvents[stateIDs.AuthEventIDs[i]]
|
||||
|
@ -767,7 +775,7 @@ func (t *missingStateReq) createRespStateFromStateIDs(
|
|||
logrus.Tracef("Missing auth event in createRespStateFromStateIDs: %s", stateIDs.AuthEventIDs[i])
|
||||
continue
|
||||
}
|
||||
respState.AuthEvents = append(respState.AuthEvents, ev.Unwrap())
|
||||
respState.AuthEvents = append(respState.AuthEvents, ev)
|
||||
}
|
||||
// We purposefully do not do auth checks on the returned events, as they will still
|
||||
// be processed in the exact same way, just as a 'rejected' event
|
||||
|
@ -775,17 +783,14 @@ func (t *missingStateReq) createRespStateFromStateIDs(
|
|||
return &respState, nil
|
||||
}
|
||||
|
||||
func (t *missingStateReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, _, missingEventID string, localFirst bool) (*gomatrixserverlib.HeaderedEvent, error) {
|
||||
func (t *missingStateReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, _, missingEventID string, localFirst bool) (*gomatrixserverlib.Event, error) {
|
||||
if localFirst {
|
||||
// fetch from the roomserver
|
||||
queryReq := api.QueryEventsByIDRequest{
|
||||
EventIDs: []string{missingEventID},
|
||||
}
|
||||
var queryRes api.QueryEventsByIDResponse
|
||||
if err := t.queryer.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil {
|
||||
events, err := t.db.EventsFromIDs(ctx, []string{missingEventID})
|
||||
if err != nil {
|
||||
util.GetLogger(ctx).Warnf("Failed to query roomserver for missing event %s: %s - falling back to remote", missingEventID, err)
|
||||
} else if len(queryRes.Events) == 1 {
|
||||
return queryRes.Events[0], nil
|
||||
} else if len(events) == 1 {
|
||||
return events[0].Event, nil
|
||||
}
|
||||
}
|
||||
var event *gomatrixserverlib.Event
|
||||
|
@ -822,7 +827,7 @@ func (t *missingStateReq) lookupEvent(ctx context.Context, roomVersion gomatrixs
|
|||
util.GetLogger(ctx).WithError(err).Warnf("Couldn't validate signature of event %q from /event", event.EventID())
|
||||
return nil, verifySigError{event.EventID(), err}
|
||||
}
|
||||
return t.cacheAndReturn(event.Headered(roomVersion)), nil
|
||||
return t.cacheAndReturn(event), nil
|
||||
}
|
||||
|
||||
func checkAllowedByState(e *gomatrixserverlib.Event, stateEvents []*gomatrixserverlib.Event) error {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue