Revert "Don't leak timeline after leaving room"

This reverts commit e3068f4c20.
This commit is contained in:
Neil Alexander 2021-01-20 14:41:40 +00:00
parent e3068f4c20
commit e114744dcb
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
7 changed files with 29 additions and 174 deletions

View file

@ -49,9 +49,6 @@ type Database interface {
PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error)
RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error)
// MostRecentMembership returns the most recent membership event for the user, along with the global stream position.
MostRecentMembership(ctx context.Context, roomID, userID string) (*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error)
// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error)
// AllPeekingDevicesInRooms returns a map of room ID to a list of all peeking devices.

View file

@ -130,10 +130,6 @@ const selectStateInRangeSQL = "" +
const deleteEventsForRoomSQL = "" +
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
const selectPositionInStreamSQL = "" +
"SELECT id FROM syncapi_output_room_events" +
" WHERE event_id = $1"
type outputRoomEventsStatements struct {
insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt
@ -144,7 +140,6 @@ type outputRoomEventsStatements struct {
selectStateInRangeStmt *sql.Stmt
updateEventJSONStmt *sql.Stmt
deleteEventsForRoomStmt *sql.Stmt
selectPositionInStreamStmt *sql.Stmt
}
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
@ -180,9 +175,6 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
if s.deleteEventsForRoomStmt, err = db.Prepare(deleteEventsForRoomSQL); err != nil {
return nil, err
}
if s.selectPositionInStreamStmt, err = db.Prepare(selectPositionInStreamSQL); err != nil {
return nil, err
}
return s, nil
}
@ -443,15 +435,6 @@ func (s *outputRoomEventsStatements) DeleteEventsForRoom(
return err
}
// SelectPositionInStream returns the position of a given event in the
// global stream topology.
func (s *outputRoomEventsStatements) SelectPositionInStream(
ctx context.Context, txn *sql.Tx, eventID string,
) (pos types.StreamPosition, err error) {
err = s.selectPositionInStreamStmt.QueryRowContext(ctx, eventID).Scan(&pos)
return
}
func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
var result []types.StreamEvent
for rows.Next() {

View file

@ -500,20 +500,6 @@ func (d *Database) EventPositionInTopology(
return types.TopologyToken{Depth: depth, PDUPosition: stream}, nil
}
func (d *Database) MostRecentMembership(
ctx context.Context, roomID, userID string,
) (*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error) {
event, err := d.CurrentRoomState.SelectStateEvent(ctx, roomID, gomatrixserverlib.MRoomMember, userID)
if err != nil {
return nil, 0, fmt.Errorf("d.CurrentRoomState.SelectStateEvent: %w", err)
}
pos, err := d.OutputEvents.SelectPositionInStream(ctx, nil, event.EventID())
if err != nil {
return nil, 0, fmt.Errorf("d.OutputEvents.SelectPositionInStream: %w", err)
}
return event, pos, nil
}
func (d *Database) GetFilter(
ctx context.Context, localpart string, filterID string,
) (*gomatrixserverlib.Filter, error) {

View file

@ -90,19 +90,14 @@ const selectStateInRangeSQL = "" +
const deleteEventsForRoomSQL = "" +
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
const selectPositionInStreamSQL = "" +
"SELECT id FROM syncapi_output_room_events" +
" WHERE event_id = $1"
type outputRoomEventsStatements struct {
db *sql.DB
streamIDStatements *streamIDStatements
insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt
selectMaxEventIDStmt *sql.Stmt
updateEventJSONStmt *sql.Stmt
deleteEventsForRoomStmt *sql.Stmt
selectPositionInStreamStmt *sql.Stmt
db *sql.DB
streamIDStatements *streamIDStatements
insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt
selectMaxEventIDStmt *sql.Stmt
updateEventJSONStmt *sql.Stmt
deleteEventsForRoomStmt *sql.Stmt
}
func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Events, error) {
@ -129,9 +124,6 @@ func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Even
if s.deleteEventsForRoomStmt, err = db.Prepare(deleteEventsForRoomSQL); err != nil {
return nil, err
}
if s.selectPositionInStreamStmt, err = db.Prepare(selectPositionInStreamSQL); err != nil {
return nil, err
}
return s, nil
}
@ -432,15 +424,6 @@ func (s *outputRoomEventsStatements) DeleteEventsForRoom(
return err
}
// SelectPositionInStream returns the position of a given event in the
// global stream topology.
func (s *outputRoomEventsStatements) SelectPositionInStream(
ctx context.Context, txn *sql.Tx, eventID string,
) (pos types.StreamPosition, err error) {
err = s.selectPositionInStreamStmt.QueryRowContext(ctx, eventID).Scan(&pos)
return
}
func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
var result []types.StreamEvent
for rows.Next() {

View file

@ -63,7 +63,6 @@ type Events interface {
UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error
// DeleteEventsForRoom removes all event information for a room. This should only be done when removing the room entirely.
DeleteEventsForRoom(ctx context.Context, txn *sql.Tx, roomID string) (err error)
SelectPositionInStream(ctx context.Context, txn *sql.Tx, eventID string) (pos types.StreamPosition, err error)
}
// Topology keeps track of the depths and stream positions for all events.

View file

@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
@ -13,7 +12,6 @@ import (
type PDUStreamProvider struct {
StreamProvider
rsAPI rsapi.RoomserverInternalAPI
}
func (p *PDUStreamProvider) Setup() {
@ -52,14 +50,11 @@ func (p *PDUStreamProvider) CompleteSync(
return from
}
stateFilter := req.Filter.Room.State
eventFilter := req.Filter.Room.Timeline
// Build up a /sync response. Add joined rooms.
for _, roomID := range joinedRoomIDs {
var jr *types.JoinResponse
jr, err = p.getJoinResponseForCompleteSync(
ctx, roomID, r, &stateFilter, &eventFilter, req.Device,
ctx, roomID, r, &req.Filter.Room.State, &req.Filter.Room.Timeline, req.Device,
)
if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
@ -79,7 +74,7 @@ func (p *PDUStreamProvider) CompleteSync(
if !peek.Deleted {
var jr *types.JoinResponse
jr, err = p.getJoinResponseForCompleteSync(
ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.Device,
ctx, peek.RoomID, r, &req.Filter.Room.State, &req.Filter.Room.Timeline, req.Device,
)
if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
@ -101,7 +96,7 @@ func (p *PDUStreamProvider) CompleteSync(
for _, roomID := range leaveRoomIDs {
var lr *types.LeaveResponse
lr, err = p.getLeaveResponseForCompleteSync(
ctx, roomID, r, &stateFilter, &eventFilter, req.Device,
ctx, roomID, r, &req.Filter.Room.State, &req.Filter.Room.Timeline, req.Device,
)
if err != nil {
req.Log.WithError(err).Error("p.getLeaveResponseForCompleteSync failed")
@ -175,11 +170,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
// TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
// in a single /sync request
// This is all "okay" assuming history_visibility == "shared" which it is by default.
if r.Backwards {
r.From = delta.MembershipPos
} else {
r.To = delta.MembershipPos
}
r.To = delta.MembershipPos
}
recentStreamEvents, limited, err := p.DB.RecentEvents(
ctx, delta.RoomID, r,
@ -235,7 +226,6 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
return nil
}
// nolint:gocyclo
func (p *PDUStreamProvider) getResponseForCompleteSync(
ctx context.Context,
roomID string,
@ -251,51 +241,6 @@ func (p *PDUStreamProvider) getResponseForCompleteSync(
if err != nil {
return
}
// Calculate the current history visibility rule.
historyVisibility := "joined"
for _, stateEvent := range stateEvents {
if stateEvent.Type() == gomatrixserverlib.MRoomHistoryVisibility {
var content struct {
HistoryVisibility string `json:"history_visibility"`
}
if err = json.Unmarshal(stateEvent.Content(), &content); err != nil {
break
}
historyVisibility = content.HistoryVisibility
}
}
if historyVisibility != "shared" && historyVisibility != "world_readable" {
var membershipEvent *gomatrixserverlib.HeaderedEvent
var membershipPos types.StreamPosition
membershipEvent, membershipPos, err = p.DB.MostRecentMembership(ctx, roomID, device.UserID)
if err != nil {
return
}
if membershipEvent != nil {
membership, _ := membershipEvent.Membership()
switch membership {
case "leave", "kick", "ban":
if r.Backwards {
r.From = membershipPos
} else {
r.To = membershipPos
}
queryReq := &rsapi.QueryStateAfterEventsRequest{
RoomID: roomID,
PrevEventIDs: []string{membershipEvent.EventID()},
}
queryRes := &rsapi.QueryStateAfterEventsResponse{}
if err = p.rsAPI.QueryStateAfterEvents(ctx, queryReq, queryRes); err != nil {
return
}
stateEvents = p.filterStateEventsAccordingToFilter(queryRes.StateEvents, stateFilter)
default:
}
}
}
// TODO: When filters are added, we may need to call this multiple times to get enough events.
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
var recentStreamEvents []types.StreamEvent
@ -306,9 +251,7 @@ func (p *PDUStreamProvider) getResponseForCompleteSync(
return
}
recentStreamEvents, limited = p.filterStreamEventsAccordingToHistoryVisibility(
historyVisibility, recentStreamEvents, device, limited,
)
recentStreamEvents, limited = p.filterStreamEventsAccordingToHistoryVisibility(recentStreamEvents, stateEvents, device, limited)
for _, event := range recentStreamEvents {
if event.HeaderedEvent.Event.StateKey() != nil {
@ -336,7 +279,7 @@ func (p *PDUStreamProvider) getResponseForCompleteSync(
// "Can sync a room with a message with a transaction id" - which does a complete sync to check.
recentEvents = p.DB.StreamEventsToEvents(device, recentStreamEvents)
stateEvents = removeDuplicates(stateEvents, recentEvents)
return // nolint:nakedret
return
}
func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
@ -385,64 +328,29 @@ func (p *PDUStreamProvider) getLeaveResponseForCompleteSync(
return lr, nil
}
// nolint:gocyclo
func (p *PDUStreamProvider) filterStateEventsAccordingToFilter(
stateEvents []*gomatrixserverlib.HeaderedEvent,
stateFilter *gomatrixserverlib.StateFilter,
) []*gomatrixserverlib.HeaderedEvent {
filterRooms, filterNotRooms := map[string]struct{}{}, map[string]struct{}{}
filterTypes, filterNotTypes := map[string]struct{}{}, map[string]struct{}{}
for _, r := range stateFilter.Rooms {
filterRooms[r] = struct{}{}
}
for _, r := range stateFilter.NotRooms {
filterNotRooms[r] = struct{}{}
}
for _, t := range stateFilter.Types {
filterTypes[t] = struct{}{}
}
for _, t := range stateFilter.NotTypes {
filterNotTypes[t] = struct{}{}
}
newState := make([]*gomatrixserverlib.HeaderedEvent, 0, len(stateEvents))
for _, event := range stateEvents {
if len(filterRooms) > 0 {
if _, ok := filterRooms[event.RoomID()]; !ok {
continue
}
}
if len(filterNotRooms) > 0 {
if _, ok := filterNotRooms[event.RoomID()]; ok {
continue
}
}
if len(filterTypes) > 0 {
if _, ok := filterTypes[event.Type()]; !ok {
continue
}
}
if len(filterNotTypes) > 0 {
if _, ok := filterNotTypes[event.Type()]; ok {
continue
}
}
newState = append(newState, event)
}
return newState
}
// nolint:gocyclo
func (p *PDUStreamProvider) filterStreamEventsAccordingToHistoryVisibility(
visibility string,
recentStreamEvents []types.StreamEvent,
stateEvents []*gomatrixserverlib.HeaderedEvent,
device *userapi.Device,
limited bool,
) ([]types.StreamEvent, bool) {
// If the history is world_readable or shared then don't filter.
if visibility == "world_readable" || visibility == "shared" {
return recentStreamEvents, limited
for _, stateEvent := range stateEvents {
if stateEvent.Type() == gomatrixserverlib.MRoomHistoryVisibility {
var content struct {
HistoryVisibility string `json:"history_visibility"`
}
if err := json.Unmarshal(stateEvent.Content(), &content); err != nil {
break
}
switch content.HistoryVisibility {
case "world_readable", "shared":
return recentStreamEvents, limited
default:
break
}
}
}
// TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the

View file

@ -29,7 +29,6 @@ func NewSyncStreamProviders(
streams := &Streams{
PDUStreamProvider: &PDUStreamProvider{
StreamProvider: StreamProvider{DB: d},
rsAPI: rsAPI,
},
TypingStreamProvider: &TypingStreamProvider{
StreamProvider: StreamProvider{DB: d},