Add routing

This commit is contained in:
Erik Johnston 2017-12-15 10:22:58 +00:00
parent 535ad630cd
commit 442b261352
5 changed files with 296 additions and 27 deletions

View file

@ -18,6 +18,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/clientapi/auth/storage/devices"
"github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/basecomponent"
"github.com/matrix-org/dendrite/paginationapi/consumers" "github.com/matrix-org/dendrite/paginationapi/consumers"
"github.com/matrix-org/dendrite/paginationapi/routing"
"github.com/matrix-org/dendrite/paginationapi/storage" "github.com/matrix-org/dendrite/paginationapi/storage"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
@ -38,5 +39,5 @@ func SetupPaginationAPIComponent(
handler := consumers.NewOutputRoomEventConsumer(paginationDB) handler := consumers.NewOutputRoomEventConsumer(paginationDB)
base.StartRoomServerConsumer(tracer, paginationDB, handler) base.StartRoomServerConsumer(tracer, paginationDB, handler)
// routing.Setup(base.APIMux, deviceDB, publicRoomsDB) routing.Setup(base.APIMux, base.QueryAPI(), paginationDB)
} }

View file

@ -0,0 +1,72 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package routing
import (
"net/http"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/sirupsen/logrus"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/paginationapi/storage"
"github.com/matrix-org/util"
)
const pathPrefixR0 = "/_matrix/client/r0"
// Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client
// to clients which need to make outbound HTTP requests.
func Setup(
apiMux *mux.Router,
queryAPI api.RoomserverQueryAPI,
paginationDB *storage.PaginationAPIDatabase,
) {
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
r0mux.Handle("/rooms/{roomID}/messages",
common.MakeExternalAPI("room_messages", func(req *http.Request) util.JSONResponse {
eventID := req.URL.Query().Get("from")
logrus.WithField("eventID", eventID).Warn("Got event")
eventIDs, err := paginationDB.Paginate(req.Context(), eventID, 10)
if err != nil {
return httputil.LogThenError(req, err)
}
var response api.QueryEventsByIDResponse
err = queryAPI.QueryEventsByID(req.Context(), &api.QueryEventsByIDRequest{
EventIDs: eventIDs,
}, &response)
if err != nil {
return httputil.LogThenError(req, err)
}
return util.JSONResponse{Code: 200, JSON: map[string]interface{}{
"event_ids": eventIDs,
"chunk": gomatrixserverlib.ToClientEvents(response.Events, gomatrixserverlib.FormatAll),
"start": eventID,
"end": eventIDs[len(eventIDs)-1],
}}
}),
).Methods("GET", "OPTIONS")
}

View file

