This commit is contained in:
Erik Johnston 2017-12-20 09:46:02 +00:00
parent 3f205b1dab
commit ed375d1d76

View file

@ -17,7 +17,6 @@ package input
import ( import (
"bytes" "bytes"
"context" "context"
"sync"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/roomserver/api"
@ -27,8 +26,7 @@ import (
"github.com/matrix-org/util" "github.com/matrix-org/util"
) )
type sendValue struct { type EventSenderValue struct {
finishedChan chan<- struct{}
stateAtEvent types.StateAtEvent stateAtEvent types.StateAtEvent
event gomatrixserverlib.Event event gomatrixserverlib.Event
sendAsServer string sendAsServer string
@ -38,31 +36,37 @@ type sendValue struct {
type EventSender struct { type EventSender struct {
db RoomEventDatabase db RoomEventDatabase
outputWriter OutputRoomEventWriter outputWriter OutputRoomEventWriter
sendingMutex sync.Mutex linearizer common.Linearizer
sending map[types.RoomNID][]sendValue
} }
func (e *EventSender) send( func (e *EventSender) Send(
ctx context.Context, ctx context.Context,
roomNID types.RoomNID, roomNID types.RoomNID,
stateAtEvent types.StateAtEvent, stateAtEvent types.StateAtEvent,
event gomatrixserverlib.Event, event gomatrixserverlib.Event,
sendAsServer string, sendAsServer string,
transactionID *api.TransactionID, transactionID *api.TransactionID,
) { ) (err error) {
e.sendingMutex.Lock() e.linearizer.Await(event.RoomID(), func() {
defer e.sendingMutex.Unlock() err = updateLatestEvents(ctx, e.db, e.outputWriter, roomNID, stateAtEvent, event, sendAsServer, transactionID)
finishedChan := make(chan struct{})
e.sending[roomNID] = append(e.sending[roomNID], sendValue{
finishedChan: finishedChan,
stateAtEvent: stateAtEvent,
event: event,
sendAsServer: sendAsServer,
transactionID: transactionID,
}) })
<-finishedChan return
}
func (e *EventSender) SendMany(
ctx context.Context,
roomNID types.RoomNID,
events []EventSenderValue,
) (err error) {
e.linearizer.Await(event.RoomID(), func() {
for i := range events {
err = updateLatestEvents(ctx, e.db, e.outputWriter, roomNID, stateAtEvent, event, sendAsServer, transactionID)
if err != nil {
return
}
}
})
} }
// updateLatestEvents updates the list of latest events for this room in the database and writes the // updateLatestEvents updates the list of latest events for this room in the database and writes the