mirror of
https://github.com/hoernschen/dendrite.git
synced 2024-12-27 07:28:27 +00:00
Roomserver fixes (#2133)
* Improve server selection somewhat * Remove things from the map when we're done * Be less panicky about auth event signatures in case they are not fatal after all * Accept HasState in all cases * Send join asynchronously * Revert "Send join asynchronously" This reverts commit 5b685bfcd0b1150a66c7b1e70fb3a3eda509efd1. * Joins and leaves use background context
This commit is contained in:
parent
567fd04428
commit
d21f3eace0
4 changed files with 36 additions and 21 deletions
|
@ -130,7 +130,10 @@ func (r *Inputer) processRoomEvent(
|
||||||
return fmt.Errorf("r.Queryer.QueryMissingAuthPrevEvents: %w", err)
|
return fmt.Errorf("r.Queryer.QueryMissingAuthPrevEvents: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(missingRes.MissingAuthEventIDs) > 0 || len(missingRes.MissingPrevEventIDs) > 0 {
|
missingAuth := len(missingRes.MissingAuthEventIDs) > 0
|
||||||
|
missingPrev := !input.HasState && len(missingRes.MissingPrevEventIDs) > 0
|
||||||
|
|
||||||
|
if missingAuth || missingPrev {
|
||||||
serverReq := &fedapi.QueryJoinedHostServerNamesInRoomRequest{
|
serverReq := &fedapi.QueryJoinedHostServerNamesInRoomRequest{
|
||||||
RoomID: event.RoomID(),
|
RoomID: event.RoomID(),
|
||||||
ExcludeSelf: true,
|
ExcludeSelf: true,
|
||||||
|
@ -138,9 +141,26 @@ func (r *Inputer) processRoomEvent(
|
||||||
if err = r.FSAPI.QueryJoinedHostServerNamesInRoom(ctx, serverReq, serverRes); err != nil {
|
if err = r.FSAPI.QueryJoinedHostServerNamesInRoom(ctx, serverReq, serverRes); err != nil {
|
||||||
return fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err)
|
return fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err)
|
||||||
}
|
}
|
||||||
}
|
// Sort all of the servers into a map so that we can randomise
|
||||||
if input.Origin != "" {
|
// their order. Then make sure that the input origin and the
|
||||||
serverRes.ServerNames = append(serverRes.ServerNames, input.Origin)
|
// event origin are first on the list.
|
||||||
|
servers := map[gomatrixserverlib.ServerName]struct{}{}
|
||||||
|
for _, server := range serverRes.ServerNames {
|
||||||
|
servers[server] = struct{}{}
|
||||||
|
}
|
||||||
|
serverRes.ServerNames = serverRes.ServerNames[:0]
|
||||||
|
if input.Origin != "" {
|
||||||
|
serverRes.ServerNames = append(serverRes.ServerNames, input.Origin)
|
||||||
|
delete(servers, input.Origin)
|
||||||
|
}
|
||||||
|
if origin := event.Origin(); origin != input.Origin {
|
||||||
|
serverRes.ServerNames = append(serverRes.ServerNames, origin)
|
||||||
|
delete(servers, origin)
|
||||||
|
}
|
||||||
|
for server := range servers {
|
||||||
|
serverRes.ServerNames = append(serverRes.ServerNames, server)
|
||||||
|
delete(servers, server)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// First of all, check that the auth events of the event are known.
|
// First of all, check that the auth events of the event are known.
|
||||||
|
@ -149,7 +169,7 @@ func (r *Inputer) processRoomEvent(
|
||||||
authEvents := gomatrixserverlib.NewAuthEvents(nil)
|
authEvents := gomatrixserverlib.NewAuthEvents(nil)
|
||||||
knownEvents := map[string]*types.Event{}
|
knownEvents := map[string]*types.Event{}
|
||||||
if err = r.fetchAuthEvents(ctx, logger, headered, &authEvents, knownEvents, serverRes.ServerNames); err != nil {
|
if err = r.fetchAuthEvents(ctx, logger, headered, &authEvents, knownEvents, serverRes.ServerNames); err != nil {
|
||||||
return fmt.Errorf("r.checkForMissingAuthEvents: %w", err)
|
return fmt.Errorf("r.fetchAuthEvents: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if the event is allowed by its auth events. If it isn't then
|
// Check if the event is allowed by its auth events. If it isn't then
|
||||||
|
@ -190,7 +210,6 @@ func (r *Inputer) processRoomEvent(
|
||||||
// typical federated room join) then we won't bother trying to fetch prev events
|
// typical federated room join) then we won't bother trying to fetch prev events
|
||||||
// because we may not be allowed to see them and we have no choice but to trust
|
// because we may not be allowed to see them and we have no choice but to trust
|
||||||
// the state event IDs provided to us in the join instead.
|
// the state event IDs provided to us in the join instead.
|
||||||
missingPrev := !input.HasState && len(missingRes.MissingPrevEventIDs) > 0
|
|
||||||
if missingPrev && input.Kind == api.KindNew {
|
if missingPrev && input.Kind == api.KindNew {
|
||||||
// Don't do this for KindOld events, otherwise old events that we fetch
|
// Don't do this for KindOld events, otherwise old events that we fetch
|
||||||
// to satisfy missing prev events/state will end up recursively calling
|
// to satisfy missing prev events/state will end up recursively calling
|
||||||
|
@ -204,13 +223,10 @@ func (r *Inputer) processRoomEvent(
|
||||||
federation: r.FSAPI,
|
federation: r.FSAPI,
|
||||||
keys: r.KeyRing,
|
keys: r.KeyRing,
|
||||||
roomsMu: internal.NewMutexByRoom(),
|
roomsMu: internal.NewMutexByRoom(),
|
||||||
servers: map[gomatrixserverlib.ServerName]struct{}{},
|
servers: serverRes.ServerNames,
|
||||||
hadEvents: map[string]bool{},
|
hadEvents: map[string]bool{},
|
||||||
haveEvents: map[string]*gomatrixserverlib.HeaderedEvent{},
|
haveEvents: map[string]*gomatrixserverlib.HeaderedEvent{},
|
||||||
}
|
}
|
||||||
for _, serverName := range serverRes.ServerNames {
|
|
||||||
missingState.servers[serverName] = struct{}{}
|
|
||||||
}
|
|
||||||
if err = missingState.processEventWithMissingState(ctx, event, headered.RoomVersion); err != nil {
|
if err = missingState.processEventWithMissingState(ctx, event, headered.RoomVersion); err != nil {
|
||||||
isRejected = true
|
isRejected = true
|
||||||
rejectionErr = fmt.Errorf("missingState.processEventWithMissingState: %w", err)
|
rejectionErr = fmt.Errorf("missingState.processEventWithMissingState: %w", err)
|
||||||
|
@ -399,12 +415,11 @@ func (r *Inputer) fetchAuthEvents(
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check the signatures of the event.
|
// Check the signatures of the event. If this fails then we'll simply
|
||||||
// TODO: It really makes sense for the federation API to be doing this,
|
// skip it, because gomatrixserverlib.Allowed() will notice a problem
|
||||||
// because then it can attempt another server if one serves up an event
|
// if a critical event is missing anyway.
|
||||||
// with an invalid signature. For now this will do.
|
|
||||||
if err := authEvent.VerifyEventSignatures(ctx, r.FSAPI.KeyRing()); err != nil {
|
if err := authEvent.VerifyEventSignatures(ctx, r.FSAPI.KeyRing()); err != nil {
|
||||||
return fmt.Errorf("event.VerifyEventSignatures: %w", err)
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// In order to store the new auth event, we need to know its auth chain
|
// In order to store the new auth event, we need to know its auth chain
|
||||||
|
@ -457,7 +472,7 @@ func (r *Inputer) calculateAndSetState(
|
||||||
var err error
|
var err error
|
||||||
roomState := state.NewStateResolution(r.DB, roomInfo)
|
roomState := state.NewStateResolution(r.DB, roomInfo)
|
||||||
|
|
||||||
if input.HasState && !isRejected {
|
if input.HasState {
|
||||||
// Check here if we think we're in the room already.
|
// Check here if we think we're in the room already.
|
||||||
stateAtEvent.Overwrite = true
|
stateAtEvent.Overwrite = true
|
||||||
var joinEventNIDs []types.EventNID
|
var joinEventNIDs []types.EventNID
|
||||||
|
|
|
@ -25,7 +25,7 @@ type missingStateReq struct {
|
||||||
keys gomatrixserverlib.JSONVerifier
|
keys gomatrixserverlib.JSONVerifier
|
||||||
federation fedapi.FederationInternalAPI
|
federation fedapi.FederationInternalAPI
|
||||||
roomsMu *internal.MutexByRoom
|
roomsMu *internal.MutexByRoom
|
||||||
servers map[gomatrixserverlib.ServerName]struct{}
|
servers []gomatrixserverlib.ServerName
|
||||||
hadEvents map[string]bool
|
hadEvents map[string]bool
|
||||||
hadEventsMutex sync.Mutex
|
hadEventsMutex sync.Mutex
|
||||||
haveEvents map[string]*gomatrixserverlib.HeaderedEvent
|
haveEvents map[string]*gomatrixserverlib.HeaderedEvent
|
||||||
|
@ -417,7 +417,7 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e *gomatrixserve
|
||||||
}
|
}
|
||||||
|
|
||||||
var missingResp *gomatrixserverlib.RespMissingEvents
|
var missingResp *gomatrixserverlib.RespMissingEvents
|
||||||
for server := range t.servers {
|
for _, server := range t.servers {
|
||||||
var m gomatrixserverlib.RespMissingEvents
|
var m gomatrixserverlib.RespMissingEvents
|
||||||
if m, err = t.federation.LookupMissingEvents(ctx, server, e.RoomID(), gomatrixserverlib.MissingEvents{
|
if m, err = t.federation.LookupMissingEvents(ctx, server, e.RoomID(), gomatrixserverlib.MissingEvents{
|
||||||
Limit: 20,
|
Limit: 20,
|
||||||
|
@ -700,7 +700,7 @@ func (t *missingStateReq) lookupEvent(ctx context.Context, roomVersion gomatrixs
|
||||||
}
|
}
|
||||||
var event *gomatrixserverlib.Event
|
var event *gomatrixserverlib.Event
|
||||||
found := false
|
found := false
|
||||||
for serverName := range t.servers {
|
for _, serverName := range t.servers {
|
||||||
reqctx, cancel := context.WithTimeout(ctx, time.Second*30)
|
reqctx, cancel := context.WithTimeout(ctx, time.Second*30)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
txn, err := t.federation.GetEvent(reqctx, serverName, missingEventID)
|
txn, err := t.federation.GetEvent(reqctx, serverName, missingEventID)
|
||||||
|
|
|
@ -51,7 +51,7 @@ func (r *Joiner) PerformJoin(
|
||||||
req *rsAPI.PerformJoinRequest,
|
req *rsAPI.PerformJoinRequest,
|
||||||
res *rsAPI.PerformJoinResponse,
|
res *rsAPI.PerformJoinResponse,
|
||||||
) {
|
) {
|
||||||
roomID, joinedVia, err := r.performJoin(ctx, req)
|
roomID, joinedVia, err := r.performJoin(context.Background(), req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithContext(ctx).WithFields(logrus.Fields{
|
logrus.WithContext(ctx).WithFields(logrus.Fields{
|
||||||
"room_id": req.RoomIDOrAlias,
|
"room_id": req.RoomIDOrAlias,
|
||||||
|
|
|
@ -52,7 +52,7 @@ func (r *Leaver) PerformLeave(
|
||||||
return nil, fmt.Errorf("user %q does not belong to this homeserver", req.UserID)
|
return nil, fmt.Errorf("user %q does not belong to this homeserver", req.UserID)
|
||||||
}
|
}
|
||||||
if strings.HasPrefix(req.RoomID, "!") {
|
if strings.HasPrefix(req.RoomID, "!") {
|
||||||
output, err := r.performLeaveRoomByID(ctx, req, res)
|
output, err := r.performLeaveRoomByID(context.Background(), req, res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithContext(ctx).WithFields(logrus.Fields{
|
logrus.WithContext(ctx).WithFields(logrus.Fields{
|
||||||
"room_id": req.RoomID,
|
"room_id": req.RoomID,
|
||||||
|
|
Loading…
Reference in a new issue