diff --git a/syncapi/storage/shared/syncserver.go b/syncapi/storage/shared/syncserver.go index 156ada92..a6b41872 100644 --- a/syncapi/storage/shared/syncserver.go +++ b/syncapi/storage/shared/syncserver.go @@ -19,6 +19,7 @@ import ( "database/sql" "encoding/json" "fmt" + "strconv" "time" eduAPI "github.com/matrix-org/dendrite/eduserver/api" @@ -31,6 +32,7 @@ import ( "github.com/matrix-org/dendrite/syncapi/storage/tables" "github.com/matrix-org/dendrite/syncapi/types" "github.com/matrix-org/gomatrixserverlib" + "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" ) @@ -863,6 +865,85 @@ func (d *Database) getResponseWithPDUsForCompleteSync( return //res, toPos, joinedRoomIDs, err } +func (d *Database) filterStreamEventsAccordingToHistoryVisibility( + recentStreamEvents []types.StreamEvent, + device *userapi.Device, + limited bool, +) ([]types.StreamEvent, bool, error) { + var err error + + // TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the + // user shouldn't see, we check the recent events and remove any prior to the join event of the user + // which is equiv to history_visibility: joined + joinEventIndex := -1 + leaveEventIndex := -1 + for i := len(recentStreamEvents) - 1; i >= 0; i-- { + ev := recentStreamEvents[i] + if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(device.UserID) { + membership, _ := ev.Membership() + if membership == gomatrixserverlib.Join { + joinEventIndex = i + if i > 0 { + // the create event happens before the first join, so we should cut it at that point instead + if recentStreamEvents[i-1].Type() == gomatrixserverlib.MRoomCreate && recentStreamEvents[i-1].StateKeyEquals("") { + joinEventIndex = i - 1 + } + } + break + } else if membership == gomatrixserverlib.Leave { + leaveEventIndex = i + } + + if joinEventIndex != -1 && leaveEventIndex != -1 { + break + } + } + } + + // Default at the start of the array + sliceStart := 0 + // If there is a joinEvent, then cut all events earlier the join (exclude the join itself too) + if joinEventIndex != -1 { + sliceStart = joinEventIndex + 1 + limited = false // so clients know not to try to backpaginate + } + // Default to spanning the rest of the array + sliceEnd := len(recentStreamEvents) + // If there is a leaveEvent, then cut all events after the person left (exclude the leave itself too) + if leaveEventIndex != -1 { + sliceEnd = leaveEventIndex + } + + type somematrixevent struct { + event_id, sender, eventType, origin_server_ts, content string + } + + events := make([]somematrixevent, len(recentStreamEvents)) + for i, v := range recentStreamEvents { + events[i] = somematrixevent{ + event_id: v.HeaderedEvent.Event.EventID(), + sender: v.HeaderedEvent.Event.Sender(), + eventType: v.HeaderedEvent.Event.Type(), + origin_server_ts: strconv.FormatUint(uint64(v.HeaderedEvent.Event.OriginServerTS()), 10), + content: string(v.HeaderedEvent.Event.Content()), + } + } + + logrus.WithFields(logrus.Fields{ + "sliceStart": sliceStart, + "sliceEnd": sliceEnd, + "recentStreamEvents": fmt.Sprintf("%+v", events), + }).Info("cutting down the events") + + outEvents := recentStreamEvents[sliceStart:sliceEnd] + + logrus.WithFields(logrus.Fields{ + "recentStreamEvents": fmt.Sprintf("%+v", events[sliceStart:sliceEnd]), + }).Info("cutting down the events after") + + return outEvents, limited, err +} + func (d *Database) getJoinResponseForCompleteSync( ctx context.Context, txn *sql.Tx, roomID string, @@ -972,40 +1053,9 @@ func (d *Database) getLeaveResponseForCompleteSync( return } - // TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the - // user shouldn't see, we check the recent events and remove any prior to the join event of the user - // which is equiv to history_visibility: joined - joinEventIndex := -1 - leaveEventIndex := -1 - for i := len(recentStreamEvents) - 1; i >= 0; i-- { - ev := recentStreamEvents[i] - if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(device.UserID) { - membership, _ := ev.Membership() - if membership == gomatrixserverlib.Join { - joinEventIndex = i - if i > 0 { - // the create event happens before the first join, so we should cut it at that point instead - if recentStreamEvents[i-1].Type() == gomatrixserverlib.MRoomCreate && recentStreamEvents[i-1].StateKeyEquals("") { - joinEventIndex = i - 1 - } - } - break - } else if membership == gomatrixserverlib.Leave { - leaveEventIndex = i - } - - if joinEventIndex != -1 && leaveEventIndex != -1 { - break - } - } - } - - // We can assume that if the person joined, they also left and will only show the events between (inclusive) - if joinEventIndex != -1 && leaveEventIndex != -1 { - // cut all events earlier the join (exclude the join itself too) - // and cut all events after the person left (exclude the leave itself too) - recentStreamEvents = recentStreamEvents[joinEventIndex+1 : leaveEventIndex] - limited = false // so clients know not to try to backpaginate + recentStreamEvents, limited, err = d.filterStreamEventsAccordingToHistoryVisibility(recentStreamEvents, &device, limited) + if err != nil { + return } // Retrieve the backward topology position, i.e. the position of the @@ -1145,8 +1195,63 @@ func (d *Database) addRoomDeltaToResponse( if err != nil { return err } + + logrus.WithFields(logrus.Fields{ + "limited": limited, + "delta.stateEvents": len(delta.stateEvents), + "recentStreamEvents": len(recentStreamEvents), + }).Info("isync addRoomDeltaToResponse removeDuplicates") + + // Remove duplicates while we have all of the events (no filtering) + allRecentEvents := d.StreamEventsToEvents(device, recentStreamEvents) + delta.stateEvents = removeDuplicates(delta.stateEvents, allRecentEvents) // roll back + + type somematrixevent struct { + event_id, sender, eventType, origin_server_ts, content string + } + + // events := make([]somematrixevent, len(recentStreamEvents)) + // for i, v := range recentStreamEvents { + // events[i] = somematrixevent{ + // event_id: v.HeaderedEvent.Event.EventID(), + // sender: v.HeaderedEvent.Event.Sender(), + // eventType: v.HeaderedEvent.Event.Type(), + // origin_server_ts: strconv.FormatUint(uint64(v.HeaderedEvent.Event.OriginServerTS()), 10), + // content: string(v.HeaderedEvent.Event.Content()), + // } + // } + + // logrus.WithFields(logrus.Fields{ + // "recentStreamEvents": fmt.Sprintf("%+v", events), + // }).Info("isync addRoomDeltaToResponse before") + + recentStreamEvents, limited, err = d.filterStreamEventsAccordingToHistoryVisibility(recentStreamEvents, device, limited) + if err != nil { + return err + } + + // events = make([]somematrixevent, len(recentStreamEvents)) + // for i, v := range recentStreamEvents { + // events[i] = somematrixevent{ + // event_id: v.HeaderedEvent.Event.EventID(), + // sender: v.HeaderedEvent.Event.Sender(), + // eventType: v.HeaderedEvent.Event.Type(), + // origin_server_ts: strconv.FormatUint(uint64(v.HeaderedEvent.Event.OriginServerTS()), 10), + // content: string(v.HeaderedEvent.Event.Content()), + // } + // } + + // logrus.WithFields(logrus.Fields{ + // "recentStreamEvents": fmt.Sprintf("%+v", events), + // }).Info("isync addRoomDeltaToResponse after") + + logrus.WithFields(logrus.Fields{ + "limited": limited, + "delta.stateEvents": delta.stateEvents, + }).Info("isync addRoomDeltaToResponse delta.stateEvents before") + recentEvents := d.StreamEventsToEvents(device, recentStreamEvents) - delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back + //delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back prevBatch, err := d.getBackwardTopologyPos(ctx, txn, recentStreamEvents) if err != nil { return err @@ -1178,12 +1283,10 @@ func (d *Database) addRoomDeltaToResponse( case gomatrixserverlib.Leave: fallthrough // transitions to leave are the same as ban case gomatrixserverlib.Ban: - // TODO: recentEvents may contain events that this user is not allowed to see because they are - // no longer in the room. lr := types.NewLeaveResponse() lr.Timeline.PrevBatch = &prevBatch lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync) - lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true + lr.Timeline.Limited = limited lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync) res.Rooms.Leave[delta.roomID] = *lr }