Implement gomatrixserverlib.HeaderedEvent in roomserver Kafka output (#914)

* Use Event.Headered

* Use HeaderedEvent in roomserver kafka output

* Fix syncserver-integration-tests

* Update producers to roomserver inputs

* Update gomatrixserverlib

* Update gomatrixserverlib

* Update gomatrixserverlib

* Update gomatrixserverlib

* Update gomatrixserverlib

* Update gomatrixserverlib
This commit is contained in:
Neil Alexander 2020-03-17 11:01:25 +00:00 committed by GitHub
parent 9f74a8798e
commit aebf347a79
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 289 additions and 71 deletions

View file

@ -250,10 +250,13 @@ func (r *RoomserverAliasAPI) sendUpdatedAliasesEvent(
return err
}
// TODO: Room version here
roomVersion := gomatrixserverlib.RoomVersionV1
// Create the request
ire := roomserverAPI.InputRoomEvent{
Kind: roomserverAPI.KindNew,
Event: event,
Event: event.Headered(roomVersion),
AuthEventIDs: event.AuthEventIDs(),
SendAsServer: serverName,
}

View file

@ -51,7 +51,7 @@ type InputRoomEvent struct {
// This controls how the event is processed.
Kind int `json:"kind"`
// The event JSON for the event to add.
Event gomatrixserverlib.Event `json:"event"`
Event gomatrixserverlib.HeaderedEvent `json:"event"`
// List of state event IDs that authenticate this event.
// These are likely derived from the "auth_events" JSON key of the event.
// But can be different because the "auth_events" key can be incomplete or wrong.
@ -85,7 +85,7 @@ type TransactionID struct {
// the usual context a matrix room event would have. We usually do not have
// access to the events needed to check the event auth rules for the invite.
type InputInviteEvent struct {
Event gomatrixserverlib.Event `json:"event"`
Event gomatrixserverlib.HeaderedEvent `json:"event"`
}
// InputRoomEventsRequest is a request to InputRoomEvents

View file

@ -54,7 +54,7 @@ type OutputEvent struct {
// prev_events.
type OutputNewRoomEvent struct {
// The Event.
Event gomatrixserverlib.Event `json:"event"`
Event gomatrixserverlib.HeaderedEvent `json:"event"`
// The latest events in the room after this event.
// This can be used to set the prev events for new events in the room.
// This also can be used to get the full current state after this event.
@ -117,7 +117,7 @@ type OutputNewRoomEvent struct {
// tracked separately from the room events themselves.
type OutputNewInviteEvent struct {
// The "m.room.member" invite event.
Event gomatrixserverlib.Event `json:"event"`
Event gomatrixserverlib.HeaderedEvent `json:"event"`
}
// An OutputRetireInviteEvent is written whenever an existing invite is no longer

View file

@ -95,7 +95,7 @@ func processRoomEvent(
event := input.Event
// Check that the event passes authentication checks and work out the numeric IDs for the auth events.
authEventNIDs, err := checkAuthEvents(ctx, db, event, input.AuthEventIDs)
authEventNIDs, err := checkAuthEvents(ctx, db, event.Event, input.AuthEventIDs)
if err != nil {
return
}
@ -112,7 +112,7 @@ func processRoomEvent(
}
// Store the event
roomNID, stateAtEvent, err := db.StoreEvent(ctx, event, input.TransactionID, authEventNIDs)
roomNID, stateAtEvent, err := db.StoreEvent(ctx, event.Event, input.TransactionID, authEventNIDs)
if err != nil {
return
}
@ -127,7 +127,7 @@ func processRoomEvent(
if stateAtEvent.BeforeStateSnapshotNID == 0 {
// We haven't calculated a state for this event yet.
// Lets calculate one.
err = calculateAndSetState(ctx, db, input, roomNID, &stateAtEvent, event)
err = calculateAndSetState(ctx, db, input, roomNID, &stateAtEvent, event.Event)
if err != nil {
return
}
@ -140,7 +140,7 @@ func processRoomEvent(
// Update the extremities of the event graph for the room
return event.EventID(), updateLatestEvents(
ctx, db, ow, roomNID, stateAtEvent, event, input.SendAsServer, input.TransactionID,
ctx, db, ow, roomNID, stateAtEvent, event.Event, input.SendAsServer, input.TransactionID,
)
}
@ -234,7 +234,7 @@ func processInviteEvent(
return nil
}
outputUpdates, err := updateToInviteMembership(updater, &input.Event, nil)
outputUpdates, err := updateToInviteMembership(updater, &input.Event.Event, nil)
if err != nil {
return err
}

View file

@ -253,8 +253,11 @@ func (u *latestEventsUpdater) makeOutputNewRoomEvent() (*api.OutputEvent, error)
latestEventIDs[i] = u.latest[i].EventID
}
// TODO: Room version here
roomVersion := gomatrixserverlib.RoomVersionV1
ore := api.OutputNewRoomEvent{
Event: u.event,
Event: u.event.Headered(roomVersion),
LastSentEventID: u.lastEventIDSent,
LatestEventIDs: latestEventIDs,
TransactionID: u.transactionID,

View file

@ -136,13 +136,14 @@ func updateToInviteMembership(
return nil, err
}
if needsSending {
roomVersion := gomatrixserverlib.RoomVersionV1
// We notify the consumers using a special event even though we will
// notify them about the change in current state as part of the normal
// room event stream. This ensures that the consumers only have to
// consider a single stream of events when determining whether a user
// is invited, rather than having to combine multiple streams themselves.
onie := api.OutputNewInviteEvent{
Event: *add,
Event: (*add).Headered(roomVersion),
}
updates = append(updates, api.OutputEvent{
Type: api.OutputTypeNewInviteEvent,

View file

@ -147,12 +147,7 @@ func (r *RoomserverQueryAPI) QueryLatestEventsAndState(
}
for _, event := range stateEvents {
response.StateEvents = append(response.StateEvents, gomatrixserverlib.HeaderedEvent{
EventHeader: gomatrixserverlib.EventHeader{
RoomVersion: roomVersion,
},
Event: event,
})
response.StateEvents = append(response.StateEvents, event.Headered(roomVersion))
}
return nil
@ -209,12 +204,7 @@ func (r *RoomserverQueryAPI) QueryStateAfterEvents(
}
for _, event := range stateEvents {
response.StateEvents = append(response.StateEvents, gomatrixserverlib.HeaderedEvent{
EventHeader: gomatrixserverlib.EventHeader{
RoomVersion: roomVersion,
},
Event: event,
})
response.StateEvents = append(response.StateEvents, event.Headered(roomVersion))
}
return nil
@ -247,12 +237,7 @@ func (r *RoomserverQueryAPI) QueryEventsByID(
// TODO: Room version here
roomVersion := gomatrixserverlib.RoomVersionV1
response.Events = append(response.Events, gomatrixserverlib.HeaderedEvent{
EventHeader: gomatrixserverlib.EventHeader{
RoomVersion: roomVersion,
},
Event: event,
})
response.Events = append(response.Events, event.Headered(roomVersion))
}
return nil
@ -534,12 +519,7 @@ func (r *RoomserverQueryAPI) QueryMissingEvents(
// TODO: Room version here
roomVersion := gomatrixserverlib.RoomVersionV1
response.Events = append(response.Events, gomatrixserverlib.HeaderedEvent{
EventHeader: gomatrixserverlib.EventHeader{
RoomVersion: roomVersion,
},
Event: event,
})
response.Events = append(response.Events, event.Headered(roomVersion))
}
}
@ -585,12 +565,7 @@ func (r *RoomserverQueryAPI) QueryBackfill(
// TODO: Room version here
roomVersion := gomatrixserverlib.RoomVersionV1
response.Events = append(response.Events, gomatrixserverlib.HeaderedEvent{
EventHeader: gomatrixserverlib.EventHeader{
RoomVersion: roomVersion,
},
Event: event,
})
response.Events = append(response.Events, event.Headered(roomVersion))
}
return err
@ -711,24 +686,14 @@ func (r *RoomserverQueryAPI) QueryStateAndAuthChain(
// TODO: Room version here
roomVersion := gomatrixserverlib.RoomVersionV1
response.StateEvents = append(response.StateEvents, gomatrixserverlib.HeaderedEvent{
EventHeader: gomatrixserverlib.EventHeader{
RoomVersion: roomVersion,
},
Event: event,
})
response.StateEvents = append(response.StateEvents, event.Headered(roomVersion))
}
for _, event := range authEvents {
// TODO: Room version here
roomVersion := gomatrixserverlib.RoomVersionV1
response.AuthChainEvents = append(response.AuthChainEvents, gomatrixserverlib.HeaderedEvent{
EventHeader: gomatrixserverlib.EventHeader{
RoomVersion: roomVersion,
},
Event: event,
})
response.AuthChainEvents = append(response.AuthChainEvents, event.Headered(roomVersion))
}
return err