mirror of
https://github.com/hoernschen/dendrite.git
synced 2024-12-27 23:48:27 +00:00
Fix sqlite locking bugs present on sytest (#1543)
* Fix sqite locking bugs present on sytest Comments do the explaining. * Fix deadlock in sqlite mode Caused by starting a writer whilst within a writer * Only complain about invalid state deltas for non-overwrite events * Do not re-process outlier unnecessarily
This commit is contained in:
parent
92982a402f
commit
eb86e2b336
4 changed files with 52 additions and 13 deletions
|
@ -17,6 +17,7 @@
|
||||||
package input
|
package input
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
@ -26,6 +27,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/roomserver/state"
|
"github.com/matrix-org/dendrite/roomserver/state"
|
||||||
"github.com/matrix-org/dendrite/roomserver/types"
|
"github.com/matrix-org/dendrite/roomserver/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
"github.com/matrix-org/util"
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -44,6 +46,28 @@ func (r *Inputer) processRoomEvent(
|
||||||
headered := input.Event
|
headered := input.Event
|
||||||
event := headered.Unwrap()
|
event := headered.Unwrap()
|
||||||
|
|
||||||
|
// if we have already got this event then do not process it again, if the input kind is an outlier.
|
||||||
|
// Outliers contain no extra information which may warrant a re-processing.
|
||||||
|
if input.Kind == api.KindOutlier {
|
||||||
|
evs, err := r.DB.EventsFromIDs(ctx, []string{event.EventID()})
|
||||||
|
if err == nil && len(evs) == 1 {
|
||||||
|
// check hash matches if we're on early room versions where the event ID was a random string
|
||||||
|
idFormat, err := headered.RoomVersion.EventIDFormat()
|
||||||
|
if err == nil {
|
||||||
|
switch idFormat {
|
||||||
|
case gomatrixserverlib.EventIDFormatV1:
|
||||||
|
if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) {
|
||||||
|
util.GetLogger(ctx).WithField("event_id", event.EventID()).Infof("Already processed event; ignoring")
|
||||||
|
return event.EventID(), nil
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
util.GetLogger(ctx).WithField("event_id", event.EventID()).Infof("Already processed event; ignoring")
|
||||||
|
return event.EventID(), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Check that the event passes authentication checks and work out
|
// Check that the event passes authentication checks and work out
|
||||||
// the numeric IDs for the auth events.
|
// the numeric IDs for the auth events.
|
||||||
isRejected := false
|
isRejected := false
|
||||||
|
|
|
@ -233,7 +233,7 @@ func (u *latestEventsUpdater) latestState() error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("roomState.DifferenceBetweenStateSnapshots: %w", err)
|
return fmt.Errorf("roomState.DifferenceBetweenStateSnapshots: %w", err)
|
||||||
}
|
}
|
||||||
if len(u.removed) > len(u.added) {
|
if !u.stateAtEvent.Overwrite && len(u.removed) > len(u.added) {
|
||||||
// This really shouldn't happen.
|
// This really shouldn't happen.
|
||||||
// TODO: What is ultimately the best way to handle this situation?
|
// TODO: What is ultimately the best way to handle this situation?
|
||||||
logrus.Errorf(
|
logrus.Errorf(
|
||||||
|
|
|
@ -70,16 +70,14 @@ func (u *LatestEventsUpdater) CurrentStateSnapshotNID() types.StateSnapshotNID {
|
||||||
return u.currentStateSnapshotNID
|
return u.currentStateSnapshotNID
|
||||||
}
|
}
|
||||||
|
|
||||||
// StorePreviousEvents implements types.RoomRecentEventsUpdater
|
// StorePreviousEvents implements types.RoomRecentEventsUpdater - This must be called from a Writer
|
||||||
func (u *LatestEventsUpdater) StorePreviousEvents(eventNID types.EventNID, previousEventReferences []gomatrixserverlib.EventReference) error {
|
func (u *LatestEventsUpdater) StorePreviousEvents(eventNID types.EventNID, previousEventReferences []gomatrixserverlib.EventReference) error {
|
||||||
return u.d.Writer.Do(u.d.DB, u.txn, func(txn *sql.Tx) error {
|
|
||||||
for _, ref := range previousEventReferences {
|
for _, ref := range previousEventReferences {
|
||||||
if err := u.d.PrevEventsTable.InsertPreviousEvent(u.ctx, txn, ref.EventID, ref.EventSHA256, eventNID); err != nil {
|
if err := u.d.PrevEventsTable.InsertPreviousEvent(u.ctx, u.txn, ref.EventID, ref.EventSHA256, eventNID); err != nil {
|
||||||
return fmt.Errorf("u.d.PrevEventsTable.InsertPreviousEvent: %w", err)
|
return fmt.Errorf("u.d.PrevEventsTable.InsertPreviousEvent: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsReferenced implements types.RoomRecentEventsUpdater
|
// IsReferenced implements types.RoomRecentEventsUpdater
|
||||||
|
|
|
@ -492,15 +492,32 @@ func (d *Database) StoreEvent(
|
||||||
if roomInfo == nil && len(prevEvents) > 0 {
|
if roomInfo == nil && len(prevEvents) > 0 {
|
||||||
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("expected room %q to exist", event.RoomID())
|
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("expected room %q to exist", event.RoomID())
|
||||||
}
|
}
|
||||||
|
// Create an updater - NB: on sqlite this WILL create a txn as we are directly calling the shared DB form of
|
||||||
|
// GetLatestEventsForUpdate - not via the SQLiteDatabase form which has `nil` txns. This
|
||||||
|
// function only does SELECTs though so the created txn (at this point) is just a read txn like
|
||||||
|
// any other so this is fine. If we ever update GetLatestEventsForUpdate or NewLatestEventsUpdater
|
||||||
|
// to do writes however then this will need to go inside `Writer.Do`.
|
||||||
updater, err = d.GetLatestEventsForUpdate(ctx, *roomInfo)
|
updater, err = d.GetLatestEventsForUpdate(ctx, *roomInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("NewLatestEventsUpdater: %w", err)
|
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("NewLatestEventsUpdater: %w", err)
|
||||||
}
|
}
|
||||||
|
// Ensure that we atomically store prev events AND commit them. If we don't wrap StorePreviousEvents
|
||||||
|
// and EndTransaction in a writer then it's possible for a new write txn to be made between the two
|
||||||
|
// function calls which will then fail with 'database is locked'. This new write txn would HAVE to be
|
||||||
|
// something like SetRoomAlias/RemoveRoomAlias as normal input events are already done sequentially due to
|
||||||
|
// SupportsConcurrentRoomInputs() == false on sqlite, though this does not apply to setting room aliases
|
||||||
|
// as they don't go via InputRoomEvents
|
||||||
|
err = d.Writer.Do(d.DB, updater.txn, func(txn *sql.Tx) error {
|
||||||
if err = updater.StorePreviousEvents(eventNID, prevEvents); err != nil {
|
if err = updater.StorePreviousEvents(eventNID, prevEvents); err != nil {
|
||||||
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("updater.StorePreviousEvents: %w", err)
|
return fmt.Errorf("updater.StorePreviousEvents: %w", err)
|
||||||
}
|
}
|
||||||
succeeded := true
|
succeeded := true
|
||||||
err = sqlutil.EndTransaction(updater, &succeeded)
|
err = sqlutil.EndTransaction(updater, &succeeded)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return 0, types.StateAtEvent{}, nil, "", err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return roomNID, types.StateAtEvent{
|
return roomNID, types.StateAtEvent{
|
||||||
|
|
Loading…
Reference in a new issue