Asynchronous roomserver input from federation API /send

This commit is contained in:
Neil Alexander 2021-06-30 15:01:19 +01:00
parent 2647f6e9c5
commit ca42568640
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
4 changed files with 42 additions and 15 deletions

View file

@ -556,6 +556,7 @@ func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) e
}, },
api.DoNotSendToOtherServers, api.DoNotSendToOtherServers,
nil, nil,
api.InputOptionAsync,
) )
} }
@ -598,6 +599,7 @@ withNextEvent:
SendAsServer: api.DoNotSendToOtherServers, SendAsServer: api.DoNotSendToOtherServers,
}, },
}, },
api.InputOptionAsync,
); err != nil { ); err != nil {
return fmt.Errorf("api.SendEvents: %w", err) return fmt.Errorf("api.SendEvents: %w", err)
} }
@ -747,6 +749,7 @@ func (t *txnReq) processEventWithMissingState(
resolvedState, resolvedState,
backwardsExtremity.Headered(roomVersion), backwardsExtremity.Headered(roomVersion),
t.hadEvents, t.hadEvents,
api.InputOptionAsync,
) )
if err != nil { if err != nil {
return fmt.Errorf("api.SendEventWithState: %w", err) return fmt.Errorf("api.SendEventWithState: %w", err)
@ -767,6 +770,7 @@ func (t *txnReq) processEventWithMissingState(
append(headeredNewEvents, e.Headered(roomVersion)), append(headeredNewEvents, e.Headered(roomVersion)),
api.DoNotSendToOtherServers, api.DoNotSendToOtherServers,
nil, nil,
api.InputOptionAsync,
); err != nil { ); err != nil {
return fmt.Errorf("api.SendEvents: %w", err) return fmt.Errorf("api.SendEvents: %w", err)
} }

View file

@ -86,6 +86,9 @@ type TransactionID struct {
// InputRoomEventsRequest is a request to InputRoomEvents // InputRoomEventsRequest is a request to InputRoomEvents
type InputRoomEventsRequest struct { type InputRoomEventsRequest struct {
InputRoomEvents []InputRoomEvent `json:"input_room_events"` InputRoomEvents []InputRoomEvent `json:"input_room_events"`
// Asynchronous requests will queue up work into the roomserver and
// return straight away, rather than waiting for the results.
Asynchronous bool `json:"async"`
} }
// InputRoomEventsResponse is a response to InputRoomEvents // InputRoomEventsResponse is a response to InputRoomEvents

View file

@ -22,11 +22,18 @@ import (
"github.com/matrix-org/util" "github.com/matrix-org/util"
) )
type InputOption int
const (
InputOptionAsync InputOption = iota
)
// SendEvents to the roomserver The events are written with KindNew. // SendEvents to the roomserver The events are written with KindNew.
func SendEvents( func SendEvents(
ctx context.Context, rsAPI RoomserverInternalAPI, ctx context.Context, rsAPI RoomserverInternalAPI,
kind Kind, events []*gomatrixserverlib.HeaderedEvent, kind Kind, events []*gomatrixserverlib.HeaderedEvent,
sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID, sendAsServer gomatrixserverlib.ServerName, txnID *TransactionID,
options ...InputOption,
) error { ) error {
ires := make([]InputRoomEvent, len(events)) ires := make([]InputRoomEvent, len(events))
for i, event := range events { for i, event := range events {
@ -38,7 +45,7 @@ func SendEvents(
TransactionID: txnID, TransactionID: txnID,
} }
} }
return SendInputRoomEvents(ctx, rsAPI, ires) return SendInputRoomEvents(ctx, rsAPI, ires, options...)
} }
// SendEventWithState writes an event with the specified kind to the roomserver // SendEventWithState writes an event with the specified kind to the roomserver
@ -47,7 +54,7 @@ func SendEvents(
func SendEventWithState( func SendEventWithState(
ctx context.Context, rsAPI RoomserverInternalAPI, kind Kind, ctx context.Context, rsAPI RoomserverInternalAPI, kind Kind,
state *gomatrixserverlib.RespState, event *gomatrixserverlib.HeaderedEvent, state *gomatrixserverlib.RespState, event *gomatrixserverlib.HeaderedEvent,
haveEventIDs map[string]bool, haveEventIDs map[string]bool, options ...InputOption,
) error { ) error {
outliers, err := state.Events() outliers, err := state.Events()
if err != nil { if err != nil {
@ -79,14 +86,25 @@ func SendEventWithState(
StateEventIDs: stateEventIDs, StateEventIDs: stateEventIDs,
}) })
return SendInputRoomEvents(ctx, rsAPI, ires) return SendInputRoomEvents(ctx, rsAPI, ires, options...)
} }
// SendInputRoomEvents to the roomserver. // SendInputRoomEvents to the roomserver.
func SendInputRoomEvents( func SendInputRoomEvents(
ctx context.Context, rsAPI RoomserverInternalAPI, ires []InputRoomEvent, ctx context.Context, rsAPI RoomserverInternalAPI,
ires []InputRoomEvent, options ...InputOption,
) error { ) error {
request := InputRoomEventsRequest{InputRoomEvents: ires} async := false
for _, opt := range options {
switch opt {
case InputOptionAsync:
async = true
}
}
request := InputRoomEventsRequest{
InputRoomEvents: ires,
Asynchronous: async,
}
var response InputRoomEventsResponse var response InputRoomEventsResponse
rsAPI.InputRoomEvents(ctx, &request, &response) rsAPI.InputRoomEvents(ctx, &request, &response)
return response.Err() return response.Err()

View file

@ -166,17 +166,19 @@ func (r *Inputer) InputRoomEvents(
worker.input.push(tasks[i]) worker.input.push(tasks[i])
} }
// Wait for all of the workers to return results about our tasks. if !request.Asynchronous {
wg.Wait() // Wait for all of the workers to return results about our tasks.
wg.Wait()
// If any of the tasks returned an error, we should probably report // If any of the tasks returned an error, we should probably report
// that back to the caller. // that back to the caller.
for _, task := range tasks { for _, task := range tasks {
if task.err != nil { if task.err != nil {
response.ErrMsg = task.err.Error() response.ErrMsg = task.err.Error()
_, rejected := task.err.(*gomatrixserverlib.NotAllowed) _, rejected := task.err.(*gomatrixserverlib.NotAllowed)
response.NotAllowed = rejected response.NotAllowed = rejected
return return
}
} }
} }
} }