mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-08-02 22:22:46 +00:00
Finish merging syncserver.go (#1033)
* Refactor all postgres tables; start work on sqlite * wip sqlite merges; database is locked errors to investigate and failing tests * Revert "wip sqlite merges; database is locked errors to investigate and failing tests" This reverts commit 26cbfc5b75ae2dc4fb31a838b917aa39d758f162. * convert current room state table * port over sqlite topology table * remove a few functions * remove more functions * Share more code * factor out completesync and a bit more * Remove remaining code
This commit is contained in:
parent
640a0265df
commit
1b34130a5b
14 changed files with 1284 additions and 2140 deletions
|
@ -1,175 +0,0 @@
|
|||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package tables
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/dendrite/common"
|
||||
)
|
||||
|
||||
// BackwardsExtremitiesStatements contains the SQL statements to implement.
|
||||
// See BackwardsExtremities to see the parameter and response types.
|
||||
type BackwardsExtremitiesStatements interface {
|
||||
Schema() string
|
||||
InsertBackwardExtremity() string
|
||||
SelectBackwardExtremitiesForRoom() string
|
||||
DeleteBackwardExtremity() string
|
||||
}
|
||||
|
||||
type PostgresBackwardsExtremitiesStatements struct{}
|
||||
|
||||
func (s *PostgresBackwardsExtremitiesStatements) Schema() string {
|
||||
return `-- Stores output room events received from the roomserver.
|
||||
CREATE TABLE IF NOT EXISTS syncapi_backward_extremities (
|
||||
-- The 'room_id' key for the event.
|
||||
room_id TEXT NOT NULL,
|
||||
-- The event ID for the last known event. This is the backwards extremity.
|
||||
event_id TEXT NOT NULL,
|
||||
-- The prev_events for the last known event. This is used to update extremities.
|
||||
prev_event_id TEXT NOT NULL,
|
||||
|
||||
PRIMARY KEY(room_id, event_id, prev_event_id)
|
||||
);
|
||||
`
|
||||
}
|
||||
func (s *PostgresBackwardsExtremitiesStatements) InsertBackwardExtremity() string {
|
||||
return "" +
|
||||
"INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" +
|
||||
" VALUES ($1, $2, $3)" +
|
||||
" ON CONFLICT DO NOTHING"
|
||||
}
|
||||
func (s *PostgresBackwardsExtremitiesStatements) SelectBackwardExtremitiesForRoom() string {
|
||||
return "SELECT DISTINCT event_id FROM syncapi_backward_extremities WHERE room_id = $1"
|
||||
}
|
||||
func (s *PostgresBackwardsExtremitiesStatements) DeleteBackwardExtremity() string {
|
||||
return "DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2"
|
||||
}
|
||||
|
||||
type SqliteBackwardsExtremitiesStatements struct{}
|
||||
|
||||
func (s *SqliteBackwardsExtremitiesStatements) Schema() string {
|
||||
return `-- Stores output room events received from the roomserver.
|
||||
CREATE TABLE IF NOT EXISTS syncapi_backward_extremities (
|
||||
-- The 'room_id' key for the event.
|
||||
room_id TEXT NOT NULL,
|
||||
-- The event ID for the last known event. This is the backwards extremity.
|
||||
event_id TEXT NOT NULL,
|
||||
-- The prev_events for the last known event. This is used to update extremities.
|
||||
prev_event_id TEXT NOT NULL,
|
||||
|
||||
PRIMARY KEY(room_id, event_id, prev_event_id)
|
||||
);
|
||||
`
|
||||
}
|
||||
|
||||
func (s *SqliteBackwardsExtremitiesStatements) InsertBackwardExtremity() string {
|
||||
return "" +
|
||||
"INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" +
|
||||
" VALUES ($1, $2, $3)" +
|
||||
" ON CONFLICT (room_id, event_id, prev_event_id) DO NOTHING"
|
||||
}
|
||||
|
||||
func (s *SqliteBackwardsExtremitiesStatements) SelectBackwardExtremitiesForRoom() string {
|
||||
return "" +
|
||||
"SELECT DISTINCT event_id FROM syncapi_backward_extremities WHERE room_id = $1"
|
||||
}
|
||||
|
||||
func (s *SqliteBackwardsExtremitiesStatements) DeleteBackwardExtremity() string {
|
||||
return "" +
|
||||
"DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2"
|
||||
}
|
||||
|
||||
// BackwardsExtremities keeps track of backwards extremities for a room.
|
||||
// Backwards extremities are the earliest (DAG-wise) known events which we have
|
||||
// the entire event JSON. These event IDs are used in federation requests to fetch
|
||||
// even earlier events.
|
||||
//
|
||||
// We persist the previous event IDs as well, one per row, so when we do fetch even
|
||||
// earlier events we can simply delete rows which referenced it. Consider the graph:
|
||||
// A
|
||||
// | Event C has 1 prev_event ID: A.
|
||||
// B C
|
||||
// |___| Event D has 2 prev_event IDs: B and C.
|
||||
// |
|
||||
// D
|
||||
// The earliest known event we have is D, so this table has 2 rows.
|
||||
// A backfill request gives us C but not B. We delete rows where prev_event=C. This
|
||||
// still means that D is a backwards extremity as we do not have event B. However, event
|
||||
// C is *also* a backwards extremity at this point as we do not have event A. Later,
|
||||
// when we fetch event B, we delete rows where prev_event=B. This then removes D as
|
||||
// a backwards extremity because there are no more rows with event_id=B.
|
||||
type BackwardsExtremities struct {
|
||||
insertBackwardExtremityStmt *sql.Stmt
|
||||
selectBackwardExtremitiesForRoomStmt *sql.Stmt
|
||||
deleteBackwardExtremityStmt *sql.Stmt
|
||||
}
|
||||
|
||||
// NewBackwardsExtremities prepares the table
|
||||
func NewBackwardsExtremities(db *sql.DB, stmts BackwardsExtremitiesStatements) (table BackwardsExtremities, err error) {
|
||||
_, err = db.Exec(stmts.Schema())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if table.insertBackwardExtremityStmt, err = db.Prepare(stmts.InsertBackwardExtremity()); err != nil {
|
||||
return
|
||||
}
|
||||
if table.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(stmts.SelectBackwardExtremitiesForRoom()); err != nil {
|
||||
return
|
||||
}
|
||||
if table.deleteBackwardExtremityStmt, err = db.Prepare(stmts.DeleteBackwardExtremity()); err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// InsertsBackwardExtremity inserts a new backwards extremity.
|
||||
func (s *BackwardsExtremities) InsertsBackwardExtremity(
|
||||
ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string,
|
||||
) (err error) {
|
||||
_, err = txn.Stmt(s.insertBackwardExtremityStmt).ExecContext(ctx, roomID, eventID, prevEventID)
|
||||
return
|
||||
}
|
||||
|
||||
// SelectBackwardExtremitiesForRoom retrieves all backwards extremities for the room.
|
||||
func (s *BackwardsExtremities) SelectBackwardExtremitiesForRoom(
|
||||
ctx context.Context, roomID string,
|
||||
) (eventIDs []string, err error) {
|
||||
rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed")
|
||||
|
||||
for rows.Next() {
|
||||
var eID string
|
||||
if err = rows.Scan(&eID); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
eventIDs = append(eventIDs, eID)
|
||||
}
|
||||
|
||||
return eventIDs, rows.Err()
|
||||
}
|
||||
|
||||
// DeleteBackwardExtremity removes a backwards extremity for a room, if one existed.
|
||||
func (s *BackwardsExtremities) DeleteBackwardExtremity(
|
||||
ctx context.Context, txn *sql.Tx, roomID, knownEventID string,
|
||||
) (err error) {
|
||||
_, err = txn.Stmt(s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID)
|
||||
return
|
||||
}
|
|
@ -30,3 +30,61 @@ type Events interface {
|
|||
SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos types.StreamPosition, limit int) ([]types.StreamEvent, error)
|
||||
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error)
|
||||
}
|
||||
|
||||
type Topology interface {
|
||||
// InsertEventInTopology inserts the given event in the room's topology, based
|
||||
// on the event's depth.
|
||||
InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) (err error)
|
||||
// SelectEventIDsInRange selects the IDs of events which positions are within a
|
||||
// given range in a given room's topological order.
|
||||
// Returns an empty slice if no events match the given range.
|
||||
SelectEventIDsInRange(ctx context.Context, txn *sql.Tx, roomID string, fromPos, toPos, toMicroPos types.StreamPosition, limit int, chronologicalOrder bool) (eventIDs []string, err error)
|
||||
// SelectPositionInTopology returns the position of a given event in the
|
||||
// topology of the room it belongs to.
|
||||
SelectPositionInTopology(ctx context.Context, txn *sql.Tx, eventID string) (pos, spos types.StreamPosition, err error)
|
||||
SelectMaxPositionInTopology(ctx context.Context, txn *sql.Tx, roomID string) (pos types.StreamPosition, spos types.StreamPosition, err error)
|
||||
// SelectEventIDsFromPosition returns the IDs of all events that have a given
|
||||
// position in the topology of a given room.
|
||||
SelectEventIDsFromPosition(ctx context.Context, txn *sql.Tx, roomID string, pos types.StreamPosition) (eventIDs []string, err error)
|
||||
}
|
||||
|
||||
type CurrentRoomState interface {
|
||||
SelectStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error)
|
||||
SelectEventsWithEventIDs(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error)
|
||||
UpsertRoomState(ctx context.Context, txn *sql.Tx, event gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition) error
|
||||
DeleteRoomStateByEventID(ctx context.Context, txn *sql.Tx, eventID string) error
|
||||
// SelectCurrentState returns all the current state events for the given room.
|
||||
SelectCurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilter *gomatrixserverlib.StateFilter) ([]gomatrixserverlib.HeaderedEvent, error)
|
||||
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
|
||||
SelectRoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membership string) ([]string, error)
|
||||
// SelectJoinedUsers returns a map of room ID to a list of joined user IDs.
|
||||
SelectJoinedUsers(ctx context.Context) (map[string][]string, error)
|
||||
}
|
||||
|
||||
// BackwardsExtremities keeps track of backwards extremities for a room.
|
||||
// Backwards extremities are the earliest (DAG-wise) known events which we have
|
||||
// the entire event JSON. These event IDs are used in federation requests to fetch
|
||||
// even earlier events.
|
||||
//
|
||||
// We persist the previous event IDs as well, one per row, so when we do fetch even
|
||||
// earlier events we can simply delete rows which referenced it. Consider the graph:
|
||||
// A
|
||||
// | Event C has 1 prev_event ID: A.
|
||||
// B C
|
||||
// |___| Event D has 2 prev_event IDs: B and C.
|
||||
// |
|
||||
// D
|
||||
// The earliest known event we have is D, so this table has 2 rows.
|
||||
// A backfill request gives us C but not B. We delete rows where prev_event=C. This
|
||||
// still means that D is a backwards extremity as we do not have event B. However, event
|
||||
// C is *also* a backwards extremity at this point as we do not have event A. Later,
|
||||
// when we fetch event B, we delete rows where prev_event=B. This then removes D as
|
||||
// a backwards extremity because there are no more rows with event_id=B.
|
||||
type BackwardsExtremities interface {
|
||||
// InsertsBackwardExtremity inserts a new backwards extremity.
|
||||
InsertsBackwardExtremity(ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string) (err error)
|
||||
// SelectBackwardExtremitiesForRoom retrieves all backwards extremities for the room.
|
||||
SelectBackwardExtremitiesForRoom(ctx context.Context, roomID string) (eventIDs []string, err error)
|
||||
// DeleteBackwardExtremity removes a backwards extremity for a room, if one existed.
|
||||
DeleteBackwardExtremity(ctx context.Context, txn *sql.Tx, roomID, knownEventID string) (err error)
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue