Add first stab at pagination

This commit is contained in:
Erik Johnston 2017-12-11 17:23:33 +00:00
parent 7e07f8ae7d
commit 535ad630cd
6 changed files with 537 additions and 0 deletions

View file

@ -19,6 +19,7 @@ import (
"net/http"
"github.com/matrix-org/dendrite/common/keydb"
"github.com/matrix-org/dendrite/paginationapi"
"github.com/matrix-org/dendrite/clientapi"
"github.com/matrix-org/dendrite/common"
@ -56,6 +57,7 @@ func main() {
publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB)
roomserver.SetupRoomServerComponentWithDB(base, roomserverDB)
syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB)
paginationapi.SetupPaginationAPIComponent(base, deviceDB)
httpHandler := common.WrapHandlerInCORS(base.APIMux)

View file

@ -165,6 +165,9 @@ type Dendrite struct {
// The PublicRoomsAPI database stores information used to compute the public
// room directory. It is only accessed by the PublicRoomsAPI server.
PublicRoomsAPI DataSource `yaml:"public_rooms_api"`
// The PaginationAPI database stores information used to fulfill
// pagination requests.
PaginationAPI DataSource `yaml:"pagination_api"`
// The Naffka database is used internally by the naffka library, if used.
Naffka DataSource `yaml:"naffka,omitempty"`
} `yaml:"database"`

View file

@ -0,0 +1,56 @@
// Copyright 2017 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package consumers
import (
"context"
"github.com/matrix-org/dendrite/paginationapi/storage"
"github.com/matrix-org/dendrite/roomserver/api"
)
// OutputRoomEventConsumer consumes events that originated in the room server.
// Its handler methods receive room server output messages; new room events
// are passed on to the pagination API database.
type OutputRoomEventConsumer struct {
// db is the pagination API database that new room events are added to.
db *storage.PaginationAPIDatabase
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer that is
// backed by the given pagination API database.
func NewOutputRoomEventConsumer(db *storage.PaginationAPIDatabase) *OutputRoomEventConsumer {
	return &OutputRoomEventConsumer{
		db: db,
	}
}
// ProcessNewRoomEvent implements output.ProcessOutputEventHandler.
// It hands the event carried by msg to the pagination database and returns
// whatever error AddEvent reports.
func (s *OutputRoomEventConsumer) ProcessNewRoomEvent(
	ctx context.Context, msg *api.OutputNewRoomEvent,
) error {
	event := &msg.Event
	return s.db.AddEvent(ctx, event)
}
// ProcessNewInviteEvent implements output.ProcessOutputEventHandler.
// This consumer does nothing with new invites: the message is ignored and
// nil is always returned.
func (s *OutputRoomEventConsumer) ProcessNewInviteEvent(
ctx context.Context, msg *api.OutputNewInviteEvent,
) error {
return nil
}
// ProcessRetireInviteEvent implements output.ProcessOutputEventHandler.
// This consumer does nothing with retired invites: the message is ignored and
// nil is always returned.
func (s *OutputRoomEventConsumer) ProcessRetireInviteEvent(
ctx context.Context, msg *api.OutputRetireInviteEvent,
) error {
return nil
}

View file

@ -0,0 +1,42 @@
// Copyright 2017 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package paginationapi
import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/paginationapi/consumers"
"github.com/matrix-org/dendrite/paginationapi/storage"
"github.com/sirupsen/logrus"
)
// SetupPaginationAPIComponent sets up the PaginationAPI component: it opens
// the pagination database named in the configuration and starts a room server
// consumer that feeds new room events into it.
//
// If the database cannot be opened the failure is logged at panic level.
// HTTP routing is not wired up yet (see the commented-out call at the end), so
// deviceDB is currently unused.
func SetupPaginationAPIComponent(
	base *basecomponent.BaseDendrite,
	deviceDB *devices.Database,
) {
	tracer := base.CreateNewTracer("PaginationAPI")

	dataSource := string(base.Cfg.Database.PaginationAPI)
	db, err := storage.NewPaginationAPIDatabase(dataSource)
	if err != nil {
		logrus.WithError(err).Panicf("failed to connect to pagination api db")
	}

	// The database is passed twice: once as the second argument of
	// StartRoomServerConsumer and once wrapped in the output event handler.
	consumer := consumers.NewOutputRoomEventConsumer(db)
	base.StartRoomServerConsumer(tracer, db, consumer)

	// routing.Setup(base.APIMux, deviceDB, publicRoomsDB)
}

