NATS JetStream tweaks (#2086)

* Use named NATS durable consumers

* Build fixes

* Remove dupe call to SetFederationAPI

* Use namespaced consumer name

* Fix namespacing

* Fix unit tests hopefully
This commit is contained in:
Neil Alexander 2022-01-07 17:31:57 +00:00 committed by GitHub
parent a422321435
commit 16035b9737
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 51 additions and 15 deletions

View file

@ -34,6 +34,7 @@ import (
type OutputClientDataConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable nats.SubOpt
topic string
db storage.Database
stream types.StreamProvider
@ -53,6 +54,7 @@ func NewOutputClientDataConsumer(
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputClientData),
durable: cfg.Matrix.JetStream.Durable("SyncAPIClientAPIConsumer"),
db: store,
notifier: notifier,
stream: stream,
@ -61,7 +63,7 @@ func NewOutputClientDataConsumer(
// Start consuming from room servers
func (s *OutputClientDataConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage)
_, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
}

View file

@ -34,6 +34,7 @@ import (
type OutputReceiptEventConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable nats.SubOpt
topic string
db storage.Database
stream types.StreamProvider
@ -54,6 +55,7 @@ func NewOutputReceiptEventConsumer(
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputReceiptEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerReceiptConsumer"),
db: store,
notifier: notifier,
stream: stream,
@ -62,7 +64,7 @@ func NewOutputReceiptEventConsumer(
// Start consuming from EDU api
func (s *OutputReceiptEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage)
_, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
}

View file

@ -36,6 +36,7 @@ import (
type OutputSendToDeviceEventConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable nats.SubOpt
topic string
db storage.Database
serverName gomatrixserverlib.ServerName // our server name
@ -57,6 +58,7 @@ func NewOutputSendToDeviceEventConsumer(
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputSendToDeviceEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerSendToDeviceConsumer"),
db: store,
serverName: cfg.Matrix.ServerName,
notifier: notifier,
@ -66,7 +68,7 @@ func NewOutputSendToDeviceEventConsumer(
// Start consuming from EDU api
func (s *OutputSendToDeviceEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage)
_, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
}

View file

@ -35,6 +35,7 @@ import (
type OutputTypingEventConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable nats.SubOpt
topic string
eduCache *cache.EDUCache
stream types.StreamProvider
@ -56,6 +57,7 @@ func NewOutputTypingEventConsumer(
ctx: process.Context(),
jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputTypingEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPIEDUServerTypingConsumer"),
eduCache: eduCache,
notifier: notifier,
stream: stream,
@ -64,7 +66,7 @@ func NewOutputTypingEventConsumer(
// Start consuming from EDU api
func (s *OutputTypingEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage)
_, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
}

View file

@ -38,6 +38,7 @@ type OutputRoomEventConsumer struct {
cfg *config.SyncAPI
rsAPI api.RoomserverInternalAPI
jetstream nats.JetStreamContext
durable nats.SubOpt
topic string
db storage.Database
pduStream types.StreamProvider
@ -61,6 +62,7 @@ func NewOutputRoomEventConsumer(
cfg: cfg,
jetstream: js,
topic: cfg.Matrix.JetStream.TopicFor(jetstream.OutputRoomEvent),
durable: cfg.Matrix.JetStream.Durable("SyncAPIRoomServerConsumer"),
db: store,
notifier: notifier,
pduStream: pduStream,
@ -71,7 +73,7 @@ func NewOutputRoomEventConsumer(
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
_, err := s.jetstream.Subscribe(s.topic, s.onMessage)
_, err := s.jetstream.Subscribe(s.topic, s.onMessage, s.durable)
return err
}