diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index a514127c..7766bfb1 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -556,6 +556,7 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e }, api.DoNotSendToOtherServers, nil, + api.InputOptionAsync, ) } @@ -598,6 +599,7 @@ withNextEvent: SendAsServer: api.DoNotSendToOtherServers, }, }, + api.InputOptionAsync, ); err != nil { return fmt.Errorf("api.SendEvents: %w", err) } @@ -747,6 +749,7 @@ func (t *txnReq) processEventWithMissingState( resolvedState, backwardsExtremity.Headered(roomVersion), t.hadEvents, + api.InputOptionAsync, ) if err != nil { return fmt.Errorf("api.SendEventWithState: %w", err) @@ -767,6 +770,7 @@ func (t *txnReq) processEventWithMissingState( append(headeredNewEvents, e.Headered(roomVersion)), api.DoNotSendToOtherServers, nil, + api.InputOptionAsync, ); err != nil { return fmt.Errorf("api.SendEvents: %w", err) } diff --git a/roomserver/api/input.go b/roomserver/api/input.go index 8e6e4ac7..3e8b8662 100644 --- a/roomserver/api/input.go +++ b/roomserver/api/input.go @@ -86,6 +86,9 @@ type TransactionID struct { // InputRoomEventsRequest is a request to InputRoomEvents type InputRoomEventsRequest struct { InputRoomEvents []InputRoomEvent `json:"input_room_events"` + // Asynchronous requests will queue up work into the roomserver and + // return straight away, rather than waiting for the results. + Asynchronous bool `json:"async"` } // InputRoomEventsResponse is a response to InputRoomEvents diff --git a/roomserver/api/wrapper.go b/roomserver/api/wrapper.go index 2ebe2f64..8909a70c 100644 --- a/roomserver/api/wrapper.go +++ b/roomserver/api/wrapper.go @@ -22,11 +22,18 @@ import ( "github.com/matrix-org/util" ) +type InputOption int + +const ( + InputOptionAsync InputOption = iota +) + // SendEvents to the roomserver The events are written with KindNew. func SendEvents( ctx context.Context, rsAPI RoomserverInternalAPI, kind Kind, events []*gomatrixserverlib.HeaderedEvent, sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID, + options ...InputOption, ) error { ires := make([]InputRoomEvent, len(events)) for i, event := range events { @@ -38,7 +45,7 @@ func SendEvents( TransactionID: txnID, } } - return SendInputRoomEvents(ctx, rsAPI, ires) + return SendInputRoomEvents(ctx, rsAPI, ires, options...) } // SendEventWithState writes an event with the specified kind to the roomserver @@ -47,7 +54,7 @@ func SendEvents( func SendEventWithState( ctx context.Context, rsAPI RoomserverInternalAPI, kind Kind, state *gomatrixserverlib.RespState, event *gomatrixserverlib.HeaderedEvent, - haveEventIDs map[string]bool, + haveEventIDs map[string]bool, options ...InputOption, ) error { outliers, err := state.Events() if err != nil { @@ -79,14 +86,25 @@ func SendEventWithState( StateEventIDs: stateEventIDs, }) - return SendInputRoomEvents(ctx, rsAPI, ires) + return SendInputRoomEvents(ctx, rsAPI, ires, options...) } // SendInputRoomEvents to the roomserver. func SendInputRoomEvents( - ctx context.Context, rsAPI RoomserverInternalAPI, ires []InputRoomEvent, + ctx context.Context, rsAPI RoomserverInternalAPI, + ires []InputRoomEvent, options ...InputOption, ) error { - request := InputRoomEventsRequest{InputRoomEvents: ires} + async := false + for _, opt := range options { + switch opt { + case InputOptionAsync: + async = true + } + } + request := InputRoomEventsRequest{ + InputRoomEvents: ires, + Asynchronous: async, + } var response InputRoomEventsResponse rsAPI.InputRoomEvents(ctx, &request, &response) return response.Err() diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index b8279a86..b64fc1fa 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -166,17 +166,19 @@ func (r *Inputer) InputRoomEvents( worker.input.push(tasks[i]) } - // Wait for all of the workers to return results about our tasks. - wg.Wait() + if !request.Asynchronous { + // Wait for all of the workers to return results about our tasks. + wg.Wait() - // If any of the tasks returned an error, we should probably report - // that back to the caller. - for _, task := range tasks { - if task.err != nil { - response.ErrMsg = task.err.Error() - _, rejected := task.err.(*gomatrixserverlib.NotAllowed) - response.NotAllowed = rejected - return + // If any of the tasks returned an error, we should probably report + // that back to the caller. + for _, task := range tasks { + if task.err != nil { + response.ErrMsg = task.err.Error() + _, rejected := task.err.(*gomatrixserverlib.NotAllowed) + response.NotAllowed = rejected + return + } } } }