From d1d82656e75d9c65bad7127474d4b0774f929b3b Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 25 Jan 2021 16:23:49 +0000 Subject: [PATCH] Persist events/state in order --- roomserver/internal/input/input.go | 45 +++-------------------- roomserver/internal/input/input_events.go | 23 +++++++----- 2 files changed, 20 insertions(+), 48 deletions(-) diff --git a/roomserver/internal/input/input.go b/roomserver/internal/input/input.go index 69cc0ca1..b16f1da5 100644 --- a/roomserver/internal/input/input.go +++ b/roomserver/internal/input/input.go @@ -39,13 +39,6 @@ type Inputer struct { latestEventsMutexes sync.Map // room ID -> sync.Mutex } -type inputTask struct { - ctx context.Context - event *api.InputRoomEvent - wg *sync.WaitGroup - err error // written back by worker, only safe to read when all tasks are done -} - // WriteOutputEvents implements OutputRoomEventWriter func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) error { messages := make([]*sarama.ProducerMessage, len(updates)) @@ -94,40 +87,14 @@ func (r *Inputer) InputRoomEvents( request *api.InputRoomEventsRequest, response *api.InputRoomEventsResponse, ) { - // Create a wait group. Each task that we dispatch will call Done on - // this wait group so that we know when all of our events have been - // processed. - wg := &sync.WaitGroup{} - wg.Add(len(request.InputRoomEvents)) - tasks := make([]*inputTask, len(request.InputRoomEvents)) - for i := range request.InputRoomEvents { - tasks[i] = &inputTask{ - ctx: context.Background(), - event: &request.InputRoomEvents[i], - wg: wg, - } - go func(task *inputTask) { - hooks.Run(hooks.KindNewEventReceived, task.event.Event) - _, task.err = r.processRoomEvent(task.ctx, task.event) - if task.err == nil { - hooks.Run(hooks.KindNewEventPersisted, task.event.Event) - } - task.wg.Done() - }(tasks[i]) - } - - // Wait for all of the workers to return results about our tasks. - wg.Wait() - - // If any of the tasks returned an error, we should probably report - // that back to the caller. - for _, task := range tasks { - if task.err != nil { - response.ErrMsg = task.err.Error() - _, rejected := task.err.(*gomatrixserverlib.NotAllowed) + hooks.Run(hooks.KindNewEventReceived, &request.InputRoomEvents[i]) + if _, err := r.processRoomEvent(context.Background(), &request.InputRoomEvents[i]); err == nil { + hooks.Run(hooks.KindNewEventPersisted, &request.InputRoomEvents[i]) + } else { + response.ErrMsg = err.Error() + _, rejected := err.(*gomatrixserverlib.NotAllowed) response.NotAllowed = rejected - return } } } diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index 2a558c48..ff60ec64 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -192,15 +192,20 @@ func (r *Inputer) processRoomEvent( switch input.Kind { case api.KindNew: - if err = r.updateLatestEvents( - ctx, // context - roomInfo, // room info for the room being updated - stateAtEvent, // state at event (below) - event, // event - input.SendAsServer, // send as server - input.TransactionID, // transaction ID - input.HasState, // rewrites state? - ); err != nil { + errch := make(chan error) + go func() { + errch <- r.updateLatestEvents( + ctx, // context + roomInfo, // room info for the room being updated + stateAtEvent, // state at event (below) + event, // event + input.SendAsServer, // send as server + input.TransactionID, // transaction ID + input.HasState, // rewrites state? + ) + close(errch) + }() + if err = <-errch; err != nil { return "", fmt.Errorf("r.updateLatestEvents: %w", err) } case api.KindOld: