From 9f2bc6231f7b5e3082af3d58d647a727a656b48b Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 8 Sep 2020 11:05:54 +0100 Subject: [PATCH] (broken) rewrite to use SelectPeeksInRange rather than MarkPeeksAsOld for idempotency --- syncapi/storage/postgres/peeks_table.go | 36 ++++-------- syncapi/storage/shared/syncserver.go | 77 +++++++++---------------- syncapi/storage/sqlite3/peeks_table.go | 39 ++++--------- syncapi/storage/tables/interface.go | 3 +- syncapi/types/types.go | 5 +- 5 files changed, 54 insertions(+), 106 deletions(-) diff --git a/syncapi/storage/postgres/peeks_table.go b/syncapi/storage/postgres/peeks_table.go index c8938a2a..43b5532e 100644 --- a/syncapi/storage/postgres/peeks_table.go +++ b/syncapi/storage/postgres/peeks_table.go @@ -31,7 +31,6 @@ CREATE TABLE IF NOT EXISTS syncapi_peeks ( room_id TEXT NOT NULL, user_id TEXT NOT NULL, device_id TEXT NOT NULL, - new BOOL NOT NULL DEFAULT true, deleted BOOL NOT NULL DEFAULT false, -- When the peek was created in UNIX epoch ms. creation_ts BIGINT NOT NULL @@ -52,15 +51,14 @@ const deletePeekSQL = "" + const deletePeeksSQL = "" + "UPDATE syncapi_peeks SET deleted=true, id=nextval('syncapi_stream_id') WHERE room_id = $1 AND user_id = $2 RETURNING id" -const selectPeeksSQL = "" + - "SELECT room_id, new FROM syncapi_peeks WHERE user_id = $1 AND device_id = $2 AND deleted=false" +// we care about all the peeks which were created in this range, deleted in this range, +// or were created before this range but haven't been deleted yet. +const selectPeeksInRangeSQL = "" + + "SELECT room_id, deleted, (id > $3 AND id <= $4) AS new FROM syncapi_peeks WHERE user_id = $1 AND device_id = $2 AND ((id <= $3 AND NOT deleted) OR new)" const selectPeekingDevicesSQL = "" + "SELECT room_id, user_id, device_id FROM syncapi_peeks WHERE deleted=false" -const markPeeksAsOldSQL = "" + - "UPDATE syncapi_peeks SET id=nextval('syncapi_stream_id'), new=false WHERE user_id = $1 AND device_id = $2 AND deleted=false RETURNING id" - const selectMaxPeekIDSQL = "" + "SELECT MAX(id) FROM syncapi_peeks" @@ -69,9 +67,8 @@ type peekStatements struct { insertPeekStmt *sql.Stmt deletePeekStmt *sql.Stmt deletePeeksStmt *sql.Stmt - selectPeeksStmt *sql.Stmt + selectPeeksInRangeStmt *sql.Stmt selectPeekingDevicesStmt *sql.Stmt - markPeeksAsOldStmt *sql.Stmt selectMaxPeekIDStmt *sql.Stmt } @@ -92,15 +89,12 @@ func NewPostgresPeeksTable(db *sql.DB) (tables.Peeks, error) { if s.deletePeeksStmt, err = db.Prepare(deletePeeksSQL); err != nil { return nil, err } - if s.selectPeeksStmt, err = db.Prepare(selectPeeksSQL); err != nil { + if s.selectPeeksInRangeStmt, err = db.Prepare(selectPeeksInRangeSQL); err != nil { return nil, err } if s.selectPeekingDevicesStmt, err = db.Prepare(selectPeekingDevicesSQL); err != nil { return nil, err } - if s.markPeeksAsOldStmt, err = db.Prepare(markPeeksAsOldSQL); err != nil { - return nil, err - } if s.selectMaxPeekIDStmt, err = db.Prepare(selectMaxPeekIDSQL); err != nil { return nil, err } @@ -132,18 +126,18 @@ func (s *peekStatements) DeletePeeks( return } -func (s *peekStatements) SelectPeeks( - ctx context.Context, txn *sql.Tx, userID, deviceID string, +func (s *peekStatements) SelectPeeksInRange( + ctx context.Context, txn *sql.Tx, userID, deviceID string, r types.Range, ) (peeks []types.Peek, err error) { - rows, err := sqlutil.TxStmt(txn, s.selectPeeksStmt).QueryContext(ctx, userID, deviceID) + rows, err := sqlutil.TxStmt(txn, s.selectPeeksInRangeStmt).QueryContext(ctx, userID, deviceID, r.Low(), r.High()) if err != nil { return } - defer internal.CloseAndLogIfError(ctx, rows, "SelectPeeks: rows.close() failed") + defer internal.CloseAndLogIfError(ctx, rows, "SelectPeeksInRange: rows.close() failed") for rows.Next() { peek := types.Peek{} - if err = rows.Scan(&peek.RoomID, &peek.New); err != nil { + if err = rows.Scan(&peek.RoomID, &peek.Deleted, &peek.New); err != nil { return } peeks = append(peeks, peek) @@ -152,14 +146,6 @@ func (s *peekStatements) SelectPeeks( return peeks, rows.Err() } -func (s *peekStatements) MarkPeeksAsOld( - ctx context.Context, txn *sql.Tx, userID, deviceID string, -) (streamPos types.StreamPosition, err error) { - stmt := sqlutil.TxStmt(txn, s.markPeeksAsOldStmt) - err = stmt.QueryRowContext(ctx, userID, deviceID).Scan(&streamPos) - return -} - func (s *peekStatements) SelectPeekingDevices( ctx context.Context, ) (peekingDevices map[string][]types.PeekingDevice, err error) { diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 32c799d1..91d14ab3 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -694,19 +694,21 @@ func (d *Database) getResponseWithPDUsForCompleteSync( } // Add peeked rooms. - peeks, err := d.Peeks.SelectPeeks(ctx, txn, userID, deviceID) + peeks, err := d.Peeks.SelectPeeksInRange(ctx, txn, userID, deviceID, r) if err != nil { return } for _, peek := range peeks { - var jr *types.JoinResponse - jr, err = d.getJoinResponseForCompleteSync( - ctx, txn, peek.RoomID, r, &stateFilter, numRecentEventsPerRoom, - ) - if err != nil { - return + if !peek.Deleted { + var jr *types.JoinResponse + jr, err = d.getJoinResponseForCompleteSync( + ctx, txn, peek.RoomID, r, &stateFilter, numRecentEventsPerRoom, + ) + if err != nil { + return + } + res.Rooms.Peek[peek.RoomID] = *jr } - res.Rooms.Peek[peek.RoomID] = *jr } if err = d.addInvitesToResponse(ctx, txn, userID, r, res); err != nil { @@ -1034,13 +1036,12 @@ func (d *Database) getStateDeltas( // find out which rooms this user is peeking, if any. // We do this before joins so any peeks get overwritten - peeks, err := d.Peeks.SelectPeeks(ctx, txn, userID, device.ID) + peeks, err := d.Peeks.SelectPeeksInRange(ctx, txn, userID, device.ID, r) if err != nil { return nil, nil, err } // add peek blocks - newPeeks := false for _, peek := range peeks { if peek.New { // send full room state down instead of a delta @@ -1050,23 +1051,13 @@ func (d *Database) getStateDeltas( return nil, nil, err } state[peek.RoomID] = s - newPeeks = true } - - deltas = append(deltas, stateDelta{ - membership: gomatrixserverlib.Peek, - stateEvents: d.StreamEventsToEvents(device, state[peek.RoomID]), - roomID: peek.RoomID, - }) - } - - if newPeeks { - err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error { - _, err := d.Peeks.MarkPeeksAsOld(ctx, txn, userID, device.ID) - return err - }) - if err != nil { - return nil, nil, err + if !peek.Deleted { + deltas = append(deltas, stateDelta{ + membership: gomatrixserverlib.Peek, + stateEvents: d.StreamEventsToEvents(device, state[peek.RoomID]), + roomID: peek.RoomID, + }) } } @@ -1130,35 +1121,23 @@ func (d *Database) getStateDeltasForFullStateSync( // Use a reasonable initial capacity deltas := make(map[string]stateDelta) - peeks, err := d.Peeks.SelectPeeks(ctx, txn, userID, device.ID) + peeks, err := d.Peeks.SelectPeeksInRange(ctx, txn, userID, device.ID, r) if err != nil { return nil, nil, err } // Add full states for all peeking rooms - newPeeks := false for _, peek := range peeks { - if peek.New { - newPeeks = true - } - s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, peek.RoomID, stateFilter) - if stateErr != nil { - return nil, nil, stateErr - } - deltas[peek.RoomID] = stateDelta{ - membership: gomatrixserverlib.Peek, - stateEvents: d.StreamEventsToEvents(device, s), - roomID: peek.RoomID, - } - } - - if newPeeks { - err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error { - _, err := d.Peeks.MarkPeeksAsOld(ctx, txn, userID, device.ID) - return err - }) - if err != nil { - return nil, nil, err + if !peek.Deleted { + s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, peek.RoomID, stateFilter) + if stateErr != nil { + return nil, nil, stateErr + } + deltas[peek.RoomID] = stateDelta{ + membership: gomatrixserverlib.Peek, + stateEvents: d.StreamEventsToEvents(device, s), + roomID: peek.RoomID, + } } } diff --git a/syncapi/storage/sqlite3/peeks_table.go b/syncapi/storage/sqlite3/peeks_table.go index c6722901..c3aacfd3 100644 --- a/syncapi/storage/sqlite3/peeks_table.go +++ b/syncapi/storage/sqlite3/peeks_table.go @@ -31,7 +31,6 @@ CREATE TABLE IF NOT EXISTS syncapi_peeks ( room_id TEXT NOT NULL, user_id TEXT NOT NULL, device_id TEXT NOT NULL, - new BOOL NOT NULL DEFAULT true, deleted BOOL NOT NULL DEFAULT false, -- When the peek was created in UNIX epoch ms. creation_ts INTEGER NOT NULL @@ -52,15 +51,14 @@ const deletePeekSQL = "" + const deletePeeksSQL = "" + "UPDATE syncapi_peeks SET deleted=true, id=$1 WHERE room_id = $2 AND user_id = $3" -const selectPeeksSQL = "" + - "SELECT room_id, new FROM syncapi_peeks WHERE user_id = $1 AND device_id = $2 AND deleted=false" +// we care about all the peeks which were created in this range, deleted in this range, +// or were created before this range but haven't been deleted yet. +const selectPeeksInRangeSQL = "" + + "SELECT room_id, deleted, (id > $3 AND id <= $4) AS new FROM syncapi_peeks WHERE user_id = $1 AND device_id = $2 AND ((id <= $3 AND NOT deleted) OR new)" const selectPeekingDevicesSQL = "" + "SELECT room_id, user_id, device_id FROM syncapi_peeks WHERE deleted=false" -const markPeeksAsOldSQL = "" + - "UPDATE syncapi_peeks SET new=false, id=$1 WHERE user_id = $2 AND device_id = $3 AND deleted=false" - const selectMaxPeekIDSQL = "" + "SELECT MAX(id) FROM syncapi_peeks" @@ -70,9 +68,8 @@ type peekStatements struct { insertPeekStmt *sql.Stmt deletePeekStmt *sql.Stmt deletePeeksStmt *sql.Stmt - selectPeeksStmt *sql.Stmt + selectPeeksInRangeStmt *sql.Stmt selectPeekingDevicesStmt *sql.Stmt - markPeeksAsOldStmt *sql.Stmt selectMaxPeekIDStmt *sql.Stmt } @@ -94,15 +91,12 @@ func NewSqlitePeeksTable(db *sql.DB, streamID *streamIDStatements) (tables.Peeks if s.deletePeeksStmt, err = db.Prepare(deletePeeksSQL); err != nil { return nil, err } - if s.selectPeeksStmt, err = db.Prepare(selectPeeksSQL); err != nil { + if s.selectPeeksInRangeStmt, err = db.Prepare(selectPeeksInRangeSQL); err != nil { return nil, err } if s.selectPeekingDevicesStmt, err = db.Prepare(selectPeekingDevicesSQL); err != nil { return nil, err } - if s.markPeeksAsOldStmt, err = db.Prepare(markPeeksAsOldSQL); err != nil { - return nil, err - } if s.selectMaxPeekIDStmt, err = db.Prepare(selectMaxPeekIDSQL); err != nil { return nil, err } @@ -143,18 +137,18 @@ func (s *peekStatements) DeletePeeks( return } -func (s *peekStatements) SelectPeeks( - ctx context.Context, txn *sql.Tx, userID, deviceID string, +func (s *peekStatements) SelectPeeksInRange( + ctx context.Context, txn *sql.Tx, userID, deviceID string, r types.Range, ) (peeks []types.Peek, err error) { - rows, err := sqlutil.TxStmt(txn, s.selectPeeksStmt).QueryContext(ctx, userID, deviceID) + rows, err := sqlutil.TxStmt(txn, s.selectPeeksInRangeStmt).QueryContext(ctx, userID, deviceID, r.Low(), r.High()) if err != nil { return } - defer internal.CloseAndLogIfError(ctx, rows, "SelectPeeks: rows.close() failed") + defer internal.CloseAndLogIfError(ctx, rows, "SelectPeeksInRange: rows.close() failed") for rows.Next() { peek := types.Peek{} - if err = rows.Scan(&peek.RoomID, &peek.New); err != nil { + if err = rows.Scan(&peek.RoomID, &peek.Deleted, &peek.New); err != nil { return } peeks = append(peeks, peek) @@ -163,17 +157,6 @@ func (s *peekStatements) SelectPeeks( return peeks, rows.Err() } -func (s *peekStatements) MarkPeeksAsOld( - ctx context.Context, txn *sql.Tx, userID, deviceID string, -) (streamPos types.StreamPosition, err error) { - streamPos, err = s.streamIDStatements.nextStreamID(ctx, txn) - if err != nil { - return - } - _, err = sqlutil.TxStmt(txn, s.markPeeksAsOldStmt).ExecContext(ctx, streamPos, userID, deviceID) - return -} - func (s *peekStatements) SelectPeekingDevices( ctx context.Context, ) (peekingDevices map[string][]types.PeekingDevice, err error) { diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index d189d7e3..631746c6 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -43,9 +43,8 @@ type Peeks interface { InsertPeek(ctx context.Context, txn *sql.Tx, roomID, userID, deviceID string) (streamPos types.StreamPosition, err error) DeletePeek(ctx context.Context, txn *sql.Tx, roomID, userID, deviceID string) (streamPos types.StreamPosition, err error) DeletePeeks(ctx context.Context, txn *sql.Tx, roomID, userID string) (streamPos types.StreamPosition, err error) - SelectPeeks(ctxt context.Context, txn *sql.Tx, userID, deviceID string) (peeks []types.Peek, err error) + SelectPeeksInRange(ctxt context.Context, txn *sql.Tx, userID, deviceID string, r types.Range) (peeks []types.Peek, err error) SelectPeekingDevices(ctxt context.Context) (peekingDevices map[string][]types.PeekingDevice, err error) - MarkPeeksAsOld(ctxt context.Context, txn *sql.Tx, userID, deviceID string) (streamPos types.StreamPosition, err error) SelectMaxPeekID(ctx context.Context, txn *sql.Tx) (id int64, err error) } diff --git a/syncapi/types/types.go b/syncapi/types/types.go index 0c737a29..2499976e 100644 --- a/syncapi/types/types.go +++ b/syncapi/types/types.go @@ -516,6 +516,7 @@ type PeekingDevice struct { } type Peek struct { - RoomID string - New bool + RoomID string + New bool + Deleted bool }