mirror of
https://github.com/hoernschen/dendrite.git
synced 2024-12-27 07:28:27 +00:00
Try implementing async roomserver input
This commit is contained in:
parent
7b0ad24a45
commit
57be026e81
16 changed files with 44 additions and 23 deletions
|
@ -365,6 +365,7 @@ func createRoom(
|
|||
},
|
||||
ev.Headered(roomVersion),
|
||||
nil,
|
||||
false,
|
||||
); err != nil {
|
||||
util.GetLogger(req.Context()).WithError(err).Error("SendEventWithState failed")
|
||||
return jsonerror.InternalServerError()
|
||||
|
|
|
@ -80,6 +80,7 @@ func sendMembership(ctx context.Context, accountDB accounts.Database, device *us
|
|||
[]*gomatrixserverlib.HeaderedEvent{event.Event.Headered(roomVer)},
|
||||
cfg.Matrix.ServerName,
|
||||
nil,
|
||||
false,
|
||||
); err != nil {
|
||||
util.GetLogger(ctx).WithError(err).Error("SendEvents failed")
|
||||
return jsonerror.InternalServerError()
|
||||
|
|
|
@ -169,7 +169,7 @@ func SetAvatarURL(
|
|||
return jsonerror.InternalServerError()
|
||||
}
|
||||
|
||||
if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil); err != nil {
|
||||
if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil, false); err != nil {
|
||||
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
|
||||
return jsonerror.InternalServerError()
|
||||
}
|
||||
|
@ -286,7 +286,7 @@ func SetDisplayName(
|
|||
return jsonerror.InternalServerError()
|
||||
}
|
||||
|
||||
if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil); err != nil {
|
||||
if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil, false); err != nil {
|
||||
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
|
||||
return jsonerror.InternalServerError()
|
||||
}
|
||||
|
|
|
@ -121,7 +121,7 @@ func SendRedaction(
|
|||
JSON: jsonerror.NotFound("Room does not exist"),
|
||||
}
|
||||
}
|
||||
if err = roomserverAPI.SendEvents(context.Background(), rsAPI, api.KindNew, []*gomatrixserverlib.HeaderedEvent{e}, cfg.Matrix.ServerName, nil); err != nil {
|
||||
if err = roomserverAPI.SendEvents(context.Background(), rsAPI, api.KindNew, []*gomatrixserverlib.HeaderedEvent{e}, cfg.Matrix.ServerName, nil, false); err != nil {
|
||||
util.GetLogger(req.Context()).WithError(err).Errorf("failed to SendEvents")
|
||||
return jsonerror.InternalServerError()
|
||||
}
|
||||
|
|
|
@ -122,6 +122,7 @@ func SendEvent(
|
|||
},
|
||||
cfg.Matrix.ServerName,
|
||||
txnAndSessionID,
|
||||
false,
|
||||
); err != nil {
|
||||
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
|
||||
return jsonerror.InternalServerError()
|
||||
|
|
|
@ -367,5 +367,6 @@ func emit3PIDInviteEvent(
|
|||
},
|
||||
cfg.Matrix.ServerName,
|
||||
nil,
|
||||
false,
|
||||
)
|
||||
}
|
||||
|
|
|
@ -294,6 +294,7 @@ func SendJoin(
|
|||
},
|
||||
cfg.Matrix.ServerName,
|
||||
nil,
|
||||
false,
|
||||
); err != nil {
|
||||
util.GetLogger(httpReq.Context()).WithError(err).Error("SendEvents failed")
|
||||
return jsonerror.InternalServerError()
|
||||
|
|
|
@ -260,6 +260,7 @@ func SendLeave(
|
|||
},
|
||||
cfg.Matrix.ServerName,
|
||||
nil,
|
||||
false,
|
||||
); err != nil {
|
||||
util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed")
|
||||
return jsonerror.InternalServerError()
|
||||
|
|
|
@ -550,6 +550,7 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e
|
|||
},
|
||||
api.DoNotSendToOtherServers,
|
||||
nil,
|
||||
false,
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -592,6 +593,7 @@ withNextEvent:
|
|||
SendAsServer: api.DoNotSendToOtherServers,
|
||||
},
|
||||
},
|
||||
false,
|
||||
); err != nil {
|
||||
return fmt.Errorf("api.SendEvents: %w", err)
|
||||
}
|
||||
|
@ -741,6 +743,7 @@ func (t *txnReq) processEventWithMissingState(
|
|||
resolvedState,
|
||||
backwardsExtremity.Headered(roomVersion),
|
||||
t.hadEvents,
|
||||
true,
|
||||
)
|
||||
if err != nil {
|
||||
return fmt.Errorf("api.SendEventWithState: %w", err)
|
||||
|
@ -761,6 +764,7 @@ func (t *txnReq) processEventWithMissingState(
|
|||
append(headeredNewEvents, e.Headered(roomVersion)),
|
||||
api.DoNotSendToOtherServers,
|
||||
nil,
|
||||
true,
|
||||
); err != nil {
|
||||
return fmt.Errorf("api.SendEvents: %w", err)
|
||||
}
|
||||
|
|
|
@ -89,7 +89,7 @@ func CreateInvitesFrom3PIDInvites(
|
|||
}
|
||||
|
||||
// Send all the events
|
||||
if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, evs, cfg.Matrix.ServerName, nil); err != nil {
|
||||
if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, evs, cfg.Matrix.ServerName, nil, false); err != nil {
|
||||
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
|
||||
return jsonerror.InternalServerError()
|
||||
}
|
||||
|
@ -180,6 +180,7 @@ func ExchangeThirdPartyInvite(
|
|||
},
|
||||
cfg.Matrix.ServerName,
|
||||
nil,
|
||||
false,
|
||||
); err != nil {
|
||||
util.GetLogger(httpReq.Context()).WithError(err).Error("SendEvents failed")
|
||||
return jsonerror.InternalServerError()
|
||||
|
|
|
@ -250,6 +250,7 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer(
|
|||
respState,
|
||||
event.Headered(respMakeJoin.RoomVersion),
|
||||
nil,
|
||||
false,
|
||||
); err != nil {
|
||||
logrus.WithFields(logrus.Fields{
|
||||
"room_id": roomID,
|
||||
|
@ -431,6 +432,7 @@ func (r *FederationSenderInternalAPI) performOutboundPeekUsingServer(
|
|||
&respState,
|
||||
respPeek.LatestEvent.Headered(respPeek.RoomVersion),
|
||||
nil,
|
||||
false,
|
||||
); err != nil {
|
||||
return fmt.Errorf("r.producer.SendEventWithState: %w", err)
|
||||
}
|
||||
|
|
|
@ -86,6 +86,7 @@ type TransactionID struct {
|
|||
// InputRoomEventsRequest is a request to InputRoomEvents
|
||||
type InputRoomEventsRequest struct {
|
||||
InputRoomEvents []InputRoomEvent `json:"input_room_events"`
|
||||
Asynchronous bool `json:"async"`
|
||||
}
|
||||
|
||||
// InputRoomEventsResponse is a response to InputRoomEvents
|
||||
|
|
|
@ -27,6 +27,7 @@ func SendEvents(
|
|||
ctx context.Context, rsAPI RoomserverInternalAPI,
|
||||
kind Kind, events []*gomatrixserverlib.HeaderedEvent,
|
||||
sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID,
|
||||
async bool,
|
||||
) error {
|
||||
ires := make([]InputRoomEvent, len(events))
|
||||
for i, event := range events {
|
||||
|
@ -38,7 +39,7 @@ func SendEvents(
|
|||
TransactionID: txnID,
|
||||
}
|
||||
}
|
||||
return SendInputRoomEvents(ctx, rsAPI, ires)
|
||||
return SendInputRoomEvents(ctx, rsAPI, ires, async)
|
||||
}
|
||||
|
||||
// SendEventWithState writes an event with the specified kind to the roomserver
|
||||
|
@ -47,7 +48,7 @@ func SendEvents(
|
|||
func SendEventWithState(
|
||||
ctx context.Context, rsAPI RoomserverInternalAPI, kind Kind,
|
||||
state *gomatrixserverlib.RespState, event *gomatrixserverlib.HeaderedEvent,
|
||||
haveEventIDs map[string]bool,
|
||||
haveEventIDs map[string]bool, async bool,
|
||||
) error {
|
||||
outliers, err := state.Events()
|
||||
if err != nil {
|
||||
|
@ -79,14 +80,18 @@ func SendEventWithState(
|
|||
StateEventIDs: stateEventIDs,
|
||||
})
|
||||
|
||||
return SendInputRoomEvents(ctx, rsAPI, ires)
|
||||
return SendInputRoomEvents(ctx, rsAPI, ires, async)
|
||||
}
|
||||
|
||||
// SendInputRoomEvents to the roomserver.
|
||||
func SendInputRoomEvents(
|
||||
ctx context.Context, rsAPI RoomserverInternalAPI, ires []InputRoomEvent,
|
||||
ctx context.Context, rsAPI RoomserverInternalAPI,
|
||||
ires []InputRoomEvent, async bool,
|
||||
) error {
|
||||
request := InputRoomEventsRequest{InputRoomEvents: ires}
|
||||
request := InputRoomEventsRequest{
|
||||
InputRoomEvents: ires,
|
||||
Asynchronous: async,
|
||||
}
|
||||
var response InputRoomEventsResponse
|
||||
rsAPI.InputRoomEvents(ctx, &request, &response)
|
||||
return response.Err()
|
||||
|
|
|
@ -166,17 +166,19 @@ func (r *Inputer) InputRoomEvents(
|
|||
worker.input.push(tasks[i])
|
||||
}
|
||||
|
||||
// Wait for all of the workers to return results about our tasks.
|
||||
wg.Wait()
|
||||
if !request.Asynchronous {
|
||||
// Wait for all of the workers to return results about our tasks.
|
||||
wg.Wait()
|
||||
|
||||
// If any of the tasks returned an error, we should probably report
|
||||
// that back to the caller.
|
||||
for _, task := range tasks {
|
||||
if task.err != nil {
|
||||
response.ErrMsg = task.err.Error()
|
||||
_, rejected := task.err.(*gomatrixserverlib.NotAllowed)
|
||||
response.NotAllowed = rejected
|
||||
return
|
||||
// If any of the tasks returned an error, we should probably report
|
||||
// that back to the caller.
|
||||
for _, task := range tasks {
|
||||
if task.err != nil {
|
||||
response.ErrMsg = task.err.Error()
|
||||
_, rejected := task.err.(*gomatrixserverlib.NotAllowed)
|
||||
response.NotAllowed = rejected
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -190,7 +190,7 @@ func mustSendEvents(t *testing.T, ver gomatrixserverlib.RoomVersion, events []js
|
|||
t.Helper()
|
||||
rsAPI, dp := mustCreateRoomserverAPI(t)
|
||||
hevents := mustLoadRawEvents(t, ver, events)
|
||||
if err := api.SendEvents(ctx, rsAPI, api.KindNew, hevents, testOrigin, nil); err != nil {
|
||||
if err := api.SendEvents(ctx, rsAPI, api.KindNew, hevents, testOrigin, nil, false); err != nil {
|
||||
t.Errorf("failed to SendEvents: %s", err)
|
||||
}
|
||||
return rsAPI, dp, hevents
|
||||
|
@ -336,7 +336,7 @@ func TestOutputRewritesState(t *testing.T) {
|
|||
deleteDatabase()
|
||||
rsAPI, producer := mustCreateRoomserverAPI(t)
|
||||
defer deleteDatabase()
|
||||
err := api.SendEvents(context.Background(), rsAPI, api.KindNew, originalEvents, testOrigin, nil)
|
||||
err := api.SendEvents(context.Background(), rsAPI, api.KindNew, originalEvents, testOrigin, nil, false)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to send original events: %s", err)
|
||||
}
|
||||
|
@ -371,7 +371,7 @@ func TestOutputRewritesState(t *testing.T) {
|
|||
HasState: true,
|
||||
StateEventIDs: stateIDs,
|
||||
})
|
||||
if err := api.SendInputRoomEvents(context.Background(), rsAPI, inputEvents); err != nil {
|
||||
if err := api.SendInputRoomEvents(context.Background(), rsAPI, inputEvents, false); err != nil {
|
||||
t.Fatalf("SendInputRoomEvents returned error for rewrite events: %s", err)
|
||||
}
|
||||
// we should just have one output event with the entire state of the room in it
|
||||
|
|
|
@ -649,7 +649,7 @@ func (rc *reqCtx) injectResponseToRoomserver(res *gomatrixserverlib.MSC2836Event
|
|||
})
|
||||
}
|
||||
// we've got the data by this point so use a background context
|
||||
err = roomserver.SendInputRoomEvents(context.Background(), rc.rsAPI, ires)
|
||||
err = roomserver.SendInputRoomEvents(context.Background(), rc.rsAPI, ires, false)
|
||||
if err != nil {
|
||||
util.GetLogger(rc.ctx).WithError(err).Error("failed to inject MSC2836EventRelationshipsResponse into the roomserver")
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue