2022-01-05 17:44:49 +00:00
package jetstream
import (
2022-08-02 11:58:08 +00:00
"crypto/tls"
2022-03-25 12:24:21 +00:00
"fmt"
2022-03-23 10:20:18 +00:00
"reflect"
2022-01-05 17:44:49 +00:00
"strings"
"sync"
"time"
2022-03-25 12:24:21 +00:00
"github.com/getsentry/sentry-go"
2022-09-27 13:01:34 +00:00
"github.com/sirupsen/logrus"
2022-01-05 17:44:49 +00:00
"github.com/matrix-org/dendrite/setup/config"
2022-03-21 10:32:34 +00:00
"github.com/matrix-org/dendrite/setup/process"
2022-01-05 17:44:49 +00:00
natsserver "github.com/nats-io/nats-server/v2/server"
natsclient "github.com/nats-io/nats.go"
)
2022-05-09 13:15:24 +00:00
type NATSInstance struct {
* natsserver . Server
2022-04-08 09:12:30 +00:00
}
2022-08-05 08:19:33 +00:00
// natsLock is held for the full duration of NATSInstance.Prepare, so that
// concurrent Prepare calls run one at a time.
var natsLock sync . Mutex
func DeleteAllStreams ( js natsclient . JetStreamContext , cfg * config . JetStream ) {
2022-05-09 16:23:02 +00:00
for _ , stream := range streams { // streams are defined in streams.go
name := cfg . Prefixed ( stream . Name )
_ = js . DeleteStream ( name )
}
}
2022-05-09 13:15:24 +00:00
func ( s * NATSInstance ) Prepare ( process * process . ProcessContext , cfg * config . JetStream ) ( natsclient . JetStreamContext , * natsclient . Conn ) {
2022-08-05 08:19:33 +00:00
natsLock . Lock ( )
defer natsLock . Unlock ( )
2022-01-05 17:44:49 +00:00
// check if we need an in-process NATS Server
if len ( cfg . Addresses ) != 0 {
2022-03-25 12:24:21 +00:00
return setupNATS ( process , cfg , nil )
2022-01-05 17:44:49 +00:00
}
2022-05-09 13:15:24 +00:00
if s . Server == nil {
2022-01-05 17:44:49 +00:00
var err error
2022-05-09 13:15:24 +00:00
s . Server , err = natsserver . NewServer ( & natsserver . Options {
2022-02-17 13:15:35 +00:00
ServerName : "monolith" ,
DontListen : true ,
JetStream : true ,
StoreDir : string ( cfg . StoragePath ) ,
NoSystemAccount : true ,
MaxPayload : 16 * 1024 * 1024 ,
2022-04-27 12:36:40 +00:00
NoSigs : true ,
2022-08-02 11:58:08 +00:00
NoLog : cfg . NoLog ,
2022-01-05 17:44:49 +00:00
} )
if err != nil {
panic ( err )
}
2022-05-09 13:15:24 +00:00
s . ConfigureLogger ( )
2022-03-21 10:32:34 +00:00
go func ( ) {
process . ComponentStarted ( )
2022-05-09 13:15:24 +00:00
s . Start ( )
2022-03-21 10:32:34 +00:00
} ( )
go func ( ) {
<- process . WaitForShutdown ( )
2022-05-09 13:15:24 +00:00
s . Shutdown ( )
s . WaitForShutdown ( )
2022-03-21 10:32:34 +00:00
process . ComponentFinished ( )
} ( )
2022-01-05 17:44:49 +00:00
}
2022-05-09 13:15:24 +00:00
if ! s . ReadyForConnections ( time . Second * 10 ) {
2022-01-05 17:44:49 +00:00
logrus . Fatalln ( "NATS did not start in time" )
}
2022-05-09 13:15:24 +00:00
nc , err := natsclient . Connect ( "" , natsclient . InProcessServer ( s ) )
2022-01-05 17:44:49 +00:00
if err != nil {
logrus . Fatalln ( "Failed to create NATS client" )
}
2022-03-25 12:24:21 +00:00
return setupNATS ( process , cfg , nc )
2022-01-05 17:44:49 +00:00
}
2022-03-25 12:24:21 +00:00
func setupNATS ( process * process . ProcessContext , cfg * config . JetStream , nc * natsclient . Conn ) ( natsclient . JetStreamContext , * natsclient . Conn ) {
2022-01-05 17:44:49 +00:00
if nc == nil {
var err error
2022-08-05 08:19:33 +00:00
opts := [ ] natsclient . Option { }
2022-08-02 11:58:08 +00:00
if cfg . DisableTLSValidation {
2022-08-05 08:19:33 +00:00
opts = append ( opts , natsclient . Secure ( & tls . Config {
2022-08-02 11:58:08 +00:00
InsecureSkipVerify : true ,
} ) )
}
nc , err = natsclient . Connect ( strings . Join ( cfg . Addresses , "," ) , opts ... )
2022-01-05 17:44:49 +00:00
if err != nil {
logrus . WithError ( err ) . Panic ( "Unable to connect to NATS" )
2022-03-16 14:21:11 +00:00
return nil , nil
2022-01-05 17:44:49 +00:00
}
}
s , err := nc . JetStream ( )
if err != nil {
logrus . WithError ( err ) . Panic ( "Unable to get JetStream context" )
2022-03-16 14:21:11 +00:00
return nil , nil
2022-01-05 17:44:49 +00:00
}
for _ , stream := range streams { // streams are defined in streams.go
2022-03-23 10:20:18 +00:00
name := cfg . Prefixed ( stream . Name )
2022-01-05 17:44:49 +00:00
info , err := s . StreamInfo ( name )
if err != nil && err != natsclient . ErrStreamNotFound {
logrus . WithError ( err ) . Fatal ( "Unable to get stream info" )
}
2022-03-23 10:20:18 +00:00
subjects := stream . Subjects
if len ( subjects ) == 0 {
// By default we want each stream to listen for the subjects
// that are either an exact match for the stream name, or where
// the first part of the subject is the stream name. ">" is a
// wildcard in NATS for one or more subject tokens. In the case
// that the stream is called "Foo", this will match any message
// with the subject "Foo", "Foo.Bar" or "Foo.Bar.Baz" etc.
subjects = [ ] string { name , name + ".>" }
}
if info != nil {
switch {
case ! reflect . DeepEqual ( info . Config . Subjects , subjects ) :
fallthrough
case info . Config . Retention != stream . Retention :
fallthrough
case info . Config . Storage != stream . Storage :
if err = s . DeleteStream ( name ) ; err != nil {
logrus . WithError ( err ) . Fatal ( "Unable to delete stream" )
}
info = nil
}
}
2022-01-05 17:44:49 +00:00
if info == nil {
// If we're trying to keep everything in memory (e.g. unit tests)
// then overwrite the storage policy.
if cfg . InMemory {
2022-02-04 13:08:13 +00:00
stream . Storage = natsclient . MemoryStorage
2022-01-05 17:44:49 +00:00
}
2022-01-07 17:31:57 +00:00
// Namespace the streams without modifying the original streams
// array, otherwise we end up with namespaces on namespaces.
namespaced := * stream
namespaced . Name = name
2022-03-23 10:20:18 +00:00
namespaced . Subjects = subjects
2022-01-07 17:31:57 +00:00
if _ , err = s . AddStream ( & namespaced ) ; err != nil {
2022-03-25 12:24:21 +00:00
logger := logrus . WithError ( err ) . WithFields ( logrus . Fields {
"stream" : namespaced . Name ,
"subjects" : namespaced . Subjects ,
} )
// If the stream was supposed to be in-memory to begin with
// then an error here is fatal so we'll give up.
if namespaced . Storage == natsclient . MemoryStorage {
logger . WithError ( err ) . Fatal ( "Unable to add in-memory stream" )
}
// The stream was supposed to be on disk. Let's try starting
// Dendrite with the stream in-memory instead. That'll mean that
// we can't recover anything that was queued on the disk but we
// will still be able to start and run hopefully in the meantime.
logger . WithError ( err ) . Error ( "Unable to add stream" )
sentry . CaptureException ( fmt . Errorf ( "Unable to add stream %q: %w" , namespaced . Name , err ) )
namespaced . Storage = natsclient . MemoryStorage
if _ , err = s . AddStream ( & namespaced ) ; err != nil {
// We tried to add the stream in-memory instead but something
// went wrong. That's an unrecoverable situation so we will
// give up at this point.
logger . WithError ( err ) . Fatal ( "Unable to add in-memory stream" )
}
if stream . Storage != namespaced . Storage {
// We've managed to add the stream in memory. What's on the
// disk will be left alone, but our ability to recover from a
// future crash will be limited. Yell about it.
sentry . CaptureException ( fmt . Errorf ( "Stream %q is running in-memory; this may be due to data corruption in the JetStream storage directory, investigate as soon as possible" , namespaced . Name ) )
logrus . Warn ( "Stream is running in-memory; this may be due to data corruption in the JetStream storage directory, investigate as soon as possible" )
process . Degraded ( )
}
2022-01-05 17:44:49 +00:00
}
}
}
2022-03-29 12:14:35 +00:00
// Clean up old consumers so that interest-based consumers do the
// right thing.
for stream , consumers := range map [ string ] [ ] string {
OutputClientData : { "SyncAPIClientAPIConsumer" } ,
OutputReceiptEvent : { "SyncAPIEDUServerReceiptConsumer" , "FederationAPIEDUServerConsumer" } ,
OutputSendToDeviceEvent : { "SyncAPIEDUServerSendToDeviceConsumer" , "FederationAPIEDUServerConsumer" } ,
OutputTypingEvent : { "SyncAPIEDUServerTypingConsumer" , "FederationAPIEDUServerConsumer" } ,
2022-09-01 08:20:40 +00:00
OutputRoomEvent : { "AppserviceRoomserverConsumer" } ,
2022-09-27 13:01:34 +00:00
OutputStreamEvent : { "UserAPISyncAPIStreamEventConsumer" } ,
OutputReadUpdate : { "UserAPISyncAPIReadUpdateConsumer" } ,
2022-03-29 12:14:35 +00:00
} {
streamName := cfg . Matrix . JetStream . Prefixed ( stream )
for _ , consumer := range consumers {
consumerName := cfg . Matrix . JetStream . Prefixed ( consumer ) + "Pull"
consumerInfo , err := s . ConsumerInfo ( streamName , consumerName )
if err != nil || consumerInfo == nil {
continue
}
if err = s . DeleteConsumer ( streamName , consumerName ) ; err != nil {
logrus . WithError ( err ) . Errorf ( "Unable to clean up old consumer %q for stream %q" , consumer , stream )
}
}
}
2022-03-16 14:21:11 +00:00
return s , nc
2022-01-05 17:44:49 +00:00
}