Persist events/state in order

This commit is contained in:
Neil Alexander 2021-01-25 16:23:49 +00:00
parent 9a7b42c5ed
commit d1d82656e7
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
2 changed files with 20 additions and 48 deletions

View file

@ -39,13 +39,6 @@ type Inputer struct {
latestEventsMutexes sync.Map // room ID -> sync.Mutex
}
type inputTask struct {
ctx context.Context
event *api.InputRoomEvent
wg *sync.WaitGroup
err error // written back by worker, only safe to read when all tasks are done
}
// WriteOutputEvents implements OutputRoomEventWriter
func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) error {
messages := make([]*sarama.ProducerMessage, len(updates))
@ -94,40 +87,14 @@ func (r *Inputer) InputRoomEvents(
request *api.InputRoomEventsRequest,
response *api.InputRoomEventsResponse,
) {
// Create a wait group. Each task that we dispatch will call Done on
// this wait group so that we know when all of our events have been
// processed.
wg := &sync.WaitGroup{}
wg.Add(len(request.InputRoomEvents))
tasks := make([]*inputTask, len(request.InputRoomEvents))
for i := range request.InputRoomEvents {
tasks[i] = &inputTask{
ctx: context.Background(),
event: &request.InputRoomEvents[i],
wg: wg,
}
go func(task *inputTask) {
hooks.Run(hooks.KindNewEventReceived, task.event.Event)
_, task.err = r.processRoomEvent(task.ctx, task.event)
if task.err == nil {
hooks.Run(hooks.KindNewEventPersisted, task.event.Event)
}
task.wg.Done()
}(tasks[i])
}
// Wait for all of the workers to return results about our tasks.
wg.Wait()
// If any of the tasks returned an error, we should probably report
// that back to the caller.
for _, task := range tasks {
if task.err != nil {
response.ErrMsg = task.err.Error()
_, rejected := task.err.(*gomatrixserverlib.NotAllowed)
hooks.Run(hooks.KindNewEventReceived, &request.InputRoomEvents[i])
if _, err := r.processRoomEvent(context.Background(), &request.InputRoomEvents[i]); err == nil {
hooks.Run(hooks.KindNewEventPersisted, &request.InputRoomEvents[i])
} else {
response.ErrMsg = err.Error()
_, rejected := err.(*gomatrixserverlib.NotAllowed)
response.NotAllowed = rejected
return
}
}
}

View file

@ -192,15 +192,20 @@ func (r *Inputer) processRoomEvent(
switch input.Kind {
case api.KindNew:
if err = r.updateLatestEvents(
ctx, // context
roomInfo, // room info for the room being updated
stateAtEvent, // state at event (below)
event, // event
input.SendAsServer, // send as server
input.TransactionID, // transaction ID
input.HasState, // rewrites state?
); err != nil {
errch := make(chan error)
go func() {
errch <- r.updateLatestEvents(
ctx, // context
roomInfo, // room info for the room being updated
stateAtEvent, // state at event (below)
event, // event
input.SendAsServer, // send as server
input.TransactionID, // transaction ID
input.HasState, // rewrites state?
)
close(errch)
}()
if err = <-errch; err != nil {
return "", fmt.Errorf("r.updateLatestEvents: %w", err)
}
case api.KindOld: