Remove partitioned stream positions

This was used by the device list stream position. The device list position
now corresponds to the `Offset`, and the partition is always 0, in prep
for removing reliance on Kafka topics for device list changes.
This commit is contained in:
Kegan Dougal 2022-01-19 17:39:47 +00:00
parent 8d3a2d87e6
commit 43f56a45a5
14 changed files with 63 additions and 227 deletions

View file

@ -10,7 +10,7 @@ import (
)
type DeviceListStreamProvider struct {
PartitionedStreamProvider
StreamProvider
rsAPI api.RoomserverInternalAPI
keyAPI keyapi.KeyInternalAPI
}
@ -18,15 +18,15 @@ type DeviceListStreamProvider struct {
func (p *DeviceListStreamProvider) CompleteSync(
ctx context.Context,
req *types.SyncRequest,
) types.LogPosition {
) types.StreamPosition {
return p.LatestPosition(ctx)
}
func (p *DeviceListStreamProvider) IncrementalSync(
ctx context.Context,
req *types.SyncRequest,
from, to types.LogPosition,
) types.LogPosition {
from, to types.StreamPosition,
) types.StreamPosition {
var err error
to, _, err = internal.DeviceListCatchup(context.Background(), p.keyAPI, p.rsAPI, req.Device.UserID, req.Response, from, to)
if err != nil {

View file

@ -18,7 +18,7 @@ type Streams struct {
InviteStreamProvider types.StreamProvider
SendToDeviceStreamProvider types.StreamProvider
AccountDataStreamProvider types.StreamProvider
DeviceListStreamProvider types.PartitionedStreamProvider
DeviceListStreamProvider types.StreamProvider
}
func NewSyncStreamProviders(
@ -48,9 +48,9 @@ func NewSyncStreamProviders(
userAPI: userAPI,
},
DeviceListStreamProvider: &DeviceListStreamProvider{
PartitionedStreamProvider: PartitionedStreamProvider{DB: d},
rsAPI: rsAPI,
keyAPI: keyAPI,
StreamProvider: StreamProvider{DB: d},
rsAPI: rsAPI,
keyAPI: keyAPI,
},
}

View file

@ -1,38 +0,0 @@
package streams
import (
"context"
"sync"
"github.com/matrix-org/dendrite/syncapi/storage"
"github.com/matrix-org/dendrite/syncapi/types"
)
type PartitionedStreamProvider struct {
DB storage.Database
latest types.LogPosition
latestMutex sync.RWMutex
}
func (p *PartitionedStreamProvider) Setup() {
}
func (p *PartitionedStreamProvider) Advance(
latest types.LogPosition,
) {
p.latestMutex.Lock()
defer p.latestMutex.Unlock()
if latest.IsAfter(&p.latest) {
p.latest = latest
}
}
func (p *PartitionedStreamProvider) LatestPosition(
ctx context.Context,
) types.LogPosition {
p.latestMutex.RLock()
defer p.latestMutex.RUnlock()
return p.latest
}