From ad5b16a17bd83a2fd32b0d640a96b29df296f3e7 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 13 Dec 2018 09:26:55 +0000 Subject: [PATCH] Store and retrieve backward extremities to and from the database --- .../dendrite/syncapi/routing/messages.go | 56 +++------ .../storage/backward_extremities_table.go | 118 ++++++++++++++++++ .../dendrite/syncapi/storage/syncserver.go | 57 ++++++++- 3 files changed, 190 insertions(+), 41 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/syncapi/storage/backward_extremities_table.go diff --git a/src/github.com/matrix-org/dendrite/syncapi/routing/messages.go b/src/github.com/matrix-org/dendrite/syncapi/routing/messages.go index 0b75279e..ef8768b7 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/routing/messages.go +++ b/src/github.com/matrix-org/dendrite/syncapi/routing/messages.go @@ -186,8 +186,6 @@ func (r *messagesReq) retrieveEvents() ( if len(streamEvents) == 0 { if events, err = r.handleEmptyEventsSlice(); err != nil { return - } else if len(events) == 0 { - return []gomatrixserverlib.ClientEvent{}, r.from, r.to, nil } } else { if events, err = r.handleNonEmptyEventsSlice(streamEvents); err != nil { @@ -195,6 +193,11 @@ func (r *messagesReq) retrieveEvents() ( } } + // If we didn't get any event, we don't need to proceed any further. + if len(events) == 0 { + return []gomatrixserverlib.ClientEvent{}, r.from, r.to, nil + } + // Sort the events to ensure we send them in the right order. We currently // do that based on the event's timestamp. if r.backwardOrdering { @@ -267,37 +270,18 @@ func (r *messagesReq) retrieveEvents() ( func (r *messagesReq) handleEmptyEventsSlice() ( events []gomatrixserverlib.Event, err error, ) { - var evs []storage.StreamEvent - // Determine what could be the oldest position of interest in the room's - // topology for this. - var laterPosition types.StreamPosition - if r.backwardOrdering { - laterPosition = r.from.Position + 1 - } else { - laterPosition = r.to.Position + 1 - } + backwardExtremities, err := r.db.BackwardExtremitiesForRoom(r.ctx, r.roomID) - // Retrieve events at that position. - evs, err = r.db.EventsAtTopologicalPosition(r.ctx, r.roomID, laterPosition) - if err != nil { - return - } - - // Check if one of these events is a backward extremity. - backwardExtremity, err := r.containsBackwardExtremity(evs) - if err != nil { - return - } - - // If so, retrieve as much events as requested through backfilling. - if backwardExtremity { - events, err = r.backfill(evs[0].EventID(), r.limit) + // Check if we have backward extremities for this room. + if len(backwardExtremities) > 0 { + // If so, retrieve as much events as needed through backfilling. + events, err = r.backfill(backwardExtremities, r.limit) if err != nil { return } } else { - // If not, it means the slice was empty because we reached the limit of - // the room's topology, so return an empty slice. + // If not, it means the slice was empty because we reached the room's + // creation, so return an empty slice. events = []gomatrixserverlib.Event{} } @@ -327,17 +311,17 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []storage.StreamEve } // Check if the slice contains a backward extremity. - backwardExtremity, err := r.containsBackwardExtremity(streamEvents) + backwardExtremities, err := r.db.BackwardExtremitiesForRoom(r.ctx, r.roomID) if err != nil { return } // Backfill is needed if we've reached a backward extremity and need more // events. It's only needed if the direction is backward. - if backwardExtremity && !isSetLargeEnough && r.backwardOrdering { + if len(backwardExtremities) > 0 && !isSetLargeEnough && r.backwardOrdering { var pdus []gomatrixserverlib.Event // Only ask the remote server for enough events to reach the limit. - pdus, err = r.backfill(streamEvents[0].EventID(), r.limit-len(streamEvents)) + pdus, err = r.backfill(backwardExtremities, r.limit-len(streamEvents)) if err != nil { return } @@ -404,13 +388,13 @@ func (r *messagesReq) containsBackwardExtremity(events []storage.StreamEvent) (b // event, or if there is no remote homeserver to contact. // Returns an error if there was an issue with retrieving the list of servers in // the room or sending the request. -func (r *messagesReq) backfill(fromEventID string, limit int) ([]gomatrixserverlib.Event, error) { - // Query the list of servers in the room when the earlier event we know - // of was sent. +func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserverlib.Event, error) { + // Query the list of servers in the room when one of the backward extremities + // was sent. var serversResponse api.QueryServersInRoomAtEventResponse serversRequest := api.QueryServersInRoomAtEventRequest{ RoomID: r.roomID, - EventID: fromEventID, + EventID: fromEventIDs[0], } if err := r.queryAPI.QueryServersInRoomAtEvent(r.ctx, &serversRequest, &serversResponse); err != nil { return nil, err @@ -438,7 +422,7 @@ func (r *messagesReq) backfill(fromEventID string, limit int) ([]gomatrixserverl // send it a request for backfill. if len(srvToBackfillFrom) > 0 { txn, err := r.federation.Backfill( - r.ctx, srvToBackfillFrom, r.roomID, limit, []string{fromEventID}, + r.ctx, srvToBackfillFrom, r.roomID, limit, fromEventIDs, ) if err != nil { return nil, err diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/backward_extremities_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/backward_extremities_table.go new file mode 100644 index 00000000..147a11bf --- /dev/null +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/backward_extremities_table.go @@ -0,0 +1,118 @@ +// Copyright 2018 New Vector Ltd +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + "database/sql" +) + +const backwardExtremitiesSchema = ` +-- Stores output room events received from the roomserver. +CREATE TABLE IF NOT EXISTS syncapi_backward_extremities ( + -- The 'room_id' key for the event. + room_id TEXT NOT NULL, + -- The event ID for the event. + event_id TEXT NOT NULL, + + PRIMARY KEY(room_id, event_id) +); +` + +const insertBackwardExtremitySQL = "" + + "INSERT INTO syncapi_backward_extremities (room_id, event_id)" + + " VALUES ($1, $2)" + +const selectBackwardExtremitiesForRoomSQL = "" + + "SELECT event_id FROM syncapi_backward_extremities WHERE room_id = $1" + +const isBackwardExtremitySQL = "" + + "SELECT EXISTS (" + + " SELECT TRUE FROM syncapi_backward_extremities" + + " WHERE room_id = $1 AND event_id = $2" + + ")" + +const deleteBackwardExtremitySQL = "" + + "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND event_id = $2" + +type backwardExtremitiesStatements struct { + insertBackwardExtremityStmt *sql.Stmt + selectBackwardExtremitiesForRoomStmt *sql.Stmt + isBackwardExtremityStmt *sql.Stmt + deleteBackwardExtremityStmt *sql.Stmt +} + +func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) { + _, err = db.Exec(backwardExtremitiesSchema) + if err != nil { + return + } + if s.insertBackwardExtremityStmt, err = db.Prepare(insertBackwardExtremitySQL); err != nil { + return + } + if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil { + return + } + if s.isBackwardExtremityStmt, err = db.Prepare(isBackwardExtremitySQL); err != nil { + return + } + if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil { + return + } + return +} + +func (s *backwardExtremitiesStatements) insertsBackwardExtremity( + ctx context.Context, roomID, eventID string, +) (err error) { + _, err = s.insertBackwardExtremityStmt.ExecContext(ctx, roomID, eventID) + return +} + +func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom( + ctx context.Context, roomID string, +) (eventIDs []string, err error) { + eventIDs = make([]string, 0) + + rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID) + if err != nil { + return + } + + for rows.Next() { + var eID string + if err = rows.Scan(&eID); err != nil { + return + } + + eventIDs = append(eventIDs, eID) + } + + return +} + +func (s *backwardExtremitiesStatements) isBackwardExtremity( + ctx context.Context, roomID, eventID string, +) (isBE bool, err error) { + err = s.isBackwardExtremityStmt.QueryRowContext(ctx, roomID, eventID).Scan(&isBE) + return +} + +func (s *backwardExtremitiesStatements) deleteBackwardExtremity( + ctx context.Context, roomID, eventID string, +) (err error) { + _, err = s.insertBackwardExtremityStmt.ExecContext(ctx, roomID, eventID) + return +} diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 1d9673e7..34f723f3 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -52,11 +52,12 @@ type StreamEvent struct { type SyncServerDatabase struct { db *sql.DB common.PartitionOffsetStatements - accountData accountDataStatements - events outputRoomEventsStatements - roomstate currentRoomStateStatements - invites inviteEventsStatements - topology outputRoomEventsTopologyStatements + accountData accountDataStatements + events outputRoomEventsStatements + roomstate currentRoomStateStatements + invites inviteEventsStatements + topology outputRoomEventsTopologyStatements + backwardExtremities backwardExtremitiesStatements } // NewSyncServerDatabase creates a new sync server database @@ -84,6 +85,9 @@ func NewSyncServerDatabase(dataSourceName string) (*SyncServerDatabase, error) { if err := d.topology.prepare(d.db); err != nil { return nil, err } + if err := d.backwardExtremities.prepare(d.db); err != nil { + return nil, err + } return &d, nil } @@ -132,6 +136,41 @@ func (d *SyncServerDatabase) WriteEvent( return err } + // If the event is already known as a backward extremity, don't consider + // it as such anymore now that we have it. + isBackwardExtremity, err := d.backwardExtremities.isBackwardExtremity(ctx, ev.RoomID(), ev.EventID()) + if err != nil { + return err + } + if isBackwardExtremity { + if err = d.backwardExtremities.deleteBackwardExtremity(ctx, ev.RoomID(), ev.EventID()); err != nil { + return err + } + } + + // Check if we have all of the event's previous events. If an event is + // missing, add it to the room's backward extremities. + prevEvents, err := d.events.selectEvents(ctx, nil, ev.PrevEventIDs()) + if err != nil { + return err + } + var found bool + for _, eID := range ev.PrevEventIDs() { + found = false + for _, prevEv := range prevEvents { + if eID == prevEv.EventID() { + found = true + } + } + + // If the event is missing, consider it a backward extremity. + if !found { + if err = d.backwardExtremities.insertsBackwardExtremity(ctx, ev.RoomID(), ev.EventID()); err != nil { + return err + } + } + } + if len(addStateEvents) == 0 && len(removeStateEventIDs) == 0 { // Nothing to do, the event may have just been a message event. return nil @@ -261,6 +300,14 @@ func (d *SyncServerDatabase) GetEventsInRange( return } +// BackwardExtremitiesForRoom returns the event IDs of all of the backward +// extremities we know of for a given room. +func (d *SyncServerDatabase) BackwardExtremitiesForRoom( + ctx context.Context, roomID string, +) (backwardExtremities []string, err error) { + return d.backwardExtremities.selectBackwardExtremitiesForRoom(ctx, roomID) +} + // MaxTopologicalPosition returns the highest topological position for a given // room. func (d *SyncServerDatabase) MaxTopologicalPosition(