View file

@ -0,0 +1,278 @@
// Copyright 2017 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"context"
"database/sql"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/gomatrixserverlib"
)
// eventBucketsSchema creates, if they do not already exist, the tables and
// indexes used by the pagination API:
//   - paginationapi_events: one row per event ID (unique), with a nullable
//     bucket_id.
//   - paginationapi_event_edges: (parent_id, child_id) pairs, indexed on both
//     columns.
//   - paginationapi_rooms: a numeric id for each textual room_id.
//   - paginationapi_buckets: buckets belonging to a room, each with an
//     optional previous_bucket and a list of missing event IDs.
//   - paginationapi_latest_buckets: at most one bucket per room (room_id is
//     unique).
const eventBucketsSchema = `
CREATE TABLE IF NOT EXISTS paginationapi_events (
id BIGSERIAL PRIMARY KEY,
event_id TEXT NOT NULL,
bucket_id BIGINT
);
CREATE UNIQUE INDEX IF NOT EXISTS paginationapi_events_event_id_idx ON paginationapi_events(event_id);
CREATE TABLE IF NOT EXISTS paginationapi_event_edges (
parent_id BIGINT NOT NULL,
child_id BIGINT NOT NULL
);
CREATE INDEX IF NOT EXISTS paginationapi_event_edges_parent_idx ON paginationapi_event_edges(parent_id);
CREATE INDEX IF NOT EXISTS paginationapi_event_edges_child_idx ON paginationapi_event_edges(child_id);
CREATE TABLE IF NOT EXISTS paginationapi_rooms (
id BIGSERIAL PRIMARY KEY,
room_id TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS paginationapi_buckets (
id BIGSERIAL PRIMARY KEY,
room_id BIGINT NOT NULL,
previous_bucket BIGINT,
missing_event_ids TEXT[]
);
CREATE INDEX IF NOT EXISTS paginationapi_buckets_previous_idx ON paginationapi_buckets(previous_bucket);
CREATE TABLE IF NOT EXISTS paginationapi_latest_buckets (
room_id BIGINT NOT NULL,
bucket_id BIGINT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS paginationapi_latest_buckets_idx ON paginationapi_latest_buckets(room_id);
`
// getBucketForEventSQL selects the (possibly NULL) bucket_id stored for the
// event with the given event ID ($1).
const getBucketForEventSQL = `SELECT bucket_id FROM paginationapi_events WHERE event_id = $1`
// getBucketsForParentEventsSQL selects the bucket IDs of every event that
// appears as parent_id on an edge whose child_id is the event with the given
// event ID ($1). Parents without a bucket (NULL bucket_id) are excluded.
const getBucketsForParentEventsSQL = `
SELECT
p.bucket_id
FROM paginationapi_event_edges
INNER JOIN paginationapi_events AS p
ON parent_id = p.id
INNER JOIN paginationapi_events AS c
ON child_id = c.id
WHERE
c.event_id = $1
AND p.bucket_id IS NOT NULL
`
// latestBucketSQL selects the bucket_id recorded in
// paginationapi_latest_buckets for the room with the given textual room ID
// ($1), resolving the room through paginationapi_rooms.
const latestBucketSQL = `
SELECT bucket_id FROM paginationapi_latest_buckets AS l
INNER JOIN paginationapi_rooms AS r ON r.id = l.room_id
WHERE r.room_id = $1
`
// eventBucketsStatements holds the prepared statements used to look up which
// bucket events and rooms belong to. The fields are populated by prepare.
type eventBucketsStatements struct {
// getBucketForEventStmt is prepared from getBucketForEventSQL.
getBucketForEventStmt *sql.Stmt
// getBucketsForParentEventsStmt is prepared from getBucketsForParentEventsSQL.
getBucketsForParentEventsStmt *sql.Stmt
// latestBucketStmt is prepared from latestBucketSQL.
latestBucketStmt *sql.Stmt
}
// prepare executes the schema and then prepares every statement held by s
// against db. It stops at the first failure and returns that error; nil is
// returned once all statements are prepared.
func (s *eventBucketsStatements) prepare(db *sql.DB) (err error) {
	if _, err = db.Exec(eventBucketsSchema); err != nil {
		return err
	}

	// Statements are prepared in declaration order.
	statements := []struct {
		target **sql.Stmt
		query  string
	}{
		{&s.getBucketForEventStmt, getBucketForEventSQL},
		{&s.getBucketsForParentEventsStmt, getBucketsForParentEventsSQL},
		{&s.latestBucketStmt, latestBucketSQL},
	}
	for _, st := range statements {
		if *st.target, err = db.Prepare(st.query); err != nil {
			return err
		}
	}
	return nil
}
// isAfter reports whether a < b, i.e. it orders buckets by the numeric value
// of their IDs. ctx and txn are not used and the returned error is always nil.
// NOTE(review): read the result as "b is after a" — confirm that this is the
// intended argument order at each call site.
func (s *eventBucketsStatements) isAfter(ctx context.Context, txn *sql.Tx, a uint, b uint) (bool, error) {
	return b > a, nil
}
// getEarliestBucketForEvent returns the earliest bucket (as ordered by
// isAfter) among the buckets selected by getBucketsForParentEventsStmt for
// eventID. It returns nil if the query yields no rows.
//
// Any error from running the query, scanning a row, comparing buckets or
// iterating over the result set is returned with a nil bucket ID.
func (s *eventBucketsStatements) getEarliestBucketForEvent(ctx context.Context, txn *sql.Tx, eventID string) (*uint, error) {
	rows, err := common.TxStmt(txn, s.getBucketsForParentEventsStmt).QueryContext(ctx, eventID)
	if err != nil {
		return nil, err
	}
	defer rows.Close() // nolint: errcheck

	var minBucketID *uint
	for rows.Next() {
		// Declared inside the loop so that each iteration gets its own
		// variable and minBucketID can safely point at it.
		var bucketID uint
		if err = rows.Scan(&bucketID); err != nil {
			return nil, err
		}

		if minBucketID == nil {
			minBucketID = &bucketID
			continue
		}

		earlier, err := s.isAfter(ctx, txn, bucketID, *minBucketID)
		if err != nil {
			return nil, err
		}
		if earlier {
			minBucketID = &bucketID
		}
	}

	// rows.Next returns false both when the rows are exhausted and when
	// iteration failed; without this check a partial result would be
	// reported as if it were complete.
	if err = rows.Err(); err != nil {
		return nil, err
	}

	return minBucketID, nil
}
// getLatestBucketForEvents returns the latest bucket (as ordered by isAfter)
// that any of the given events is stored in. Events that are unknown, or that
// have no bucket assigned, are skipped. It returns nil if none of the events
// has a bucket.
func (s *eventBucketsStatements) getLatestBucketForEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) (*uint, error) {
	var maxBucketID *uint

	// FIXME: This can be done via postgres arrays! \o/
	stmt := common.TxStmt(txn, s.getBucketForEventStmt)
	for _, eventID := range eventIDs {
		// Scanning into a pointer leaves it nil for a NULL bucket_id.
		var bucketID *uint
		err := stmt.QueryRowContext(ctx, eventID).Scan(&bucketID)
		if err != nil && err != sql.ErrNoRows {
			return nil, err
		}
		if bucketID == nil {
			// Either the event is not known (sql.ErrNoRows) or it has not
			// been assigned a bucket.
			continue
		}

		if maxBucketID == nil {
			maxBucketID = bucketID
			continue
		}

		later, err := s.isAfter(ctx, txn, *maxBucketID, *bucketID)
		if err != nil {
			return nil, err
		}
		if later {
			maxBucketID = bucketID
		}
	}

	// Return the bucket that was found; previously the result was discarded
	// and nil was returned unconditionally.
	return maxBucketID, nil
}
// getLatestBucketID returns the latest bucket recorded for the room with the
// given textual room ID. It returns nil, without an error, when no row is
// found or when the stored bucket ID is NULL.
func (s *eventBucketsStatements) getLatestBucketID(ctx context.Context, txn *sql.Tx, roomID string) (*uint, error) {
	var latest sql.NullInt64

	row := common.TxStmt(txn, s.latestBucketStmt).QueryRowContext(ctx, roomID)
	switch err := row.Scan(&latest); {
	case err == sql.ErrNoRows:
		return nil, nil
	case err != nil:
		return nil, err
	case !latest.Valid:
		return nil, nil
	}

	id := uint(latest.Int64)
	return &id, nil
}
// insertIntoOrBeforeBucket is not implemented yet: it always panics with
// "not implemented" and never returns. AddEvent calls it for the PrependInto
// behaviour, so reaching that case panics inside the transaction.
func (s *eventBucketsStatements) insertIntoOrBeforeBucket(
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.Event, bucketID uint,
) error {
panic("not implemented")
}
// insertIntoOrAfterBucket records the event as a member of the given bucket
// by inserting a row into paginationapi_events within txn. Despite the name,
// it currently always inserts into bucketID and never creates a new bucket.
func (s *eventBucketsStatements) insertIntoOrAfterBucket(
	ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.Event, bucketID uint,
) error {
	// TODO: Create a new bucket if there are already too many events in that
	// bucket
	const insertEventSQL = "INSERT INTO paginationapi_events (event_id, bucket_id) VALUES ($1, $2)"

	if _, err := txn.ExecContext(ctx, insertEventSQL, event.EventID(), bucketID); err != nil {
		return err
	}
	return nil
}
// insertAfterBucket creates a new bucket whose previous bucket is bucketID,
// makes it the latest bucket of the event's room and stores the event in it.
// All statements run inside txn; the first error is returned.
//
// The room must already have a row in paginationapi_rooms, otherwise
// sql.ErrNoRows is returned.
func (s *eventBucketsStatements) insertAfterBucket(
	ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.Event, bucketID uint,
) error {
	// Resolve the textual room ID to the numeric ID used by the other tables.
	// The numeric "id" column must be selected here: "room_id" is the TEXT
	// column and cannot be scanned into a uint.
	var roomNID uint
	err := txn.QueryRowContext(
		ctx, "SELECT id FROM paginationapi_rooms WHERE room_id = $1", event.RoomID(),
	).Scan(&roomNID)
	if err != nil {
		return err
	}

	// TODO: Add missing_event_ids
	var newBucketID uint
	err = txn.QueryRowContext(
		ctx, "INSERT INTO paginationapi_buckets (room_id, previous_bucket) VALUES ($1, $2) RETURNING id",
		roomNID, bucketID,
	).Scan(&newBucketID)
	if err != nil {
		return err
	}

	// Point the room's latest bucket at the bucket that was just created.
	_, err = txn.ExecContext(
		ctx, "INSERT INTO paginationapi_latest_buckets (room_id, bucket_id) VALUES ($1, $2) ON CONFLICT (room_id) DO UPDATE SET bucket_id = $2",
		roomNID, newBucketID,
	)
	if err != nil {
		return err
	}

	// Store the event itself in the new bucket; without this the bucket
	// would be created empty and the event would never be recorded.
	_, err = txn.ExecContext(
		ctx, "INSERT INTO paginationapi_events (event_id, bucket_id) VALUES ($1, $2)",
		event.EventID(), newBucketID,
	)
	return err
}
// createBucketAndInsert creates the first bucket for the event's room, records
// it as the room's latest bucket and stores the event in it. If the room has
// no row in paginationapi_rooms yet, one is created first. All statements run
// inside txn; the first error is returned.
func (s *eventBucketsStatements) createBucketAndInsert(
	ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.Event,
) error {
	// Resolve the textual room ID to the numeric ID used by the other tables.
	// The numeric "id" column must be selected here: "room_id" is the TEXT
	// column and cannot be scanned into a uint.
	var roomNID uint
	err := txn.QueryRowContext(
		ctx, "SELECT id FROM paginationapi_rooms WHERE room_id = $1", event.RoomID(),
	).Scan(&roomNID)
	if err == sql.ErrNoRows {
		// This is the first time the room is seen, so register it. This is
		// the expected case: the function is used when the room has no
		// latest bucket yet.
		err = txn.QueryRowContext(
			ctx, "INSERT INTO paginationapi_rooms (room_id) VALUES ($1) RETURNING id", event.RoomID(),
		).Scan(&roomNID)
	}
	if err != nil {
		return err
	}

	var newBucketID uint
	err = txn.QueryRowContext(
		ctx, "INSERT INTO paginationapi_buckets (room_id) VALUES ($1) RETURNING id",
		roomNID,
	).Scan(&newBucketID)
	if err != nil {
		return err
	}

	_, err = txn.ExecContext(
		ctx, "INSERT INTO paginationapi_latest_buckets (room_id, bucket_id) VALUES ($1, $2)",
		roomNID, newBucketID,
	)
	if err != nil {
		return err
	}

	// Store the event itself in the new bucket; without this the bucket
	// would be created empty and the event would never be recorded.
	_, err = txn.ExecContext(
		ctx, "INSERT INTO paginationapi_events (event_id, bucket_id) VALUES ($1, $2)",
		event.EventID(), newBucketID,
	)
	return err
}

