diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 91b0dd44..e41b80f7 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -270,6 +270,21 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res for e := range q { evStart := time.Now() if err := t.processEvent(ctx, e); err != nil { + // If the error is due to the event itself being bad then we skip + // it and move onto the next event. We report an error so that the + // sender knows that we have skipped processing it. + // + // However if the event is due to a temporary failure in our server + // such as a database being unavailable then we should bail, and + // hope that the sender will retry when we are feeling better. + // + // It is uncertain what we should do if an event fails because + // we failed to fetch more information from the sending server. + // For example if a request to /state fails. + // If we skip the event then we risk missing the event until we + // receive another event referencing it. + // If we bail and stop processing then we risk wedging incoming + // transactions from that server forever. if isProcessingErrorFatal(err) { sentry.CaptureException(err) // Any other error should be the result of a temporary error in @@ -314,64 +329,10 @@ func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.Res wg.Wait() - /* - // Process the events. - for _, e := range pdus { - evStart := time.Now() - if err := t.processEvent(ctx, e.Unwrap()); err != nil { - // If the error is due to the event itself being bad then we skip - // it and move onto the next event. We report an error so that the - // sender knows that we have skipped processing it. - // - // However if the event is due to a temporary failure in our server - // such as a database being unavailable then we should bail, and - // hope that the sender will retry when we are feeling better. - // - // It is uncertain what we should do if an event fails because - // we failed to fetch more information from the sending server. - // For example if a request to /state fails. - // If we skip the event then we risk missing the event until we - // receive another event referencing it. - // If we bail and stop processing then we risk wedging incoming - // transactions from that server forever. - if isProcessingErrorFatal(err) { - sentry.CaptureException(err) - // Any other error should be the result of a temporary error in - // our server so we should bail processing the transaction entirely. - util.GetLogger(ctx).Warnf("Processing %s failed fatally: %s", e.EventID(), err) - jsonErr := util.ErrorResponse(err) - processEventSummary.WithLabelValues(t.work, MetricsOutcomeFatal).Observe( - float64(time.Since(evStart).Nanoseconds()) / 1000., - ) - return nil, &jsonErr - } else { - // Auth errors mean the event is 'rejected' which have to be silent to appease sytest - errMsg := "" - outcome := MetricsOutcomeRejected - _, rejected := err.(*gomatrixserverlib.NotAllowed) - if !rejected { - errMsg = err.Error() - outcome = MetricsOutcomeFail - } - util.GetLogger(ctx).WithError(err).WithField("event_id", e.EventID()).WithField("rejected", rejected).Warn( - "Failed to process incoming federation event, skipping", - ) - processEventSummary.WithLabelValues(t.work, outcome).Observe( - float64(time.Since(evStart).Nanoseconds()) / 1000., - ) - results[e.EventID()] = gomatrixserverlib.PDUResult{ - Error: errMsg, - } - } - } else { - results[e.EventID()] = gomatrixserverlib.PDUResult{} - pduCountTotal.WithLabelValues("success").Inc() - processEventSummary.WithLabelValues(t.work, MetricsOutcomeOK).Observe( - float64(time.Since(evStart).Nanoseconds()) / 1000., - ) - } - } - */ + for k := range perRoom { + close(perRoom[k]) + perRoom[k] = nil + } t.processEDUs(ctx) if c := len(results); c > 0 {