mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-07-29 12:42:46 +00:00
Synchronous invites (#1273)
* Refactor invites to be synchronous * Fix synchronous invites * Fix client API return type for send invite error * Linter * Restore PerformError on rsAPI.PerformInvite * Update sytest-whitelist * Don't override PerformError with normal errors * Fix error passing * Un-whitelist a couple of tests * Update sytest-whitelist * Try to handle multiple invite rejections better * nolint * Update gomatrixserverlib * Fix /v1/invite test * Remove replace from go.mod
This commit is contained in:
parent
6820b3e024
commit
6cb1a65809
23 changed files with 334 additions and 443 deletions
|
@ -36,6 +36,12 @@ type FederationSenderInternalAPI interface {
|
|||
request *PerformLeaveRequest,
|
||||
response *PerformLeaveResponse,
|
||||
) error
|
||||
// Handle sending an invite to a remote server.
|
||||
PerformInvite(
|
||||
ctx context.Context,
|
||||
request *PerformInviteRequest,
|
||||
response *PerformInviteResponse,
|
||||
) error
|
||||
// Notifies the federation sender that these servers may be online and to retry sending messages.
|
||||
PerformServersAlive(
|
||||
ctx context.Context,
|
||||
|
@ -81,6 +87,16 @@ type PerformLeaveRequest struct {
|
|||
type PerformLeaveResponse struct {
|
||||
}
|
||||
|
||||
type PerformInviteRequest struct {
|
||||
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
|
||||
Event gomatrixserverlib.HeaderedEvent `json:"event"`
|
||||
InviteRoomState []gomatrixserverlib.InviteV2StrippedState `json:"invite_room_state"`
|
||||
}
|
||||
|
||||
type PerformInviteResponse struct {
|
||||
Event gomatrixserverlib.HeaderedEvent `json:"event"`
|
||||
}
|
||||
|
||||
type PerformServersAliveRequest struct {
|
||||
Servers []gomatrixserverlib.ServerName
|
||||
}
|
||||
|
|
|
@ -28,7 +28,6 @@ import (
|
|||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/tidwall/gjson"
|
||||
)
|
||||
|
||||
// OutputRoomEventConsumer consumes events that originated in the room server.
|
||||
|
@ -97,22 +96,6 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
|
|||
}).Panicf("roomserver output log: write room event failure")
|
||||
return nil
|
||||
}
|
||||
case api.OutputTypeNewInviteEvent:
|
||||
ev := &output.NewInviteEvent.Event
|
||||
log.WithFields(log.Fields{
|
||||
"event_id": ev.EventID(),
|
||||
"room_id": ev.RoomID(),
|
||||
"state_key": ev.StateKey(),
|
||||
}).Info("received invite event from roomserver")
|
||||
|
||||
if err := s.processInvite(*output.NewInviteEvent); err != nil {
|
||||
// panic rather than continue with an inconsistent database
|
||||
log.WithFields(log.Fields{
|
||||
"event": string(ev.JSON()),
|
||||
log.ErrorKey: err,
|
||||
}).Panicf("roomserver output log: write invite event failure")
|
||||
return nil
|
||||
}
|
||||
default:
|
||||
log.WithField("type", output.Type).Debug(
|
||||
"roomserver output log: ignoring unknown output type",
|
||||
|
@ -172,51 +155,6 @@ func (s *OutputRoomEventConsumer) processMessage(ore api.OutputNewRoomEvent) err
|
|||
)
|
||||
}
|
||||
|
||||
// processInvite handles an invite event for sending over federation.
|
||||
func (s *OutputRoomEventConsumer) processInvite(oie api.OutputNewInviteEvent) error {
|
||||
// Don't try to reflect and resend invites that didn't originate from us.
|
||||
if s.cfg.Matrix.ServerName != oie.Event.Origin() {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Ignore invites that don't have state keys - they are invalid.
|
||||
if oie.Event.StateKey() == nil {
|
||||
return fmt.Errorf("event %q doesn't have state key", oie.Event.EventID())
|
||||
}
|
||||
|
||||
// Don't try to handle events that are actually destined for us.
|
||||
stateKey := *oie.Event.StateKey()
|
||||
_, destination, err := gomatrixserverlib.SplitID('@', stateKey)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"event_id": oie.Event.EventID(),
|
||||
"state_key": stateKey,
|
||||
}).Info("failed to split destination from state key")
|
||||
return nil
|
||||
}
|
||||
if s.cfg.Matrix.ServerName == destination {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Try to extract the room invite state. The roomserver will have stashed
|
||||
// this for us in invite_room_state if it didn't already exist.
|
||||
strippedState := []gomatrixserverlib.InviteV2StrippedState{}
|
||||
if inviteRoomState := gjson.GetBytes(oie.Event.Unsigned(), "invite_room_state"); inviteRoomState.Exists() {
|
||||
if err = json.Unmarshal([]byte(inviteRoomState.Raw), &strippedState); err != nil {
|
||||
log.WithError(err).Warn("failed to extract invite_room_state from event unsigned")
|
||||
}
|
||||
}
|
||||
|
||||
// Build the invite request with the info we've got.
|
||||
inviteReq, err := gomatrixserverlib.NewInviteV2Request(&oie.Event, strippedState)
|
||||
if err != nil {
|
||||
return fmt.Errorf("gomatrixserverlib.NewInviteV2Request: %w", err)
|
||||
}
|
||||
|
||||
// Send the event.
|
||||
return s.queues.SendInvite(&inviteReq)
|
||||
}
|
||||
|
||||
// joinedHostsAtEvent works out a list of matrix servers that were joined to
|
||||
// the room at the event.
|
||||
// It is important to use the state at the event for sending messages because:
|
||||
|
|
|
@ -296,6 +296,43 @@ func (r *FederationSenderInternalAPI) PerformLeave(
|
|||
)
|
||||
}
|
||||
|
||||
// PerformLeaveRequest implements api.FederationSenderInternalAPI
|
||||
func (r *FederationSenderInternalAPI) PerformInvite(
|
||||
ctx context.Context,
|
||||
request *api.PerformInviteRequest,
|
||||
response *api.PerformInviteResponse,
|
||||
) (err error) {
|
||||
if request.Event.StateKey() == nil {
|
||||
return errors.New("invite must be a state event")
|
||||
}
|
||||
|
||||
_, destination, err := gomatrixserverlib.SplitID('@', *request.Event.StateKey())
|
||||
if err != nil {
|
||||
return fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
|
||||
}
|
||||
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"event_id": request.Event.EventID(),
|
||||
"user_id": *request.Event.StateKey(),
|
||||
"room_id": request.Event.RoomID(),
|
||||
"room_version": request.RoomVersion,
|
||||
"destination": destination,
|
||||
}).Info("Sending invite")
|
||||
|
||||
inviteReq, err := gomatrixserverlib.NewInviteV2Request(&request.Event, request.InviteRoomState)
|
||||
if err != nil {
|
||||
return fmt.Errorf("gomatrixserverlib.NewInviteV2Request: %w", err)
|
||||
}
|
||||
|
||||
inviteRes, err := r.federation.SendInviteV2(ctx, destination, inviteReq)
|
||||
if err != nil {
|
||||
return fmt.Errorf("r.federation.SendInviteV2: %w", err)
|
||||
}
|
||||
|
||||
response.Event = inviteRes.Event.Headered(request.RoomVersion)
|
||||
return nil
|
||||
}
|
||||
|
||||
// PerformServersAlive implements api.FederationSenderInternalAPI
|
||||
func (r *FederationSenderInternalAPI) PerformServersAlive(
|
||||
ctx context.Context,
|
||||
|
|
|
@ -18,6 +18,7 @@ const (
|
|||
FederationSenderPerformDirectoryLookupRequestPath = "/federationsender/performDirectoryLookup"
|
||||
FederationSenderPerformJoinRequestPath = "/federationsender/performJoinRequest"
|
||||
FederationSenderPerformLeaveRequestPath = "/federationsender/performLeaveRequest"
|
||||
FederationSenderPerformInviteRequestPath = "/federationsender/performInviteRequest"
|
||||
FederationSenderPerformServersAlivePath = "/federationsender/performServersAlive"
|
||||
FederationSenderPerformBroadcastEDUPath = "/federationsender/performBroadcastEDU"
|
||||
)
|
||||
|
@ -49,6 +50,19 @@ func (h *httpFederationSenderInternalAPI) PerformLeave(
|
|||
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||
}
|
||||
|
||||
// Handle sending an invite to a remote server.
|
||||
func (h *httpFederationSenderInternalAPI) PerformInvite(
|
||||
ctx context.Context,
|
||||
request *api.PerformInviteRequest,
|
||||
response *api.PerformInviteResponse,
|
||||
) error {
|
||||
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformInviteRequest")
|
||||
defer span.Finish()
|
||||
|
||||
apiURL := h.federationSenderURL + FederationSenderPerformInviteRequestPath
|
||||
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||
}
|
||||
|
||||
func (h *httpFederationSenderInternalAPI) PerformServersAlive(
|
||||
ctx context.Context,
|
||||
request *api.PerformServersAliveRequest,
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
)
|
||||
|
||||
// AddRoutes adds the FederationSenderInternalAPI handlers to the http.ServeMux.
|
||||
// nolint:gocyclo
|
||||
func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Router) {
|
||||
internalAPIMux.Handle(
|
||||
FederationSenderQueryJoinedHostServerNamesInRoomPath,
|
||||
|
@ -52,6 +53,20 @@ func AddRoutes(intAPI api.FederationSenderInternalAPI, internalAPIMux *mux.Route
|
|||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||
}),
|
||||
)
|
||||
internalAPIMux.Handle(
|
||||
FederationSenderPerformInviteRequestPath,
|
||||
httputil.MakeInternalAPI("PerformInviteRequest", func(req *http.Request) util.JSONResponse {
|
||||
var request api.PerformInviteRequest
|
||||
var response api.PerformInviteResponse
|
||||
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
||||
}
|
||||
if err := intAPI.PerformInvite(req.Context(), &request, &response); err != nil {
|
||||
return util.ErrorResponse(err)
|
||||
}
|
||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||
}),
|
||||
)
|
||||
internalAPIMux.Handle(
|
||||
FederationSenderPerformDirectoryLookupRequestPath,
|
||||
httputil.MakeInternalAPI("PerformDirectoryLookupRequest", func(req *http.Request) util.JSONResponse {
|
||||
|
|
|
@ -46,20 +46,18 @@ type destinationQueue struct {
|
|||
db storage.Database
|
||||
signing *SigningInfo
|
||||
rsAPI api.RoomserverInternalAPI
|
||||
client *gomatrixserverlib.FederationClient // federation client
|
||||
origin gomatrixserverlib.ServerName // origin of requests
|
||||
destination gomatrixserverlib.ServerName // destination of requests
|
||||
running atomic.Bool // is the queue worker running?
|
||||
backingOff atomic.Bool // true if we're backing off
|
||||
statistics *statistics.ServerStatistics // statistics about this remote server
|
||||
incomingInvites chan *gomatrixserverlib.InviteV2Request // invites to send
|
||||
transactionIDMutex sync.Mutex // protects transactionID
|
||||
transactionID gomatrixserverlib.TransactionID // last transaction ID
|
||||
transactionCount atomic.Int32 // how many events in this transaction so far
|
||||
pendingInvites []*gomatrixserverlib.InviteV2Request // owned by backgroundSend
|
||||
notifyPDUs chan bool // interrupts idle wait for PDUs
|
||||
notifyEDUs chan bool // interrupts idle wait for EDUs
|
||||
interruptBackoff chan bool // interrupts backoff
|
||||
client *gomatrixserverlib.FederationClient // federation client
|
||||
origin gomatrixserverlib.ServerName // origin of requests
|
||||
destination gomatrixserverlib.ServerName // destination of requests
|
||||
running atomic.Bool // is the queue worker running?
|
||||
backingOff atomic.Bool // true if we're backing off
|
||||
statistics *statistics.ServerStatistics // statistics about this remote server
|
||||
transactionIDMutex sync.Mutex // protects transactionID
|
||||
transactionID gomatrixserverlib.TransactionID // last transaction ID
|
||||
transactionCount atomic.Int32 // how many events in this transaction so far
|
||||
notifyPDUs chan bool // interrupts idle wait for PDUs
|
||||
notifyEDUs chan bool // interrupts idle wait for EDUs
|
||||
interruptBackoff chan bool // interrupts backoff
|
||||
}
|
||||
|
||||
// Send event adds the event to the pending queue for the destination.
|
||||
|
@ -138,18 +136,6 @@ func (oq *destinationQueue) sendEDU(receipt *shared.Receipt) {
|
|||
}
|
||||
}
|
||||
|
||||
// sendInvite adds the invite event to the pending queue for the
|
||||
// destination. If the queue is empty then it starts a background
|
||||
// goroutine to start sending events to that destination.
|
||||
func (oq *destinationQueue) sendInvite(ev *gomatrixserverlib.InviteV2Request) {
|
||||
if oq.statistics.Blacklisted() {
|
||||
// If the destination is blacklisted then drop the event.
|
||||
return
|
||||
}
|
||||
oq.wakeQueueIfNeeded()
|
||||
oq.incomingInvites <- ev
|
||||
}
|
||||
|
||||
// wakeQueueIfNeeded will wake up the destination queue if it is
|
||||
// not already running. If it is running but it is backing off
|
||||
// then we will interrupt the backoff, causing any federation
|
||||
|
@ -234,23 +220,6 @@ func (oq *destinationQueue) backgroundSend() {
|
|||
// We were woken up because there are new PDUs waiting in the
|
||||
// database.
|
||||
pendingEDUs = true
|
||||
case invite := <-oq.incomingInvites:
|
||||
// There's no strict ordering requirement for invites like
|
||||
// there is for transactions, so we put the invite onto the
|
||||
// front of the queue. This means that if an invite that is
|
||||
// stuck failing already, that it won't block our new invite
|
||||
// from being sent.
|
||||
oq.pendingInvites = append(
|
||||
[]*gomatrixserverlib.InviteV2Request{invite},
|
||||
oq.pendingInvites...,
|
||||
)
|
||||
// If there are any more things waiting in the channel queue
|
||||
// then read them. This is safe because we guarantee only
|
||||
// having one goroutine per destination queue, so the channel
|
||||
// isn't being consumed anywhere else.
|
||||
for len(oq.incomingInvites) > 0 {
|
||||
oq.pendingInvites = append(oq.pendingInvites, <-oq.incomingInvites)
|
||||
}
|
||||
case <-time.After(queueIdleTimeout):
|
||||
// The worker is idle so stop the goroutine. It'll get
|
||||
// restarted automatically the next time we have an event to
|
||||
|
@ -266,7 +235,6 @@ func (oq *destinationQueue) backgroundSend() {
|
|||
// It's been suggested that we should give up because the backoff
|
||||
// has exceeded a maximum allowable value. Clean up the in-memory
|
||||
// buffers at this point. The PDU clean-up is already on a defer.
|
||||
oq.cleanPendingInvites()
|
||||
log.Warnf("Blacklisting %q due to exceeding backoff threshold", oq.destination)
|
||||
return
|
||||
}
|
||||
|
@ -284,35 +252,9 @@ func (oq *destinationQueue) backgroundSend() {
|
|||
oq.statistics.Success()
|
||||
}
|
||||
}
|
||||
|
||||
// Try sending the next invite and see what happens.
|
||||
if len(oq.pendingInvites) > 0 {
|
||||
sent, ierr := oq.nextInvites(oq.pendingInvites)
|
||||
if ierr != nil {
|
||||
// We failed to send the transaction. Mark it as a failure.
|
||||
oq.statistics.Failure()
|
||||
} else if sent > 0 {
|
||||
// If we successfully sent the invites then clear out
|
||||
// the pending invites.
|
||||
oq.statistics.Success()
|
||||
// Reallocate so that the underlying array can be GC'd, as
|
||||
// opposed to growing forever.
|
||||
oq.cleanPendingInvites()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// cleanPendingInvites cleans out the pending invite buffer,
|
||||
// removing all references so that the underlying objects can
|
||||
// be GC'd.
|
||||
func (oq *destinationQueue) cleanPendingInvites() {
|
||||
for i := 0; i < len(oq.pendingInvites); i++ {
|
||||
oq.pendingInvites[i] = nil
|
||||
}
|
||||
oq.pendingInvites = []*gomatrixserverlib.InviteV2Request{}
|
||||
}
|
||||
|
||||
// nextTransaction creates a new transaction from the pending event
|
||||
// queue and sends it. Returns true if a transaction was sent or
|
||||
// false otherwise.
|
||||
|
@ -427,66 +369,3 @@ func (oq *destinationQueue) nextTransaction() (bool, error) {
|
|||
return false, err
|
||||
}
|
||||
}
|
||||
|
||||
// nextInvite takes pending invite events from the queue and sends
|
||||
// them. Returns true if a transaction was sent or false otherwise.
|
||||
func (oq *destinationQueue) nextInvites(
|
||||
pendingInvites []*gomatrixserverlib.InviteV2Request,
|
||||
) (int, error) {
|
||||
done := 0
|
||||
for _, inviteReq := range pendingInvites {
|
||||
ev, roomVersion := inviteReq.Event(), inviteReq.RoomVersion()
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"event_id": ev.EventID(),
|
||||
"room_version": roomVersion,
|
||||
"destination": oq.destination,
|
||||
}).Info("sending invite")
|
||||
|
||||
inviteRes, err := oq.client.SendInviteV2(
|
||||
context.TODO(),
|
||||
oq.destination,
|
||||
*inviteReq,
|
||||
)
|
||||
switch e := err.(type) {
|
||||
case nil:
|
||||
done++
|
||||
case gomatrix.HTTPError:
|
||||
log.WithFields(log.Fields{
|
||||
"event_id": ev.EventID(),
|
||||
"state_key": ev.StateKey(),
|
||||
"destination": oq.destination,
|
||||
"status_code": e.Code,
|
||||
}).WithError(err).Error("failed to send invite due to HTTP error")
|
||||
// Check whether we should do something about the error or
|
||||
// just accept it as unavoidable.
|
||||
if e.Code >= 400 && e.Code <= 499 {
|
||||
// We tried but the remote side has sent back a client error.
|
||||
// It's no use retrying because it will happen again.
|
||||
done++
|
||||
continue
|
||||
}
|
||||
return done, err
|
||||
default:
|
||||
log.WithFields(log.Fields{
|
||||
"event_id": ev.EventID(),
|
||||
"state_key": ev.StateKey(),
|
||||
"destination": oq.destination,
|
||||
}).WithError(err).Error("failed to send invite")
|
||||
return done, err
|
||||
}
|
||||
|
||||
invEv := inviteRes.Event.Sign(string(oq.signing.ServerName), oq.signing.KeyID, oq.signing.PrivateKey).Headered(roomVersion)
|
||||
_, err = api.SendEvents(context.TODO(), oq.rsAPI, []gomatrixserverlib.HeaderedEvent{invEv}, oq.signing.ServerName, nil)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"event_id": ev.EventID(),
|
||||
"state_key": ev.StateKey(),
|
||||
"destination": oq.destination,
|
||||
}).WithError(err).Error("failed to return signed invite to roomserver")
|
||||
return done, err
|
||||
}
|
||||
}
|
||||
|
||||
return done, nil
|
||||
}
|
||||
|
|
|
@ -108,7 +108,6 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
|
|||
destination: destination,
|
||||
client: oqs.client,
|
||||
statistics: oqs.statistics.ForServer(destination),
|
||||
incomingInvites: make(chan *gomatrixserverlib.InviteV2Request, 128),
|
||||
notifyPDUs: make(chan bool, 1),
|
||||
notifyEDUs: make(chan bool, 1),
|
||||
interruptBackoff: make(chan bool),
|
||||
|
@ -178,51 +177,6 @@ func (oqs *OutgoingQueues) SendEvent(
|
|||
return nil
|
||||
}
|
||||
|
||||
// SendEvent sends an event to the destinations
|
||||
func (oqs *OutgoingQueues) SendInvite(
|
||||
inviteReq *gomatrixserverlib.InviteV2Request,
|
||||
) error {
|
||||
ev := inviteReq.Event()
|
||||
stateKey := ev.StateKey()
|
||||
if stateKey == nil {
|
||||
log.WithFields(log.Fields{
|
||||
"event_id": ev.EventID(),
|
||||
}).Info("Invite had no state key, dropping")
|
||||
return nil
|
||||
}
|
||||
|
||||
_, destination, err := gomatrixserverlib.SplitID('@', *stateKey)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"event_id": ev.EventID(),
|
||||
"state_key": stateKey,
|
||||
}).Info("Failed to split destination from state key")
|
||||
return nil
|
||||
}
|
||||
|
||||
if stateapi.IsServerBannedFromRoom(
|
||||
context.TODO(),
|
||||
oqs.stateAPI,
|
||||
ev.RoomID(),
|
||||
destination,
|
||||
) {
|
||||
log.WithFields(log.Fields{
|
||||
"room_id": ev.RoomID(),
|
||||
"destination": destination,
|
||||
}).Info("Dropping invite to server which is prohibited by ACLs")
|
||||
return nil
|
||||
}
|
||||
|
||||
log.WithFields(log.Fields{
|
||||
"event_id": ev.EventID(),
|
||||
"server_name": destination,
|
||||
}).Info("Sending invite")
|
||||
|
||||
oqs.getQueue(destination).sendInvite(inviteReq)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SendEDU sends an EDU event to the destinations.
|
||||
func (oqs *OutgoingQueues) SendEDU(
|
||||
e *gomatrixserverlib.EDU, origin gomatrixserverlib.ServerName,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue