Finish implementing redactions (#1189)

* Add a bit more logging to the fedsender

* bugfix: continue sending PDUs if ones are added whilst sending another PDU

Without this, the queue goes back to sleep on `<-oq.notifyPDUs` which won't
fire because `pendingPDUs` is already > 0. This should fix a flakey sytest.

* Break if no txn is sent

* WIP syncapi work

* More debugging

* Bump GMSL version to pull in working Event.Redact

* Remove logging

* Make redactions work on v3+

* Fix more tests
This commit is contained in:
Kegsay 2020-07-08 17:45:39 +01:00 committed by GitHub
parent a5a51b4141
commit d9648b0615
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 131 additions and 25 deletions

View file

@ -136,4 +136,6 @@ type Database interface {
// Returns the filterID as a string. Otherwise returns an error if something
// goes wrong.
PutFilter(ctx context.Context, localpart string, filter *gomatrixserverlib.Filter) (string, error)
// RedactEvent wipes an event in the database and sets the unsigned.redacted_because key to the redaction event
RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error
}

View file

@ -99,6 +99,9 @@ const selectEarlyEventsSQL = "" +
const selectMaxEventIDSQL = "" +
"SELECT MAX(id) FROM syncapi_output_room_events"
const updateEventJSONSQL = "" +
"UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2"
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
const selectStateInRangeSQL = "" +
"SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
@ -120,6 +123,7 @@ type outputRoomEventsStatements struct {
selectRecentEventsForSyncStmt *sql.Stmt
selectEarlyEventsStmt *sql.Stmt
selectStateInRangeStmt *sql.Stmt
updateEventJSONStmt *sql.Stmt
}
func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
@ -149,9 +153,21 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
return nil, err
}
if s.updateEventJSONStmt, err = db.Prepare(updateEventJSONSQL); err != nil {
return nil, err
}
return s, nil
}
func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error {
headeredJSON, err := json.Marshal(event)
if err != nil {
return err
}
_, err = s.updateEventJSONStmt.ExecContext(ctx, headeredJSON, event.EventID())
return err
}
// selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos.
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
// two positions, only the most recent state is returned.

View file

@ -24,6 +24,7 @@ import (
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
@ -597,6 +598,26 @@ func (d *Database) IncrementalSync(
return res, nil
}
func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, redactedBecause *gomatrixserverlib.HeaderedEvent) error {
redactedEvents, err := d.Events(ctx, []string{redactedEventID})
if err != nil {
return err
}
if len(redactedEvents) == 0 {
logrus.WithField("event_id", redactedEventID).WithField("redaction_event", redactedBecause.EventID()).Warnf("missing redacted event for redaction")
return nil
}
eventToRedact := redactedEvents[0].Unwrap()
redactionEvent := redactedBecause.Unwrap()
ev, err := eventutil.RedactEvent(&redactionEvent, &eventToRedact)
if err != nil {
return err
}
newEvent := ev.Headered(redactedBecause.RoomVersion)
return d.OutputEvents.UpdateEventJSON(ctx, &newEvent)
}
// getResponseWithPDUsForCompleteSync creates a response and adds all PDUs needed
// to it. It returns toPos and joinedRoomIDs for use of adding EDUs.
// nolint:nakedret

View file

@ -76,6 +76,9 @@ const selectEarlyEventsSQL = "" +
const selectMaxEventIDSQL = "" +
"SELECT MAX(id) FROM syncapi_output_room_events"
const updateEventJSONSQL = "" +
"UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2"
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
/*
$1 = oldPos,
@ -109,6 +112,7 @@ type outputRoomEventsStatements struct {
selectRecentEventsForSyncStmt *sql.Stmt
selectEarlyEventsStmt *sql.Stmt
selectStateInRangeStmt *sql.Stmt
updateEventJSONStmt *sql.Stmt
}
func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Events, error) {
@ -140,9 +144,21 @@ func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Even
if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
return nil, err
}
if s.updateEventJSONStmt, err = db.Prepare(updateEventJSONSQL); err != nil {
return nil, err
}
return s, nil
}
func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error {
headeredJSON, err := json.Marshal(event)
if err != nil {
return err
}
_, err = s.updateEventJSONStmt.ExecContext(ctx, headeredJSON, event.EventID())
return err
}
// selectStateInRange returns the state events between the two given PDU stream positions, exclusive of oldPos, inclusive of newPos.
// Results are bucketed based on the room ID. If the same state is overwritten multiple times between the
// two positions, only the most recent state is returned.

View file

@ -49,6 +49,7 @@ type Events interface {
// SelectEarlyEvents returns the earliest events in the given room.
SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int) ([]types.StreamEvent, error)
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error)
UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error
}
// Topology keeps track of the depths and stream positions for all events.