Async and sync roomserver input

This commit is contained in:
Neil Alexander 2021-11-08 13:02:08 +00:00
parent d6229a3c9a
commit 8cc6937fef
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
15 changed files with 79 additions and 29 deletions

View file

@ -463,6 +463,7 @@ func createRoom(
},
ev.Headered(roomVersion),
nil,
false,
); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEventWithState failed")
return jsonerror.InternalServerError()

View file

@ -110,6 +110,7 @@ func sendMembership(ctx context.Context, accountDB accounts.Database, device *us
[]*gomatrixserverlib.HeaderedEvent{event.Event.Headered(roomVer)},
cfg.Matrix.ServerName,
nil,
false,
); err != nil {
util.GetLogger(ctx).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()

View file

@ -169,7 +169,7 @@ func SetAvatarURL(
return jsonerror.InternalServerError()
}
if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil); err != nil {
if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil, false); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
@ -286,7 +286,7 @@ func SetDisplayName(
return jsonerror.InternalServerError()
}
if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil); err != nil {
if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, events, cfg.Matrix.ServerName, nil, false); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}

View file

@ -120,7 +120,7 @@ func SendRedaction(
JSON: jsonerror.NotFound("Room does not exist"),
}
}
if err = roomserverAPI.SendEvents(context.Background(), rsAPI, roomserverAPI.KindNew, []*gomatrixserverlib.HeaderedEvent{e}, cfg.Matrix.ServerName, nil); err != nil {
if err = roomserverAPI.SendEvents(context.Background(), rsAPI, roomserverAPI.KindNew, []*gomatrixserverlib.HeaderedEvent{e}, cfg.Matrix.ServerName, nil, false); err != nil {
util.GetLogger(req.Context()).WithError(err).Errorf("failed to SendEvents")
return jsonerror.InternalServerError()
}

View file

@ -122,6 +122,7 @@ func SendEvent(
},
cfg.Matrix.ServerName,
txnAndSessionID,
false,
); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()

View file

@ -367,5 +367,6 @@ func emit3PIDInviteEvent(
},
cfg.Matrix.ServerName,
nil,
false,
)
}

View file

@ -692,6 +692,7 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e
},
api.DoNotSendToOtherServers,
nil,
true, // asynchronous
)
}
@ -734,6 +735,7 @@ withNextEvent:
SendAsServer: api.DoNotSendToOtherServers,
},
},
false,
); err != nil {
return fmt.Errorf("api.SendEvents: %w", err)
}
@ -882,6 +884,7 @@ func (t *txnReq) processEventWithMissingState(
resolvedState,
backwardsExtremity.Headered(roomVersion),
hadEvents,
true,
)
if err != nil {
return fmt.Errorf("api.SendEventWithState: %w", err)
@ -902,6 +905,7 @@ func (t *txnReq) processEventWithMissingState(
append(headeredNewEvents, e.Headered(roomVersion)),
api.DoNotSendToOtherServers,
nil,
true,
); err != nil {
return fmt.Errorf("api.SendEvents: %w", err)
}

View file

@ -89,7 +89,7 @@ func CreateInvitesFrom3PIDInvites(
}
// Send all the events
if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, evs, cfg.Matrix.ServerName, nil); err != nil {
if err := api.SendEvents(req.Context(), rsAPI, api.KindNew, evs, cfg.Matrix.ServerName, nil, false); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
@ -180,6 +180,7 @@ func ExchangeThirdPartyInvite(
},
cfg.Matrix.ServerName,
nil,
false,
); err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()

View file

