Delete by int ID instead of string.

This was causing some events to not be deleted, as < an eventID doesn't
really make much sense.
This commit is contained in:
Andrew Morgan 2018-06-11 15:07:52 +01:00
parent cac8b9d14c
commit 7aa45b30ff
3 changed files with 23 additions and 16 deletions

View file

@ -50,7 +50,7 @@ CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id);
` `
const selectEventsByApplicationServiceIDSQL = "" + const selectEventsByApplicationServiceIDSQL = "" +
"SELECT event_id, origin_server_ts, room_id, type, sender, event_content FROM appservice_events " + "SELECT id, event_id, origin_server_ts, room_id, type, sender, event_content FROM appservice_events " +
"WHERE as_id = $1 ORDER BY id ASC LIMIT $2" "WHERE as_id = $1 ORDER BY id ASC LIMIT $2"
const countEventsByApplicationServiceIDSQL = "" + const countEventsByApplicationServiceIDSQL = "" +
@ -61,7 +61,7 @@ const insertEventSQL = "" +
"VALUES ($1, $2, $3, $4, $5, $6, $7, $8)" "VALUES ($1, $2, $3, $4, $5, $6, $7, $8)"
const deleteEventsBeforeAndIncludingIDSQL = "" + const deleteEventsBeforeAndIncludingIDSQL = "" +
"DELETE FROM appservice_events WHERE event_id <= $1" "DELETE FROM appservice_events WHERE id <= $1"
type eventsStatements struct { type eventsStatements struct {
selectEventsByApplicationServiceIDStmt *sql.Stmt selectEventsByApplicationServiceIDStmt *sql.Stmt
@ -93,19 +93,21 @@ func (s *eventsStatements) prepare(db *sql.DB) (err error) {
} }
// selectEventsByApplicationServiceID takes in an application service ID and // selectEventsByApplicationServiceID takes in an application service ID and
// returns a slice of events that need to be sent to that application service. // returns a slice of events that need to be sent to that application service,
// as well as an int later used to remove these same events from the database
// once successfully sent to an application service.
func (s *eventsStatements) selectEventsByApplicationServiceID( func (s *eventsStatements) selectEventsByApplicationServiceID(
ctx context.Context, ctx context.Context,
applicationServiceID string, applicationServiceID string,
limit int, limit int,
) ( ) (
eventIDs []string, maxID int,
events []gomatrixserverlib.ApplicationServiceEvent, events []gomatrixserverlib.ApplicationServiceEvent,
err error, err error,
) { ) {
eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID, limit) eventRows, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID, limit)
if err != nil { if err != nil {
return nil, nil, err return 0, nil, err
} }
defer func() { defer func() {
err = eventRows.Close() err = eventRows.Close()
@ -119,7 +121,9 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
for eventRows.Next() { for eventRows.Next() {
var event gomatrixserverlib.ApplicationServiceEvent var event gomatrixserverlib.ApplicationServiceEvent
var eventContent sql.NullString var eventContent sql.NullString
var id int
err = eventRows.Scan( err = eventRows.Scan(
&id,
&event.EventID, &event.EventID,
&event.OriginServerTimestamp, &event.OriginServerTimestamp,
&event.RoomID, &event.RoomID,
@ -128,12 +132,14 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
&eventContent, &eventContent,
) )
if err != nil { if err != nil {
return nil, nil, err return 0, nil, err
} }
if eventContent.Valid { if eventContent.Valid {
event.Content = gomatrixserverlib.RawJSON(eventContent.String) event.Content = gomatrixserverlib.RawJSON(eventContent.String)
} }
eventIDs = append(eventIDs, event.EventID) if id > maxID {
maxID = id
}
// Get age of the event from original timestamp and current time // Get age of the event from original timestamp and current time
ageMilli := time.Now().UnixNano() / int64(time.Millisecond) ageMilli := time.Now().UnixNano() / int64(time.Millisecond)
@ -187,8 +193,8 @@ func (s *eventsStatements) insertEvent(
// deleteEventsBeforeAndIncludingID removes events matching given IDs from the database. // deleteEventsBeforeAndIncludingID removes events matching given IDs from the database.
func (s *eventsStatements) deleteEventsBeforeAndIncludingID( func (s *eventsStatements) deleteEventsBeforeAndIncludingID(
ctx context.Context, ctx context.Context,
eventID string, eventTableID int,
) (err error) { ) (err error) {
_, err = s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, eventID) _, err = s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, eventTableID)
return err return err
} }

View file

@ -67,7 +67,7 @@ func (d *Database) GetEventsWithAppServiceID(
ctx context.Context, ctx context.Context,
appServiceID string, appServiceID string,
limit int, limit int,
) ([]string, []gomatrixserverlib.ApplicationServiceEvent, error) { ) (int, []gomatrixserverlib.ApplicationServiceEvent, error) {
return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit) return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit)
} }
@ -80,13 +80,14 @@ func (d *Database) CountEventsWithAppServiceID(
return d.events.countEventsByApplicationServiceID(ctx, appServiceID) return d.events.countEventsByApplicationServiceID(ctx, appServiceID)
} }
// RemoveEventsBeforeAndIncludingID removes events from the database given a slice of their // RemoveEventsBeforeAndIncludingID removes all events from the database that
// event IDs. // are less than or equal to a given maximum ID. IDs here are implemented as a
// serial, thus this should always delete events in chronological order.
func (d *Database) RemoveEventsBeforeAndIncludingID( func (d *Database) RemoveEventsBeforeAndIncludingID(
ctx context.Context, ctx context.Context,
eventID string, eventTableID int,
) error { ) error {
return d.events.deleteEventsBeforeAndIncludingID(ctx, eventID) return d.events.deleteEventsBeforeAndIncludingID(ctx, eventTableID)
} }
// GetTxnIDWithAppServiceID takes in an application service ID and returns the // GetTxnIDWithAppServiceID takes in an application service ID and returns the

View file

@ -101,7 +101,7 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
ws.EventsReady = false ws.EventsReady = false
ws.Cond.L.Unlock() ws.Cond.L.Unlock()
eventIDs, events, err := db.GetEventsWithAppServiceID(ctx, ws.AppService.ID, transactionBatchSize) maxID, events, err := db.GetEventsWithAppServiceID(ctx, ws.AppService.ID, transactionBatchSize)
if err != nil { if err != nil {
log.WithError(err).Errorf("appservice %s worker unable to read queued events from DB", log.WithError(err).Errorf("appservice %s worker unable to read queued events from DB",
ws.AppService.ID) ws.AppService.ID)
@ -132,7 +132,7 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
ws.Backoff = 0 ws.Backoff = 0
// Remove sent events from the DB // Remove sent events from the DB
err = db.RemoveEventsBeforeAndIncludingID(ctx, eventIDs[len(eventIDs)-1]) err = db.RemoveEventsBeforeAndIncludingID(ctx, maxID)
if err != nil { if err != nil {
log.WithError(err).Fatalf("unable to remove appservice events from the database for %s", log.WithError(err).Fatalf("unable to remove appservice events from the database for %s",
ws.AppService.ID) ws.AppService.ID)