dendrite/syncapi/streams/stream_pdu.go
Neil Alexander 0769412df5
Fix peeks
2021-07-27 13:42:14 +01:00

79 lines
1.8 KiB
Go

package streams
import (
"context"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
)
type PDUStreamProvider struct {
StreamProvider
}
func (p *PDUStreamProvider) Setup() {
p.StreamProvider.Setup()
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
id, err := p.DB.MaxStreamPositionForPDUs(context.Background())
if err != nil {
panic(err)
}
p.latest = id
}
func (p *PDUStreamProvider) CompleteSync(
ctx context.Context,
req *types.SyncRequest,
) types.StreamPosition {
from := types.StreamPosition(0)
to := p.LatestPosition(ctx)
// Get the current sync position which we will base the sync response on.
// For complete syncs, we want to start at the most recent events and work
// backwards, so that we show the most recent events in the room.
r := types.Range{
From: to,
To: 0,
Backwards: true,
}
// Extract room state and recent events for all rooms the user is joined to.
joinedRoomIDs, err := p.DB.RoomIDsWithMembership(ctx, req.Device.UserID, gomatrixserverlib.Join)
if err != nil {
req.Log.WithError(err).Error("p.DB.RoomIDsWithMembership failed")
return from
}
stateFilter := req.Filter.Room.State
eventFilter := req.Filter.Room.Timeline
if err := p.DB.PDUCompleteSync(ctx, req, joinedRoomIDs, r, &stateFilter, &eventFilter); err != nil {
req.Log.WithError(err).Error("p.DB.PDUCompleteSync failed")
return from
}
return to
}
func (p *PDUStreamProvider) IncrementalSync(
ctx context.Context,
req *types.SyncRequest,
from, to types.StreamPosition,
) (newPos types.StreamPosition) {
r := types.Range{
From: from,
To: to,
Backwards: from > to,
}
newPos = to
if err := p.DB.PDUIncrementalSync(ctx, req, r, from, to); err != nil {
req.Log.WithError(err).Error("p.DB.PDUIncrementalSync failed")
return from
}
return r.To
}