View file

@ -0,0 +1,156 @@
// Copyright 2017 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"context"
"database/sql"
// Import the postgres database driver.
_ "github.com/lib/pq"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/gomatrixserverlib"
)
// PaginationAPIDatabase represents a pagination API database.
type PaginationAPIDatabase struct {
// db is the underlying SQL database handle.
db *sql.DB
// PartitionOffsetStatements is embedded from the common package and is
// prepared with the "paginationapi" prefix by NewPaginationAPIDatabase.
// NOTE(review): presumably this tracks consumer partition offsets — confirm
// against the common package.
common.PartitionOffsetStatements
// eventBuckets holds the prepared statements and helpers for the event
// bucket tables.
eventBuckets eventBucketsStatements
}
// NewPaginationAPIDatabase creates a new pagination API database. It opens
// the postgres database described by dataSourceName and prepares the
// partition offset and event bucket statements against it.
//
// If preparing any statement fails, the database handle is closed again and
// the error is returned.
func NewPaginationAPIDatabase(dataSourceName string) (*PaginationAPIDatabase, error) {
	var d PaginationAPIDatabase
	var err error
	if d.db, err = sql.Open("postgres", dataSourceName); err != nil {
		return nil, err
	}
	if err = d.PartitionOffsetStatements.Prepare(d.db, "paginationapi"); err != nil {
		// Best-effort cleanup so the handle is not leaked; the prepare error
		// is the one worth reporting, so a failure to close is ignored.
		_ = d.db.Close()
		return nil, err
	}
	if err = d.eventBuckets.prepare(d.db); err != nil {
		// Same best-effort cleanup as above.
		_ = d.db.Close()
		return nil, err
	}
	return &d, nil
}
// Paginate is not implemented yet: it ignores ctx, eventID and limit and
// always returns a nil slice and a nil error.
func (s *PaginationAPIDatabase) Paginate(ctx context.Context, eventID string, limit uint) ([]string, error) {
return nil, nil
}
// AddEvent places the event into a bucket. Inside a single transaction it
// gathers three bucket IDs, asks CalculateEventInsertion what to do with them
// and then runs the matching insertion helper.
//
// Open design questions carried over from the first draft:
//   - What if one of the prev_events has no bucket? Add to the current bucket
//     or not? Should there be sentinel events, and what if a sentinel cannot
//     be obtained? For now such an edge is simply ignored.
//   - Buckets that have grown too big should be split into a new one.
//
// TODO: Handle having multiple latest events?
func (s *PaginationAPIDatabase) AddEvent(ctx context.Context, event *gomatrixserverlib.Event) error {
	return common.WithTransaction(s.db, func(txn *sql.Tx) error {
		buckets := &s.eventBuckets

		// Earliest bucket among the events recorded as parents of this event.
		parentBucket, err := buckets.getEarliestBucketForEvent(ctx, txn, event.EventID())
		if err != nil {
			return err
		}

		// Latest bucket that any of the event's prev_events is stored in.
		childBucket, err := buckets.getLatestBucketForEvents(ctx, txn, event.PrevEventIDs())
		if err != nil {
			return err
		}

		// Latest bucket of the room as a whole.
		currentBucket, err := buckets.getLatestBucketID(ctx, txn, event.RoomID())
		if err != nil {
			return err
		}

		// Every event is currently treated as a new event.
		plan := CalculateEventInsertion(parentBucket, childBucket, currentBucket, true)

		switch plan.Behaviour {
		case CreateNew:
			return buckets.createBucketAndInsert(ctx, txn, event)
		case AppendAfter:
			return buckets.insertAfterBucket(ctx, txn, event, plan.BucketID)
		case AppendInto:
			return buckets.insertIntoOrAfterBucket(ctx, txn, event, plan.BucketID)
		case PrependInto:
			return buckets.insertIntoOrBeforeBucket(ctx, txn, event, plan.BucketID)
		case Quarantine:
			panic("Quarantine not implemented")
		default:
			panic("Unknown behaviour")
		}
	})
}
// EventInsertionBehaviour identifies what should be done with an event that
// is being added to a room's buckets.
type EventInsertionBehaviour = uint

