Add ability to exclude an event from responses to sync requests

This commit is contained in:
Brendan Abolivier 2018-11-12 12:33:13 +00:00
parent ca72f695d8
commit a9d174c46f
No known key found for this signature in database
GPG key ID: 8EF1500759F70623
3 changed files with 72 additions and 40 deletions

View file

@ -133,6 +133,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
msg.AddsStateEventIDs,
msg.RemovesStateEventIDs,
msg.TransactionID,
false,
)
if err != nil {
return err

View file

@ -49,7 +49,12 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events (
add_state_ids TEXT[],
remove_state_ids TEXT[],
device_id TEXT, -- The local device that sent the event, if any
transaction_id TEXT -- The transaction id used to send the event, if any
transaction_id TEXT, -- The transaction id used to send the event, if any
-- Should the event be excluded from responses to /sync requests. Useful for
-- events retrieved through backfilling that have a position in the stream
-- that relates to the moment these were retrieved rather than the moment these
-- were emitted.
exclude_from_sync BOOL DEFAULT FALSE
);
-- for event selection
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events(event_id);
@ -57,19 +62,24 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_ev
const insertEventSQL = "" +
"INSERT INTO syncapi_output_room_events (" +
" room_id, event_id, event_json, add_state_ids, remove_state_ids, device_id, transaction_id" +
") VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id"
" room_id, event_id, event_json, add_state_ids, remove_state_ids, device_id, transaction_id, exclude_from_sync" +
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8) RETURNING id"
const selectEventsSQL = "" +
"SELECT id, event_json FROM syncapi_output_room_events WHERE event_id = ANY($1)"
"SELECT id, event_json, exclude_from_sync FROM syncapi_output_room_events WHERE event_id = ANY($1)"
const selectRecentEventsSQL = "" +
"SELECT id, event_json, device_id, transaction_id FROM syncapi_output_room_events" +
"SELECT id, event_json, exclude_from_sync, device_id, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id DESC LIMIT $4"
const selectRecentEventsForSyncSQL = "" +
"SELECT id, event_json, exclude_from_sync, device_id, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" +
" ORDER BY id DESC LIMIT $4"
const selectEarlyEventsSQL = "" +
"SELECT id, event_json, device_id, transaction_id FROM syncapi_output_room_events" +
"SELECT id, event_json, exclude_from_sync, device_id, transaction_id FROM syncapi_output_room_events" +
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
" ORDER BY id ASC LIMIT $4"
@ -78,18 +88,20 @@ const selectMaxEventIDSQL = "" +
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
const selectStateInRangeSQL = "" +
"SELECT id, event_json, add_state_ids, remove_state_ids" +
"SELECT id, event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
" FROM syncapi_output_room_events" +
" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
" AND exclude_from_sync = FALSE" + // So far, this request is only used for sync responses, so we can exclude unwanted events right away
" ORDER BY id ASC"
type outputRoomEventsStatements struct {
insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt
selectMaxEventIDStmt *sql.Stmt
selectRecentEventsStmt *sql.Stmt
selectEarlyEventsStmt *sql.Stmt
selectStateInRangeStmt *sql.Stmt
insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt
selectMaxEventIDStmt *sql.Stmt
selectRecentEventsStmt *sql.Stmt
selectRecentEventsForSyncStmt *sql.Stmt
selectEarlyEventsStmt *sql.Stmt
selectStateInRangeStmt *sql.Stmt
}
func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
@ -109,6 +121,9 @@ func (s *outputRoomEventsStatements) prepare(db *sql.DB) (err error) {
if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil {
return
}
if s.selectRecentEventsForSyncStmt, err = db.Prepare(selectRecentEventsForSyncSQL); err != nil {
return
}
if s.selectEarlyEventsStmt, err = db.Prepare(selectEarlyEventsSQL); err != nil {
return
}
@ -142,12 +157,13 @@ func (s *outputRoomEventsStatements) selectStateInRange(
for rows.Next() {
var (
streamPos int64
eventBytes []byte
addIDs pq.StringArray
delIDs pq.StringArray
streamPos int64
eventBytes []byte
excludeFromSync bool
addIDs pq.StringArray
delIDs pq.StringArray
)
if err := rows.Scan(&streamPos, &eventBytes, &addIDs, &delIDs); err != nil {
if err := rows.Scan(&streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs); err != nil {
return nil, nil, err
}
// Sanity check for deleted state and whine if we see it. We don't need to do anything
@ -179,8 +195,9 @@ func (s *outputRoomEventsStatements) selectStateInRange(
stateNeeded[ev.RoomID()] = needSet
eventIDToEvent[ev.EventID()] = StreamEvent{
Event: ev,
StreamPosition: types.StreamPosition(streamPos),
Event: ev,
StreamPosition: types.StreamPosition(streamPos),
ExcludeFromSync: excludeFromSync,
}
}
@ -207,7 +224,7 @@ func (s *outputRoomEventsStatements) selectMaxEventID(
func (s *outputRoomEventsStatements) insertEvent(
ctx context.Context, txn *sql.Tx,
event *gomatrixserverlib.Event, addState, removeState []string,
transactionID *api.TransactionID,
transactionID *api.TransactionID, excludeFromSync bool,
) (streamPos int64, err error) {
var deviceID, txnID *string
if transactionID != nil {
@ -225,17 +242,26 @@ func (s *outputRoomEventsStatements) insertEvent(
pq.StringArray(removeState),
deviceID,
txnID,
excludeFromSync,
).Scan(&streamPos)
return
}
// RecentEventsInRoom returns the most recent events in the given room, up to a maximum of 'limit'.
// selectRecentEvents returns the most recent events in the given room, up to a maximum of 'limit'.
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude
// from sync.
func (s *outputRoomEventsStatements) selectRecentEvents(
ctx context.Context, txn *sql.Tx,
roomID string, fromPos, toPos types.StreamPosition, limit int,
chronologicalOrder bool,
chronologicalOrder bool, onlySyncEvents bool,
) ([]StreamEvent, error) {
stmt := common.TxStmt(txn, s.selectRecentEventsStmt)
var stmt *sql.Stmt
if onlySyncEvents {
stmt = common.TxStmt(txn, s.selectRecentEventsForSyncStmt)
} else {
stmt = common.TxStmt(txn, s.selectRecentEventsStmt)
}
rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
if err != nil {
return nil, err
@ -293,13 +319,14 @@ func rowsToStreamEvents(rows *sql.Rows) ([]StreamEvent, error) {
var result []StreamEvent
for rows.Next() {
var (
streamPos int64
eventBytes []byte
deviceID *string
txnID *string
transactionID *api.TransactionID
streamPos int64
eventBytes []byte
excludeFromSync bool
deviceID *string
txnID *string
transactionID *api.TransactionID
)
if err := rows.Scan(&streamPos, &eventBytes, &deviceID, &txnID); err != nil {
if err := rows.Scan(&streamPos, &eventBytes, &excludeFromSync, &deviceID, &txnID); err != nil {
return nil, err
}
// TODO: Handle redacted events
@ -316,9 +343,10 @@ func rowsToStreamEvents(rows *sql.Rows) ([]StreamEvent, error) {
}
result = append(result, StreamEvent{
Event: ev,
StreamPosition: types.StreamPosition(streamPos),
TransactionID: transactionID,
Event: ev,
StreamPosition: types.StreamPosition(streamPos),
TransactionID: transactionID,
ExcludeFromSync: excludeFromSync,
})
}
return result, nil

View file

@ -43,8 +43,9 @@ type stateDelta struct {
// position for this event.
type StreamEvent struct {
gomatrixserverlib.Event
StreamPosition types.StreamPosition
TransactionID *api.TransactionID
StreamPosition types.StreamPosition
TransactionID *api.TransactionID
ExcludeFromSync bool
}
// SyncServerDatabase represents a sync server database
@ -111,11 +112,13 @@ func (d *SyncServerDatabase) WriteEvent(
ev *gomatrixserverlib.Event,
addStateEvents []gomatrixserverlib.Event,
addStateEventIDs, removeStateEventIDs []string,
transactionID *api.TransactionID,
transactionID *api.TransactionID, excludeFromSync bool,
) (streamPos types.StreamPosition, returnErr error) {
returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error {
var err error
pos, err := d.events.insertEvent(ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID)
pos, err := d.events.insertEvent(
ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync,
)
if err != nil {
return err
}
@ -198,7 +201,7 @@ func (d *SyncServerDatabase) GetEventsInRange(
if backwardOrdering {
// We need all events matching to < streamPos < from
return d.events.selectRecentEvents(ctx, nil, roomID, to, from, limit, false)
return d.events.selectRecentEvents(ctx, nil, roomID, to, from, limit, false, false)
}
// We need all events from < streamPos < to
@ -318,7 +321,7 @@ func (d *SyncServerDatabase) CompleteSync(
var recentStreamEvents []StreamEvent
recentStreamEvents, err = d.events.selectRecentEvents(
ctx, txn, roomID, types.StreamPosition(0), pos,
numRecentEventsPerRoom, true,
numRecentEventsPerRoom, true, true,
)
if err != nil {
return nil, err
@ -447,7 +450,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
endPos = delta.membershipPos
}
recentStreamEvents, err := d.events.selectRecentEvents(
ctx, txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom, true,
ctx, txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom, true, true,
)
if err != nil {
return err