Some WIP code for IncrementalSync but breaks other Sytests

This commit is contained in:
Eric Eastwood 2021-01-06 03:22:50 -06:00
parent 9e4fd0157f
commit c7328cee41

View file

@ -19,6 +19,7 @@ import (
"database/sql"
"encoding/json"
"fmt"
"strconv"
"time"
eduAPI "github.com/matrix-org/dendrite/eduserver/api"
@ -31,6 +32,7 @@ import (
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
)
@ -863,6 +865,85 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
return //res, toPos, joinedRoomIDs, err
}
func (d *Database) filterStreamEventsAccordingToHistoryVisibility(
recentStreamEvents []types.StreamEvent,
device *userapi.Device,
limited bool,
) ([]types.StreamEvent, bool, error) {
var err error
// TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the
// user shouldn't see, we check the recent events and remove any prior to the join event of the user
// which is equiv to history_visibility: joined
joinEventIndex := -1
leaveEventIndex := -1
for i := len(recentStreamEvents) - 1; i >= 0; i-- {
ev := recentStreamEvents[i]
if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(device.UserID) {
membership, _ := ev.Membership()
if membership == gomatrixserverlib.Join {
joinEventIndex = i
if i > 0 {
// the create event happens before the first join, so we should cut it at that point instead
if recentStreamEvents[i-1].Type() == gomatrixserverlib.MRoomCreate && recentStreamEvents[i-1].StateKeyEquals("") {
joinEventIndex = i - 1
}
}
break
} else if membership == gomatrixserverlib.Leave {
leaveEventIndex = i
}
if joinEventIndex != -1 && leaveEventIndex != -1 {
break
}
}
}
// Default at the start of the array
sliceStart := 0
// If there is a joinEvent, then cut all events earlier the join (exclude the join itself too)
if joinEventIndex != -1 {
sliceStart = joinEventIndex + 1
limited = false // so clients know not to try to backpaginate
}
// Default to spanning the rest of the array
sliceEnd := len(recentStreamEvents)
// If there is a leaveEvent, then cut all events after the person left (exclude the leave itself too)
if leaveEventIndex != -1 {
sliceEnd = leaveEventIndex
}
type somematrixevent struct {
event_id, sender, eventType, origin_server_ts, content string
}
events := make([]somematrixevent, len(recentStreamEvents))
for i, v := range recentStreamEvents {
events[i] = somematrixevent{
event_id: v.HeaderedEvent.Event.EventID(),
sender: v.HeaderedEvent.Event.Sender(),
eventType: v.HeaderedEvent.Event.Type(),
origin_server_ts: strconv.FormatUint(uint64(v.HeaderedEvent.Event.OriginServerTS()), 10),
content: string(v.HeaderedEvent.Event.Content()),
}
}
logrus.WithFields(logrus.Fields{
"sliceStart": sliceStart,
"sliceEnd": sliceEnd,
"recentStreamEvents": fmt.Sprintf("%+v", events),
}).Info("cutting down the events")
outEvents := recentStreamEvents[sliceStart:sliceEnd]
logrus.WithFields(logrus.Fields{
"recentStreamEvents": fmt.Sprintf("%+v", events[sliceStart:sliceEnd]),
}).Info("cutting down the events after")
return outEvents, limited, err
}
func (d *Database) getJoinResponseForCompleteSync(
ctx context.Context, txn *sql.Tx,
roomID string,
@ -972,40 +1053,9 @@ func (d *Database) getLeaveResponseForCompleteSync(
return
}
// TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the
// user shouldn't see, we check the recent events and remove any prior to the join event of the user
// which is equiv to history_visibility: joined
joinEventIndex := -1
leaveEventIndex := -1
for i := len(recentStreamEvents) - 1; i >= 0; i-- {
ev := recentStreamEvents[i]
if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(device.UserID) {
membership, _ := ev.Membership()
if membership == gomatrixserverlib.Join {
joinEventIndex = i
if i > 0 {
// the create event happens before the first join, so we should cut it at that point instead
if recentStreamEvents[i-1].Type() == gomatrixserverlib.MRoomCreate && recentStreamEvents[i-1].StateKeyEquals("") {
joinEventIndex = i - 1
}
}
break
} else if membership == gomatrixserverlib.Leave {
leaveEventIndex = i
}
if joinEventIndex != -1 && leaveEventIndex != -1 {
break
}
}
}
// We can assume that if the person joined, they also left and will only show the events between (inclusive)
if joinEventIndex != -1 && leaveEventIndex != -1 {
// cut all events earlier the join (exclude the join itself too)
// and cut all events after the person left (exclude the leave itself too)
recentStreamEvents = recentStreamEvents[joinEventIndex+1 : leaveEventIndex]
limited = false // so clients know not to try to backpaginate
recentStreamEvents, limited, err = d.filterStreamEventsAccordingToHistoryVisibility(recentStreamEvents, &device, limited)
if err != nil {
return
}
// Retrieve the backward topology position, i.e. the position of the
@ -1145,8 +1195,63 @@ func (d *Database) addRoomDeltaToResponse(
if err != nil {
return err
}
logrus.WithFields(logrus.Fields{
"limited": limited,
"delta.stateEvents": len(delta.stateEvents),
"recentStreamEvents": len(recentStreamEvents),
}).Info("isync addRoomDeltaToResponse removeDuplicates")
// Remove duplicates while we have all of the events (no filtering)
allRecentEvents := d.StreamEventsToEvents(device, recentStreamEvents)
delta.stateEvents = removeDuplicates(delta.stateEvents, allRecentEvents) // roll back
type somematrixevent struct {
event_id, sender, eventType, origin_server_ts, content string
}
// events := make([]somematrixevent, len(recentStreamEvents))
// for i, v := range recentStreamEvents {
// events[i] = somematrixevent{
// event_id: v.HeaderedEvent.Event.EventID(),
// sender: v.HeaderedEvent.Event.Sender(),
// eventType: v.HeaderedEvent.Event.Type(),
// origin_server_ts: strconv.FormatUint(uint64(v.HeaderedEvent.Event.OriginServerTS()), 10),
// content: string(v.HeaderedEvent.Event.Content()),
// }
// }
// logrus.WithFields(logrus.Fields{
// "recentStreamEvents": fmt.Sprintf("%+v", events),
// }).Info("isync addRoomDeltaToResponse before")
recentStreamEvents, limited, err = d.filterStreamEventsAccordingToHistoryVisibility(recentStreamEvents, device, limited)
if err != nil {
return err
}
// events = make([]somematrixevent, len(recentStreamEvents))
// for i, v := range recentStreamEvents {
// events[i] = somematrixevent{
// event_id: v.HeaderedEvent.Event.EventID(),
// sender: v.HeaderedEvent.Event.Sender(),
// eventType: v.HeaderedEvent.Event.Type(),
// origin_server_ts: strconv.FormatUint(uint64(v.HeaderedEvent.Event.OriginServerTS()), 10),
// content: string(v.HeaderedEvent.Event.Content()),
// }
// }
// logrus.WithFields(logrus.Fields{
// "recentStreamEvents": fmt.Sprintf("%+v", events),
// }).Info("isync addRoomDeltaToResponse after")
logrus.WithFields(logrus.Fields{
"limited": limited,
"delta.stateEvents": delta.stateEvents,
}).Info("isync addRoomDeltaToResponse delta.stateEvents before")
recentEvents := d.StreamEventsToEvents(device, recentStreamEvents)
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
//delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
prevBatch, err := d.getBackwardTopologyPos(ctx, txn, recentStreamEvents)
if err != nil {
return err
@ -1178,12 +1283,10 @@ func (d *Database) addRoomDeltaToResponse(
case gomatrixserverlib.Leave:
fallthrough // transitions to leave are the same as ban
case gomatrixserverlib.Ban:
// TODO: recentEvents may contain events that this user is not allowed to see because they are
// no longer in the room.
lr := types.NewLeaveResponse()
lr.Timeline.PrevBatch = &prevBatch
lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
lr.Timeline.Limited = limited
lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Leave[delta.roomID] = *lr
}