Merge branch 'master' of https://github.com/matrix-org/dendrite into add-nats-support

This commit is contained in:
Till Faelligen 2021-07-24 11:27:24 +02:00
commit a833f5764a
48 changed files with 700 additions and 298 deletions

View file

@ -19,6 +19,10 @@ var natsServer *natsserver.Server
var natsServerMutex sync.Mutex
func SetupConsumerProducer(cfg *config.JetStream) (sarama.Consumer, sarama.SyncProducer) {
// check if we need an in-process NATS Server
if len(cfg.Addresses) != 0 {
return setupNATS(cfg, nil)
}
natsServerMutex.Lock()
if natsServer == nil {
var err error
@ -64,14 +68,12 @@ func setupNATS(cfg *config.JetStream, nc *natsclient.Conn) (sarama.Consumer, sar
}
for _, stream := range streams {
stream.Name = cfg.TopicFor(stream.Name)
info, err := s.StreamInfo(stream.Name)
if err != nil && err != natsclient.ErrStreamNotFound {
logrus.WithError(err).Fatal("Unable to get stream info")
}
if info == nil {
stream.Name = cfg.TopicFor(stream.Name)
stream.Subjects = []string{stream.Name}
// If we're trying to keep everything in memory (e.g. unit tests)
// then overwrite the storage policy.
if cfg.InMemory {

View file

@ -18,32 +18,38 @@ var (
var streams = []*nats.StreamConfig{
{
Name: OutputRoomEvent,
Subjects: []string{OutputRoomEvent},
Retention: nats.InterestPolicy,
Storage: nats.FileStorage,
},
{
Name: OutputSendToDeviceEvent,
Subjects: []string{OutputSendToDeviceEvent},
Retention: nats.InterestPolicy,
Storage: nats.FileStorage,
},
{
Name: OutputKeyChangeEvent,
Subjects: []string{OutputKeyChangeEvent},
Retention: nats.LimitsPolicy,
Storage: nats.FileStorage,
},
{
Name: OutputTypingEvent,
Subjects: []string{OutputTypingEvent},
Retention: nats.InterestPolicy,
Storage: nats.MemoryStorage,
MaxAge: time.Second * 60,
},
{
Name: OutputClientData,
Subjects: []string{OutputClientData},
Retention: nats.InterestPolicy,
Storage: nats.FileStorage,
},
{
Name: OutputReceiptEvent,
Subjects: []string{OutputReceiptEvent},
Retention: nats.InterestPolicy,
Storage: nats.FileStorage,
},