diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go b/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go index 5bad939f..ff5cd689 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/events_table.go @@ -115,6 +115,9 @@ const bulkSelectEventNIDSQL = "" + const selectMaxEventDepthSQL = "" + "SELECT COALESCE(MAX(depth) + 1, 0) FROM roomserver_events WHERE event_nid = ANY($1)" +const selectUnsentEventsSQL = "" + + "SELECT event_nid FROM roomserver_events WHERE NOT sent_to_output" + type eventStatements struct { insertEventStmt *sql.Stmt selectEventStmt *sql.Stmt @@ -129,6 +132,7 @@ type eventStatements struct { bulkSelectEventIDStmt *sql.Stmt bulkSelectEventNIDStmt *sql.Stmt selectMaxEventDepthStmt *sql.Stmt + selectUnsentEventsStmt *sql.Stmt } func (s *eventStatements) prepare(db *sql.DB) (err error) { @@ -151,6 +155,7 @@ func (s *eventStatements) prepare(db *sql.DB) (err error) { {&s.bulkSelectEventIDStmt, bulkSelectEventIDSQL}, {&s.bulkSelectEventNIDStmt, bulkSelectEventNIDSQL}, {&s.selectMaxEventDepthStmt, selectMaxEventDepthSQL}, + {&s.selectUnsentEventsStmt, selectUnsentEventsSQL}, }.prepare(db) } @@ -408,3 +413,23 @@ func eventNIDsAsArray(eventNIDs []types.EventNID) pq.Int64Array { } return nids } + +func (s *eventStatements) getUnsentEventNids(ctx context.Context) ([]types.EventNID, error) { + rows, err := s.selectUnsentEventsStmt.QueryContext(ctx) + if err != nil { + return nil, err + } + defer rows.Close() // nolint: errcheck + + var results []types.EventNID + for rows.Next() { + var eventNID int64 + if err = rows.Scan(&eventNID); err != nil { + return nil, err + } + + results = append(results, types.EventNID(eventNID)) + } + + return results, nil +} diff --git a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go index b94036c9..6c55d035 100644 --- a/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/roomserver/storage/storage.go @@ -17,6 +17,7 @@ package storage import ( "context" "database/sql" + "sort" // Import the postgres database driver. _ "github.com/lib/pq" @@ -666,6 +667,24 @@ func (d *Database) EventsFromIDs(ctx context.Context, eventIDs []string) ([]type return d.Events(ctx, nids) } +// UnsentEvents gets a list of events that have persisted but haven't yet been +// confirmed sent down the kaffka stream. Events should be sent in order. +func (d *Database) UnsentEvents(ctx context.Context) ([]types.Event, error) { + nids, err := d.statements.getUnsentEventNids(ctx) + if err != nil { + return nil, err + } + + events, err := d.Events(ctx, nids) + if err != nil { + return nil, err + } + + sort.Slice(events, func(i, j int) bool { return events[i].EventNID < events[j].EventNID }) + + return events, nil +} + type transaction struct { ctx context.Context txn *sql.Tx