mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-07-30 04:52:46 +00:00
Fetch missing auth events, implement QueryMissingAuthPrevEvents, try other servers in room for /event and /get_missing_events (#1450)
* Try to ask other servers in the room for missing events if the origin won't provide them * Logging * More logging * Implement QueryMissingAuthPrevEvents * Try to get missing auth events badly * Use processEvent * Logging * Update QueryMissingAuthPrevEvents * Try to find missing auth events * Patchy fix for test * Logging tweaks * Send auth events as outliers * Update check in QueryMissingAuthPrevEvents * Error responses * More return codes * Don't return error on reject/soft-fail since it was ultimately handled * More tweaks * More error tweaks
This commit is contained in:
parent
4ff7ac7b65
commit
738b829a23
12 changed files with 291 additions and 81 deletions
|
@ -206,10 +206,10 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res
|
|||
return nil, &jsonErr
|
||||
} else {
|
||||
// Auth errors mean the event is 'rejected' which have to be silent to appease sytest
|
||||
errMsg := ""
|
||||
_, rejected := err.(*gomatrixserverlib.NotAllowed)
|
||||
errMsg := err.Error()
|
||||
if rejected {
|
||||
errMsg = ""
|
||||
if !rejected {
|
||||
errMsg = err.Error()
|
||||
}
|
||||
util.GetLogger(ctx).WithError(err).WithField("event_id", e.EventID()).WithField("rejected", rejected).Warn(
|
||||
"Failed to process incoming federation event, skipping",
|
||||
|
@ -345,17 +345,17 @@ func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverli
|
|||
}
|
||||
|
||||
func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event, isInboundTxn bool) error {
|
||||
prevEventIDs := e.PrevEventIDs()
|
||||
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
|
||||
|
||||
// Fetch the state needed to authenticate the event.
|
||||
needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{e})
|
||||
stateReq := api.QueryStateAfterEventsRequest{
|
||||
// Work out if the roomserver knows everything it needs to know to auth
|
||||
// the event.
|
||||
stateReq := api.QueryMissingAuthPrevEventsRequest{
|
||||
RoomID: e.RoomID(),
|
||||
PrevEventIDs: prevEventIDs,
|
||||
StateToFetch: needed.Tuples(),
|
||||
AuthEventIDs: e.AuthEventIDs(),
|
||||
PrevEventIDs: e.PrevEventIDs(),
|
||||
}
|
||||
var stateResp api.QueryStateAfterEventsResponse
|
||||
if err := t.rsAPI.QueryStateAfterEvents(ctx, &stateReq, &stateResp); err != nil {
|
||||
var stateResp api.QueryMissingAuthPrevEventsResponse
|
||||
if err := t.rsAPI.QueryMissingAuthPrevEvents(ctx, &stateReq, &stateResp); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -369,7 +369,53 @@ func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event, is
|
|||
return roomNotFoundError{e.RoomID()}
|
||||
}
|
||||
|
||||
if !stateResp.PrevEventsExist {
|
||||
if len(stateResp.MissingAuthEventIDs) > 0 {
|
||||
logger.Infof("Event refers to %d unknown auth_events", len(stateResp.MissingAuthEventIDs))
|
||||
|
||||
servers := []gomatrixserverlib.ServerName{t.Origin}
|
||||
serverReq := &api.QueryServerJoinedToRoomRequest{
|
||||
RoomID: e.RoomID(),
|
||||
}
|
||||
serverRes := &api.QueryServerJoinedToRoomResponse{}
|
||||
if err := t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil {
|
||||
servers = append(servers, serverRes.ServerNames...)
|
||||
logger.Infof("Found %d server(s) to query for missing events", len(servers))
|
||||
}
|
||||
|
||||
getAuthEvent:
|
||||
for _, missingAuthEventID := range stateResp.MissingAuthEventIDs {
|
||||
for _, server := range servers {
|
||||
logger.Infof("Retrieving missing auth event %q from %q", missingAuthEventID, server)
|
||||
tx, err := t.federation.GetEvent(ctx, server, missingAuthEventID)
|
||||
if err != nil {
|
||||
continue // try the next server
|
||||
}
|
||||
ev, err := gomatrixserverlib.NewEventFromUntrustedJSON(tx.PDUs[0], stateResp.RoomVersion)
|
||||
if err != nil {
|
||||
logger.WithError(err).Errorf("Failed to unmarshal auth event %q", missingAuthEventID)
|
||||
continue // try the next server
|
||||
}
|
||||
if err = api.SendInputRoomEvents(
|
||||
context.Background(),
|
||||
t.rsAPI,
|
||||
[]api.InputRoomEvent{
|
||||
{
|
||||
Kind: api.KindOutlier,
|
||||
Event: ev.Headered(stateResp.RoomVersion),
|
||||
AuthEventIDs: ev.AuthEventIDs(),
|
||||
SendAsServer: api.DoNotSendToOtherServers,
|
||||
},
|
||||
},
|
||||
); err != nil {
|
||||
logger.WithError(err).Errorf("Failed to send auth event %q to roomserver", missingAuthEventID)
|
||||
continue getAuthEvent // move onto the next event
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(stateResp.MissingPrevEventIDs) > 0 {
|
||||
logger.Infof("Event refers to %d unknown prev_events", len(stateResp.MissingPrevEventIDs))
|
||||
return t.processEventWithMissingState(ctx, e, stateResp.RoomVersion, isInboundTxn)
|
||||
}
|
||||
|
||||
|
@ -611,6 +657,7 @@ retryAllowedState:
|
|||
// begin from. Returns an error only if we should terminate the transaction which initiated /get_missing_events
|
||||
// This function recursively calls txnReq.processEvent with the missing events, which will be processed before this function returns.
|
||||
// This means that we may recursively call this function, as we spider back up prev_events to the min depth.
|
||||
// nolint:gocyclo
|
||||
func (t *txnReq) getMissingEvents(ctx context.Context, e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, isInboundTxn bool) (backwardsExtremity *gomatrixserverlib.Event, err error) {
|
||||
if !isInboundTxn {
|
||||
// we've recursed here, so just take a state snapshot please!
|
||||
|
@ -637,15 +684,46 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e gomatrixserverlib.Event
|
|||
if minDepth < 0 {
|
||||
minDepth = 0
|
||||
}
|
||||
missingResp, err := t.federation.LookupMissingEvents(ctx, t.Origin, e.RoomID(), gomatrixserverlib.MissingEvents{
|
||||
Limit: 20,
|
||||
// synapse uses the min depth they've ever seen in that room
|
||||
MinDepth: minDepth,
|
||||
// The latest event IDs that the sender already has. These are skipped when retrieving the previous events of latest_events.
|
||||
EarliestEvents: latestEvents,
|
||||
// The event IDs to retrieve the previous events for.
|
||||
LatestEvents: []string{e.EventID()},
|
||||
}, roomVersion)
|
||||
|
||||
servers := []gomatrixserverlib.ServerName{t.Origin}
|
||||
serverReq := &api.QueryServerJoinedToRoomRequest{
|
||||
RoomID: e.RoomID(),
|
||||
}
|
||||
serverRes := &api.QueryServerJoinedToRoomResponse{}
|
||||
if err = t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil {
|
||||
servers = append(servers, serverRes.ServerNames...)
|
||||
logger.Infof("Found %d server(s) to query for missing events", len(servers))
|
||||
}
|
||||
|
||||
var missingResp *gomatrixserverlib.RespMissingEvents
|
||||
for _, server := range servers {
|
||||
var m gomatrixserverlib.RespMissingEvents
|
||||
if m, err = t.federation.LookupMissingEvents(ctx, server, e.RoomID(), gomatrixserverlib.MissingEvents{
|
||||
Limit: 20,
|
||||
// synapse uses the min depth they've ever seen in that room
|
||||
MinDepth: minDepth,
|
||||
// The latest event IDs that the sender already has. These are skipped when retrieving the previous events of latest_events.
|
||||
EarliestEvents: latestEvents,
|
||||
// The event IDs to retrieve the previous events for.
|
||||
LatestEvents: []string{e.EventID()},
|
||||
}, roomVersion); err == nil {
|
||||
missingResp = &m
|
||||
break
|
||||
} else {
|
||||
logger.WithError(err).Errorf("%s pushed us an event but %q did not respond to /get_missing_events", t.Origin, server)
|
||||
}
|
||||
}
|
||||
|
||||
if missingResp == nil {
|
||||
logger.WithError(err).Errorf(
|
||||
"%s pushed us an event but %d server(s) couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
|
||||
t.Origin, len(servers),
|
||||
)
|
||||
return nil, missingPrevEventsError{
|
||||
eventID: e.EventID(),
|
||||
err: err,
|
||||
}
|
||||
}
|
||||
|
||||
// security: how we handle failures depends on whether or not this event will become the new forward extremity for the room.
|
||||
// There's 2 scenarios to consider:
|
||||
|
@ -658,16 +736,6 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e gomatrixserverlib.Event
|
|||
// https://github.com/matrix-org/synapse/pull/3456
|
||||
// https://github.com/matrix-org/synapse/blob/229eb81498b0fe1da81e9b5b333a0285acde9446/synapse/handlers/federation.py#L335
|
||||
// For now, we do not allow Case B, so reject the event.
|
||||
if err != nil {
|
||||
logger.WithError(err).Errorf(
|
||||
"%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can",
|
||||
t.Origin,
|
||||
)
|
||||
return nil, missingPrevEventsError{
|
||||
eventID: e.EventID(),
|
||||
err: err,
|
||||
}
|
||||
}
|
||||
logger.Infof("get_missing_events returned %d events", len(missingResp.Events))
|
||||
|
||||
// topologically sort and sanity check that we are making forward progress
|
||||
|
|
|
@ -77,10 +77,11 @@ func (p *testEDUProducer) InputSendToDeviceEvent(
|
|||
}
|
||||
|
||||
type testRoomserverAPI struct {
|
||||
inputRoomEvents []api.InputRoomEvent
|
||||
queryStateAfterEvents func(*api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse
|
||||
queryEventsByID func(req *api.QueryEventsByIDRequest) api.QueryEventsByIDResponse
|
||||
queryLatestEventsAndState func(*api.QueryLatestEventsAndStateRequest) api.QueryLatestEventsAndStateResponse
|
||||
inputRoomEvents []api.InputRoomEvent
|
||||
queryMissingAuthPrevEvents func(*api.QueryMissingAuthPrevEventsRequest) api.QueryMissingAuthPrevEventsResponse
|
||||
queryStateAfterEvents func(*api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse
|
||||
queryEventsByID func(req *api.QueryEventsByIDRequest) api.QueryEventsByIDResponse
|
||||
queryLatestEventsAndState func(*api.QueryLatestEventsAndStateRequest) api.QueryLatestEventsAndStateResponse
|
||||
}
|
||||
|
||||
func (t *testRoomserverAPI) SetFederationSenderAPI(fsAPI fsAPI.FederationSenderInternalAPI) {}
|
||||
|
@ -162,6 +163,20 @@ func (t *testRoomserverAPI) QueryStateAfterEvents(
|
|||
return nil
|
||||
}
|
||||
|
||||
// Query the state after a list of events in a room from the room server.
|
||||
func (t *testRoomserverAPI) QueryMissingAuthPrevEvents(
|
||||
ctx context.Context,
|
||||
request *api.QueryMissingAuthPrevEventsRequest,
|
||||
response *api.QueryMissingAuthPrevEventsResponse,
|
||||
) error {
|
||||
response.RoomVersion = testRoomVersion
|
||||
res := t.queryMissingAuthPrevEvents(request)
|
||||
response.RoomExists = res.RoomExists
|
||||
response.MissingAuthEventIDs = res.MissingAuthEventIDs
|
||||
response.MissingPrevEventIDs = res.MissingPrevEventIDs
|
||||
return nil
|
||||
}
|
||||
|
||||
// Query a list of events by event ID.
|
||||
func (t *testRoomserverAPI) QueryEventsByID(
|
||||
ctx context.Context,
|
||||
|
@ -453,11 +468,11 @@ func assertInputRoomEvents(t *testing.T, got []api.InputRoomEvent, want []gomatr
|
|||
// to the roomserver. It's the most basic test possible.
|
||||
func TestBasicTransaction(t *testing.T) {
|
||||
rsAPI := &testRoomserverAPI{
|
||||
queryStateAfterEvents: func(req *api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse {
|
||||
return api.QueryStateAfterEventsResponse{
|
||||
PrevEventsExist: true,
|
||||
RoomExists: true,
|
||||
StateEvents: fromStateTuples(req.StateToFetch, nil),
|
||||
queryMissingAuthPrevEvents: func(req *api.QueryMissingAuthPrevEventsRequest) api.QueryMissingAuthPrevEventsResponse {
|
||||
return api.QueryMissingAuthPrevEventsResponse{
|
||||
RoomExists: true,
|
||||
MissingAuthEventIDs: []string{},
|
||||
MissingPrevEventIDs: []string{},
|
||||
}
|
||||
},
|
||||
}
|
||||
|
@ -473,14 +488,11 @@ func TestBasicTransaction(t *testing.T) {
|
|||
// as it does the auth check.
|
||||
func TestTransactionFailAuthChecks(t *testing.T) {
|
||||
rsAPI := &testRoomserverAPI{
|
||||
queryStateAfterEvents: func(req *api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse {
|
||||
return api.QueryStateAfterEventsResponse{
|
||||
PrevEventsExist: true,
|
||||
RoomExists: true,
|
||||
// omit the create event so auth checks fail
|
||||
StateEvents: fromStateTuples(req.StateToFetch, []gomatrixserverlib.StateKeyTuple{
|
||||
{EventType: gomatrixserverlib.MRoomCreate, StateKey: ""},
|
||||
}),
|
||||
queryMissingAuthPrevEvents: func(req *api.QueryMissingAuthPrevEventsRequest) api.QueryMissingAuthPrevEventsResponse {
|
||||
return api.QueryMissingAuthPrevEventsResponse{
|
||||
RoomExists: true,
|
||||
MissingAuthEventIDs: []string{"create_event"},
|
||||
MissingPrevEventIDs: []string{},
|
||||
}
|
||||
},
|
||||
}
|
||||
|
@ -504,28 +516,24 @@ func TestTransactionFetchMissingPrevEvents(t *testing.T) {
|
|||
|
||||
var rsAPI *testRoomserverAPI // ref here so we can refer to inputRoomEvents inside these functions
|
||||
rsAPI = &testRoomserverAPI{
|
||||
queryStateAfterEvents: func(req *api.QueryStateAfterEventsRequest) api.QueryStateAfterEventsResponse {
|
||||
// we expect this to be called three times:
|
||||
// - first with input event to realise there's a gap
|
||||
// - second with the prevEvent to realise there is no gap
|
||||
// - third with the input event to realise there is no longer a gap
|
||||
prevEventsExist := false
|
||||
queryMissingAuthPrevEvents: func(req *api.QueryMissingAuthPrevEventsRequest) api.QueryMissingAuthPrevEventsResponse {
|
||||
missingPrevEvent := []string{"missing_prev_event"}
|
||||
if len(req.PrevEventIDs) == 1 {
|
||||
switch req.PrevEventIDs[0] {
|
||||
case haveEvent.EventID():
|
||||
prevEventsExist = true
|
||||
missingPrevEvent = []string{}
|
||||
case prevEvent.EventID():
|
||||
// we only have this event if we've been send prevEvent
|
||||
if len(rsAPI.inputRoomEvents) == 1 && rsAPI.inputRoomEvents[0].Event.EventID() == prevEvent.EventID() {
|
||||
prevEventsExist = true
|
||||
missingPrevEvent = []string{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return api.QueryStateAfterEventsResponse{
|
||||
PrevEventsExist: prevEventsExist,
|
||||
RoomExists: true,
|
||||
StateEvents: fromStateTuples(req.StateToFetch, nil),
|
||||
return api.QueryMissingAuthPrevEventsResponse{
|
||||
RoomExists: true,
|
||||
MissingAuthEventIDs: []string{},
|
||||
MissingPrevEventIDs: missingPrevEvent,
|
||||
}
|
||||
},
|
||||
queryLatestEventsAndState: func(req *api.QueryLatestEventsAndStateRequest) api.QueryLatestEventsAndStateResponse {
|
||||
|
@ -626,6 +634,38 @@ func TestTransactionFetchMissingStateByStateIDs(t *testing.T) {
|
|||
StateEvents: stateEvents,
|
||||
}
|
||||
},
|
||||
|
||||
queryMissingAuthPrevEvents: func(req *api.QueryMissingAuthPrevEventsRequest) api.QueryMissingAuthPrevEventsResponse {
|
||||
askingForEvent := req.PrevEventIDs[0]
|
||||
haveEventB := false
|
||||
haveEventC := false
|
||||
for _, ev := range rsAPI.inputRoomEvents {
|
||||
switch ev.Event.EventID() {
|
||||
case eventB.EventID():
|
||||
haveEventB = true
|
||||
case eventC.EventID():
|
||||
haveEventC = true
|
||||
}
|
||||
}
|
||||
prevEventExists := false
|
||||
if askingForEvent == eventC.EventID() {
|
||||
prevEventExists = haveEventC
|
||||
} else if askingForEvent == eventB.EventID() {
|
||||
prevEventExists = haveEventB
|
||||
}
|
||||
|
||||
var missingPrevEvent []string
|
||||
if !prevEventExists {
|
||||
missingPrevEvent = []string{"test"}
|
||||
}
|
||||
|
||||
return api.QueryMissingAuthPrevEventsResponse{
|
||||
RoomExists: true,
|
||||
MissingAuthEventIDs: []string{},
|
||||
MissingPrevEventIDs: missingPrevEvent,
|
||||
}
|
||||
},
|
||||
|
||||
queryLatestEventsAndState: func(req *api.QueryLatestEventsAndStateRequest) api.QueryLatestEventsAndStateResponse {
|
||||
omitTuples := []gomatrixserverlib.StateKeyTuple{
|
||||
{EventType: gomatrixserverlib.MRoomPowerLevels, StateKey: ""},
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue