diff --git a/clientapi/routing/createroom.go b/clientapi/routing/createroom.go index 2d886746..f1bb4f71 100644 --- a/clientapi/routing/createroom.go +++ b/clientapi/routing/createroom.go @@ -365,6 +365,7 @@ func createRoom( }, ev.Headered(roomVersion), nil, + false, ); err != nil { util.GetLogger(req.Context()).WithError(err).Error("SendEventWithState failed") return jsonerror.InternalServerError() diff --git a/clientapi/routing/membership.go b/clientapi/routing/membership.go index bc679631..298031f1 100644 --- a/clientapi/routing/membership.go +++ b/clientapi/routing/membership.go @@ -80,6 +80,7 @@ func sendMembership(ctx context.Context, accountDB accounts.Database, device *us []*gomatrixserverlib.HeaderedEvent{event.Event.Headered(roomVer)}, cfg.Matrix.ServerName, nil, + false, ); err != nil { util.GetLogger(ctx).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() diff --git a/clientapi/routing/profile.go b/clientapi/routing/profile.go index 7bea35e5..9de1869b 100644 --- a/clientapi/routing/profile.go +++ b/clientapi/routing/profile.go @@ -169,7 +169,7 @@ func SetAvatarURL( return jsonerror.InternalServerError() } - if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil); err != nil { + if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil, false); err != nil { util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } @@ -286,7 +286,7 @@ func SetDisplayName( return jsonerror.InternalServerError() } - if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil); err != nil { + if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil, false); err != nil { util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } diff --git a/clientapi/routing/redaction.go b/clientapi/routing/redaction.go index 92375974..3b6efdb9 100644 --- a/clientapi/routing/redaction.go +++ b/clientapi/routing/redaction.go @@ -121,7 +121,7 @@ func SendRedaction( JSON: jsonerror.NotFound("Room does not exist"), } } - if err = roomserverAPI.SendEvents(context.Background(), rsAPI, api.KindNew, []*gomatrixserverlib.HeaderedEvent{e}, cfg.Matrix.ServerName, nil); err != nil { + if err = roomserverAPI.SendEvents(context.Background(), rsAPI, api.KindNew, []*gomatrixserverlib.HeaderedEvent{e}, cfg.Matrix.ServerName, nil, false); err != nil { util.GetLogger(req.Context()).WithError(err).Errorf("failed to SendEvents") return jsonerror.InternalServerError() } diff --git a/clientapi/routing/sendevent.go b/clientapi/routing/sendevent.go index 204d2592..f0498312 100644 --- a/clientapi/routing/sendevent.go +++ b/clientapi/routing/sendevent.go @@ -122,6 +122,7 @@ func SendEvent( }, cfg.Matrix.ServerName, txnAndSessionID, + false, ); err != nil { util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() diff --git a/clientapi/threepid/invites.go b/clientapi/threepid/invites.go index 53cd6b8c..985cf00c 100644 --- a/clientapi/threepid/invites.go +++ b/clientapi/threepid/invites.go @@ -367,5 +367,6 @@ func emit3PIDInviteEvent( }, cfg.Matrix.ServerName, nil, + false, ) } diff --git a/federationapi/routing/join.go b/federationapi/routing/join.go index e9414033..6be96b31 100644 --- a/federationapi/routing/join.go +++ b/federationapi/routing/join.go @@ -294,6 +294,7 @@ func SendJoin( }, cfg.Matrix.ServerName, nil, + false, ); err != nil { util.GetLogger(httpReq.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() diff --git a/federationapi/routing/leave.go b/federationapi/routing/leave.go index a5231004..8fbd848e 100644 --- a/federationapi/routing/leave.go +++ b/federationapi/routing/leave.go @@ -260,6 +260,7 @@ func SendLeave( }, cfg.Matrix.ServerName, nil, + false, ); err != nil { util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed") return jsonerror.InternalServerError() diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 032c0c3b..b67612c5 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -550,6 +550,7 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e }, api.DoNotSendToOtherServers, nil, + false, ) } @@ -592,6 +593,7 @@ withNextEvent: SendAsServer: api.DoNotSendToOtherServers, }, }, + false, ); err != nil { return fmt.Errorf("api.SendEvents: %w", err) } @@ -741,6 +743,7 @@ func (t *txnReq) processEventWithMissingState( resolvedState, backwardsExtremity.Headered(roomVersion), t.hadEvents, + true, ) if err != nil { return fmt.Errorf("api.SendEventWithState: %w", err) @@ -761,6 +764,7 @@ func (t *txnReq) processEventWithMissingState( append(headeredNewEvents, e.Headered(roomVersion)), api.DoNotSendToOtherServers, nil, + true, ); err != nil { return fmt.Errorf("api.SendEvents: %w", err) } diff --git a/federationapi/routing/threepid.go b/federationapi/routing/threepid.go index 5ba28881..fb919a59 100644 --- a/federationapi/routing/threepid.go +++ b/federationapi/routing/threepid.go @@ -89,7 +89,7 @@ func CreateInvitesFrom3PIDInvites( } // Send all the events - if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, evs, cfg.Matrix.ServerName, nil); err != nil { + if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, evs, cfg.Matrix.ServerName, nil, false); err != nil { util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() } @@ -180,6 +180,7 @@ func ExchangeThirdPartyInvite( }, cfg.Matrix.ServerName, nil, + false, ); err != nil { util.GetLogger(httpReq.Context()).WithError(err).Error("SendEvents failed") return jsonerror.InternalServerError() diff --git a/federationsender/internal/perform.go b/federationsender/internal/perform.go index 968df247..7d8d7599 100644 --- a/federationsender/internal/perform.go +++ b/federationsender/internal/perform.go @@ -250,6 +250,7 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer( respState, event.Headered(respMakeJoin.RoomVersion), nil, + false, ); err != nil { logrus.WithFields(logrus.Fields{ "room_id": roomID, @@ -431,6 +432,7 @@ func (r *FederationSenderInternalAPI) performOutboundPeekUsingServer( &respState, respPeek.LatestEvent.Headered(respPeek.RoomVersion), nil, + false, ); err != nil { return fmt.Errorf("r.producer.SendEventWithState: %w", err) } diff --git a/roomserver/api/input.go b/roomserver/api/input.go index 8e6e4ac7..a537e64e 100644 --- a/roomserver/api/input.go +++ b/roomserver/api/input.go @@ -86,6 +86,7 @@ type TransactionID struct { // InputRoomEventsRequest is a request to InputRoomEvents type InputRoomEventsRequest struct { InputRoomEvents []InputRoomEvent `json:"input_room_events"` + Asynchronous bool `json:"async"` } // InputRoomEventsResponse is a response to InputRoomEvents diff --git a/roomserver/api/wrapper.go b/roomserver/api/wrapper.go index 2ebe2f64..26d89804 100644 --- a/roomserver/api/wrapper.go +++ b/roomserver/api/wrapper.go @@ -27,6 +27,7 @@ func SendEvents( ctx context.Context, rsAPI RoomserverInternalAPI, kind Kind, events []*gomatrixserverlib.HeaderedEvent, sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID, + async bool, ) error { ires := make([]InputRoomEvent, len(events)) for i, event := range events { @@ -38,7 +39,7 @@ func SendEvents( TransactionID: txnID, } } - return SendInputRoomEvents(ctx, rsAPI, ires) + return SendInputRoomEvents(ctx, rsAPI, ires, async) } // SendEventWithState writes an event with the specified kind to the roomserver @@ -47,7 +48,7 @@ func SendEvents( func SendEventWithState( ctx context.Context, rsAPI RoomserverInternalAPI, kind Kind, state *gomatrixserverlib.RespState, event *gomatrixserverlib.HeaderedEvent, - haveEventIDs map[string]bool, + haveEventIDs map[string]bool, async bool, ) error { outliers, err := state.Events() if err != nil { @@ -79,14 +80,18 @@ func SendEventWithState( StateEventIDs: stateEventIDs, }) - return SendInputRoomEvents(ctx, rsAPI, ires) + return SendInputRoomEvents(ctx, rsAPI, ires, async) } // SendInputRoomEvents to the roomserver. func SendInputRoomEvents( - ctx context.Context, rsAPI RoomserverInternalAPI, ires []InputRoomEvent, + ctx context.Context, rsAPI RoomserverInternalAPI, + ires []InputRoomEvent, async bool, ) error { - request := InputRoomEventsRequest{InputRoomEvents: ires} + request := InputRoomEventsRequest{ + InputRoomEvents: ires, + Asynchronous: async, + } var response InputRoomEventsResponse rsAPI.InputRoomEvents(ctx, &request, &response) return response.Err() diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index b8279a86..b64fc1fa 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -166,17 +166,19 @@ func (r *Inputer) InputRoomEvents( worker.input.push(tasks[i]) } - // Wait for all of the workers to return results about our tasks. - wg.Wait() + if !request.Asynchronous { + // Wait for all of the workers to return results about our tasks. + wg.Wait() - // If any of the tasks returned an error, we should probably report - // that back to the caller. - for _, task := range tasks { - if task.err != nil { - response.ErrMsg = task.err.Error() - _, rejected := task.err.(*gomatrixserverlib.NotAllowed) - response.NotAllowed = rejected - return + // If any of the tasks returned an error, we should probably report + // that back to the caller. + for _, task := range tasks { + if task.err != nil { + response.ErrMsg = task.err.Error() + _, rejected := task.err.(*gomatrixserverlib.NotAllowed) + response.NotAllowed = rejected + return + } } } } diff --git a/roomserver/roomserver_test.go b/roomserver/roomserver_test.go index 5c954007..648bd75c 100644 --- a/roomserver/roomserver_test.go +++ b/roomserver/roomserver_test.go @@ -190,7 +190,7 @@ func mustSendEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []js t.Helper() rsAPI, dp := mustCreateRoomserverAPI(t) hevents := mustLoadRawEvents(t, ver, events) - if err := api.SendEvents(ctx, rsAPI, api.KindNew, hevents, testOrigin, nil); err != nil { + if err := api.SendEvents(ctx, rsAPI, api.KindNew, hevents, testOrigin, nil, false); err != nil { t.Errorf("failed to SendEvents: %s", err) } return rsAPI, dp, hevents @@ -336,7 +336,7 @@ func TestOutputRewritesState(t *testing.T) { deleteDatabase() rsAPI, producer := mustCreateRoomserverAPI(t) defer deleteDatabase() - err := api.SendEvents(context.Background(), rsAPI, api.KindNew, originalEvents, testOrigin, nil) + err := api.SendEvents(context.Background(), rsAPI, api.KindNew, originalEvents, testOrigin, nil, false) if err != nil { t.Fatalf("failed to send original events: %s", err) } @@ -371,7 +371,7 @@ func TestOutputRewritesState(t *testing.T) { HasState: true, StateEventIDs: stateIDs, }) - if err := api.SendInputRoomEvents(context.Background(), rsAPI, inputEvents); err != nil { + if err := api.SendInputRoomEvents(context.Background(), rsAPI, inputEvents, false); err != nil { t.Fatalf("SendInputRoomEvents returned error for rewrite events: %s", err) } // we should just have one output event with the entire state of the room in it diff --git a/setup/mscs/msc2836/msc2836.go b/setup/mscs/msc2836/msc2836.go index f47a42d9..71b965ef 100644 --- a/setup/mscs/msc2836/msc2836.go +++ b/setup/mscs/msc2836/msc2836.go @@ -649,7 +649,7 @@ func (rc *reqCtx) injectResponseToRoomserver(res *gomatrixserverlib.MSC2836Event }) } // we've got the data by this point so use a background context - err = roomserver.SendInputRoomEvents(context.Background(), rc.rsAPI, ires) + err = roomserver.SendInputRoomEvents(context.Background(), rc.rsAPI, ires, false) if err != nil { util.GetLogger(rc.ctx).WithError(err).Error("failed to inject MSC2836EventRelationshipsResponse into the roomserver") }