mirror of
https://github.com/hoernschen/dendrite.git
synced 2024-12-26 15:08:28 +00:00
WIP test to avoid roomserver deadlocks
This commit is contained in:
parent
3c419be6af
commit
ed5dc8646d
12 changed files with 25 additions and 18 deletions
|
@ -56,7 +56,7 @@ func CheckForSoftFail(
|
||||||
|
|
||||||
// Then get the state entries for the current state snapshot.
|
// Then get the state entries for the current state snapshot.
|
||||||
// We'll use this to check if the event is allowed right now.
|
// We'll use this to check if the event is allowed right now.
|
||||||
roomState := state.NewStateResolution(db, *roomInfo)
|
roomState := state.NewStateResolution(db, *roomInfo, nil)
|
||||||
authStateEntries, err = roomState.LoadStateAtSnapshot(ctx, roomInfo.StateSnapshotNID)
|
authStateEntries, err = roomState.LoadStateAtSnapshot(ctx, roomInfo.StateSnapshotNID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return true, fmt.Errorf("roomState.LoadStateAtSnapshot: %w", err)
|
return true, fmt.Errorf("roomState.LoadStateAtSnapshot: %w", err)
|
||||||
|
|
|
@ -172,7 +172,7 @@ func GetMembershipsAtState(
|
||||||
}
|
}
|
||||||
|
|
||||||
func StateBeforeEvent(ctx context.Context, db storage.Database, info types.RoomInfo, eventNID types.EventNID) ([]types.StateEntry, error) {
|
func StateBeforeEvent(ctx context.Context, db storage.Database, info types.RoomInfo, eventNID types.EventNID) ([]types.StateEntry, error) {
|
||||||
roomState := state.NewStateResolution(db, info)
|
roomState := state.NewStateResolution(db, info, nil)
|
||||||
// Lookup the event NID
|
// Lookup the event NID
|
||||||
eIDs, err := db.EventIDs(ctx, []types.EventNID{eventNID})
|
eIDs, err := db.EventIDs(ctx, []types.EventNID{eventNID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -217,7 +217,7 @@ func LoadStateEvents(
|
||||||
func CheckServerAllowedToSeeEvent(
|
func CheckServerAllowedToSeeEvent(
|
||||||
ctx context.Context, db storage.Database, info types.RoomInfo, eventID string, serverName gomatrixserverlib.ServerName, isServerInRoom bool,
|
ctx context.Context, db storage.Database, info types.RoomInfo, eventID string, serverName gomatrixserverlib.ServerName, isServerInRoom bool,
|
||||||
) (bool, error) {
|
) (bool, error) {
|
||||||
roomState := state.NewStateResolution(db, info)
|
roomState := state.NewStateResolution(db, info, nil)
|
||||||
stateEntries, err := roomState.LoadStateAtEvent(ctx, eventID)
|
stateEntries, err := roomState.LoadStateAtEvent(ctx, eventID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, sql.ErrNoRows) {
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
|
@ -379,7 +379,7 @@ func QueryLatestEventsAndState(
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
roomState := state.NewStateResolution(db, *roomInfo)
|
roomState := state.NewStateResolution(db, *roomInfo, nil)
|
||||||
response.RoomExists = true
|
response.RoomExists = true
|
||||||
response.RoomVersion = roomInfo.RoomVersion
|
response.RoomVersion = roomInfo.RoomVersion
|
||||||
|
|
||||||
|
|
|
@ -249,7 +249,7 @@ func (r *Inputer) calculateAndSetState(
|
||||||
isRejected bool,
|
isRejected bool,
|
||||||
) error {
|
) error {
|
||||||
var err error
|
var err error
|
||||||
roomState := state.NewStateResolution(r.DB, roomInfo)
|
roomState := state.NewStateResolution(r.DB, roomInfo, nil)
|
||||||
|
|
||||||
if input.HasState && !isRejected {
|
if input.HasState && !isRejected {
|
||||||
// Check here if we think we're in the room already.
|
// Check here if we think we're in the room already.
|
||||||
|
@ -271,7 +271,7 @@ func (r *Inputer) calculateAndSetState(
|
||||||
}
|
}
|
||||||
entries = types.DeduplicateStateEntries(entries)
|
entries = types.DeduplicateStateEntries(entries)
|
||||||
|
|
||||||
if stateAtEvent.BeforeStateSnapshotNID, err = r.DB.AddState(ctx, roomInfo.RoomNID, nil, entries); err != nil {
|
if stateAtEvent.BeforeStateSnapshotNID, err = r.DB.AddState(ctx, nil, roomInfo.RoomNID, nil, entries); err != nil {
|
||||||
return fmt.Errorf("r.DB.AddState: %w", err)
|
return fmt.Errorf("r.DB.AddState: %w", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -199,7 +199,7 @@ func (u *latestEventsUpdater) doUpdateLatestEvents() error {
|
||||||
|
|
||||||
func (u *latestEventsUpdater) latestState() error {
|
func (u *latestEventsUpdater) latestState() error {
|
||||||
var err error
|
var err error
|
||||||
roomState := state.NewStateResolution(u.api.DB, *u.roomInfo)
|
roomState := state.NewStateResolution(u.api.DB, *u.roomInfo, u.updater.Txn)
|
||||||
|
|
||||||
// Work out if the state at the extremities has actually changed
|
// Work out if the state at the extremities has actually changed
|
||||||
// or not. If they haven't then we won't bother doing all of the
|
// or not. If they haven't then we won't bother doing all of the
|
||||||
|
|
|
@ -146,7 +146,7 @@ func (r *Backfiller) backfillViaFederation(ctx context.Context, req *api.Perform
|
||||||
}
|
}
|
||||||
|
|
||||||
var beforeStateSnapshotNID types.StateSnapshotNID
|
var beforeStateSnapshotNID types.StateSnapshotNID
|
||||||
if beforeStateSnapshotNID, err = r.DB.AddState(ctx, roomNID, nil, entries); err != nil {
|
if beforeStateSnapshotNID, err = r.DB.AddState(ctx, nil, roomNID, nil, entries); err != nil {
|
||||||
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("backfillViaFederation: failed to persist state entries to get snapshot nid")
|
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("backfillViaFederation: failed to persist state entries to get snapshot nid")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -79,7 +79,7 @@ func (r *InboundPeeker) PerformInboundPeek(
|
||||||
response.LatestEvent = sortedLatestEvents[0].Headered(info.RoomVersion)
|
response.LatestEvent = sortedLatestEvents[0].Headered(info.RoomVersion)
|
||||||
|
|
||||||
// XXX: do we actually need to do a state resolution here?
|
// XXX: do we actually need to do a state resolution here?
|
||||||
roomState := state.NewStateResolution(r.DB, *info)
|
roomState := state.NewStateResolution(r.DB, *info, nil)
|
||||||
|
|
||||||
var stateEntries []types.StateEntry
|
var stateEntries []types.StateEntry
|
||||||
stateEntries, err = roomState.LoadStateAtSnapshot(
|
stateEntries, err = roomState.LoadStateAtSnapshot(
|
||||||
|
|
|
@ -231,7 +231,7 @@ func buildInviteStrippedState(
|
||||||
StateKey: "",
|
StateKey: "",
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
roomState := state.NewStateResolution(db, *info)
|
roomState := state.NewStateResolution(db, *info, nil)
|
||||||
stateEntries, err := roomState.LoadStateAtSnapshotForStringTuples(
|
stateEntries, err := roomState.LoadStateAtSnapshotForStringTuples(
|
||||||
ctx, info.StateSnapshotNID, stateWanted,
|
ctx, info.StateSnapshotNID, stateWanted,
|
||||||
)
|
)
|
||||||
|
|
|
@ -62,7 +62,7 @@ func (r *Queryer) QueryStateAfterEvents(
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
roomState := state.NewStateResolution(r.DB, *info)
|
roomState := state.NewStateResolution(r.DB, *info, nil)
|
||||||
response.RoomExists = true
|
response.RoomExists = true
|
||||||
response.RoomVersion = info.RoomVersion
|
response.RoomVersion = info.RoomVersion
|
||||||
|
|
||||||
|
@ -507,7 +507,7 @@ func (r *Queryer) QueryStateAndAuthChain(
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Queryer) loadStateAtEventIDs(ctx context.Context, roomInfo types.RoomInfo, eventIDs []string) ([]*gomatrixserverlib.Event, error) {
|
func (r *Queryer) loadStateAtEventIDs(ctx context.Context, roomInfo types.RoomInfo, eventIDs []string) ([]*gomatrixserverlib.Event, error) {
|
||||||
roomState := state.NewStateResolution(r.DB, roomInfo)
|
roomState := state.NewStateResolution(r.DB, roomInfo, nil)
|
||||||
prevStates, err := r.DB.StateAtEventIDs(ctx, eventIDs)
|
prevStates, err := r.DB.StateAtEventIDs(ctx, eventIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
switch err.(type) {
|
switch err.(type) {
|
||||||
|
|
|
@ -18,6 +18,7 @@ package state
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sort"
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
|
@ -34,13 +35,15 @@ type StateResolution struct {
|
||||||
db storage.Database
|
db storage.Database
|
||||||
roomInfo types.RoomInfo
|
roomInfo types.RoomInfo
|
||||||
events map[types.EventNID]*gomatrixserverlib.Event
|
events map[types.EventNID]*gomatrixserverlib.Event
|
||||||
|
txn *sql.Tx // optional
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStateResolution(db storage.Database, roomInfo types.RoomInfo) StateResolution {
|
func NewStateResolution(db storage.Database, roomInfo types.RoomInfo, tx *sql.Tx) StateResolution {
|
||||||
return StateResolution{
|
return StateResolution{
|
||||||
db: db,
|
db: db,
|
||||||
roomInfo: roomInfo,
|
roomInfo: roomInfo,
|
||||||
events: make(map[types.EventNID]*gomatrixserverlib.Event),
|
events: make(map[types.EventNID]*gomatrixserverlib.Event),
|
||||||
|
txn: tx,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -549,7 +552,7 @@ func (v *StateResolution) CalculateAndStoreStateAfterEvents(
|
||||||
// 2) There weren't any prev_events for this event so the state is
|
// 2) There weren't any prev_events for this event so the state is
|
||||||
// empty.
|
// empty.
|
||||||
metrics.algorithm = "empty_state"
|
metrics.algorithm = "empty_state"
|
||||||
stateNID, err := v.db.AddState(ctx, v.roomInfo.RoomNID, nil, nil)
|
stateNID, err := v.db.AddState(ctx, v.txn, v.roomInfo.RoomNID, nil, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("v.db.AddState: %w", err)
|
err = fmt.Errorf("v.db.AddState: %w", err)
|
||||||
}
|
}
|
||||||
|
@ -581,7 +584,7 @@ func (v *StateResolution) CalculateAndStoreStateAfterEvents(
|
||||||
// add the state event as a block of size one to the end of the blocks.
|
// add the state event as a block of size one to the end of the blocks.
|
||||||
metrics.algorithm = "single_delta"
|
metrics.algorithm = "single_delta"
|
||||||
stateNID, err := v.db.AddState(
|
stateNID, err := v.db.AddState(
|
||||||
ctx, v.roomInfo.RoomNID, stateBlockNIDs, []types.StateEntry{prevState.StateEntry},
|
ctx, v.txn, v.roomInfo.RoomNID, stateBlockNIDs, []types.StateEntry{prevState.StateEntry},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("v.db.AddState: %w", err)
|
err = fmt.Errorf("v.db.AddState: %w", err)
|
||||||
|
@ -626,7 +629,7 @@ func (v *StateResolution) calculateAndStoreStateAfterManyEvents(
|
||||||
// previous state.
|
// previous state.
|
||||||
metrics.conflictLength = conflictLength
|
metrics.conflictLength = conflictLength
|
||||||
metrics.fullStateLength = len(state)
|
metrics.fullStateLength = len(state)
|
||||||
return metrics.stop(v.db.AddState(ctx, roomNID, nil, state))
|
return metrics.stop(v.db.AddState(ctx, v.txn, roomNID, nil, state))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (v *StateResolution) calculateStateAfterManyEvents(
|
func (v *StateResolution) calculateStateAfterManyEvents(
|
||||||
|
|
|
@ -16,6 +16,7 @@ package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/roomserver/api"
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/roomserver/storage/shared"
|
"github.com/matrix-org/dendrite/roomserver/storage/shared"
|
||||||
|
@ -32,6 +33,7 @@ type Database interface {
|
||||||
// Store the room state at an event in the database
|
// Store the room state at an event in the database
|
||||||
AddState(
|
AddState(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
|
txn *sql.Tx,
|
||||||
roomNID types.RoomNID,
|
roomNID types.RoomNID,
|
||||||
stateBlockNIDs []types.StateBlockNID,
|
stateBlockNIDs []types.StateBlockNID,
|
||||||
state []types.StateEntry,
|
state []types.StateEntry,
|
||||||
|
|
|
@ -16,6 +16,7 @@ type LatestEventsUpdater struct {
|
||||||
latestEvents []types.StateAtEventAndReference
|
latestEvents []types.StateAtEventAndReference
|
||||||
lastEventIDSent string
|
lastEventIDSent string
|
||||||
currentStateSnapshotNID types.StateSnapshotNID
|
currentStateSnapshotNID types.StateSnapshotNID
|
||||||
|
Txn *sql.Tx
|
||||||
}
|
}
|
||||||
|
|
||||||
func rollback(txn *sql.Tx) {
|
func rollback(txn *sql.Tx) {
|
||||||
|
@ -46,7 +47,7 @@ func NewLatestEventsUpdater(ctx context.Context, d *Database, txn *sql.Tx, roomI
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return &LatestEventsUpdater{
|
return &LatestEventsUpdater{
|
||||||
transaction{ctx, txn}, d, roomInfo, stateAndRefs, lastEventIDSent, currentStateSnapshotNID,
|
transaction{ctx, txn}, d, roomInfo, stateAndRefs, lastEventIDSent, currentStateSnapshotNID, txn,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -137,11 +137,12 @@ func (d *Database) RoomInfo(ctx context.Context, roomID string) (*types.RoomInfo
|
||||||
|
|
||||||
func (d *Database) AddState(
|
func (d *Database) AddState(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
|
txn *sql.Tx,
|
||||||
roomNID types.RoomNID,
|
roomNID types.RoomNID,
|
||||||
stateBlockNIDs []types.StateBlockNID,
|
stateBlockNIDs []types.StateBlockNID,
|
||||||
state []types.StateEntry,
|
state []types.StateEntry,
|
||||||
) (stateNID types.StateSnapshotNID, err error) {
|
) (stateNID types.StateSnapshotNID, err error) {
|
||||||
err = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error {
|
||||||
if len(state) > 0 {
|
if len(state) > 0 {
|
||||||
var stateBlockNID types.StateBlockNID
|
var stateBlockNID types.StateBlockNID
|
||||||
stateBlockNID, err = d.StateBlockTable.BulkInsertStateData(ctx, txn, state)
|
stateBlockNID, err = d.StateBlockTable.BulkInsertStateData(ctx, txn, state)
|
||||||
|
|
Loading…
Reference in a new issue