Store and retrieve backward extremities to and from the database

This commit is contained in:
Brendan Abolivier 2018-12-13 09:26:55 +00:00
parent 570a76926d
commit ad5b16a17b
No known key found for this signature in database
GPG key ID: 8EF1500759F70623
3 changed files with 190 additions and 41 deletions

View file

@ -186,8 +186,6 @@ func (r *messagesReq) retrieveEvents() (
if len(streamEvents) == 0 {
if events, err = r.handleEmptyEventsSlice(); err != nil {
return
} else if len(events) == 0 {
return []gomatrixserverlib.ClientEvent{}, r.from, r.to, nil
}
} else {
if events, err = r.handleNonEmptyEventsSlice(streamEvents); err != nil {
@ -195,6 +193,11 @@ func (r *messagesReq) retrieveEvents() (
}
}
// If we didn't get any event, we don't need to proceed any further.
if len(events) == 0 {
return []gomatrixserverlib.ClientEvent{}, r.from, r.to, nil
}
// Sort the events to ensure we send them in the right order. We currently
// do that based on the event's timestamp.
if r.backwardOrdering {
@ -267,37 +270,18 @@ func (r *messagesReq) retrieveEvents() (
func (r *messagesReq) handleEmptyEventsSlice() (
events []gomatrixserverlib.Event, err error,
) {
var evs []storage.StreamEvent
// Determine what could be the oldest position of interest in the room's
// topology for this.
var laterPosition types.StreamPosition
if r.backwardOrdering {
laterPosition = r.from.Position + 1
} else {
laterPosition = r.to.Position + 1
}
backwardExtremities, err := r.db.BackwardExtremitiesForRoom(r.ctx, r.roomID)
// Retrieve events at that position.
evs, err = r.db.EventsAtTopologicalPosition(r.ctx, r.roomID, laterPosition)
if err != nil {
return
}
// Check if one of these events is a backward extremity.
backwardExtremity, err := r.containsBackwardExtremity(evs)
if err != nil {
return
}
// If so, retrieve as much events as requested through backfilling.
if backwardExtremity {
events, err = r.backfill(evs[0].EventID(), r.limit)
// Check if we have backward extremities for this room.
if len(backwardExtremities) > 0 {
// If so, retrieve as much events as needed through backfilling.
events, err = r.backfill(backwardExtremities, r.limit)
if err != nil {
return
}
} else {
// If not, it means the slice was empty because we reached the limit of
// the room's topology, so return an empty slice.
// If not, it means the slice was empty because we reached the room's
// creation, so return an empty slice.
events = []gomatrixserverlib.Event{}
}
@ -327,17 +311,17 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []storage.StreamEve
}
// Check if the slice contains a backward extremity.
backwardExtremity, err := r.containsBackwardExtremity(streamEvents)
backwardExtremities, err := r.db.BackwardExtremitiesForRoom(r.ctx, r.roomID)
if err != nil {
return
}
// Backfill is needed if we've reached a backward extremity and need more
// events. It's only needed if the direction is backward.
if backwardExtremity && !isSetLargeEnough && r.backwardOrdering {
if len(backwardExtremities) > 0 && !isSetLargeEnough && r.backwardOrdering {
var pdus []gomatrixserverlib.Event
// Only ask the remote server for enough events to reach the limit.
pdus, err = r.backfill(streamEvents[0].EventID(), r.limit-len(streamEvents))
pdus, err = r.backfill(backwardExtremities, r.limit-len(streamEvents))
if err != nil {
return
}
@ -404,13 +388,13 @@ func (r *messagesReq) containsBackwardExtremity(events []storage.StreamEvent) (b
// event, or if there is no remote homeserver to contact.
// Returns an error if there was an issue with retrieving the list of servers in
// the room or sending the request.
func (r *messagesReq) backfill(fromEventID string, limit int) ([]gomatrixserverlib.Event, error) {
// Query the list of servers in the room when the earlier event we know
// of was sent.
func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserverlib.Event, error) {
// Query the list of servers in the room when one of the backward extremities
// was sent.
var serversResponse api.QueryServersInRoomAtEventResponse
serversRequest := api.QueryServersInRoomAtEventRequest{
RoomID: r.roomID,
EventID: fromEventID,
EventID: fromEventIDs[0],
}
if err := r.queryAPI.QueryServersInRoomAtEvent(r.ctx, &serversRequest, &serversResponse); err != nil {
return nil, err
@ -438,7 +422,7 @@ func (r *messagesReq) backfill(fromEventID string, limit int) ([]gomatrixserverl
// send it a request for backfill.
if len(srvToBackfillFrom) > 0 {
txn, err := r.federation.Backfill(
r.ctx, srvToBackfillFrom, r.roomID, limit, []string{fromEventID},
r.ctx, srvToBackfillFrom, r.roomID, limit, fromEventIDs,
)
if err != nil {
return nil, err

View file

@ -0,0 +1,118 @@
// Copyright 2018 New Vector Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"context"
"database/sql"
)
const backwardExtremitiesSchema = `
-- Stores output room events received from the roomserver.
CREATE TABLE IF NOT EXISTS syncapi_backward_extremities (
-- The 'room_id' key for the event.
room_id TEXT NOT NULL,
-- The event ID for the event.
event_id TEXT NOT NULL,
PRIMARY KEY(room_id, event_id)
);
`
const insertBackwardExtremitySQL = "" +
"INSERT INTO syncapi_backward_extremities (room_id, event_id)" +
" VALUES ($1, $2)"
const selectBackwardExtremitiesForRoomSQL = "" +
"SELECT event_id FROM syncapi_backward_extremities WHERE room_id = $1"
const isBackwardExtremitySQL = "" +
"SELECT EXISTS (" +
" SELECT TRUE FROM syncapi_backward_extremities" +
" WHERE room_id = $1 AND event_id = $2" +
")"
const deleteBackwardExtremitySQL = "" +
"DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND event_id = $2"
type backwardExtremitiesStatements struct {
insertBackwardExtremityStmt *sql.Stmt
selectBackwardExtremitiesForRoomStmt *sql.Stmt
isBackwardExtremityStmt *sql.Stmt
deleteBackwardExtremityStmt *sql.Stmt
}
func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) {
_, err = db.Exec(backwardExtremitiesSchema)
if err != nil {
return
}
if s.insertBackwardExtremityStmt, err = db.Prepare(insertBackwardExtremitySQL); err != nil {
return
}
if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil {
return
}
if s.isBackwardExtremityStmt, err = db.Prepare(isBackwardExtremitySQL); err != nil {
return
}
if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil {
return
}
return
}
func (s *backwardExtremitiesStatements) insertsBackwardExtremity(
ctx context.Context, roomID, eventID string,
) (err error) {
_, err = s.insertBackwardExtremityStmt.ExecContext(ctx, roomID, eventID)
return
}
func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom(
ctx context.Context, roomID string,
) (eventIDs []string, err error) {
eventIDs = make([]string, 0)
rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID)
if err != nil {
return
}
for rows.Next() {
var eID string
if err = rows.Scan(&eID); err != nil {
return
}
eventIDs = append(eventIDs, eID)
}
return
}
func (s *backwardExtremitiesStatements) isBackwardExtremity(
ctx context.Context, roomID, eventID string,
) (isBE bool, err error) {
err = s.isBackwardExtremityStmt.QueryRowContext(ctx, roomID, eventID).Scan(&isBE)
return
}
func (s *backwardExtremitiesStatements) deleteBackwardExtremity(
ctx context.Context, roomID, eventID string,
) (err error) {
_, err = s.insertBackwardExtremityStmt.ExecContext(ctx, roomID, eventID)
return
}

View file

@ -52,11 +52,12 @@ type StreamEvent struct {
type SyncServerDatabase struct {
db *sql.DB
common.PartitionOffsetStatements
accountData accountDataStatements
events outputRoomEventsStatements
roomstate currentRoomStateStatements
invites inviteEventsStatements
topology outputRoomEventsTopologyStatements
accountData accountDataStatements
events outputRoomEventsStatements
roomstate currentRoomStateStatements
invites inviteEventsStatements
topology outputRoomEventsTopologyStatements
backwardExtremities backwardExtremitiesStatements
}
// NewSyncServerDatabase creates a new sync server database
@ -84,6 +85,9 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) {
if err := d.topology.prepare(d.db); err != nil {
return nil, err
}
if err := d.backwardExtremities.prepare(d.db); err != nil {
return nil, err
}
return &d, nil
}
@ -132,6 +136,41 @@ func (d *SyncServerDatabase) WriteEvent(
return err
}
// If the event is already known as a backward extremity, don't consider
// it as such anymore now that we have it.
isBackwardExtremity, err := d.backwardExtremities.isBackwardExtremity(ctx, ev.RoomID(), ev.EventID())
if err != nil {
return err
}
if isBackwardExtremity {
if err = d.backwardExtremities.deleteBackwardExtremity(ctx, ev.RoomID(), ev.EventID()); err != nil {
return err
}
}
// Check if we have all of the event's previous events. If an event is
// missing, add it to the room's backward extremities.
prevEvents, err := d.events.selectEvents(ctx, nil, ev.PrevEventIDs())
if err != nil {
return err
}
var found bool
for _, eID := range ev.PrevEventIDs() {
found = false
for _, prevEv := range prevEvents {
if eID == prevEv.EventID() {
found = true
}
}
// If the event is missing, consider it a backward extremity.
if !found {
if err = d.backwardExtremities.insertsBackwardExtremity(ctx, ev.RoomID(), ev.EventID()); err != nil {
return err
}
}
}
if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 {
// Nothing to do, the event may have just been a message event.
return nil
@ -261,6 +300,14 @@ func (d *SyncServerDatabase) GetEventsInRange(
return
}
// BackwardExtremitiesForRoom returns the event IDs of all of the backward
// extremities we know of for a given room.
func (d *SyncServerDatabase) BackwardExtremitiesForRoom(
ctx context.Context, roomID string,
) (backwardExtremities []string, err error) {
return d.backwardExtremities.selectBackwardExtremitiesForRoom(ctx, roomID)
}
// MaxTopologicalPosition returns the highest topological position for a given
// room.
func (d *SyncServerDatabase) MaxTopologicalPosition(