const (
	// AppendAfter means a new bucket should be created after BucketID.
	AppendAfter EventInsertionBehaviour = iota
	// AppendInto means the event should be appended into BucketID.
	AppendInto
	// PrependInto means the event should be prepended into BucketID.
	PrependInto
	// CreateNew means the room has no bucket yet and one must be created.
	CreateNew
	// Quarantine means no placement could be determined for the event.
	Quarantine
)

// EventInsertionOutput is the decision made by CalculateEventInsertion.
type EventInsertionOutput struct {
	// Behaviour is the action to take.
	Behaviour EventInsertionBehaviour
	// BucketID is the bucket the action refers to. It is left at zero for
	// CreateNew and Quarantine.
	BucketID uint
}

// CalculateEventInsertion decides where an event should be placed, based on:
//   - minParentBucketID: the earliest bucket holding a parent of the event,
//   - maxChildBucketID: the latest bucket holding one of its prev events,
//   - maxCurrentBucketID: the latest bucket of the room,
//   - isNewEvent: whether the event is new.
//
// The rules are applied in order and the first match wins:
//  1. no current bucket                        -> CreateNew
//  2. a parent bucket is known                 -> PrependInto that bucket
//  3. the child bucket is the current bucket   -> AppendInto that bucket
//  4. the event is new                         -> AppendAfter the current bucket
//  5. otherwise                                -> Quarantine
func CalculateEventInsertion(
	minParentBucketID *uint,
	maxChildBucketID *uint,
	maxCurrentBucketID *uint,
	isNewEvent bool,
) EventInsertionOutput {
	// TODO: Handle if both minParentBucketID and maxChildBucketID are non-nil
	switch {
	case maxCurrentBucketID == nil:
		return EventInsertionOutput{Behaviour: CreateNew}
	case minParentBucketID != nil:
		return EventInsertionOutput{Behaviour: PrependInto, BucketID: *minParentBucketID}
	case maxChildBucketID != nil && *maxChildBucketID == *maxCurrentBucketID:
		return EventInsertionOutput{Behaviour: AppendInto, BucketID: *maxChildBucketID}
	case isNewEvent:
		return EventInsertionOutput{Behaviour: AppendAfter, BucketID: *maxCurrentBucketID}
	default:
		return EventInsertionOutput{Behaviour: Quarantine}
	}
}