Use PDU in more places (#3072)

This commit is contained in:
kegsay 2023-04-28 16:00:22 +01:00 committed by GitHub
parent d23d0369cc
commit 1432743d1a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 49 additions and 84 deletions

View file

@ -33,6 +33,7 @@ import (
"github.com/matrix-org/dendrite/syncapi/synctypes"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
@ -169,14 +170,14 @@ func Context(
return jsonerror.InternalServerError()
}
eventsBeforeClient := synctypes.HeaderedToClientEvents(eventsBeforeFiltered, synctypes.FormatAll)
eventsAfterClient := synctypes.HeaderedToClientEvents(eventsAfterFiltered, synctypes.FormatAll)
eventsBeforeClient := synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(eventsBeforeFiltered), synctypes.FormatAll)
eventsAfterClient := synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(eventsAfterFiltered), synctypes.FormatAll)
newState := state
if filter.LazyLoadMembers {
allEvents := append(eventsBeforeFiltered, eventsAfterFiltered...)
allEvents = append(allEvents, &requestedEvent)
evs := synctypes.HeaderedToClientEvents(allEvents, synctypes.FormatAll)
evs := synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(allEvents), synctypes.FormatAll)
newState, err = applyLazyLoadMembers(ctx, device, snapshot, roomID, evs, lazyLoadCache)
if err != nil {
logrus.WithError(err).Error("unable to load membership events")
@ -184,12 +185,12 @@ func Context(
}
}
ev := synctypes.HeaderedToClientEvent(&requestedEvent, synctypes.FormatAll)
ev := synctypes.ToClientEvent(&requestedEvent, synctypes.FormatAll)
response := ContextRespsonse{
Event: &ev,
EventsAfter: eventsAfterClient,
EventsBefore: eventsBeforeClient,
State: synctypes.HeaderedToClientEvents(newState, synctypes.FormatAll),
State: synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(newState), synctypes.FormatAll),
}
if len(response.State) > filter.Limit {

View file

@ -97,6 +97,6 @@ func GetEvent(
return util.JSONResponse{
Code: http.StatusOK,
JSON: synctypes.HeaderedToClientEvent(events[0], synctypes.FormatAll),
JSON: synctypes.ToClientEvent(events[0], synctypes.FormatAll),
}
}

View file

@ -25,6 +25,7 @@ import (
"github.com/matrix-org/dendrite/syncapi/synctypes"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
@ -134,6 +135,6 @@ func GetMemberships(
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: getMembershipResponse{synctypes.HeaderedToClientEvents(result, synctypes.FormatAll)},
JSON: getMembershipResponse{synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(result), synctypes.FormatAll)},
}
}

View file

@ -256,7 +256,7 @@ func OnIncomingMessagesRequest(
util.GetLogger(req.Context()).WithError(err).Error("failed to apply lazy loading")
return jsonerror.InternalServerError()
}
res.State = append(res.State, synctypes.HeaderedToClientEvents(membershipEvents, synctypes.FormatAll)...)
res.State = append(res.State, synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(membershipEvents), synctypes.FormatAll)...)
}
// If we didn't return any events, set the end to an empty string, so it will be omitted
@ -365,7 +365,7 @@ func (r *messagesReq) retrieveEvents() (
"events_before": len(events),
"events_after": len(filteredEvents),
}).Debug("applied history visibility (messages)")
return synctypes.HeaderedToClientEvents(filteredEvents, synctypes.FormatAll), start, end, err
return synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(filteredEvents), synctypes.FormatAll), start, end, err
}
func (r *messagesReq) getStartEnd(events []*rstypes.HeaderedEvent) (start, end types.TopologyToken, err error) {

View file

@ -22,6 +22,7 @@ import (
"time"
"github.com/blevesearch/bleve/v2/search"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/spec"
"github.com/matrix-org/util"
"github.com/sirupsen/logrus"
@ -206,12 +207,12 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts
Context: SearchContextResponse{
Start: startToken.String(),
End: endToken.String(),
EventsAfter: synctypes.HeaderedToClientEvents(eventsAfter, synctypes.FormatSync),
EventsBefore: synctypes.HeaderedToClientEvents(eventsBefore, synctypes.FormatSync),
EventsAfter: synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(eventsAfter), synctypes.FormatSync),
EventsBefore: synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(eventsBefore), synctypes.FormatSync),
ProfileInfo: profileInfos,
},
Rank: eventScore[event.EventID()].Score,
Result: synctypes.HeaderedToClientEvent(event, synctypes.FormatAll),
Result: synctypes.ToClientEvent(event, synctypes.FormatAll),
})
roomGroup := groups[event.RoomID()]
roomGroup.Results = append(roomGroup.Results, event.EventID())
@ -223,7 +224,7 @@ func Search(req *http.Request, device *api.Device, syncDB storage.Database, fts
logrus.WithError(err).Error("unable to get current state")
return jsonerror.InternalServerError()
}
stateForRooms[event.RoomID()] = synctypes.HeaderedToClientEvents(state, synctypes.FormatSync)
stateForRooms[event.RoomID()] = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(state), synctypes.FormatSync)
}
}

View file

@ -274,20 +274,13 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
recentStreamEvents := dbEvents[delta.RoomID].Events
limited := dbEvents[delta.RoomID].Limited
hisVisMap := map[string]gomatrixserverlib.HistoryVisibility{}
for _, re := range recentStreamEvents {
hisVisMap[re.EventID()] = re.Visibility
}
recEvents := gomatrixserverlib.HeaderedReverseTopologicalOrdering(
gomatrixserverlib.ToPDUs(toEvents(snapshot.StreamEventsToEvents(device, recentStreamEvents))),
recEvents := gomatrixserverlib.ReverseTopologicalOrdering(
gomatrixserverlib.ToPDUs(snapshot.StreamEventsToEvents(device, recentStreamEvents)),
gomatrixserverlib.TopologicalOrderByPrevEvents,
)
recentEvents := make([]*rstypes.HeaderedEvent, len(recEvents))
for i := range recEvents {
recentEvents[i] = &rstypes.HeaderedEvent{
Event: recEvents[i].(*gomatrixserverlib.Event),
Visibility: hisVisMap[recEvents[i].EventID()],
}
recentEvents[i] = recEvents[i].(*rstypes.HeaderedEvent)
}
// If we didn't return any events at all then don't bother doing anything else.
@ -353,20 +346,13 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
// Now that we've filtered the timeline, work out which state events are still
// left. Anything that appears in the filtered timeline will be removed from the
// "state" section and kept in "timeline".
hisVisMap = map[string]gomatrixserverlib.HistoryVisibility{}
for _, re := range delta.StateEvents {
hisVisMap[re.EventID()] = re.Visibility
}
sEvents := gomatrixserverlib.HeaderedReverseTopologicalOrdering(
gomatrixserverlib.ToPDUs(toEvents(removeDuplicates(delta.StateEvents, events))),
gomatrixserverlib.ToPDUs(removeDuplicates(delta.StateEvents, events)),
gomatrixserverlib.TopologicalOrderByAuthEvents,
)
delta.StateEvents = make([]*rstypes.HeaderedEvent, len(sEvents))
for i := range sEvents {
delta.StateEvents[i] = &rstypes.HeaderedEvent{
Event: sEvents[i].(*gomatrixserverlib.Event),
Visibility: hisVisMap[sEvents[i].EventID()],
}
delta.StateEvents[i] = sEvents[i].(*rstypes.HeaderedEvent)
}
if len(delta.StateEvents) > 0 {
@ -390,20 +376,20 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
}
}
jr.Timeline.PrevBatch = &prevBatch
jr.Timeline.Events = synctypes.HeaderedToClientEvents(events, synctypes.FormatSync)
jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync)
// If we are limited by the filter AND the history visibility filter
// didn't "remove" events, return that the response is limited.
jr.Timeline.Limited = (limited && len(events) == len(recentEvents)) || delta.NewlyJoined
jr.State.Events = synctypes.HeaderedToClientEvents(delta.StateEvents, synctypes.FormatSync)
jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), synctypes.FormatSync)
req.Response.Rooms.Join[delta.RoomID] = jr
case spec.Peek:
jr := types.NewJoinResponse()
jr.Timeline.PrevBatch = &prevBatch
// TODO: Apply history visibility on peeked rooms
jr.Timeline.Events = synctypes.HeaderedToClientEvents(recentEvents, synctypes.FormatSync)
jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(recentEvents), synctypes.FormatSync)
jr.Timeline.Limited = limited
jr.State.Events = synctypes.HeaderedToClientEvents(delta.StateEvents, synctypes.FormatSync)
jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), synctypes.FormatSync)
req.Response.Rooms.Peek[delta.RoomID] = jr
case spec.Leave:
@ -412,11 +398,11 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
case spec.Ban:
lr := types.NewLeaveResponse()
lr.Timeline.PrevBatch = &prevBatch
lr.Timeline.Events = synctypes.HeaderedToClientEvents(events, synctypes.FormatSync)
lr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync)
// If we are limited by the filter AND the history visibility filter
// didn't "remove" events, return that the response is limited.
lr.Timeline.Limited = limited && len(events) == len(recentEvents)
lr.State.Events = synctypes.HeaderedToClientEvents(delta.StateEvents, synctypes.FormatSync)
lr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(delta.StateEvents), synctypes.FormatSync)
req.Response.Rooms.Leave[delta.RoomID] = lr
}
@ -566,11 +552,11 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
}
jr.Timeline.PrevBatch = prevBatch
jr.Timeline.Events = synctypes.HeaderedToClientEvents(events, synctypes.FormatSync)
jr.Timeline.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(events), synctypes.FormatSync)
// If we are limited by the filter AND the history visibility filter
// didn't "remove" events, return that the response is limited.
jr.Timeline.Limited = limited && len(events) == len(recentEvents)
jr.State.Events = synctypes.HeaderedToClientEvents(stateEvents, synctypes.FormatSync)
jr.State.Events = synctypes.ToClientEvents(gomatrixserverlib.ToPDUs(stateEvents), synctypes.FormatSync)
return jr, nil
}
@ -656,7 +642,7 @@ func (p *PDUStreamProvider) addIgnoredUsersToFilter(ctx context.Context, snapsho
return nil
}
func removeDuplicates(stateEvents, recentEvents []*rstypes.HeaderedEvent) []*rstypes.HeaderedEvent {
func removeDuplicates[T gomatrixserverlib.PDU](stateEvents, recentEvents []T) []T {
for _, recentEv := range recentEvents {
if recentEv.StateKey() == nil {
continue // not a state event
@ -677,11 +663,3 @@ func removeDuplicates(stateEvents, recentEvents []*rstypes.HeaderedEvent) []*rst
}
return stateEvents
}
func toEvents(events []*rstypes.HeaderedEvent) []*gomatrixserverlib.Event {
result := make([]*gomatrixserverlib.Event, len(events))
for i := range events {
result[i] = events[i].Event
}
return result
}

View file

@ -16,7 +16,6 @@
package synctypes
import (
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/gomatrixserverlib/spec"
)
@ -45,7 +44,7 @@ type ClientEvent struct {
}
// ToClientEvents converts server events to client events.
func ToClientEvents(serverEvs []*gomatrixserverlib.Event, format ClientEventFormat) []ClientEvent {
func ToClientEvents(serverEvs []gomatrixserverlib.PDU, format ClientEventFormat) []ClientEvent {
evs := make([]ClientEvent, 0, len(serverEvs))
for _, se := range serverEvs {
if se == nil {
@ -56,20 +55,8 @@ func ToClientEvents(serverEvs []*gomatrixserverlib.Event, format ClientEventForm
return evs
}
// HeaderedToClientEvents converts headered server events to client events.
func HeaderedToClientEvents(serverEvs []*types.HeaderedEvent, format ClientEventFormat) []ClientEvent {
evs := make([]ClientEvent, 0, len(serverEvs))
for _, se := range serverEvs {
if se == nil {
continue // TODO: shouldn't happen?
}
evs = append(evs, HeaderedToClientEvent(se, format))
}
return evs
}
// ToClientEvent converts a single server event to a client event.
func ToClientEvent(se *gomatrixserverlib.Event, format ClientEventFormat) ClientEvent {
func ToClientEvent(se gomatrixserverlib.PDU, format ClientEventFormat) ClientEvent {
ce := ClientEvent{
Content: spec.RawJSON(se.Content()),
Sender: se.Sender(),
@ -85,8 +72,3 @@ func ToClientEvent(se *gomatrixserverlib.Event, format ClientEventFormat) Client
}
return ce
}
// HeaderedToClientEvent converts a single headered server event to a client event.
func HeaderedToClientEvent(se *types.HeaderedEvent, format ClientEventFormat) ClientEvent {
return ToClientEvent(se.Event, format)
}