blind stab at adding a peek section to /sync

This commit is contained in:
Matthew Hodgson 2020-08-31 15:28:24 +03:00
parent 9b79f9a883
commit d343b8fb2c
4 changed files with 101 additions and 10 deletions

View file

@ -606,6 +606,8 @@ func (d *Database) IncrementalSync(
}
}
// TODO: handle EDUs in peeked rooms
err = d.addEDUDeltaToResponse(
fromPos, toPos, joinedRoomIDs, res,
)
@ -742,6 +744,8 @@ func (d *Database) CompleteSync(
return nil, fmt.Errorf("d.getResponseWithPDUsForCompleteSync: %w", err)
}
// TODO: handle EDUs in peeked rooms
// Use a zero value SyncPosition for fromPos so all EDU states are added.
err = d.addEDUDeltaToResponse(
types.NewStreamToken(0, 0, nil), toPos, joinedRoomIDs, res,
@ -847,6 +851,14 @@ func (d *Database) addRoomDeltaToResponse(
jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.roomID] = *jr
case gomatrixserverlib.Peek:
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = prevBatch.String()
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Peek[delta.roomID] = *jr
case gomatrixserverlib.Leave:
fallthrough // transitions to leave are the same as ban
case gomatrixserverlib.Ban:
@ -968,7 +980,7 @@ func (d *Database) getStateDeltas(
// - Get all CURRENTLY joined rooms, and add them to 'joined' block.
var deltas []stateDelta
// get all the state events ever between these two positions
// get all the state events ever (i.e. for all available rooms) between these two positions
stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter)
if err != nil {
return nil, nil, err
@ -978,6 +990,40 @@ func (d *Database) getStateDeltas(
return nil, nil, err
}
// find out which rooms this user is peeking, if any.
// We do this before joins so joins overwrite peeks
peeks, err := d.Peeks.SelectPeeks(ctx, txn, userID, device.DeviceID)
if err != nil {
return nil, nil, err
}
// add peek blocks
for _, peek := range peeks {
if peek.New {
// send full room state down instead of a delta
var s []types.StreamEvent
s, err = d.currentStateStreamEventsForRoom(ctx, txn, peek.RoomID, stateFilter)
if err != nil {
return nil, nil, err
}
state[roomID] = s
}
deltas = append(deltas, stateDelta{
membership: gomatrixserverlib.Peek,
stateEvents: d.StreamEventsToEvents(device, state[peek.RoomID]),
roomID: peek.RoomID,
})
}
if len(peeks) > 0 {
err := d.Peeks.MarkPeeksAsOld(ctx, txn, userID, device.DeviceID)
if err != nil {
return nil, nil, err
}
}
// handle newly joined rooms and non-joined rooms
for roomID, stateStreamEvents := range state {
for _, ev := range stateStreamEvents {
// TODO: Currently this will incorrectly add rooms which were ALREADY joined but they sent another no-op join event.
@ -1038,8 +1084,13 @@ func (d *Database) getStateDeltasForFullStateSync(
return nil, nil, err
}
peeks, err = d.Peeks.SelectPeeks(ctx, txn, userID, device,ID)
if err != nil {
return nil, nil, err
}
// Use a reasonable initial capacity
deltas := make([]stateDelta, 0, len(joinedRoomIDs))
deltas := make([]stateDelta, 0, len(joinedRoomIDs) + len(peeks))
// Add full states for all joined rooms
for _, joinedRoomID := range joinedRoomIDs {
@ -1054,6 +1105,26 @@ func (d *Database) getStateDeltasForFullStateSync(
})
}
// Add full states for all peeking rooms
for _, peek := range peeks {
s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, peek.RoomID, stateFilter)
if stateErr != nil {
return nil, nil, stateErr
}
deltas = append(deltas, stateDelta{
membership: gomatrixserverlib.Peek,
stateEvents: d.StreamEventsToEvents(device, s),
roomID: peek.RoomID,
})
}
if len(peeks) > 0 {
err := d.Peeks.MarkPeeksAsOld(ctx, txn, userID, device.DeviceID)
if err != nil {
return nil, nil, err
}
}
// Get all the state events ever between these two positions
stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter)
if err != nil {

View file

@ -32,6 +32,7 @@ CREATE TABLE IF NOT EXISTS syncapi_peeks (
room_id TEXT NOT NULL,
user_id TEXT NOT NULL,
device_id TEXT NOT NULL,
new BOOL NOT NULL DEFAULT true,
-- When the peek was created in UNIX epoch ms.
creation_ts INTEGER NOT NULL,
);
@ -49,11 +50,14 @@ const deletePeekSQL = "" +
"DELETE FROM syncapi_peeks WHERE room_id = $1 AND user_id = $2 and device_id = $3"
const selectPeeksSQL == "" +
"SELECT room_id FROM syncapi_peeks WHERE user_id = $1 and device_id = $2"
"SELECT room_id, new FROM syncapi_peeks WHERE user_id = $1 and device_id = $2"
const selectPeekingDevicesSQL == "" +
"SELECT room_id, user_id, device_id FROM syncapi_peeks"
const markPeeksAsOldSQL == "" +
"UPDATE syncapi_peeks SET new=false WHERE user_id = $1 and device_id = $2"
type peekStatements struct {
db *sql.DB
insertPeekStmt *sql.Stmt
@ -82,6 +86,9 @@ func NewSqlitePeeksTable(db *sql.DB) (tables.Peeks, error) {
if s.selectPeekingDevicesStmt, err = db.Prepare(selectPeekingDevicesSQL); err != nil {
return nil, err
}
if s.markPeeksAsOldStmt, err = db.Prepare(markPeeksAsOldSQL); err != nil {
return nil, err
}
return s, nil
}
@ -110,7 +117,7 @@ func (s *peekStatements) DeletePeek(
func (s *peekStatements) SelectPeeks(
ctx context.Context, txn *sql.Tx, userID, deviceID string,
) (roomIDs []string, err error) {
) (peeks []Peek, err error) {
rows, err := sqlutil.TxStmt(txn, s.selectPeeksStmt).QueryContext(ctx, userID, deviceID)
if err != nil {
return
@ -118,14 +125,21 @@ func (s *peekStatements) SelectPeeks(
defer internal.CloseAndLogIfError(ctx, rows, "SelectPeeks: rows.close() failed")
for rows.Next() {
var roomID string
if err = rows.Scan(&roomId); err != nil {
peek = Peek{}
if err = rows.Scan(&peek.roomId, &peek.new); err != nil {
return
}
roomIDs = append(roomIDs, roomID)
peeks = append(peeks, peek)
}
return roomIDs, rows.Err()
return peeks, rows.Err()
}
func (s *peekStatements) MarkPeeksAsOld (
ctx context.Context, txn *sql.Tx, userID, deviceID string,
) (err error) {
_, err := sqlutil.TxStmt(txn, s.markPeeksAsOldStmt).ExecContext(ctx, userID, deviceID)
return
}
func (s *peekStatements) SelectPeekingDevices(