mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-04-08 04:43:39 +00:00
Store room topology and use it in responses to sync requests
This commit is contained in:
parent
e1ee10c7bb
commit
b205931819
2 changed files with 174 additions and 15 deletions
|
@ -0,0 +1,140 @@
|
|||
// Copyright 2018 New Vector Ltd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
const outputRoomEventsTopologySchema = `
|
||||
-- Stores output room events received from the roomserver.
|
||||
CREATE TABLE IF NOT EXISTS syncapi_output_room_events_topology (
|
||||
-- The event ID for the event.
|
||||
event_id TEXT PRIMARY KEY,
|
||||
-- The place of the event in the room's topology. This can usually be determined
|
||||
-- from the event's depth.
|
||||
topological_position BIGINT NOT NULL,
|
||||
-- The 'room_id' key for the event.
|
||||
room_id TEXT NOT NULL
|
||||
);
|
||||
-- The topological order will be used in events selection and ordering
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_topological_position_idx ON syncapi_output_room_events_topology(topological_position, room_id);
|
||||
`
|
||||
|
||||
const insertEventInTopologySQL = "" +
|
||||
"INSERT INTO syncapi_output_room_events_topology (event_id, topological_position, room_id)" +
|
||||
" VALUES ($1, $2, $3)"
|
||||
|
||||
const selectEventIDsInRangeASCSQL = "" +
|
||||
"SELECT event_id FROM syncapi_output_room_events_topology" +
|
||||
" WHERE room_id = $1 AND topological_position > $2 AND topological_position <= $3" +
|
||||
" ORDER BY topological_position ASC"
|
||||
|
||||
const selectEventIDsInRangeDESCSQL = "" +
|
||||
"SELECT event_id FROM syncapi_output_room_events_topology" +
|
||||
" WHERE room_id = $1 AND topological_position > $2 AND topological_position <= $3" +
|
||||
" ORDER BY topological_position DESC"
|
||||
|
||||
const selectPositionInTopologySQL = "" +
|
||||
"SELECT topological_position FROM syncapi_output_room_events_topology" +
|
||||
" WHERE event_id = $1"
|
||||
|
||||
type outputRoomEventsTopologyStatements struct {
|
||||
insertEventInTopologyStmt *sql.Stmt
|
||||
selectEventIDsInRangeASCStmt *sql.Stmt
|
||||
selectEventIDsInRangeDESCStmt *sql.Stmt
|
||||
selectPositionInTopologyStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *outputRoomEventsTopologyStatements) prepare(db *sql.DB) (err error) {
|
||||
_, err = db.Exec(outputRoomEventsTopologySchema)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if s.insertEventInTopologyStmt, err = db.Prepare(insertEventInTopologySQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectEventIDsInRangeASCStmt, err = db.Prepare(selectEventIDsInRangeASCSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectEventIDsInRangeDESCStmt, err = db.Prepare(selectEventIDsInRangeDESCSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectPositionInTopologyStmt, err = db.Prepare(selectPositionInTopologySQL); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// insertEventInTopology inserts the given event in the room's topology, based
|
||||
// on the event's depth.
|
||||
func (s *outputRoomEventsTopologyStatements) insertEventInTopology(
|
||||
ctx context.Context, event *gomatrixserverlib.Event,
|
||||
) (err error) {
|
||||
_, err = s.insertEventInTopologyStmt.ExecContext(
|
||||
ctx, event.EventID(), event.Depth(), event.RoomID(),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
// selectEventIDsInRange selects the IDs of events which positions are within a
|
||||
// given range in a given room's topological order.
|
||||
// Returns an empty slice if no events match the given range.
|
||||
func (s *outputRoomEventsTopologyStatements) selectEventIDsInRange(
|
||||
ctx context.Context, roomID string, fromPos, toPos types.StreamPosition,
|
||||
chronologicalOrder bool,
|
||||
) (eventIDs []string, err error) {
|
||||
// Decide on the selection's order according to whether chronological order
|
||||
// is requested or not.
|
||||
var stmt *sql.Stmt
|
||||
if chronologicalOrder {
|
||||
stmt = s.selectEventIDsInRangeASCStmt
|
||||
} else {
|
||||
stmt = s.selectEventIDsInRangeDESCStmt
|
||||
}
|
||||
|
||||
// Query the event IDs.
|
||||
rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos)
|
||||
if err == sql.ErrNoRows {
|
||||
// If no event matched the request, return an empty slice.
|
||||
return []string{}, nil
|
||||
} else if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Return the IDs.
|
||||
var eventID string
|
||||
for rows.Next() {
|
||||
if err = rows.Scan(&eventID); err != nil {
|
||||
return
|
||||
}
|
||||
eventIDs = append(eventIDs, eventID)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// selectPositionInTopology returns the position of a given event in the
|
||||
// topology of the room it belongs to.
|
||||
func (s *outputRoomEventsTopologyStatements) selectPositionInTopology(
|
||||
eventID string,
|
||||
) (pos types.StreamPosition, err error) {
|
||||
err = s.selectPositionInTopologyStmt.QueryRow(eventID).Scan(&pos)
|
||||
return
|
||||
}
|
|
@ -56,6 +56,7 @@ type SyncServerDatabase struct {
|
|||
events outputRoomEventsStatements
|
||||
roomstate currentRoomStateStatements
|
||||
invites inviteEventsStatements
|
||||
topology outputRoomEventsTopologyStatements
|
||||
}
|
||||
|
||||
// NewSyncServerDatabase creates a new sync server database
|
||||
|
@ -80,6 +81,9 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) {
|
|||
if err := d.invites.prepare(d.db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := d.topology.prepare(d.db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &d, nil
|
||||
}
|
||||
|
||||
|
@ -124,6 +128,10 @@ func (d *SyncServerDatabase) WriteEvent(
|
|||
}
|
||||
streamPos = types.StreamPosition(pos)
|
||||
|
||||
if err = d.topology.insertEventInTopology(ctx, ev); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 {
|
||||
// Nothing to do, the event may have just been a message event.
|
||||
return nil
|
||||
|
@ -310,7 +318,6 @@ func (d *SyncServerDatabase) CompleteSync(
|
|||
|
||||
// Build up a /sync response. Add joined rooms.
|
||||
res := types.NewResponse(pos)
|
||||
var prevBatch types.StreamPosition
|
||||
for _, roomID := range roomIDs {
|
||||
var stateEvents []gomatrixserverlib.Event
|
||||
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID)
|
||||
|
@ -328,16 +335,24 @@ func (d *SyncServerDatabase) CompleteSync(
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// Retrieve the backward topology position, i.e. the position of the
|
||||
// oldest event in the room's topology.
|
||||
var backwardTopologyPos types.StreamPosition
|
||||
backwardTopologyPos, err = d.topology.selectPositionInTopology(recentStreamEvents[0].EventID())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if backwardTopologyPos-1 <= 0 {
|
||||
backwardTopologyPos = types.StreamPosition(1)
|
||||
}
|
||||
|
||||
// We don't include a device here as we don't need to send down
|
||||
// transaction IDs for complete syncs
|
||||
recentEvents := StreamEventsToEvents(nil, recentStreamEvents)
|
||||
stateEvents = removeDuplicates(stateEvents, recentEvents)
|
||||
jr := types.NewJoinResponse()
|
||||
if prevBatch = recentStreamEvents[0].StreamPosition - 1; prevBatch <= 0 {
|
||||
prevBatch = 1
|
||||
}
|
||||
jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition(
|
||||
types.PaginationTokenTypeStream, prevBatch,
|
||||
types.PaginationTokenTypeTopology, backwardTopologyPos-1,
|
||||
).String()
|
||||
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||
jr.Timeline.Limited = true
|
||||
|
@ -465,15 +480,23 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
|
|||
return nil
|
||||
}
|
||||
|
||||
// Retrieve the backward topology position, i.e. the position of the
|
||||
// oldest event in the room's topology.
|
||||
var backwardTopologyPos types.StreamPosition
|
||||
backwardTopologyPos, err = d.topology.selectPositionInTopology(recentStreamEvents[0].EventID())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if backwardTopologyPos-1 <= 0 {
|
||||
backwardTopologyPos = types.StreamPosition(1)
|
||||
}
|
||||
|
||||
switch delta.membership {
|
||||
case "join":
|
||||
jr := types.NewJoinResponse()
|
||||
var prevBatch types.StreamPosition
|
||||
if prevBatch = recentStreamEvents[0].StreamPosition - 1; prevBatch <= 0 {
|
||||
prevBatch = 1
|
||||
}
|
||||
|
||||
jr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition(
|
||||
types.PaginationTokenTypeStream, prevBatch,
|
||||
types.PaginationTokenTypeTopology, backwardTopologyPos-1,
|
||||
).String()
|
||||
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
||||
|
@ -485,12 +508,8 @@ func (d *SyncServerDatabase) addRoomDeltaToResponse(
|
|||
// TODO: recentEvents may contain events that this user is not allowed to see because they are
|
||||
// no longer in the room.
|
||||
lr := types.NewLeaveResponse()
|
||||
var prevBatch types.StreamPosition
|
||||
if prevBatch = recentStreamEvents[0].StreamPosition - 1; prevBatch <= 0 {
|
||||
prevBatch = 1
|
||||
}
|
||||
lr.Timeline.PrevBatch = types.NewPaginationTokenFromTypeAndPosition(
|
||||
types.PaginationTokenTypeStream, prevBatch,
|
||||
types.PaginationTokenTypeStream, backwardTopologyPos-1,
|
||||
).String()
|
||||
lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
|
||||
|
|
Loading…
Reference in a new issue