mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-04-07 04:13:39 +00:00
Tweaks
This commit is contained in:
parent
be9ee18233
commit
0231199cc8
2 changed files with 45 additions and 63 deletions
|
@ -2,6 +2,7 @@ package streams
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
|
@ -52,7 +53,7 @@ func (p *PDUStreamProvider) CompleteSync(
|
||||||
for _, roomID := range joinedRoomIDs {
|
for _, roomID := range joinedRoomIDs {
|
||||||
var jr *types.JoinResponse
|
var jr *types.JoinResponse
|
||||||
jr, err = p.getJoinResponseForCompleteSync(
|
jr, err = p.getJoinResponseForCompleteSync(
|
||||||
ctx, roomID, r, &req.Filter, req.Limit, req.Device,
|
ctx, roomID, r, &req.Filter, req.Device,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
|
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
|
||||||
|
@ -72,7 +73,7 @@ func (p *PDUStreamProvider) CompleteSync(
|
||||||
if !peek.Deleted {
|
if !peek.Deleted {
|
||||||
var jr *types.JoinResponse
|
var jr *types.JoinResponse
|
||||||
jr, err = p.getJoinResponseForCompleteSync(
|
jr, err = p.getJoinResponseForCompleteSync(
|
||||||
ctx, peek.RoomID, r, &req.Filter, req.Limit, req.Device,
|
ctx, peek.RoomID, r, &req.Filter, req.Device,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
|
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
|
||||||
|
@ -193,22 +194,23 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||||
switch delta.Membership {
|
switch delta.Membership {
|
||||||
case gomatrixserverlib.Join:
|
case gomatrixserverlib.Join:
|
||||||
jr := types.NewJoinResponse()
|
jr := types.NewJoinResponse()
|
||||||
|
|
||||||
jr.Timeline.PrevBatch = &prevBatch
|
jr.Timeline.PrevBatch = &prevBatch
|
||||||
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||||
jr.Timeline.Limited = limited
|
jr.Timeline.Limited = limited
|
||||||
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
|
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
|
||||||
res.Rooms.Join[delta.RoomID] = *jr
|
res.Rooms.Join[delta.RoomID] = *jr
|
||||||
|
|
||||||
case gomatrixserverlib.Peek:
|
case gomatrixserverlib.Peek:
|
||||||
jr := types.NewJoinResponse()
|
jr := types.NewJoinResponse()
|
||||||
|
|
||||||
jr.Timeline.PrevBatch = &prevBatch
|
jr.Timeline.PrevBatch = &prevBatch
|
||||||
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||||
jr.Timeline.Limited = limited
|
jr.Timeline.Limited = limited
|
||||||
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
|
jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
|
||||||
res.Rooms.Peek[delta.RoomID] = *jr
|
res.Rooms.Peek[delta.RoomID] = *jr
|
||||||
|
|
||||||
case gomatrixserverlib.Leave:
|
case gomatrixserverlib.Leave:
|
||||||
fallthrough // transitions to leave are the same as ban
|
fallthrough // transitions to leave are the same as ban
|
||||||
|
|
||||||
case gomatrixserverlib.Ban:
|
case gomatrixserverlib.Ban:
|
||||||
// TODO: recentEvents may contain events that this user is not allowed to see because they are
|
// TODO: recentEvents may contain events that this user is not allowed to see because they are
|
||||||
// no longer in the room.
|
// no longer in the room.
|
||||||
|
@ -223,15 +225,16 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
|
func (p *PDUStreamProvider) getResponseForCompleteSync(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
roomID string,
|
roomID string,
|
||||||
r types.Range,
|
r types.Range,
|
||||||
filter *gomatrixserverlib.Filter,
|
filter *gomatrixserverlib.Filter,
|
||||||
numRecentEventsPerRoom int,
|
|
||||||
device *userapi.Device,
|
device *userapi.Device,
|
||||||
) (jr *types.JoinResponse, err error) {
|
) (
|
||||||
var stateEvents []*gomatrixserverlib.HeaderedEvent
|
recentEvents, stateEvents []*gomatrixserverlib.HeaderedEvent,
|
||||||
|
prevBatch *types.TopologyToken, limited bool, err error,
|
||||||
|
) {
|
||||||
stateEvents, err = p.DB.CurrentState(ctx, roomID, &filter.Room.State)
|
stateEvents, err = p.DB.CurrentState(ctx, roomID, &filter.Room.State)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
@ -239,9 +242,8 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
|
||||||
// TODO: When filters are added, we may need to call this multiple times to get enough events.
|
// TODO: When filters are added, we may need to call this multiple times to get enough events.
|
||||||
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
|
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
|
||||||
var recentStreamEvents []types.StreamEvent
|
var recentStreamEvents []types.StreamEvent
|
||||||
var limited bool
|
|
||||||
recentStreamEvents, limited, err = p.DB.RecentEvents(
|
recentStreamEvents, limited, err = p.DB.RecentEvents(
|
||||||
ctx, roomID, r, numRecentEventsPerRoom, true, true,
|
ctx, roomID, r, filter.Room.Timeline.Limit, true, true,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
@ -251,7 +253,6 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
|
||||||
|
|
||||||
// Retrieve the backward topology position, i.e. the position of the
|
// Retrieve the backward topology position, i.e. the position of the
|
||||||
// oldest event in the room's topology.
|
// oldest event in the room's topology.
|
||||||
var prevBatch *types.TopologyToken
|
|
||||||
if len(recentStreamEvents) > 0 {
|
if len(recentStreamEvents) > 0 {
|
||||||
var backwardTopologyPos, backwardStreamPos types.StreamPosition
|
var backwardTopologyPos, backwardStreamPos types.StreamPosition
|
||||||
backwardTopologyPos, backwardStreamPos, err = p.DB.PositionInTopology(ctx, recentStreamEvents[0].EventID())
|
backwardTopologyPos, backwardStreamPos, err = p.DB.PositionInTopology(ctx, recentStreamEvents[0].EventID())
|
||||||
|
@ -268,8 +269,23 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
|
||||||
// We don't include a device here as we don't need to send down
|
// We don't include a device here as we don't need to send down
|
||||||
// transaction IDs for complete syncs, but we do it anyway because Sytest demands it for:
|
// transaction IDs for complete syncs, but we do it anyway because Sytest demands it for:
|
||||||
// "Can sync a room with a message with a transaction id" - which does a complete sync to check.
|
// "Can sync a room with a message with a transaction id" - which does a complete sync to check.
|
||||||
recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
|
recentEvents = p.DB.StreamEventsToEvents(device, recentStreamEvents)
|
||||||
stateEvents = removeDuplicates(stateEvents, recentEvents)
|
stateEvents = removeDuplicates(stateEvents, recentEvents)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
|
||||||
|
ctx context.Context,
|
||||||
|
roomID string,
|
||||||
|
r types.Range,
|
||||||
|
filter *gomatrixserverlib.Filter,
|
||||||
|
device *userapi.Device,
|
||||||
|
) (jr *types.JoinResponse, err error) {
|
||||||
|
recentEvents, stateEvents, prevBatch, limited, err := p.getResponseForCompleteSync(ctx, roomID, r, filter, device)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("p.getResponseForCompleteSync: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
jr = types.NewJoinResponse()
|
jr = types.NewJoinResponse()
|
||||||
jr.Timeline.PrevBatch = prevBatch
|
jr.Timeline.PrevBatch = prevBatch
|
||||||
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||||
|
@ -285,49 +301,11 @@ func (p *PDUStreamProvider) getLeaveResponseForCompleteSync(
|
||||||
filter *gomatrixserverlib.Filter,
|
filter *gomatrixserverlib.Filter,
|
||||||
device *userapi.Device,
|
device *userapi.Device,
|
||||||
) (lr *types.LeaveResponse, err error) {
|
) (lr *types.LeaveResponse, err error) {
|
||||||
var stateEvents []*gomatrixserverlib.HeaderedEvent
|
recentEvents, stateEvents, prevBatch, limited, err := p.getResponseForCompleteSync(ctx, roomID, r, filter, device)
|
||||||
stateEvents, err = p.DB.CurrentState(ctx, roomID, &filter.Room.State)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return nil, fmt.Errorf("p.getResponseForCompleteSync: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
numRecentEventsPerRoom := filter.Room.Timeline.Limit
|
|
||||||
|
|
||||||
// TODO: When filters are added, we may need to call this multiple times to get enough events.
|
|
||||||
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
|
|
||||||
var recentStreamEvents []types.StreamEvent
|
|
||||||
var limited bool
|
|
||||||
recentStreamEvents, limited, err = p.DB.RecentEvents(
|
|
||||||
ctx, roomID, r, numRecentEventsPerRoom, true, true,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
recentStreamEvents, limited = p.filterStreamEventsAccordingToHistoryVisibility(recentStreamEvents, device, limited)
|
|
||||||
|
|
||||||
// Retrieve the backward topology position, i.e. the position of the
|
|
||||||
// oldest event in the room's topology.
|
|
||||||
var prevBatch *types.TopologyToken
|
|
||||||
if len(recentStreamEvents) > 0 {
|
|
||||||
var backwardTopologyPos, backwardStreamPos types.StreamPosition
|
|
||||||
backwardTopologyPos, backwardStreamPos, err = p.DB.PositionInTopology(ctx, recentStreamEvents[0].EventID())
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
prevBatch = &types.TopologyToken{
|
|
||||||
Depth: backwardTopologyPos,
|
|
||||||
PDUPosition: backwardStreamPos,
|
|
||||||
}
|
|
||||||
prevBatch.Decrement()
|
|
||||||
}
|
|
||||||
|
|
||||||
// We don't include a device here as we don't need to send down
|
|
||||||
// transaction IDs for complete syncs, but we do it anyway because Sytest demands it for:
|
|
||||||
// "Can sync a room with a message with a transaction id" - which does a complete sync to check.
|
|
||||||
recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
|
|
||||||
stateEvents = removeDuplicates(stateEvents, recentEvents)
|
|
||||||
|
|
||||||
lr = types.NewLeaveResponse()
|
lr = types.NewLeaveResponse()
|
||||||
lr.Timeline.PrevBatch = prevBatch
|
lr.Timeline.PrevBatch = prevBatch
|
||||||
lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
|
||||||
|
|
|
@ -74,6 +74,10 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
|
||||||
if f != nil {
|
if f != nil {
|
||||||
filter = *f
|
filter = *f
|
||||||
}
|
}
|
||||||
|
limit := filter.Room.Timeline.Limit
|
||||||
|
if limit == 0 {
|
||||||
|
limit = DefaultTimelineLimit
|
||||||
|
}
|
||||||
// TODO: Additional query params: set_presence, filter
|
// TODO: Additional query params: set_presence, filter
|
||||||
|
|
||||||
logger := util.GetLogger(req.Context()).WithFields(logrus.Fields{
|
logger := util.GetLogger(req.Context()).WithFields(logrus.Fields{
|
||||||
|
@ -81,20 +85,20 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
|
||||||
"device_id": device.ID,
|
"device_id": device.ID,
|
||||||
"since": since,
|
"since": since,
|
||||||
"timeout": timeout,
|
"timeout": timeout,
|
||||||
"limit": filter.Room.Timeline.Limit,
|
"limit": limit,
|
||||||
})
|
})
|
||||||
|
|
||||||
return &types.SyncRequest{
|
return &types.SyncRequest{
|
||||||
Context: req.Context(), //
|
Context: req.Context(), //
|
||||||
Log: logger, //
|
Log: logger, //
|
||||||
Device: &device, //
|
Device: &device, //
|
||||||
Response: types.NewResponse(), // Populated by all streams
|
Response: types.NewResponse(), // Populated by all streams
|
||||||
Filter: filter, //
|
Filter: filter, //
|
||||||
Since: since, //
|
Since: since, //
|
||||||
Timeout: timeout, //
|
Timeout: timeout, //
|
||||||
Limit: filter.Room.Timeline.Limit, //
|
Limit: limit, //
|
||||||
Rooms: make(map[string]string), // Populated by the PDU stream
|
Rooms: make(map[string]string), // Populated by the PDU stream
|
||||||
WantFullState: wantFullState, //
|
WantFullState: wantFullState, //
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue