Add storage APIs for getting unsent events

This commit is contained in:
Erik Johnston 2017-12-18 15:21:14 +00:00
parent 25f9fda0bb
commit c112c68d8e
2 changed files with 44 additions and 0 deletions

View file

@ -115,6 +115,9 @@ const bulkSelectEventNIDSQL = "" +
const selectMaxEventDepthSQL = "" + const selectMaxEventDepthSQL = "" +
"SELECT COALESCE(MAX(depth) + 1, 0) FROM roomserver_events WHERE event_nid = ANY($1)" "SELECT COALESCE(MAX(depth) + 1, 0) FROM roomserver_events WHERE event_nid = ANY($1)"
const selectUnsentEventsSQL = "" +
"SELECT event_nid FROM roomserver_events WHERE NOT sent_to_output"
type eventStatements struct { type eventStatements struct {
insertEventStmt *sql.Stmt insertEventStmt *sql.Stmt
selectEventStmt *sql.Stmt selectEventStmt *sql.Stmt
@ -129,6 +132,7 @@ type eventStatements struct {
bulkSelectEventIDStmt *sql.Stmt bulkSelectEventIDStmt *sql.Stmt
bulkSelectEventNIDStmt *sql.Stmt bulkSelectEventNIDStmt *sql.Stmt
selectMaxEventDepthStmt *sql.Stmt selectMaxEventDepthStmt *sql.Stmt
selectUnsentEventsStmt *sql.Stmt
} }
func (s *eventStatements) prepare(db *sql.DB) (err error) { func (s *eventStatements) prepare(db *sql.DB) (err error) {
@ -151,6 +155,7 @@ func (s *eventStatements) prepare(db *sql.DB) (err error) {
{&s.bulkSelectEventIDStmt, bulkSelectEventIDSQL}, {&s.bulkSelectEventIDStmt, bulkSelectEventIDSQL},
{&s.bulkSelectEventNIDStmt, bulkSelectEventNIDSQL}, {&s.bulkSelectEventNIDStmt, bulkSelectEventNIDSQL},
{&s.selectMaxEventDepthStmt, selectMaxEventDepthSQL}, {&s.selectMaxEventDepthStmt, selectMaxEventDepthSQL},
{&s.selectUnsentEventsStmt, selectUnsentEventsSQL},
}.prepare(db) }.prepare(db)
} }
@ -408,3 +413,23 @@ func eventNIDsAsArray(eventNIDs []types.EventNID) pq.Int64Array {
} }
return nids return nids
} }
func (s *eventStatements) getUnsentEventNids(ctx context.Context) ([]types.EventNID, error) {
rows, err := s.selectUnsentEventsStmt.QueryContext(ctx)
if err != nil {
return nil, err
}
defer rows.Close() // nolint: errcheck
var results []types.EventNID
for rows.Next() {
var eventNID int64
if err = rows.Scan(&eventNID); err != nil {
return nil, err
}
results = append(results, types.EventNID(eventNID))
}
return results, nil
}

View file

@ -17,6 +17,7 @@ package storage
import ( import (
"context" "context"
"database/sql" "database/sql"
"sort"
// Import the postgres database driver. // Import the postgres database driver.
_ "github.com/lib/pq" _ "github.com/lib/pq"
@ -666,6 +667,24 @@ func (d *Database) EventsFromIDs(ctx context.Context, eventIDs []string) ([]type
return d.Events(ctx, nids) return d.Events(ctx, nids)
} }
// UnsentEvents gets a list of events that have persisted but haven't yet been
// confirmed sent down the kaffka stream. Events should be sent in order.
func (d *Database) UnsentEvents(ctx context.Context) ([]types.Event, error) {
nids, err := d.statements.getUnsentEventNids(ctx)
if err != nil {
return nil, err
}
events, err := d.Events(ctx, nids)
if err != nil {
return nil, err
}
sort.Slice(events, func(i, j int) bool { return events[i].EventNID < events[j].EventNID })
return events, nil
}
type transaction struct { type transaction struct {
ctx context.Context ctx context.Context
txn *sql.Tx txn *sql.Tx