From dd8b05c310e85a9e2c4279071cc0a3e17ec2a64a Mon Sep 17 00:00:00 2001 From: Dan Peleg Date: Fri, 23 Apr 2021 21:28:38 +0300 Subject: [PATCH] Initial storage for Implementation of push notification --- build/docker/config/dendrite-config.yaml | 5 + build/gobind/monolith.go | 1 + cmd/dendrite-demo-yggdrasil/main.go | 1 + cmd/dendritejs/main.go | 1 + dendrite-config.yaml | 5 + internal/test/config.go | 1 + setup/config/config_test.go | 5 + setup/config/config_userapi.go | 6 + userapi/api/api.go | 31 +++++ userapi/internal/api.go | 18 +++ userapi/storage/pushers/interface.go | 25 ++++ .../storage/pushers/postgres/pushers_table.go | 128 ++++++++++++++++++ userapi/storage/pushers/postgres/storage.go | 63 +++++++++ .../storage/pushers/sqlite3/pushers_table.go | 99 ++++++++++++++ userapi/storage/pushers/sqlite3/storage.go | 65 +++++++++ userapi/storage/pushers/storage.go | 39 ++++++ userapi/storage/pushers/storage_wasm.go | 37 +++++ userapi/userapi.go | 7 + 18 files changed, 537 insertions(+) create mode 100644 userapi/storage/pushers/interface.go create mode 100644 userapi/storage/pushers/postgres/pushers_table.go create mode 100644 userapi/storage/pushers/postgres/storage.go create mode 100644 userapi/storage/pushers/sqlite3/pushers_table.go create mode 100644 userapi/storage/pushers/sqlite3/storage.go create mode 100644 userapi/storage/pushers/storage.go create mode 100644 userapi/storage/pushers/storage_wasm.go diff --git a/build/docker/config/dendrite-config.yaml b/build/docker/config/dendrite-config.yaml index ffcf6a45..692c3be6 100644 --- a/build/docker/config/dendrite-config.yaml +++ b/build/docker/config/dendrite-config.yaml @@ -318,6 +318,11 @@ user_api: max_open_conns: 10 max_idle_conns: 2 conn_max_lifetime: -1 + pusher_database: + connection_string: postgresql://dendrite:itsasecret@postgres/dendrite_userapi_pushers?sslmode=disable + max_open_conns: 10 + max_idle_conns: 2 + conn_max_lifetime: -1 # Configuration for Opentracing. # See https://github.com/matrix-org/dendrite/tree/master/docs/tracing for information on diff --git a/build/gobind/monolith.go b/build/gobind/monolith.go index 332d156b..db324e02 100644 --- a/build/gobind/monolith.go +++ b/build/gobind/monolith.go @@ -91,6 +91,7 @@ func (m *DendriteMonolith) Start() { cfg.Global.Kafka.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-naffka.db", m.StorageDirectory)) cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-account.db", m.StorageDirectory)) cfg.UserAPI.DeviceDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-device.db", m.StorageDirectory)) + cfg.UserAPI.PusherDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-pusher.db", m.StorageDirectory)) cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-mediaapi.db", m.StorageDirectory)) cfg.SyncAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-syncapi.db", m.StorageDirectory)) cfg.RoomServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-roomserver.db", m.StorageDirectory)) diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index 2a4a335a..7815a79b 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -75,6 +75,7 @@ func main() { cfg.Global.Kafka.UseNaffka = true cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-account.db", *instanceName)) cfg.UserAPI.DeviceDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-device.db", *instanceName)) + cfg.UserAPI.PusherDatabase.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-pusher.db", *instanceName)) cfg.MediaAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-mediaapi.db", *instanceName)) cfg.SyncAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-syncapi.db", *instanceName)) cfg.RoomServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-roomserver.db", *instanceName)) diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 0dfa4681..92724d1a 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -167,6 +167,7 @@ func main() { cfg.UserAPI.AccountDatabase.ConnectionString = "file:/idb/dendritejs_account.db" cfg.AppServiceAPI.Database.ConnectionString = "file:/idb/dendritejs_appservice.db" cfg.UserAPI.DeviceDatabase.ConnectionString = "file:/idb/dendritejs_device.db" + cfg.UserAPI.PusherDatabase.ConnectionString = "file:/idb/dendritejs_pusher.db" cfg.FederationSender.Database.ConnectionString = "file:/idb/dendritejs_fedsender.db" cfg.MediaAPI.Database.ConnectionString = "file:/idb/dendritejs_mediaapi.db" cfg.RoomServer.Database.ConnectionString = "file:/idb/dendritejs_roomserver.db" diff --git a/dendrite-config.yaml b/dendrite-config.yaml index 0ea584aa..50a4c74c 100644 --- a/dendrite-config.yaml +++ b/dendrite-config.yaml @@ -361,6 +361,11 @@ user_api: max_open_conns: 10 max_idle_conns: 2 conn_max_lifetime: -1 + pusher_database: + connection_string: file:userapi_pushers.db + max_open_conns: 10 + max_idle_conns: 2 + conn_max_lifetime: -1 # The length of time that a token issued for a relying party from # /_matrix/client/r0/user/{userId}/openid/request_token endpoint # is considered to be valid in milliseconds. diff --git a/internal/test/config.go b/internal/test/config.go index 7e68d6d2..aff80231 100644 --- a/internal/test/config.go +++ b/internal/test/config.go @@ -96,6 +96,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con cfg.SyncAPI.Database.ConnectionString = config.DataSource(database) cfg.UserAPI.AccountDatabase.ConnectionString = config.DataSource(database) cfg.UserAPI.DeviceDatabase.ConnectionString = config.DataSource(database) + cfg.UserAPI.PusherDatabase.ConnectionString = config.DataSource(database) cfg.AppServiceAPI.InternalAPI.Listen = assignAddress() cfg.EDUServer.InternalAPI.Listen = assignAddress() diff --git a/setup/config/config_test.go b/setup/config/config_test.go index 4107b684..a2850a6b 100644 --- a/setup/config/config_test.go +++ b/setup/config/config_test.go @@ -204,6 +204,11 @@ user_api: max_open_conns: 100 max_idle_conns: 2 conn_max_lifetime: -1 + pusher_database: + connection_string: file:userapi_pushers.db + max_open_conns: 100 + max_idle_conns: 2 + conn_max_lifetime: -1 tracing: enabled: false jaeger: diff --git a/setup/config/config_userapi.go b/setup/config/config_userapi.go index 2bf1be3d..8a17714a 100644 --- a/setup/config/config_userapi.go +++ b/setup/config/config_userapi.go @@ -19,6 +19,9 @@ type UserAPI struct { // The Device database stores session information for the devices of logged // in local users. It is accessed by the UserAPI. DeviceDatabase DatabaseOptions `yaml:"device_database"` + // The Pusher database stores user's push notification for the devices of logged + // in local users. It is accessed by the UserAPI. + PusherDatabase DatabaseOptions `yaml:"pusher_database"` } const DefaultOpenIDTokenLifetimeMS = 3600000 // 60 minutes @@ -28,8 +31,10 @@ func (c *UserAPI) Defaults() { c.InternalAPI.Connect = "http://localhost:7781" c.AccountDatabase.Defaults(10) c.DeviceDatabase.Defaults(10) + c.PusherDatabase.Defaults(10) c.AccountDatabase.ConnectionString = "file:userapi_accounts.db" c.DeviceDatabase.ConnectionString = "file:userapi_devices.db" + c.PusherDatabase.ConnectionString = "file:userapi_pushers.db" c.BCryptCost = bcrypt.DefaultCost c.OpenIDTokenLifetimeMS = DefaultOpenIDTokenLifetimeMS } @@ -39,5 +44,6 @@ func (c *UserAPI) Verify(configErrs *ConfigErrors, isMonolith bool) { checkURL(configErrs, "user_api.internal_api.connect", string(c.InternalAPI.Connect)) checkNotEmpty(configErrs, "user_api.account_database.connection_string", string(c.AccountDatabase.ConnectionString)) checkNotEmpty(configErrs, "user_api.device_database.connection_string", string(c.DeviceDatabase.ConnectionString)) + checkNotEmpty(configErrs, "user_api.pusher_database.connection_string", string(c.PusherDatabase.ConnectionString)) checkPositive(configErrs, "user_api.openid_token_lifetime_ms", c.OpenIDTokenLifetimeMS) } diff --git a/userapi/api/api.go b/userapi/api/api.go index 40735012..642225c2 100644 --- a/userapi/api/api.go +++ b/userapi/api/api.go @@ -36,6 +36,7 @@ type UserInternalAPI interface { QueryProfile(ctx context.Context, req *QueryProfileRequest, res *QueryProfileResponse) error QueryAccessToken(ctx context.Context, req *QueryAccessTokenRequest, res *QueryAccessTokenResponse) error QueryDevices(ctx context.Context, req *QueryDevicesRequest, res *QueryDevicesResponse) error + QueryPushers(ctx context.Context, req *QueryPushersRequest, res *QueryPushersResponse) error QueryAccountData(ctx context.Context, req *QueryAccountDataRequest, res *QueryAccountDataResponse) error QueryDeviceInfos(ctx context.Context, req *QueryDeviceInfosRequest, res *QueryDeviceInfosResponse) error QuerySearchProfiles(ctx context.Context, req *QuerySearchProfilesRequest, res *QuerySearchProfilesResponse) error @@ -128,6 +129,17 @@ type QueryDevicesResponse struct { Devices []Device } +// QueryPushersRequest is the request for QueryPushers +type QueryPushersRequest struct { + UserID string +} + +// QueryPushersResponse is the response for QueryPushers +type QueryPushersResponse struct { + UserExists bool + Pushers []Pusher +} + // QueryProfileRequest is the request for QueryProfile type QueryProfileRequest struct { // The user ID to query @@ -269,6 +281,25 @@ type Device struct { AppserviceID string } +// Pusher represents a push notification subscriber +type Pusher struct { + ID string + UserID string + PushKey string + Kind string + AppID string + AppDisplayName string + DeviceDisplayName string + ProfileTag string + Language string + Data PusherData +} + +type PusherData struct { + URL string + Format string +} + // Account represents a Matrix account on this home server. type Account struct { UserID string diff --git a/userapi/internal/api.go b/userapi/internal/api.go index 21933c1c..e0a00fc9 100644 --- a/userapi/internal/api.go +++ b/userapi/internal/api.go @@ -29,6 +29,7 @@ import ( "github.com/matrix-org/dendrite/userapi/api" "github.com/matrix-org/dendrite/userapi/storage/accounts" "github.com/matrix-org/dendrite/userapi/storage/devices" + "github.com/matrix-org/dendrite/userapi/storage/pushers" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" "github.com/sirupsen/logrus" @@ -37,6 +38,7 @@ import ( type UserInternalAPI struct { AccountDB accounts.Database DeviceDB devices.Database + PusherDB pushers.Database ServerName gomatrixserverlib.ServerName // AppServices is the list of all registered AS AppServices []config.ApplicationService @@ -306,6 +308,22 @@ func (a *UserInternalAPI) QueryDevices(ctx context.Context, req *api.QueryDevice return nil } +func (a *UserInternalAPI) QueryPushers(ctx context.Context, req *api.QueryPushersRequest, res *api.QueryPushersResponse) error { + local, domain, err := gomatrixserverlib.SplitID('@', req.UserID) + if err != nil { + return err + } + if domain != a.ServerName { + return fmt.Errorf("cannot query pushers of remote users: got %s want %s", domain, a.ServerName) + } + pushers, err := a.PusherDB.GetPushersByLocalpart(ctx, local) + if err != nil { + return err + } + res.Pushers = pushers + return nil +} + func (a *UserInternalAPI) QueryAccountData(ctx context.Context, req *api.QueryAccountDataRequest, res *api.QueryAccountDataResponse) error { local, domain, err := gomatrixserverlib.SplitID('@', req.UserID) if err != nil { diff --git a/userapi/storage/pushers/interface.go b/userapi/storage/pushers/interface.go new file mode 100644 index 00000000..e54cf114 --- /dev/null +++ b/userapi/storage/pushers/interface.go @@ -0,0 +1,25 @@ +// Copyright 2021 Dan Peleg +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pushers + +import ( + "context" + + "github.com/matrix-org/dendrite/userapi/api" +) + +type Database interface { + GetPushersByLocalpart(ctx context.Context, localpart string) ([]api.Pusher, error) +} diff --git a/userapi/storage/pushers/postgres/pushers_table.go b/userapi/storage/pushers/postgres/pushers_table.go new file mode 100644 index 00000000..872b27dd --- /dev/null +++ b/userapi/storage/pushers/postgres/pushers_table.go @@ -0,0 +1,128 @@ +// Copyright 2021 Dan Peleg +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/clientapi/userutil" + "github.com/matrix-org/dendrite/internal" + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" +) + +const pushersSchema = ` +-- Stores data about pushers. +CREATE TABLE IF NOT EXISTS pusher_pushers ( + -- The Matrix user ID localpart for this pusher + localpart TEXT NOT NULL PRIMARY KEY, + -- The push key for this pusher + pushkey TEXT, + -- The pusher kind + kind TEXT, + -- The pusher Application ID + app_id TEXT, + -- The pusher application display name, human friendlier than app_id and updatable + app_display_name TEXT, + -- The pusher device display name, + device_display_name TEXT, + -- The pusher profile tag, + profile_tag TEXT, + -- The pusher preferred language, + language TEXT, +); + +-- Pusher IDs must be unique for a given user. +CREATE UNIQUE INDEX IF NOT EXISTS pusher_localpart_id_idx ON pusher_pushers(localpart, pusher_id); +` + +const selectPushersByLocalpartSQL = "" + + "SELECT pusher_id, display_name, last_seen_ts, ip, user_agent FROM pusher_pushers WHERE localpart = $1 AND pusher_id != $2" + +type pushersStatements struct { + selectPushersByLocalpartStmt *sql.Stmt + serverName gomatrixserverlib.ServerName +} + +func (s *pushersStatements) execSchema(db *sql.DB) error { + _, err := db.Exec(pushersSchema) + return err +} + +func (s *pushersStatements) prepare(db *sql.DB, server gomatrixserverlib.ServerName) (err error) { + if s.selectPushersByLocalpartStmt, err = db.Prepare(selectPushersByLocalpartSQL); err != nil { + return + } + s.serverName = server + return +} + +func (s *pushersStatements) selectPushersByLocalpart( + ctx context.Context, txn *sql.Tx, localpart, exceptPusherID string, +) ([]api.Pusher, error) { + pushers := []api.Pusher{} + rows, err := sqlutil.TxStmt(txn, s.selectPushersByLocalpartStmt).QueryContext(ctx, localpart, exceptPusherID) + + if err != nil { + return pushers, err + } + defer internal.CloseAndLogIfError(ctx, rows, "selectPushersByLocalpart: rows.close() failed") + + for rows.Next() { + var pusher api.Pusher + var id, pushkey, kind, appid, appdisplayname, devicedisplayname, profiletag, language, url, format sql.NullString + err = rows.Scan(&id, &pushkey, &kind, &appid, &appdisplayname, &devicedisplayname, &profiletag, &language, &url, &format) + if err != nil { + return pushers, err + } + if id.Valid { + pusher.ID = id.String + } + if pushkey.Valid { + pusher.PushKey = pushkey.String + } + if kind.Valid { + pusher.Kind = kind.String + } + if appid.Valid { + pusher.AppID = appid.String + } + if appdisplayname.Valid { + pusher.AppDisplayName = appdisplayname.String + } + if devicedisplayname.Valid { + pusher.DeviceDisplayName = devicedisplayname.String + } + if profiletag.Valid { + pusher.ProfileTag = profiletag.String + } + if language.Valid { + pusher.Language = language.String + } + if url.Valid && format.Valid { + pusher.Data = api.PusherData{ + URL: url.String, + Format: format.String, + } + } + + pusher.UserID = userutil.MakeUserID(localpart, s.serverName) + pushers = append(pushers, pusher) + } + + return pushers, rows.Err() +} diff --git a/userapi/storage/pushers/postgres/storage.go b/userapi/storage/pushers/postgres/storage.go new file mode 100644 index 00000000..4d96afec --- /dev/null +++ b/userapi/storage/pushers/postgres/storage.go @@ -0,0 +1,63 @@ +// Copyright 2021 Dan Peleg +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package postgres + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" +) + +// Database represents a pusher database. +type Database struct { + db *sql.DB + pushers pushersStatements +} + +// NewDatabase creates a new puser database +func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName) (*Database, error) { + db, err := sqlutil.Open(dbProperties) + if err != nil { + return nil, err + } + d := pushersStatements{} + + // Create tables before executing migrations so we don't fail if the table is missing, + // and THEN prepare statements so we don't fail due to referencing new columns + if err = d.execSchema(db); err != nil { + return nil, err + } + m := sqlutil.NewMigrations() + if err = m.RunDeltas(db, dbProperties); err != nil { + return nil, err + } + + if err = d.prepare(db, serverName); err != nil { + return nil, err + } + + return &Database{db, d}, nil +} + +// GetPushersByLocalpart returns the pusers matching the given localpart. +func (d *Database) GetPushersByLocalpart( + ctx context.Context, localpart string, +) ([]api.Pusher, error) { + return d.pushers.selectPushersByLocalpart(ctx, nil, localpart, "") +} diff --git a/userapi/storage/pushers/sqlite3/pushers_table.go b/userapi/storage/pushers/sqlite3/pushers_table.go new file mode 100644 index 00000000..8ec4234c --- /dev/null +++ b/userapi/storage/pushers/sqlite3/pushers_table.go @@ -0,0 +1,99 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/userapi/api" + + "github.com/matrix-org/dendrite/clientapi/userutil" + "github.com/matrix-org/gomatrixserverlib" +) + +const pushersSchema = ` +-- This sequence is used for automatic allocation of session_id. +-- CREATE SEQUENCE IF NOT EXISTS pusher_session_id_seq START 1; + +-- Stores data about pushers. +CREATE TABLE IF NOT EXISTS pusher_pushers ( + access_token TEXT PRIMARY KEY, + session_id INTEGER, + pusher_id TEXT , + localpart TEXT , + created_ts BIGINT, + display_name TEXT, + last_seen_ts BIGINT, + ip TEXT, + user_agent TEXT, + + UNIQUE (localpart, pusher_id) +); +` +const selectPushersByLocalpartSQL = "" + + "SELECT pusher_id, display_name, last_seen_ts, ip, user_agent FROM pusher_pushers WHERE localpart = $1 AND pusher_id != $2" + +type pushersStatements struct { + db *sql.DB + writer sqlutil.Writer + selectPushersByLocalpartStmt *sql.Stmt + serverName gomatrixserverlib.ServerName +} + +func (s *pushersStatements) execSchema(db *sql.DB) error { + _, err := db.Exec(pushersSchema) + return err +} + +func (s *pushersStatements) prepare(db *sql.DB, writer sqlutil.Writer, server gomatrixserverlib.ServerName) (err error) { + s.db = db + s.writer = writer + if s.selectPushersByLocalpartStmt, err = db.Prepare(selectPushersByLocalpartSQL); err != nil { + return + } + s.serverName = server + return +} + +func (s *pushersStatements) selectPushersByLocalpart( + ctx context.Context, txn *sql.Tx, localpart, exceptPusherID string, +) ([]api.Pusher, error) { + pushers := []api.Pusher{} + rows, err := sqlutil.TxStmt(txn, s.selectPushersByLocalpartStmt).QueryContext(ctx, localpart, exceptPusherID) + + if err != nil { + return pushers, err + } + + for rows.Next() { + var dev api.Pusher + var lastseents sql.NullInt64 + var id, displayname, ip, useragent sql.NullString + err = rows.Scan(&id, &displayname, &lastseents, &ip, &useragent) + if err != nil { + return pushers, err + } + if id.Valid { + dev.ID = id.String + } + + dev.UserID = userutil.MakeUserID(localpart, s.serverName) + pushers = append(pushers, dev) + } + + return pushers, nil +} diff --git a/userapi/storage/pushers/sqlite3/storage.go b/userapi/storage/pushers/sqlite3/storage.go new file mode 100644 index 00000000..ea20df19 --- /dev/null +++ b/userapi/storage/pushers/sqlite3/storage.go @@ -0,0 +1,65 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqlite3 + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/userapi/api" + "github.com/matrix-org/gomatrixserverlib" + + _ "github.com/mattn/go-sqlite3" +) + +// Database represents a pusher database. +type Database struct { + db *sql.DB + writer sqlutil.Writer + pushers pushersStatements +} + +// NewDatabase creates a new pusher database +func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName) (*Database, error) { + db, err := sqlutil.Open(dbProperties) + if err != nil { + return nil, err + } + writer := sqlutil.NewExclusiveWriter() + d := pushersStatements{} + + // Create tables before executing migrations so we don't fail if the table is missing, + // and THEN prepare statements so we don't fail due to referencing new columns + if err = d.execSchema(db); err != nil { + return nil, err + } + m := sqlutil.NewMigrations() + if err = m.RunDeltas(db, dbProperties); err != nil { + return nil, err + } + if err = d.prepare(db, writer, serverName); err != nil { + return nil, err + } + return &Database{db, writer, d}, nil +} + +// GetPushersByLocalpart returns the pushers matching the given localpart. +func (d *Database) GetPushersByLocalpart( + ctx context.Context, localpart string, +) ([]api.Pusher, error) { + return d.pushers.selectPushersByLocalpart(ctx, nil, localpart, "") +} diff --git a/userapi/storage/pushers/storage.go b/userapi/storage/pushers/storage.go new file mode 100644 index 00000000..87f8642a --- /dev/null +++ b/userapi/storage/pushers/storage.go @@ -0,0 +1,39 @@ +// Copyright 2021 Dan Peleg +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !wasm + +package pushers + +import ( + "fmt" + + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/userapi/storage/pushers/postgres" + "github.com/matrix-org/dendrite/userapi/storage/pushers/sqlite3" + "github.com/matrix-org/gomatrixserverlib" +) + +// NewDatabase opens a new Postgres or Sqlite database (based on dataSourceName scheme) +// and sets postgres connection parameters +func NewDatabase(dbProperties *config.DatabaseOptions, serverName gomatrixserverlib.ServerName) (Database, error) { + switch { + case dbProperties.ConnectionString.IsSQLite(): + return sqlite3.NewDatabase(dbProperties, serverName) + case dbProperties.ConnectionString.IsPostgres(): + return postgres.NewDatabase(dbProperties, serverName) + default: + return nil, fmt.Errorf("unexpected database type") + } +} diff --git a/userapi/storage/pushers/storage_wasm.go b/userapi/storage/pushers/storage_wasm.go new file mode 100644 index 00000000..d78a299b --- /dev/null +++ b/userapi/storage/pushers/storage_wasm.go @@ -0,0 +1,37 @@ +// Copyright 2021 Dan Peleg +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pushers + +import ( + "fmt" + + "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/dendrite/userapi/storage/pushers/sqlite3" + "github.com/matrix-org/gomatrixserverlib" +) + +func NewDatabase( + dbProperties *config.DatabaseOptions, + serverName gomatrixserverlib.ServerName, +) (Database, error) { + switch { + case dbProperties.ConnectionString.IsSQLite(): + return sqlite3.NewDatabase(dbProperties, serverName) + case dbProperties.ConnectionString.IsPostgres(): + return nil, fmt.Errorf("can't use Postgres implementation") + default: + return nil, fmt.Errorf("unexpected database type") + } +} diff --git a/userapi/userapi.go b/userapi/userapi.go index 74702020..47574208 100644 --- a/userapi/userapi.go +++ b/userapi/userapi.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/userapi/inthttp" "github.com/matrix-org/dendrite/userapi/storage/accounts" "github.com/matrix-org/dendrite/userapi/storage/devices" + "github.com/matrix-org/dendrite/userapi/storage/pushers" "github.com/sirupsen/logrus" ) @@ -42,9 +43,15 @@ func NewInternalAPI( logrus.WithError(err).Panicf("failed to connect to device db") } + pusherDB, err := pushers.NewDatabase(&cfg.PusherDatabase, cfg.Matrix.ServerName) + if err != nil { + logrus.WithError(err).Panicf("failed to connect to device db") + } + return &internal.UserInternalAPI{ AccountDB: accountDB, DeviceDB: deviceDB, + PusherDB: pusherDB, ServerName: cfg.Matrix.ServerName, AppServices: appServices, KeyAPI: keyAPI,