From 84d089caed52d91186eb19e0efabe9966747e208 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 21 Jan 2021 13:31:03 +0000 Subject: [PATCH] Still doesn't work --- syncapi/storage/interface.go | 4 +- syncapi/storage/shared/syncserver.go | 49 +++++++++++++++--- syncapi/streams/stream_pdu.go | 74 ++++++++++++++++------------ syncapi/sync/requestpool.go | 3 ++ 4 files changed, 88 insertions(+), 42 deletions(-) diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index f5e67ffb..d1a90cba 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -36,8 +36,8 @@ type Database interface { MaxStreamPositionForSendToDeviceMessages(ctx context.Context) (types.StreamPosition, error) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error) - GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error) - GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error) + GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, filter *gomatrixserverlib.Filter) ([]types.StateDelta, []string, error) + GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, filter *gomatrixserverlib.Filter) ([]types.StateDelta, []string, error) RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error) RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 1d16ca29..12f0ad37 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -692,7 +692,7 @@ func (d *Database) fetchMissingStateEvents( func (d *Database) GetStateDeltas( ctx context.Context, device *userapi.Device, r types.Range, userID string, - stateFilter *gomatrixserverlib.StateFilter, + filter *gomatrixserverlib.Filter, ) ([]types.StateDelta, []string, error) { // Implement membership change algorithm: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L821 // - Get membership list changes for this user in this sync response @@ -712,7 +712,7 @@ func (d *Database) GetStateDeltas( var deltas []types.StateDelta // get all the state events ever (i.e. for all available rooms) between these two positions - stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter) + stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, &filter.Room.State) if err != nil { return nil, nil, err } @@ -733,7 +733,7 @@ func (d *Database) GetStateDeltas( if peek.New { // send full room state down instead of a delta var s []types.StreamEvent - s, err = d.currentStateStreamEventsForRoom(ctx, txn, peek.RoomID, stateFilter) + s, err = d.currentStateStreamEventsForRoom(ctx, txn, peek.RoomID, &filter.Room.State) if err != nil { return nil, nil, err } @@ -760,7 +760,7 @@ func (d *Database) GetStateDeltas( if membership == gomatrixserverlib.Join { // send full room state down instead of a delta var s []types.StreamEvent - s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID, stateFilter) + s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID, &filter.Room.State) if err != nil { return nil, nil, err } @@ -792,6 +792,21 @@ func (d *Database) GetStateDeltas( }) } + if filter.Room.IncludeLeave { + // Add in left rooms + leftRoomIDs, err := d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Leave) + if err != nil { + return nil, nil, err + } + for _, leftRoomID := range leftRoomIDs { + deltas = append(deltas, types.StateDelta{ + Membership: gomatrixserverlib.Leave, + //StateEvents: d.StreamEventsToEvents(device, state[leftRoomID]), + RoomID: leftRoomID, + }) + } + } + succeeded = true return deltas, joinedRoomIDs, nil } @@ -804,7 +819,7 @@ func (d *Database) GetStateDeltas( func (d *Database) GetStateDeltasForFullStateSync( ctx context.Context, device *userapi.Device, r types.Range, userID string, - stateFilter *gomatrixserverlib.StateFilter, + filter *gomatrixserverlib.Filter, ) ([]types.StateDelta, []string, error) { txn, err := d.readOnlySnapshot(ctx) if err != nil { @@ -824,7 +839,7 @@ func (d *Database) GetStateDeltasForFullStateSync( // Add full states for all peeking rooms for _, peek := range peeks { if !peek.Deleted { - s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, peek.RoomID, stateFilter) + s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, peek.RoomID, &filter.Room.State) if stateErr != nil { return nil, nil, stateErr } @@ -837,7 +852,7 @@ func (d *Database) GetStateDeltasForFullStateSync( } // Get all the state events ever between these two positions - stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, stateFilter) + stateNeeded, eventMap, err := d.OutputEvents.SelectStateInRange(ctx, txn, r, &filter.Room.State) if err != nil { return nil, nil, err } @@ -870,7 +885,7 @@ func (d *Database) GetStateDeltasForFullStateSync( // Add full states for all joined rooms for _, joinedRoomID := range joinedRoomIDs { - s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID, stateFilter) + s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID, &filter.Room.State) if stateErr != nil { return nil, nil, stateErr } @@ -881,6 +896,24 @@ func (d *Database) GetStateDeltasForFullStateSync( } } + if filter.Room.IncludeLeave { + leftRoomIDs, err := d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Leave) + if err != nil { + return nil, nil, err + } + + // Add full states for all joined rooms + for _, leftRoomID := range leftRoomIDs { + deltas[leftRoomID] = types.StateDelta{ + Membership: gomatrixserverlib.Leave, + // We leave the caller to populate StateEvents instead of populating it + // here because we don't have access to the RS API and we don't know + // which event ID to use for the state. + RoomID: leftRoomID, + } + } + } + // Create a response array. result := make([]types.StateDelta, len(deltas)) i := 0 diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index 79731bda..55d494a9 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -135,12 +135,12 @@ func (p *PDUStreamProvider) IncrementalSync( eventFilter := req.Filter.Room.Timeline if req.WantFullState { - if stateDeltas, joinedRooms, err = p.DB.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil { + if stateDeltas, joinedRooms, err = p.DB.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &req.Filter); err != nil { req.Log.WithError(err).Error("p.DB.GetStateDeltasForFullStateSync failed") return } } else { - if stateDeltas, joinedRooms, err = p.DB.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil { + if stateDeltas, joinedRooms, err = p.DB.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &req.Filter); err != nil { req.Log.WithError(err).Error("p.DB.GetStateDeltas failed") return } @@ -194,12 +194,13 @@ func (p *PDUStreamProvider) getHistoryVisibility( return historyVisibility, historyEventID, nil } +// nolint:gocyclo func (p *PDUStreamProvider) addRoomDeltaToResponse( ctx context.Context, device *userapi.Device, r types.Range, delta types.StateDelta, - _ *gomatrixserverlib.StateFilter, + stateFilter *gomatrixserverlib.StateFilter, eventFilter *gomatrixserverlib.RoomEventFilter, res *types.Response, ) error { @@ -208,7 +209,8 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( return fmt.Errorf("p.getHistoryVisibility: %w", err) } - if r, _, err = p.limitBoundariesUsingHistoryVisibility( + var stateAtEvent string + if r, stateAtEvent, err = p.limitBoundariesUsingHistoryVisibility( ctx, delta.RoomID, device.UserID, historyVisibility, historyEventID, r, ); err != nil { return err @@ -221,6 +223,23 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( if err != nil { return err } + + // If the boundary has been truncated by history visibility then we + // must not reveal any state that comes after that. Returning the + // current state is no good. Instead, ask the roomserver for the + // state at the boundary event. + if len(delta.StateEvents) == 0 && stateAtEvent != "" { + queryReq := &rsapi.QueryStateAfterEventsRequest{ + RoomID: delta.RoomID, + PrevEventIDs: []string{stateAtEvent}, + } + queryRes := &rsapi.QueryStateAfterEventsResponse{} + if err = p.rsAPI.QueryStateAfterEvents(ctx, queryReq, queryRes); err != nil { + return err + } + delta.StateEvents = p.filterStateEventsAccordingToFilter(queryRes.StateEvents, stateFilter) + } + recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents) delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, recentStreamEvents) @@ -268,14 +287,13 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( return nil } +// nolint:gocyclo func (p *PDUStreamProvider) limitBoundariesUsingHistoryVisibility( ctx context.Context, roomID, userID string, historyVisibility, historyEventID string, r types.Range, ) (types.Range, string, error) { - // Calculate the current history visibility rule. - var err error var joinPos types.StreamPosition var stateAtEventID string @@ -296,10 +314,6 @@ func (p *PDUStreamProvider) limitBoundariesUsingHistoryVisibility( // to the room. return r, stateAtEventID, nil } - - case "world_readable": - // It doesn't matter if the user is joined to the room or not - // when the history is world_readable. } // If the user is in the room then we next need to work out if we @@ -308,11 +322,11 @@ func (p *PDUStreamProvider) limitBoundariesUsingHistoryVisibility( switch historyVisibility { case "invited", "joined": if r.Backwards { - if r.To > joinPos { + if r.To < joinPos { r.To = joinPos } } else { - if r.From > joinPos { + if r.From < joinPos { r.From = joinPos } } @@ -320,32 +334,30 @@ func (p *PDUStreamProvider) limitBoundariesUsingHistoryVisibility( case "shared": // Find the stream position of the history visibility event // and use that as a boundary instead. - var historyVisibilityPosition types.StreamPosition - historyVisibilityPosition, err = p.DB.EventPositionInStream(ctx, historyEventID) - if err != nil { - return r, stateAtEventID, fmt.Errorf("p.DB.EventPositionInStream: %w", err) - } - if r.Backwards { - if r.To < historyVisibilityPosition { - r.To = historyVisibilityPosition + if historyEventID != "" { + var pos types.StreamPosition + pos, err = p.DB.EventPositionInStream(ctx, historyEventID) + if err != nil { + return r, stateAtEventID, fmt.Errorf("p.DB.EventPositionInStream: %w", err) } - } else { - if r.From < historyVisibilityPosition { - r.From = historyVisibilityPosition + if r.Backwards { + if r.To < pos { + r.To = pos + } + } else { + if r.From < pos { + r.From = pos + } } + stateAtEventID = historyEventID } - stateAtEventID = historyEventID - - case "world_readable": - // Do nothing, as it's OK to reveal the entire timeline in a - // world-readable room. } // Finally, work out if the user left the room. If they did then // we will request the state at the leave event from the roomserver. switch historyVisibility { case "invited", "joined", "shared": - if leaveEvent, leavePos, _, err := p.DB.MostRecentMembership(ctx, roomID, userID, []string{"leave", "ban", "kick"}); err == nil { + if ev, leavePos, _, err := p.DB.MostRecentMembership(ctx, roomID, userID, []string{"leave", "ban"}); err == nil { if r.Backwards { if r.From > leavePos { r.From = leavePos @@ -355,10 +367,8 @@ func (p *PDUStreamProvider) limitBoundariesUsingHistoryVisibility( r.To = leavePos } } - stateAtEventID = leaveEvent.EventID() + stateAtEventID = ev.EventID() } - - case "world_readable": } return r, stateAtEventID, nil diff --git a/syncapi/sync/requestpool.go b/syncapi/sync/requestpool.go index 384fc25c..4f161bb9 100644 --- a/syncapi/sync/requestpool.go +++ b/syncapi/sync/requestpool.go @@ -35,6 +35,7 @@ import ( userapi "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/util" "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" ) // RequestPool manages HTTP long-poll connections for /sync @@ -188,6 +189,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. if syncReq.Since.IsEmpty() { // Complete sync + logrus.Infof("Sync complete") syncReq.Response.NextBatch = types.StreamingToken{ PDUPosition: rp.streams.PDUStreamProvider.CompleteSync( syncReq.Context, syncReq, @@ -213,6 +215,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi. } } else { // Incremental sync + logrus.Infof("Sync since %s", syncReq.Since.String()) syncReq.Response.NextBatch = types.StreamingToken{ PDUPosition: rp.streams.PDUStreamProvider.IncrementalSync( syncReq.Context, syncReq,