mirror of
https://github.com/hoernschen/dendrite.git
synced 2024-12-27 07:28:27 +00:00
(broken) rewrite to use SelectPeeksInRange rather than MarkPeeksAsOld for idempotency
This commit is contained in:
parent
7f41f39684
commit
9f2bc6231f
5 changed files with 54 additions and 106 deletions
|
@ -31,7 +31,6 @@ CREATE TABLE IF NOT EXISTS syncapi_peeks (
|
|||
room_id TEXT NOT NULL,
|
||||
user_id TEXT NOT NULL,
|
||||
device_id TEXT NOT NULL,
|
||||
new BOOL NOT NULL DEFAULT true,
|
||||
deleted BOOL NOT NULL DEFAULT false,
|
||||
-- When the peek was created in UNIX epoch ms.
|
||||
creation_ts BIGINT NOT NULL
|
||||
|
@ -52,15 +51,14 @@ const deletePeekSQL = "" +
|
|||
const deletePeeksSQL = "" +
|
||||
"UPDATE syncapi_peeks SET deleted=true, id=nextval('syncapi_stream_id') WHERE room_id = $1 AND user_id = $2 RETURNING id"
|
||||
|
||||
const selectPeeksSQL = "" +
|
||||
"SELECT room_id, new FROM syncapi_peeks WHERE user_id = $1 AND device_id = $2 AND deleted=false"
|
||||
// we care about all the peeks which were created in this range, deleted in this range,
|
||||
// or were created before this range but haven't been deleted yet.
|
||||
const selectPeeksInRangeSQL = "" +
|
||||
"SELECT room_id, deleted, (id > $3 AND id <= $4) AS new FROM syncapi_peeks WHERE user_id = $1 AND device_id = $2 AND ((id <= $3 AND NOT deleted) OR new)"
|
||||
|
||||
const selectPeekingDevicesSQL = "" +
|
||||
"SELECT room_id, user_id, device_id FROM syncapi_peeks WHERE deleted=false"
|
||||
|
||||
const markPeeksAsOldSQL = "" +
|
||||
"UPDATE syncapi_peeks SET id=nextval('syncapi_stream_id'), new=false WHERE user_id = $1 AND device_id = $2 AND deleted=false RETURNING id"
|
||||
|
||||
const selectMaxPeekIDSQL = "" +
|
||||
"SELECT MAX(id) FROM syncapi_peeks"
|
||||
|
||||
|
@ -69,9 +67,8 @@ type peekStatements struct {
|
|||
insertPeekStmt *sql.Stmt
|
||||
deletePeekStmt *sql.Stmt
|
||||
deletePeeksStmt *sql.Stmt
|
||||
selectPeeksStmt *sql.Stmt
|
||||
selectPeeksInRangeStmt *sql.Stmt
|
||||
selectPeekingDevicesStmt *sql.Stmt
|
||||
markPeeksAsOldStmt *sql.Stmt
|
||||
selectMaxPeekIDStmt *sql.Stmt
|
||||
}
|
||||
|
||||
|
@ -92,15 +89,12 @@ func NewPostgresPeeksTable(db *sql.DB) (tables.Peeks, error) {
|
|||
if s.deletePeeksStmt, err = db.Prepare(deletePeeksSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectPeeksStmt, err = db.Prepare(selectPeeksSQL); err != nil {
|
||||
if s.selectPeeksInRangeStmt, err = db.Prepare(selectPeeksInRangeSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectPeekingDevicesStmt, err = db.Prepare(selectPeekingDevicesSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.markPeeksAsOldStmt, err = db.Prepare(markPeeksAsOldSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectMaxPeekIDStmt, err = db.Prepare(selectMaxPeekIDSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -132,18 +126,18 @@ func (s *peekStatements) DeletePeeks(
|
|||
return
|
||||
}
|
||||
|
||||
func (s *peekStatements) SelectPeeks(
|
||||
ctx context.Context, txn *sql.Tx, userID, deviceID string,
|
||||
func (s *peekStatements) SelectPeeksInRange(
|
||||
ctx context.Context, txn *sql.Tx, userID, deviceID string, r types.Range,
|
||||
) (peeks []types.Peek, err error) {
|
||||
rows, err := sqlutil.TxStmt(txn, s.selectPeeksStmt).QueryContext(ctx, userID, deviceID)
|
||||
rows, err := sqlutil.TxStmt(txn, s.selectPeeksInRangeStmt).QueryContext(ctx, userID, deviceID, r.Low(), r.High())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "SelectPeeks: rows.close() failed")
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "SelectPeeksInRange: rows.close() failed")
|
||||
|
||||
for rows.Next() {
|
||||
peek := types.Peek{}
|
||||
if err = rows.Scan(&peek.RoomID, &peek.New); err != nil {
|
||||
if err = rows.Scan(&peek.RoomID, &peek.Deleted, &peek.New); err != nil {
|
||||
return
|
||||
}
|
||||
peeks = append(peeks, peek)
|
||||
|
@ -152,14 +146,6 @@ func (s *peekStatements) SelectPeeks(
|
|||
return peeks, rows.Err()
|
||||
}
|
||||
|
||||
func (s *peekStatements) MarkPeeksAsOld(
|
||||
ctx context.Context, txn *sql.Tx, userID, deviceID string,
|
||||
) (streamPos types.StreamPosition, err error) {
|
||||
stmt := sqlutil.TxStmt(txn, s.markPeeksAsOldStmt)
|
||||
err = stmt.QueryRowContext(ctx, userID, deviceID).Scan(&streamPos)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *peekStatements) SelectPeekingDevices(
|
||||
ctx context.Context,
|
||||
) (peekingDevices map[string][]types.PeekingDevice, err error) {
|
||||
|
|
|
@ -694,19 +694,21 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
|
|||
}
|
||||
|
||||
// Add peeked rooms.
|
||||
peeks, err := d.Peeks.SelectPeeks(ctx, txn, userID, deviceID)
|
||||
peeks, err := d.Peeks.SelectPeeksInRange(ctx, txn, userID, deviceID, r)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
for _, peek := range peeks {
|
||||
var jr *types.JoinResponse
|
||||
jr, err = d.getJoinResponseForCompleteSync(
|
||||
ctx, txn, peek.RoomID, r, &stateFilter, numRecentEventsPerRoom,
|
||||
)
|
||||
if err != nil {
|
||||
return
|
||||
if !peek.Deleted {
|
||||
var jr *types.JoinResponse
|
||||
jr, err = d.getJoinResponseForCompleteSync(
|
||||
ctx, txn, peek.RoomID, r, &stateFilter, numRecentEventsPerRoom,
|
||||
)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
res.Rooms.Peek[peek.RoomID] = *jr
|
||||
}
|
||||
res.Rooms.Peek[peek.RoomID] = *jr
|
||||
}
|
||||
|
||||
if err = d.addInvitesToResponse(ctx, txn, userID, r, res); err != nil {
|
||||
|
@ -1034,13 +1036,12 @@ func (d *Database) getStateDeltas(
|
|||
|
||||
// find out which rooms this user is peeking, if any.
|
||||
// We do this before joins so any peeks get overwritten
|
||||
peeks, err := d.Peeks.SelectPeeks(ctx, txn, userID, device.ID)
|
||||
peeks, err := d.Peeks.SelectPeeksInRange(ctx, txn, userID, device.ID, r)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// add peek blocks
|
||||
newPeeks := false
|
||||
for _, peek := range peeks {
|
||||
if peek.New {
|
||||
// send full room state down instead of a delta
|
||||
|
@ -1050,23 +1051,13 @@ func (d *Database) getStateDeltas(
|
|||
return nil, nil, err
|
||||
}
|
||||
state[peek.RoomID] = s
|
||||
newPeeks = true
|
||||
}
|
||||
|
||||
deltas = append(deltas, stateDelta{
|
||||
membership: gomatrixserverlib.Peek,
|
||||
stateEvents: d.StreamEventsToEvents(device, state[peek.RoomID]),
|
||||
roomID: peek.RoomID,
|
||||
})
|
||||
}
|
||||
|
||||
if newPeeks {
|
||||
err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error {
|
||||
_, err := d.Peeks.MarkPeeksAsOld(ctx, txn, userID, device.ID)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
if !peek.Deleted {
|
||||
deltas = append(deltas, stateDelta{
|
||||
membership: gomatrixserverlib.Peek,
|
||||
stateEvents: d.StreamEventsToEvents(device, state[peek.RoomID]),
|
||||
roomID: peek.RoomID,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1130,35 +1121,23 @@ func (d *Database) getStateDeltasForFullStateSync(
|
|||
// Use a reasonable initial capacity
|
||||
deltas := make(map[string]stateDelta)
|
||||
|
||||
peeks, err := d.Peeks.SelectPeeks(ctx, txn, userID, device.ID)
|
||||
peeks, err := d.Peeks.SelectPeeksInRange(ctx, txn, userID, device.ID, r)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Add full states for all peeking rooms
|
||||
newPeeks := false
|
||||
for _, peek := range peeks {
|
||||
if peek.New {
|
||||
newPeeks = true
|
||||
}
|
||||
s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, peek.RoomID, stateFilter)
|
||||
if stateErr != nil {
|
||||
return nil, nil, stateErr
|
||||
}
|
||||
deltas[peek.RoomID] = stateDelta{
|
||||
membership: gomatrixserverlib.Peek,
|
||||
stateEvents: d.StreamEventsToEvents(device, s),
|
||||
roomID: peek.RoomID,
|
||||
}
|
||||
}
|
||||
|
||||
if newPeeks {
|
||||
err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error {
|
||||
_, err := d.Peeks.MarkPeeksAsOld(ctx, txn, userID, device.ID)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
if !peek.Deleted {
|
||||
s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, peek.RoomID, stateFilter)
|
||||
if stateErr != nil {
|
||||
return nil, nil, stateErr
|
||||
}
|
||||
deltas[peek.RoomID] = stateDelta{
|
||||
membership: gomatrixserverlib.Peek,
|
||||
stateEvents: d.StreamEventsToEvents(device, s),
|
||||
roomID: peek.RoomID,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -31,7 +31,6 @@ CREATE TABLE IF NOT EXISTS syncapi_peeks (
|
|||
room_id TEXT NOT NULL,
|
||||
user_id TEXT NOT NULL,
|
||||
device_id TEXT NOT NULL,
|
||||
new BOOL NOT NULL DEFAULT true,
|
||||
deleted BOOL NOT NULL DEFAULT false,
|
||||
-- When the peek was created in UNIX epoch ms.
|
||||
creation_ts INTEGER NOT NULL
|
||||
|
@ -52,15 +51,14 @@ const deletePeekSQL = "" +
|
|||
const deletePeeksSQL = "" +
|
||||
"UPDATE syncapi_peeks SET deleted=true, id=$1 WHERE room_id = $2 AND user_id = $3"
|
||||
|
||||
const selectPeeksSQL = "" +
|
||||
"SELECT room_id, new FROM syncapi_peeks WHERE user_id = $1 AND device_id = $2 AND deleted=false"
|
||||
// we care about all the peeks which were created in this range, deleted in this range,
|
||||
// or were created before this range but haven't been deleted yet.
|
||||
const selectPeeksInRangeSQL = "" +
|
||||
"SELECT room_id, deleted, (id > $3 AND id <= $4) AS new FROM syncapi_peeks WHERE user_id = $1 AND device_id = $2 AND ((id <= $3 AND NOT deleted) OR new)"
|
||||
|
||||
const selectPeekingDevicesSQL = "" +
|
||||
"SELECT room_id, user_id, device_id FROM syncapi_peeks WHERE deleted=false"
|
||||
|
||||
const markPeeksAsOldSQL = "" +
|
||||
"UPDATE syncapi_peeks SET new=false, id=$1 WHERE user_id = $2 AND device_id = $3 AND deleted=false"
|
||||
|
||||
const selectMaxPeekIDSQL = "" +
|
||||
"SELECT MAX(id) FROM syncapi_peeks"
|
||||
|
||||
|
@ -70,9 +68,8 @@ type peekStatements struct {
|
|||
insertPeekStmt *sql.Stmt
|
||||
deletePeekStmt *sql.Stmt
|
||||
deletePeeksStmt *sql.Stmt
|
||||
selectPeeksStmt *sql.Stmt
|
||||
selectPeeksInRangeStmt *sql.Stmt
|
||||
selectPeekingDevicesStmt *sql.Stmt
|
||||
markPeeksAsOldStmt *sql.Stmt
|
||||
selectMaxPeekIDStmt *sql.Stmt
|
||||
}
|
||||
|
||||
|
@ -94,15 +91,12 @@ func NewSqlitePeeksTable(db *sql.DB, streamID *streamIDStatements) (tables.Peeks
|
|||
if s.deletePeeksStmt, err = db.Prepare(deletePeeksSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectPeeksStmt, err = db.Prepare(selectPeeksSQL); err != nil {
|
||||
if s.selectPeeksInRangeStmt, err = db.Prepare(selectPeeksInRangeSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectPeekingDevicesStmt, err = db.Prepare(selectPeekingDevicesSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.markPeeksAsOldStmt, err = db.Prepare(markPeeksAsOldSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectMaxPeekIDStmt, err = db.Prepare(selectMaxPeekIDSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -143,18 +137,18 @@ func (s *peekStatements) DeletePeeks(
|
|||
return
|
||||
}
|
||||
|
||||
func (s *peekStatements) SelectPeeks(
|
||||
ctx context.Context, txn *sql.Tx, userID, deviceID string,
|
||||
func (s *peekStatements) SelectPeeksInRange(
|
||||
ctx context.Context, txn *sql.Tx, userID, deviceID string, r types.Range,
|
||||
) (peeks []types.Peek, err error) {
|
||||
rows, err := sqlutil.TxStmt(txn, s.selectPeeksStmt).QueryContext(ctx, userID, deviceID)
|
||||
rows, err := sqlutil.TxStmt(txn, s.selectPeeksInRangeStmt).QueryContext(ctx, userID, deviceID, r.Low(), r.High())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "SelectPeeks: rows.close() failed")
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "SelectPeeksInRange: rows.close() failed")
|
||||
|
||||
for rows.Next() {
|
||||
peek := types.Peek{}
|
||||
if err = rows.Scan(&peek.RoomID, &peek.New); err != nil {
|
||||
if err = rows.Scan(&peek.RoomID, &peek.Deleted, &peek.New); err != nil {
|
||||
return
|
||||
}
|
||||
peeks = append(peeks, peek)
|
||||
|
@ -163,17 +157,6 @@ func (s *peekStatements) SelectPeeks(
|
|||
return peeks, rows.Err()
|
||||
}
|
||||
|
||||
func (s *peekStatements) MarkPeeksAsOld(
|
||||
ctx context.Context, txn *sql.Tx, userID, deviceID string,
|
||||
) (streamPos types.StreamPosition, err error) {
|
||||
streamPos, err = s.streamIDStatements.nextStreamID(ctx, txn)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
_, err = sqlutil.TxStmt(txn, s.markPeeksAsOldStmt).ExecContext(ctx, streamPos, userID, deviceID)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *peekStatements) SelectPeekingDevices(
|
||||
ctx context.Context,
|
||||
) (peekingDevices map[string][]types.PeekingDevice, err error) {
|
||||
|
|
|
@ -43,9 +43,8 @@ type Peeks interface {
|
|||
InsertPeek(ctx context.Context, txn *sql.Tx, roomID, userID, deviceID string) (streamPos types.StreamPosition, err error)
|
||||
DeletePeek(ctx context.Context, txn *sql.Tx, roomID, userID, deviceID string) (streamPos types.StreamPosition, err error)
|
||||
DeletePeeks(ctx context.Context, txn *sql.Tx, roomID, userID string) (streamPos types.StreamPosition, err error)
|
||||
SelectPeeks(ctxt context.Context, txn *sql.Tx, userID, deviceID string) (peeks []types.Peek, err error)
|
||||
SelectPeeksInRange(ctxt context.Context, txn *sql.Tx, userID, deviceID string, r types.Range) (peeks []types.Peek, err error)
|
||||
SelectPeekingDevices(ctxt context.Context) (peekingDevices map[string][]types.PeekingDevice, err error)
|
||||
MarkPeeksAsOld(ctxt context.Context, txn *sql.Tx, userID, deviceID string) (streamPos types.StreamPosition, err error)
|
||||
SelectMaxPeekID(ctx context.Context, txn *sql.Tx) (id int64, err error)
|
||||
}
|
||||
|
||||
|
|
|
@ -516,6 +516,7 @@ type PeekingDevice struct {
|
|||
}
|
||||
|
||||
type Peek struct {
|
||||
RoomID string
|
||||
New bool
|
||||
RoomID string
|
||||
New bool
|
||||
Deleted bool
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue