From 034ff3208c6c08b17874bd21b5578549b8580d29 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 8 Sep 2020 01:42:27 +0100 Subject: [PATCH] move peek-cancelling to consumer --- syncapi/consumers/roomserver.go | 32 +++++++++++++++++++++++ syncapi/storage/interface.go | 3 +++ syncapi/storage/shared/syncserver.go | 39 ++++++++++------------------ 3 files changed, 48 insertions(+), 26 deletions(-) diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index b205e1e9..8e2feaf2 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -31,6 +31,7 @@ import ( // OutputRoomEventConsumer consumes events that originated in the room server. type OutputRoomEventConsumer struct { + cfg *config.SyncAPI rsAPI api.RoomserverInternalAPI rsConsumer *internal.ContinualConsumer db storage.Database @@ -55,6 +56,7 @@ func NewOutputRoomEventConsumer( PartitionStore: store, } s := &OutputRoomEventConsumer{ + cfg: cfg, rsConsumer: &consumer, db: store, notifier: n, @@ -168,6 +170,10 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( s.notifyKeyChanges(&ev) + if err = s.notifyJoinedPeeks(ctx, &ev); err != nil { + return err + } + return nil } @@ -186,6 +192,32 @@ func (s *OutputRoomEventConsumer) notifyKeyChanges(ev *gomatrixserverlib.Headere } } +func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent) (error) { + membership, err := ev.Membership() + if err != nil { + return nil + } + // TODO: check that it's a join and not a profile change (means unmarshalling prev_content) + if membership == gomatrixserverlib.Join { + // check it's a local join + _, domain, err := gomatrixserverlib.SplitID('@', *ev.StateKey()) + if err != nil { + return err + } + if domain != s.cfg.Matrix.ServerName { + return nil + } + + // cancel any peeks for it + _, err = s.db.DeletePeeks(ctx, ev.RoomID(), *ev.StateKey()) + // XXX: should we do anything with this new streampos? + if err != nil { + return err + } + } + return nil +} + func (s *OutputRoomEventConsumer) onNewInviteEvent( ctx context.Context, msg api.OutputNewInviteEvent, ) error { diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 3559a9e4..807c7f5e 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -86,6 +86,9 @@ type Database interface { // AddPeek adds a new peek to our DB for a given room by a given user's device. // Returns an error if there was a problem communicating with the database. AddPeek(ctx context.Context, RoomID, UserID, DeviceID string) (types.StreamPosition, error) + // DeletePeek deletes all peeks for a given room by a given user + // Returns an error if there was a problem communicating with the database. + DeletePeeks(ctx context.Context, RoomID, UserID string) (types.StreamPosition, error) // SetTypingTimeoutCallback sets a callback function that is called right after // a user is removed from the typing user list due to timeout. SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index c8c92753..6582656d 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -176,6 +176,19 @@ func (d *Database) AddPeek( return } +// DeletePeeks tracks the fact that a user has stopped peeking from all devices +// If the peeks was successfully deleted this returns the stream ID it was stored at. +// Returns an error if there was a problem communicating with the database. +func (d *Database) DeletePeeks( + ctx context.Context, roomID, userID string, +) (sp types.StreamPosition, err error) { + _ = d.Writer.Do(nil, nil, func(_ *sql.Tx) error { + sp, err = d.Peeks.DeletePeeks(ctx, nil, roomID, userID) + return nil + }) + return +} + // GetAccountDataInRange returns all account data for a given user inserted or // updated between two given positions // Returns a map following the format data[roomID] = []dataTypes @@ -1031,10 +1044,8 @@ func (d *Database) getStateDeltas( } // add peek blocks - peeking := make(map[string]bool) newPeeks := false for _, peek := range peeks { - peeking[peek.RoomID] = true if peek.New { // send full room state down instead of a delta var s []types.StreamEvent @@ -1073,18 +1084,6 @@ func (d *Database) getStateDeltas( // the timeline. if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" { if membership == gomatrixserverlib.Join { - if peeking[roomID] { - // we automatically cancel our peeks when we join a room - err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error { - // XXX: is it correct that we're discarding the streamid here? - _, err = d.Peeks.DeletePeeks(ctx, txn, roomID, userID) - return err - }) - if err != nil { - return nil, nil, err - } - } - // send full room state down instead of a delta var s []types.StreamEvent s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID, stateFilter) @@ -1141,10 +1140,8 @@ func (d *Database) getStateDeltasForFullStateSync( } // Add full states for all peeking rooms - peeking := make(map[string]bool) newPeeks := false for _, peek := range peeks { - peeking[peek.RoomID] = true if peek.New { newPeeks = true } @@ -1207,16 +1204,6 @@ func (d *Database) getStateDeltasForFullStateSync( if stateErr != nil { return nil, nil, stateErr } - if peeking[joinedRoomID] { - // we automatically cancel our peeks when we join a room - if err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error { - // XXX: is it correct that we're discarding the streamid here? - _, err = d.Peeks.DeletePeeks(ctx, txn, joinedRoomID, userID) - return err - }); err != nil { - return nil, nil, err - } - } deltas[joinedRoomID] = stateDelta{ membership: gomatrixserverlib.Join, stateEvents: d.StreamEventsToEvents(device, s),