mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-08-01 22:02:46 +00:00
Drop reference_sha
column (#3083)
Companion PR to https://github.com/matrix-org/gomatrixserverlib/pull/383
This commit is contained in:
parent
5d6221d191
commit
11b557097c
29 changed files with 299 additions and 227 deletions
|
@ -0,0 +1,72 @@
|
|||
// Copyright 2023 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package deltas
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
func UpDropEventReferenceSHA(ctx context.Context, tx *sql.Tx) error {
|
||||
var count int
|
||||
err := tx.QueryRowContext(ctx, `SELECT count(*) FROM roomserver_events GROUP BY event_id HAVING count(event_id) > 1`).
|
||||
Scan(&count)
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
return fmt.Errorf("failed to query duplicate event ids")
|
||||
}
|
||||
if count > 0 {
|
||||
return fmt.Errorf("unable to drop column, as there are duplicate event ids")
|
||||
}
|
||||
_, err = tx.ExecContext(ctx, `ALTER TABLE roomserver_events DROP COLUMN reference_sha256;`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to execute upgrade: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func UpDropEventReferenceSHAPrevEvents(ctx context.Context, tx *sql.Tx) error {
|
||||
// rename the table
|
||||
if _, err := tx.ExecContext(ctx, `ALTER TABLE roomserver_previous_events RENAME TO _roomserver_previous_events;`); err != nil {
|
||||
return fmt.Errorf("tx.ExecContext: %w", err)
|
||||
}
|
||||
|
||||
// create new table
|
||||
if _, err := tx.ExecContext(ctx, `CREATE TABLE IF NOT EXISTS roomserver_previous_events (
|
||||
previous_event_id TEXT NOT NULL,
|
||||
event_nids TEXT NOT NULL,
|
||||
UNIQUE (previous_event_id)
|
||||
);`); err != nil {
|
||||
return fmt.Errorf("tx.ExecContext: %w", err)
|
||||
}
|
||||
|
||||
// move data
|
||||
if _, err := tx.ExecContext(ctx, `
|
||||
INSERT
|
||||
INTO roomserver_previous_events (
|
||||
previous_event_id, event_nids
|
||||
) SELECT
|
||||
previous_event_id, event_nids
|
||||
FROM _roomserver_previous_events
|
||||
;`); err != nil {
|
||||
return fmt.Errorf("tx.ExecContext: %w", err)
|
||||
}
|
||||
// drop old table
|
||||
_, err := tx.ExecContext(ctx, `DROP TABLE _roomserver_previous_events;`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to execute upgrade: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -19,14 +19,14 @@ import (
|
|||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/sqlite3/deltas"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
)
|
||||
|
@ -41,17 +41,16 @@ const eventsSchema = `
|
|||
state_snapshot_nid INTEGER NOT NULL DEFAULT 0,
|
||||
depth INTEGER NOT NULL,
|
||||
event_id TEXT NOT NULL UNIQUE,
|
||||
reference_sha256 BLOB NOT NULL,
|
||||
auth_event_nids TEXT NOT NULL DEFAULT '[]',
|
||||
is_rejected BOOLEAN NOT NULL DEFAULT FALSE
|
||||
);
|
||||
`
|
||||
|
||||
const insertEventSQL = `
|
||||
INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, is_rejected)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, auth_event_nids, depth, is_rejected)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||
ON CONFLICT DO UPDATE
|
||||
SET is_rejected = $8 WHERE is_rejected = 1
|
||||
SET is_rejected = $7 WHERE is_rejected = 1
|
||||
RETURNING event_nid, state_snapshot_nid;
|
||||
`
|
||||
|
||||
|
@ -100,12 +99,9 @@ const selectEventIDSQL = "" +
|
|||
"SELECT event_id FROM roomserver_events WHERE event_nid = $1"
|
||||
|
||||
const bulkSelectStateAtEventAndReferenceSQL = "" +
|
||||
"SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, event_id, reference_sha256" +
|
||||
"SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, event_id" +
|
||||
" FROM roomserver_events WHERE event_nid IN ($1)"
|
||||
|
||||
const bulkSelectEventReferenceSQL = "" +
|
||||
"SELECT event_id, reference_sha256 FROM roomserver_events WHERE event_nid IN ($1)"
|
||||
|
||||
const bulkSelectEventIDSQL = "" +
|
||||
"SELECT event_nid, event_id FROM roomserver_events WHERE event_nid IN ($1)"
|
||||
|
||||
|
@ -137,7 +133,6 @@ type eventStatements struct {
|
|||
updateEventSentToOutputStmt *sql.Stmt
|
||||
selectEventIDStmt *sql.Stmt
|
||||
bulkSelectStateAtEventAndReferenceStmt *sql.Stmt
|
||||
bulkSelectEventReferenceStmt *sql.Stmt
|
||||
bulkSelectEventIDStmt *sql.Stmt
|
||||
selectEventRejectedStmt *sql.Stmt
|
||||
//bulkSelectEventNIDStmt *sql.Stmt
|
||||
|
@ -147,7 +142,32 @@ type eventStatements struct {
|
|||
|
||||
func CreateEventsTable(db *sql.DB) error {
|
||||
_, err := db.Exec(eventsSchema)
|
||||
return err
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// check if the column exists
|
||||
var cName string
|
||||
migrationName := "roomserver: drop column reference_sha from roomserver_events"
|
||||
err = db.QueryRowContext(context.Background(), `SELECT p.name FROM sqlite_master AS m JOIN pragma_table_info(m.name) AS p WHERE m.name = 'roomserver_events' AND p.name = 'reference_sha256'`).Scan(&cName)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
||||
if err = sqlutil.InsertMigration(context.Background(), db, migrationName); err != nil {
|
||||
return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
m := sqlutil.NewMigrator(db)
|
||||
m.AddMigrations([]sqlutil.Migration{
|
||||
{
|
||||
Version: migrationName,
|
||||
Up: deltas.UpDropEventReferenceSHA,
|
||||
},
|
||||
}...)
|
||||
return m.Up(context.Background())
|
||||
}
|
||||
|
||||
func PrepareEventsTable(db *sql.DB) (tables.Events, error) {
|
||||
|
@ -167,7 +187,6 @@ func PrepareEventsTable(db *sql.DB) (tables.Events, error) {
|
|||
{&s.selectEventSentToOutputStmt, selectEventSentToOutputSQL},
|
||||
{&s.selectEventIDStmt, selectEventIDSQL},
|
||||
{&s.bulkSelectStateAtEventAndReferenceStmt, bulkSelectStateAtEventAndReferenceSQL},
|
||||
{&s.bulkSelectEventReferenceStmt, bulkSelectEventReferenceSQL},
|
||||
{&s.bulkSelectEventIDStmt, bulkSelectEventIDSQL},
|
||||
//{&s.bulkSelectEventNIDStmt, bulkSelectEventNIDSQL},
|
||||
//{&s.bulkSelectUnsentEventNIDStmt, bulkSelectUnsentEventNIDSQL},
|
||||
|
@ -183,7 +202,6 @@ func (s *eventStatements) InsertEvent(
|
|||
eventTypeNID types.EventTypeNID,
|
||||
eventStateKeyNID types.EventStateKeyNID,
|
||||
eventID string,
|
||||
referenceSHA256 []byte,
|
||||
authEventNIDs []types.EventNID,
|
||||
depth int64,
|
||||
isRejected bool,
|
||||
|
@ -194,7 +212,7 @@ func (s *eventStatements) InsertEvent(
|
|||
insertStmt := sqlutil.TxStmt(txn, s.insertEventStmt)
|
||||
err := insertStmt.QueryRowContext(
|
||||
ctx, int64(roomNID), int64(eventTypeNID), int64(eventStateKeyNID),
|
||||
eventID, referenceSHA256, eventNIDsAsArray(authEventNIDs), depth, isRejected,
|
||||
eventID, eventNIDsAsArray(authEventNIDs), depth, isRejected,
|
||||
).Scan(&eventNID, &stateNID)
|
||||
return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err
|
||||
}
|
||||
|
@ -475,11 +493,10 @@ func (s *eventStatements) BulkSelectStateAtEventAndReference(
|
|||
eventNID int64
|
||||
stateSnapshotNID int64
|
||||
eventID string
|
||||
eventSHA256 []byte
|
||||
)
|
||||
for ; rows.Next(); i++ {
|
||||
if err = rows.Scan(
|
||||
&eventTypeNID, &eventStateKeyNID, &eventNID, &stateSnapshotNID, &eventID, &eventSHA256,
|
||||
&eventTypeNID, &eventStateKeyNID, &eventNID, &stateSnapshotNID, &eventID,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -489,43 +506,6 @@ func (s *eventStatements) BulkSelectStateAtEventAndReference(
|
|||
result.EventNID = types.EventNID(eventNID)
|
||||
result.BeforeStateSnapshotNID = types.StateSnapshotNID(stateSnapshotNID)
|
||||
result.EventID = eventID
|
||||
result.EventSHA256 = eventSHA256
|
||||
}
|
||||
if i != len(eventNIDs) {
|
||||
return nil, fmt.Errorf("storage: event NIDs missing from the database (%d != %d)", i, len(eventNIDs))
|
||||
}
|
||||
return results, nil
|
||||
}
|
||||
|
||||
func (s *eventStatements) BulkSelectEventReference(
|
||||
ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID,
|
||||
) ([]gomatrixserverlib.EventReference, error) {
|
||||
///////////////
|
||||
iEventNIDs := make([]interface{}, len(eventNIDs))
|
||||
for k, v := range eventNIDs {
|
||||
iEventNIDs[k] = v
|
||||
}
|
||||
selectOrig := strings.Replace(bulkSelectEventReferenceSQL, "($1)", sqlutil.QueryVariadic(len(iEventNIDs)), 1)
|
||||
selectPrep, err := s.db.Prepare(selectOrig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer selectPrep.Close() // nolint:errcheck
|
||||
///////////////
|
||||
|
||||
selectStmt := sqlutil.TxStmt(txn, selectPrep)
|
||||
rows, err := selectStmt.QueryContext(ctx, iEventNIDs...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "bulkSelectEventReference: rows.close() failed")
|
||||
results := make([]gomatrixserverlib.EventReference, len(eventNIDs))
|
||||
i := 0
|
||||
for ; rows.Next(); i++ {
|
||||
result := &results[i]
|
||||
if err = rows.Scan(&result.EventID, &result.EventSHA256); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if i != len(eventNIDs) {
|
||||
return nil, fmt.Errorf("storage: event NIDs missing from the database (%d != %d)", i, len(eventNIDs))
|
||||
|
|
|
@ -18,10 +18,12 @@ package sqlite3
|
|||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/sqlite3/deltas"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/tables"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
)
|
||||
|
@ -34,9 +36,8 @@ import (
|
|||
const previousEventSchema = `
|
||||
CREATE TABLE IF NOT EXISTS roomserver_previous_events (
|
||||
previous_event_id TEXT NOT NULL,
|
||||
previous_reference_sha256 BLOB,
|
||||
event_nids TEXT NOT NULL,
|
||||
UNIQUE (previous_event_id, previous_reference_sha256)
|
||||
UNIQUE (previous_event_id)
|
||||
);
|
||||
`
|
||||
|
||||
|
@ -47,20 +48,20 @@ const previousEventSchema = `
|
|||
// The lock is necessary to avoid data races when checking whether an event is already referenced by another event.
|
||||
const insertPreviousEventSQL = `
|
||||
INSERT OR REPLACE INTO roomserver_previous_events
|
||||
(previous_event_id, previous_reference_sha256, event_nids)
|
||||
VALUES ($1, $2, $3)
|
||||
(previous_event_id, event_nids)
|
||||
VALUES ($1, $2)
|
||||
`
|
||||
|
||||
const selectPreviousEventNIDsSQL = `
|
||||
SELECT event_nids FROM roomserver_previous_events
|
||||
WHERE previous_event_id = $1 AND previous_reference_sha256 = $2
|
||||
WHERE previous_event_id = $1
|
||||
`
|
||||
|
||||
// Check if the event is referenced by another event in the table.
|
||||
// This should only be done while holding a "FOR UPDATE" lock on the row in the rooms table for this room.
|
||||
const selectPreviousEventExistsSQL = `
|
||||
SELECT 1 FROM roomserver_previous_events
|
||||
WHERE previous_event_id = $1 AND previous_reference_sha256 = $2
|
||||
WHERE previous_event_id = $1
|
||||
`
|
||||
|
||||
type previousEventStatements struct {
|
||||
|
@ -72,7 +73,30 @@ type previousEventStatements struct {
|
|||
|
||||
func CreatePrevEventsTable(db *sql.DB) error {
|
||||
_, err := db.Exec(previousEventSchema)
|
||||
return err
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// check if the column exists
|
||||
var cName string
|
||||
migrationName := "roomserver: drop column reference_sha from roomserver_prev_events"
|
||||
err = db.QueryRowContext(context.Background(), `SELECT p.name FROM sqlite_master AS m JOIN pragma_table_info(m.name) AS p WHERE m.name = 'roomserver_previous_events' AND p.name = 'previous_reference_sha256'`).Scan(&cName)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) { // migration was already executed, as the column was removed
|
||||
if err = sqlutil.InsertMigration(context.Background(), db, migrationName); err != nil {
|
||||
return fmt.Errorf("unable to manually insert migration '%s': %w", migrationName, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
m := sqlutil.NewMigrator(db)
|
||||
m.AddMigrations([]sqlutil.Migration{
|
||||
{
|
||||
Version: migrationName,
|
||||
Up: deltas.UpDropEventReferenceSHAPrevEvents,
|
||||
},
|
||||
}...)
|
||||
return m.Up(context.Background())
|
||||
}
|
||||
|
||||
func PreparePrevEventsTable(db *sql.DB) (tables.PreviousEvents, error) {
|
||||
|
@ -91,13 +115,12 @@ func (s *previousEventStatements) InsertPreviousEvent(
|
|||
ctx context.Context,
|
||||
txn *sql.Tx,
|
||||
previousEventID string,
|
||||
previousEventReferenceSHA256 []byte,
|
||||
eventNID types.EventNID,
|
||||
) error {
|
||||
var eventNIDs string
|
||||
eventNIDAsString := fmt.Sprintf("%d", eventNID)
|
||||
selectStmt := sqlutil.TxStmt(txn, s.selectPreviousEventExistsStmt)
|
||||
err := selectStmt.QueryRowContext(ctx, previousEventID, previousEventReferenceSHA256).Scan(&eventNIDs)
|
||||
err := selectStmt.QueryRowContext(ctx, previousEventID).Scan(&eventNIDs)
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
return fmt.Errorf("selectStmt.QueryRowContext.Scan: %w", err)
|
||||
}
|
||||
|
@ -115,7 +138,7 @@ func (s *previousEventStatements) InsertPreviousEvent(
|
|||
}
|
||||
insertStmt := sqlutil.TxStmt(txn, s.insertPreviousEventStmt)
|
||||
_, err = insertStmt.ExecContext(
|
||||
ctx, previousEventID, previousEventReferenceSHA256, eventNIDs,
|
||||
ctx, previousEventID, eventNIDs,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
@ -123,9 +146,9 @@ func (s *previousEventStatements) InsertPreviousEvent(
|
|||
// Check if the event reference exists
|
||||
// Returns sql.ErrNoRows if the event reference doesn't exist.
|
||||
func (s *previousEventStatements) SelectPreviousEventExists(
|
||||
ctx context.Context, txn *sql.Tx, eventID string, eventReferenceSHA256 []byte,
|
||||
ctx context.Context, txn *sql.Tx, eventID string,
|
||||
) error {
|
||||
var ok int64
|
||||
stmt := sqlutil.TxStmt(txn, s.selectPreviousEventExistsStmt)
|
||||
return stmt.QueryRowContext(ctx, eventID, eventReferenceSHA256).Scan(&ok)
|
||||
return stmt.QueryRowContext(ctx, eventID).Scan(&ok)
|
||||
}
|
||||
|
|
|
@ -21,14 +21,13 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/caching"
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/shared"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage/sqlite3/deltas"
|
||||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
// A Database is used to store room events and stream offsets.
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue