From 3f205b1dab7880d82a7e1d96d8bbf223f8856472 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 Dec 2017 15:43:02 +0000 Subject: [PATCH] Start the unsent sending --- .../roomserver/input/latest_events.go | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go index c2f06393..ed4d7a28 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go +++ b/src/github.com/matrix-org/dendrite/roomserver/input/latest_events.go @@ -17,6 +17,7 @@ package input import ( "bytes" "context" + "sync" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/roomserver/api" @@ -26,6 +27,44 @@ import ( "github.com/matrix-org/util" ) +type sendValue struct { + finishedChan chan<- struct{} + stateAtEvent types.StateAtEvent + event gomatrixserverlib.Event + sendAsServer string + transactionID *api.TransactionID +} + +type EventSender struct { + db RoomEventDatabase + outputWriter OutputRoomEventWriter + sendingMutex sync.Mutex + sending map[types.RoomNID][]sendValue +} + +func (e *EventSender) send( + ctx context.Context, + roomNID types.RoomNID, + stateAtEvent types.StateAtEvent, + event gomatrixserverlib.Event, + sendAsServer string, + transactionID *api.TransactionID, +) { + e.sendingMutex.Lock() + defer e.sendingMutex.Unlock() + + finishedChan := make(chan struct{}) + e.sending[roomNID] = append(e.sending[roomNID], sendValue{ + finishedChan: finishedChan, + stateAtEvent: stateAtEvent, + event: event, + sendAsServer: sendAsServer, + transactionID: transactionID, + }) + + <-finishedChan +} + // updateLatestEvents updates the list of latest events for this room in the database and writes the // event to the output log. // The latest events are the events that aren't referenced by another event in the database: