Global database connection pool (for monolith mode) (#2411)

* Allow monolith components to share a single database pool

* Don't yell about missing connection strings

* Rename field

* Setup tweaks

* Fix panic

* Improve configuration checks

* Update config

* Fix lint errors

* Update comments
This commit is contained in:
Neil Alexander 2022-05-03 16:35:06 +01:00 committed by GitHub
parent 979a551f1e
commit 4ad5f9c982
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
71 changed files with 345 additions and 240 deletions

View file

@ -17,6 +17,7 @@ package base
import (
"context"
"crypto/tls"
"database/sql"
"fmt"
"io"
"net"
@ -32,6 +33,7 @@ import (
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/internal/pushgateway"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/atomic"
@ -40,7 +42,6 @@ import (
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/setup/process"
userdb "github.com/matrix-org/dendrite/userapi/storage"
"github.com/gorilla/mux"
"github.com/kardianos/minwinsvc"
@ -81,6 +82,8 @@ type BaseDendrite struct {
Cfg *config.Dendrite
Caches *caching.Caches
DNSCache *gomatrixserverlib.DNSCache
Database *sql.DB
DatabaseWriter sqlutil.Writer
}
const NoListener = ""
@ -112,7 +115,8 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...Base
}
configErrors := &config.ConfigErrors{}
cfg.Verify(configErrors, componentName == "Monolith") // TODO: better way?
isMonolith := componentName == "Monolith" // TODO: better way?
cfg.Verify(configErrors, isMonolith)
if len(*configErrors) > 0 {
for _, err := range *configErrors {
logrus.Errorf("Configuration error: %s", err)
@ -185,6 +189,24 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...Base
},
}
// If we're in monolith mode, we'll set up a global pool of database
// connections. A component is welcome to use this pool if they don't
// have a separate database config of their own.
var db *sql.DB
var writer sqlutil.Writer
if cfg.Global.DatabaseOptions.ConnectionString != "" {
if !isMonolith {
logrus.Panic("Using a global database connection pool is not supported in polylith deployments")
}
if cfg.Global.DatabaseOptions.ConnectionString.IsSQLite() {
logrus.Panic("Using a global database connection pool is not supported with SQLite databases")
}
if db, err = sqlutil.Open(&cfg.Global.DatabaseOptions, sqlutil.NewDummyWriter()); err != nil {
logrus.WithError(err).Panic("Failed to set up global database connections")
}
logrus.Debug("Using global database connection pool")
}
// Ideally we would only use SkipClean on routes which we know can allow '/' but due to
// https://github.com/gorilla/mux/issues/460 we have to attach this at the top router.
// When used in conjunction with UseEncodedPath() we get the behaviour we want when parsing
@ -214,6 +236,8 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...Base
DendriteAdminMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.DendriteAdminPathPrefix).Subrouter().UseEncodedPath(),
SynapseAdminMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.SynapseAdminPathPrefix).Subrouter().UseEncodedPath(),
apiHttpClient: &apiClient,
Database: db, // set if monolith with global connection pool only
DatabaseWriter: writer, // set if monolith with global connection pool only
}
}
@ -222,6 +246,29 @@ func (b *BaseDendrite) Close() error {
return b.tracerCloser.Close()
}
// DatabaseConnection assists in setting up a database connection. It accepts
// the database properties and a new writer for the given component. If we're
// running in monolith mode with a global connection pool configured then we
// will return that connection, along with the global writer, effectively
// ignoring the options provided. Otherwise we'll open a new database connection
// using the supplied options and writer. Note that it's possible for the pointer
// receiver to be nil here that's deliberate as some of the unit tests don't
// have a BaseDendrite and just want a connection with the supplied config
// without any pooling stuff.
func (b *BaseDendrite) DatabaseConnection(dbProperties *config.DatabaseOptions, writer sqlutil.Writer) (*sql.DB, sqlutil.Writer, error) {
if dbProperties.ConnectionString != "" || b == nil {
// Open a new database connection using the supplied config.
db, err := sqlutil.Open(dbProperties, writer)
return db, writer, err
}
if b.Database != nil && b.DatabaseWriter != nil {
// Ignore the supplied config and return the global pool and
// writer.
return b.Database, b.DatabaseWriter, nil
}
return nil, nil, fmt.Errorf("no database connections configured")
}
// AppserviceHTTPClient returns the AppServiceQueryAPI for hitting the appservice component over HTTP.
func (b *BaseDendrite) AppserviceHTTPClient() appserviceAPI.AppServiceQueryAPI {
a, err := asinthttp.NewAppserviceClient(b.Cfg.AppServiceURL(), b.apiHttpClient)
@ -273,24 +320,6 @@ func (b *BaseDendrite) PushGatewayHTTPClient() pushgateway.Client {
return pushgateway.NewHTTPClient(b.Cfg.UserAPI.PushGatewayDisableTLSValidation)
}
// CreateAccountsDB creates a new instance of the accounts database. Should only
// be called once per component.
func (b *BaseDendrite) CreateAccountsDB() userdb.Database {
db, err := userdb.NewUserAPIDatabase(
&b.Cfg.UserAPI.AccountDatabase,
b.Cfg.Global.ServerName,
b.Cfg.UserAPI.BCryptCost,
b.Cfg.UserAPI.OpenIDTokenLifetimeMS,
userapi.DefaultLoginTokenLifetime,
b.Cfg.Global.ServerNotices.LocalPart,
)
if err != nil {
logrus.WithError(err).Panicf("failed to connect to accounts db")
}
return db
}
// CreateClient creates a new client (normally used for media fetch requests).
// Should only be called once per component.
func (b *BaseDendrite) CreateClient() *gomatrixserverlib.Client {

View file

@ -52,7 +52,9 @@ func (c *AppServiceAPI) Defaults(generate bool) {
func (c *AppServiceAPI) Verify(configErrs *ConfigErrors, isMonolith bool) {
checkURL(configErrs, "app_service_api.internal_api.listen", string(c.InternalAPI.Listen))
checkURL(configErrs, "app_service_api.internal_api.bind", string(c.InternalAPI.Connect))
checkNotEmpty(configErrs, "app_service_api.database.connection_string", string(c.Database.ConnectionString))
if c.Matrix.DatabaseOptions.ConnectionString == "" {
checkNotEmpty(configErrs, "app_service_api.database.connection_string", string(c.Database.ConnectionString))
}
}
// ApplicationServiceNamespace is the namespace that a specific application

View file

@ -49,7 +49,9 @@ func (c *FederationAPI) Verify(configErrs *ConfigErrors, isMonolith bool) {
if !isMonolith {
checkURL(configErrs, "federation_api.external_api.listen", string(c.ExternalAPI.Listen))
}
checkNotEmpty(configErrs, "federation_api.database.connection_string", string(c.Database.ConnectionString))
if c.Matrix.DatabaseOptions.ConnectionString == "" {
checkNotEmpty(configErrs, "federation_api.database.connection_string", string(c.Database.ConnectionString))
}
}
// The config for setting a proxy to use for server->server requests

View file

@ -34,6 +34,13 @@ type Global struct {
// Defaults to 24 hours.
KeyValidityPeriod time.Duration `yaml:"key_validity_period"`
// Global pool of database connections, which is used only in monolith mode. If a
// component does not specify any database options of its own, then this pool of
// connections will be used instead. This way we don't have to manage connection
// counts on a per-component basis, but can instead do it for the entire monolith.
// In a polylith deployment, this will be ignored.
DatabaseOptions DatabaseOptions `yaml:"database"`
// The server name to delegate server-server communications to, with optional port
WellKnownServerName string `yaml:"well_known_server_name"`

View file

@ -20,5 +20,7 @@ func (c *KeyServer) Defaults(generate bool) {
func (c *KeyServer) Verify(configErrs *ConfigErrors, isMonolith bool) {
checkURL(configErrs, "key_server.internal_api.listen", string(c.InternalAPI.Listen))
checkURL(configErrs, "key_server.internal_api.bind", string(c.InternalAPI.Connect))
checkNotEmpty(configErrs, "key_server.database.connection_string", string(c.Database.ConnectionString))
if c.Matrix.DatabaseOptions.ConnectionString == "" {
checkNotEmpty(configErrs, "key_server.database.connection_string", string(c.Database.ConnectionString))
}
}

View file

@ -58,7 +58,9 @@ func (c *MediaAPI) Verify(configErrs *ConfigErrors, isMonolith bool) {
if !isMonolith {
checkURL(configErrs, "media_api.external_api.listen", string(c.ExternalAPI.Listen))
}
checkNotEmpty(configErrs, "media_api.database.connection_string", string(c.Database.ConnectionString))
if c.Matrix.DatabaseOptions.ConnectionString == "" {
checkNotEmpty(configErrs, "media_api.database.connection_string", string(c.Database.ConnectionString))
}
checkNotEmpty(configErrs, "media_api.base_path", string(c.BasePath))
checkPositive(configErrs, "media_api.max_file_size_bytes", int64(c.MaxFileSizeBytes))

View file

@ -31,5 +31,7 @@ func (c *MSCs) Enabled(msc string) bool {
}
func (c *MSCs) Verify(configErrs *ConfigErrors, isMonolith bool) {
checkNotEmpty(configErrs, "mscs.database.connection_string", string(c.Database.ConnectionString))
if c.Matrix.DatabaseOptions.ConnectionString == "" {
checkNotEmpty(configErrs, "mscs.database.connection_string", string(c.Database.ConnectionString))
}
}

View file

@ -20,5 +20,7 @@ func (c *RoomServer) Defaults(generate bool) {
func (c *RoomServer) Verify(configErrs *ConfigErrors, isMonolith bool) {
checkURL(configErrs, "room_server.internal_api.listen", string(c.InternalAPI.Listen))
checkURL(configErrs, "room_server.internal_ap.bind", string(c.InternalAPI.Connect))
checkNotEmpty(configErrs, "room_server.database.connection_string", string(c.Database.ConnectionString))
if c.Matrix.DatabaseOptions.ConnectionString == "" {
checkNotEmpty(configErrs, "room_server.database.connection_string", string(c.Database.ConnectionString))
}
}

View file

@ -27,5 +27,7 @@ func (c *SyncAPI) Verify(configErrs *ConfigErrors, isMonolith bool) {
if !isMonolith {
checkURL(configErrs, "sync_api.external_api.listen", string(c.ExternalAPI.Listen))
}
checkNotEmpty(configErrs, "sync_api.database", string(c.Database.ConnectionString))
if c.Matrix.DatabaseOptions.ConnectionString == "" {
checkNotEmpty(configErrs, "sync_api.database", string(c.Database.ConnectionString))
}
}

View file

@ -37,6 +37,8 @@ func (c *UserAPI) Defaults(generate bool) {
func (c *UserAPI) Verify(configErrs *ConfigErrors, isMonolith bool) {
checkURL(configErrs, "user_api.internal_api.listen", string(c.InternalAPI.Listen))
checkURL(configErrs, "user_api.internal_api.connect", string(c.InternalAPI.Connect))
checkNotEmpty(configErrs, "user_api.account_database.connection_string", string(c.AccountDatabase.ConnectionString))
if c.Matrix.DatabaseOptions.ConnectionString == "" {
checkNotEmpty(configErrs, "user_api.account_database.connection_string", string(c.AccountDatabase.ConnectionString))
}
checkPositive(configErrs, "user_api.openid_token_lifetime_ms", c.OpenIDTokenLifetimeMS)
}

View file

@ -25,11 +25,10 @@ import (
keyAPI "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/mediaapi"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
"github.com/matrix-org/dendrite/syncapi"
userapi "github.com/matrix-org/dendrite/userapi/api"
userdb "github.com/matrix-org/dendrite/userapi/storage"
"github.com/matrix-org/gomatrixserverlib"
)
@ -37,7 +36,6 @@ import (
// all components of Dendrite, for use in monolith mode.
type Monolith struct {
Config *config.Dendrite
AccountDB userdb.Database
KeyRing *gomatrixserverlib.KeyRing
Client *gomatrixserverlib.Client
FedClient *gomatrixserverlib.FederationClient
@ -54,26 +52,28 @@ type Monolith struct {
}
// AddAllPublicRoutes attaches all public paths to the given router
func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ssMux, keyMux, wkMux, mediaMux, synapseMux, dendriteMux *mux.Router) {
func (m *Monolith) AddAllPublicRoutes(base *base.BaseDendrite, csMux, ssMux, keyMux, wkMux, mediaMux, synapseMux, dendriteMux *mux.Router) {
userDirectoryProvider := m.ExtUserDirectoryProvider
if userDirectoryProvider == nil {
userDirectoryProvider = m.UserAPI
}
clientapi.AddPublicRoutes(
process, csMux, synapseMux, dendriteMux, &m.Config.ClientAPI,
m.FedClient, m.RoomserverAPI,
m.AppserviceAPI, transactions.New(),
base.ProcessContext, csMux, synapseMux, dendriteMux, &m.Config.ClientAPI,
m.FedClient, m.RoomserverAPI, m.AppserviceAPI, transactions.New(),
m.FederationAPI, m.UserAPI, userDirectoryProvider, m.KeyAPI,
m.ExtPublicRoomsProvider, &m.Config.MSCs,
)
federationapi.AddPublicRoutes(
process, ssMux, keyMux, wkMux, &m.Config.FederationAPI, m.UserAPI, m.FedClient,
m.KeyRing, m.RoomserverAPI, m.FederationAPI,
base.ProcessContext, ssMux, keyMux, wkMux, &m.Config.FederationAPI,
m.UserAPI, m.FedClient, m.KeyRing, m.RoomserverAPI, m.FederationAPI,
m.KeyAPI, &m.Config.MSCs, nil,
)
mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, &m.Config.ClientAPI.RateLimiting, m.UserAPI, m.Client)
mediaapi.AddPublicRoutes(
base, mediaMux, &m.Config.MediaAPI, &m.Config.ClientAPI.RateLimiting,
m.UserAPI, m.Client,
)
syncapi.AddPublicRoutes(
process, csMux, m.UserAPI, m.RoomserverAPI,
base, csMux, m.UserAPI, m.RoomserverAPI,
m.KeyAPI, m.FedClient, &m.Config.SyncAPI,
)
}

View file

@ -102,7 +102,7 @@ func Enable(
base *base.BaseDendrite, rsAPI roomserver.RoomserverInternalAPI, fsAPI fs.FederationInternalAPI,
userAPI userapi.UserInternalAPI, keyRing gomatrixserverlib.JSONVerifier,
) error {
db, err := NewDatabase(&base.Cfg.MSCs.Database)
db, err := NewDatabase(base, &base.Cfg.MSCs.Database)
if err != nil {
return fmt.Errorf("cannot enable MSC2836: %w", err)
}

View file

@ -8,6 +8,7 @@ import (
"encoding/json"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/setup/base"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@ -58,19 +59,17 @@ type DB struct {
}
// NewDatabase loads the database for msc2836
func NewDatabase(dbOpts *config.DatabaseOptions) (Database, error) {
func NewDatabase(base *base.BaseDendrite, dbOpts *config.DatabaseOptions) (Database, error) {
if dbOpts.ConnectionString.IsPostgres() {
return newPostgresDatabase(dbOpts)
return newPostgresDatabase(base, dbOpts)
}
return newSQLiteDatabase(dbOpts)
return newSQLiteDatabase(base, dbOpts)
}
func newPostgresDatabase(dbOpts *config.DatabaseOptions) (Database, error) {
d := DB{
writer: sqlutil.NewDummyWriter(),
}
func newPostgresDatabase(base *base.BaseDendrite, dbOpts *config.DatabaseOptions) (Database, error) {
d := DB{}
var err error
if d.db, err = sqlutil.Open(dbOpts); err != nil {
if d.db, d.writer, err = base.DatabaseConnection(dbOpts, sqlutil.NewDummyWriter()); err != nil {
return nil, err
}
_, err = d.db.Exec(`
@ -145,12 +144,10 @@ func newPostgresDatabase(dbOpts *config.DatabaseOptions) (Database, error) {
return &d, err
}
func newSQLiteDatabase(dbOpts *config.DatabaseOptions) (Database, error) {
d := DB{
writer: sqlutil.NewExclusiveWriter(),
}
func newSQLiteDatabase(base *base.BaseDendrite, dbOpts *config.DatabaseOptions) (Database, error) {
d := DB{}
var err error
if d.db, err = sqlutil.Open(dbOpts); err != nil {
if d.db, d.writer, err = base.DatabaseConnection(dbOpts, sqlutil.NewExclusiveWriter()); err != nil {
return nil, err
}
_, err = d.db.Exec(`