More refactoring to remove saramajetstream

This commit is contained in:
Neil Alexander 2021-11-03 14:28:40 +00:00
parent f484285f17
commit 1110f830c6
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
24 changed files with 329 additions and 838 deletions

View file

@ -18,9 +18,7 @@ import (
"context"
"encoding/json"
"github.com/Shopify/sarama"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
@ -28,60 +26,55 @@ import (
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
// OutputClientDataConsumer consumes events that originated in the client API server.
type OutputClientDataConsumer struct {
clientAPIConsumer *internal.ContinualConsumer
db storage.Database
stream types.StreamProvider
notifier *notifier.Notifier
jetstream nats.JetStreamContext
topic string
db storage.Database
stream types.StreamProvider
notifier *notifier.Notifier
}
// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
func NewOutputClientDataConsumer(
process *process.ProcessContext,
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
js nats.JetStreamContext,
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
) *OutputClientDataConsumer {
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "syncapi/clientapi",
Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData)),
Consumer: kafkaConsumer,
PartitionStore: store,
return &OutputClientDataConsumer{
jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
db: store,
notifier: notifier,
stream: stream,
}
s := &OutputClientDataConsumer{
clientAPIConsumer: &consumer,
db: store,
notifier: notifier,
stream: stream,
}
consumer.ProcessMessage = s.onMessage
return s
}
// Start consuming from room servers
func (s *OutputClientDataConsumer) Start() error {
return s.clientAPIConsumer.Start()
_, err := s.jetstream.Subscribe(s.topic, s.onMessage)
return err
}
// onMessage is called when the sync server receives a new event from the client API server output log.
// It is not safe for this function to be called from multiple goroutines, or else the
// sync stream position may race and be incorrectly calculated.
func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error {
func (s *OutputClientDataConsumer) onMessage(msg *nats.Msg) {
// Parse out the event JSON
userID := msg.Header.Get(jetstream.UserID)
var output eventutil.AccountData
if err := json.Unmarshal(msg.Value, &output); err != nil {
if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("client API server output log: message parse failure")
sentry.CaptureException(err)
return nil
return
}
log.WithFields(log.Fields{
@ -90,7 +83,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
}).Info("received data from client API server")
streamPos, err := s.db.UpsertAccountData(
context.TODO(), string(msg.Key), output.RoomID, output.Type,
context.TODO(), userID, output.RoomID, output.Type,
)
if err != nil {
sentry.CaptureException(err)
@ -102,7 +95,5 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
}
s.stream.Advance(streamPos)
s.notifier.OnNewAccountData(string(msg.Key), types.StreamingToken{AccountDataPosition: streamPos})
return nil
s.notifier.OnNewAccountData(userID, types.StreamingToken{AccountDataPosition: streamPos})
}

View file

@ -18,25 +18,25 @@ import (
"context"
"encoding/json"
"github.com/Shopify/sarama"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
// OutputReceiptEventConsumer consumes events that originated in the EDU server.
type OutputReceiptEventConsumer struct {
receiptConsumer *internal.ContinualConsumer
db storage.Database
stream types.StreamProvider
notifier *notifier.Notifier
jetstream nats.JetStreamContext
topic string
db storage.Database
stream types.StreamProvider
notifier *notifier.Notifier
}
// NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer.
@ -44,44 +44,33 @@ type OutputReceiptEventConsumer struct {
func NewOutputReceiptEventConsumer(
process *process.ProcessContext,
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
js nats.JetStreamContext,
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
) *OutputReceiptEventConsumer {
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "syncapi/eduserver/receipt",
Topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
Consumer: kafkaConsumer,
PartitionStore: store,
return &OutputReceiptEventConsumer{
jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
db: store,
notifier: notifier,
stream: stream,
}
s := &OutputReceiptEventConsumer{
receiptConsumer: &consumer,
db: store,
notifier: notifier,
stream: stream,
}
consumer.ProcessMessage = s.onMessage
return s
}
// Start consuming from EDU api
func (s *OutputReceiptEventConsumer) Start() error {
return s.receiptConsumer.Start()
_, err := s.jetstream.Subscribe(s.topic, s.onMessage)
return err
}
func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
func (s *OutputReceiptEventConsumer) onMessage(msg *nats.Msg) {
var output api.OutputReceiptEvent
if err := json.Unmarshal(msg.Value, &output); err != nil {
if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("EDU server output log: message parse failure")
sentry.CaptureException(err)
return nil
return
}
streamPos, err := s.db.StoreReceipt(
@ -94,11 +83,9 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro
)
if err != nil {
sentry.CaptureException(err)
return err
return
}
s.stream.Advance(streamPos)
s.notifier.OnNewReceipt(output.RoomID, types.StreamingToken{ReceiptPosition: streamPos})
return nil
}

View file

@ -18,10 +18,8 @@ import (
"context"
"encoding/json"
"github.com/Shopify/sarama"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
@ -30,16 +28,18 @@ import (
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
// OutputSendToDeviceEventConsumer consumes events that originated in the EDU server.
type OutputSendToDeviceEventConsumer struct {
sendToDeviceConsumer *internal.ContinualConsumer
db storage.Database
serverName gomatrixserverlib.ServerName // our server name
stream types.StreamProvider
notifier *notifier.Notifier
jetstream nats.JetStreamContext
topic string
db storage.Database
serverName gomatrixserverlib.ServerName // our server name
stream types.StreamProvider
notifier *notifier.Notifier
}
// NewOutputSendToDeviceEventConsumer creates a new OutputSendToDeviceEventConsumer.
@ -47,54 +47,43 @@ type OutputSendToDeviceEventConsumer struct {
func NewOutputSendToDeviceEventConsumer(
process *process.ProcessContext,
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
js nats.JetStreamContext,
store storage.Database,
notifier *notifier.Notifier,
stream types.StreamProvider,
) *OutputSendToDeviceEventConsumer {
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "syncapi/eduserver/sendtodevice",
Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent)),
Consumer: kafkaConsumer,
PartitionStore: store,
return &OutputSendToDeviceEventConsumer{
jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
db: store,
serverName: cfg.Matrix.ServerName,
notifier: notifier,
stream: stream,
}
s := &OutputSendToDeviceEventConsumer{
sendToDeviceConsumer: &consumer,
db: store,
serverName: cfg.Matrix.ServerName,
notifier: notifier,
stream: stream,
}
consumer.ProcessMessage = s.onMessage
return s
}
// Start consuming from EDU api
func (s *OutputSendToDeviceEventConsumer) Start() error {
return s.sendToDeviceConsumer.Start()
_, err := s.jetstream.Subscribe(s.topic, s.onMessage)
return err
}
func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
func (s *OutputSendToDeviceEventConsumer) onMessage(msg *nats.Msg) {
var output api.OutputSendToDeviceEvent
if err := json.Unmarshal(msg.Value, &output); err != nil {
if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("EDU server output log: message parse failure")
sentry.CaptureException(err)
return err
return
}
_, domain, err := gomatrixserverlib.SplitID('@', output.UserID)
if err != nil {
sentry.CaptureException(err)
return err
return
}
if domain != s.serverName {
return nil
return
}
util.GetLogger(context.TODO()).WithFields(log.Fields{
@ -110,7 +99,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage)
if err != nil {
sentry.CaptureException(err)
log.WithError(err).Errorf("failed to store send-to-device message")
return err
return
}
s.stream.Advance(streamPos)
@ -119,6 +108,4 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage)
[]string{output.DeviceID},
types.StreamingToken{SendToDevicePosition: streamPos},
)
return nil
}

View file

@ -17,26 +17,25 @@ package consumers
import (
"encoding/json"
"github.com/Shopify/sarama"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi/notifier"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
// OutputTypingEventConsumer consumes events that originated in the EDU server.
type OutputTypingEventConsumer struct {
typingConsumer *internal.ContinualConsumer
eduCache *cache.EDUCache
stream types.StreamProvider
notifier *notifier.Notifier
jetstream nats.JetStreamContext
topic string
eduCache *cache.EDUCache
stream types.StreamProvider
notifier *notifier.Notifier
}
// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer.
@ -44,49 +43,33 @@ type OutputTypingEventConsumer struct {
func NewOutputTypingEventConsumer(
process *process.ProcessContext,
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
js nats.JetStreamContext,
store storage.Database,
eduCache *cache.EDUCache,
notifier *notifier.Notifier,
stream types.StreamProvider,
) *OutputTypingEventConsumer {
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "syncapi/eduserver/typing",
Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent)),
Consumer: kafkaConsumer,
PartitionStore: store,
return &OutputTypingEventConsumer{
jetstream: js,
eduCache: eduCache,
notifier: notifier,
stream: stream,
}
s := &OutputTypingEventConsumer{
typingConsumer: &consumer,
eduCache: eduCache,
notifier: notifier,
stream: stream,
}
consumer.ProcessMessage = s.onMessage
return s
}
// Start consuming from EDU api
func (s *OutputTypingEventConsumer) Start() error {
s.eduCache.SetTimeoutCallback(func(userID, roomID string, latestSyncPosition int64) {
pos := types.StreamPosition(latestSyncPosition)
s.notifier.OnNewTyping(roomID, types.StreamingToken{TypingPosition: pos})
})
return s.typingConsumer.Start()
_, err := s.jetstream.Subscribe(s.topic, s.onMessage)
return err
}
func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
func (s *OutputTypingEventConsumer) onMessage(msg *nats.Msg) {
var output api.OutputTypingEvent
if err := json.Unmarshal(msg.Value, &output); err != nil {
if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("EDU server output log: message parse failure")
sentry.CaptureException(err)
return nil
return
}
log.WithFields(log.Fields{
@ -109,6 +92,4 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
s.stream.Advance(typingPos)
s.notifier.OnNewTyping(output.Event.RoomID, types.StreamingToken{TypingPosition: typingPos})
return nil
}

View file

@ -19,9 +19,7 @@ import (
"encoding/json"
"fmt"
"github.com/Shopify/sarama"
"github.com/getsentry/sentry-go"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
@ -30,6 +28,7 @@ import (
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
)
@ -37,7 +36,8 @@ import (
type OutputRoomEventConsumer struct {
cfg *config.SyncAPI
rsAPI api.RoomserverInternalAPI
rsConsumer *internal.ContinualConsumer
jetstream nats.JetStreamContext
topic string
db storage.Database
pduStream types.StreamProvider
inviteStream types.StreamProvider
@ -48,50 +48,42 @@ type OutputRoomEventConsumer struct {
func NewOutputRoomEventConsumer(
process *process.ProcessContext,
cfg *config.SyncAPI,
kafkaConsumer sarama.Consumer,
js nats.JetStreamContext,
store storage.Database,
notifier *notifier.Notifier,
pduStream types.StreamProvider,
inviteStream types.StreamProvider,
rsAPI api.RoomserverInternalAPI,
) *OutputRoomEventConsumer {
consumer := internal.ContinualConsumer{
Process: process,
ComponentName: "syncapi/roomserver",
Topic: string(cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent)),
Consumer: kafkaConsumer,
PartitionStore: store,
}
s := &OutputRoomEventConsumer{
return &OutputRoomEventConsumer{
cfg: cfg,
rsConsumer: &consumer,
jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent),
db: store,
notifier: notifier,
pduStream: pduStream,
inviteStream: inviteStream,
rsAPI: rsAPI,
}
consumer.ProcessMessage = s.onMessage
return s
}
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
return s.rsConsumer.Start()
_, err := s.jetstream.Subscribe(s.topic, s.onMessage)
return err
}
// onMessage is called when the sync server receives a new event from the room server output log.
// It is not safe for this function to be called from multiple goroutines, or else the
// sync stream position may race and be incorrectly calculated.
func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
func (s *OutputRoomEventConsumer) onMessage(msg *nats.Msg) {
// Parse out the event JSON
var err error
var output api.OutputEvent
if err := json.Unmarshal(msg.Value, &output); err != nil {
if err = json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil
return
}
switch output.Type {
@ -103,27 +95,29 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// in the special case where the event redacts itself, just pass the message through because
// we will never see the other part of the pair
if event.Redacts() != event.EventID() {
return nil
return
}
}
return s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent)
s.onNewRoomEvent(context.TODO(), *output.NewRoomEvent)
case api.OutputTypeOldRoomEvent:
return s.onOldRoomEvent(context.TODO(), *output.OldRoomEvent)
s.onOldRoomEvent(context.TODO(), *output.OldRoomEvent)
case api.OutputTypeNewInviteEvent:
return s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent)
s.onNewInviteEvent(context.TODO(), *output.NewInviteEvent)
case api.OutputTypeRetireInviteEvent:
return s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent)
s.onRetireInviteEvent(context.TODO(), *output.RetireInviteEvent)
case api.OutputTypeNewPeek:
return s.onNewPeek(context.TODO(), *output.NewPeek)
s.onNewPeek(context.TODO(), *output.NewPeek)
case api.OutputTypeRetirePeek:
return s.onRetirePeek(context.TODO(), *output.RetirePeek)
s.onRetirePeek(context.TODO(), *output.RetirePeek)
case api.OutputTypeRedactedEvent:
return s.onRedactEvent(context.TODO(), *output.RedactedEvent)
s.onRedactEvent(context.TODO(), *output.RedactedEvent)
default:
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
return nil
}
if err != nil {
log.WithError(err).Error("roomserver output log: failed to process event")
}
}
@ -276,12 +270,12 @@ func (s *OutputRoomEventConsumer) notifyJoinedPeeks(ctx context.Context, ev *gom
func (s *OutputRoomEventConsumer) onNewInviteEvent(
ctx context.Context, msg api.OutputNewInviteEvent,
) error {
) {
if msg.Event.StateKey() == nil {
log.WithFields(log.Fields{
"event": string(msg.Event.JSON()),
}).Panicf("roomserver output log: invite has no state key")
return nil
return
}
pduPos, err := s.db.AddInviteEvent(ctx, msg.Event)
if err != nil {
@ -293,18 +287,16 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
"pdupos": pduPos,
log.ErrorKey: err,
}).Panicf("roomserver output log: write invite failure")
return nil
return
}
s.inviteStream.Advance(pduPos)
s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, *msg.Event.StateKey())
return nil
}
func (s *OutputRoomEventConsumer) onRetireInviteEvent(
ctx context.Context, msg api.OutputRetireInviteEvent,
) error {
) {
pduPos, err := s.db.RetireInviteEvent(ctx, msg.EventID)
if err != nil {
sentry.CaptureException(err)
@ -313,19 +305,17 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
"event_id": msg.EventID,
log.ErrorKey: err,
}).Panicf("roomserver output log: remove invite failure")
return nil
return
}
// Notify any active sync requests that the invite has been retired.
s.inviteStream.Advance(pduPos)
s.notifier.OnNewInvite(types.StreamingToken{InvitePosition: pduPos}, msg.TargetUserID)
return nil
}
func (s *OutputRoomEventConsumer) onNewPeek(
ctx context.Context, msg api.OutputNewPeek,
) error {
) {
sp, err := s.db.AddPeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID)
if err != nil {
sentry.CaptureException(err)
@ -333,7 +323,7 @@ func (s *OutputRoomEventConsumer) onNewPeek(
log.WithFields(log.Fields{
log.ErrorKey: err,
}).Panicf("roomserver output log: write peek failure")
return nil
return
}
// tell the notifier about the new peek so it knows to wake up new devices
@ -341,20 +331,18 @@ func (s *OutputRoomEventConsumer) onNewPeek(
// index as PDUs, but we should fix this
s.pduStream.Advance(sp)
s.notifier.OnNewPeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp})
return nil
}
func (s *OutputRoomEventConsumer) onRetirePeek(
ctx context.Context, msg api.OutputRetirePeek,
) error {
) {
sp, err := s.db.DeletePeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID)
if err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
log.ErrorKey: err,
}).Panicf("roomserver output log: write peek failure")
return nil
return
}
// tell the notifier about the new peek so it knows to wake up new devices
@ -362,8 +350,6 @@ func (s *OutputRoomEventConsumer) onRetirePeek(
// index as PDUs, but we should fix this
s.pduStream.Advance(sp)
s.notifier.OnRetirePeek(msg.RoomID, msg.UserID, msg.DeviceID, types.StreamingToken{PDUPosition: sp})
return nil
}
func (s *OutputRoomEventConsumer) updateStateEvent(event *gomatrixserverlib.HeaderedEvent) (*gomatrixserverlib.HeaderedEvent, error) {