diff --git a/syncapi/consumers/roomserver.go b/syncapi/consumers/roomserver.go index a846b87d..6379c5f6 100644 --- a/syncapi/consumers/roomserver.go +++ b/syncapi/consumers/roomserver.go @@ -17,6 +17,7 @@ package consumers import ( "context" "encoding/json" + "fmt" "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/internal" @@ -26,6 +27,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/sync" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" ) @@ -56,7 +58,7 @@ func NewOutputRoomEventConsumer( PartitionStore: store, } s := &OutputRoomEventConsumer{ - cfg: cfg, + cfg: cfg, rsConsumer: &consumer, db: store, notifier: n, @@ -168,6 +170,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( } if pduPos, err = s.notifyJoinedPeeks(ctx, &ev, pduPos); err != nil { + logrus.WithError(err).Panicf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos) return err } @@ -194,16 +197,19 @@ func (s *OutputRoomEventConsumer) notifyKeyChanges(ev *gomatrixserverlib.Headere } func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent, sp types.StreamPosition) (types.StreamPosition, error) { + if ev.Type() != gomatrixserverlib.MRoomMember { + return sp, nil + } membership, err := ev.Membership() if err != nil { - return sp, err + return sp, fmt.Errorf("ev.Membership: %w", err) } // TODO: check that it's a join and not a profile change (means unmarshalling prev_content) if membership == gomatrixserverlib.Join { // check it's a local join _, domain, err := gomatrixserverlib.SplitID('@', *ev.StateKey()) if err != nil { - return sp, err + return sp, fmt.Errorf("gomatrixserverlib.SplitID: %w", err) } if domain != s.cfg.Matrix.ServerName { return sp, nil @@ -213,7 +219,7 @@ func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *gom sp, err = s.db.DeletePeeks(ctx, ev.RoomID(), *ev.StateKey()) // XXX: should we do anything with this new streampos? if err != nil { - return sp, err + return sp, fmt.Errorf("s.db.DeletePeeks: %w", err) } } return sp, nil