diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index a514127c..91b0dd44 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -193,6 +193,7 @@ type txnFederationClient interface { func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) { results := make(map[string]gomatrixserverlib.PDUResult) + var resultsMutex sync.Mutex pdus := []*gomatrixserverlib.HeaderedEvent{} for _, pdu := range t.PDUs { @@ -248,63 +249,130 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res pdus = append(pdus, event.Headered(verRes.RoomVersion)) } - // Process the events. + perRoom := map[string]chan *gomatrixserverlib.Event{} + perCount := map[string]int{} for _, e := range pdus { - evStart := time.Now() - if err := t.processEvent(ctx, e.Unwrap()); err != nil { - // If the error is due to the event itself being bad then we skip - // it and move onto the next event. We report an error so that the - // sender knows that we have skipped processing it. - // - // However if the event is due to a temporary failure in our server - // such as a database being unavailable then we should bail, and - // hope that the sender will retry when we are feeling better. - // - // It is uncertain what we should do if an event fails because - // we failed to fetch more information from the sending server. - // For example if a request to /state fails. - // If we skip the event then we risk missing the event until we - // receive another event referencing it. - // If we bail and stop processing then we risk wedging incoming - // transactions from that server forever. - if isProcessingErrorFatal(err) { - sentry.CaptureException(err) - // Any other error should be the result of a temporary error in - // our server so we should bail processing the transaction entirely. - util.GetLogger(ctx).Warnf("Processing %s failed fatally: %s", e.EventID(), err) - jsonErr := util.ErrorResponse(err) - processEventSummary.WithLabelValues(t.work, MetricsOutcomeFatal).Observe( - float64(time.Since(evStart).Nanoseconds()) / 1000., - ) - return nil, &jsonErr - } else { - // Auth errors mean the event is 'rejected' which have to be silent to appease sytest - errMsg := "" - outcome := MetricsOutcomeRejected - _, rejected := err.(*gomatrixserverlib.NotAllowed) - if !rejected { - errMsg = err.Error() - outcome = MetricsOutcomeFail - } - util.GetLogger(ctx).WithError(err).WithField("event_id", e.EventID()).WithField("rejected", rejected).Warn( - "Failed to process incoming federation event, skipping", - ) - processEventSummary.WithLabelValues(t.work, outcome).Observe( - float64(time.Since(evStart).Nanoseconds()) / 1000., - ) - results[e.EventID()] = gomatrixserverlib.PDUResult{ - Error: errMsg, + perCount[e.RoomID()]++ + } + for s, c := range perCount { + perRoom[s] = make(chan *gomatrixserverlib.Event, c) + } + for _, e := range pdus { + perRoom[e.RoomID()] <- e.Unwrap() + } + + var wg sync.WaitGroup + wg.Add(len(perRoom)) + + for _, q := range perRoom { + go func(q chan *gomatrixserverlib.Event) { + defer wg.Done() + for e := range q { + evStart := time.Now() + if err := t.processEvent(ctx, e); err != nil { + if isProcessingErrorFatal(err) { + sentry.CaptureException(err) + // Any other error should be the result of a temporary error in + // our server so we should bail processing the transaction entirely. + util.GetLogger(ctx).Warnf("Processing %s failed fatally: %s", e.EventID(), err) + processEventSummary.WithLabelValues(t.work, MetricsOutcomeFatal).Observe( + float64(time.Since(evStart).Nanoseconds()) / 1000., + ) + } else { + // Auth errors mean the event is 'rejected' which have to be silent to appease sytest + errMsg := "" + outcome := MetricsOutcomeRejected + _, rejected := err.(*gomatrixserverlib.NotAllowed) + if !rejected { + errMsg = err.Error() + outcome = MetricsOutcomeFail + } + util.GetLogger(ctx).WithError(err).WithField("event_id", e.EventID()).WithField("rejected", rejected).Warn( + "Failed to process incoming federation event, skipping", + ) + processEventSummary.WithLabelValues(t.work, outcome).Observe( + float64(time.Since(evStart).Nanoseconds()) / 1000., + ) + resultsMutex.Lock() + results[e.EventID()] = gomatrixserverlib.PDUResult{ + Error: errMsg, + } + resultsMutex.Unlock() + } + } else { + resultsMutex.Lock() + results[e.EventID()] = gomatrixserverlib.PDUResult{} + resultsMutex.Unlock() + pduCountTotal.WithLabelValues("success").Inc() + processEventSummary.WithLabelValues(t.work, MetricsOutcomeOK).Observe( + float64(time.Since(evStart).Nanoseconds()) / 1000., + ) } } - } else { - results[e.EventID()] = gomatrixserverlib.PDUResult{} - pduCountTotal.WithLabelValues("success").Inc() - processEventSummary.WithLabelValues(t.work, MetricsOutcomeOK).Observe( - float64(time.Since(evStart).Nanoseconds()) / 1000., - ) - } + }(q) } + wg.Wait() + + /* + // Process the events. + for _, e := range pdus { + evStart := time.Now() + if err := t.processEvent(ctx, e.Unwrap()); err != nil { + // If the error is due to the event itself being bad then we skip + // it and move onto the next event. We report an error so that the + // sender knows that we have skipped processing it. + // + // However if the event is due to a temporary failure in our server + // such as a database being unavailable then we should bail, and + // hope that the sender will retry when we are feeling better. + // + // It is uncertain what we should do if an event fails because + // we failed to fetch more information from the sending server. + // For example if a request to /state fails. + // If we skip the event then we risk missing the event until we + // receive another event referencing it. + // If we bail and stop processing then we risk wedging incoming + // transactions from that server forever. + if isProcessingErrorFatal(err) { + sentry.CaptureException(err) + // Any other error should be the result of a temporary error in + // our server so we should bail processing the transaction entirely. + util.GetLogger(ctx).Warnf("Processing %s failed fatally: %s", e.EventID(), err) + jsonErr := util.ErrorResponse(err) + processEventSummary.WithLabelValues(t.work, MetricsOutcomeFatal).Observe( + float64(time.Since(evStart).Nanoseconds()) / 1000., + ) + return nil, &jsonErr + } else { + // Auth errors mean the event is 'rejected' which have to be silent to appease sytest + errMsg := "" + outcome := MetricsOutcomeRejected + _, rejected := err.(*gomatrixserverlib.NotAllowed) + if !rejected { + errMsg = err.Error() + outcome = MetricsOutcomeFail + } + util.GetLogger(ctx).WithError(err).WithField("event_id", e.EventID()).WithField("rejected", rejected).Warn( + "Failed to process incoming federation event, skipping", + ) + processEventSummary.WithLabelValues(t.work, outcome).Observe( + float64(time.Since(evStart).Nanoseconds()) / 1000., + ) + results[e.EventID()] = gomatrixserverlib.PDUResult{ + Error: errMsg, + } + } + } else { + results[e.EventID()] = gomatrixserverlib.PDUResult{} + pduCountTotal.WithLabelValues("success").Inc() + processEventSummary.WithLabelValues(t.work, MetricsOutcomeOK).Observe( + float64(time.Since(evStart).Nanoseconds()) / 1000., + ) + } + } + */ + t.processEDUs(ctx) if c := len(results); c > 0 { util.GetLogger(ctx).Infof("Processed %d PDUs from transaction %q", c, t.TransactionID)