From 6843c3beeed675edf24bf73a24a7907ec37bdf66 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 5 May 2021 11:45:28 +0100 Subject: [PATCH] Add push server component template --- cmd/dendrite-demo-libp2p/main.go | 4 + cmd/dendrite-demo-yggdrasil/main.go | 8 ++ cmd/dendrite-monolith-server/main.go | 9 ++ .../personalities/pushserver.go | 33 ++++++++ cmd/dendritejs/main.go | 5 ++ pushserver/api/api.go | 14 ++++ pushserver/consumers/roomserver.go | 82 +++++++++++++++++++ pushserver/internal/api.go | 36 ++++++++ pushserver/inthttp/client.go | 48 +++++++++++ pushserver/inthttp/server.go | 29 +++++++ pushserver/pushserver.go | 47 +++++++++++ pushserver/storage/interface.go | 7 ++ pushserver/storage/postgres/storage.go | 31 +++++++ pushserver/storage/shared/storage.go | 12 +++ pushserver/storage/sqlite3/storage.go | 31 +++++++ pushserver/storage/storage.go | 23 ++++++ pushserver/storage/storage_wasm.go | 20 +++++ pushserver/storage/tables/interface.go | 1 + setup/base.go | 11 +++ setup/config/config.go | 12 +++ setup/config/config_pushserver.go | 22 +++++ setup/monolith.go | 2 + 22 files changed, 487 insertions(+) create mode 100644 cmd/dendrite-polylith-multi/personalities/pushserver.go create mode 100644 pushserver/api/api.go create mode 100644 pushserver/consumers/roomserver.go create mode 100644 pushserver/internal/api.go create mode 100644 pushserver/inthttp/client.go create mode 100644 pushserver/inthttp/server.go create mode 100644 pushserver/pushserver.go create mode 100644 pushserver/storage/interface.go create mode 100644 pushserver/storage/postgres/storage.go create mode 100644 pushserver/storage/shared/storage.go create mode 100644 pushserver/storage/sqlite3/storage.go create mode 100644 pushserver/storage/storage.go create mode 100644 pushserver/storage/storage_wasm.go create mode 100644 pushserver/storage/tables/interface.go create mode 100644 setup/config/config_pushserver.go diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index 16afea69..d5efccad 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -33,6 +33,7 @@ import ( "github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/keyserver" + "github.com/matrix-org/dendrite/pushserver" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/setup" "github.com/matrix-org/dendrite/setup/config" @@ -176,6 +177,8 @@ func main() { panic("failed to create new public rooms provider: " + err.Error()) } + psAPI := pushserver.NewInternalAPI(&base.Base) + monolith := setup.Monolith{ Config: base.Base.Cfg, AccountDB: accountDB, @@ -190,6 +193,7 @@ func main() { ServerKeyAPI: serverKeyAPI, UserAPI: userAPI, KeyAPI: keyAPI, + PushserverAPI: psAPI, ExtPublicRoomsProvider: provider, } monolith.AddAllPublicRoutes( diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index 7815a79b..932ac8e6 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -36,6 +36,7 @@ import ( "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/keyserver" + "github.com/matrix-org/dendrite/pushserver" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/setup" "github.com/matrix-org/dendrite/setup/config" @@ -130,6 +131,12 @@ func main() { } }) + psAPI := pushserver.NewInternalAPI(base) + if base.UseHTTPAPIs { + pushserver.AddInternalRoutes(base.InternalAPIMux, psAPI) + psAPI = base.PushServerHTTPClient() + } + rsComponent.SetFederationSenderAPI(fsAPI) monolith := setup.Monolith{ @@ -145,6 +152,7 @@ func main() { RoomserverAPI: rsAPI, UserAPI: userAPI, KeyAPI: keyAPI, + PushserverAPI: psAPI, ExtPublicRoomsProvider: yggrooms.NewYggdrasilRoomProvider( ygg, fsAPI, federation, ), diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index b82f7321..c098ba0f 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/keyserver" + "github.com/matrix-org/dendrite/pushserver" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup" @@ -66,6 +67,7 @@ func main() { cfg.RoomServer.InternalAPI.Connect = httpAPIAddr cfg.SigningKeyServer.InternalAPI.Connect = httpAPIAddr cfg.SyncAPI.InternalAPI.Connect = httpAPIAddr + cfg.PushServer.InternalAPI.Connect = httpAPIAddr } base := setup.NewBaseDendrite(cfg, "Monolith", *enableHTTPAPIs) @@ -128,6 +130,12 @@ func main() { } rsAPI.SetAppserviceAPI(asAPI) + psAPI := pushserver.NewInternalAPI(base) + if base.UseHTTPAPIs { + pushserver.AddInternalRoutes(base.InternalAPIMux, psAPI) + psAPI = base.PushServerHTTPClient() + } + monolith := setup.Monolith{ Config: base.Cfg, AccountDB: accountDB, @@ -142,6 +150,7 @@ func main() { ServerKeyAPI: skAPI, UserAPI: userAPI, KeyAPI: keyAPI, + PushserverAPI: psAPI, } monolith.AddAllPublicRoutes( base.ProcessContext, diff --git a/cmd/dendrite-polylith-multi/personalities/pushserver.go b/cmd/dendrite-polylith-multi/personalities/pushserver.go new file mode 100644 index 00000000..b38645be --- /dev/null +++ b/cmd/dendrite-polylith-multi/personalities/pushserver.go @@ -0,0 +1,33 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package personalities + +import ( + "github.com/matrix-org/dendrite/pushserver" + "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/config" +) + +func PushServer(base *setup.BaseDendrite, cfg *config.Dendrite) { + intAPI := pushserver.NewInternalAPI(base) + + pushserver.AddInternalRoutes(base.InternalAPIMux, intAPI) + + base.SetupAndServeHTTP( + base.Cfg.PushServer.InternalAPI.Listen, // internal listener + setup.NoListener, // external listener + nil, nil, + ) +} diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 92724d1a..680ad33e 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -28,6 +28,7 @@ import ( "github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/internal/httputil" "github.com/matrix-org/dendrite/keyserver" + "github.com/matrix-org/dendrite/pushserver" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/setup" "github.com/matrix-org/dendrite/setup/config" @@ -174,6 +175,7 @@ func main() { cfg.SigningKeyServer.Database.ConnectionString = "file:/idb/dendritejs_signingkeyserver.db" cfg.SyncAPI.Database.ConnectionString = "file:/idb/dendritejs_syncapi.db" cfg.KeyServer.Database.ConnectionString = "file:/idb/dendritejs_e2ekey.db" + cfg.PushServer.Database.ConnectionString = "file:/idb/dendritejs_pushserver.db" cfg.Global.Kafka.UseNaffka = true cfg.Global.Kafka.Database.ConnectionString = "file:/idb/dendritejs_naffka.db" cfg.Global.TrustedIDServers = []string{ @@ -215,6 +217,8 @@ func main() { rsAPI.SetFederationSenderAPI(fedSenderAPI) p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation) + psAPI := pushserver.NewInternalAPI(base) + monolith := setup.Monolith{ Config: base.Cfg, AccountDB: accountDB, @@ -228,6 +232,7 @@ func main() { RoomserverAPI: rsAPI, UserAPI: userAPI, KeyAPI: keyAPI, + PushserverAPI: psAPI, //ServerKeyAPI: serverKeyAPI, ExtPublicRoomsProvider: p2pPublicRoomProvider, } diff --git a/pushserver/api/api.go b/pushserver/api/api.go new file mode 100644 index 00000000..5233af90 --- /dev/null +++ b/pushserver/api/api.go @@ -0,0 +1,14 @@ +package api + +import "context" + +type PushserverInternalAPI interface { + QueryExample( + ctx context.Context, + request *QueryExampleRequest, + response *QueryExampleResponse, + ) error +} + +type QueryExampleRequest struct{} +type QueryExampleResponse struct{} diff --git a/pushserver/consumers/roomserver.go b/pushserver/consumers/roomserver.go new file mode 100644 index 00000000..8a051d9d --- /dev/null +++ b/pushserver/consumers/roomserver.go @@ -0,0 +1,82 @@ +package consumers + +import ( + "encoding/json" + + "github.com/Shopify/sarama" + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/pushserver/api" + "github.com/matrix-org/dendrite/pushserver/storage" + rsapi "github.com/matrix-org/dendrite/roomserver/api" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/setup/process" + log "github.com/sirupsen/logrus" +) + +type OutputRoomEventConsumer struct { + cfg *config.PushServer + psAPI api.PushserverInternalAPI + rsConsumer *internal.ContinualConsumer + db storage.Database +} + +func NewOutputRoomEventConsumer( + process *process.ProcessContext, + cfg *config.PushServer, + kafkaConsumer sarama.Consumer, + store storage.Database, + psAPI api.PushserverInternalAPI, +) *OutputRoomEventConsumer { + consumer := internal.ContinualConsumer{ + Process: process, + ComponentName: "pushserver/roomserver", + Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)), + Consumer: kafkaConsumer, + PartitionStore: store, + } + s := &OutputRoomEventConsumer{ + cfg: cfg, + rsConsumer: &consumer, + db: store, + psAPI: psAPI, + } + consumer.ProcessMessage = s.onMessage + return s +} + +func (s *OutputRoomEventConsumer) Start() error { + return s.rsConsumer.Start() +} + +func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { + var output rsapi.OutputEvent + if err := json.Unmarshal(msg.Value, &output); err != nil { + log.WithError(err).Errorf("roomserver output log: message parse failure") + return nil + } + + switch output.Type { + case rsapi.OutputTypeNewRoomEvent: + ev := output.NewRoomEvent.Event + if err := s.processMessage(*output.NewRoomEvent); err != nil { + // panic rather than continue with an inconsistent database + log.WithFields(log.Fields{ + "event_id": ev.EventID(), + "event": string(ev.JSON()), + log.ErrorKey: err, + }).Panicf("roomserver output log: write room event failure") + return err + } + + default: + // Ignore old events, peeks, so on. + } + + return nil +} + +func (s *OutputRoomEventConsumer) processMessage(ore rsapi.OutputNewRoomEvent) error { + // TODO: New events from the roomserver will be passed here. + + return nil +} diff --git a/pushserver/internal/api.go b/pushserver/internal/api.go new file mode 100644 index 00000000..05432f2d --- /dev/null +++ b/pushserver/internal/api.go @@ -0,0 +1,36 @@ +package internal + +import ( + "context" + + "github.com/matrix-org/dendrite/pushserver/api" + "github.com/matrix-org/dendrite/pushserver/storage" + "github.com/matrix-org/dendrite/setup/config" +) + +// PushserverInternalAPI implements api.PushserverInternalAPI +type PushserverInternalAPI struct { + DB storage.Database + Cfg *config.PushServer +} + +func NewPushserverAPI( + cfg *config.PushServer, pushserverDB storage.Database, +) *PushserverInternalAPI { + a := &PushserverInternalAPI{ + DB: pushserverDB, + Cfg: cfg, + } + return a +} + +// SetRoomAlias implements RoomserverAliasAPI +func (p *PushserverInternalAPI) QueryExample( + ctx context.Context, + request *api.QueryExampleRequest, + response *api.QueryExampleResponse, +) error { + // Implement QueryExample here! + + return nil +} diff --git a/pushserver/inthttp/client.go b/pushserver/inthttp/client.go new file mode 100644 index 00000000..f34d88e4 --- /dev/null +++ b/pushserver/inthttp/client.go @@ -0,0 +1,48 @@ +package inthttp + +import ( + "context" + "errors" + "net/http" + + "github.com/matrix-org/dendrite/internal/httputil" + "github.com/matrix-org/dendrite/pushserver/api" + "github.com/opentracing/opentracing-go" +) + +type httpPushserverInternalAPI struct { + roomserverURL string + httpClient *http.Client +} + +const ( + PushserverQueryExamplePath = "/pushserver/queryExample" +) + +// NewRoomserverClient creates a PushserverInternalAPI implemented by talking to a HTTP POST API. +// If httpClient is nil an error is returned +func NewPushserverClient( + pushserverURL string, + httpClient *http.Client, +) (api.PushserverInternalAPI, error) { + if httpClient == nil { + return nil, errors.New("NewPushserverClient: httpClient is ") + } + return &httpPushserverInternalAPI{ + roomserverURL: pushserverURL, + httpClient: httpClient, + }, nil +} + +// SetRoomAlias implements RoomserverAliasAPI +func (h *httpPushserverInternalAPI) QueryExample( + ctx context.Context, + request *api.QueryExampleRequest, + response *api.QueryExampleResponse, +) error { + span, ctx := opentracing.StartSpanFromContext(ctx, "QueryExample") + defer span.Finish() + + apiURL := h.roomserverURL + PushserverQueryExamplePath + return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response) +} diff --git a/pushserver/inthttp/server.go b/pushserver/inthttp/server.go new file mode 100644 index 00000000..18238354 --- /dev/null +++ b/pushserver/inthttp/server.go @@ -0,0 +1,29 @@ +package inthttp + +import ( + "encoding/json" + "net/http" + + "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/internal/httputil" + "github.com/matrix-org/dendrite/pushserver/api" + "github.com/matrix-org/util" +) + +// AddRoutes adds the RoomserverInternalAPI handlers to the http.ServeMux. +// nolint: gocyclo +func AddRoutes(r api.PushserverInternalAPI, internalAPIMux *mux.Router) { + internalAPIMux.Handle(PushserverQueryExamplePath, + httputil.MakeInternalAPI("queryExample", func(req *http.Request) util.JSONResponse { + var request api.QueryExampleRequest + var response api.QueryExampleResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := r.QueryExample(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) +} diff --git a/pushserver/pushserver.go b/pushserver/pushserver.go new file mode 100644 index 00000000..ce0f47de --- /dev/null +++ b/pushserver/pushserver.go @@ -0,0 +1,47 @@ +package pushserver + +import ( + "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/pushserver/api" + "github.com/matrix-org/dendrite/pushserver/consumers" + "github.com/matrix-org/dendrite/pushserver/internal" + "github.com/matrix-org/dendrite/pushserver/inthttp" + "github.com/matrix-org/dendrite/pushserver/storage" + "github.com/matrix-org/dendrite/setup" + "github.com/matrix-org/dendrite/setup/kafka" + "github.com/sirupsen/logrus" +) + +// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions +// on the given input API. +func AddInternalRoutes(router *mux.Router, intAPI api.PushserverInternalAPI) { + inthttp.AddRoutes(intAPI, router) +} + +// NewInternalAPI returns a concerete implementation of the internal API. Callers +// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes. +func NewInternalAPI( + base *setup.BaseDendrite, +) api.PushserverInternalAPI { + cfg := &base.Cfg.PushServer + + consumer, _ := kafka.SetupConsumerProducer(&cfg.Matrix.Kafka) + + pushserverDB, err := storage.Open(&cfg.Database) + if err != nil { + logrus.WithError(err).Panicf("failed to connect to room server db") + } + + psAPI := internal.NewPushserverAPI( + cfg, pushserverDB, + ) + + rsConsumer := consumers.NewOutputRoomEventConsumer( + base.ProcessContext, cfg, consumer, pushserverDB, psAPI, + ) + if err := rsConsumer.Start(); err != nil { + logrus.WithError(err).Panic("failed to start room server consumer") + } + + return psAPI +} diff --git a/pushserver/storage/interface.go b/pushserver/storage/interface.go new file mode 100644 index 00000000..0f8c7314 --- /dev/null +++ b/pushserver/storage/interface.go @@ -0,0 +1,7 @@ +package storage + +import "github.com/matrix-org/dendrite/internal" + +type Database interface { + internal.PartitionStorer +} diff --git a/pushserver/storage/postgres/storage.go b/pushserver/storage/postgres/storage.go new file mode 100644 index 00000000..f37c1824 --- /dev/null +++ b/pushserver/storage/postgres/storage.go @@ -0,0 +1,31 @@ +package postgres + +import ( + "fmt" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/pushserver/storage/shared" + "github.com/matrix-org/dendrite/setup/config" +) + +type Database struct { + shared.Database + sqlutil.PartitionOffsetStatements +} + +func Open(dbProperties *config.DatabaseOptions) (*Database, error) { + var d Database + var err error + if d.DB, err = sqlutil.Open(dbProperties); err != nil { + return nil, fmt.Errorf("sqlutil.Open: %w", err) + } + d.Writer = sqlutil.NewDummyWriter() + + if err = d.PartitionOffsetStatements.Prepare(d.DB, d.Writer, "pushserver"); err != nil { + return nil, err + } + + // Create the tables. + + return &d, nil +} diff --git a/pushserver/storage/shared/storage.go b/pushserver/storage/shared/storage.go new file mode 100644 index 00000000..d1fceb65 --- /dev/null +++ b/pushserver/storage/shared/storage.go @@ -0,0 +1,12 @@ +package shared + +import ( + "database/sql" + + "github.com/matrix-org/dendrite/internal/sqlutil" +) + +type Database struct { + DB *sql.DB + Writer sqlutil.Writer +} diff --git a/pushserver/storage/sqlite3/storage.go b/pushserver/storage/sqlite3/storage.go new file mode 100644 index 00000000..8d387f51 --- /dev/null +++ b/pushserver/storage/sqlite3/storage.go @@ -0,0 +1,31 @@ +package sqlite3 + +import ( + "fmt" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/pushserver/storage/shared" + "github.com/matrix-org/dendrite/setup/config" +) + +type Database struct { + shared.Database + sqlutil.PartitionOffsetStatements +} + +func Open(dbProperties *config.DatabaseOptions) (*Database, error) { + var d Database + var err error + if d.DB, err = sqlutil.Open(dbProperties); err != nil { + return nil, fmt.Errorf("sqlutil.Open: %w", err) + } + d.Writer = sqlutil.NewExclusiveWriter() + + if err = d.PartitionOffsetStatements.Prepare(d.DB, d.Writer, "pushserver"); err != nil { + return nil, err + } + + // Create the tables. + + return &d, nil +} diff --git a/pushserver/storage/storage.go b/pushserver/storage/storage.go new file mode 100644 index 00000000..7a537627 --- /dev/null +++ b/pushserver/storage/storage.go @@ -0,0 +1,23 @@ +// +build !wasm + +package storage + +import ( + "fmt" + + "github.com/matrix-org/dendrite/pushserver/storage/postgres" + "github.com/matrix-org/dendrite/pushserver/storage/sqlite3" + "github.com/matrix-org/dendrite/setup/config" +) + +// Open opens a database connection. +func Open(dbProperties *config.DatabaseOptions) (Database, error) { + switch { + case dbProperties.ConnectionString.IsSQLite(): + return sqlite3.Open(dbProperties) + case dbProperties.ConnectionString.IsPostgres(): + return postgres.Open(dbProperties) + default: + return nil, fmt.Errorf("unexpected database type") + } +} diff --git a/pushserver/storage/storage_wasm.go b/pushserver/storage/storage_wasm.go new file mode 100644 index 00000000..f3ed8673 --- /dev/null +++ b/pushserver/storage/storage_wasm.go @@ -0,0 +1,20 @@ +package storage + +import ( + "fmt" + + "github.com/matrix-org/dendrite/pushserver/storage/sqlite3" + "github.com/matrix-org/dendrite/setup/config" +) + +// NewDatabase opens a new database +func Open(dbProperties *config.DatabaseOptions) (Database, error) { + switch { + case dbProperties.ConnectionString.IsSQLite(): + return sqlite3.Open(dbProperties) + case dbProperties.ConnectionString.IsPostgres(): + return nil, fmt.Errorf("can't use Postgres implementation") + default: + return nil, fmt.Errorf("unexpected database type") + } +} diff --git a/pushserver/storage/tables/interface.go b/pushserver/storage/tables/interface.go new file mode 100644 index 00000000..84653cd4 --- /dev/null +++ b/pushserver/storage/tables/interface.go @@ -0,0 +1 @@ +package tables diff --git a/setup/base.go b/setup/base.go index 6bdeb80f..2cd02ddd 100644 --- a/setup/base.go +++ b/setup/base.go @@ -51,6 +51,8 @@ import ( fsinthttp "github.com/matrix-org/dendrite/federationsender/inthttp" keyserverAPI "github.com/matrix-org/dendrite/keyserver/api" keyinthttp "github.com/matrix-org/dendrite/keyserver/inthttp" + pushserverAPI "github.com/matrix-org/dendrite/pushserver/api" + psinthttp "github.com/matrix-org/dendrite/pushserver/inthttp" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" rsinthttp "github.com/matrix-org/dendrite/roomserver/inthttp" "github.com/matrix-org/dendrite/setup/config" @@ -277,6 +279,15 @@ func (b *BaseDendrite) KeyServerHTTPClient() keyserverAPI.KeyInternalAPI { return f } +// PushServerHTTPClient returns PushserverInternalAPI for hitting the push server over HTTP +func (b *BaseDendrite) PushServerHTTPClient() pushserverAPI.PushserverInternalAPI { + f, err := psinthttp.NewPushserverClient(b.Cfg.PushServerURL(), b.apiHttpClient) + if err != nil { + logrus.WithError(err).Panic("PushServerHTTPClient failed", b.apiHttpClient) + } + return f +} + // CreateAccountsDB creates a new instance of the accounts database. Should only // be called once per component. func (b *BaseDendrite) CreateAccountsDB() accounts.Database { diff --git a/setup/config/config.go b/setup/config/config.go index b9114407..40c62a64 100644 --- a/setup/config/config.go +++ b/setup/config/config.go @@ -65,6 +65,7 @@ type Dendrite struct { SigningKeyServer SigningKeyServer `yaml:"signing_key_server"` SyncAPI SyncAPI `yaml:"sync_api"` UserAPI UserAPI `yaml:"user_api"` + PushServer PushServer `yaml:"push_server"` MSCs MSCs `yaml:"mscs"` @@ -308,6 +309,7 @@ func (c *Dendrite) Defaults() { c.SyncAPI.Defaults() c.UserAPI.Defaults() c.AppServiceAPI.Defaults() + c.PushServer.Defaults() c.MSCs.Defaults() c.Wiring() @@ -340,6 +342,7 @@ func (c *Dendrite) Wiring() { c.SyncAPI.Matrix = &c.Global c.UserAPI.Matrix = &c.Global c.AppServiceAPI.Matrix = &c.Global + c.PushServer.Matrix = &c.Global c.MSCs.Matrix = &c.Global c.ClientAPI.Derived = &c.Derived @@ -547,6 +550,15 @@ func (config *Dendrite) KeyServerURL() string { return string(config.KeyServer.InternalAPI.Connect) } +// PushServerURL returns an HTTP URL for where the push server is listening. +func (config *Dendrite) PushServerURL() string { + // Hard code the push server to talk HTTP for now. + // If we support HTTPS we need to think of a practical way to do certificate validation. + // People setting up servers shouldn't need to get a certificate valid for the public + // internet for an internal API. + return string(config.PushServer.InternalAPI.Connect) +} + // SetupTracing configures the opentracing using the supplied configuration. func (config *Dendrite) SetupTracing(serviceName string) (closer io.Closer, err error) { if !config.Tracing.Enabled { diff --git a/setup/config/config_pushserver.go b/setup/config/config_pushserver.go new file mode 100644 index 00000000..08ac08f7 --- /dev/null +++ b/setup/config/config_pushserver.go @@ -0,0 +1,22 @@ +package config + +type PushServer struct { + Matrix *Global `yaml:"-"` + + InternalAPI InternalAPIOptions `yaml:"internal_api"` + + Database DatabaseOptions `yaml:"database"` +} + +func (c *PushServer) Defaults() { + c.InternalAPI.Listen = "http://localhost:7783" + c.InternalAPI.Connect = "http://localhost:7783" + c.Database.Defaults(10) + c.Database.ConnectionString = "file:pushserver.db" +} + +func (c *PushServer) Verify(configErrs *ConfigErrors, isMonolith bool) { + checkURL(configErrs, "room_server.internal_api.listen", string(c.InternalAPI.Listen)) + checkURL(configErrs, "room_server.internal_ap.bind", string(c.InternalAPI.Connect)) + checkNotEmpty(configErrs, "room_server.database.connection_string", string(c.Database.ConnectionString)) +} diff --git a/setup/monolith.go b/setup/monolith.go index a740ebb7..be2e1ead 100644 --- a/setup/monolith.go +++ b/setup/monolith.go @@ -25,6 +25,7 @@ import ( "github.com/matrix-org/dendrite/internal/transactions" keyAPI "github.com/matrix-org/dendrite/keyserver/api" "github.com/matrix-org/dendrite/mediaapi" + pushserverAPI "github.com/matrix-org/dendrite/pushserver/api" roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/setup/process" @@ -51,6 +52,7 @@ type Monolith struct { ServerKeyAPI serverKeyAPI.SigningKeyServerAPI UserAPI userapi.UserInternalAPI KeyAPI keyAPI.KeyInternalAPI + PushserverAPI pushserverAPI.PushserverInternalAPI // Optional ExtPublicRoomsProvider api.ExtraPublicRoomsProvider