Peeking via MSC2753 (#1370)

Initial implementation of MSC2753, as tested by https://github.com/matrix-org/sytest/pull/944.
Doesn't yet handle unpeeks, peeked EDUs, or history viz changing during a peek - these will follow.
https://github.com/matrix-org/dendrite/pull/1370 has full details.
This commit is contained in:
Matthew Hodgson 2020-09-10 14:39:18 +01:00 committed by GitHub
parent 35564dd73c
commit 39507bacc3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
29 changed files with 1209 additions and 59 deletions

View file

@ -17,6 +17,7 @@ package consumers
import (
"context"
"encoding/json"
"fmt"
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal"
@ -26,11 +27,13 @@ import (
"github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
)
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
cfg *config.SyncAPI
rsAPI api.RoomserverInternalAPI
rsConsumer *internal.ContinualConsumer
db storage.Database
@ -55,6 +58,7 @@ func NewOutputRoomEventConsumer(
PartitionStore: store,
}
s := &OutputRoomEventConsumer{
cfg: cfg,
rsConsumer: &consumer,
db: store,
notifier: n,
@ -100,6 +104,8 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent)
case api.OutputTypeRetireInviteEvent:
return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent)
case api.OutputTypeNewPeek:
return s.onNewPeek(context.TODO(), *output.NewPeek)
case api.OutputTypeRedactedEvent:
return s.onRedactEvent(context.TODO(), *output.RedactedEvent)
default:
@ -162,6 +168,12 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
}).Panicf("roomserver output log: write event failure")
return nil
}
if pduPos, err = s.notifyJoinedPeeks(ctx, &ev, pduPos); err != nil {
logrus.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
return err
}
s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil))
s.notifyKeyChanges(&ev)
@ -184,6 +196,37 @@ func (s *OutputRoomEventConsumer) notifyKeyChanges(ev *gomatrixserverlib.Headere
}
}
func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, sp types.StreamPosition) (types.StreamPosition, error) {
if ev.Type() != gomatrixserverlib.MRoomMember {
return sp, nil
}
membership, err := ev.Membership()
if err != nil {
return sp, fmt.Errorf("ev.Membership: %w", err)
}
// TODO: check that it's a join and not a profile change (means unmarshalling prev_content)
if membership == gomatrixserverlib.Join {
// check it's a local join
_, domain, err := gomatrixserverlib.SplitID('@', *ev.StateKey())
if err != nil {
return sp, fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
}
if domain != s.cfg.Matrix.ServerName {
return sp, nil
}
// cancel any peeks for it
peekSP, peekErr := s.db.DeletePeeks(ctx, ev.RoomID(), *ev.StateKey())
if peekErr != nil {
return sp, fmt.Errorf("s.db.DeletePeeks: %w", peekErr)
}
if peekSP > 0 {
sp = peekSP
}
}
return sp, nil
}
func (s *OutputRoomEventConsumer) onNewInviteEvent(
ctx context.Context, msg api.OutputNewInviteEvent,
) error {
@ -219,6 +262,26 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
return nil
}
func (s *OutputRoomEventConsumer) onNewPeek(
ctx context.Context, msg api.OutputNewPeek,
) error {
sp, err := s.db.AddPeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID)
if err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
log.ErrorKey: err,
}).Panicf("roomserver output log: write peek failure")
return nil
}
// tell the notifier about the new peek so it knows to wake up new devices
s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID)
// we need to wake up the users who might need to now be peeking into this room,
// so we send in a dummy event to trigger a wakeup
s.notifier.OnNewEvent(nil, msg.RoomID, nil, types.NewStreamToken(sp, 0, nil))
return nil
}
func (s *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.HeaderedEvent) (gomatrixserverlib.HeaderedEvent, error) {
if event.StateKey() == nil {
return event, nil