From 11b557097c6745309c09b58f681080d3fcc4f9f5 Mon Sep 17 00:00:00 2001 From: Till <2353100+S7evinK@users.noreply.github.com> Date: Wed, 24 May 2023 12:14:42 +0200 Subject: [PATCH] Drop `reference_sha` column (#3083) Companion PR to https://github.com/matrix-org/gomatrixserverlib/pull/383 --- .gitignore | 5 +- build/gobind-pinecone/monolith_test.go | 10 ++- clientapi/routing/createroom.go | 2 +- go.mod | 2 +- go.sum | 4 +- internal/eventutil/events.go | 29 +----- roomserver/api/query.go | 2 +- roomserver/internal/input/input_events.go | 4 +- .../internal/input/input_latest_events.go | 8 +- roomserver/internal/input/input_missing.go | 6 +- roomserver/internal/perform/perform_admin.go | 8 +- .../internal/perform/perform_inbound_peek.go | 2 +- .../internal/perform/perform_upgrade.go | 2 +- roomserver/internal/query/query_test.go | 7 +- roomserver/storage/interface.go | 4 +- .../20230516154000_drop_reference_sha.go | 54 ++++++++++++ roomserver/storage/postgres/events_table.go | 64 ++++---------- .../storage/postgres/previous_events_table.go | 35 +++++--- roomserver/storage/shared/room_updater.go | 16 +--- roomserver/storage/shared/storage.go | 16 ++-- .../20230516154000_drop_reference_sha.go | 72 +++++++++++++++ roomserver/storage/sqlite3/events_table.go | 88 +++++++------------ .../storage/sqlite3/previous_events_table.go | 47 +++++++--- roomserver/storage/sqlite3/storage.go | 3 +- .../storage/tables/events_table_test.go | 13 +-- roomserver/storage/tables/interface.go | 7 +- .../tables/previous_events_table_test.go | 10 +-- roomserver/types/types.go | 2 +- test/room.go | 4 +- 29 files changed, 299 insertions(+), 227 deletions(-) create mode 100644 roomserver/storage/postgres/deltas/20230516154000_drop_reference_sha.go create mode 100644 roomserver/storage/sqlite3/deltas/20230516154000_drop_reference_sha.go diff --git a/.gitignore b/.gitignore index dcfbf800..043956ee 100644 --- a/.gitignore +++ b/.gitignore @@ -74,4 +74,7 @@ complement/ docs/_site media_store/ -build \ No newline at end of file +build + +# golang workspaces +go.work* \ No newline at end of file diff --git a/build/gobind-pinecone/monolith_test.go b/build/gobind-pinecone/monolith_test.go index 7a7e36c7..f16d1d76 100644 --- a/build/gobind-pinecone/monolith_test.go +++ b/build/gobind-pinecone/monolith_test.go @@ -22,7 +22,10 @@ import ( ) func TestMonolithStarts(t *testing.T) { - monolith := DendriteMonolith{} + monolith := DendriteMonolith{ + StorageDirectory: t.TempDir(), + CacheDirectory: t.TempDir(), + } monolith.Start() monolith.PublicKey() monolith.Stop() @@ -60,7 +63,10 @@ func TestMonolithSetRelayServers(t *testing.T) { } for _, tc := range testCases { - monolith := DendriteMonolith{} + monolith := DendriteMonolith{ + StorageDirectory: t.TempDir(), + CacheDirectory: t.TempDir(), + } monolith.Start() inputRelays := tc.relays diff --git a/clientapi/routing/createroom.go b/clientapi/routing/createroom.go index bc960006..7a7a85e8 100644 --- a/clientapi/routing/createroom.go +++ b/clientapi/routing/createroom.go @@ -470,7 +470,7 @@ func createRoom( } } if i > 0 { - builder.PrevEvents = []gomatrixserverlib.EventReference{builtEvents[i-1].EventReference()} + builder.PrevEvents = []string{builtEvents[i-1].EventID()} } var ev gomatrixserverlib.PDU if err = builder.AddAuthEvents(&authEvents); err != nil { diff --git a/go.mod b/go.mod index bf2dc5de..16e5adc8 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/matrix-org/dugong v0.0.0-20210921133753-66e6b1c67e2e github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530 - github.com/matrix-org/gomatrixserverlib v0.0.0-20230523164045-3fddabebb511 + github.com/matrix-org/gomatrixserverlib v0.0.0-20230524095531-95ba6c68efb6 github.com/matrix-org/pinecone v0.11.1-0.20230210171230-8c3b24f2649a github.com/matrix-org/util v0.0.0-20221111132719-399730281e66 github.com/mattn/go-sqlite3 v1.14.16 diff --git a/go.sum b/go.sum index 574a7bd7..98e7e839 100644 --- a/go.sum +++ b/go.sum @@ -323,8 +323,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91 h1:s7fexw github.com/matrix-org/go-sqlite3-js v0.0.0-20220419092513-28aa791a1c91/go.mod h1:e+cg2q7C7yE5QnAXgzo512tgFh1RbQLC0+jozuegKgo= github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530 h1:kHKxCOLcHH8r4Fzarl4+Y3K5hjothkVW5z7T1dUM11U= github.com/matrix-org/gomatrix v0.0.0-20220926102614-ceba4d9f7530/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20230523164045-3fddabebb511 h1:om6z/WEVZMxZfgtiyfp5r5ubAObGMyRrnlVD07gIRY4= -github.com/matrix-org/gomatrixserverlib v0.0.0-20230523164045-3fddabebb511/go.mod h1:H9V9N3Uqn1bBJqYJNGK1noqtgJTaCEhtTdcH/mp50uU= +github.com/matrix-org/gomatrixserverlib v0.0.0-20230524095531-95ba6c68efb6 h1:FQpdh/KGCCQJytz4GAdG6pbx3DJ1HNzdKFc/BCZ0hP0= +github.com/matrix-org/gomatrixserverlib v0.0.0-20230524095531-95ba6c68efb6/go.mod h1:H9V9N3Uqn1bBJqYJNGK1noqtgJTaCEhtTdcH/mp50uU= github.com/matrix-org/pinecone v0.11.1-0.20230210171230-8c3b24f2649a h1:awrPDf9LEFySxTLKYBMCiObelNx/cBuv/wzllvCCH3A= github.com/matrix-org/pinecone v0.11.1-0.20230210171230-8c3b24f2649a/go.mod h1:HchJX9oKMXaT2xYFs0Ha/6Zs06mxLU8k6F1ODnrGkeQ= github.com/matrix-org/util v0.0.0-20221111132719-399730281e66 h1:6z4KxomXSIGWqhHcfzExgkH3Z3UkIXry4ibJS4Aqz2Y= diff --git a/internal/eventutil/events.go b/internal/eventutil/events.go index 79882d8d..ca052c31 100644 --- a/internal/eventutil/events.go +++ b/internal/eventutil/events.go @@ -129,18 +129,12 @@ func addPrevEventsToEvent( return ErrRoomNoExists{} } - verImpl, err := gomatrixserverlib.GetRoomVersion(queryRes.RoomVersion) - if err != nil { - return fmt.Errorf("GetRoomVersion: %w", err) - } - eventFormat := verImpl.EventFormat() - builder.Depth = queryRes.Depth authEvents := gomatrixserverlib.NewAuthEvents(nil) for i := range queryRes.StateEvents { - err = authEvents.AddEvent(queryRes.StateEvents[i].PDU) + err := authEvents.AddEvent(queryRes.StateEvents[i].PDU) if err != nil { return fmt.Errorf("authEvents.AddEvent: %w", err) } @@ -151,22 +145,7 @@ func addPrevEventsToEvent( return fmt.Errorf("eventsNeeded.AuthEventReferences: %w", err) } - truncAuth, truncPrev := truncateAuthAndPrevEvents(refs, queryRes.LatestEvents) - switch eventFormat { - case gomatrixserverlib.EventFormatV1: - builder.AuthEvents = truncAuth - builder.PrevEvents = truncPrev - case gomatrixserverlib.EventFormatV2: - v2AuthRefs, v2PrevRefs := []string{}, []string{} - for _, ref := range truncAuth { - v2AuthRefs = append(v2AuthRefs, ref.EventID) - } - for _, ref := range truncPrev { - v2PrevRefs = append(v2PrevRefs, ref.EventID) - } - builder.AuthEvents = v2AuthRefs - builder.PrevEvents = v2PrevRefs - } + builder.AuthEvents, builder.PrevEvents = truncateAuthAndPrevEvents(refs, queryRes.LatestEvents) return nil } @@ -176,8 +155,8 @@ func addPrevEventsToEvent( // NOTSPEC: The limits here feel a bit arbitrary but they are currently // here because of https://github.com/matrix-org/matrix-doc/issues/2307 // and because Synapse will just drop events that don't comply. -func truncateAuthAndPrevEvents(auth, prev []gomatrixserverlib.EventReference) ( - truncAuth, truncPrev []gomatrixserverlib.EventReference, +func truncateAuthAndPrevEvents(auth, prev []string) ( + truncAuth, truncPrev []string, ) { truncAuth, truncPrev = auth, prev if len(truncAuth) > 10 { diff --git a/roomserver/api/query.go b/roomserver/api/query.go index 55d1a6db..1726bfe1 100644 --- a/roomserver/api/query.go +++ b/roomserver/api/query.go @@ -49,7 +49,7 @@ type QueryLatestEventsAndStateResponse struct { RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"` // The latest events in the room. // These are used to set the prev_events when sending an event. - LatestEvents []gomatrixserverlib.EventReference `json:"latest_events"` + LatestEvents []string `json:"latest_events"` // The state events requested. // This list will be in an arbitrary order. // These are used to set the auth_events when sending an event. diff --git a/roomserver/internal/input/input_events.go b/roomserver/internal/input/input_events.go index cd78b372..02a1a280 100644 --- a/roomserver/internal/input/input_events.go +++ b/roomserver/internal/input/input_events.go @@ -883,9 +883,7 @@ func (r *Inputer) kickGuests(ctx context.Context, event gomatrixserverlib.PDU, r Origin: senderDomain, SendAsServer: string(senderDomain), }) - prevEvents = []gomatrixserverlib.EventReference{ - event.EventReference(), - } + prevEvents = []string{event.EventID()} } inputReq := &api.InputRoomEventsRequest{ diff --git a/roomserver/internal/input/input_latest_events.go b/roomserver/internal/input/input_latest_events.go index 54a5f623..7a7a021a 100644 --- a/roomserver/internal/input/input_latest_events.go +++ b/roomserver/internal/input/input_latest_events.go @@ -154,8 +154,8 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error { extremitiesChanged, err := u.calculateLatest( u.oldLatest, u.event, types.StateAtEventAndReference{ - EventReference: u.event.EventReference(), - StateAtEvent: u.stateAtEvent, + EventID: u.event.EventID(), + StateAtEvent: u.stateAtEvent, }, ) if err != nil { @@ -349,7 +349,7 @@ func (u *latestEventsUpdater) calculateLatest( // If the "new" event is already referenced by an existing event // then do nothing - it's not a candidate to be a new extremity if // it has been referenced. - if referenced, err := u.updater.IsReferenced(newEvent.EventReference()); err != nil { + if referenced, err := u.updater.IsReferenced(newEvent.EventID()); err != nil { return false, fmt.Errorf("u.updater.IsReferenced(new): %w", err) } else if referenced { u.latest = oldLatest @@ -360,7 +360,7 @@ func (u *latestEventsUpdater) calculateLatest( // have entries in the previous events table. If they do then we // will no longer include them as forward extremities. for k, l := range existingRefs { - referenced, err := u.updater.IsReferenced(l.EventReference) + referenced, err := u.updater.IsReferenced(l.EventID) if err != nil { return false, fmt.Errorf("u.updater.IsReferenced: %w", err) } else if referenced { diff --git a/roomserver/internal/input/input_missing.go b/roomserver/internal/input/input_missing.go index 8a123522..10486138 100644 --- a/roomserver/internal/input/input_missing.go +++ b/roomserver/internal/input/input_missing.go @@ -520,9 +520,9 @@ func (t *missingStateReq) getMissingEvents(ctx context.Context, e gomatrixserver return nil, false, false, fmt.Errorf("t.DB.LatestEventIDs: %w", err) } latestEvents := make([]string, len(latest)) - for i, ev := range latest { - latestEvents[i] = ev.EventID - t.hadEvent(ev.EventID) + for i := range latest { + latestEvents[i] = latest[i] + t.hadEvent(latest[i]) } var missingResp *fclient.RespMissingEvents diff --git a/roomserver/internal/perform/perform_admin.go b/roomserver/internal/perform/perform_admin.go index fadc8bcf..17296feb 100644 --- a/roomserver/internal/perform/perform_admin.go +++ b/roomserver/internal/perform/perform_admin.go @@ -131,9 +131,7 @@ func (r *Admin) PerformAdminEvacuateRoom( SendAsServer: string(senderDomain), }) affected = append(affected, stateKey) - prevEvents = []gomatrixserverlib.EventReference{ - event.EventReference(), - } + prevEvents = []string{event.EventID()} } inputReq := &api.InputRoomEventsRequest{ @@ -253,9 +251,9 @@ func (r *Admin) PerformAdminDownloadState( for _, fwdExtremity := range fwdExtremities { var state gomatrixserverlib.StateResponse - state, err = r.Inputer.FSAPI.LookupState(ctx, r.Inputer.ServerName, serverName, roomID, fwdExtremity.EventID, roomInfo.RoomVersion) + state, err = r.Inputer.FSAPI.LookupState(ctx, r.Inputer.ServerName, serverName, roomID, fwdExtremity, roomInfo.RoomVersion) if err != nil { - return fmt.Errorf("r.Inputer.FSAPI.LookupState (%q): %s", fwdExtremity.EventID, err) + return fmt.Errorf("r.Inputer.FSAPI.LookupState (%q): %s", fwdExtremity, err) } for _, authEvent := range state.GetAuthEvents().UntrustedEvents(roomInfo.RoomVersion) { if err = gomatrixserverlib.VerifyEventSignatures(ctx, authEvent, r.Inputer.KeyRing); err != nil { diff --git a/roomserver/internal/perform/perform_inbound_peek.go b/roomserver/internal/perform/perform_inbound_peek.go index 3094a17f..3ac0f6f4 100644 --- a/roomserver/internal/perform/perform_inbound_peek.go +++ b/roomserver/internal/perform/perform_inbound_peek.go @@ -64,7 +64,7 @@ func (r *InboundPeeker) PerformInboundPeek( if err != nil { return err } - latestEvents, err := r.DB.EventsFromIDs(ctx, info, []string{latestEventRefs[0].EventID}) + latestEvents, err := r.DB.EventsFromIDs(ctx, info, []string{latestEventRefs[0]}) if err != nil { return err } diff --git a/roomserver/internal/perform/perform_upgrade.go b/roomserver/internal/perform/perform_upgrade.go index abe63145..60085cb6 100644 --- a/roomserver/internal/perform/perform_upgrade.go +++ b/roomserver/internal/perform/perform_upgrade.go @@ -471,7 +471,7 @@ func (r *Upgrader) sendInitialEvents(ctx context.Context, evTime time.Time, user return fmt.Errorf("failed to set content of new %q event: %w", proto.Type, err) } if i > 0 { - proto.PrevEvents = []gomatrixserverlib.EventReference{builtEvents[i-1].EventReference()} + proto.PrevEvents = []string{builtEvents[i-1].EventID()} } var verImpl gomatrixserverlib.IRoomVersion diff --git a/roomserver/internal/query/query_test.go b/roomserver/internal/query/query_test.go index b6715cb0..619d9303 100644 --- a/roomserver/internal/query/query_test.go +++ b/roomserver/internal/query/query_test.go @@ -43,13 +43,10 @@ func createEventDB() *getEventDB { // Adds a fake event to the storage with given auth events. func (db *getEventDB) addFakeEvent(eventID string, authIDs []string) error { - authEvents := []gomatrixserverlib.EventReference{} + authEvents := make([]any, 0, len(authIDs)) for _, authID := range authIDs { - authEvents = append(authEvents, gomatrixserverlib.EventReference{ - EventID: authID, - }) + authEvents = append(authEvents, []any{authID, struct{}{}}) } - builder := map[string]interface{}{ "event_id": eventID, "auth_events": authEvents, diff --git a/roomserver/storage/interface.go b/roomserver/storage/interface.go index 6bc4ce9a..7d22df00 100644 --- a/roomserver/storage/interface.go +++ b/roomserver/storage/interface.go @@ -102,7 +102,7 @@ type Database interface { // Look up event references for the latest events in the room and the current state snapshot. // Returns the latest events, the current state and the maximum depth of the latest events plus 1. // Returns an error if there was a problem talking to the database. - LatestEventIDs(ctx context.Context, roomNID types.RoomNID) ([]gomatrixserverlib.EventReference, types.StateSnapshotNID, int64, error) + LatestEventIDs(ctx context.Context, roomNID types.RoomNID) ([]string, types.StateSnapshotNID, int64, error) // Look up the active invites targeting a user in a room and return the // numeric state key IDs for the user IDs who sent them along with the event IDs for the invites. // Returns an error if there was a problem talking to the database. @@ -206,7 +206,7 @@ type RoomDatabase interface { BulkSelectSnapshotsFromEventIDs(ctx context.Context, eventIDs []string) (map[types.StateSnapshotNID][]string, error) StateEntriesForTuples(ctx context.Context, stateBlockNIDs []types.StateBlockNID, stateKeyTuples []types.StateKeyTuple) ([]types.StateEntryList, error) AddState(ctx context.Context, roomNID types.RoomNID, stateBlockNIDs []types.StateBlockNID, state []types.StateEntry) (types.StateSnapshotNID, error) - LatestEventIDs(ctx context.Context, roomNID types.RoomNID) ([]gomatrixserverlib.EventReference, types.StateSnapshotNID, int64, error) + LatestEventIDs(ctx context.Context, roomNID types.RoomNID) ([]string, types.StateSnapshotNID, int64, error) GetOrCreateRoomInfo(ctx context.Context, event gomatrixserverlib.PDU) (*types.RoomInfo, error) GetOrCreateEventTypeNID(ctx context.Context, eventType string) (eventTypeNID types.EventTypeNID, err error) GetOrCreateEventStateKeyNID(ctx context.Context, eventStateKey *string) (types.EventStateKeyNID, error) diff --git a/roomserver/storage/postgres/deltas/20230516154000_drop_reference_sha.go b/roomserver/storage/postgres/deltas/20230516154000_drop_reference_sha.go new file mode 100644 index 00000000..c1957771 --- /dev/null +++ b/roomserver/storage/postgres/deltas/20230516154000_drop_reference_sha.go @@ -0,0 +1,54 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deltas + +import ( + "context" + "database/sql" + "fmt" +) + +func UpDropEventReferenceSHAEvents(ctx context.Context, tx *sql.Tx) error { + var count int + err := tx.QueryRowContext(ctx, `SELECT count(*) FROM roomserver_events GROUP BY event_id HAVING count(event_id) > 1`). + Scan(&count) + if err != nil && err != sql.ErrNoRows { + return fmt.Errorf("failed to query duplicate event ids") + } + if count > 0 { + return fmt.Errorf("unable to drop column, as there are duplicate event ids") + } + _, err = tx.ExecContext(ctx, `ALTER TABLE roomserver_events DROP COLUMN IF EXISTS reference_sha256;`) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} + +func UpDropEventReferenceSHAPrevEvents(ctx context.Context, tx *sql.Tx) error { + _, err := tx.ExecContext(ctx, "ALTER TABLE roomserver_previous_events DROP CONSTRAINT roomserver_previous_event_id_unique;") + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + _, err = tx.ExecContext(ctx, `ALTER TABLE roomserver_previous_events DROP COLUMN IF EXISTS previous_reference_sha256;`) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + _, err = tx.ExecContext(ctx, `ALTER TABLE roomserver_previous_events ADD CONSTRAINT roomserver_previous_event_id_unique UNIQUE (previous_event_id);`) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} diff --git a/roomserver/storage/postgres/events_table.go b/roomserver/storage/postgres/events_table.go index c935608a..a00b4b1d 100644 --- a/roomserver/storage/postgres/events_table.go +++ b/roomserver/storage/postgres/events_table.go @@ -22,10 +22,9 @@ import ( "sort" "github.com/lib/pq" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/roomserver/storage/postgres/deltas" "github.com/matrix-org/dendrite/roomserver/storage/tables" "github.com/matrix-org/dendrite/roomserver/types" ) @@ -62,9 +61,6 @@ CREATE TABLE IF NOT EXISTS roomserver_events ( -- Needed for state resolution. -- An event may only appear in this table once. event_id TEXT NOT NULL CONSTRAINT roomserver_event_id_unique UNIQUE, - -- The sha256 reference hash for the event. - -- Needed for setting reference hashes when sending new events. - reference_sha256 BYTEA NOT NULL, -- A list of numeric IDs for events that can authenticate this event. auth_event_nids BIGINT[] NOT NULL, is_rejected BOOLEAN NOT NULL DEFAULT FALSE @@ -75,10 +71,10 @@ CREATE INDEX IF NOT EXISTS roomserver_events_memberships_idx ON roomserver_event ` const insertEventSQL = "" + - "INSERT INTO roomserver_events AS e (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, is_rejected)" + - " VALUES ($1, $2, $3, $4, $5, $6, $7, $8)" + + "INSERT INTO roomserver_events AS e (room_nid, event_type_nid, event_state_key_nid, event_id, auth_event_nids, depth, is_rejected)" + + " VALUES ($1, $2, $3, $4, $5, $6, $7)" + " ON CONFLICT ON CONSTRAINT roomserver_event_id_unique DO UPDATE" + - " SET is_rejected = $8 WHERE e.event_id = $4 AND e.is_rejected = TRUE" + + " SET is_rejected = $7 WHERE e.event_id = $4 AND e.is_rejected = TRUE" + " RETURNING event_nid, state_snapshot_nid" const selectEventSQL = "" + @@ -130,12 +126,9 @@ const selectEventIDSQL = "" + "SELECT event_id FROM roomserver_events WHERE event_nid = $1" const bulkSelectStateAtEventAndReferenceSQL = "" + - "SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, event_id, reference_sha256" + + "SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, event_id" + " FROM roomserver_events WHERE event_nid = ANY($1)" -const bulkSelectEventReferenceSQL = "" + - "SELECT event_id, reference_sha256 FROM roomserver_events WHERE event_nid = ANY($1)" - const bulkSelectEventIDSQL = "" + "SELECT event_nid, event_id FROM roomserver_events WHERE event_nid = ANY($1)" @@ -167,7 +160,6 @@ type eventStatements struct { updateEventSentToOutputStmt *sql.Stmt selectEventIDStmt *sql.Stmt bulkSelectStateAtEventAndReferenceStmt *sql.Stmt - bulkSelectEventReferenceStmt *sql.Stmt bulkSelectEventIDStmt *sql.Stmt bulkSelectEventNIDStmt *sql.Stmt bulkSelectUnsentEventNIDStmt *sql.Stmt @@ -178,7 +170,18 @@ type eventStatements struct { func CreateEventsTable(db *sql.DB) error { _, err := db.Exec(eventsSchema) - return err + if err != nil { + return err + } + + m := sqlutil.NewMigrator(db) + m.AddMigrations([]sqlutil.Migration{ + { + Version: "roomserver: drop column reference_sha from roomserver_events", + Up: deltas.UpDropEventReferenceSHAEvents, + }, + }...) + return m.Up(context.Background()) } func PrepareEventsTable(db *sql.DB) (tables.Events, error) { @@ -197,7 +200,6 @@ func PrepareEventsTable(db *sql.DB) (tables.Events, error) { {&s.selectEventSentToOutputStmt, selectEventSentToOutputSQL}, {&s.selectEventIDStmt, selectEventIDSQL}, {&s.bulkSelectStateAtEventAndReferenceStmt, bulkSelectStateAtEventAndReferenceSQL}, - {&s.bulkSelectEventReferenceStmt, bulkSelectEventReferenceSQL}, {&s.bulkSelectEventIDStmt, bulkSelectEventIDSQL}, {&s.bulkSelectEventNIDStmt, bulkSelectEventNIDSQL}, {&s.bulkSelectUnsentEventNIDStmt, bulkSelectUnsentEventNIDSQL}, @@ -214,7 +216,6 @@ func (s *eventStatements) InsertEvent( eventTypeNID types.EventTypeNID, eventStateKeyNID types.EventStateKeyNID, eventID string, - referenceSHA256 []byte, authEventNIDs []types.EventNID, depth int64, isRejected bool, @@ -224,7 +225,7 @@ func (s *eventStatements) InsertEvent( stmt := sqlutil.TxStmt(txn, s.insertEventStmt) err := stmt.QueryRowContext( ctx, int64(roomNID), int64(eventTypeNID), int64(eventStateKeyNID), - eventID, referenceSHA256, eventNIDsAsArray(authEventNIDs), depth, + eventID, eventNIDsAsArray(authEventNIDs), depth, isRejected, ).Scan(&eventNID, &stateNID) return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err @@ -441,11 +442,10 @@ func (s *eventStatements) BulkSelectStateAtEventAndReference( eventNID int64 stateSnapshotNID int64 eventID string - eventSHA256 []byte ) for ; rows.Next(); i++ { if err = rows.Scan( - &eventTypeNID, &eventStateKeyNID, &eventNID, &stateSnapshotNID, &eventID, &eventSHA256, + &eventTypeNID, &eventStateKeyNID, &eventNID, &stateSnapshotNID, &eventID, ); err != nil { return nil, err } @@ -455,32 +455,6 @@ func (s *eventStatements) BulkSelectStateAtEventAndReference( result.EventNID = types.EventNID(eventNID) result.BeforeStateSnapshotNID = types.StateSnapshotNID(stateSnapshotNID) result.EventID = eventID - result.EventSHA256 = eventSHA256 - } - if err = rows.Err(); err != nil { - return nil, err - } - if i != len(eventNIDs) { - return nil, fmt.Errorf("storage: event NIDs missing from the database (%d != %d)", i, len(eventNIDs)) - } - return results, nil -} - -func (s *eventStatements) BulkSelectEventReference( - ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID, -) ([]gomatrixserverlib.EventReference, error) { - rows, err := s.bulkSelectEventReferenceStmt.QueryContext(ctx, eventNIDsAsArray(eventNIDs)) - if err != nil { - return nil, err - } - defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectEventReference: rows.close() failed") - results := make([]gomatrixserverlib.EventReference, len(eventNIDs)) - i := 0 - for ; rows.Next(); i++ { - result := &results[i] - if err = rows.Scan(&result.EventID, &result.EventSHA256); err != nil { - return nil, err - } } if err = rows.Err(); err != nil { return nil, err diff --git a/roomserver/storage/postgres/previous_events_table.go b/roomserver/storage/postgres/previous_events_table.go index 26999a29..ceb5e26b 100644 --- a/roomserver/storage/postgres/previous_events_table.go +++ b/roomserver/storage/postgres/previous_events_table.go @@ -20,6 +20,7 @@ import ( "database/sql" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/roomserver/storage/postgres/deltas" "github.com/matrix-org/dendrite/roomserver/storage/tables" "github.com/matrix-org/dendrite/roomserver/types" ) @@ -32,11 +33,9 @@ const previousEventSchema = ` CREATE TABLE IF NOT EXISTS roomserver_previous_events ( -- The string event ID taken from the prev_events key of an event. previous_event_id TEXT NOT NULL, - -- The SHA256 reference hash taken from the prev_events key of an event. - previous_reference_sha256 BYTEA NOT NULL, -- A list of numeric event IDs of events that reference this prev_event. event_nids BIGINT[] NOT NULL, - CONSTRAINT roomserver_previous_event_id_unique UNIQUE (previous_event_id, previous_reference_sha256) + CONSTRAINT roomserver_previous_event_id_unique UNIQUE (previous_event_id) ); ` @@ -47,17 +46,17 @@ CREATE TABLE IF NOT EXISTS roomserver_previous_events ( // The lock is necessary to avoid data races when checking whether an event is already referenced by another event. const insertPreviousEventSQL = "" + "INSERT INTO roomserver_previous_events" + - " (previous_event_id, previous_reference_sha256, event_nids)" + - " VALUES ($1, $2, array_append('{}'::bigint[], $3))" + + " (previous_event_id, event_nids)" + + " VALUES ($1, array_append('{}'::bigint[], $2))" + " ON CONFLICT ON CONSTRAINT roomserver_previous_event_id_unique" + - " DO UPDATE SET event_nids = array_append(roomserver_previous_events.event_nids, $3)" + - " WHERE $3 != ALL(roomserver_previous_events.event_nids)" + " DO UPDATE SET event_nids = array_append(roomserver_previous_events.event_nids, $2)" + + " WHERE $2 != ALL(roomserver_previous_events.event_nids)" // Check if the event is referenced by another event in the table. // This should only be done while holding a "FOR UPDATE" lock on the row in the rooms table for this room. const selectPreviousEventExistsSQL = "" + "SELECT 1 FROM roomserver_previous_events" + - " WHERE previous_event_id = $1 AND previous_reference_sha256 = $2" + " WHERE previous_event_id = $1" type previousEventStatements struct { insertPreviousEventStmt *sql.Stmt @@ -66,7 +65,18 @@ type previousEventStatements struct { func CreatePrevEventsTable(db *sql.DB) error { _, err := db.Exec(previousEventSchema) - return err + if err != nil { + return err + } + + m := sqlutil.NewMigrator(db) + m.AddMigrations([]sqlutil.Migration{ + { + Version: "roomserver: drop column reference_sha from roomserver_prev_events", + Up: deltas.UpDropEventReferenceSHAPrevEvents, + }, + }...) + return m.Up(context.Background()) } func PreparePrevEventsTable(db *sql.DB) (tables.PreviousEvents, error) { @@ -82,12 +92,11 @@ func (s *previousEventStatements) InsertPreviousEvent( ctx context.Context, txn *sql.Tx, previousEventID string, - previousEventReferenceSHA256 []byte, eventNID types.EventNID, ) error { stmt := sqlutil.TxStmt(txn, s.insertPreviousEventStmt) _, err := stmt.ExecContext( - ctx, previousEventID, previousEventReferenceSHA256, int64(eventNID), + ctx, previousEventID, int64(eventNID), ) return err } @@ -95,9 +104,9 @@ func (s *previousEventStatements) InsertPreviousEvent( // Check if the event reference exists // Returns sql.ErrNoRows if the event reference doesn't exist. func (s *previousEventStatements) SelectPreviousEventExists( - ctx context.Context, txn *sql.Tx, eventID string, eventReferenceSHA256 []byte, + ctx context.Context, txn *sql.Tx, eventID string, ) error { var ok int64 stmt := sqlutil.TxStmt(txn, s.selectPreviousEventExistsStmt) - return stmt.QueryRowContext(ctx, eventID, eventReferenceSHA256).Scan(&ok) + return stmt.QueryRowContext(ctx, eventID).Scan(&ok) } diff --git a/roomserver/storage/shared/room_updater.go b/roomserver/storage/shared/room_updater.go index 5a20c67b..70672a33 100644 --- a/roomserver/storage/shared/room_updater.go +++ b/roomserver/storage/shared/room_updater.go @@ -104,18 +104,6 @@ func (u *RoomUpdater) CurrentStateSnapshotNID() types.StateSnapshotNID { return u.currentStateSnapshotNID } -// StorePreviousEvents implements types.RoomRecentEventsUpdater - This must be called from a Writer -func (u *RoomUpdater) StorePreviousEvents(eventNID types.EventNID, previousEventReferences []gomatrixserverlib.EventReference) error { - return u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error { - for _, ref := range previousEventReferences { - if err := u.d.PrevEventsTable.InsertPreviousEvent(u.ctx, txn, ref.EventID, ref.EventSHA256, eventNID); err != nil { - return fmt.Errorf("u.d.PrevEventsTable.InsertPreviousEvent: %w", err) - } - } - return nil - }) -} - func (u *RoomUpdater) Events(ctx context.Context, _ gomatrixserverlib.RoomVersion, eventNIDs []types.EventNID) ([]types.Event, error) { if u.roomInfo == nil { return nil, types.ErrorInvalidRoomInfo @@ -203,8 +191,8 @@ func (u *RoomUpdater) EventsFromIDs(ctx context.Context, roomInfo *types.RoomInf } // IsReferenced implements types.RoomRecentEventsUpdater -func (u *RoomUpdater) IsReferenced(eventReference gomatrixserverlib.EventReference) (bool, error) { - err := u.d.PrevEventsTable.SelectPreviousEventExists(u.ctx, u.txn, eventReference.EventID, eventReference.EventSHA256) +func (u *RoomUpdater) IsReferenced(eventID string) (bool, error) { + err := u.d.PrevEventsTable.SelectPreviousEventExists(u.ctx, u.txn, eventID) if err == nil { return true, nil } diff --git a/roomserver/storage/shared/storage.go b/roomserver/storage/shared/storage.go index 60e46c47..cefa58a3 100644 --- a/roomserver/storage/shared/storage.go +++ b/roomserver/storage/shared/storage.go @@ -398,15 +398,13 @@ func (d *EventDatabase) eventsFromIDs(ctx context.Context, txn *sql.Tx, roomInfo return d.events(ctx, txn, roomInfo.RoomVersion, nids) } -func (d *Database) LatestEventIDs( - ctx context.Context, roomNID types.RoomNID, -) (references []gomatrixserverlib.EventReference, currentStateSnapshotNID types.StateSnapshotNID, depth int64, err error) { +func (d *Database) LatestEventIDs(ctx context.Context, roomNID types.RoomNID) (references []string, currentStateSnapshotNID types.StateSnapshotNID, depth int64, err error) { var eventNIDs []types.EventNID eventNIDs, currentStateSnapshotNID, err = d.RoomsTable.SelectLatestEventNIDs(ctx, nil, roomNID) if err != nil { return } - references, err = d.EventsTable.BulkSelectEventReference(ctx, nil, eventNIDs) + eventNIDMap, err := d.EventsTable.BulkSelectEventID(ctx, nil, eventNIDs) if err != nil { return } @@ -414,6 +412,9 @@ func (d *Database) LatestEventIDs( if err != nil { return } + for _, eventID := range eventNIDMap { + references = append(references, eventID) + } return } @@ -742,7 +743,6 @@ func (d *EventDatabase) StoreEvent( eventTypeNID, eventStateKeyNID, event.EventID(), - event.EventReference().EventSHA256, authEventNIDs, event.Depth(), isRejected, @@ -762,7 +762,7 @@ func (d *EventDatabase) StoreEvent( return fmt.Errorf("d.EventJSONTable.InsertEventJSON: %w", err) } - if prevEvents := event.PrevEvents(); len(prevEvents) > 0 { + if prevEvents := event.PrevEventIDs(); len(prevEvents) > 0 { // Create an updater - NB: on sqlite this WILL create a txn as we are directly calling the shared DB form of // GetLatestEventsForUpdate - not via the SQLiteDatabase form which has `nil` txns. This // function only does SELECTs though so the created txn (at this point) is just a read txn like @@ -770,8 +770,8 @@ func (d *EventDatabase) StoreEvent( // to do writes however then this will need to go inside `Writer.Do`. // The following is a copy of RoomUpdater.StorePreviousEvents - for _, ref := range prevEvents { - if err = d.PrevEventsTable.InsertPreviousEvent(ctx, txn, ref.EventID, ref.EventSHA256, eventNID); err != nil { + for _, eventID := range prevEvents { + if err = d.PrevEventsTable.InsertPreviousEvent(ctx, txn, eventID, eventNID); err != nil { return fmt.Errorf("u.d.PrevEventsTable.InsertPreviousEvent: %w", err) } } diff --git a/roomserver/storage/sqlite3/deltas/20230516154000_drop_reference_sha.go b/roomserver/storage/sqlite3/deltas/20230516154000_drop_reference_sha.go new file mode 100644 index 00000000..452d72ac --- /dev/null +++ b/roomserver/storage/sqlite3/deltas/20230516154000_drop_reference_sha.go @@ -0,0 +1,72 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deltas + +import ( + "context" + "database/sql" + "fmt" +) + +func UpDropEventReferenceSHA(ctx context.Context, tx *sql.Tx) error { + var count int + err := tx.QueryRowContext(ctx, `SELECT count(*) FROM roomserver_events GROUP BY event_id HAVING count(event_id) > 1`). + Scan(&count) + if err != nil && err != sql.ErrNoRows { + return fmt.Errorf("failed to query duplicate event ids") + } + if count > 0 { + return fmt.Errorf("unable to drop column, as there are duplicate event ids") + } + _, err = tx.ExecContext(ctx, `ALTER TABLE roomserver_events DROP COLUMN reference_sha256;`) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} + +func UpDropEventReferenceSHAPrevEvents(ctx context.Context, tx *sql.Tx) error { + // rename the table + if _, err := tx.ExecContext(ctx, `ALTER TABLE roomserver_previous_events RENAME TO _roomserver_previous_events;`); err != nil { + return fmt.Errorf("tx.ExecContext: %w", err) + } + + // create new table + if _, err := tx.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS roomserver_previous_events ( + previous_event_id TEXT NOT NULL, + event_nids TEXT NOT NULL, + UNIQUE (previous_event_id) + );`); err != nil { + return fmt.Errorf("tx.ExecContext: %w", err) + } + + // move data + if _, err := tx.ExecContext(ctx, ` +INSERT + INTO roomserver_previous_events ( + previous_event_id, event_nids + ) SELECT + previous_event_id, event_nids + FROM _roomserver_previous_events +;`); err != nil { + return fmt.Errorf("tx.ExecContext: %w", err) + } + // drop old table + _, err := tx.ExecContext(ctx, `DROP TABLE _roomserver_previous_events;`) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} diff --git a/roomserver/storage/sqlite3/events_table.go b/roomserver/storage/sqlite3/events_table.go index aacf4bc9..c49c6dc3 100644 --- a/roomserver/storage/sqlite3/events_table.go +++ b/roomserver/storage/sqlite3/events_table.go @@ -19,14 +19,14 @@ import ( "context" "database/sql" "encoding/json" + "errors" "fmt" "sort" "strings" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/roomserver/storage/sqlite3/deltas" "github.com/matrix-org/dendrite/roomserver/storage/tables" "github.com/matrix-org/dendrite/roomserver/types" ) @@ -41,17 +41,16 @@ const eventsSchema = ` state_snapshot_nid INTEGER NOT NULL DEFAULT 0, depth INTEGER NOT NULL, event_id TEXT NOT NULL UNIQUE, - reference_sha256 BLOB NOT NULL, auth_event_nids TEXT NOT NULL DEFAULT '[]', is_rejected BOOLEAN NOT NULL DEFAULT FALSE ); ` const insertEventSQL = ` - INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, is_rejected) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, auth_event_nids, depth, is_rejected) + VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT DO UPDATE - SET is_rejected = $8 WHERE is_rejected = 1 + SET is_rejected = $7 WHERE is_rejected = 1 RETURNING event_nid, state_snapshot_nid; ` @@ -100,12 +99,9 @@ const selectEventIDSQL = "" + "SELECT event_id FROM roomserver_events WHERE event_nid = $1" const bulkSelectStateAtEventAndReferenceSQL = "" + - "SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, event_id, reference_sha256" + + "SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, event_id" + " FROM roomserver_events WHERE event_nid IN ($1)" -const bulkSelectEventReferenceSQL = "" + - "SELECT event_id, reference_sha256 FROM roomserver_events WHERE event_nid IN ($1)" - const bulkSelectEventIDSQL = "" + "SELECT event_nid, event_id FROM roomserver_events WHERE event_nid IN ($1)" @@ -137,7 +133,6 @@ type eventStatements struct { updateEventSentToOutputStmt *sql.Stmt selectEventIDStmt *sql.Stmt bulkSelectStateAtEventAndReferenceStmt *sql.Stmt - bulkSelectEventReferenceStmt *sql.Stmt bulkSelectEventIDStmt *sql.Stmt selectEventRejectedStmt *sql.Stmt //bulkSelectEventNIDStmt *sql.Stmt @@ -147,7 +142,32 @@ type eventStatements struct { func CreateEventsTable(db *sql.DB) error { _, err := db.Exec(eventsSchema) - return err + if err != nil { + return err + } + + // check if the column exists + var cName string + migrationName := "roomserver: drop column reference_sha from roomserver_events" + err = db.QueryRowContext(context.Background(), `SELECT p.name FROM sqlite_master AS m JOIN pragma_table_info(m.name) AS p WHERE m.name = 'roomserver_events' AND p.name = 'reference_sha256'`).Scan(&cName) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed + if err = sqlutil.InsertMigration(context.Background(), db, migrationName); err != nil { + return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err) + } + return nil + } + return err + } + + m := sqlutil.NewMigrator(db) + m.AddMigrations([]sqlutil.Migration{ + { + Version: migrationName, + Up: deltas.UpDropEventReferenceSHA, + }, + }...) + return m.Up(context.Background()) } func PrepareEventsTable(db *sql.DB) (tables.Events, error) { @@ -167,7 +187,6 @@ func PrepareEventsTable(db *sql.DB) (tables.Events, error) { {&s.selectEventSentToOutputStmt, selectEventSentToOutputSQL}, {&s.selectEventIDStmt, selectEventIDSQL}, {&s.bulkSelectStateAtEventAndReferenceStmt, bulkSelectStateAtEventAndReferenceSQL}, - {&s.bulkSelectEventReferenceStmt, bulkSelectEventReferenceSQL}, {&s.bulkSelectEventIDStmt, bulkSelectEventIDSQL}, //{&s.bulkSelectEventNIDStmt, bulkSelectEventNIDSQL}, //{&s.bulkSelectUnsentEventNIDStmt, bulkSelectUnsentEventNIDSQL}, @@ -183,7 +202,6 @@ func (s *eventStatements) InsertEvent( eventTypeNID types.EventTypeNID, eventStateKeyNID types.EventStateKeyNID, eventID string, - referenceSHA256 []byte, authEventNIDs []types.EventNID, depth int64, isRejected bool, @@ -194,7 +212,7 @@ func (s *eventStatements) InsertEvent( insertStmt := sqlutil.TxStmt(txn, s.insertEventStmt) err := insertStmt.QueryRowContext( ctx, int64(roomNID), int64(eventTypeNID), int64(eventStateKeyNID), - eventID, referenceSHA256, eventNIDsAsArray(authEventNIDs), depth, isRejected, + eventID, eventNIDsAsArray(authEventNIDs), depth, isRejected, ).Scan(&eventNID, &stateNID) return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err } @@ -475,11 +493,10 @@ func (s *eventStatements) BulkSelectStateAtEventAndReference( eventNID int64 stateSnapshotNID int64 eventID string - eventSHA256 []byte ) for ; rows.Next(); i++ { if err = rows.Scan( - &eventTypeNID, &eventStateKeyNID, &eventNID, &stateSnapshotNID, &eventID, &eventSHA256, + &eventTypeNID, &eventStateKeyNID, &eventNID, &stateSnapshotNID, &eventID, ); err != nil { return nil, err } @@ -489,43 +506,6 @@ func (s *eventStatements) BulkSelectStateAtEventAndReference( result.EventNID = types.EventNID(eventNID) result.BeforeStateSnapshotNID = types.StateSnapshotNID(stateSnapshotNID) result.EventID = eventID - result.EventSHA256 = eventSHA256 - } - if i != len(eventNIDs) { - return nil, fmt.Errorf("storage: event NIDs missing from the database (%d != %d)", i, len(eventNIDs)) - } - return results, nil -} - -func (s *eventStatements) BulkSelectEventReference( - ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID, -) ([]gomatrixserverlib.EventReference, error) { - /////////////// - iEventNIDs := make([]interface{}, len(eventNIDs)) - for k, v := range eventNIDs { - iEventNIDs[k] = v - } - selectOrig := strings.Replace(bulkSelectEventReferenceSQL, "($1)", sqlutil.QueryVariadic(len(iEventNIDs)), 1) - selectPrep, err := s.db.Prepare(selectOrig) - if err != nil { - return nil, err - } - defer selectPrep.Close() // nolint:errcheck - /////////////// - - selectStmt := sqlutil.TxStmt(txn, selectPrep) - rows, err := selectStmt.QueryContext(ctx, iEventNIDs...) - if err != nil { - return nil, err - } - defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectEventReference: rows.close() failed") - results := make([]gomatrixserverlib.EventReference, len(eventNIDs)) - i := 0 - for ; rows.Next(); i++ { - result := &results[i] - if err = rows.Scan(&result.EventID, &result.EventSHA256); err != nil { - return nil, err - } } if i != len(eventNIDs) { return nil, fmt.Errorf("storage: event NIDs missing from the database (%d != %d)", i, len(eventNIDs)) diff --git a/roomserver/storage/sqlite3/previous_events_table.go b/roomserver/storage/sqlite3/previous_events_table.go index 2a146ef6..4e59fbba 100644 --- a/roomserver/storage/sqlite3/previous_events_table.go +++ b/roomserver/storage/sqlite3/previous_events_table.go @@ -18,10 +18,12 @@ package sqlite3 import ( "context" "database/sql" + "errors" "fmt" "strings" "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/matrix-org/dendrite/roomserver/storage/sqlite3/deltas" "github.com/matrix-org/dendrite/roomserver/storage/tables" "github.com/matrix-org/dendrite/roomserver/types" ) @@ -34,9 +36,8 @@ import ( const previousEventSchema = ` CREATE TABLE IF NOT EXISTS roomserver_previous_events ( previous_event_id TEXT NOT NULL, - previous_reference_sha256 BLOB, event_nids TEXT NOT NULL, - UNIQUE (previous_event_id, previous_reference_sha256) + UNIQUE (previous_event_id) ); ` @@ -47,20 +48,20 @@ const previousEventSchema = ` // The lock is necessary to avoid data races when checking whether an event is already referenced by another event. const insertPreviousEventSQL = ` INSERT OR REPLACE INTO roomserver_previous_events - (previous_event_id, previous_reference_sha256, event_nids) - VALUES ($1, $2, $3) + (previous_event_id, event_nids) + VALUES ($1, $2) ` const selectPreviousEventNIDsSQL = ` SELECT event_nids FROM roomserver_previous_events - WHERE previous_event_id = $1 AND previous_reference_sha256 = $2 + WHERE previous_event_id = $1 ` // Check if the event is referenced by another event in the table. // This should only be done while holding a "FOR UPDATE" lock on the row in the rooms table for this room. const selectPreviousEventExistsSQL = ` SELECT 1 FROM roomserver_previous_events - WHERE previous_event_id = $1 AND previous_reference_sha256 = $2 + WHERE previous_event_id = $1 ` type previousEventStatements struct { @@ -72,7 +73,30 @@ type previousEventStatements struct { func CreatePrevEventsTable(db *sql.DB) error { _, err := db.Exec(previousEventSchema) - return err + if err != nil { + return err + } + // check if the column exists + var cName string + migrationName := "roomserver: drop column reference_sha from roomserver_prev_events" + err = db.QueryRowContext(context.Background(), `SELECT p.name FROM sqlite_master AS m JOIN pragma_table_info(m.name) AS p WHERE m.name = 'roomserver_previous_events' AND p.name = 'previous_reference_sha256'`).Scan(&cName) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed + if err = sqlutil.InsertMigration(context.Background(), db, migrationName); err != nil { + return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err) + } + return nil + } + return err + } + m := sqlutil.NewMigrator(db) + m.AddMigrations([]sqlutil.Migration{ + { + Version: migrationName, + Up: deltas.UpDropEventReferenceSHAPrevEvents, + }, + }...) + return m.Up(context.Background()) } func PreparePrevEventsTable(db *sql.DB) (tables.PreviousEvents, error) { @@ -91,13 +115,12 @@ func (s *previousEventStatements) InsertPreviousEvent( ctx context.Context, txn *sql.Tx, previousEventID string, - previousEventReferenceSHA256 []byte, eventNID types.EventNID, ) error { var eventNIDs string eventNIDAsString := fmt.Sprintf("%d", eventNID) selectStmt := sqlutil.TxStmt(txn, s.selectPreviousEventExistsStmt) - err := selectStmt.QueryRowContext(ctx, previousEventID, previousEventReferenceSHA256).Scan(&eventNIDs) + err := selectStmt.QueryRowContext(ctx, previousEventID).Scan(&eventNIDs) if err != nil && err != sql.ErrNoRows { return fmt.Errorf("selectStmt.QueryRowContext.Scan: %w", err) } @@ -115,7 +138,7 @@ func (s *previousEventStatements) InsertPreviousEvent( } insertStmt := sqlutil.TxStmt(txn, s.insertPreviousEventStmt) _, err = insertStmt.ExecContext( - ctx, previousEventID, previousEventReferenceSHA256, eventNIDs, + ctx, previousEventID, eventNIDs, ) return err } @@ -123,9 +146,9 @@ func (s *previousEventStatements) InsertPreviousEvent( // Check if the event reference exists // Returns sql.ErrNoRows if the event reference doesn't exist. func (s *previousEventStatements) SelectPreviousEventExists( - ctx context.Context, txn *sql.Tx, eventID string, eventReferenceSHA256 []byte, + ctx context.Context, txn *sql.Tx, eventID string, ) error { var ok int64 stmt := sqlutil.TxStmt(txn, s.selectPreviousEventExistsStmt) - return stmt.QueryRowContext(ctx, eventID, eventReferenceSHA256).Scan(&ok) + return stmt.QueryRowContext(ctx, eventID).Scan(&ok) } diff --git a/roomserver/storage/sqlite3/storage.go b/roomserver/storage/sqlite3/storage.go index 89e16fc1..6ab427a8 100644 --- a/roomserver/storage/sqlite3/storage.go +++ b/roomserver/storage/sqlite3/storage.go @@ -21,14 +21,13 @@ import ( "errors" "fmt" - "github.com/matrix-org/gomatrixserverlib" - "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/roomserver/storage/shared" "github.com/matrix-org/dendrite/roomserver/storage/sqlite3/deltas" "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/setup/config" + "github.com/matrix-org/gomatrixserverlib" ) // A Database is used to store room events and stream offsets. diff --git a/roomserver/storage/tables/events_table_test.go b/roomserver/storage/tables/events_table_test.go index 107af478..5ed80564 100644 --- a/roomserver/storage/tables/events_table_test.go +++ b/roomserver/storage/tables/events_table_test.go @@ -11,7 +11,6 @@ import ( "github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/setup/config" "github.com/matrix-org/dendrite/test" - "github.com/matrix-org/gomatrixserverlib" "github.com/stretchr/testify/assert" ) @@ -48,10 +47,9 @@ func Test_EventsTable(t *testing.T) { // create some dummy data eventIDs := make([]string, 0, len(room.Events())) wantStateAtEvent := make([]types.StateAtEvent, 0, len(room.Events())) - wantEventReferences := make([]gomatrixserverlib.EventReference, 0, len(room.Events())) wantStateAtEventAndRefs := make([]types.StateAtEventAndReference, 0, len(room.Events())) for _, ev := range room.Events() { - eventNID, snapNID, err := tab.InsertEvent(ctx, nil, 1, 1, 1, ev.EventID(), ev.EventReference().EventSHA256, nil, ev.Depth(), false) + eventNID, snapNID, err := tab.InsertEvent(ctx, nil, 1, 1, 1, ev.EventID(), nil, ev.Depth(), false) assert.NoError(t, err) gotEventNID, gotSnapNID, err := tab.SelectEvent(ctx, nil, ev.EventID()) assert.NoError(t, err) @@ -75,7 +73,6 @@ func Test_EventsTable(t *testing.T) { assert.True(t, sentToOutput) eventIDs = append(eventIDs, ev.EventID()) - wantEventReferences = append(wantEventReferences, ev.EventReference()) // Set the stateSnapshot to 2 for some events to verify they are returned later stateSnapshot := 0 @@ -97,8 +94,8 @@ func Test_EventsTable(t *testing.T) { } wantStateAtEvent = append(wantStateAtEvent, stateAtEvent) wantStateAtEventAndRefs = append(wantStateAtEventAndRefs, types.StateAtEventAndReference{ - StateAtEvent: stateAtEvent, - EventReference: ev.EventReference(), + StateAtEvent: stateAtEvent, + EventID: ev.EventID(), }) } @@ -140,10 +137,6 @@ func Test_EventsTable(t *testing.T) { assert.True(t, ok) } - references, err := tab.BulkSelectEventReference(ctx, nil, nids) - assert.NoError(t, err) - assert.Equal(t, wantEventReferences, references) - stateAndRefs, err := tab.BulkSelectStateAtEventAndReference(ctx, nil, nids) assert.NoError(t, err) assert.Equal(t, wantStateAtEventAndRefs, stateAndRefs) diff --git a/roomserver/storage/tables/interface.go b/roomserver/storage/tables/interface.go index a9c5f8b1..333483b3 100644 --- a/roomserver/storage/tables/interface.go +++ b/roomserver/storage/tables/interface.go @@ -42,7 +42,7 @@ type Events interface { InsertEvent( ctx context.Context, txn *sql.Tx, roomNID types.RoomNID, eventTypeNID types.EventTypeNID, eventStateKeyNID types.EventStateKeyNID, eventID string, - referenceSHA256 []byte, authEventNIDs []types.EventNID, depth int64, isRejected bool, + authEventNIDs []types.EventNID, depth int64, isRejected bool, ) (types.EventNID, types.StateSnapshotNID, error) SelectEvent(ctx context.Context, txn *sql.Tx, eventID string) (types.EventNID, types.StateSnapshotNID, error) BulkSelectSnapshotsFromEventIDs(ctx context.Context, txn *sql.Tx, eventIDs []string) (map[types.StateSnapshotNID][]string, error) @@ -59,7 +59,6 @@ type Events interface { UpdateEventSentToOutput(ctx context.Context, txn *sql.Tx, eventNID types.EventNID) error SelectEventID(ctx context.Context, txn *sql.Tx, eventNID types.EventNID) (eventID string, err error) BulkSelectStateAtEventAndReference(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) ([]types.StateAtEventAndReference, error) - BulkSelectEventReference(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) ([]gomatrixserverlib.EventReference, error) // BulkSelectEventID returns a map from numeric event ID to string event ID. BulkSelectEventID(ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID) (map[types.EventNID]string, error) // BulkSelectEventNIDs returns a map from string event ID to numeric event ID. @@ -113,10 +112,10 @@ type RoomAliases interface { } type PreviousEvents interface { - InsertPreviousEvent(ctx context.Context, txn *sql.Tx, previousEventID string, previousEventReferenceSHA256 []byte, eventNID types.EventNID) error + InsertPreviousEvent(ctx context.Context, txn *sql.Tx, previousEventID string, eventNID types.EventNID) error // Check if the event reference exists // Returns sql.ErrNoRows if the event reference doesn't exist. - SelectPreviousEventExists(ctx context.Context, txn *sql.Tx, eventID string, eventReferenceSHA256 []byte) error + SelectPreviousEventExists(ctx context.Context, txn *sql.Tx, eventID string) error } type Invites interface { diff --git a/roomserver/storage/tables/previous_events_table_test.go b/roomserver/storage/tables/previous_events_table_test.go index 63d54069..9d41e90b 100644 --- a/roomserver/storage/tables/previous_events_table_test.go +++ b/roomserver/storage/tables/previous_events_table_test.go @@ -45,17 +45,17 @@ func TestPreviousEventsTable(t *testing.T) { defer close() for _, x := range room.Events() { - for _, prevEvent := range x.PrevEvents() { - err := tab.InsertPreviousEvent(ctx, nil, prevEvent.EventID, prevEvent.EventSHA256, 1) + for _, eventID := range x.PrevEventIDs() { + err := tab.InsertPreviousEvent(ctx, nil, eventID, 1) assert.NoError(t, err) - err = tab.SelectPreviousEventExists(ctx, nil, prevEvent.EventID, prevEvent.EventSHA256) + err = tab.SelectPreviousEventExists(ctx, nil, eventID) assert.NoError(t, err) } } - // RandomString with a correct EventSHA256 should fail and return sql.ErrNoRows - err := tab.SelectPreviousEventExists(ctx, nil, util.RandomString(16), room.Events()[0].EventReference().EventSHA256) + // RandomString should fail and return sql.ErrNoRows + err := tab.SelectPreviousEventExists(ctx, nil, util.RandomString(16)) assert.Error(t, err) }) } diff --git a/roomserver/types/types.go b/roomserver/types/types.go index e986b9da..f57978ad 100644 --- a/roomserver/types/types.go +++ b/roomserver/types/types.go @@ -200,7 +200,7 @@ func (s StateAtEvent) IsStateEvent() bool { // The StateAtEvent is used to construct the current state of the room from the latest events. type StateAtEventAndReference struct { StateAtEvent - gomatrixserverlib.EventReference + EventID string } type StateAtEventAndReferences []StateAtEventAndReference diff --git a/test/room.go b/test/room.go index 1c0f01e4..852e3153 100644 --- a/test/room.go +++ b/test/room.go @@ -75,7 +75,7 @@ func NewRoom(t *testing.T, creator *User, modifiers ...roomModifier) *Room { return r } -func (r *Room) MustGetAuthEventRefsForEvent(t *testing.T, needed gomatrixserverlib.StateNeeded) []gomatrixserverlib.EventReference { +func (r *Room) MustGetAuthEventRefsForEvent(t *testing.T, needed gomatrixserverlib.StateNeeded) []string { t.Helper() a, err := needed.AuthEventReferences(&r.authEvents) if err != nil { @@ -176,7 +176,7 @@ func (r *Room) CreateEvent(t *testing.T, creator *User, eventType string, conten t.Fatalf("CreateEvent[%s]: failed to SetContent: %s", eventType, err) } if depth > 1 { - builder.PrevEvents = []gomatrixserverlib.EventReference{r.events[len(r.events)-1].EventReference()} + builder.PrevEvents = []string{r.events[len(r.events)-1].EventID()} } err = builder.AddAuthEvents(&r.authEvents)