@ -18,6 +18,8 @@ import (
"context" "context"
"database/sql" "database/sql"
"github.com/pkg/errors"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
"github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/gomatrixserverlib"
) )
@ -25,12 +27,20 @@ import (
const eventBucketsSchema = ` const eventBucketsSchema = `
CREATE TABLE IF NOT EXISTS paginationapi_events ( CREATE TABLE IF NOT EXISTS paginationapi_events (
id BIGSERIAL PRIMARY KEY, id BIGSERIAL PRIMARY KEY,
event_id TEXT NOT NULL, event_id TEXT NOT NULL
bucket_id BIGINT
); );
CREATE UNIQUE INDEX IF NOT EXISTS paginationapi_events_event_id_idx ON paginationapi_events(event_id); CREATE UNIQUE INDEX IF NOT EXISTS paginationapi_events_event_id_idx ON paginationapi_events(event_id);
CREATE TABLE IF NOT EXISTS paginationapi_event_to_bucket (
event_nid BIGINT NOT NULL,
bucket_id BIGINT NOT NULL,
ordering SMALLINT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS paginationapi_event_to_bucket_event_id_idx ON paginationapi_event_to_bucket(event_nid);
CREATE INDEX IF NOT EXISTS paginationapi_event_to_bucket_bucket_idx ON paginationapi_event_to_bucket(bucket_id);
CREATE TABLE IF NOT EXISTS paginationapi_event_edges ( CREATE TABLE IF NOT EXISTS paginationapi_event_edges (
parent_id BIGINT NOT NULL, parent_id BIGINT NOT NULL,
child_id BIGINT NOT NULL child_id BIGINT NOT NULL
@ -44,6 +54,8 @@ CREATE TABLE IF NOT EXISTS paginationapi_rooms (
room_id TEXT NOT NULL room_id TEXT NOT NULL
); );
CREATE UNIQUE INDEX IF NOT EXISTS paginationapi_rooms_idx ON paginationapi_rooms(room_id);
CREATE TABLE IF NOT EXISTS paginationapi_buckets ( CREATE TABLE IF NOT EXISTS paginationapi_buckets (
id BIGSERIAL PRIMARY KEY, id BIGSERIAL PRIMARY KEY,
room_id BIGINT NOT NULL, room_id BIGINT NOT NULL,
@ -61,27 +73,32 @@ CREATE TABLE IF NOT EXISTS paginationapi_latest_buckets (
CREATE UNIQUE INDEX IF NOT EXISTS paginationapi_latest_buckets_idx ON paginationapi_latest_buckets(room_id); CREATE UNIQUE INDEX IF NOT EXISTS paginationapi_latest_buckets_idx ON paginationapi_latest_buckets(room_id);
` `
const getBucketForEventSQL = `SELECT bucket_id FROM paginationapi_events WHERE event_id = $1` const getBucketForEventSQL = `SELECT bucket_id FROM paginationapi_event_to_bucket INNER JOIN paginationapi_events ON id = event_nid WHERE event_id = $1`
const getBucketsForParentEventsSQL = ` const getBucketsForParentEventsSQL = `
SELECT SELECT
p.bucket_id b.bucket_id
FROM paginationapi_event_edges FROM paginationapi_event_edges
INNER JOIN paginationapi_events AS p
ON parent_id = p.id
INNER JOIN paginationapi_events AS c INNER JOIN paginationapi_events AS c
ON child_id = c.id ON child_id = c.id
INNER JOIN paginationapi_event_to_bucket AS b
ON b.event_nid = parent_id
WHERE WHERE
c.event_id = $1 c.event_id = $1
AND p.bucket_id IS NOT NULL AND b.bucket_id IS NOT NULL
` `
const latestBucketSQL = ` const latestBucketSQL = `
SELECT bucket_id FROM paginationapi_latest_buckets AS l SELECT
INNER JOIN paginationapi_rooms AS r ON r.id = l.room_id bucket_id
FROM paginationapi_latest_buckets AS l
INNER JOIN paginationapi_rooms AS r
ON r.id = l.room_id
WHERE r.room_id = $1 WHERE r.room_id = $1
` `
const maxSizeOfBuckets uint = 5
type eventBucketsStatements struct { type eventBucketsStatements struct {
getBucketForEventStmt *sql.Stmt getBucketForEventStmt *sql.Stmt
getBucketsForParentEventsStmt *sql.Stmt getBucketsForParentEventsStmt *sql.Stmt
@ -171,7 +188,7 @@ func (s *eventBucketsStatements) getLatestBucketForEvents(ctx context.Context, t
} }
} }
return nil, nil return maxBucketID, nil
} }
func (s *eventBucketsStatements) getLatestBucketID(ctx context.Context, txn *sql.Tx, roomID string) (*uint, error) { func (s *eventBucketsStatements) getLatestBucketID(ctx context.Context, txn *sql.Tx, roomID string) (*uint, error) {
@ -208,19 +225,28 @@ func (s *eventBucketsStatements) insertIntoOrAfterBucket(
// TODO: Create a new bucket if there are already too many events in that // TODO: Create a new bucket if there are already too many events in that
// bucket // bucket
_, err := txn.ExecContext( var count uint
ctx, "INSERT INTO paginationapi_events (event_id, bucket_id) VALUES ($1, $2)", err := txn.QueryRowContext(
event.EventID(), bucketID, ctx, "SELECT count(*) FROM paginationapi_event_to_bucket WHERE bucket_id = $1", bucketID,
) ).Scan(&count)
if err != nil {
return err return err
} }
if count >= maxSizeOfBuckets {
return s.insertAfterBucket(ctx, txn, event, bucketID)
}
return s.insertEventToBucket(ctx, txn, event, bucketID)
}
func (s *eventBucketsStatements) insertAfterBucket( func (s *eventBucketsStatements) insertAfterBucket(
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.Event, bucketID uint, ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.Event, bucketID uint,
) error { ) error {
var roomNID uint var roomNID uint
err := txn.QueryRowContext( err := txn.QueryRowContext(
ctx, "SELECT room_id FROM paginationapi_rooms WHERE room_id = $1", event.RoomID(), ctx, "SELECT id FROM paginationapi_rooms WHERE room_id = $1", event.RoomID(),
).Scan(&roomNID) ).Scan(&roomNID)
if err != nil { if err != nil {
@ -239,6 +265,11 @@ func (s *eventBucketsStatements) insertAfterBucket(
return err return err
} }
err = s.insertEventToBucket(ctx, txn, event, newBucketID)
if err != nil {
return err
}
_, err = txn.ExecContext( _, err = txn.ExecContext(
ctx, "INSERT INTO paginationapi_latest_buckets (room_id, bucket_id) VALUES ($1, $2) ON CONFLICT (room_id) DO UPDATE SET bucket_id = $2", ctx, "INSERT INTO paginationapi_latest_buckets (room_id, bucket_id) VALUES ($1, $2) ON CONFLICT (room_id) DO UPDATE SET bucket_id = $2",
roomNID, newBucketID, roomNID, newBucketID,
@ -250,10 +281,10 @@ func (s *eventBucketsStatements) insertAfterBucket(
func (s *eventBucketsStatements) createBucketAndInsert( func (s *eventBucketsStatements) createBucketAndInsert(
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.Event, ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.Event,
) error { ) error {
var roomNID uint _, err := txn.ExecContext(
err := txn.QueryRowContext( ctx, "INSERT INTO paginationapi_rooms (room_id) VALUES ($1) ON CONFLICT (room_id) DO NOTHING",
ctx, "SELECT room_id FROM paginationapi_rooms WHERE room_id = $1", event.RoomID(), event.RoomID(),
).Scan(&roomNID) )
if err != nil { if err != nil {
return err return err
@ -261,18 +292,165 @@ func (s *eventBucketsStatements) createBucketAndInsert(
var newBucketID uint var newBucketID uint
err = txn.QueryRowContext( err = txn.QueryRowContext(
ctx, "INSERT INTO paginationapi_buckets (room_id) VALUES ($1) RETURNING id", ctx, "INSERT INTO paginationapi_buckets (room_id) SELECT id FROM paginationapi_rooms WHERE room_id = $1 RETURNING id",
roomNID, event.RoomID(),
).Scan(&newBucketID) ).Scan(&newBucketID)
if err != nil { if err != nil {
return err return err
} }
err = s.insertEventToBucket(ctx, txn, event, newBucketID)
if err != nil {
return err
}
_, err = txn.ExecContext( _, err = txn.ExecContext(
ctx, "INSERT INTO paginationapi_latest_buckets (room_id, bucket_id) VALUES ($1, $2)", ctx, "INSERT INTO paginationapi_latest_buckets (room_id, bucket_id) SELECT id, $2 FROM paginationapi_rooms WHERE room_id = $1",
roomNID, newBucketID, event.RoomID(), newBucketID,
) )
return err return err
} }
func (s *eventBucketsStatements) insertEventToBucket(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.Event, bucketID uint) error {
_, err := txn.ExecContext(
ctx, "INSERT INTO paginationapi_events (event_id) VALUES ($1) ON CONFLICT (event_id) DO NOTHING",
event.EventID(),
)
if err != nil {
return err
}
var maxOrder sql.NullInt64
err = txn.QueryRowContext(
ctx,
"SELECT min(ordering) FROM paginationapi_event_to_bucket INNER JOIN paginationapi_event_edges ON parent_id = event_nid INNER JOIN paginationapi_events AS e ON child_id = e.id WHERE e.event_id = $1",
event.EventID(),
).Scan(&maxOrder)
if err != nil {
return err
}
var order uint
if maxOrder.Valid {
order = uint(maxOrder.Int64)
_, err = txn.ExecContext(
ctx, "UPDATE paginationapi_event_to_bucket SET ordering = ordering + 1 WHERE bucket_id = $1 AND ordering >= $2",
bucketID, order,
)
if err != nil {
return err
}
} else {
var currentMaxOrder uint
err = txn.QueryRowContext(
ctx,
"SELECT COALESCE(max(ordering), 0) FROM paginationapi_event_to_bucket WHERE bucket_id = $1",
bucketID,
).Scan(&currentMaxOrder)
if err != nil {
return err
}
order = currentMaxOrder + 1
}
var eventNID uint
err = txn.QueryRowContext(
ctx, "SELECT id FROM paginationapi_events WHERE event_id = $1",
event.EventID(),
).Scan(&eventNID)
if err != nil {
return err
}
_, err = txn.ExecContext(
ctx, "INSERT INTO paginationapi_event_to_bucket (event_nid, bucket_id, ordering) SELECT id, $2, $3 FROM paginationapi_events WHERE event_id = $1",
event.EventID(), bucketID, order,
)
if err != nil {
return err
}
for _, childID := range event.PrevEventIDs() {
_, err = txn.ExecContext(
ctx, "INSERT INTO paginationapi_events (event_id) VALUES ($1) ON CONFLICT (event_id) DO NOTHING",
childID,
)
if err != nil {
return err
}
_, err = txn.ExecContext(
ctx, "INSERT INTO paginationapi_event_edges (parent_id, child_id) SELECT $1, id FROM paginationapi_events WHERE event_id = $2",
eventNID, childID,
)
if err != nil {
return err
}
}
return nil
}
func (s *eventBucketsStatements) paginate(ctx context.Context, txn *sql.Tx, baseEventID string, limit int) ([]string, error) {
var eventIDs []string
var (
bucketID uint
order uint
)
err := txn.QueryRowContext(
ctx, "SELECT bucket_id, ordering FROM paginationapi_events INNER JOIN paginationapi_event_to_bucket ON id = event_nid WHERE event_id = $1",
baseEventID,
).Scan(&bucketID, &order)
if err != nil {
return nil, errors.WithMessage(err, "failed to lookup current bucket")
}
for len(eventIDs) < limit {
var rows *sql.Rows
if order > 0 {
rows, err = txn.QueryContext(ctx, "SELECT event_id FROM paginationapi_event_to_bucket INNER JOIN paginationapi_events ON event_nid = id WHERE bucket_id = $1 AND ordering < $2 ORDER BY ordering DESC", bucketID, order)
} else {
rows, err = txn.QueryContext(ctx, "SELECT event_id FROM paginationapi_event_to_bucket INNER JOIN paginationapi_events ON event_nid = id WHERE bucket_id = $1 ORDER BY ordering DESC", bucketID)
}
if err != nil {
return nil, err
}
for rows.Next() {
var eventID string
if err = rows.Scan(&eventID); err != nil {
return nil, err
}
eventIDs = append(eventIDs, eventID)
}
if len(eventIDs) >= limit {
break
}
order = 0
err := txn.QueryRowContext(
ctx, "SELECT previous_bucket FROM paginationapi_buckets WHERE id = $1",
bucketID,
).Scan(&bucketID)
if err == sql.ErrNoRows {
break
} else if err != nil {
return nil, err
}
}
return eventIDs, nil
}

View file

@ -18,6 +18,8 @@ import (
"context" "context"
"database/sql" "database/sql"
"github.com/sirupsen/logrus"
// Import the postgres database driver. // Import the postgres database driver.
_ "github.com/lib/pq" _ "github.com/lib/pq"
"github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common"
@ -47,8 +49,16 @@ func NewPaginationAPIDatabase(dataSourceName string) (*PaginationAPIDatabase, er
return &d, nil return &d, nil
} }
func (s *PaginationAPIDatabase) Paginate(ctx context.Context, eventID string, limit uint) ([]string, error) { func (s *PaginationAPIDatabase) Paginate(ctx context.Context, eventID string, limit int) ([]string, error) {
return nil, nil var eventIDs []string
err := common.WithTransaction(s.db, func(txn *sql.Tx) error {
var err error
eventIDs, err = s.eventBuckets.paginate(ctx, txn, eventID, limit)
return err
})
return eventIDs, err
} }
func (s *PaginationAPIDatabase) AddEvent(ctx context.Context, event *gomatrixserverlib.Event) error { func (s *PaginationAPIDatabase) AddEvent(ctx context.Context, event *gomatrixserverlib.Event) error {
@ -83,6 +93,12 @@ func (s *PaginationAPIDatabase) AddEvent(ctx context.Context, event *gomatrixser
result := CalculateEventInsertion(minParentBucketID, maxChildBucketID, maxCurrentBucketID, true) result := CalculateEventInsertion(minParentBucketID, maxChildBucketID, maxCurrentBucketID, true)
logrus.WithFields(logrus.Fields{
"behaviour": result.Behaviour,
"bucket_id": result.BucketID,
"event_id": eventID,
}).Warn("Resutl")
switch result.Behaviour { switch result.Behaviour {
case AppendAfter: case AppendAfter:
return s.eventBuckets.insertAfterBucket(ctx, txn, event, result.BucketID) return s.eventBuckets.insertAfterBucket(ctx, txn, event, result.BucketID)

View file

@ -297,6 +297,7 @@ func (d *SyncServerDatabase) CompleteSync(
stateEvents = removeDuplicates(stateEvents, recentEvents) stateEvents = removeDuplicates(stateEvents, recentEvents)
jr := types.NewJoinResponse() jr := types.NewJoinResponse()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.PrevBatch = recentEvents[len(recentEvents)-1].EventID()
jr.Timeline.Limited = true jr.Timeline.Limited = true
jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync) jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[roomID] = *jr res.Rooms.Join[roomID] = *jr
@ -425,6 +426,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
jr := types.NewJoinResponse() jr := types.NewJoinResponse()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
jr.Timeline.PrevBatch = recentEvents[len(recentEvents)-1].EventID()
jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.roomID] = *jr res.Rooms.Join[delta.roomID] = *jr
case "leave": case "leave":