Only call Membership() on membership events

This commit is contained in:
Neil Alexander 2020-09-08 14:05:53 +01:00
parent 7a76f4961a
commit 56001d0d4f
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -17,6 +17,7 @@ package consumers
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"github.com/Shopify/sarama" "github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal"
@ -26,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/sync"
"github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -56,7 +58,7 @@ func NewOutputRoomEventConsumer(
PartitionStore: store, PartitionStore: store,
} }
s := &OutputRoomEventConsumer{ s := &OutputRoomEventConsumer{
cfg: cfg, cfg: cfg,
rsConsumer: &consumer, rsConsumer: &consumer,
db: store, db: store,
notifier: n, notifier: n,
@ -168,6 +170,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
} }
if pduPos, err = s.notifyJoinedPeeks(ctx, &ev, pduPos); err != nil { if pduPos, err = s.notifyJoinedPeeks(ctx, &ev, pduPos); err != nil {
logrus.WithError(err).Panicf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
return err return err
} }
@ -194,16 +197,19 @@ func (s *OutputRoomEventConsumer) notifyKeyChanges(ev *gomatrixserverlib.Headere
} }
func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, sp types.StreamPosition) (types.StreamPosition, error) { func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, sp types.StreamPosition) (types.StreamPosition, error) {
if ev.Type() != gomatrixserverlib.MRoomMember {
return sp, nil
}
membership, err := ev.Membership() membership, err := ev.Membership()
if err != nil { if err != nil {
return sp, err return sp, fmt.Errorf("ev.Membership: %w", err)
} }
// TODO: check that it's a join and not a profile change (means unmarshalling prev_content) // TODO: check that it's a join and not a profile change (means unmarshalling prev_content)
if membership == gomatrixserverlib.Join { if membership == gomatrixserverlib.Join {
// check it's a local join // check it's a local join
_, domain, err := gomatrixserverlib.SplitID('@', *ev.StateKey()) _, domain, err := gomatrixserverlib.SplitID('@', *ev.StateKey())
if err != nil { if err != nil {
return sp, err return sp, fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
} }
if domain != s.cfg.Matrix.ServerName { if domain != s.cfg.Matrix.ServerName {
return sp, nil return sp, nil
@ -213,7 +219,7 @@ func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *gom
sp, err = s.db.DeletePeeks(ctx, ev.RoomID(), *ev.StateKey()) sp, err = s.db.DeletePeeks(ctx, ev.RoomID(), *ev.StateKey())
// XXX: should we do anything with this new streampos? // XXX: should we do anything with this new streampos?
if err != nil { if err != nil {
return sp, err return sp, fmt.Errorf("s.db.DeletePeeks: %w", err)
} }
} }
return sp, nil return sp, nil