Implement storage interfaces (#841)

* Implement interfaces for federationsender storage

* Implement interfaces for mediaapi storage

* Implement interfaces for publicroomsapi storage

* Implement interfaces for roomserver storage

* Implement interfaces for syncapi storage

* Implement interfaces for keydb storage

* common.PartitionStorer in publicroomsapi interface

* Update copyright notices
This commit is contained in:
Neil Alexander 2020-01-03 14:07:05 +00:00 committed by GitHub
parent 6cab622468
commit c28577ea25
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
60 changed files with 1594 additions and 1223 deletions

View file

@ -33,7 +33,7 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
roomServerConsumer *common.ContinualConsumer
db *storage.Database
db storage.Database
queues *queue.OutgoingQueues
query api.RoomserverQueryAPI
}
@ -43,7 +43,7 @@ func NewOutputRoomEventConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
queues *queue.OutgoingQueues,
store *storage.Database,
store storage.Database,
queryAPI api.RoomserverQueryAPI,
) *OutputRoomEventConsumer {
consumer := common.ContinualConsumer{

View file

@ -29,7 +29,7 @@ import (
// OutputTypingEventConsumer consumes events that originate in typing server.
type OutputTypingEventConsumer struct {
consumer *common.ContinualConsumer
db *storage.Database
db storage.Database
queues *queue.OutgoingQueues
ServerName gomatrixserverlib.ServerName
}
@ -39,7 +39,7 @@ func NewOutputTypingEventConsumer(
cfg *config.Dendrite,
kafkaConsumer sarama.Consumer,
queues *queue.OutgoingQueues,
store *storage.Database,
store storage.Database,
) *OutputTypingEventConsumer {
consumer := common.ContinualConsumer{
Topic: string(cfg.Kafka.Topics.OutputTypingEvent),

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"context"

View file

@ -1,4 +1,5 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
package postgres
import (
"context"

View file

@ -0,0 +1,122 @@
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"context"
"database/sql"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/federationsender/types"
)
// Database stores information needed by the federation sender
type Database struct {
joinedHostsStatements
roomStatements
common.PartitionOffsetStatements
db *sql.DB
}
// NewDatabase opens a new database
func NewDatabase(dataSourceName string) (*Database, error) {
var result Database
var err error
if result.db, err = sql.Open("postgres", dataSourceName); err != nil {
return nil, err
}
if err = result.prepare(); err != nil {
return nil, err
}
return &result, nil
}
func (d *Database) prepare() error {
var err error
if err = d.joinedHostsStatements.prepare(d.db); err != nil {
return err
}
if err = d.roomStatements.prepare(d.db); err != nil {
return err
}
return d.PartitionOffsetStatements.Prepare(d.db, "federationsender")
}
// UpdateRoom updates the joined hosts for a room and returns what the joined
// hosts were before the update, or nil if this was a duplicate message.
// This is called when we receive a message from kafka, so we pass in
// oldEventID and newEventID to check that we haven't missed any messages or
// this isn't a duplicate message.
func (d *Database) UpdateRoom(
ctx context.Context,
roomID, oldEventID, newEventID string,
addHosts []types.JoinedHost,
removeHosts []string,
) (joinedHosts []types.JoinedHost, err error) {
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
err = d.insertRoom(ctx, txn, roomID)
if err != nil {
return err
}
lastSentEventID, err := d.selectRoomForUpdate(ctx, txn, roomID)
if err != nil {
return err
}
if lastSentEventID == newEventID {
// We've handled this message before, so let's just ignore it.
// We can only get a duplicate for the last message we processed,
// so its enough just to compare the newEventID with lastSentEventID
return nil
}
if lastSentEventID != oldEventID {
return types.EventIDMismatchError{
DatabaseID: lastSentEventID, RoomServerID: oldEventID,
}
}
joinedHosts, err = d.selectJoinedHostsWithTx(ctx, txn, roomID)
if err != nil {
return err
}
for _, add := range addHosts {
err = d.insertJoinedHosts(ctx, txn, roomID, add.MemberEventID, add.ServerName)
if err != nil {
return err
}
}
if err = d.deleteJoinedHosts(ctx, txn, removeHosts); err != nil {
return err
}
return d.updateRoom(ctx, txn, roomID, newEventID)
})
return
}
// GetJoinedHosts returns the currently joined hosts for room,
// as known to federationserver.
// Returns an error if something goes wrong.
func (d *Database) GetJoinedHosts(
ctx context.Context, roomID string,
) ([]types.JoinedHost, error) {
return d.selectJoinedHosts(ctx, roomID)
}

View file

@ -1,4 +1,4 @@
// Copyright 2017 Vector Creations Ltd
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -16,106 +16,30 @@ package storage
import (
"context"
"database/sql"
"errors"
"net/url"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/federationsender/storage/postgres"
"github.com/matrix-org/dendrite/federationsender/types"
)
// Database stores information needed by the federation sender
type Database struct {
joinedHostsStatements
roomStatements
common.PartitionOffsetStatements
db *sql.DB
type Database interface {
common.PartitionStorer
UpdateRoom(ctx context.Context, roomID, oldEventID, newEventID string, addHosts []types.JoinedHost, removeHosts []string) (joinedHosts []types.JoinedHost, err error)
GetJoinedHosts(ctx context.Context, roomID string) ([]types.JoinedHost, error)
}
// NewDatabase opens a new database
func NewDatabase(dataSourceName string) (*Database, error) {
var result Database
var err error
if result.db, err = sql.Open("postgres", dataSourceName); err != nil {
func NewDatabase(dataSourceName string) (Database, error) {
uri, err := url.Parse(dataSourceName)
if err != nil {
return nil, err
}
if err = result.prepare(); err != nil {
return nil, err
switch uri.Scheme {
case "postgres":
return postgres.NewDatabase(dataSourceName)
default:
return nil, errors.New("unknown schema")
}
return &result, nil
}
func (d *Database) prepare() error {
var err error
if err = d.joinedHostsStatements.prepare(d.db); err != nil {
return err
}
if err = d.roomStatements.prepare(d.db); err != nil {
return err
}
return d.PartitionOffsetStatements.Prepare(d.db, "federationsender")
}
// UpdateRoom updates the joined hosts for a room and returns what the joined
// hosts were before the update, or nil if this was a duplicate message.
// This is called when we receive a message from kafka, so we pass in
// oldEventID and newEventID to check that we haven't missed any messages or
// this isn't a duplicate message.
func (d *Database) UpdateRoom(
ctx context.Context,
roomID, oldEventID, newEventID string,
addHosts []types.JoinedHost,
removeHosts []string,
) (joinedHosts []types.JoinedHost, err error) {
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
err = d.insertRoom(ctx, txn, roomID)
if err != nil {
return err
}
lastSentEventID, err := d.selectRoomForUpdate(ctx, txn, roomID)
if err != nil {
return err
}
if lastSentEventID == newEventID {
// We've handled this message before, so let's just ignore it.
// We can only get a duplicate for the last message we processed,
// so its enough just to compare the newEventID with lastSentEventID
return nil
}
if lastSentEventID != oldEventID {
return types.EventIDMismatchError{
DatabaseID: lastSentEventID, RoomServerID: oldEventID,
}
}
joinedHosts, err = d.selectJoinedHostsWithTx(ctx, txn, roomID)
if err != nil {
return err
}
for _, add := range addHosts {
err = d.insertJoinedHosts(ctx, txn, roomID, add.MemberEventID, add.ServerName)
if err != nil {
return err
}
}
if err = d.deleteJoinedHosts(ctx, txn, removeHosts); err != nil {
return err
}
return d.updateRoom(ctx, txn, roomID, newEventID)
})
return
}
// GetJoinedHosts returns the currently joined hosts for room,
// as known to federationserver.
// Returns an error if something goes wrong.
func (d *Database) GetJoinedHosts(
ctx context.Context, roomID string,
) ([]types.JoinedHost, error) {
return d.selectJoinedHosts(ctx, roomID)
}