From b7e3b81a22775697ad8ff463247d83077ea6cbe3 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 4 Feb 2021 11:45:49 +0000 Subject: [PATCH 1/9] Fix ON CONFLICT on sync API account data (#1745) (#1750) --- syncapi/storage/postgres/account_data_table.go | 2 +- syncapi/storage/sqlite3/account_data_table.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/syncapi/storage/postgres/account_data_table.go b/syncapi/storage/postgres/account_data_table.go index 67eb1e86..25bdb1da 100644 --- a/syncapi/storage/postgres/account_data_table.go +++ b/syncapi/storage/postgres/account_data_table.go @@ -53,7 +53,7 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_account_data_id_idx ON syncapi_account const insertAccountDataSQL = "" + "INSERT INTO syncapi_account_data_type (user_id, room_id, type) VALUES ($1, $2, $3)" + " ON CONFLICT ON CONSTRAINT syncapi_account_data_unique" + - " DO UPDATE SET id = EXCLUDED.id" + + " DO UPDATE SET id = nextval('syncapi_stream_id')" + " RETURNING id" const selectAccountDataInRangeSQL = "" + diff --git a/syncapi/storage/sqlite3/account_data_table.go b/syncapi/storage/sqlite3/account_data_table.go index 1c65cb6a..24c44224 100644 --- a/syncapi/storage/sqlite3/account_data_table.go +++ b/syncapi/storage/sqlite3/account_data_table.go @@ -39,7 +39,7 @@ CREATE TABLE IF NOT EXISTS syncapi_account_data_type ( const insertAccountDataSQL = "" + "INSERT INTO syncapi_account_data_type (id, user_id, room_id, type) VALUES ($1, $2, $3, $4)" + " ON CONFLICT (user_id, room_id, type) DO UPDATE" + - " SET id = EXCLUDED.id" + " SET id = $5" const selectAccountDataInRangeSQL = "" + "SELECT room_id, type FROM syncapi_account_data_type" + @@ -86,7 +86,7 @@ func (s *accountDataStatements) InsertAccountData( if err != nil { return } - _, err = sqlutil.TxStmt(txn, s.insertAccountDataStmt).ExecContext(ctx, pos, userID, roomID, dataType) + _, err = sqlutil.TxStmt(txn, s.insertAccountDataStmt).ExecContext(ctx, pos, userID, roomID, dataType, pos) return } From 6099379ea48cf47f9570e9bc51aba4bb3fa8c066 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 4 Feb 2021 11:52:49 +0000 Subject: [PATCH 2/9] Remove rooms table from federation sender (#1751) * Remove last sent event ID column from federation sender * Remove EventIDMismatchError * Remove the federationsender rooms table altogether, it's useless * Add migration * Fix migrations * Fix migrations --- .../postgres/deltas/2021020411080000_rooms.go | 46 ++++++++ .../storage/postgres/room_table.go | 104 ----------------- federationsender/storage/postgres/storage.go | 11 +- federationsender/storage/shared/storage.go | 26 +---- .../sqlite3/deltas/2021020411080000_rooms.go | 46 ++++++++ .../storage/sqlite3/room_table.go | 105 ------------------ federationsender/storage/sqlite3/storage.go | 11 +- federationsender/storage/tables/interface.go | 6 - federationsender/types/types.go | 18 --- 9 files changed, 105 insertions(+), 268 deletions(-) create mode 100644 federationsender/storage/postgres/deltas/2021020411080000_rooms.go delete mode 100644 federationsender/storage/postgres/room_table.go create mode 100644 federationsender/storage/sqlite3/deltas/2021020411080000_rooms.go delete mode 100644 federationsender/storage/sqlite3/room_table.go diff --git a/federationsender/storage/postgres/deltas/2021020411080000_rooms.go b/federationsender/storage/postgres/deltas/2021020411080000_rooms.go new file mode 100644 index 00000000..cc4bdadf --- /dev/null +++ b/federationsender/storage/postgres/deltas/2021020411080000_rooms.go @@ -0,0 +1,46 @@ +// Copyright 2021 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deltas + +import ( + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/pressly/goose" +) + +func LoadFromGoose() { + goose.AddMigration(UpRemoveRoomsTable, DownRemoveRoomsTable) +} + +func LoadRemoveRoomsTable(m *sqlutil.Migrations) { + m.AddMigration(UpRemoveRoomsTable, DownRemoveRoomsTable) +} + +func UpRemoveRoomsTable(tx *sql.Tx) error { + _, err := tx.Exec(` + DROP TABLE IF EXISTS federationsender_rooms; + `) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} + +func DownRemoveRoomsTable(tx *sql.Tx) error { + // We can't reverse this. + return nil +} diff --git a/federationsender/storage/postgres/room_table.go b/federationsender/storage/postgres/room_table.go deleted file mode 100644 index 8d3ed20f..00000000 --- a/federationsender/storage/postgres/room_table.go +++ /dev/null @@ -1,104 +0,0 @@ -// Copyright 2017-2018 New Vector Ltd -// Copyright 2019-2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package postgres - -import ( - "context" - "database/sql" - - "github.com/matrix-org/dendrite/internal/sqlutil" -) - -const roomSchema = ` -CREATE TABLE IF NOT EXISTS federationsender_rooms ( - -- The string ID of the room - room_id TEXT PRIMARY KEY, - -- The most recent event state by the room server. - -- We can use this to tell if our view of the room state has become - -- desynchronised. - last_event_id TEXT NOT NULL -);` - -const insertRoomSQL = "" + - "INSERT INTO federationsender_rooms (room_id, last_event_id) VALUES ($1, '')" + - " ON CONFLICT DO NOTHING" - -const selectRoomForUpdateSQL = "" + - "SELECT last_event_id FROM federationsender_rooms WHERE room_id = $1 FOR UPDATE" - -const updateRoomSQL = "" + - "UPDATE federationsender_rooms SET last_event_id = $2 WHERE room_id = $1" - -type roomStatements struct { - db *sql.DB - insertRoomStmt *sql.Stmt - selectRoomForUpdateStmt *sql.Stmt - updateRoomStmt *sql.Stmt -} - -func NewPostgresRoomsTable(db *sql.DB) (s *roomStatements, err error) { - s = &roomStatements{ - db: db, - } - _, err = s.db.Exec(roomSchema) - if err != nil { - return - } - if s.insertRoomStmt, err = s.db.Prepare(insertRoomSQL); err != nil { - return - } - if s.selectRoomForUpdateStmt, err = s.db.Prepare(selectRoomForUpdateSQL); err != nil { - return - } - if s.updateRoomStmt, err = s.db.Prepare(updateRoomSQL); err != nil { - return - } - return -} - -// insertRoom inserts the room if it didn't already exist. -// If the room didn't exist then last_event_id is set to the empty string. -func (s *roomStatements) InsertRoom( - ctx context.Context, txn *sql.Tx, roomID string, -) error { - _, err := sqlutil.TxStmt(txn, s.insertRoomStmt).ExecContext(ctx, roomID) - return err -} - -// selectRoomForUpdate locks the row for the room and returns the last_event_id. -// The row must already exist in the table. Callers can ensure that the row -// exists by calling insertRoom first. -func (s *roomStatements) SelectRoomForUpdate( - ctx context.Context, txn *sql.Tx, roomID string, -) (string, error) { - var lastEventID string - stmt := sqlutil.TxStmt(txn, s.selectRoomForUpdateStmt) - err := stmt.QueryRowContext(ctx, roomID).Scan(&lastEventID) - if err != nil { - return "", err - } - return lastEventID, nil -} - -// updateRoom updates the last_event_id for the room. selectRoomForUpdate should -// have already been called earlier within the transaction. -func (s *roomStatements) UpdateRoom( - ctx context.Context, txn *sql.Tx, roomID, lastEventID string, -) error { - stmt := sqlutil.TxStmt(txn, s.updateRoomStmt) - _, err := stmt.ExecContext(ctx, roomID, lastEventID) - return err -} diff --git a/federationsender/storage/postgres/storage.go b/federationsender/storage/postgres/storage.go index b9827ca1..5edc08ad 100644 --- a/federationsender/storage/postgres/storage.go +++ b/federationsender/storage/postgres/storage.go @@ -18,6 +18,7 @@ package postgres import ( "database/sql" + "github.com/matrix-org/dendrite/federationsender/storage/postgres/deltas" "github.com/matrix-org/dendrite/federationsender/storage/shared" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" @@ -56,10 +57,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS if err != nil { return nil, err } - rooms, err := NewPostgresRoomsTable(d.db) - if err != nil { - return nil, err - } blacklist, err := NewPostgresBlacklistTable(d.db) if err != nil { return nil, err @@ -72,6 +69,11 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS if err != nil { return nil, err } + m := sqlutil.NewMigrations() + deltas.LoadRemoveRoomsTable(m) + if err = m.RunDeltas(d.db, dbProperties); err != nil { + return nil, err + } d.Database = shared.Database{ DB: d.db, Cache: cache, @@ -80,7 +82,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS FederationSenderQueuePDUs: queuePDUs, FederationSenderQueueEDUs: queueEDUs, FederationSenderQueueJSON: queueJSON, - FederationSenderRooms: rooms, FederationSenderBlacklist: blacklist, FederationSenderInboundPeeks: inboundPeeks, FederationSenderOutboundPeeks: outboundPeeks, diff --git a/federationsender/storage/shared/storage.go b/federationsender/storage/shared/storage.go index 4c949042..2e74e9d6 100644 --- a/federationsender/storage/shared/storage.go +++ b/federationsender/storage/shared/storage.go @@ -34,7 +34,6 @@ type Database struct { FederationSenderQueueEDUs tables.FederationSenderQueueEDUs FederationSenderQueueJSON tables.FederationSenderQueueJSON FederationSenderJoinedHosts tables.FederationSenderJoinedHosts - FederationSenderRooms tables.FederationSenderRooms FederationSenderBlacklist tables.FederationSenderBlacklist FederationSenderOutboundPeeks tables.FederationSenderOutboundPeeks FederationSenderInboundPeeks tables.FederationSenderInboundPeeks @@ -64,29 +63,6 @@ func (d *Database) UpdateRoom( removeHosts []string, ) (joinedHosts []types.JoinedHost, err error) { err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error { - err = d.FederationSenderRooms.InsertRoom(ctx, txn, roomID) - if err != nil { - return err - } - - lastSentEventID, err := d.FederationSenderRooms.SelectRoomForUpdate(ctx, txn, roomID) - if err != nil { - return err - } - - if lastSentEventID == newEventID { - // We've handled this message before, so let's just ignore it. - // We can only get a duplicate for the last message we processed, - // so its enough just to compare the newEventID with lastSentEventID - return nil - } - - if lastSentEventID != "" && lastSentEventID != oldEventID { - return types.EventIDMismatchError{ - DatabaseID: lastSentEventID, RoomServerID: oldEventID, - } - } - joinedHosts, err = d.FederationSenderJoinedHosts.SelectJoinedHostsWithTx(ctx, txn, roomID) if err != nil { return err @@ -101,7 +77,7 @@ func (d *Database) UpdateRoom( if err = d.FederationSenderJoinedHosts.DeleteJoinedHosts(ctx, txn, removeHosts); err != nil { return err } - return d.FederationSenderRooms.UpdateRoom(ctx, txn, roomID, newEventID) + return nil }) return } diff --git a/federationsender/storage/sqlite3/deltas/2021020411080000_rooms.go b/federationsender/storage/sqlite3/deltas/2021020411080000_rooms.go new file mode 100644 index 00000000..cc4bdadf --- /dev/null +++ b/federationsender/storage/sqlite3/deltas/2021020411080000_rooms.go @@ -0,0 +1,46 @@ +// Copyright 2021 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deltas + +import ( + "database/sql" + "fmt" + + "github.com/matrix-org/dendrite/internal/sqlutil" + "github.com/pressly/goose" +) + +func LoadFromGoose() { + goose.AddMigration(UpRemoveRoomsTable, DownRemoveRoomsTable) +} + +func LoadRemoveRoomsTable(m *sqlutil.Migrations) { + m.AddMigration(UpRemoveRoomsTable, DownRemoveRoomsTable) +} + +func UpRemoveRoomsTable(tx *sql.Tx) error { + _, err := tx.Exec(` + DROP TABLE IF EXISTS federationsender_rooms; + `) + if err != nil { + return fmt.Errorf("failed to execute upgrade: %w", err) + } + return nil +} + +func DownRemoveRoomsTable(tx *sql.Tx) error { + // We can't reverse this. + return nil +} diff --git a/federationsender/storage/sqlite3/room_table.go b/federationsender/storage/sqlite3/room_table.go deleted file mode 100644 index 0710ccca..00000000 --- a/federationsender/storage/sqlite3/room_table.go +++ /dev/null @@ -1,105 +0,0 @@ -// Copyright 2017-2018 New Vector Ltd -// Copyright 2019-2020 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package sqlite3 - -import ( - "context" - "database/sql" - - "github.com/matrix-org/dendrite/internal/sqlutil" -) - -const roomSchema = ` -CREATE TABLE IF NOT EXISTS federationsender_rooms ( - -- The string ID of the room - room_id TEXT PRIMARY KEY, - -- The most recent event state by the room server. - -- We can use this to tell if our view of the room state has become - -- desynchronised. - last_event_id TEXT NOT NULL -);` - -const insertRoomSQL = "" + - "INSERT INTO federationsender_rooms (room_id, last_event_id) VALUES ($1, '')" + - " ON CONFLICT DO NOTHING" - -const selectRoomForUpdateSQL = "" + - "SELECT last_event_id FROM federationsender_rooms WHERE room_id = $1" - -const updateRoomSQL = "" + - "UPDATE federationsender_rooms SET last_event_id = $2 WHERE room_id = $1" - -type roomStatements struct { - db *sql.DB - insertRoomStmt *sql.Stmt - selectRoomForUpdateStmt *sql.Stmt - updateRoomStmt *sql.Stmt -} - -func NewSQLiteRoomsTable(db *sql.DB) (s *roomStatements, err error) { - s = &roomStatements{ - db: db, - } - _, err = db.Exec(roomSchema) - if err != nil { - return - } - - if s.insertRoomStmt, err = db.Prepare(insertRoomSQL); err != nil { - return - } - if s.selectRoomForUpdateStmt, err = db.Prepare(selectRoomForUpdateSQL); err != nil { - return - } - if s.updateRoomStmt, err = db.Prepare(updateRoomSQL); err != nil { - return - } - return -} - -// insertRoom inserts the room if it didn't already exist. -// If the room didn't exist then last_event_id is set to the empty string. -func (s *roomStatements) InsertRoom( - ctx context.Context, txn *sql.Tx, roomID string, -) error { - _, err := sqlutil.TxStmt(txn, s.insertRoomStmt).ExecContext(ctx, roomID) - return err -} - -// selectRoomForUpdate locks the row for the room and returns the last_event_id. -// The row must already exist in the table. Callers can ensure that the row -// exists by calling insertRoom first. -func (s *roomStatements) SelectRoomForUpdate( - ctx context.Context, txn *sql.Tx, roomID string, -) (string, error) { - var lastEventID string - stmt := sqlutil.TxStmt(txn, s.selectRoomForUpdateStmt) - err := stmt.QueryRowContext(ctx, roomID).Scan(&lastEventID) - if err != nil { - return "", err - } - return lastEventID, nil -} - -// updateRoom updates the last_event_id for the room. selectRoomForUpdate should -// have already been called earlier within the transaction. -func (s *roomStatements) UpdateRoom( - ctx context.Context, txn *sql.Tx, roomID, lastEventID string, -) error { - stmt := sqlutil.TxStmt(txn, s.updateRoomStmt) - _, err := stmt.ExecContext(ctx, roomID, lastEventID) - return err -} diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go index 2b135858..84a9ff86 100644 --- a/federationsender/storage/sqlite3/storage.go +++ b/federationsender/storage/sqlite3/storage.go @@ -21,6 +21,7 @@ import ( _ "github.com/mattn/go-sqlite3" "github.com/matrix-org/dendrite/federationsender/storage/shared" + "github.com/matrix-org/dendrite/federationsender/storage/sqlite3/deltas" "github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/sqlutil" "github.com/matrix-org/dendrite/setup/config" @@ -46,10 +47,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS if err != nil { return nil, err } - rooms, err := NewSQLiteRoomsTable(d.db) - if err != nil { - return nil, err - } queuePDUs, err := NewSQLiteQueuePDUsTable(d.db) if err != nil { return nil, err @@ -74,6 +71,11 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS if err != nil { return nil, err } + m := sqlutil.NewMigrations() + deltas.LoadRemoveRoomsTable(m) + if err = m.RunDeltas(d.db, dbProperties); err != nil { + return nil, err + } d.Database = shared.Database{ DB: d.db, Cache: cache, @@ -82,7 +84,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions, cache caching.FederationS FederationSenderQueuePDUs: queuePDUs, FederationSenderQueueEDUs: queueEDUs, FederationSenderQueueJSON: queueJSON, - FederationSenderRooms: rooms, FederationSenderBlacklist: blacklist, FederationSenderOutboundPeeks: outboundPeeks, FederationSenderInboundPeeks: inboundPeeks, diff --git a/federationsender/storage/tables/interface.go b/federationsender/storage/tables/interface.go index 22fd5554..34ff0b97 100644 --- a/federationsender/storage/tables/interface.go +++ b/federationsender/storage/tables/interface.go @@ -56,12 +56,6 @@ type FederationSenderJoinedHosts interface { SelectJoinedHostsForRooms(ctx context.Context, roomIDs []string) ([]gomatrixserverlib.ServerName, error) } -type FederationSenderRooms interface { - InsertRoom(ctx context.Context, txn *sql.Tx, roomID string) error - SelectRoomForUpdate(ctx context.Context, txn *sql.Tx, roomID string) (string, error) - UpdateRoom(ctx context.Context, txn *sql.Tx, roomID, lastEventID string) error -} - type FederationSenderBlacklist interface { InsertBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) error SelectBlacklist(ctx context.Context, txn *sql.Tx, serverName gomatrixserverlib.ServerName) (bool, error) diff --git a/federationsender/types/types.go b/federationsender/types/types.go index 90da310c..c486c05c 100644 --- a/federationsender/types/types.go +++ b/federationsender/types/types.go @@ -15,8 +15,6 @@ package types import ( - "fmt" - "github.com/matrix-org/gomatrixserverlib" ) @@ -34,22 +32,6 @@ func (s ServerNames) Len() int { return len(s) } func (s ServerNames) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (s ServerNames) Less(i, j int) bool { return s[i] < s[j] } -// A EventIDMismatchError indicates that we have got out of sync with the -// room server. -type EventIDMismatchError struct { - // The event ID we have stored in our local database. - DatabaseID string - // The event ID received from the room server. - RoomServerID string -} - -func (e EventIDMismatchError) Error() string { - return fmt.Sprintf( - "mismatched last sent event ID: had %q in database got %q from room server", - e.DatabaseID, e.RoomServerID, - ) -} - // tracks peeks we're performing on another server over federation type OutboundPeek struct { PeekID string From 6e44450cc9b541e5e79b7e601462fd3d3cf35fe5 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 4 Feb 2021 12:20:37 +0000 Subject: [PATCH 3/9] Don't re-request state events that are already in the timeline (#1739) * Don't request state events if we already have the timeline events (Postgres only) * Rename variable * nocyclo * Add SQLite * Tweaks * Revert query change * Don't dedupe if asking for full state * Update query --- syncapi/storage/interface.go | 2 +- .../postgres/current_room_state_table.go | 5 ++- syncapi/storage/shared/syncserver.go | 8 ++--- .../sqlite3/current_room_state_table.go | 3 +- syncapi/storage/sqlite3/filtering.go | 9 ++++- .../sqlite3/output_room_events_table.go | 6 ++-- syncapi/storage/tables/interface.go | 2 +- syncapi/streams/stream_pdu.go | 33 ++++++++++++------- 8 files changed, 45 insertions(+), 23 deletions(-) diff --git a/syncapi/storage/interface.go b/syncapi/storage/interface.go index 22d80161..9cff4cad 100644 --- a/syncapi/storage/interface.go +++ b/syncapi/storage/interface.go @@ -35,7 +35,7 @@ type Database interface { MaxStreamPositionForAccountData(ctx context.Context) (types.StreamPosition, error) MaxStreamPositionForSendToDeviceMessages(ctx context.Context) (types.StreamPosition, error) - CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error) + CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error) GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error) RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error) diff --git a/syncapi/storage/postgres/current_room_state_table.go b/syncapi/storage/postgres/current_room_state_table.go index 77e1e363..ee649c16 100644 --- a/syncapi/storage/postgres/current_room_state_table.go +++ b/syncapi/storage/postgres/current_room_state_table.go @@ -84,7 +84,8 @@ const selectCurrentStateSQL = "" + " AND ( $4::text[] IS NULL OR type LIKE ANY($4) )" + " AND ( $5::text[] IS NULL OR NOT(type LIKE ANY($5)) )" + " AND ( $6::bool IS NULL OR contains_url = $6 )" + - " LIMIT $7" + " AND (event_id = ANY($7)) IS NOT TRUE" + + " LIMIT $8" const selectJoinedUsersSQL = "" + "SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'" @@ -197,6 +198,7 @@ func (s *currentRoomStateStatements) SelectRoomIDsWithMembership( func (s *currentRoomStateStatements) SelectCurrentState( ctx context.Context, txn *sql.Tx, roomID string, stateFilter *gomatrixserverlib.StateFilter, + excludeEventIDs []string, ) ([]*gomatrixserverlib.HeaderedEvent, error) { stmt := sqlutil.TxStmt(txn, s.selectCurrentStateStmt) rows, err := stmt.QueryContext(ctx, roomID, @@ -205,6 +207,7 @@ func (s *currentRoomStateStatements) SelectCurrentState( pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.Types)), pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.NotTypes)), stateFilter.ContainsURL, + pq.StringArray(excludeEventIDs), stateFilter.Limit, ) if err != nil { diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 239f6812..ee2c176c 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -103,8 +103,8 @@ func (d *Database) MaxStreamPositionForAccountData(ctx context.Context) (types.S return types.StreamPosition(id), nil } -func (d *Database) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error) { - return d.CurrentRoomState.SelectCurrentState(ctx, nil, roomID, stateFilterPart) +func (d *Database) CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) { + return d.CurrentRoomState.SelectCurrentState(ctx, nil, roomID, stateFilterPart, excludeEventIDs) } func (d *Database) RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error) { @@ -195,7 +195,7 @@ func (d *Database) GetStateEvent( func (d *Database) GetStateEventsForRoom( ctx context.Context, roomID string, stateFilter *gomatrixserverlib.StateFilter, ) (stateEvents []*gomatrixserverlib.HeaderedEvent, err error) { - stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, nil, roomID, stateFilter) + stateEvents, err = d.CurrentRoomState.SelectCurrentState(ctx, nil, roomID, stateFilter, nil) return } @@ -870,7 +870,7 @@ func (d *Database) currentStateStreamEventsForRoom( ctx context.Context, txn *sql.Tx, roomID string, stateFilter *gomatrixserverlib.StateFilter, ) ([]types.StreamEvent, error) { - allState, err := d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilter) + allState, err := d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilter, nil) if err != nil { return nil, err } diff --git a/syncapi/storage/sqlite3/current_room_state_table.go b/syncapi/storage/sqlite3/current_room_state_table.go index 55ed27a4..4fbbf45c 100644 --- a/syncapi/storage/sqlite3/current_room_state_table.go +++ b/syncapi/storage/sqlite3/current_room_state_table.go @@ -178,6 +178,7 @@ func (s *currentRoomStateStatements) SelectRoomIDsWithMembership( func (s *currentRoomStateStatements) SelectCurrentState( ctx context.Context, txn *sql.Tx, roomID string, stateFilter *gomatrixserverlib.StateFilter, + excludeEventIDs []string, ) ([]*gomatrixserverlib.HeaderedEvent, error) { stmt, params, err := prepareWithFilters( s.db, txn, selectCurrentStateSQL, @@ -186,7 +187,7 @@ func (s *currentRoomStateStatements) SelectCurrentState( }, stateFilter.Senders, stateFilter.NotSenders, stateFilter.Types, stateFilter.NotTypes, - stateFilter.Limit, FilterOrderNone, + excludeEventIDs, stateFilter.Limit, FilterOrderNone, ) if err != nil { return nil, fmt.Errorf("s.prepareWithFilters: %w", err) diff --git a/syncapi/storage/sqlite3/filtering.go b/syncapi/storage/sqlite3/filtering.go index 0faf5297..050f3a26 100644 --- a/syncapi/storage/sqlite3/filtering.go +++ b/syncapi/storage/sqlite3/filtering.go @@ -23,9 +23,10 @@ const ( // fields might come from either a StateFilter or an EventFilter, // and it's easier just to have the caller extract the relevant // parts. +// nolint:gocyclo func prepareWithFilters( db *sql.DB, txn *sql.Tx, query string, params []interface{}, - senders, notsenders, types, nottypes []string, + senders, notsenders, types, nottypes []string, excludeEventIDs []string, limit int, order FilterOrder, ) (*sql.Stmt, []interface{}, error) { offset := len(params) @@ -53,6 +54,12 @@ func prepareWithFilters( params, offset = append(params, v), offset+1 } } + if count := len(excludeEventIDs); count > 0 { + query += " AND event_id NOT IN " + sqlutil.QueryVariadicOffset(count, offset) + for _, v := range excludeEventIDs { + params, offset = append(params, v), offset+1 + } + } switch order { case FilterOrderAsc: query += " ORDER BY id ASC" diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 019aba8b..3c70a499 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -150,7 +150,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange( }, stateFilter.Senders, stateFilter.NotSenders, stateFilter.Types, stateFilter.NotTypes, - stateFilter.Limit, FilterOrderAsc, + nil, stateFilter.Limit, FilterOrderAsc, ) if err != nil { return nil, nil, fmt.Errorf("s.prepareWithFilters: %w", err) @@ -326,7 +326,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents( }, eventFilter.Senders, eventFilter.NotSenders, eventFilter.Types, eventFilter.NotTypes, - eventFilter.Limit+1, FilterOrderDesc, + nil, eventFilter.Limit+1, FilterOrderDesc, ) if err != nil { return nil, false, fmt.Errorf("s.prepareWithFilters: %w", err) @@ -374,7 +374,7 @@ func (s *outputRoomEventsStatements) SelectEarlyEvents( }, eventFilter.Senders, eventFilter.NotSenders, eventFilter.Types, eventFilter.NotTypes, - eventFilter.Limit, FilterOrderAsc, + nil, eventFilter.Limit, FilterOrderAsc, ) if err != nil { return nil, fmt.Errorf("s.prepareWithFilters: %w", err) diff --git a/syncapi/storage/tables/interface.go b/syncapi/storage/tables/interface.go index 997486dd..02887271 100644 --- a/syncapi/storage/tables/interface.go +++ b/syncapi/storage/tables/interface.go @@ -91,7 +91,7 @@ type CurrentRoomState interface { DeleteRoomStateByEventID(ctx context.Context, txn *sql.Tx, eventID string) error DeleteRoomStateForRoom(ctx context.Context, txn *sql.Tx, roomID string) error // SelectCurrentState returns all the current state events for the given room. - SelectCurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilter *gomatrixserverlib.StateFilter) ([]*gomatrixserverlib.HeaderedEvent, error) + SelectCurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilter *gomatrixserverlib.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error) // SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state. SelectRoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membership string) ([]string, error) // SelectJoinedUsers returns a map of room ID to a list of joined user IDs. diff --git a/syncapi/streams/stream_pdu.go b/syncapi/streams/stream_pdu.go index ae38dc30..e11ac8dd 100644 --- a/syncapi/streams/stream_pdu.go +++ b/syncapi/streams/stream_pdu.go @@ -98,7 +98,7 @@ func (p *PDUStreamProvider) CompleteSync( var jr *types.JoinResponse jr, err = p.getJoinResponseForCompleteSync( - ctx, roomID, r, &stateFilter, &eventFilter, req.Device, + ctx, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, ) if err != nil { req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") @@ -124,7 +124,7 @@ func (p *PDUStreamProvider) CompleteSync( if !peek.Deleted { var jr *types.JoinResponse jr, err = p.getJoinResponseForCompleteSync( - ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.Device, + ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device, ) if err != nil { req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed") @@ -254,26 +254,37 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse( return nil } +// nolint:gocyclo func (p *PDUStreamProvider) getJoinResponseForCompleteSync( ctx context.Context, roomID string, r types.Range, stateFilter *gomatrixserverlib.StateFilter, eventFilter *gomatrixserverlib.RoomEventFilter, + wantFullState bool, device *userapi.Device, ) (jr *types.JoinResponse, err error) { - var stateEvents []*gomatrixserverlib.HeaderedEvent - stateEvents, err = p.DB.CurrentState(ctx, roomID, stateFilter) + // TODO: When filters are added, we may need to call this multiple times to get enough events. + // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 + recentStreamEvents, limited, err := p.DB.RecentEvents( + ctx, roomID, r, eventFilter, true, true, + ) if err != nil { return } - // TODO: When filters are added, we may need to call this multiple times to get enough events. - // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316 - var recentStreamEvents []types.StreamEvent - var limited bool - recentStreamEvents, limited, err = p.DB.RecentEvents( - ctx, roomID, r, eventFilter, true, true, - ) + + // Get the event IDs of the stream events we fetched. There's no point in us + var excludingEventIDs []string + if !wantFullState { + excludingEventIDs = make([]string, 0, len(recentStreamEvents)) + for _, event := range recentStreamEvents { + if event.StateKey() != nil { + excludingEventIDs = append(excludingEventIDs, event.EventID()) + } + } + } + + stateEvents, err := p.DB.CurrentState(ctx, roomID, stateFilter, excludingEventIDs) if err != nil { return } From bd72ed50d426f6025004a2beb1372912ecf1d500 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 4 Feb 2021 12:25:31 +0000 Subject: [PATCH 4/9] Reduce log level of 'Failed to send transaction' log line, since quite often it is flooding logs for dead servers --- federationsender/queue/destinationqueue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 99b9e449..d9567eeb 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -444,7 +444,7 @@ func (oq *destinationQueue) nextTransaction( log.WithFields(log.Fields{ "destination": oq.destination, log.ErrorKey: err, - }).Infof("Failed to send transaction %q", t.TransactionID) + }).Debugf("Failed to send transaction %q", t.TransactionID) return false, 0, 0, err } } From 397b158c05be7ee395257913820f86616302e6ed Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 4 Feb 2021 12:39:43 +0000 Subject: [PATCH 5/9] Version 0.3.9 --- CHANGES.md | 14 ++++++++++++++ internal/version.go | 2 +- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index b11c3d7a..b76a6acf 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,19 @@ # Changelog +## Dendrite 0.3.9 (2021-02-04) + +### Features + +* Performance of initial/complete syncs has been improved dramatically +* State events that can't be authed are now dropped when joining a room rather than unexpectedly causing the room join to fail +* State events that already appear in the timeline will no longer be requested from the sync API database more than once, which may reduce memory usage in some cases + +### Fixes + +* A crash at startup due to a conflict in the sync API account data has been fixed +* A crash at startup due to mismatched event IDs in the federation sender has been fixed +* A redundant check which may cause the roomserver memberships table to get out of sync has been removed + ## Dendrite 0.3.8 (2021-01-28) ### Fixes diff --git a/internal/version.go b/internal/version.go index f5c6a423..7d098037 100644 --- a/internal/version.go +++ b/internal/version.go @@ -17,7 +17,7 @@ var build string const ( VersionMajor = 0 VersionMinor = 3 - VersionPatch = 8 + VersionPatch = 9 VersionTag = "" // example: "rc1" ) From 02e6d89cc2bde930be36323e626ce54643d56a7f Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Sat, 6 Feb 2021 11:49:18 +0000 Subject: [PATCH 6/9] Fix crash in membership updater (#1753) * Fix nil pointer exception in membership updater * goimports --- roomserver/internal/input/input_membership.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/roomserver/internal/input/input_membership.go b/roomserver/internal/input/input_membership.go index bc646c3c..44435bfd 100644 --- a/roomserver/internal/input/input_membership.go +++ b/roomserver/internal/input/input_membership.go @@ -107,6 +107,23 @@ func (r *Inputer) updateMembership( return updates, nil } + // In an ideal world, we shouldn't ever have "add" be nil and "remove" be + // set, as this implies that we're deleting a state event without replacing + // it (a thing that ordinarily shouldn't happen in Matrix). However, state + // resets are sadly a thing occasionally and we have to account for that. + // Beforehand there used to be a check here which stopped dead if we hit + // this scenario, but that meant that the membership table got out of sync + // after a state reset, often thinking that the user was still joined to + // the room even though the room state said otherwise, and this would prevent + // the user from being able to attempt to rejoin the room without modifying + // the database. So instead what we'll do is we'll just update the membership + // table to say that the user is "leave" and we'll use the old event to + // avoid nil pointer exceptions on the code path that follows. + if add == nil { + add = remove + newMembership = gomatrixserverlib.Leave + } + mu, err := updater.MembershipUpdater(targetUserNID, r.isLocalTarget(add)) if err != nil { return nil, err From 82df1948588919800866151cde569c726577b0c3 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Sat, 6 Feb 2021 16:56:55 +0000 Subject: [PATCH 7/9] Increase limit --- setup/mscs/msc2946/msc2946.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/setup/mscs/msc2946/msc2946.go b/setup/mscs/msc2946/msc2946.go index 3580d4d2..694c9732 100644 --- a/setup/mscs/msc2946/msc2946.go +++ b/setup/mscs/msc2946/msc2946.go @@ -46,7 +46,7 @@ const ( // Defaults sets the request defaults func Defaults(r *gomatrixserverlib.MSC2946SpacesRequest) { - r.Limit = 100 + r.Limit = 2000 r.MaxRoomsPerSpace = -1 } @@ -108,9 +108,6 @@ func federatedSpacesHandler( JSON: jsonerror.BadJSON("The request body could not be decoded into valid JSON. " + err.Error()), } } - if r.Limit > 100 { - r.Limit = 100 - } w := walker{ req: &r, rootRoomID: roomID, From 9a199ba179273250330c85f6d543381b2ac1474c Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Sat, 6 Feb 2021 17:05:00 +0000 Subject: [PATCH 8/9] Remove 100 default --- setup/mscs/msc2946/msc2946.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/setup/mscs/msc2946/msc2946.go b/setup/mscs/msc2946/msc2946.go index 694c9732..03af0666 100644 --- a/setup/mscs/msc2946/msc2946.go +++ b/setup/mscs/msc2946/msc2946.go @@ -144,9 +144,6 @@ func spacesHandler( if resErr := chttputil.UnmarshalJSONRequest(req, &r); resErr != nil { return *resErr } - if r.Limit > 100 { - r.Limit = 100 - } w := walker{ req: &r, rootRoomID: roomID, From 85aaaf9bb964c92a0eb7cd236509d5db0c164aec Mon Sep 17 00:00:00 2001 From: David Florness Date: Fri, 12 Feb 2021 04:25:35 -0500 Subject: [PATCH 9/9] Fix a few indentation mistakes (#1752) Signed-off-by: David Florness --- build/docker/docker-compose.polylith.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/build/docker/docker-compose.polylith.yml b/build/docker/docker-compose.polylith.yml index f377e36f..e95e1957 100644 --- a/build/docker/docker-compose.polylith.yml +++ b/build/docker/docker-compose.polylith.yml @@ -70,7 +70,7 @@ services: volumes: - ./config:/etc/dendrite networks: - - internal + - internal signing_key_server: hostname: signing_key_server @@ -86,9 +86,9 @@ services: image: matrixdotorg/dendrite-polylith:latest command: userapi volumes: - - ./config:/etc/dendrite + - ./config:/etc/dendrite networks: - - internal + - internal appservice_api: hostname: appservice_api