From 10f2e8d92fe5606d70573ed978acaa21fb62c7e3 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 20 Jan 2021 14:41:09 +0000 Subject: [PATCH] Start his vis --- syncapi/storage/interface.go | 5 + .../postgres/output_room_events_table.go | 17 ++ syncapi/storage/shared/syncserver.go | 24 +++ .../sqlite3/output_room_events_table.go | 31 +++- syncapi/storage/tables/interface.go | 1 + syncapi/streams/stream_pdu.go | 165 +++++++++++++++--- syncapi/streams/streams.go | 1 + 7 files changed, 214 insertions(+), 30 deletions(-) diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 22d80161..87c1f99e 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -49,6 +49,9 @@ type Database interface { PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error) RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error) + // MostRecentMembership returns the most recent membership event for the user, along with the global stream position. + MostRecentMembership(ctx context.Context, roomID, userID string) (*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error) + // AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs. AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error) // AllPeekingDevicesInRooms returns a map of room ID to a list of all peeking devices. @@ -110,6 +113,8 @@ type Database interface { GetEventsInTopologicalRange(ctx context.Context, from, to *types.TopologyToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error) // EventPositionInTopology returns the depth and stream position of the given event. EventPositionInTopology(ctx context.Context, eventID string) (types.TopologyToken, error) + // EventPositionInStream returns the global stream position of the given event. + EventPositionInStream(ctx context.Context, eventID string) (types.StreamPosition, error) // BackwardExtremitiesForRoom returns a map of backwards extremity event ID to a list of its prev_events. BackwardExtremitiesForRoom(ctx context.Context, roomID string) (backwardExtremities map[string][]string, err error) // MaxTopologicalPosition returns the highest topological position for a given room. diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index 28668de0..1b3125f8 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -130,6 +130,10 @@ const selectStateInRangeSQL = "" + const deleteEventsForRoomSQL = "" + "DELETE FROM syncapi_output_room_events WHERE room_id = $1" +const selectPositionInStreamSQL = "" + + "SELECT id FROM syncapi_output_room_events" + + " WHERE event_id = $1" + type outputRoomEventsStatements struct { insertEventStmt *sql.Stmt selectEventsStmt *sql.Stmt @@ -140,6 +144,7 @@ type outputRoomEventsStatements struct { selectStateInRangeStmt *sql.Stmt updateEventJSONStmt *sql.Stmt deleteEventsForRoomStmt *sql.Stmt + selectPositionInStreamStmt *sql.Stmt } func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { @@ -175,6 +180,9 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) { if s.deleteEventsForRoomStmt, err = db.Prepare(deleteEventsForRoomSQL); err != nil { return nil, err } + if s.selectPositionInStreamStmt, err = db.Prepare(selectPositionInStreamSQL); err != nil { + return nil, err + } return s, nil } @@ -435,6 +443,15 @@ func (s *outputRoomEventsStatements) DeleteEventsForRoom( return err } +// SelectPositionInStream returns the position of a given event in the +// global stream topology. +func (s *outputRoomEventsStatements) SelectPositionInStream( + ctx context.Context, txn *sql.Tx, eventID string, +) (pos types.StreamPosition, err error) { + err = s.selectPositionInStreamStmt.QueryRowContext(ctx, eventID).Scan(&pos) + return +} + func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { var result []types.StreamEvent for rows.Next() { diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 9df07693..4fb99a0a 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -500,6 +500,30 @@ func (d *Database) EventPositionInTopology( return types.TopologyToken{Depth: depth, PDUPosition: stream}, nil } +func (d *Database) EventPositionInStream( + ctx context.Context, eventID string, +) (types.StreamPosition, error) { + pos, err := d.OutputEvents.SelectPositionInStream(ctx, nil, eventID) + if err != nil { + return 0, err + } + return pos, nil +} + +func (d *Database) MostRecentMembership( + ctx context.Context, roomID, userID string, +) (*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error) { + event, err := d.CurrentRoomState.SelectStateEvent(ctx, roomID, gomatrixserverlib.MRoomMember, userID) + if err != nil { + return nil, 0, fmt.Errorf("d.CurrentRoomState.SelectStateEvent: %w", err) + } + pos, err := d.OutputEvents.SelectPositionInStream(ctx, nil, event.EventID()) + if err != nil { + return nil, 0, fmt.Errorf("d.OutputEvents.SelectPositionInStream: %w", err) + } + return event, pos, nil +} + func (d *Database) GetFilter( ctx context.Context, localpart string, filterID string, ) (*gomatrixserverlib.Filter, error) { diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 019aba8b..51d5fb68 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -90,14 +90,19 @@ const selectStateInRangeSQL = "" + const deleteEventsForRoomSQL = "" + "DELETE FROM syncapi_output_room_events WHERE room_id = $1" +const selectPositionInStreamSQL = "" + + "SELECT id FROM syncapi_output_room_events" + + " WHERE event_id = $1" + type outputRoomEventsStatements struct { - db *sql.DB - streamIDStatements *streamIDStatements - insertEventStmt *sql.Stmt - selectEventsStmt *sql.Stmt - selectMaxEventIDStmt *sql.Stmt - updateEventJSONStmt *sql.Stmt - deleteEventsForRoomStmt *sql.Stmt + db *sql.DB + streamIDStatements *streamIDStatements + insertEventStmt *sql.Stmt + selectEventsStmt *sql.Stmt + selectMaxEventIDStmt *sql.Stmt + updateEventJSONStmt *sql.Stmt + deleteEventsForRoomStmt *sql.Stmt + selectPositionInStreamStmt *sql.Stmt } func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Events, error) { @@ -124,6 +129,9 @@ func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Even if s.deleteEventsForRoomStmt, err = db.Prepare(deleteEventsForRoomSQL); err != nil { return nil, err } + if s.selectPositionInStreamStmt, err = db.Prepare(selectPositionInStreamSQL); err != nil { + return nil, err + } return s, nil } @@ -424,6 +432,15 @@ func (s *outputRoomEventsStatements) DeleteEventsForRoom( return err } +// SelectPositionInStream returns the position of a given event in the +// global stream topology. +func (s *outputRoomEventsStatements) SelectPositionInStream( + ctx context.Context, txn *sql.Tx, eventID string, +) (pos types.StreamPosition, err error) { + err = s.selectPositionInStreamStmt.QueryRowContext(ctx, eventID).Scan(&pos) + return +} + func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) { var result []types.StreamEvent for rows.Next() { diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 73967677..7d935a71 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -63,6 +63,7 @@ type Events interface { UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error // DeleteEventsForRoom removes all event information for a room. This should only be done when removing the room entirely. DeleteEventsForRoom(ctx context.Context, txn *sql.Tx, roomID string) (err error) + SelectPositionInStream(ctx context.Context, txn *sql.Tx, eventID string) (pos types.StreamPosition, err error) } // Topology keeps track of the depths and stream positions for all events. diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 39c31be1..9a6625ac 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" + rsapi "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/syncapi/types" userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/gomatrixserverlib" @@ -12,6 +13,7 @@ import ( type PDUStreamProvider struct { StreamProvider + rsAPI rsapi.RoomserverInternalAPI } func (p *PDUStreamProvider) Setup() { @@ -50,11 +52,14 @@ func (p *PDUStreamProvider) CompleteSync( return from } + stateFilter := req.Filter.Room.State + eventFilter := req.Filter.Room.Timeline + // Build up a /sync response. Add joined rooms. for _, roomID := range joinedRoomIDs { var jr *types.JoinResponse jr, err = p.getJoinResponseForCompleteSync( - ctx, roomID, r, &req.Filter.Room.State, &req.Filter.Room.Timeline, req.Device, + ctx, roomID, r, &stateFilter, &eventFilter, req.Device, ) if err != nil { req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") @@ -74,7 +79,7 @@ func (p *PDUStreamProvider) CompleteSync( if !peek.Deleted { var jr *types.JoinResponse jr, err = p.getJoinResponseForCompleteSync( - ctx, peek.RoomID, r, &req.Filter.Room.State, &req.Filter.Room.Timeline, req.Device, + ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.Device, ) if err != nil { req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") @@ -96,7 +101,7 @@ func (p *PDUStreamProvider) CompleteSync( for _, roomID := range leaveRoomIDs { var lr *types.LeaveResponse lr, err = p.getLeaveResponseForCompleteSync( - ctx, roomID, r, &req.Filter.Room.State, &req.Filter.Room.Timeline, req.Device, + ctx, roomID, r, &stateFilter, &eventFilter, req.Device, ) if err != nil { req.Log.WithError(err).Error("p.getLeaveResponseForCompleteSync failed") @@ -170,7 +175,11 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave // in a single /sync request // This is all "okay" assuming history_visibility == "shared" which it is by default. - r.To = delta.MembershipPos + if r.Backwards { + r.From = delta.MembershipPos + } else { + r.To = delta.MembershipPos + } } recentStreamEvents, limited, err := p.DB.RecentEvents( ctx, delta.RoomID, r, @@ -226,6 +235,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( return nil } +// nolint:gocyclo func (p *PDUStreamProvider) getResponseForCompleteSync( ctx context.Context, roomID string, @@ -241,6 +251,78 @@ func (p *PDUStreamProvider) getResponseForCompleteSync( if err != nil { return } + + // Calculate the current history visibility rule. + historyVisibility := "joined" + var historyVisibilityEvent *gomatrixserverlib.HeaderedEvent + for _, stateEvent := range stateEvents { + if stateEvent.Type() == gomatrixserverlib.MRoomHistoryVisibility { + var content struct { + HistoryVisibility string `json:"history_visibility"` + } + if err = json.Unmarshal(stateEvent.Content(), &content); err != nil { + break + } + historyVisibility = content.HistoryVisibility + historyVisibilityEvent = stateEvent + break + } + } + + switch historyVisibility { + case "invited", "joined": + // Get the most recent membership event of the user and check if + // they are still in the room. If not then we will restrict how + // much of the room the user can see - they won't see beyond their + // leave event. + var membershipEvent *gomatrixserverlib.HeaderedEvent + var membershipPos types.StreamPosition + membershipEvent, membershipPos, err = p.DB.MostRecentMembership(ctx, roomID, device.UserID) + if err != nil { + return + } + if membershipEvent == nil { + return + } + membership, _ := membershipEvent.Membership() + switch membership { + case "leave", "ban", "kick": + if r.Backwards { + r.From = membershipPos + } else { + r.To = membershipPos + } + queryReq := &rsapi.QueryStateAfterEventsRequest{ + RoomID: roomID, + PrevEventIDs: []string{membershipEvent.EventID()}, + } + queryRes := &rsapi.QueryStateAfterEventsResponse{} + if err = p.rsAPI.QueryStateAfterEvents(ctx, queryReq, queryRes); err != nil { + return + } + stateEvents = p.filterStateEventsAccordingToFilter(queryRes.StateEvents, stateFilter) + default: + } + + case "shared": + // Find the stream position of the history visibility event + // and use that as a boundary instead. + var historyVisibilityPosition types.StreamPosition + historyVisibilityPosition, err = p.DB.EventPositionInStream(ctx, historyVisibilityEvent.EventID()) + if err != nil { + return + } + if r.Backwards { + r.To = historyVisibilityPosition + } else { + r.From = historyVisibilityPosition + } + + case "world_readable": + // Do nothing, as it's OK to reveal the entire timeline in a + // world-readable room. + } + // TODO: When filters are added, we may need to call this multiple times to get enough events. // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 var recentStreamEvents []types.StreamEvent @@ -251,7 +333,9 @@ func (p *PDUStreamProvider) getResponseForCompleteSync( return } - recentStreamEvents, limited = p.filterStreamEventsAccordingToHistoryVisibility(recentStreamEvents, stateEvents, device, limited) + recentStreamEvents, limited = p.filterStreamEventsAccordingToHistoryVisibility( + historyVisibility, recentStreamEvents, device, limited, + ) for _, event := range recentStreamEvents { if event.HeaderedEvent.Event.StateKey() != nil { @@ -279,7 +363,7 @@ func (p *PDUStreamProvider) getResponseForCompleteSync( // "Can sync a room with a message with a transaction id" - which does a complete sync to check. recentEvents = p.DB.StreamEventsToEvents(device, recentStreamEvents) stateEvents = removeDuplicates(stateEvents, recentEvents) - return + return // nolint:nakedret } func (p *PDUStreamProvider) getJoinResponseForCompleteSync( @@ -329,28 +413,63 @@ func (p *PDUStreamProvider) getLeaveResponseForCompleteSync( } // nolint:gocyclo -func (p *PDUStreamProvider) filterStreamEventsAccordingToHistoryVisibility( - recentStreamEvents []types.StreamEvent, +func (p *PDUStreamProvider) filterStateEventsAccordingToFilter( stateEvents []*gomatrixserverlib.HeaderedEvent, + stateFilter *gomatrixserverlib.StateFilter, +) []*gomatrixserverlib.HeaderedEvent { + filterRooms, filterNotRooms := map[string]struct{}{}, map[string]struct{}{} + filterTypes, filterNotTypes := map[string]struct{}{}, map[string]struct{}{} + for _, r := range stateFilter.Rooms { + filterRooms[r] = struct{}{} + } + for _, r := range stateFilter.NotRooms { + filterNotRooms[r] = struct{}{} + } + for _, t := range stateFilter.Types { + filterTypes[t] = struct{}{} + } + for _, t := range stateFilter.NotTypes { + filterNotTypes[t] = struct{}{} + } + + newState := make([]*gomatrixserverlib.HeaderedEvent, 0, len(stateEvents)) + for _, event := range stateEvents { + if len(filterRooms) > 0 { + if _, ok := filterRooms[event.RoomID()]; !ok { + continue + } + } + if len(filterNotRooms) > 0 { + if _, ok := filterNotRooms[event.RoomID()]; ok { + continue + } + } + if len(filterTypes) > 0 { + if _, ok := filterTypes[event.Type()]; !ok { + continue + } + } + if len(filterNotTypes) > 0 { + if _, ok := filterNotTypes[event.Type()]; ok { + continue + } + } + newState = append(newState, event) + } + + return newState +} + +// nolint:gocyclo +func (p *PDUStreamProvider) filterStreamEventsAccordingToHistoryVisibility( + visibility string, + recentStreamEvents []types.StreamEvent, device *userapi.Device, limited bool, ) ([]types.StreamEvent, bool) { // If the history is world_readable or shared then don't filter. - for _, stateEvent := range stateEvents { - if stateEvent.Type() == gomatrixserverlib.MRoomHistoryVisibility { - var content struct { - HistoryVisibility string `json:"history_visibility"` - } - if err := json.Unmarshal(stateEvent.Content(), &content); err != nil { - break - } - switch content.HistoryVisibility { - case "world_readable", "shared": - return recentStreamEvents, limited - default: - break - } - } + if visibility == "world_readable" || visibility == "shared" { + return recentStreamEvents, limited } // TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the diff --git a/syncapi/streams/streams.go b/syncapi/streams/streams.go index ba4118df..d4e65a17 100644 --- a/syncapi/streams/streams.go +++ b/syncapi/streams/streams.go @@ -29,6 +29,7 @@ func NewSyncStreamProviders( streams := &Streams{ PDUStreamProvider: &PDUStreamProvider{ StreamProvider: StreamProvider{DB: d}, + rsAPI: rsAPI, }, TypingStreamProvider: &TypingStreamProvider{ StreamProvider: StreamProvider{DB: d},