Start his vis

This commit is contained in:
Neil Alexander 2021-01-20 14:41:09 +00:00
parent 33f8d0e278
commit 10f2e8d92f
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
7 changed files with 214 additions and 30 deletions

View file

@ -49,6 +49,9 @@ type Database interface {
PeeksInRange(ctx context.Context, userID, deviceID string, r types.Range) (peeks []types.Peek, err error)
RoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error)
// MostRecentMembership returns the most recent membership event for the user, along with the global stream position.
MostRecentMembership(ctx context.Context, roomID, userID string) (*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error)
// AllJoinedUsersInRooms returns a map of room ID to a list of all joined user IDs.
AllJoinedUsersInRooms(ctx context.Context) (map[string][]string, error)
// AllPeekingDevicesInRooms returns a map of room ID to a list of all peeking devices.
@ -110,6 +113,8 @@ type Database interface {
GetEventsInTopologicalRange(ctx context.Context, from, to *types.TopologyToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error)
// EventPositionInTopology returns the depth and stream position of the given event.
EventPositionInTopology(ctx context.Context, eventID string) (types.TopologyToken, error)
// EventPositionInStream returns the global stream position of the given event.
EventPositionInStream(ctx context.Context, eventID string) (types.StreamPosition, error)
// BackwardExtremitiesForRoom returns a map of backwards extremity event ID to a list of its prev_events.
BackwardExtremitiesForRoom(ctx context.Context, roomID string) (backwardExtremities map[string][]string, err error)
// MaxTopologicalPosition returns the highest topological position for a given room.

View file

@ -130,6 +130,10 @@ const selectStateInRangeSQL = "" +
const deleteEventsForRoomSQL = "" +
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
const selectPositionInStreamSQL = "" +
"SELECT id FROM syncapi_output_room_events" +
" WHERE event_id = $1"
type outputRoomEventsStatements struct {
insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt
@ -140,6 +144,7 @@ type outputRoomEventsStatements struct {
selectStateInRangeStmt *sql.Stmt
updateEventJSONStmt *sql.Stmt
deleteEventsForRoomStmt *sql.Stmt
selectPositionInStreamStmt *sql.Stmt
}
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
@ -175,6 +180,9 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
if s.deleteEventsForRoomStmt, err = db.Prepare(deleteEventsForRoomSQL); err != nil {
return nil, err
}
if s.selectPositionInStreamStmt, err = db.Prepare(selectPositionInStreamSQL); err != nil {
return nil, err
}
return s, nil
}
@ -435,6 +443,15 @@ func (s *outputRoomEventsStatements) DeleteEventsForRoom(
return err
}
// SelectPositionInStream returns the position of a given event in the
// global stream topology.
func (s *outputRoomEventsStatements) SelectPositionInStream(
ctx context.Context, txn *sql.Tx, eventID string,
) (pos types.StreamPosition, err error) {
err = s.selectPositionInStreamStmt.QueryRowContext(ctx, eventID).Scan(&pos)
return
}
func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
var result []types.StreamEvent
for rows.Next() {

View file

@ -500,6 +500,30 @@ func (d *Database) EventPositionInTopology(
return types.TopologyToken{Depth: depth, PDUPosition: stream}, nil
}
func (d *Database) EventPositionInStream(
ctx context.Context, eventID string,
) (types.StreamPosition, error) {
pos, err := d.OutputEvents.SelectPositionInStream(ctx, nil, eventID)
if err != nil {
return 0, err
}
return pos, nil
}
func (d *Database) MostRecentMembership(
ctx context.Context, roomID, userID string,
) (*gomatrixserverlib.HeaderedEvent, types.StreamPosition, error) {
event, err := d.CurrentRoomState.SelectStateEvent(ctx, roomID, gomatrixserverlib.MRoomMember, userID)
if err != nil {
return nil, 0, fmt.Errorf("d.CurrentRoomState.SelectStateEvent: %w", err)
}
pos, err := d.OutputEvents.SelectPositionInStream(ctx, nil, event.EventID())
if err != nil {
return nil, 0, fmt.Errorf("d.OutputEvents.SelectPositionInStream: %w", err)
}
return event, pos, nil
}
func (d *Database) GetFilter(
ctx context.Context, localpart string, filterID string,
) (*gomatrixserverlib.Filter, error) {

View file

@ -90,14 +90,19 @@ const selectStateInRangeSQL = "" +
const deleteEventsForRoomSQL = "" +
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
const selectPositionInStreamSQL = "" +
"SELECT id FROM syncapi_output_room_events" +
" WHERE event_id = $1"
type outputRoomEventsStatements struct {
db *sql.DB
streamIDStatements *streamIDStatements
insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt
selectMaxEventIDStmt *sql.Stmt
updateEventJSONStmt *sql.Stmt
deleteEventsForRoomStmt *sql.Stmt
db *sql.DB
streamIDStatements *streamIDStatements
insertEventStmt *sql.Stmt
selectEventsStmt *sql.Stmt
selectMaxEventIDStmt *sql.Stmt
updateEventJSONStmt *sql.Stmt
deleteEventsForRoomStmt *sql.Stmt
selectPositionInStreamStmt *sql.Stmt
}
func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Events, error) {
@ -124,6 +129,9 @@ func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Even
if s.deleteEventsForRoomStmt, err = db.Prepare(deleteEventsForRoomSQL); err != nil {
return nil, err
}
if s.selectPositionInStreamStmt, err = db.Prepare(selectPositionInStreamSQL); err != nil {
return nil, err
}
return s, nil
}
@ -424,6 +432,15 @@ func (s *outputRoomEventsStatements) DeleteEventsForRoom(
return err
}
// SelectPositionInStream returns the position of a given event in the
// global stream topology.
func (s *outputRoomEventsStatements) SelectPositionInStream(
ctx context.Context, txn *sql.Tx, eventID string,
) (pos types.StreamPosition, err error) {
err = s.selectPositionInStreamStmt.QueryRowContext(ctx, eventID).Scan(&pos)
return
}
func rowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
var result []types.StreamEvent
for rows.Next() {

View file

@ -63,6 +63,7 @@ type Events interface {
UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error
// DeleteEventsForRoom removes all event information for a room. This should only be done when removing the room entirely.
DeleteEventsForRoom(ctx context.Context, txn *sql.Tx, roomID string) (err error)
SelectPositionInStream(ctx context.Context, txn *sql.Tx, eventID string) (pos types.StreamPosition, err error)
}
// Topology keeps track of the depths and stream positions for all events.

View file

@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
@ -12,6 +13,7 @@ import (
type PDUStreamProvider struct {
StreamProvider
rsAPI rsapi.RoomserverInternalAPI
}
func (p *PDUStreamProvider) Setup() {
@ -50,11 +52,14 @@ func (p *PDUStreamProvider) CompleteSync(
return from
}
stateFilter := req.Filter.Room.State
eventFilter := req.Filter.Room.Timeline
// Build up a /sync response. Add joined rooms.
for _, roomID := range joinedRoomIDs {
var jr *types.JoinResponse
jr, err = p.getJoinResponseForCompleteSync(
ctx, roomID, r, &req.Filter.Room.State, &req.Filter.Room.Timeline, req.Device,
ctx, roomID, r, &stateFilter, &eventFilter, req.Device,
)
if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
@ -74,7 +79,7 @@ func (p *PDUStreamProvider) CompleteSync(
if !peek.Deleted {
var jr *types.JoinResponse
jr, err = p.getJoinResponseForCompleteSync(
ctx, peek.RoomID, r, &req.Filter.Room.State, &req.Filter.Room.Timeline, req.Device,
ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.Device,
)
if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
@ -96,7 +101,7 @@ func (p *PDUStreamProvider) CompleteSync(
for _, roomID := range leaveRoomIDs {
var lr *types.LeaveResponse
lr, err = p.getLeaveResponseForCompleteSync(
ctx, roomID, r, &req.Filter.Room.State, &req.Filter.Room.Timeline, req.Device,
ctx, roomID, r, &stateFilter, &eventFilter, req.Device,
)
if err != nil {
req.Log.WithError(err).Error("p.getLeaveResponseForCompleteSync failed")
@ -170,7 +175,11 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
// TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
// in a single /sync request
// This is all "okay" assuming history_visibility == "shared" which it is by default.
r.To = delta.MembershipPos
if r.Backwards {
r.From = delta.MembershipPos
} else {
r.To = delta.MembershipPos
}
}
recentStreamEvents, limited, err := p.DB.RecentEvents(
ctx, delta.RoomID, r,
@ -226,6 +235,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
return nil
}
// nolint:gocyclo
func (p *PDUStreamProvider) getResponseForCompleteSync(
ctx context.Context,
roomID string,
@ -241,6 +251,78 @@ func (p *PDUStreamProvider) getResponseForCompleteSync(
if err != nil {
return
}
// Calculate the current history visibility rule.
historyVisibility := "joined"
var historyVisibilityEvent *gomatrixserverlib.HeaderedEvent
for _, stateEvent := range stateEvents {
if stateEvent.Type() == gomatrixserverlib.MRoomHistoryVisibility {
var content struct {
HistoryVisibility string `json:"history_visibility"`
}
if err = json.Unmarshal(stateEvent.Content(), &content); err != nil {
break
}
historyVisibility = content.HistoryVisibility
historyVisibilityEvent = stateEvent
break
}
}
switch historyVisibility {
case "invited", "joined":
// Get the most recent membership event of the user and check if
// they are still in the room. If not then we will restrict how
// much of the room the user can see - they won't see beyond their
// leave event.
var membershipEvent *gomatrixserverlib.HeaderedEvent
var membershipPos types.StreamPosition
membershipEvent, membershipPos, err = p.DB.MostRecentMembership(ctx, roomID, device.UserID)
if err != nil {
return
}
if membershipEvent == nil {
return
}
membership, _ := membershipEvent.Membership()
switch membership {
case "leave", "ban", "kick":
if r.Backwards {
r.From = membershipPos
} else {
r.To = membershipPos
}
queryReq := &rsapi.QueryStateAfterEventsRequest{
RoomID: roomID,
PrevEventIDs: []string{membershipEvent.EventID()},
}
queryRes := &rsapi.QueryStateAfterEventsResponse{}
if err = p.rsAPI.QueryStateAfterEvents(ctx, queryReq, queryRes); err != nil {
return
}
stateEvents = p.filterStateEventsAccordingToFilter(queryRes.StateEvents, stateFilter)
default:
}
case "shared":
// Find the stream position of the history visibility event
// and use that as a boundary instead.
var historyVisibilityPosition types.StreamPosition
historyVisibilityPosition, err = p.DB.EventPositionInStream(ctx, historyVisibilityEvent.EventID())
if err != nil {
return
}
if r.Backwards {
r.To = historyVisibilityPosition
} else {
r.From = historyVisibilityPosition
}
case "world_readable":
// Do nothing, as it's OK to reveal the entire timeline in a
// world-readable room.
}
// TODO: When filters are added, we may need to call this multiple times to get enough events.
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
var recentStreamEvents []types.StreamEvent
@ -251,7 +333,9 @@ func (p *PDUStreamProvider) getResponseForCompleteSync(
return
}
recentStreamEvents, limited = p.filterStreamEventsAccordingToHistoryVisibility(recentStreamEvents, stateEvents, device, limited)
recentStreamEvents, limited = p.filterStreamEventsAccordingToHistoryVisibility(
historyVisibility, recentStreamEvents, device, limited,
)
for _, event := range recentStreamEvents {
if event.HeaderedEvent.Event.StateKey() != nil {
@ -279,7 +363,7 @@ func (p *PDUStreamProvider) getResponseForCompleteSync(
// "Can sync a room with a message with a transaction id" - which does a complete sync to check.
recentEvents = p.DB.StreamEventsToEvents(device, recentStreamEvents)
stateEvents = removeDuplicates(stateEvents, recentEvents)
return
return // nolint:nakedret
}
func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
@ -329,28 +413,63 @@ func (p *PDUStreamProvider) getLeaveResponseForCompleteSync(
}
// nolint:gocyclo
func (p *PDUStreamProvider) filterStreamEventsAccordingToHistoryVisibility(
recentStreamEvents []types.StreamEvent,
func (p *PDUStreamProvider) filterStateEventsAccordingToFilter(
stateEvents []*gomatrixserverlib.HeaderedEvent,
stateFilter *gomatrixserverlib.StateFilter,
) []*gomatrixserverlib.HeaderedEvent {
filterRooms, filterNotRooms := map[string]struct{}{}, map[string]struct{}{}
filterTypes, filterNotTypes := map[string]struct{}{}, map[string]struct{}{}
for _, r := range stateFilter.Rooms {
filterRooms[r] = struct{}{}
}
for _, r := range stateFilter.NotRooms {
filterNotRooms[r] = struct{}{}
}
for _, t := range stateFilter.Types {
filterTypes[t] = struct{}{}
}
for _, t := range stateFilter.NotTypes {
filterNotTypes[t] = struct{}{}
}
newState := make([]*gomatrixserverlib.HeaderedEvent, 0, len(stateEvents))
for _, event := range stateEvents {
if len(filterRooms) > 0 {
if _, ok := filterRooms[event.RoomID()]; !ok {
continue
}
}
if len(filterNotRooms) > 0 {
if _, ok := filterNotRooms[event.RoomID()]; ok {
continue
}
}
if len(filterTypes) > 0 {
if _, ok := filterTypes[event.Type()]; !ok {
continue
}
}
if len(filterNotTypes) > 0 {
if _, ok := filterNotTypes[event.Type()]; ok {
continue
}
}
newState = append(newState, event)
}
return newState
}
// nolint:gocyclo
func (p *PDUStreamProvider) filterStreamEventsAccordingToHistoryVisibility(
visibility string,
recentStreamEvents []types.StreamEvent,
device *userapi.Device,
limited bool,
) ([]types.StreamEvent, bool) {
// If the history is world_readable or shared then don't filter.
for _, stateEvent := range stateEvents {
if stateEvent.Type() == gomatrixserverlib.MRoomHistoryVisibility {
var content struct {
HistoryVisibility string `json:"history_visibility"`
}
if err := json.Unmarshal(stateEvent.Content(), &content); err != nil {
break
}
switch content.HistoryVisibility {
case "world_readable", "shared":
return recentStreamEvents, limited
default:
break
}
}
if visibility == "world_readable" || visibility == "shared" {
return recentStreamEvents, limited
}
// TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the

View file

@ -29,6 +29,7 @@ func NewSyncStreamProviders(
streams := &Streams{
PDUStreamProvider: &PDUStreamProvider{
StreamProvider: StreamProvider{DB: d},
rsAPI: rsAPI,
},
TypingStreamProvider: &TypingStreamProvider{
StreamProvider: StreamProvider{DB: d},