diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_topology_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_topology_table.go new file mode 100644 index 00000000..c698c5ad --- /dev/null +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_topology_table.go @@ -0,0 +1,140 @@ +// Copyright 2018 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "database/sql" + + "github.com/matrix-org/dendrite/syncapi/types" + "github.com/matrix-org/gomatrixserverlib" +) + +const outputRoomEventsTopologySchema = ` +-- Stores output room events received from the roomserver. +CREATE TABLE IF NOT EXISTS syncapi_output_room_events_topology ( + -- The event ID for the event. + event_id TEXT PRIMARY KEY, + -- The place of the event in the room's topology. This can usually be determined + -- from the event's depth. + topological_position BIGINT NOT NULL, + -- The 'room_id' key for the event. + room_id TEXT NOT NULL +); +-- The topological order will be used in events selection and ordering +CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_topological_position_idx ON syncapi_output_room_events_topology(topological_position, room_id); +` + +const insertEventInTopologySQL = "" + + "INSERT INTO syncapi_output_room_events_topology (event_id, topological_position, room_id)" + + " VALUES ($1, $2, $3)" + +const selectEventIDsInRangeASCSQL = "" + + "SELECT event_id FROM syncapi_output_room_events_topology" + + " WHERE room_id = $1 AND topological_position > $2 AND topological_position <= $3" + + " ORDER BY topological_position ASC" + +const selectEventIDsInRangeDESCSQL = "" + + "SELECT event_id FROM syncapi_output_room_events_topology" + + " WHERE room_id = $1 AND topological_position > $2 AND topological_position <= $3" + + " ORDER BY topological_position DESC" + +const selectPositionInTopologySQL = "" + + "SELECT topological_position FROM syncapi_output_room_events_topology" + + " WHERE event_id = $1" + +type outputRoomEventsTopologyStatements struct { + insertEventInTopologyStmt *sql.Stmt + selectEventIDsInRangeASCStmt *sql.Stmt + selectEventIDsInRangeDESCStmt *sql.Stmt + selectPositionInTopologyStmt *sql.Stmt +} + +func (s *outputRoomEventsTopologyStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(outputRoomEventsTopologySchema) + if err != nil { + return + } + if s.insertEventInTopologyStmt, err = db.Prepare(insertEventInTopologySQL); err != nil { + return + } + if s.selectEventIDsInRangeASCStmt, err = db.Prepare(selectEventIDsInRangeASCSQL); err != nil { + return + } + if s.selectEventIDsInRangeDESCStmt, err = db.Prepare(selectEventIDsInRangeDESCSQL); err != nil { + return + } + if s.selectPositionInTopologyStmt, err = db.Prepare(selectPositionInTopologySQL); err != nil { + return + } + return +} + +// insertEventInTopology inserts the given event in the room's topology, based +// on the event's depth. +func (s *outputRoomEventsTopologyStatements) insertEventInTopology( + ctx context.Context, event *gomatrixserverlib.Event, +) (err error) { + _, err = s.insertEventInTopologyStmt.ExecContext( + ctx, event.EventID(), event.Depth(), event.RoomID(), + ) + return +} + +// selectEventIDsInRange selects the IDs of events which positions are within a +// given range in a given room's topological order. +// Returns an empty slice if no events match the given range. +func (s *outputRoomEventsTopologyStatements) selectEventIDsInRange( + ctx context.Context, roomID string, fromPos, toPos types.StreamPosition, + chronologicalOrder bool, +) (eventIDs []string, err error) { + // Decide on the selection's order according to whether chronological order + // is requested or not. + var stmt *sql.Stmt + if chronologicalOrder { + stmt = s.selectEventIDsInRangeASCStmt + } else { + stmt = s.selectEventIDsInRangeDESCStmt + } + + // Query the event IDs. + rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos) + if err == sql.ErrNoRows { + // If no event matched the request, return an empty slice. + return []string{}, nil + } else if err != nil { + return + } + + // Return the IDs. + var eventID string + for rows.Next() { + if err = rows.Scan(&eventID); err != nil { + return + } + eventIDs = append(eventIDs, eventID) + } + + return +} + +// selectPositionInTopology returns the position of a given event in the +// topology of the room it belongs to. +func (s *outputRoomEventsTopologyStatements) selectPositionInTopology( + eventID string, +) (pos types.StreamPosition, err error) { + err = s.selectPositionInTopologyStmt.QueryRow(eventID).Scan(&pos) + return +} diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 380cf10b..5859f96b 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -56,6 +56,7 @@ type SyncServerDatabase struct { events outputRoomEventsStatements roomstate currentRoomStateStatements invites inviteEventsStatements + topology outputRoomEventsTopologyStatements } // NewSyncServerDatabase creates a new sync server database @@ -80,6 +81,9 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) { if err := d.invites.prepare(d.db); err != nil { return nil, err } + if err := d.topology.prepare(d.db); err != nil { + return nil, err + } return &d, nil } @@ -124,6 +128,10 @@ func (d *SyncServerDatabase) WriteEvent( } streamPos = types.StreamPosition(pos) + if err = d.topology.insertEventInTopology(ctx, ev); err != nil { + return err + } + if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 { // Nothing to do, the event may have just been a message event. return nil @@ -310,7 +318,6 @@ func (d *SyncServerDatabase) CompleteSync( // Build up a /sync response. Add joined rooms. res := types.NewResponse(pos) - var prevBatch types.StreamPosition for _, roomID := range roomIDs { var stateEvents []gomatrixserverlib.Event stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID) @@ -328,16 +335,24 @@ func (d *SyncServerDatabase) CompleteSync( return nil, err } + // Retrieve the backward topology position, i.e. the position of the + // oldest event in the room's topology. + var backwardTopologyPos types.StreamPosition + backwardTopologyPos, err = d.topology.selectPositionInTopology(recentStreamEvents[0].EventID()) + if err != nil { + return nil, err + } + if backwardTopologyPos-1 <= 0 { + backwardTopologyPos = types.StreamPosition(1) + } + // We don't include a device here as we don't need to send down // transaction IDs for complete syncs recentEvents := StreamEventsToEvents(nil, recentStreamEvents) stateEvents = removeDuplicates(stateEvents, recentEvents) jr := types.NewJoinResponse() - if prevBatch = recentStreamEvents[0].StreamPosition - 1; prevBatch <= 0 { - prevBatch = 1 - } jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition( - types.PaginationTokenTypeStream, prevBatch, + types.PaginationTokenTypeTopology, backwardTopologyPos-1, ).String() jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Limited = true @@ -465,15 +480,23 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse( return nil } + // Retrieve the backward topology position, i.e. the position of the + // oldest event in the room's topology. + var backwardTopologyPos types.StreamPosition + backwardTopologyPos, err = d.topology.selectPositionInTopology(recentStreamEvents[0].EventID()) + if err != nil { + return err + } + if backwardTopologyPos-1 <= 0 { + backwardTopologyPos = types.StreamPosition(1) + } + switch delta.membership { case "join": jr := types.NewJoinResponse() - var prevBatch types.StreamPosition - if prevBatch = recentStreamEvents[0].StreamPosition - 1; prevBatch <= 0 { - prevBatch = 1 - } + jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition( - types.PaginationTokenTypeStream, prevBatch, + types.PaginationTokenTypeTopology, backwardTopologyPos-1, ).String() jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true @@ -485,12 +508,8 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse( // TODO: recentEvents may contain events that this user is not allowed to see because they are // no longer in the room. lr := types.NewLeaveResponse() - var prevBatch types.StreamPosition - if prevBatch = recentStreamEvents[0].StreamPosition - 1; prevBatch <= 0 { - prevBatch = 1 - } lr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition( - types.PaginationTokenTypeStream, prevBatch, + types.PaginationTokenTypeStream, backwardTopologyPos-1, ).String() lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync) lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true