notify new streampos after joining peeked rooms

This commit is contained in:
Matthew Hodgson 2020-09-08 12:28:22 +01:00
parent b96a31db51
commit 7a76f4961a

View file

@ -166,14 +166,15 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
}).Panicf("roomserver output log: write event failure") }).Panicf("roomserver output log: write event failure")
return nil return nil
} }
if pduPos, err = s.notifyJoinedPeeks(ctx, &ev, pduPos); err != nil {
return err
}
s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil)) s.notifier.OnNewEvent(&ev, "", nil, types.NewStreamToken(pduPos, 0, nil))
s.notifyKeyChanges(&ev) s.notifyKeyChanges(&ev)
if err = s.notifyJoinedPeeks(ctx, &ev); err != nil {
return err
}
return nil return nil
} }
@ -192,30 +193,30 @@ func (s *OutputRoomEventConsumer) notifyKeyChanges(ev *gomatrixserverlib.Headere
} }
} }
func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent) (error) { func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, sp types.StreamPosition) (types.StreamPosition, error) {
membership, err := ev.Membership() membership, err := ev.Membership()
if err != nil { if err != nil {
return nil return sp, err
} }
// TODO: check that it's a join and not a profile change (means unmarshalling prev_content) // TODO: check that it's a join and not a profile change (means unmarshalling prev_content)
if membership == gomatrixserverlib.Join { if membership == gomatrixserverlib.Join {
// check it's a local join // check it's a local join
_, domain, err := gomatrixserverlib.SplitID('@', *ev.StateKey()) _, domain, err := gomatrixserverlib.SplitID('@', *ev.StateKey())
if err != nil { if err != nil {
return err return sp, err
} }
if domain != s.cfg.Matrix.ServerName { if domain != s.cfg.Matrix.ServerName {
return nil return sp, nil
} }
// cancel any peeks for it // cancel any peeks for it
_, err = s.db.DeletePeeks(ctx, ev.RoomID(), *ev.StateKey()) sp, err = s.db.DeletePeeks(ctx, ev.RoomID(), *ev.StateKey())
// XXX: should we do anything with this new streampos? // XXX: should we do anything with this new streampos?
if err != nil { if err != nil {
return err return sp, err
} }
} }
return nil return sp, nil
} }
func (s *OutputRoomEventConsumer) onNewInviteEvent( func (s *OutputRoomEventConsumer) onNewInviteEvent(