mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-04-10 13:53:40 +00:00
move peek-cancelling to consumer
This commit is contained in:
parent
55c7f2c892
commit
034ff3208c
3 changed files with 48 additions and 26 deletions
|
@ -31,6 +31,7 @@ import (
|
||||||
|
|
||||||
// OutputRoomEventConsumer consumes events that originated in the room server.
|
// OutputRoomEventConsumer consumes events that originated in the room server.
|
||||||
type OutputRoomEventConsumer struct {
|
type OutputRoomEventConsumer struct {
|
||||||
|
cfg *config.SyncAPI
|
||||||
rsAPI api.RoomserverInternalAPI
|
rsAPI api.RoomserverInternalAPI
|
||||||
rsConsumer *internal.ContinualConsumer
|
rsConsumer *internal.ContinualConsumer
|
||||||
db storage.Database
|
db storage.Database
|
||||||
|
@ -55,6 +56,7 @@ func NewOutputRoomEventConsumer(
|
||||||
PartitionStore: store,
|
PartitionStore: store,
|
||||||
}
|
}
|
||||||
s := &OutputRoomEventConsumer{
|
s := &OutputRoomEventConsumer{
|
||||||
|
cfg: cfg,
|
||||||
rsConsumer: &consumer,
|
rsConsumer: &consumer,
|
||||||
db: store,
|
db: store,
|
||||||
notifier: n,
|
notifier: n,
|
||||||
|
@ -168,6 +170,10 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
|
||||||
|
|
||||||
s.notifyKeyChanges(&ev)
|
s.notifyKeyChanges(&ev)
|
||||||
|
|
||||||
|
if err = s.notifyJoinedPeeks(ctx, &ev); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -186,6 +192,32 @@ func (s *OutputRoomEventConsumer) notifyKeyChanges(ev *gomatrixserverlib.Headere
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent) (error) {
|
||||||
|
membership, err := ev.Membership()
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// TODO: check that it's a join and not a profile change (means unmarshalling prev_content)
|
||||||
|
if membership == gomatrixserverlib.Join {
|
||||||
|
// check it's a local join
|
||||||
|
_, domain, err := gomatrixserverlib.SplitID('@', *ev.StateKey())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if domain != s.cfg.Matrix.ServerName {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// cancel any peeks for it
|
||||||
|
_, err = s.db.DeletePeeks(ctx, ev.RoomID(), *ev.StateKey())
|
||||||
|
// XXX: should we do anything with this new streampos?
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *OutputRoomEventConsumer) onNewInviteEvent(
|
func (s *OutputRoomEventConsumer) onNewInviteEvent(
|
||||||
ctx context.Context, msg api.OutputNewInviteEvent,
|
ctx context.Context, msg api.OutputNewInviteEvent,
|
||||||
) error {
|
) error {
|
||||||
|
|
|
@ -86,6 +86,9 @@ type Database interface {
|
||||||
// AddPeek adds a new peek to our DB for a given room by a given user's device.
|
// AddPeek adds a new peek to our DB for a given room by a given user's device.
|
||||||
// Returns an error if there was a problem communicating with the database.
|
// Returns an error if there was a problem communicating with the database.
|
||||||
AddPeek(ctx context.Context, RoomID, UserID, DeviceID string) (types.StreamPosition, error)
|
AddPeek(ctx context.Context, RoomID, UserID, DeviceID string) (types.StreamPosition, error)
|
||||||
|
// DeletePeek deletes all peeks for a given room by a given user
|
||||||
|
// Returns an error if there was a problem communicating with the database.
|
||||||
|
DeletePeeks(ctx context.Context, RoomID, UserID string) (types.StreamPosition, error)
|
||||||
// SetTypingTimeoutCallback sets a callback function that is called right after
|
// SetTypingTimeoutCallback sets a callback function that is called right after
|
||||||
// a user is removed from the typing user list due to timeout.
|
// a user is removed from the typing user list due to timeout.
|
||||||
SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn)
|
SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn)
|
||||||
|
|
|
@ -176,6 +176,19 @@ func (d *Database) AddPeek(
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeletePeeks tracks the fact that a user has stopped peeking from all devices
|
||||||
|
// If the peeks was successfully deleted this returns the stream ID it was stored at.
|
||||||
|
// Returns an error if there was a problem communicating with the database.
|
||||||
|
func (d *Database) DeletePeeks(
|
||||||
|
ctx context.Context, roomID, userID string,
|
||||||
|
) (sp types.StreamPosition, err error) {
|
||||||
|
_ = d.Writer.Do(nil, nil, func(_ *sql.Tx) error {
|
||||||
|
sp, err = d.Peeks.DeletePeeks(ctx, nil, roomID, userID)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// GetAccountDataInRange returns all account data for a given user inserted or
|
// GetAccountDataInRange returns all account data for a given user inserted or
|
||||||
// updated between two given positions
|
// updated between two given positions
|
||||||
// Returns a map following the format data[roomID] = []dataTypes
|
// Returns a map following the format data[roomID] = []dataTypes
|
||||||
|
@ -1031,10 +1044,8 @@ func (d *Database) getStateDeltas(
|
||||||
}
|
}
|
||||||
|
|
||||||
// add peek blocks
|
// add peek blocks
|
||||||
peeking := make(map[string]bool)
|
|
||||||
newPeeks := false
|
newPeeks := false
|
||||||
for _, peek := range peeks {
|
for _, peek := range peeks {
|
||||||
peeking[peek.RoomID] = true
|
|
||||||
if peek.New {
|
if peek.New {
|
||||||
// send full room state down instead of a delta
|
// send full room state down instead of a delta
|
||||||
var s []types.StreamEvent
|
var s []types.StreamEvent
|
||||||
|
@ -1073,18 +1084,6 @@ func (d *Database) getStateDeltas(
|
||||||
// the timeline.
|
// the timeline.
|
||||||
if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
|
if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
|
||||||
if membership == gomatrixserverlib.Join {
|
if membership == gomatrixserverlib.Join {
|
||||||
if peeking[roomID] {
|
|
||||||
// we automatically cancel our peeks when we join a room
|
|
||||||
err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error {
|
|
||||||
// XXX: is it correct that we're discarding the streamid here?
|
|
||||||
_, err = d.Peeks.DeletePeeks(ctx, txn, roomID, userID)
|
|
||||||
return err
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// send full room state down instead of a delta
|
// send full room state down instead of a delta
|
||||||
var s []types.StreamEvent
|
var s []types.StreamEvent
|
||||||
s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID, stateFilter)
|
s, err = d.currentStateStreamEventsForRoom(ctx, txn, roomID, stateFilter)
|
||||||
|
@ -1141,10 +1140,8 @@ func (d *Database) getStateDeltasForFullStateSync(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add full states for all peeking rooms
|
// Add full states for all peeking rooms
|
||||||
peeking := make(map[string]bool)
|
|
||||||
newPeeks := false
|
newPeeks := false
|
||||||
for _, peek := range peeks {
|
for _, peek := range peeks {
|
||||||
peeking[peek.RoomID] = true
|
|
||||||
if peek.New {
|
if peek.New {
|
||||||
newPeeks = true
|
newPeeks = true
|
||||||
}
|
}
|
||||||
|
@ -1207,16 +1204,6 @@ func (d *Database) getStateDeltasForFullStateSync(
|
||||||
if stateErr != nil {
|
if stateErr != nil {
|
||||||
return nil, nil, stateErr
|
return nil, nil, stateErr
|
||||||
}
|
}
|
||||||
if peeking[joinedRoomID] {
|
|
||||||
// we automatically cancel our peeks when we join a room
|
|
||||||
if err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error {
|
|
||||||
// XXX: is it correct that we're discarding the streamid here?
|
|
||||||
_, err = d.Peeks.DeletePeeks(ctx, txn, joinedRoomID, userID)
|
|
||||||
return err
|
|
||||||
}); err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
deltas[joinedRoomID] = stateDelta{
|
deltas[joinedRoomID] = stateDelta{
|
||||||
membership: gomatrixserverlib.Join,
|
membership: gomatrixserverlib.Join,
|
||||||
stateEvents: d.StreamEventsToEvents(device, s),
|
stateEvents: d.StreamEventsToEvents(device, s),
|
||||||
|
|
Loading…
Reference in a new issue