From 323a6fb54f023210a213ed8fb2a626358ef00fc3 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Mon, 8 Nov 2021 09:24:16 +0000 Subject: [PATCH] Resume federation sends (#2039) * Resume federation sends * Review comments * Fix build error --- federationapi/routing/send.go | 40 ++++++++++++++++++++++++++++++++--- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 3cae837c..4b5f0d66 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -148,7 +148,8 @@ type inputWorker struct { input *sendFIFOQueue } -var inputWorkers sync.Map // room ID -> *inputWorker +var inFlightTxnsPerOrigin sync.Map // transaction ID -> chan util.JSONResponse +var inputWorkers sync.Map // room ID -> *inputWorker // Send implements /_matrix/federation/v1/send/{txnID} func Send( @@ -164,6 +165,37 @@ func Send( mu *internal.MutexByRoom, servers federationAPI.ServersInRoomProvider, ) util.JSONResponse { + // First we should check if this origin has already submitted this + // txn ID to us. If they have and the txnIDs map contains an entry, + // the transaction is still being worked on. The new client can wait + // for it to complete rather than creating more work. + index := string(request.Origin()) + "\000" + string(txnID) + v, ok := inFlightTxnsPerOrigin.LoadOrStore(index, make(chan util.JSONResponse, 1)) + ch := v.(chan util.JSONResponse) + if ok { + // This origin already submitted this txn ID to us, and the work + // is still taking place, so we'll just wait for it to finish. + ctx, cancel := context.WithTimeout(httpReq.Context(), time.Minute*5) + defer cancel() + select { + case <-ctx.Done(): + // If the caller gives up then return straight away. We don't + // want to attempt to process what they sent us any further. + return util.JSONResponse{Code: http.StatusRequestTimeout} + case res := <-ch: + // The original task just finished processing so let's return + // the result of it. + if res.Code == 0 { + return util.JSONResponse{Code: http.StatusAccepted} + } + return res + } + } + // Otherwise, store that we're currently working on this txn from + // this origin. When we're done processing, close the channel. + defer close(ch) + defer inFlightTxnsPerOrigin.Delete(index) + t := txnReq{ rsAPI: rsAPI, eduAPI: eduAPI, @@ -205,7 +237,7 @@ func Send( util.GetLogger(httpReq.Context()).Infof("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, request.Origin(), len(t.PDUs), len(t.EDUs)) - resp, jsonErr := t.processTransaction(httpReq.Context()) + resp, jsonErr := t.processTransaction(context.Background()) if jsonErr != nil { util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed") return *jsonErr @@ -215,10 +247,12 @@ func Send( // Status code 200: // The result of processing the transaction. The server is to use this response // even in the event of one or more PDUs failing to be processed. - return util.JSONResponse{ + res := util.JSONResponse{ Code: http.StatusOK, JSON: resp, } + ch <- res + return res } type txnReq struct {