Don't leak timeline after leaving room

This commit is contained in:
Neil Alexander 2021-01-20 13:25:16 +00:00
parent 33f8d0e278
commit e3068f4c20
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
7 changed files with 175 additions and 30 deletions

View file

@ -49,6 +49,9 @@ type Database interface {
PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error)
RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error)
// MostRecentMembership returns the most recent membership event for the user, along with the global stream position.
MostRecentMembership(ctx context.Context, roomID, userID string) (*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error)
// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error)
// AllPeekingDevicesInRooms returns a map of room ID to a list of all peeking devices.

View file

@ -130,6 +130,10 @@ const selectStateInRangeSQL = "" +
const deleteEventsForRoomSQL = "" +
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
const selectPositionInStreamSQL = "" +
"SELECT id FROM syncapi_output_room_events" +
" WHERE event_id = $1"
type outputRoomEventsStatements struct {
insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt
@ -140,6 +144,7 @@ type outputRoomEventsStatements struct {
selectStateInRangeStmt *sql.Stmt
updateEventJSONStmt *sql.Stmt
deleteEventsForRoomStmt *sql.Stmt
selectPositionInStreamStmt *sql.Stmt
}
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
@ -175,6 +180,9 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
if s.deleteEventsForRoomStmt, err = db.Prepare(deleteEventsForRoomSQL); err != nil {
return nil, err
}
if s.selectPositionInStreamStmt, err = db.Prepare(selectPositionInStreamSQL); err != nil {
return nil, err
}
return s, nil
}
@ -435,6 +443,15 @@ func (s *outputRoomEventsStatements) DeleteEventsForRoom(
return err
}
// SelectPositionInStream returns the position of a given event in the
// global stream topology.
func (s *outputRoomEventsStatements) SelectPositionInStream(
ctx context.Context, txn *sql.Tx, eventID string,
) (pos types.StreamPosition, err error) {
err = s.selectPositionInStreamStmt.QueryRowContext(ctx, eventID).Scan(&pos)
return
}
func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
var result []types.StreamEvent
for rows.Next() {

View file

@ -500,6 +500,20 @@ func (d *Database) EventPositionInTopology(
return types.TopologyToken{Depth: depth, PDUPosition: stream}, nil
}
func (d *Database) MostRecentMembership(
ctx context.Context, roomID, userID string,
) (*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error) {
event, err := d.CurrentRoomState.SelectStateEvent(ctx, roomID, gomatrixserverlib.MRoomMember, userID)
if err != nil {
return nil, 0, fmt.Errorf("d.CurrentRoomState.SelectStateEvent: %w", err)
}
pos, err := d.OutputEvents.SelectPositionInStream(ctx, nil, event.EventID())
if err != nil {
return nil, 0, fmt.Errorf("d.OutputEvents.SelectPositionInStream: %w", err)
}
return event, pos, nil
}
func (d *Database) GetFilter(
ctx context.Context, localpart string, filterID string,
) (*gomatrixserverlib.Filter, error) {

View file

@ -90,14 +90,19 @@ const selectStateInRangeSQL = "" +
const deleteEventsForRoomSQL = "" +
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
const selectPositionInStreamSQL = "" +
"SELECT id FROM syncapi_output_room_events" +
" WHERE event_id = $1"
type outputRoomEventsStatements struct {
db *sql.DB
streamIDStatements *streamIDStatements
insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt
selectMaxEventIDStmt *sql.Stmt
updateEventJSONStmt *sql.Stmt
deleteEventsForRoomStmt *sql.Stmt
db *sql.DB
streamIDStatements *streamIDStatements
insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt
selectMaxEventIDStmt *sql.Stmt
updateEventJSONStmt *sql.Stmt
deleteEventsForRoomStmt *sql.Stmt
selectPositionInStreamStmt *sql.Stmt
}
func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Events, error) {
@ -124,6 +129,9 @@ func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Even
if s.deleteEventsForRoomStmt, err = db.Prepare(deleteEventsForRoomSQL); err != nil {
return nil, err
}
if s.selectPositionInStreamStmt, err = db.Prepare(selectPositionInStreamSQL); err != nil {
return nil, err
}
return s, nil
}
@ -424,6 +432,15 @@ func (s *outputRoomEventsStatements) DeleteEventsForRoom(
return err
}
// SelectPositionInStream returns the position of a given event in the
// global stream topology.
func (s *outputRoomEventsStatements) SelectPositionInStream(
ctx context.Context, txn *sql.Tx, eventID string,
) (pos types.StreamPosition, err error) {
err = s.selectPositionInStreamStmt.QueryRowContext(ctx, eventID).Scan(&pos)
return
}
func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
var result []types.StreamEvent
for rows.Next() {

View file

@ -63,6 +63,7 @@ type Events interface {
UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error
// DeleteEventsForRoom removes all event information for a room. This should only be done when removing the room entirely.
DeleteEventsForRoom(ctx context.Context, txn *sql.Tx, roomID string) (err error)
SelectPositionInStream(ctx context.Context, txn *sql.Tx, eventID string) (pos types.StreamPosition, err error)
}
// Topology keeps track of the depths and stream positions for all events.