Emit redacted_event from the roomserver when redactions are validated (#1186)

* Emit redacted_event from the roomserver when redactions are validated

- Consume them in the currentstateserver and act accordingly.
- Add integration test for the roomserver to check that injecting
  `m.room.redaction` events result in `redacted_event` being emitted.

* Linting

* Ignore events that redact themselves
This commit is contained in:
Kegsay 2020-07-07 12:51:55 +01:00 committed by GitHub
parent d7a8bbff72
commit 99ea1f9b48
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 406 additions and 107 deletions

View file

@ -19,6 +19,7 @@ package internal
import (
"context"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/types"
@ -31,6 +32,8 @@ import (
// TODO(#375): This should be rewritten to allow concurrent calls. The
// difficulty is in ensuring that we correctly annotate events with the correct
// state deltas when sending to kafka streams
// TODO: Break up function - we should probably do transaction ID checks before calling this.
// nolint:gocyclo
func (r *RoomserverInternalAPI) processRoomEvent(
ctx context.Context,
input api.InputRoomEvent,
@ -60,10 +63,18 @@ func (r *RoomserverInternalAPI) processRoomEvent(
}
// Store the event.
roomNID, stateAtEvent, err := r.DB.StoreEvent(ctx, event, input.TransactionID, authEventNIDs)
roomNID, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, input.TransactionID, authEventNIDs)
if err != nil {
return
}
// if storing this event results in it being redacted then do so.
if redactedEventID == event.EventID() {
r, rerr := eventutil.RedactEvent(redactionEvent, &event)
if rerr != nil {
return "", rerr
}
event = *r
}
// For outliers we can stop after we've stored the event itself as it
// doesn't have any associated state to store and we don't need to
@ -97,6 +108,25 @@ func (r *RoomserverInternalAPI) processRoomEvent(
return
}
// processing this event resulted in an event (which may not be the one we're processing)
// being redacted. We are guaranteed to have both sides (the redaction/redacted event),
// so notify downstream components to redact this event - they should have it if they've
// been tracking our output log.
if redactedEventID != "" {
err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{
{
Type: api.OutputTypeRedactedEvent,
RedactedEvent: &api.OutputRedactedEvent{
RedactedEventID: redactedEventID,
RedactedBecause: redactionEvent.Headered(headered.RoomVersion),
},
},
})
if err != nil {
return
}
}
// Update the extremities of the event graph for the room
return event.EventID(), nil
}

View file

@ -880,11 +880,24 @@ func persistEvents(ctx context.Context, db storage.Database, events []gomatrixse
i++
}
var stateAtEvent types.StateAtEvent
roomNID, stateAtEvent, err = db.StoreEvent(ctx, ev.Unwrap(), nil, authNids)
var redactedEventID string
var redactionEvent *gomatrixserverlib.Event
roomNID, stateAtEvent, redactionEvent, redactedEventID, err = db.StoreEvent(ctx, ev.Unwrap(), nil, authNids)
if err != nil {
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to persist event")
continue
}
// If storing this event results in it being redacted, then do so.
// It's also possible for this event to be a redaction which results in another event being
// redacted, which we don't care about since we aren't returning it in this backfill.
if redactedEventID == ev.EventID() {
ev = ev.Redact().Headered(ev.RoomVersion)
err = ev.SetUnsignedField("redacted_because", redactionEvent)
if err != nil {
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to set unsigned field")
continue
}
}
backfilledEventMap[ev.EventID()] = types.Event{
EventNID: stateAtEvent.StateEntry.EventNID,
Event: ev.Unwrap(),