diff --git a/src/github.com/matrix-org/dendrite/paginationapi/paginationapi.go b/src/github.com/matrix-org/dendrite/paginationapi/paginationapi.go index f2d408ee..9dd235ef 100644 --- a/src/github.com/matrix-org/dendrite/paginationapi/paginationapi.go +++ b/src/github.com/matrix-org/dendrite/paginationapi/paginationapi.go @@ -18,6 +18,7 @@ import ( "github.com/matrix-org/dendrite/clientapi/auth/storage/devices" "github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/paginationapi/consumers" + "github.com/matrix-org/dendrite/paginationapi/routing" "github.com/matrix-org/dendrite/paginationapi/storage" "github.com/sirupsen/logrus" ) @@ -38,5 +39,5 @@ func SetupPaginationAPIComponent( handler := consumers.NewOutputRoomEventConsumer(paginationDB) base.StartRoomServerConsumer(tracer, paginationDB, handler) - // routing.Setup(base.APIMux, deviceDB, publicRoomsDB) + routing.Setup(base.APIMux, base.QueryAPI(), paginationDB) } diff --git a/src/github.com/matrix-org/dendrite/paginationapi/routing/routing.go b/src/github.com/matrix-org/dendrite/paginationapi/routing/routing.go new file mode 100644 index 00000000..de4e2972 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/paginationapi/routing/routing.go @@ -0,0 +1,72 @@ +// Copyright 2017 Vector Creations Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package routing + +import ( + "net/http" + + "github.com/matrix-org/gomatrixserverlib" + + "github.com/matrix-org/dendrite/roomserver/api" + + "github.com/sirupsen/logrus" + + "github.com/matrix-org/dendrite/clientapi/httputil" + + "github.com/gorilla/mux" + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/paginationapi/storage" + "github.com/matrix-org/util" +) + +const pathPrefixR0 = "/_matrix/client/r0" + +// Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client +// to clients which need to make outbound HTTP requests. +func Setup( + apiMux *mux.Router, + queryAPI api.RoomserverQueryAPI, + paginationDB *storage.PaginationAPIDatabase, +) { + r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter() + + r0mux.Handle("/rooms/{roomID}/messages", + common.MakeExternalAPI("room_messages", func(req *http.Request) util.JSONResponse { + eventID := req.URL.Query().Get("from") + + logrus.WithField("eventID", eventID).Warn("Got event") + + eventIDs, err := paginationDB.Paginate(req.Context(), eventID, 10) + if err != nil { + return httputil.LogThenError(req, err) + } + + var response api.QueryEventsByIDResponse + err = queryAPI.QueryEventsByID(req.Context(), &api.QueryEventsByIDRequest{ + EventIDs: eventIDs, + }, &response) + if err != nil { + return httputil.LogThenError(req, err) + } + + return util.JSONResponse{Code: 200, JSON: map[string]interface{}{ + "event_ids": eventIDs, + "chunk": gomatrixserverlib.ToClientEvents(response.Events, gomatrixserverlib.FormatAll), + "start": eventID, + "end": eventIDs[len(eventIDs)-1], + }} + }), + ).Methods("GET", "OPTIONS") +} diff --git a/src/github.com/matrix-org/dendrite/paginationapi/storage/events.go b/src/github.com/matrix-org/dendrite/paginationapi/storage/events.go index 7e51a781..0f98c8b0 100644 --- a/src/github.com/matrix-org/dendrite/paginationapi/storage/events.go +++ b/src/github.com/matrix-org/dendrite/paginationapi/storage/events.go @@ -18,6 +18,8 @@ import ( "context" "database/sql" + "github.com/pkg/errors" + "github.com/matrix-org/dendrite/common" "github.com/matrix-org/gomatrixserverlib" ) @@ -25,12 +27,20 @@ import ( const eventBucketsSchema = ` CREATE TABLE IF NOT EXISTS paginationapi_events ( id BIGSERIAL PRIMARY KEY, - event_id TEXT NOT NULL, - bucket_id BIGINT + event_id TEXT NOT NULL ); CREATE UNIQUE INDEX IF NOT EXISTS paginationapi_events_event_id_idx ON paginationapi_events(event_id); +CREATE TABLE IF NOT EXISTS paginationapi_event_to_bucket ( + event_nid BIGINT NOT NULL, + bucket_id BIGINT NOT NULL, + ordering SMALLINT NOT NULL +); + +CREATE UNIQUE INDEX IF NOT EXISTS paginationapi_event_to_bucket_event_id_idx ON paginationapi_event_to_bucket(event_nid); +CREATE INDEX IF NOT EXISTS paginationapi_event_to_bucket_bucket_idx ON paginationapi_event_to_bucket(bucket_id); + CREATE TABLE IF NOT EXISTS paginationapi_event_edges ( parent_id BIGINT NOT NULL, child_id BIGINT NOT NULL @@ -44,6 +54,8 @@ CREATE TABLE IF NOT EXISTS paginationapi_rooms ( room_id TEXT NOT NULL ); +CREATE UNIQUE INDEX IF NOT EXISTS paginationapi_rooms_idx ON paginationapi_rooms(room_id); + CREATE TABLE IF NOT EXISTS paginationapi_buckets ( id BIGSERIAL PRIMARY KEY, room_id BIGINT NOT NULL, @@ -61,27 +73,32 @@ CREATE TABLE IF NOT EXISTS paginationapi_latest_buckets ( CREATE UNIQUE INDEX IF NOT EXISTS paginationapi_latest_buckets_idx ON paginationapi_latest_buckets(room_id); ` -const getBucketForEventSQL = `SELECT bucket_id FROM paginationapi_events WHERE event_id = $1` +const getBucketForEventSQL = `SELECT bucket_id FROM paginationapi_event_to_bucket INNER JOIN paginationapi_events ON id = event_nid WHERE event_id = $1` const getBucketsForParentEventsSQL = ` SELECT - p.bucket_id + b.bucket_id FROM paginationapi_event_edges - INNER JOIN paginationapi_events AS p - ON parent_id = p.id INNER JOIN paginationapi_events AS c ON child_id = c.id + INNER JOIN paginationapi_event_to_bucket AS b + ON b.event_nid = parent_id WHERE c.event_id = $1 - AND p.bucket_id IS NOT NULL + AND b.bucket_id IS NOT NULL ` const latestBucketSQL = ` -SELECT bucket_id FROM paginationapi_latest_buckets AS l -INNER JOIN paginationapi_rooms AS r ON r.id = l.room_id +SELECT + bucket_id +FROM paginationapi_latest_buckets AS l + INNER JOIN paginationapi_rooms AS r + ON r.id = l.room_id WHERE r.room_id = $1 ` +const maxSizeOfBuckets uint = 5 + type eventBucketsStatements struct { getBucketForEventStmt *sql.Stmt getBucketsForParentEventsStmt *sql.Stmt @@ -171,7 +188,7 @@ func (s *eventBucketsStatements) getLatestBucketForEvents(ctx context.Context, t } } - return nil, nil + return maxBucketID, nil } func (s *eventBucketsStatements) getLatestBucketID(ctx context.Context, txn *sql.Tx, roomID string) (*uint, error) { @@ -208,11 +225,20 @@ func (s *eventBucketsStatements) insertIntoOrAfterBucket( // TODO: Create a new bucket if there are already too many events in that // bucket - _, err := txn.ExecContext( - ctx, "INSERT INTO paginationapi_events (event_id, bucket_id) VALUES ($1, $2)", - event.EventID(), bucketID, - ) - return err + var count uint + err := txn.QueryRowContext( + ctx, "SELECT count(*) FROM paginationapi_event_to_bucket WHERE bucket_id = $1", bucketID, + ).Scan(&count) + + if err != nil { + return err + } + + if count >= maxSizeOfBuckets { + return s.insertAfterBucket(ctx, txn, event, bucketID) + } + + return s.insertEventToBucket(ctx, txn, event, bucketID) } func (s *eventBucketsStatements) insertAfterBucket( @@ -220,7 +246,7 @@ func (s *eventBucketsStatements) insertAfterBucket( ) error { var roomNID uint err := txn.QueryRowContext( - ctx, "SELECT room_id FROM paginationapi_rooms WHERE room_id = $1", event.RoomID(), + ctx, "SELECT id FROM paginationapi_rooms WHERE room_id = $1", event.RoomID(), ).Scan(&roomNID) if err != nil { @@ -239,6 +265,11 @@ func (s *eventBucketsStatements) insertAfterBucket( return err } + err = s.insertEventToBucket(ctx, txn, event, newBucketID) + if err != nil { + return err + } + _, err = txn.ExecContext( ctx, "INSERT INTO paginationapi_latest_buckets (room_id, bucket_id) VALUES ($1, $2) ON CONFLICT (room_id) DO UPDATE SET bucket_id = $2", roomNID, newBucketID, @@ -250,10 +281,10 @@ func (s *eventBucketsStatements) insertAfterBucket( func (s *eventBucketsStatements) createBucketAndInsert( ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.Event, ) error { - var roomNID uint - err := txn.QueryRowContext( - ctx, "SELECT room_id FROM paginationapi_rooms WHERE room_id = $1", event.RoomID(), - ).Scan(&roomNID) + _, err := txn.ExecContext( + ctx, "INSERT INTO paginationapi_rooms (room_id) VALUES ($1) ON CONFLICT (room_id) DO NOTHING", + event.RoomID(), + ) if err != nil { return err @@ -261,18 +292,165 @@ func (s *eventBucketsStatements) createBucketAndInsert( var newBucketID uint err = txn.QueryRowContext( - ctx, "INSERT INTO paginationapi_buckets (room_id) VALUES ($1) RETURNING id", - roomNID, + ctx, "INSERT INTO paginationapi_buckets (room_id) SELECT id FROM paginationapi_rooms WHERE room_id = $1 RETURNING id", + event.RoomID(), ).Scan(&newBucketID) if err != nil { return err } + err = s.insertEventToBucket(ctx, txn, event, newBucketID) + if err != nil { + return err + } + _, err = txn.ExecContext( - ctx, "INSERT INTO paginationapi_latest_buckets (room_id, bucket_id) VALUES ($1, $2)", - roomNID, newBucketID, + ctx, "INSERT INTO paginationapi_latest_buckets (room_id, bucket_id) SELECT id, $2 FROM paginationapi_rooms WHERE room_id = $1", + event.RoomID(), newBucketID, ) return err } + +func (s *eventBucketsStatements) insertEventToBucket(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.Event, bucketID uint) error { + _, err := txn.ExecContext( + ctx, "INSERT INTO paginationapi_events (event_id) VALUES ($1) ON CONFLICT (event_id) DO NOTHING", + event.EventID(), + ) + + if err != nil { + return err + } + + var maxOrder sql.NullInt64 + err = txn.QueryRowContext( + ctx, + "SELECT min(ordering) FROM paginationapi_event_to_bucket INNER JOIN paginationapi_event_edges ON parent_id = event_nid INNER JOIN paginationapi_events AS e ON child_id = e.id WHERE e.event_id = $1", + event.EventID(), + ).Scan(&maxOrder) + if err != nil { + return err + } + + var order uint + if maxOrder.Valid { + order = uint(maxOrder.Int64) + + _, err = txn.ExecContext( + ctx, "UPDATE paginationapi_event_to_bucket SET ordering = ordering + 1 WHERE bucket_id = $1 AND ordering >= $2", + bucketID, order, + ) + if err != nil { + return err + } + } else { + var currentMaxOrder uint + err = txn.QueryRowContext( + ctx, + "SELECT COALESCE(max(ordering), 0) FROM paginationapi_event_to_bucket WHERE bucket_id = $1", + bucketID, + ).Scan(¤tMaxOrder) + if err != nil { + return err + } + + order = currentMaxOrder + 1 + } + + var eventNID uint + err = txn.QueryRowContext( + ctx, "SELECT id FROM paginationapi_events WHERE event_id = $1", + event.EventID(), + ).Scan(&eventNID) + if err != nil { + return err + } + + _, err = txn.ExecContext( + ctx, "INSERT INTO paginationapi_event_to_bucket (event_nid, bucket_id, ordering) SELECT id, $2, $3 FROM paginationapi_events WHERE event_id = $1", + event.EventID(), bucketID, order, + ) + + if err != nil { + return err + } + + for _, childID := range event.PrevEventIDs() { + _, err = txn.ExecContext( + ctx, "INSERT INTO paginationapi_events (event_id) VALUES ($1) ON CONFLICT (event_id) DO NOTHING", + childID, + ) + + if err != nil { + return err + } + + _, err = txn.ExecContext( + ctx, "INSERT INTO paginationapi_event_edges (parent_id, child_id) SELECT $1, id FROM paginationapi_events WHERE event_id = $2", + eventNID, childID, + ) + + if err != nil { + return err + } + } + + return nil +} + +func (s *eventBucketsStatements) paginate(ctx context.Context, txn *sql.Tx, baseEventID string, limit int) ([]string, error) { + var eventIDs []string + + var ( + bucketID uint + order uint + ) + err := txn.QueryRowContext( + ctx, "SELECT bucket_id, ordering FROM paginationapi_events INNER JOIN paginationapi_event_to_bucket ON id = event_nid WHERE event_id = $1", + baseEventID, + ).Scan(&bucketID, &order) + + if err != nil { + return nil, errors.WithMessage(err, "failed to lookup current bucket") + } + + for len(eventIDs) < limit { + var rows *sql.Rows + if order > 0 { + rows, err = txn.QueryContext(ctx, "SELECT event_id FROM paginationapi_event_to_bucket INNER JOIN paginationapi_events ON event_nid = id WHERE bucket_id = $1 AND ordering < $2 ORDER BY ordering DESC", bucketID, order) + } else { + rows, err = txn.QueryContext(ctx, "SELECT event_id FROM paginationapi_event_to_bucket INNER JOIN paginationapi_events ON event_nid = id WHERE bucket_id = $1 ORDER BY ordering DESC", bucketID) + } + if err != nil { + return nil, err + } + + for rows.Next() { + var eventID string + if err = rows.Scan(&eventID); err != nil { + return nil, err + } + + eventIDs = append(eventIDs, eventID) + } + + if len(eventIDs) >= limit { + break + } + + order = 0 + err := txn.QueryRowContext( + ctx, "SELECT previous_bucket FROM paginationapi_buckets WHERE id = $1", + bucketID, + ).Scan(&bucketID) + + if err == sql.ErrNoRows { + break + } else if err != nil { + return nil, err + } + } + + return eventIDs, nil +} diff --git a/src/github.com/matrix-org/dendrite/paginationapi/storage/storage.go b/src/github.com/matrix-org/dendrite/paginationapi/storage/storage.go index aeb2b4bc..18a6859b 100644 --- a/src/github.com/matrix-org/dendrite/paginationapi/storage/storage.go +++ b/src/github.com/matrix-org/dendrite/paginationapi/storage/storage.go @@ -18,6 +18,8 @@ import ( "context" "database/sql" + "github.com/sirupsen/logrus" + // Import the postgres database driver. _ "github.com/lib/pq" "github.com/matrix-org/dendrite/common" @@ -47,8 +49,16 @@ func NewPaginationAPIDatabase(dataSourceName string) (*PaginationAPIDatabase, er return &d, nil } -func (s *PaginationAPIDatabase) Paginate(ctx context.Context, eventID string, limit uint) ([]string, error) { - return nil, nil +func (s *PaginationAPIDatabase) Paginate(ctx context.Context, eventID string, limit int) ([]string, error) { + var eventIDs []string + + err := common.WithTransaction(s.db, func(txn *sql.Tx) error { + var err error + eventIDs, err = s.eventBuckets.paginate(ctx, txn, eventID, limit) + return err + }) + + return eventIDs, err } func (s *PaginationAPIDatabase) AddEvent(ctx context.Context, event *gomatrixserverlib.Event) error { @@ -83,6 +93,12 @@ func (s *PaginationAPIDatabase) AddEvent(ctx context.Context, event *gomatrixser result := CalculateEventInsertion(minParentBucketID, maxChildBucketID, maxCurrentBucketID, true) + logrus.WithFields(logrus.Fields{ + "behaviour": result.Behaviour, + "bucket_id": result.BucketID, + "event_id": eventID, + }).Warn("Resutl") + switch result.Behaviour { case AppendAfter: return s.eventBuckets.insertAfterBucket(ctx, txn, event, result.BucketID) diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 8a5b9648..b6b8b712 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -297,6 +297,7 @@ func (d *SyncServerDatabase) CompleteSync( stateEvents = removeDuplicates(stateEvents, recentEvents) jr := types.NewJoinResponse() jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) + jr.Timeline.PrevBatch = recentEvents[len(recentEvents)-1].EventID() jr.Timeline.Limited = true jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync) res.Rooms.Join[roomID] = *jr @@ -425,6 +426,7 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse( jr := types.NewJoinResponse() jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + jr.Timeline.PrevBatch = recentEvents[len(recentEvents)-1].EventID() jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) res.Rooms.Join[delta.roomID] = *jr case "leave":