Handle event_format federation in /sync responses (#3192)

This commit is contained in:
devonh 2023-08-31 15:33:38 +00:00 committed by GitHub
parent 11fd2f019b
commit bb2ab62cbf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 739 additions and 84 deletions

View file

@ -63,6 +63,11 @@ func (p *InviteStreamProvider) IncrementalSync(
return from
}
eventFormat := synctypes.FormatSync
if req.Filter.EventFormat == synctypes.EventFormatFederation {
eventFormat = synctypes.FormatSyncFederation
}
for roomID, inviteEvent := range invites {
user := spec.UserID{}
validRoomID, err := spec.NewRoomID(inviteEvent.RoomID())
@ -87,7 +92,7 @@ func (p *InviteStreamProvider) IncrementalSync(
if _, ok := req.IgnoredUsers.List[user.String()]; ok {
continue
}
ir := types.NewInviteResponse(inviteEvent, user, sk)
ir := types.NewInviteResponse(inviteEvent, user, sk, eventFormat)
req.Response.Rooms.Invite[roomID] = ir
}

View file

@ -88,6 +88,11 @@ func (p *PDUStreamProvider) CompleteSync(
req.Log.WithError(err).Error("unable to update event filter with ignored users")
}
eventFormat := synctypes.FormatSync
if req.Filter.EventFormat == synctypes.EventFormatFederation {
eventFormat = synctypes.FormatSyncFederation
}
recentEvents, err := snapshot.RecentEvents(ctx, joinedRoomIDs, r, &eventFilter, true, true)
if err != nil {
return from
@ -105,7 +110,7 @@ func (p *PDUStreamProvider) CompleteSync(
// get the join response for each room
jr, jerr := p.getJoinResponseForCompleteSync(
ctx, snapshot, roomID, &stateFilter, req.WantFullState, req.Device, false,
events.Events, events.Limited,
events.Events, events.Limited, eventFormat,
)
if jerr != nil {
req.Log.WithError(jerr).Error("p.getJoinResponseForCompleteSync failed")
@ -142,7 +147,7 @@ func (p *PDUStreamProvider) CompleteSync(
events := recentEvents[roomID]
jr, err = p.getJoinResponseForCompleteSync(
ctx, snapshot, roomID, &stateFilter, req.WantFullState, req.Device, true,
events.Events, events.Limited,
events.Events, events.Limited, eventFormat,
)
if err != nil {
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
@ -346,6 +351,11 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
return r.From, fmt.Errorf("p.DB.GetBackwardTopologyPos: %w", err)
}
eventFormat := synctypes.FormatSync
if req.Filter.EventFormat == synctypes.EventFormatFederation {
eventFormat = synctypes.FormatSyncFederation
}
// Now that we've filtered the timeline, work out which state events are still
// left. Anything that appears in the filtered timeline will be removed from the
// "state" section and kept in "timeline".
@ -359,7 +369,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
continue
}
var newEvent gomatrixserverlib.PDU
newEvent, err = p.updatePowerLevelEvent(ctx, ev)
newEvent, err = p.updatePowerLevelEvent(ctx, ev, eventFormat)
if err != nil {
return r.From, err
}
@ -383,7 +393,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
// update the powerlevel event for state events
if ev.Version() == gomatrixserverlib.RoomVersionPseudoIDs && ev.Type() == spec.MRoomPowerLevels && ev.StateKeyEquals("") {
var newEvent gomatrixserverlib.PDU
newEvent, err = p.updatePowerLevelEvent(ctx, he)
newEvent, err = p.updatePowerLevelEvent(ctx, he, eventFormat)
if err != nil {
return r.From, err
}
@ -413,13 +423,13 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
}
}
jr.Timeline.PrevBatch = &prevBatch
jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), eventFormat, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
// If we are limited by the filter AND the history visibility filter
// didn't "remove" events, return that the response is limited.
jr.Timeline.Limited = (limited && len(events) == len(recentEvents)) || delta.NewlyJoined
jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), eventFormat, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
req.Response.Rooms.Join[delta.RoomID] = jr
@ -428,11 +438,11 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = &prevBatch
// TODO: Apply history visibility on peeked rooms
jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(recentEvents), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(recentEvents), eventFormat, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
jr.Timeline.Limited = limited
jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), eventFormat, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
req.Response.Rooms.Peek[delta.RoomID] = jr
@ -443,13 +453,13 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
case spec.Ban:
lr := types.NewLeaveResponse()
lr.Timeline.PrevBatch = &prevBatch
lr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
lr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), eventFormat, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
// If we are limited by the filter AND the history visibility filter
// didn't "remove" events, return that the response is limited.
lr.Timeline.Limited = limited && len(events) == len(recentEvents)
lr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
lr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), eventFormat, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
req.Response.Rooms.Leave[delta.RoomID] = lr
@ -458,7 +468,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
return latestPosition, nil
}
func (p *PDUStreamProvider) updatePowerLevelEvent(ctx context.Context, ev *rstypes.HeaderedEvent) (gomatrixserverlib.PDU, error) {
func (p *PDUStreamProvider) updatePowerLevelEvent(ctx context.Context, ev *rstypes.HeaderedEvent, eventFormat synctypes.ClientEventFormat) (gomatrixserverlib.PDU, error) {
pls, err := gomatrixserverlib.NewPowerLevelContentFromEvent(ev)
if err != nil {
return nil, err
@ -467,11 +477,14 @@ func (p *PDUStreamProvider) updatePowerLevelEvent(ctx context.Context, ev *rstyp
var userID *spec.UserID
for user, level := range pls.Users {
validRoomID, _ := spec.NewRoomID(ev.RoomID())
userID, err = p.rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(user))
if err != nil {
return nil, err
if eventFormat != synctypes.FormatSyncFederation {
userID, err = p.rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(user))
if err != nil {
return nil, err
}
user = userID.String()
}
newPls[userID.String()] = level
newPls[user] = level
}
var newPlBytes, newEv []byte
newPlBytes, err = json.Marshal(newPls)
@ -487,7 +500,7 @@ func (p *PDUStreamProvider) updatePowerLevelEvent(ctx context.Context, ev *rstyp
prevContent := gjson.GetBytes(ev.JSON(), "unsigned.prev_content")
if !prevContent.Exists() {
var evNew gomatrixserverlib.PDU
evNew, err = gomatrixserverlib.MustGetRoomVersion(gomatrixserverlib.RoomVersionPseudoIDs).NewEventFromTrustedJSON(newEv, false)
evNew, err = gomatrixserverlib.MustGetRoomVersion(ev.Version()).NewEventFromTrustedJSON(newEv, false)
if err != nil {
return nil, err
}
@ -503,11 +516,14 @@ func (p *PDUStreamProvider) updatePowerLevelEvent(ctx context.Context, ev *rstyp
newPls = make(map[string]int64)
for user, level := range pls.Users {
validRoomID, _ := spec.NewRoomID(ev.RoomID())
userID, err = p.rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(user))
if err != nil {
return nil, err
if eventFormat != synctypes.FormatSyncFederation {
userID, err = p.rsAPI.QueryUserIDForSender(ctx, *validRoomID, spec.SenderID(user))
if err != nil {
return nil, err
}
user = userID.String()
}
newPls[userID.String()] = level
newPls[user] = level
}
newPlBytes, err = json.Marshal(newPls)
if err != nil {
@ -519,7 +535,7 @@ func (p *PDUStreamProvider) updatePowerLevelEvent(ctx context.Context, ev *rstyp
}
var evNew gomatrixserverlib.PDU
evNew, err = gomatrixserverlib.MustGetRoomVersion(gomatrixserverlib.RoomVersionPseudoIDs).NewEventFromTrustedJSONWithEventID(ev.EventID(), newEv, false)
evNew, err = gomatrixserverlib.MustGetRoomVersion(ev.Version()).NewEventFromTrustedJSONWithEventID(ev.EventID(), newEv, false)
if err != nil {
return nil, err
}
@ -592,6 +608,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
isPeek bool,
recentStreamEvents []types.StreamEvent,
limited bool,
eventFormat synctypes.ClientEventFormat,
) (jr *types.JoinResponse, err error) {
jr = types.NewJoinResponse()
// TODO: When filters are added, we may need to call this multiple times to get enough events.
@ -683,7 +700,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
if ev.Type() != spec.MRoomPowerLevels || !ev.StateKeyEquals("") {
continue
}
newEvent, err := p.updatePowerLevelEvent(ctx, ev)
newEvent, err := p.updatePowerLevelEvent(ctx, ev, eventFormat)
if err != nil {
return nil, err
}
@ -697,7 +714,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
if ev.Type() != spec.MRoomPowerLevels || !ev.StateKeyEquals("") {
continue
}
newEvent, err := p.updatePowerLevelEvent(ctx, ev)
newEvent, err := p.updatePowerLevelEvent(ctx, ev, eventFormat)
if err != nil {
return nil, err
}
@ -705,13 +722,13 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
}
jr.Timeline.PrevBatch = prevBatch
jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), eventFormat, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
// If we are limited by the filter AND the history visibility filter
// didn't "remove" events, return that the response is limited.
jr.Timeline.Limited = limited && len(events) == len(recentEvents)
jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(stateEvents), synctypes.FormatSync, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(stateEvents), eventFormat, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
return p.rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
})
return jr, nil