@ -96,6 +96,9 @@ func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error {
}
func (t *KeyChangeConsumer) onDeviceKeyMessage(m api.DeviceMessage) error {
if m.DeviceKeys == nil {
return nil
}
logger := logrus.WithField("user_id", m.UserID)
// only send key change events which originated from us

View file

@ -249,7 +249,7 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer(
roomserverAPI.KindNew,
respState,
event.Headered(respMakeJoin.RoomVersion),
nil,
nil, false,
); err != nil {
logrus.WithFields(logrus.Fields{
"room_id": roomID,
@ -430,7 +430,7 @@ func (r *FederationSenderInternalAPI) performOutboundPeekUsingServer(
roomserverAPI.KindNew,
&respState,
respPeek.LatestEvent.Headered(respPeek.RoomVersion),
nil,
nil, false,
); err != nil {
return fmt.Errorf("r.producer.SendEventWithState: %w", err)
}

View file

@ -86,6 +86,7 @@ type TransactionID struct {
// InputRoomEventsRequest is a request to InputRoomEvents
type InputRoomEventsRequest struct {
InputRoomEvents []InputRoomEvent `json:"input_room_events"`
Asynchronous bool `json:"async"`
}
// InputRoomEventsResponse is a response to InputRoomEvents

View file

@ -27,6 +27,7 @@ func SendEvents(
ctx context.Context, rsAPI RoomserverInternalAPI,
kind Kind, events []*gomatrixserverlib.HeaderedEvent,
sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID,
async bool,
) error {
ires := make([]InputRoomEvent, len(events))
for i, event := range events {
@ -38,7 +39,7 @@ func SendEvents(
TransactionID: txnID,
}
}
return SendInputRoomEvents(ctx, rsAPI, ires)
return SendInputRoomEvents(ctx, rsAPI, ires, async)
}
// SendEventWithState writes an event with the specified kind to the roomserver
@ -47,7 +48,7 @@ func SendEvents(
func SendEventWithState(
ctx context.Context, rsAPI RoomserverInternalAPI, kind Kind,
state *gomatrixserverlib.RespState, event *gomatrixserverlib.HeaderedEvent,
haveEventIDs map[string]bool,
haveEventIDs map[string]bool, async bool,
) error {
outliers, err := state.Events()
if err != nil {
@ -79,14 +80,18 @@ func SendEventWithState(
StateEventIDs: stateEventIDs,
})
return SendInputRoomEvents(ctx, rsAPI, ires)
return SendInputRoomEvents(ctx, rsAPI, ires, async)
}
// SendInputRoomEvents to the roomserver.
func SendInputRoomEvents(
ctx context.Context, rsAPI RoomserverInternalAPI, ires []InputRoomEvent,
ctx context.Context, rsAPI RoomserverInternalAPI,
ires []InputRoomEvent, async bool,
) error {
request := InputRoomEventsRequest{InputRoomEvents: ires}
request := InputRoomEventsRequest{
InputRoomEvents: ires,
Asynchronous: async,
}
var response InputRoomEventsResponse
rsAPI.InputRoomEvents(ctx, &request, &response)
return response.Err()

View file

@ -60,7 +60,7 @@ func (r *Inputer) Start() error {
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
var inputRoomEvent api.InputRoomEvent
if err := json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
_ = msg.Ack()
_ = msg.Term()
return
}
inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{})
@ -68,6 +68,7 @@ func (r *Inputer) Start() error {
_ = msg.InProgress()
if _, err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil {
sentry.CaptureException(err)
_ = msg.Respond([]byte(err.Error()))
} else {
hooks.Run(hooks.KindNewEventPersisted, inputRoomEvent.Event)
}
@ -82,10 +83,11 @@ func (r *Inputer) Start() error {
// InputRoomEvents implements api.RoomserverInternalAPI
func (r *Inputer) InputRoomEvents(
_ context.Context,
ctx context.Context,
request *api.InputRoomEventsRequest,
response *api.InputRoomEventsResponse,
) {
if request.Asynchronous {
var err error
for _, e := range request.InputRoomEvents {
msg := &nats.Msg{
@ -100,11 +102,38 @@ func (r *Inputer) InputRoomEvents(
return
}
if _, err = r.JetStream.PublishMsg(msg); err != nil {
response.ErrMsg = err.Error()
return
}
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
}
} else {
responses := make(chan error, len(request.InputRoomEvents))
defer close(responses)
for _, e := range request.InputRoomEvents {
inputRoomEvent := e
inbox, _ := r.workers.LoadOrStore(inputRoomEvent.Event.RoomID(), &phony.Inbox{})
inbox.(*phony.Inbox).Act(nil, func() {
_, err := r.processRoomEvent(context.TODO(), &inputRoomEvent)
if err != nil {
sentry.CaptureException(err)
} else {
hooks.Run(hooks.KindNewEventPersisted, inputRoomEvent.Event)
}
responses <- err
})
}
for i := 0; i < len(request.InputRoomEvents); i++ {
select {
case <-ctx.Done():
return
case err := <-responses:
if err != nil {
response.ErrMsg = err.Error()
return
}
}
}
}
}
// WriteOutputEvents implements OutputRoomEventWriter

View file

@ -649,7 +649,7 @@ func (rc *reqCtx) injectResponseToRoomserver(res *gomatrixserverlib.MSC2836Event
})
}
// we've got the data by this point so use a background context
err = roomserver.SendInputRoomEvents(context.Background(), rc.rsAPI, ires)
err = roomserver.SendInputRoomEvents(context.Background(), rc.rsAPI, ires, false)
if err != nil {
util.GetLogger(rc.ctx).WithError(err).Error("failed to inject MSC2836EventRelationshipsResponse into the roomserver")
}

View file

@ -120,6 +120,9 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
}
func (s *OutputKeyChangeEventConsumer) onDeviceKeyMessage(m api.DeviceMessage, offset int64, partition int32) error {
if m.DeviceKeys == nil {
return nil
}
output := m.DeviceKeys
// work out who we need to notify about the new key
var queryRes roomserverAPI.QuerySharedUsersResponse