diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go index 98f97433..031342d1 100644 --- a/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go +++ b/src/github.com/matrix-org/dendrite/appservice/storage/appservice_events_table.go @@ -17,11 +17,8 @@ package storage import ( "context" "database/sql" - "errors" - "strconv" "time" - "github.com/lib/pq" "github.com/matrix-org/gomatrixserverlib" ) @@ -29,45 +26,47 @@ const appserviceEventsSchema = ` -- Stores events to be sent to application services CREATE TABLE IF NOT EXISTS appservice_events ( -- An auto-incrementing id unique to each event in the table - id SERIAL NOT NULL PRIMARY KEY, + id BIGSERIAL NOT NULL PRIMARY KEY, -- The ID of the application service the event will be sent to - as_id TEXT, + as_id TEXT NOT NULL, -- The ID of the event - event_id TEXT, + event_id TEXT NOT NULL, -- Unix seconds that the event was sent at from the originating server - origin_server_ts BIGINT, + origin_server_ts BIGINT NOT NULL, -- The ID of the room that the event was sent in - room_id TEXT, + room_id TEXT NOT NULL, -- The type of the event (e.g. m.text) - type TEXT, + type TEXT NOT NULL, -- The ID of the user that sent the event - sender TEXT, - -- The JSON representation of the event. Text to avoid db JSON parsing - event_json TEXT, + sender TEXT NOT NULL, + -- The JSON representation of the event's content. Text to avoid db JSON parsing + event_content TEXT, -- The ID of the transaction that this event is a part of - txn_id INTEGER + txn_id INTEGER NOT NULL ); + +CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_event(as_id); ` const selectEventsByApplicationServiceIDSQL = "" + - "SELECT event_id, origin_server_ts, room_id, type, sender, event_json FROM appservice_events " + - "WHERE as_id = $1 ORDER BY as_id LIMIT $2" + "SELECT event_id, origin_server_ts, room_id, type, sender, event_content FROM appservice_events " + + "WHERE as_id = $1 ORDER BY id ASC LIMIT $2" const countEventsByApplicationServiceIDSQL = "" + "SELECT COUNT(event_id) FROM appservice_events WHERE as_id = $1" const insertEventSQL = "" + - "INSERT INTO appservice_events(as_id, event_id, origin_server_ts, room_id, type, sender, event_json, txn_id) " + + "INSERT INTO appservice_events(as_id, event_id, origin_server_ts, room_id, type, sender, event_content, txn_id) " + "VALUES ($1, $2, $3, $4, $5, $6, $7, $8)" -const deleteEventsByIDSQL = "" + - "DELETE FROM appservice_events WHERE event_id = ANY($1)" +const deleteEventsBeforeAndIncludingIDSQL = "" + + "DELETE FROM appservice_events WHERE event_id <= $1" type eventsStatements struct { selectEventsByApplicationServiceIDStmt *sql.Stmt countEventsByApplicationServiceIDStmt *sql.Stmt insertEventStmt *sql.Stmt - deleteEventsByIDStmt *sql.Stmt + deleteEventsBeforeAndIncludingIDStmt *sql.Stmt } func (s *eventsStatements) prepare(db *sql.DB) (err error) { @@ -85,7 +84,7 @@ func (s *eventsStatements) prepare(db *sql.DB) (err error) { if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { return } - if s.deleteEventsByIDStmt, err = db.Prepare(deleteEventsByIDSQL); err != nil { + if s.deleteEventsBeforeAndIncludingIDStmt, err = db.Prepare(deleteEventsBeforeAndIncludingIDSQL); err != nil { return } @@ -111,38 +110,31 @@ func (s *eventsStatements) selectEventsByApplicationServiceID( // Iterate through each row and store event contents for eventRows.Next() { - var eventID, originServerTimestamp, roomID, eventType, sender, eventContent *string - if err = eventRows.Scan( - &eventID, - &originServerTimestamp, - &roomID, - &eventType, - &sender, + var event gomatrixserverlib.ApplicationServiceEvent + var eventContent sql.NullString + err = eventRows.Scan( + &event.EventID, + &event.OriginServerTimestamp, + &event.RoomID, + &event.Type, + &event.UserID, &eventContent, - ); err != nil || eventID == nil || roomID == nil || eventType == nil || sender == nil || eventContent == nil { - return nil, nil, err - } - eventIDs = append(eventIDs, *eventID) - - // Get age of the event from original timestamp and current time - timestamp, err := strconv.ParseInt(*originServerTimestamp, 10, 64) + ) if err != nil { return nil, nil, err } - ageMilli := time.Now().UnixNano() / int64(time.Millisecond) - age := ageMilli - timestamp - - // Fit event content into AS event format - event := gomatrixserverlib.ApplicationServiceEvent{ - Age: age, - Content: gomatrixserverlib.RawJSON(*eventContent), - EventID: *eventID, - OriginServerTimestamp: timestamp, - RoomID: *roomID, - Sender: *sender, - Type: *eventType, - UserID: *sender, + if eventContent.Valid { + event.Content = gomatrixserverlib.RawJSON(eventContent.String) } + eventIDs = append(eventIDs, event.EventID) + + // Get age of the event from original timestamp and current time + ageMilli := time.Now().UnixNano() / int64(time.Millisecond) + event.Age = ageMilli - event.OriginServerTimestamp + + // TODO: Synapse does this. Do app services really require this? :) + event.Sender = event.UserID + events = append(events, event) } @@ -155,16 +147,13 @@ func (s *eventsStatements) countEventsByApplicationServiceID( ctx context.Context, appServiceID string, ) (int, error) { - var count *int + var count int err := s.countEventsByApplicationServiceIDStmt.QueryRowContext(ctx, appServiceID).Scan(&count) - if err != nil { + if err != nil && err != sql.ErrNoRows { return 0, err } - if count == nil { - return 0, errors.New("NULL value for application service count") - } - return *count, nil + return count, nil } // insertEvent inserts an event mapped to its corresponding application service @@ -188,11 +177,11 @@ func (s *eventsStatements) insertEvent( return } -// deleteEventsByID removes events matching given IDs from the database. -func (s *eventsStatements) deleteEventsByID( +// deleteEventsBeforeAndIncludingID removes events matching given IDs from the database. +func (s *eventsStatements) deleteEventsBeforeAndIncludingID( ctx context.Context, - eventIDs []string, + eventID string, ) (err error) { - _, err = s.deleteEventsByIDStmt.ExecContext(ctx, pq.StringArray(eventIDs)) + _, err = s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, eventID) return err } diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/storage.go b/src/github.com/matrix-org/dendrite/appservice/storage/storage.go index 0e0237f4..12b8f001 100644 --- a/src/github.com/matrix-org/dendrite/appservice/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/appservice/storage/storage.go @@ -80,13 +80,13 @@ func (d *Database) CountEventsWithAppServiceID( return d.events.countEventsByApplicationServiceID(ctx, appServiceID) } -// RemoveEventsByID removes events from the database given a slice of their +// RemoveEventsBeforeAndIncludingID removes events from the database given a slice of their // event IDs. -func (d *Database) RemoveEventsByID( +func (d *Database) RemoveEventsBeforeAndIncludingID( ctx context.Context, - eventIDs []string, + eventID string, ) error { - return d.events.deleteEventsByID(ctx, eventIDs) + return d.events.deleteEventsBeforeAndIncludingID(ctx, eventID) } // GetTxnIDWithAppServiceID takes in an application service ID and returns the diff --git a/src/github.com/matrix-org/dendrite/appservice/storage/txn_id_counter_table.go b/src/github.com/matrix-org/dendrite/appservice/storage/txn_id_counter_table.go index 72f4c578..ac0fad2b 100644 --- a/src/github.com/matrix-org/dendrite/appservice/storage/txn_id_counter_table.go +++ b/src/github.com/matrix-org/dendrite/appservice/storage/txn_id_counter_table.go @@ -30,7 +30,7 @@ CREATE TABLE IF NOT EXISTS txn_id_counter ( ` const selectTxnIDSQL = "" + - "SELECT txn_id FROM txn_id_counter WHERE as_id = $1 LIMIT 1" + "SELECT txn_id FROM txn_id_counter WHERE as_id = $1" const upsertTxnIDSQL = "" + "INSERT INTO txn_id_counter(as_id, txn_id) VALUES ($1, $2) " + @@ -64,15 +64,7 @@ func (s *txnStatements) selectTxnID( ctx context.Context, appServiceID string, ) (txnID int, err error) { - rows, err := s.selectTxnIDStmt.QueryContext(ctx, appServiceID) - if err != nil { - return - } - defer rows.Close() // nolint: errcheck - - // Scan the TxnID from the database and return - rows.Next() - err = rows.Scan(&txnID) + err = s.selectTxnIDStmt.QueryRowContext(ctx, appServiceID).Scan(&txnID) return } diff --git a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go index c454ae5c..dc736865 100644 --- a/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go +++ b/src/github.com/matrix-org/dendrite/appservice/workers/transaction_scheduler.go @@ -146,7 +146,7 @@ func worker(db *storage.Database, as config.ApplicationService, ecm map[string]i ecm[as.ID] -= eventsSent // Remove sent events from the DB - err = db.RemoveEventsByID(ctx, eventIDs) + err = db.RemoveEventsBeforeAndIncludingID(ctx, eventIDs[len(eventIDs)-1]) if err != nil { logrus.WithError(err).Errorf("unable to remove appservice events from the database for %s", as.ID)