Fix up database interaction

This commit is contained in:
Andrew Morgan 2018-06-08 16:36:03 +01:00
parent 1bda57e6d4
commit 1f67fd9b89
4 changed files with 53 additions and 72 deletions

View file

@ -17,11 +17,8 @@ package storage
import ( import (
"context" "context"
"database/sql" "database/sql"
"errors"
"strconv"
"time" "time"
"github.com/lib/pq"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -29,45 +26,47 @@ const appserviceEventsSchema = `
-- Stores events to be sent to application services -- Stores events to be sent to application services
CREATE TABLE IF NOT EXISTS appservice_events ( CREATE TABLE IF NOT EXISTS appservice_events (
-- An auto-incrementing id unique to each event in the table -- An auto-incrementing id unique to each event in the table
id SERIAL NOT NULL PRIMARY KEY, id BIGSERIAL NOT NULL PRIMARY KEY,
-- The ID of the application service the event will be sent to -- The ID of the application service the event will be sent to
as_id TEXT, as_id TEXT NOT NULL,
-- The ID of the event -- The ID of the event
event_id TEXT, event_id TEXT NOT NULL,
-- Unix seconds that the event was sent at from the originating server -- Unix seconds that the event was sent at from the originating server
origin_server_ts BIGINT, origin_server_ts BIGINT NOT NULL,
-- The ID of the room that the event was sent in -- The ID of the room that the event was sent in
room_id TEXT, room_id TEXT NOT NULL,
-- The type of the event (e.g. m.text) -- The type of the event (e.g. m.text)
type TEXT, type TEXT NOT NULL,
-- The ID of the user that sent the event -- The ID of the user that sent the event
sender TEXT, sender TEXT NOT NULL,
-- The JSON representation of the event. Text to avoid db JSON parsing -- The JSON representation of the event's content. Text to avoid db JSON parsing
event_json TEXT, event_content TEXT,
-- The ID of the transaction that this event is a part of -- The ID of the transaction that this event is a part of
txn_id INTEGER txn_id INTEGER NOT NULL
); );
CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_event(as_id);
` `
const selectEventsByApplicationServiceIDSQL = "" + const selectEventsByApplicationServiceIDSQL = "" +
"SELECT event_id, origin_server_ts, room_id, type, sender, event_json FROM appservice_events " + "SELECT event_id, origin_server_ts, room_id, type, sender, event_content FROM appservice_events " +
"WHERE as_id = $1 ORDER BY as_id LIMIT $2" "WHERE as_id = $1 ORDER BY id ASC LIMIT $2"
const countEventsByApplicationServiceIDSQL = "" + const countEventsByApplicationServiceIDSQL = "" +
"SELECT COUNT(event_id) FROM appservice_events WHERE as_id = $1" "SELECT COUNT(event_id) FROM appservice_events WHERE as_id = $1"
const insertEventSQL = "" + const insertEventSQL = "" +
"INSERT INTO appservice_events(as_id, event_id, origin_server_ts, room_id, type, sender, event_json, txn_id) " + "INSERT INTO appservice_events(as_id, event_id, origin_server_ts, room_id, type, sender, event_content, txn_id) " +
"VALUES ($1, $2, $3, $4, $5, $6, $7, $8)" "VALUES ($1, $2, $3, $4, $5, $6, $7, $8)"
const deleteEventsByIDSQL = "" + const deleteEventsBeforeAndIncludingIDSQL = "" +
"DELETE FROM appservice_events WHERE event_id = ANY($1)" "DELETE FROM appservice_events WHERE event_id <= $1"
type eventsStatements struct { type eventsStatements struct {
selectEventsByApplicationServiceIDStmt *sql.Stmt selectEventsByApplicationServiceIDStmt *sql.Stmt
countEventsByApplicationServiceIDStmt *sql.Stmt countEventsByApplicationServiceIDStmt *sql.Stmt
insertEventStmt *sql.Stmt insertEventStmt *sql.Stmt
deleteEventsByIDStmt *sql.Stmt deleteEventsBeforeAndIncludingIDStmt *sql.Stmt
} }
func (s *eventsStatements) prepare(db *sql.DB) (err error) { func (s *eventsStatements) prepare(db *sql.DB) (err error) {
@ -85,7 +84,7 @@ func (s *eventsStatements) prepare(db *sql.DB) (err error) {
if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil { if s.insertEventStmt, err = db.Prepare(insertEventSQL); err != nil {
return return
} }
if s.deleteEventsByIDStmt, err = db.Prepare(deleteEventsByIDSQL); err != nil { if s.deleteEventsBeforeAndIncludingIDStmt, err = db.Prepare(deleteEventsBeforeAndIncludingIDSQL); err != nil {
return return
} }
@ -111,38 +110,31 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
// Iterate through each row and store event contents // Iterate through each row and store event contents
for eventRows.Next() { for eventRows.Next() {
var eventID, originServerTimestamp, roomID, eventType, sender, eventContent *string var event gomatrixserverlib.ApplicationServiceEvent
if err = eventRows.Scan( var eventContent sql.NullString
&eventID, err = eventRows.Scan(
&originServerTimestamp, &event.EventID,
&roomID, &event.OriginServerTimestamp,
&eventType, &event.RoomID,
&sender, &event.Type,
&event.UserID,
&eventContent, &eventContent,
); err != nil || eventID == nil || roomID == nil || eventType == nil || sender == nil || eventContent == nil { )
return nil, nil, err
}
eventIDs = append(eventIDs, *eventID)
// Get age of the event from original timestamp and current time
timestamp, err := strconv.ParseInt(*originServerTimestamp, 10, 64)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
ageMilli := time.Now().UnixNano() / int64(time.Millisecond) if eventContent.Valid {
age := ageMilli - timestamp event.Content = gomatrixserverlib.RawJSON(eventContent.String)
// Fit event content into AS event format
event := gomatrixserverlib.ApplicationServiceEvent{
Age: age,
Content: gomatrixserverlib.RawJSON(*eventContent),
EventID: *eventID,
OriginServerTimestamp: timestamp,
RoomID: *roomID,
Sender: *sender,
Type: *eventType,
UserID: *sender,
} }
eventIDs = append(eventIDs, event.EventID)
// Get age of the event from original timestamp and current time
ageMilli := time.Now().UnixNano() / int64(time.Millisecond)
event.Age = ageMilli - event.OriginServerTimestamp
// TODO: Synapse does this. Do app services really require this? :)
event.Sender = event.UserID
events = append(events, event) events = append(events, event)
} }
@ -155,16 +147,13 @@ func (s *eventsStatements) countEventsByApplicationServiceID(
ctx context.Context, ctx context.Context,
appServiceID string, appServiceID string,
) (int, error) { ) (int, error) {
var count *int var count int
err := s.countEventsByApplicationServiceIDStmt.QueryRowContext(ctx, appServiceID).Scan(&count) err := s.countEventsByApplicationServiceIDStmt.QueryRowContext(ctx, appServiceID).Scan(&count)
if err != nil { if err != nil && err != sql.ErrNoRows {
return 0, err return 0, err
} }
if count == nil {
return 0, errors.New("NULL value for application service count")
}
return *count, nil return count, nil
} }
// insertEvent inserts an event mapped to its corresponding application service // insertEvent inserts an event mapped to its corresponding application service
@ -188,11 +177,11 @@ func (s *eventsStatements) insertEvent(
return return
} }
// deleteEventsByID removes events matching given IDs from the database. // deleteEventsBeforeAndIncludingID removes events matching given IDs from the database.
func (s *eventsStatements) deleteEventsByID( func (s *eventsStatements) deleteEventsBeforeAndIncludingID(
ctx context.Context, ctx context.Context,
eventIDs []string, eventID string,
) (err error) { ) (err error) {
_, err = s.deleteEventsByIDStmt.ExecContext(ctx, pq.StringArray(eventIDs)) _, err = s.deleteEventsBeforeAndIncludingIDStmt.ExecContext(ctx, eventID)
return err return err
} }

View file

@ -80,13 +80,13 @@ func (d *Database) CountEventsWithAppServiceID(
return d.events.countEventsByApplicationServiceID(ctx, appServiceID) return d.events.countEventsByApplicationServiceID(ctx, appServiceID)
} }
// RemoveEventsByID removes events from the database given a slice of their // RemoveEventsBeforeAndIncludingID removes events from the database given a slice of their
// event IDs. // event IDs.
func (d *Database) RemoveEventsByID( func (d *Database) RemoveEventsBeforeAndIncludingID(
ctx context.Context, ctx context.Context,
eventIDs []string, eventID string,
) error { ) error {
return d.events.deleteEventsByID(ctx, eventIDs) return d.events.deleteEventsBeforeAndIncludingID(ctx, eventID)
} }
// GetTxnIDWithAppServiceID takes in an application service ID and returns the // GetTxnIDWithAppServiceID takes in an application service ID and returns the

View file

@ -30,7 +30,7 @@ CREATE TABLE IF NOT EXISTS txn_id_counter (
` `
const selectTxnIDSQL = "" + const selectTxnIDSQL = "" +
"SELECT txn_id FROM txn_id_counter WHERE as_id = $1 LIMIT 1" "SELECT txn_id FROM txn_id_counter WHERE as_id = $1"
const upsertTxnIDSQL = "" + const upsertTxnIDSQL = "" +
"INSERT INTO txn_id_counter(as_id, txn_id) VALUES ($1, $2) " + "INSERT INTO txn_id_counter(as_id, txn_id) VALUES ($1, $2) " +
@ -64,15 +64,7 @@ func (s *txnStatements) selectTxnID(
ctx context.Context, ctx context.Context,
appServiceID string, appServiceID string,
) (txnID int, err error) { ) (txnID int, err error) {
rows, err := s.selectTxnIDStmt.QueryContext(ctx, appServiceID) err = s.selectTxnIDStmt.QueryRowContext(ctx, appServiceID).Scan(&txnID)
if err != nil {
return
}
defer rows.Close() // nolint: errcheck
// Scan the TxnID from the database and return
rows.Next()
err = rows.Scan(&txnID)
return return
} }

View file

@ -146,7 +146,7 @@ func worker(db *storage.Database, as config.ApplicationService, ecm map[string]i
ecm[as.ID] -= eventsSent ecm[as.ID] -= eventsSent
// Remove sent events from the DB // Remove sent events from the DB
err = db.RemoveEventsByID(ctx, eventIDs) err = db.RemoveEventsBeforeAndIncludingID(ctx, eventIDs[len(eventIDs)-1])
if err != nil { if err != nil {
logrus.WithError(err).Errorf("unable to remove appservice events from the database for %s", logrus.WithError(err).Errorf("unable to remove appservice events from the database for %s",
as.ID) as.ID)