diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go index a96b5ae3..49ba614e 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go @@ -19,6 +19,7 @@ import ( "net/http" "github.com/matrix-org/dendrite/common/keydb" + "github.com/matrix-org/dendrite/paginationapi" "github.com/matrix-org/dendrite/clientapi" "github.com/matrix-org/dendrite/common" @@ -56,6 +57,7 @@ func main() { publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB) roomserver.SetupRoomServerComponentWithDB(base, roomserverDB) syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB) + paginationapi.SetupPaginationAPIComponent(base, deviceDB) httpHandler := common.WrapHandlerInCORS(base.APIMux) diff --git a/src/github.com/matrix-org/dendrite/common/config/config.go b/src/github.com/matrix-org/dendrite/common/config/config.go index 39607d95..58afb48e 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config.go +++ b/src/github.com/matrix-org/dendrite/common/config/config.go @@ -165,6 +165,9 @@ type Dendrite struct { // The PublicRoomsAPI database stores information used to compute the public // room directory. It is only accessed by the PublicRoomsAPI server. PublicRoomsAPI DataSource `yaml:"public_rooms_api"` + // The PaginationAPI database stores information used to fulfill + // pagination requests. + PaginationAPI DataSource `yaml:"pagination_api"` // The Naffka database is used internally by the naffka library, if used. Naffka DataSource `yaml:"naffka,omitempty"` } `yaml:"database"` diff --git a/src/github.com/matrix-org/dendrite/paginationapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/paginationapi/consumers/roomserver.go new file mode 100644 index 00000000..23c180bb --- /dev/null +++ b/src/github.com/matrix-org/dendrite/paginationapi/consumers/roomserver.go @@ -0,0 +1,56 @@ +// Copyright 2017 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package consumers + +import ( + "context" + + "github.com/matrix-org/dendrite/paginationapi/storage" + "github.com/matrix-org/dendrite/roomserver/api" +) + +// OutputRoomEventConsumer consumes events that originated in the room server. +type OutputRoomEventConsumer struct { + db *storage.PaginationAPIDatabase +} + +// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. +func NewOutputRoomEventConsumer(db *storage.PaginationAPIDatabase) *OutputRoomEventConsumer { + s := &OutputRoomEventConsumer{db: db} + + return s +} + +// ProcessNewRoomEvent implements output.ProcessOutputEventHandler +func (s *OutputRoomEventConsumer) ProcessNewRoomEvent( + ctx context.Context, msg *api.OutputNewRoomEvent, +) error { + + return s.db.AddEvent(ctx, &msg.Event) +} + +// ProcessNewInviteEvent implements output.ProcessOutputEventHandler +func (s *OutputRoomEventConsumer) ProcessNewInviteEvent( + ctx context.Context, msg *api.OutputNewInviteEvent, +) error { + return nil +} + +// ProcessRetireInviteEvent implements output.ProcessOutputEventHandler +func (s *OutputRoomEventConsumer) ProcessRetireInviteEvent( + ctx context.Context, msg *api.OutputRetireInviteEvent, +) error { + return nil +} diff --git a/src/github.com/matrix-org/dendrite/paginationapi/paginationapi.go b/src/github.com/matrix-org/dendrite/paginationapi/paginationapi.go new file mode 100644 index 00000000..f2d408ee --- /dev/null +++ b/src/github.com/matrix-org/dendrite/paginationapi/paginationapi.go @@ -0,0 +1,42 @@ +// Copyright 2017 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package paginationapi + +import ( + "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" + "github.com/matrix-org/dendrite/common/basecomponent" + "github.com/matrix-org/dendrite/paginationapi/consumers" + "github.com/matrix-org/dendrite/paginationapi/storage" + "github.com/sirupsen/logrus" +) + +// SetupPaginationAPIComponent sets up and registers HTTP handlers for the PaginationAPI +// component. +func SetupPaginationAPIComponent( + base *basecomponent.BaseDendrite, + deviceDB *devices.Database, +) { + tracer := base.CreateNewTracer("PaginationAPI") + + paginationDB, err := storage.NewPaginationAPIDatabase(string(base.Cfg.Database.PaginationAPI)) + if err != nil { + logrus.WithError(err).Panicf("failed to connect to pagination api db") + } + + handler := consumers.NewOutputRoomEventConsumer(paginationDB) + base.StartRoomServerConsumer(tracer, paginationDB, handler) + + // routing.Setup(base.APIMux, deviceDB, publicRoomsDB) +} diff --git a/src/github.com/matrix-org/dendrite/paginationapi/storage/events.go b/src/github.com/matrix-org/dendrite/paginationapi/storage/events.go new file mode 100644 index 00000000..7e51a781 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/paginationapi/storage/events.go @@ -0,0 +1,278 @@ +// Copyright 2017 New Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/gomatrixserverlib" +) + +const eventBucketsSchema = ` +CREATE TABLE IF NOT EXISTS paginationapi_events ( + id BIGSERIAL PRIMARY KEY, + event_id TEXT NOT NULL, + bucket_id BIGINT +); + +CREATE UNIQUE INDEX IF NOT EXISTS paginationapi_events_event_id_idx ON paginationapi_events(event_id); + +CREATE TABLE IF NOT EXISTS paginationapi_event_edges ( + parent_id BIGINT NOT NULL, + child_id BIGINT NOT NULL +); + +CREATE INDEX IF NOT EXISTS paginationapi_event_edges_parent_idx ON paginationapi_event_edges(parent_id); +CREATE INDEX IF NOT EXISTS paginationapi_event_edges_child_idx ON paginationapi_event_edges(child_id); + +CREATE TABLE IF NOT EXISTS paginationapi_rooms ( + id BIGSERIAL PRIMARY KEY, + room_id TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS paginationapi_buckets ( + id BIGSERIAL PRIMARY KEY, + room_id BIGINT NOT NULL, + previous_bucket BIGINT, + missing_event_ids TEXT[] +); + +CREATE INDEX IF NOT EXISTS paginationapi_buckets_previous_idx ON paginationapi_buckets(previous_bucket); + +CREATE TABLE IF NOT EXISTS paginationapi_latest_buckets ( + room_id BIGINT NOT NULL, + bucket_id BIGINT NOT NULL +); + +CREATE UNIQUE INDEX IF NOT EXISTS paginationapi_latest_buckets_idx ON paginationapi_latest_buckets(room_id); +` + +const getBucketForEventSQL = `SELECT bucket_id FROM paginationapi_events WHERE event_id = $1` + +const getBucketsForParentEventsSQL = ` +SELECT + p.bucket_id +FROM paginationapi_event_edges + INNER JOIN paginationapi_events AS p + ON parent_id = p.id + INNER JOIN paginationapi_events AS c + ON child_id = c.id +WHERE + c.event_id = $1 + AND p.bucket_id IS NOT NULL +` + +const latestBucketSQL = ` +SELECT bucket_id FROM paginationapi_latest_buckets AS l +INNER JOIN paginationapi_rooms AS r ON r.id = l.room_id +WHERE r.room_id = $1 +` + +type eventBucketsStatements struct { + getBucketForEventStmt *sql.Stmt + getBucketsForParentEventsStmt *sql.Stmt + latestBucketStmt *sql.Stmt +} + +func (s *eventBucketsStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(eventBucketsSchema) + if err != nil { + return + } + if s.getBucketForEventStmt, err = db.Prepare(getBucketForEventSQL); err != nil { + return + } + if s.getBucketsForParentEventsStmt, err = db.Prepare(getBucketsForParentEventsSQL); err != nil { + return + } + if s.latestBucketStmt, err = db.Prepare(latestBucketSQL); err != nil { + return + } + return +} + +func (s *eventBucketsStatements) isAfter(ctx context.Context, txn *sql.Tx, a uint, b uint) (bool, error) { + return a < b, nil +} + +func (s *eventBucketsStatements) getEarliestBucketForEvent(ctx context.Context, txn *sql.Tx, eventID string) (*uint, error) { + rows, err := common.TxStmt(txn, s.getBucketsForParentEventsStmt).QueryContext(ctx, eventID) + if err != nil { + return nil, err + } + defer rows.Close() // nolint: errcheck + + var minBucketID *uint + for rows.Next() { + var bucketID uint + if err = rows.Scan(&bucketID); err != nil { + return nil, err + } + + if minBucketID == nil { + minBucketID = &bucketID + continue + } + + isAfter, err := s.isAfter(ctx, txn, bucketID, *minBucketID) + if err != nil { + return nil, err + } + + if isAfter { + minBucketID = &bucketID + } + } + + return minBucketID, nil +} + +func (s *eventBucketsStatements) getLatestBucketForEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) (*uint, error) { + var maxBucketID *uint + + // FIXME: This can be done via postgres arrays! \o/ + + stmt := common.TxStmt(txn, s.getBucketForEventStmt) + for _, eventID := range eventIDs { + var bucketID *uint + err := stmt.QueryRowContext(ctx, eventID).Scan(&bucketID) + if err != nil && err != sql.ErrNoRows { + return nil, err + } + + if bucketID == nil { + continue + } + + if maxBucketID == nil { + maxBucketID = bucketID + } + + isAfter, err := s.isAfter(ctx, txn, *maxBucketID, *bucketID) + if err != nil { + return nil, err + } + if isAfter { + maxBucketID = bucketID + } + } + + return nil, nil +} + +func (s *eventBucketsStatements) getLatestBucketID(ctx context.Context, txn *sql.Tx, roomID string) (*uint, error) { + stmt := common.TxStmt(txn, s.latestBucketStmt) + + var bucketID sql.NullInt64 + err := stmt.QueryRowContext(ctx, roomID).Scan(&bucketID) + + if err == sql.ErrNoRows { + return nil, nil + } + + if err != nil { + return nil, err + } + + if bucketID.Valid { + result := uint(bucketID.Int64) + return &result, nil + } + + return nil, nil +} + +func (s *eventBucketsStatements) insertIntoOrBeforeBucket( + ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.Event, bucketID uint, +) error { + panic("not implemented") +} + +func (s *eventBucketsStatements) insertIntoOrAfterBucket( + ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.Event, bucketID uint, +) error { + // TODO: Create a new bucket if there are already too many events in that + // bucket + + _, err := txn.ExecContext( + ctx, "INSERT INTO paginationapi_events (event_id, bucket_id) VALUES ($1, $2)", + event.EventID(), bucketID, + ) + return err +} + +func (s *eventBucketsStatements) insertAfterBucket( + ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.Event, bucketID uint, +) error { + var roomNID uint + err := txn.QueryRowContext( + ctx, "SELECT room_id FROM paginationapi_rooms WHERE room_id = $1", event.RoomID(), + ).Scan(&roomNID) + + if err != nil { + return err + } + + // TODO: Add missing_event_ids + + var newBucketID uint + err = txn.QueryRowContext( + ctx, "INSERT INTO paginationapi_buckets (room_id, previous_bucket) VALUES ($1, $2) RETURNING id", + roomNID, bucketID, + ).Scan(&newBucketID) + + if err != nil { + return err + } + + _, err = txn.ExecContext( + ctx, "INSERT INTO paginationapi_latest_buckets (room_id, bucket_id) VALUES ($1, $2) ON CONFLICT (room_id) DO UPDATE SET bucket_id = $2", + roomNID, newBucketID, + ) + + return err +} + +func (s *eventBucketsStatements) createBucketAndInsert( + ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.Event, +) error { + var roomNID uint + err := txn.QueryRowContext( + ctx, "SELECT room_id FROM paginationapi_rooms WHERE room_id = $1", event.RoomID(), + ).Scan(&roomNID) + + if err != nil { + return err + } + + var newBucketID uint + err = txn.QueryRowContext( + ctx, "INSERT INTO paginationapi_buckets (room_id) VALUES ($1) RETURNING id", + roomNID, + ).Scan(&newBucketID) + + if err != nil { + return err + } + + _, err = txn.ExecContext( + ctx, "INSERT INTO paginationapi_latest_buckets (room_id, bucket_id) VALUES ($1, $2)", + roomNID, newBucketID, + ) + + return err +} diff --git a/src/github.com/matrix-org/dendrite/paginationapi/storage/storage.go b/src/github.com/matrix-org/dendrite/paginationapi/storage/storage.go new file mode 100644 index 00000000..aeb2b4bc --- /dev/null +++ b/src/github.com/matrix-org/dendrite/paginationapi/storage/storage.go @@ -0,0 +1,156 @@ +// Copyright 2017 New Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "database/sql" + + // Import the postgres database driver. + _ "github.com/lib/pq" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/gomatrixserverlib" +) + +// SyncServerDatabase represents a sync server database +type PaginationAPIDatabase struct { + db *sql.DB + common.PartitionOffsetStatements + eventBuckets eventBucketsStatements +} + +// NewSyncServerDatabase creates a new sync server database +func NewPaginationAPIDatabase(dataSourceName string) (*PaginationAPIDatabase, error) { + var d PaginationAPIDatabase + var err error + if d.db, err = sql.Open("postgres", dataSourceName); err != nil { + return nil, err + } + if err = d.PartitionOffsetStatements.Prepare(d.db, "paginationapi"); err != nil { + return nil, err + } + if err = d.eventBuckets.prepare(d.db); err != nil { + return nil, err + } + return &d, nil +} + +func (s *PaginationAPIDatabase) Paginate(ctx context.Context, eventID string, limit uint) ([]string, error) { + return nil, nil +} + +func (s *PaginationAPIDatabase) AddEvent(ctx context.Context, event *gomatrixserverlib.Event) error { + // 1. figure out which buckets have events that point to that event (take + // smallest), otherwise look at buckets that prev_events are in + // 2. find the largest bucket. + // 3. what if one of the prev_events don't have buckets? Add to current + // bucket or not? should we have sentinel events? What if we can't get a + // sentinel? I guess we just ignore that edge. + // 4. Check if the bucket is too big, if it is make a new one. + // 5. Add event to bucket. + + // TODO: Handle having multiple latest events? + + return common.WithTransaction(s.db, func(txn *sql.Tx) error { + eventID := event.EventID() + + minParentBucketID, err := s.eventBuckets.getEarliestBucketForEvent(ctx, txn, eventID) + if err != nil { + return err + } + + maxChildBucketID, err := s.eventBuckets.getLatestBucketForEvents(ctx, txn, event.PrevEventIDs()) + if err != nil { + return err + } + + maxCurrentBucketID, err := s.eventBuckets.getLatestBucketID(ctx, txn, event.RoomID()) + if err != nil { + return err + } + + result := CalculateEventInsertion(minParentBucketID, maxChildBucketID, maxCurrentBucketID, true) + + switch result.Behaviour { + case AppendAfter: + return s.eventBuckets.insertAfterBucket(ctx, txn, event, result.BucketID) + case AppendInto: + return s.eventBuckets.insertIntoOrAfterBucket(ctx, txn, event, result.BucketID) + case PrependInto: + return s.eventBuckets.insertIntoOrBeforeBucket(ctx, txn, event, result.BucketID) + case CreateNew: + return s.eventBuckets.createBucketAndInsert(ctx, txn, event) + case Quarantine: + panic("Quarantine not implemented") + default: + panic("Unknown behaviour") + } + }) +} + +type EventInsertionBehaviour = uint + +const ( + AppendAfter EventInsertionBehaviour = iota + AppendInto + PrependInto + CreateNew + Quarantine +) + +type EventInsertionOutput struct { + Behaviour EventInsertionBehaviour + BucketID uint +} + +func CalculateEventInsertion( + minParentBucketID *uint, + maxChildBucketID *uint, + maxCurrentBucketID *uint, + isNewEvent bool, +) EventInsertionOutput { + if maxCurrentBucketID == nil { + return EventInsertionOutput{ + Behaviour: CreateNew, + } + } + + // TODO: Handle if both minParentBucketID and maxChildBucketID are non-nil + + if minParentBucketID != nil { + return EventInsertionOutput{ + BucketID: *minParentBucketID, + Behaviour: PrependInto, + } + } + + if maxChildBucketID != nil && *maxCurrentBucketID == *maxChildBucketID { + return EventInsertionOutput{ + BucketID: *maxChildBucketID, + Behaviour: AppendInto, + } + } + + if isNewEvent { + return EventInsertionOutput{ + BucketID: *maxCurrentBucketID, + Behaviour: AppendAfter, + } + } + + return EventInsertionOutput{ + Behaviour: Quarantine, + } +}