From bc2ea2444532e664eeddef408ce69548a93a1048 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Mon, 11 Jun 2018 15:51:00 +0100 Subject: [PATCH] Check if there are more events to send before sleeping --- .../appservice/storage/appservice_events_table.go | 15 +++++++++------ .../dendrite/appservice/storage/storage.go | 2 +- .../appservice/workers/transaction_scheduler.go | 8 ++++++-- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go index c6b23ac4..9020cc9b 100644 --- a/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go +++ b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go @@ -50,8 +50,8 @@ CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id); ` const selectEventsByApplicationServiceIDSQL = "" + - "SELECT id, event_id, origin_server_ts, room_id, type, sender, event_content FROM appservice_events " + - "WHERE as_id = $1 ORDER BY id ASC LIMIT $2" + "SELECT id, event_id, origin_server_ts, room_id, type, sender, event_content, COUNT(id) OVER() AS full_count " + + "FROM appservice_events WHERE as_id = $1 ORDER BY id ASC LIMIT $2" const countEventsByApplicationServiceIDSQL = "" + "SELECT COUNT(event_id) FROM appservice_events WHERE as_id = $1" @@ -95,19 +95,21 @@ func (s *eventsStatements) prepare(db *sql.DB) (err error) { // selectEventsByApplicationServiceID takes in an application service ID and // returns a slice of events that need to be sent to that application service, // as well as an int later used to remove these same events from the database -// once successfully sent to an application service. +// once successfully sent to an application service. The total event count is +// used by a worker to determine if more events need to be pulled from the DB +// later. func (s *eventsStatements) selectEventsByApplicationServiceID( ctx context.Context, applicationServiceID string, limit int, ) ( - maxID int, + maxID, totalEvents int, events []gomatrixserverlib.ApplicationServiceEvent, err error, ) { eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID, limit) if err != nil { - return 0, nil, err + return 0, 0, nil, err } defer func() { err = eventRows.Close() @@ -130,9 +132,10 @@ func (s *eventsStatements) selectEventsByApplicationServiceID( &event.Type, &event.UserID, &eventContent, + &totalEvents, ) if err != nil { - return 0, nil, err + return 0, 0, nil, err } if eventContent.Valid { event.Content = gomatrixserverlib.RawJSON(eventContent.String) diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/storage.go b/src/github.com/matrix-org/dendrite/appservice/storage/storage.go index 6046f1e5..4bc73c69 100644 --- a/src/github.com/matrix-org/dendrite/appservice/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/appservice/storage/storage.go @@ -67,7 +67,7 @@ func (d *Database) GetEventsWithAppServiceID( ctx context.Context, appServiceID string, limit int, -) (int, []gomatrixserverlib.ApplicationServiceEvent, error) { +) (int, int, []gomatrixserverlib.ApplicationServiceEvent, error) { return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit) } diff --git a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go index 5a511c43..e3432c9a 100644 --- a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go +++ b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go @@ -101,7 +101,7 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { ws.EventsReady = false ws.Cond.L.Unlock() - maxID, events, err := db.GetEventsWithAppServiceID(ctx, ws.AppService.ID, transactionBatchSize) + maxID, totalEvents, events, err := db.GetEventsWithAppServiceID(ctx, ws.AppService.ID, transactionBatchSize) if err != nil { log.WithError(err).Errorf("appservice %s worker unable to read queued events from DB", ws.AppService.ID) @@ -146,7 +146,11 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) { ws.AppService.ID) return } - waitForEvents(&ws) + + // Only wait for more events once we've sent all the events in the database + if totalEvents <= transactionBatchSize { + waitForEvents(&ws) + } } }