Check if there are more events to send before sleeping

This commit is contained in:
Andrew Morgan 2018-06-11 15:51:00 +01:00
parent 7aa45b30ff
commit bc2ea24445
3 changed files with 16 additions and 9 deletions

View file

@ -50,8 +50,8 @@ CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id);
` `
const selectEventsByApplicationServiceIDSQL = "" + const selectEventsByApplicationServiceIDSQL = "" +
"SELECT id, event_id, origin_server_ts, room_id, type, sender, event_content FROM appservice_events " + "SELECT id, event_id, origin_server_ts, room_id, type, sender, event_content, COUNT(id) OVER() AS full_count " +
"WHERE as_id = $1 ORDER BY id ASC LIMIT $2" "FROM appservice_events WHERE as_id = $1 ORDER BY id ASC LIMIT $2"
const countEventsByApplicationServiceIDSQL = "" + const countEventsByApplicationServiceIDSQL = "" +
"SELECT COUNT(event_id) FROM appservice_events WHERE as_id = $1" "SELECT COUNT(event_id) FROM appservice_events WHERE as_id = $1"
@ -95,19 +95,21 @@ func (s *eventsStatements) prepare(db *sql.DB) (err error) {
// selectEventsByApplicationServiceID takes in an application service ID and // selectEventsByApplicationServiceID takes in an application service ID and
// returns a slice of events that need to be sent to that application service, // returns a slice of events that need to be sent to that application service,
// as well as an int later used to remove these same events from the database // as well as an int later used to remove these same events from the database
// once successfully sent to an application service. // once successfully sent to an application service. The total event count is
// used by a worker to determine if more events need to be pulled from the DB
// later.
func (s *eventsStatements) selectEventsByApplicationServiceID( func (s *eventsStatements) selectEventsByApplicationServiceID(
ctx context.Context, ctx context.Context,
applicationServiceID string, applicationServiceID string,
limit int, limit int,
) ( ) (
maxID int, maxID, totalEvents int,
events []gomatrixserverlib.ApplicationServiceEvent, events []gomatrixserverlib.ApplicationServiceEvent,
err error, err error,
) { ) {
eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID, limit) eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID, limit)
if err != nil { if err != nil {
return 0, nil, err return 0, 0, nil, err
} }
defer func() { defer func() {
err = eventRows.Close() err = eventRows.Close()
@ -130,9 +132,10 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
&event.Type, &event.Type,
&event.UserID, &event.UserID,
&eventContent, &eventContent,
&totalEvents,
) )
if err != nil { if err != nil {
return 0, nil, err return 0, 0, nil, err
} }
if eventContent.Valid { if eventContent.Valid {
event.Content = gomatrixserverlib.RawJSON(eventContent.String) event.Content = gomatrixserverlib.RawJSON(eventContent.String)

View file

@ -67,7 +67,7 @@ func (d *Database) GetEventsWithAppServiceID(
ctx context.Context, ctx context.Context,
appServiceID string, appServiceID string,
limit int, limit int,
) (int, []gomatrixserverlib.ApplicationServiceEvent, error) { ) (int, int, []gomatrixserverlib.ApplicationServiceEvent, error) {
return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit) return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
} }

View file

@ -101,7 +101,7 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
ws.EventsReady = false ws.EventsReady = false
ws.Cond.L.Unlock() ws.Cond.L.Unlock()
maxID, events, err := db.GetEventsWithAppServiceID(ctx, ws.AppService.ID, transactionBatchSize) maxID, totalEvents, events, err := db.GetEventsWithAppServiceID(ctx, ws.AppService.ID, transactionBatchSize)
if err != nil { if err != nil {
log.WithError(err).Errorf("appservice %s worker unable to read queued events from DB", log.WithError(err).Errorf("appservice %s worker unable to read queued events from DB",
ws.AppService.ID) ws.AppService.ID)
@ -146,8 +146,12 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
ws.AppService.ID) ws.AppService.ID)
return return
} }
// Only wait for more events once we've sent all the events in the database
if totalEvents <= transactionBatchSize {
waitForEvents(&ws) waitForEvents(&ws)
} }
}
} }
// waitForEvents pauses the calling goroutine while it waits for a broadcast message // waitForEvents pauses the calling goroutine while it waits for a broadcast message