Mirror of https://github.com/hoernschen/dendrite.git (synced 2025-04-10 13:53:40 +00:00)

Move PDU stream functions to storage package to reuse snapshot transaction

Parent: a060df91e2
Commit: e145c2c506

4 changed files with 322 additions and 312 deletions
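The motivation in the commit title is transaction reuse: before this change, each storage helper opened its own read-only snapshot, so a single /sync response could be assembled from several different points in database time. Moving the per-room loops into the storage package lets one snapshot serve the whole response. Below is a minimal sketch of that pattern, using database/sql directly with hypothetical helper and table names (the real code further down uses d.readOnlySnapshot and sqlutil.EndTransactionWithCheck instead):

    package syncexample

    import (
        "context"
        "database/sql"
        "fmt"
    )

    // completeSync opens one read-only transaction and threads the same
    // *sql.Tx through every per-room helper, so all rooms are read from a
    // single consistent snapshot instead of one snapshot per room.
    func completeSync(ctx context.Context, db *sql.DB, joinedRoomIDs []string) error {
        txn, err := db.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
        if err != nil {
            return fmt.Errorf("db.BeginTx: %w", err)
        }
        // The snapshot is read-only, so rolling back on every path is safe.
        defer txn.Rollback() // nolint:errcheck

        for _, roomID := range joinedRoomIDs {
            if err := syncRoom(ctx, txn, roomID); err != nil {
                return fmt.Errorf("syncRoom(%q): %w", roomID, err)
            }
        }
        return nil
    }

    // syncRoom stands in for helpers such as getJoinResponseForCompleteSync:
    // it accepts the shared transaction rather than opening its own.
    func syncRoom(ctx context.Context, txn *sql.Tx, roomID string) error {
        var count int
        // Hypothetical table name, purely for illustration.
        row := txn.QueryRowContext(ctx,
            "SELECT COUNT(*) FROM syncapi_output_room_events WHERE room_id = $1", roomID)
        return row.Scan(&count)
    }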
@@ -29,6 +29,21 @@ import (
 type Database interface {
     internal.PartitionStorer
 
+    PDUCompleteSync(
+        ctx context.Context,
+        req *types.SyncRequest,
+        joinedRoomIDs []string,
+        r types.Range,
+        stateFilter *gomatrixserverlib.StateFilter,
+        eventFilter *gomatrixserverlib.RoomEventFilter,
+    ) error
+    PDUIncrementalSync(
+        ctx context.Context,
+        req *types.SyncRequest,
+        r types.Range,
+        from, to types.StreamPosition,
+    ) error
+
     MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error)
     MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error)
     MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error)
@@ -36,8 +51,6 @@ type Database interface {
     MaxStreamPositionForSendToDeviceMessages(ctx context.Context) (types.StreamPosition, error)
 
     CurrentState(ctx context.Context, roomID string, stateFilterPart *gomatrixserverlib.StateFilter, excludeEventIDs []string) ([]*gomatrixserverlib.HeaderedEvent, error)
-    GetStateDeltasForFullStateSync(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
-    GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
     RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error)
 
     RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
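With PDUCompleteSync and PDUIncrementalSync on the interface, callers hand over the whole sync pass instead of composing it from the GetStateDeltas-style getters, which drop out of the exported interface here. A trimmed sketch of the caller's side, with a locally declared single-method interface for illustration (the real call sites appear in the streams diff further down):

    package syncexample

    import (
        "context"

        "github.com/matrix-org/dendrite/syncapi/types"
        "github.com/matrix-org/gomatrixserverlib"
    )

    // pduSyncer is just the slice of the Database interface needed here.
    type pduSyncer interface {
        PDUCompleteSync(ctx context.Context, req *types.SyncRequest,
            joinedRoomIDs []string, r types.Range,
            stateFilter *gomatrixserverlib.StateFilter,
            eventFilter *gomatrixserverlib.RoomEventFilter) error
    }

    func completeSyncViaDB(ctx context.Context, db pduSyncer, req *types.SyncRequest,
        joinedRoomIDs []string, r types.Range) error {
        stateFilter := req.Filter.Room.State
        eventFilter := req.Filter.Room.Timeline
        // One call replaces the per-room fan-out that used to live in the
        // streams package; the snapshot transaction is now a storage detail.
        return db.PDUCompleteSync(ctx, req, joinedRoomIDs, r, &stateFilter, &eventFilter)
    }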
syncapi/storage/shared/sync_pdu.go: 272 additions (new file)

@@ -0,0 +1,272 @@
+package shared
+
+import (
+    "context"
+    "database/sql"
+    "fmt"
+
+    "github.com/matrix-org/dendrite/internal/sqlutil"
+    "github.com/matrix-org/dendrite/syncapi/types"
+    userapi "github.com/matrix-org/dendrite/userapi/api"
+    "github.com/matrix-org/gomatrixserverlib"
+)
+
+func (d *Database) PDUCompleteSync(
+    ctx context.Context,
+    req *types.SyncRequest,
+    joinedRoomIDs []string,
+    r types.Range,
+    stateFilter *gomatrixserverlib.StateFilter,
+    eventFilter *gomatrixserverlib.RoomEventFilter,
+) error {
+    txn, err := d.readOnlySnapshot(ctx)
+    if err != nil {
+        return fmt.Errorf("d.readOnlySnapshot: %w", err)
+    }
+    var succeeded bool
+    defer sqlutil.EndTransactionWithCheck(txn, &succeeded, &err)
+
+    for _, roomID := range joinedRoomIDs {
+        jr, err := d.getJoinResponseForCompleteSync(
+            ctx, txn, roomID, r, stateFilter, eventFilter, req.WantFullState, req.Device,
+        )
+        if err != nil {
+            return fmt.Errorf("d.getJoinResponseForCompleteSync: %w", err)
+        }
+
+        req.Response.Rooms.Join[roomID] = *jr
+        req.Rooms[roomID] = gomatrixserverlib.Join
+    }
+
+    return nil
+}
+
+func (d *Database) PDUIncrementalSync(
+    ctx context.Context,
+    req *types.SyncRequest,
+    r types.Range,
+    from, to types.StreamPosition,
+) error {
+    txn, err := d.readOnlySnapshot(ctx)
+    if err != nil {
+        return fmt.Errorf("d.readOnlySnapshot: %w", err)
+    }
+    var succeeded bool
+    defer sqlutil.EndTransactionWithCheck(txn, &succeeded, &err)
+
+    var stateDeltas []types.StateDelta
+    var joinedRooms []string
+
+    stateFilter := req.Filter.Room.State
+    eventFilter := req.Filter.Room.Timeline
+
+    if req.WantFullState {
+        if stateDeltas, joinedRooms, err = d.getStateDeltasForFullStateSync(ctx, txn, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
+            req.Log.WithError(err).Error("p.DB.GetStateDeltasForFullStateSync failed")
+            return fmt.Errorf("d.GetStateDeltasForFullStateSync: %w", err)
+        }
+    } else {
+        if stateDeltas, joinedRooms, err = d.getStateDeltas(ctx, txn, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
+            req.Log.WithError(err).Error("p.DB.GetStateDeltas failed")
+            return nil
+        }
+    }
+
+    for _, roomID := range joinedRooms {
+        req.Rooms[roomID] = gomatrixserverlib.Join
+    }
+
+    for _, delta := range stateDeltas {
+        if err = d.addRoomDeltaToResponse(ctx, txn, req.Device, r, delta, &eventFilter, req.Response); err != nil {
+            req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
+            return nil
+        }
+    }
+
+    return nil
+}
+
+func (d *Database) addRoomDeltaToResponse(
+    ctx context.Context,
+    txn *sql.Tx,
+    device *userapi.Device,
+    r types.Range,
+    delta types.StateDelta,
+    eventFilter *gomatrixserverlib.RoomEventFilter,
+    res *types.Response,
+) error {
+    if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave {
+        // make sure we don't leak recent events after the leave event.
+        // TODO: History visibility makes this somewhat complex to handle correctly. For example:
+        // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
+        // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
+        //       in a single /sync request
+        // This is all "okay" assuming history_visibility == "shared" which it is by default.
+        r.To = delta.MembershipPos
+    }
+    recentStreamEvents, limited, err := d.OutputEvents.SelectRecentEvents(
+        ctx, txn, delta.RoomID, r,
+        eventFilter, true, true,
+    )
+    if err != nil {
+        return err
+    }
+    recentEvents := d.StreamEventsToEvents(device, recentStreamEvents)
+    delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back
+    prevBatch, err := d.GetBackwardTopologyPos(ctx, recentStreamEvents)
+    if err != nil {
+        return err
+    }
+
+    // XXX: should we ever get this far if we have no recent events or state in this room?
+    // in practice we do for peeks, but possibly not joins?
+    if len(recentEvents) == 0 && len(delta.StateEvents) == 0 {
+        return nil
+    }
+
+    switch delta.Membership {
+    case gomatrixserverlib.Join:
+        jr := types.NewJoinResponse()
+        jr.Timeline.PrevBatch = &prevBatch
+        jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
+        jr.Timeline.Limited = limited
+        jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
+        res.Rooms.Join[delta.RoomID] = *jr
+
+    case gomatrixserverlib.Peek:
+        jr := types.NewJoinResponse()
+        jr.Timeline.PrevBatch = &prevBatch
+        jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
+        jr.Timeline.Limited = limited
+        jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
+        res.Rooms.Peek[delta.RoomID] = *jr
+
+    case gomatrixserverlib.Leave:
+        fallthrough // transitions to leave are the same as ban
+
+    case gomatrixserverlib.Ban:
+        // TODO: recentEvents may contain events that this user is not allowed to see because they are
+        //       no longer in the room.
+        lr := types.NewLeaveResponse()
+        lr.Timeline.PrevBatch = &prevBatch
+        lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
+        lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
+        lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
+        res.Rooms.Leave[delta.RoomID] = *lr
+    }
+
+    return nil
+}
+
+func (d *Database) getJoinResponseForCompleteSync(
+    ctx context.Context,
+    txn *sql.Tx,
+    roomID string,
+    r types.Range,
+    stateFilter *gomatrixserverlib.StateFilter,
+    eventFilter *gomatrixserverlib.RoomEventFilter,
+    wantFullState bool,
+    device *userapi.Device,
+) (jr *types.JoinResponse, err error) {
+    // TODO: When filters are added, we may need to call this multiple times to get enough events.
+    // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
+    recentStreamEvents, limited, err := d.OutputEvents.SelectRecentEvents(
+        ctx, txn, roomID, r, eventFilter, true, true,
+    )
+    if err != nil {
+        return
+    }
+
+    // Get the event IDs of the stream events we fetched. There's no point in us
+    var excludingEventIDs []string
+    if !wantFullState {
+        excludingEventIDs = make([]string, 0, len(recentStreamEvents))
+        for _, event := range recentStreamEvents {
+            if event.StateKey() != nil {
+                excludingEventIDs = append(excludingEventIDs, event.EventID())
+            }
+        }
+    }
+
+    stateEvents, err := d.CurrentRoomState.SelectCurrentState(ctx, txn, roomID, stateFilter, excludingEventIDs)
+    if err != nil {
+        return
+    }
+
+    // TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the
+    // user shouldn't see, we check the recent events and remove any prior to the join event of the user
+    // which is equiv to history_visibility: joined
+    joinEventIndex := -1
+    for i := len(recentStreamEvents) - 1; i >= 0; i-- {
+        ev := recentStreamEvents[i]
+        if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(device.UserID) {
+            membership, _ := ev.Membership()
+            if membership == "join" {
+                joinEventIndex = i
+                if i > 0 {
+                    // the create event happens before the first join, so we should cut it at that point instead
+                    if recentStreamEvents[i-1].Type() == gomatrixserverlib.MRoomCreate && recentStreamEvents[i-1].StateKeyEquals("") {
+                        joinEventIndex = i - 1
+                        break
+                    }
+                }
+                break
+            }
+        }
+    }
+    if joinEventIndex != -1 {
+        // cut all events earlier than the join (but not the join itself)
+        recentStreamEvents = recentStreamEvents[joinEventIndex:]
+        limited = false // so clients know not to try to backpaginate
+    }
+
+    // Retrieve the backward topology position, i.e. the position of the
+    // oldest event in the room's topology.
+    var prevBatch *types.TopologyToken
+    if len(recentStreamEvents) > 0 {
+        var backwardTopologyPos, backwardStreamPos types.StreamPosition
+        backwardTopologyPos, backwardStreamPos, err = d.PositionInTopology(ctx, recentStreamEvents[0].EventID())
+        if err != nil {
+            return
+        }
+        prevBatch = &types.TopologyToken{
+            Depth:       backwardTopologyPos,
+            PDUPosition: backwardStreamPos,
+        }
+        prevBatch.Decrement()
+    }
+
+    // We don't include a device here as we don't need to send down
+    // transaction IDs for complete syncs, but we do it anyway because Sytest demands it for:
+    // "Can sync a room with a message with a transaction id" - which does a complete sync to check.
+    recentEvents := d.StreamEventsToEvents(device, recentStreamEvents)
+    stateEvents = removeDuplicates(stateEvents, recentEvents)
+    jr = types.NewJoinResponse()
+    jr.Timeline.PrevBatch = prevBatch
+    jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
+    jr.Timeline.Limited = limited
+    jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
+    return jr, nil
+}
+
+func removeDuplicates(stateEvents, recentEvents []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
+    for _, recentEv := range recentEvents {
+        if recentEv.StateKey() == nil {
+            continue // not a state event
+        }
+        // TODO: This is a linear scan over all the current state events in this room. This will
+        //       be slow for big rooms. We should instead sort the state events by event ID (ORDER BY)
+        //       then do a binary search to find matching events, similar to what roomserver does.
+        for j := 0; j < len(stateEvents); j++ {
+            if stateEvents[j].EventID() == recentEv.EventID() {
+                // overwrite the element to remove with the last element then pop the last element.
+                // This is orders of magnitude faster than re-slicing, but doesn't preserve ordering
+                // (we don't care about the order of stateEvents)
+                stateEvents[j] = stateEvents[len(stateEvents)-1]
+                stateEvents = stateEvents[:len(stateEvents)-1]
+                break // there shouldn't be multiple events with the same event ID
+            }
+        }
+    }
+    return stateEvents
+}
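Both new entry points guard their snapshot with the same idiom: a succeeded flag plus a deferred sqlutil.EndTransactionWithCheck. A hedged sketch of what a helper of that shape typically does (assumed behaviour, not dendrite's exact code): commit if the caller flagged success, otherwise roll back, and only surface the Commit or Rollback error when nothing failed earlier.

    package syncexample

    import "database/sql"

    // endTransactionWithCheck mirrors the assumed shape of a helper like
    // sqlutil.EndTransactionWithCheck; the real signature may differ.
    func endTransactionWithCheck(txn *sql.Tx, succeeded *bool, err *error) {
        if *succeeded {
            if cerr := txn.Commit(); cerr != nil && *err == nil {
                *err = cerr
            }
            return
        }
        if rerr := txn.Rollback(); rerr != nil && *err == nil {
            *err = rerr
        }
    }

Note that neither PDUCompleteSync nor PDUIncrementalSync ever sets succeeded = true, so these snapshots are always rolled back on exit; for read-only work that is harmless, since there is nothing to commit.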
@@ -661,8 +661,8 @@ func (d *Database) fetchMissingStateEvents(
 // exclusive of oldPos, inclusive of newPos, for the rooms in which
 // the user has new membership events.
 // A list of joined room IDs is also returned in case the caller needs it.
-func (d *Database) GetStateDeltas(
-    ctx context.Context, device *userapi.Device,
+func (d *Database) getStateDeltas(
+    ctx context.Context, txn *sql.Tx, device *userapi.Device,
     r types.Range, userID string,
     stateFilter *gomatrixserverlib.StateFilter,
 ) ([]types.StateDelta, []string, error) {
@@ -674,12 +674,6 @@ func (d *Database) GetStateDeltas(
     // * Check if user is still CURRENTLY invited to the room. If so, add room to 'invited' block.
     // * Check if the user is CURRENTLY (TODO) left/banned. If so, add room to 'archived' block.
     // - Get all CURRENTLY joined rooms, and add them to 'joined' block.
-    txn, err := d.readOnlySnapshot(ctx)
-    if err != nil {
-        return nil, nil, fmt.Errorf("d.readOnlySnapshot: %w", err)
-    }
-    var succeeded bool
-    defer sqlutil.EndTransactionWithCheck(txn, &succeeded, &err)
 
     var deltas []types.StateDelta
 
@@ -764,7 +758,6 @@ func (d *Database) GetStateDeltas(
         })
     }
 
-    succeeded = true
     return deltas, joinedRoomIDs, nil
 }
 
@@ -772,18 +765,11 @@ func (d *Database) GetStateDeltas(
 // requests with full_state=true.
 // Fetches full state for all joined rooms and uses selectStateInRange to get
 // updates for other rooms.
-func (d *Database) GetStateDeltasForFullStateSync(
-    ctx context.Context, device *userapi.Device,
+func (d *Database) getStateDeltasForFullStateSync(
+    ctx context.Context, txn *sql.Tx, device *userapi.Device,
     r types.Range, userID string,
     stateFilter *gomatrixserverlib.StateFilter,
 ) ([]types.StateDelta, []string, error) {
-    txn, err := d.readOnlySnapshot(ctx)
-    if err != nil {
-        return nil, nil, fmt.Errorf("d.readOnlySnapshot: %w", err)
-    }
-    var succeeded bool
-    defer sqlutil.EndTransactionWithCheck(txn, &succeeded, &err)
 
     // Use a reasonable initial capacity
     deltas := make(map[string]types.StateDelta)
@@ -860,7 +846,6 @@ func (d *Database) GetStateDeltasForFullStateSync(
         i++
     }
 
-    succeeded = true
     return result, joinedRoomIDs, nil
 }
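The change in this file is mechanical: getStateDeltas and getStateDeltasForFullStateSync lose their own snapshot preamble (and their exported names) and instead accept the caller's txn *sql.Tx. A schematic before/after under assumed, simplified types:

    package syncexample

    import (
        "context"
        "database/sql"
    )

    type stateDelta struct{ roomID string }

    // Before: the helper owned its transaction, so two helpers called for
    // the same /sync response could observe different database states.
    func getStateDeltasOwningTxn(ctx context.Context, db *sql.DB) ([]stateDelta, error) {
        txn, err := db.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
        if err != nil {
            return nil, err
        }
        defer txn.Rollback() // nolint:errcheck
        return selectDeltas(ctx, txn)
    }

    // After: the caller owns the snapshot, and every helper it invokes
    // reads the same consistent view.
    func getStateDeltas(ctx context.Context, txn *sql.Tx) ([]stateDelta, error) {
        return selectDeltas(ctx, txn)
    }

    // selectDeltas stands in for the real membership/state queries.
    func selectDeltas(ctx context.Context, txn *sql.Tx) ([]stateDelta, error) {
        return nil, nil
    }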
@@ -2,54 +2,17 @@ package streams
 
 import (
     "context"
-    "sync"
-    "time"
 
     "github.com/matrix-org/dendrite/syncapi/types"
-    userapi "github.com/matrix-org/dendrite/userapi/api"
     "github.com/matrix-org/gomatrixserverlib"
-    "go.uber.org/atomic"
 )
 
-// The max number of per-room goroutines to have running.
-// Too high and this will consume lots of CPU, too low and complete
-// sync responses will take longer to process.
-const PDU_STREAM_WORKERS = 256
-
-// The maximum number of tasks that can be queued in total before
-// backpressure will build up and the rests will start to block.
-const PDU_STREAM_QUEUESIZE = PDU_STREAM_WORKERS * 8
-
 type PDUStreamProvider struct {
     StreamProvider
-
-    tasks   chan func()
-    workers atomic.Int32
-}
-
-func (p *PDUStreamProvider) worker() {
-    defer p.workers.Dec()
-    for {
-        select {
-        case f := <-p.tasks:
-            f()
-        case <-time.After(time.Second * 10):
-            return
-        }
-    }
-}
-
-func (p *PDUStreamProvider) queue(f func()) {
-    if p.workers.Load() < PDU_STREAM_WORKERS {
-        p.workers.Inc()
-        go p.worker()
-    }
-    p.tasks <- f
 }
 
 func (p *PDUStreamProvider) Setup() {
     p.StreamProvider.Setup()
-    p.tasks = make(chan func(), PDU_STREAM_QUEUESIZE)
 
     p.latestMutex.Lock()
     defer p.latestMutex.Unlock()
@@ -84,55 +47,40 @@ func (p *PDUStreamProvider) CompleteSync(
         return from
     }
 
+    // Add peeked rooms.
+    /*
+        peeks, err := p.DB.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r)
+        if err != nil {
+            req.Log.WithError(err).Error("p.DB.PeeksInRange failed")
+            return from
+        }
+    */
+
     stateFilter := req.Filter.Room.State
     eventFilter := req.Filter.Room.Timeline
 
-    // Build up a /sync response. Add joined rooms.
-    var reqMutex sync.Mutex
-    var reqWaitGroup sync.WaitGroup
-    reqWaitGroup.Add(len(joinedRoomIDs))
-    for _, room := range joinedRoomIDs {
-        roomID := room
-        p.queue(func() {
-            defer reqWaitGroup.Done()
-
-            var jr *types.JoinResponse
-            jr, err = p.getJoinResponseForCompleteSync(
-                ctx, roomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device,
-            )
-            if err != nil {
-                req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
-                return
-            }
-
-            reqMutex.Lock()
-            defer reqMutex.Unlock()
-            req.Response.Rooms.Join[roomID] = *jr
-            req.Rooms[roomID] = gomatrixserverlib.Join
-        })
-    }
-
-    reqWaitGroup.Wait()
-
-    // Add peeked rooms.
-    peeks, err := p.DB.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r)
-    if err != nil {
-        req.Log.WithError(err).Error("p.DB.PeeksInRange failed")
-        return from
-    }
-    for _, peek := range peeks {
-        if !peek.Deleted {
-            var jr *types.JoinResponse
-            jr, err = p.getJoinResponseForCompleteSync(
-                ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device,
-            )
-            if err != nil {
-                req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
-                return from
-            }
-            req.Response.Rooms.Peek[peek.RoomID] = *jr
-        }
-    }
+    if err := p.DB.PDUCompleteSync(ctx, req, joinedRoomIDs, r, &stateFilter, &eventFilter); err != nil {
+        req.Log.WithError(err).Error("p.DB.PDUCompleteSync failed")
+        return from
+    }
+
+    /*
+        for _, peek := range peeks {
+            p.queue(func() {
+                if !peek.Deleted {
+                    var jr *types.JoinResponse
+                    jr, err = p.getJoinResponseForCompleteSync(
+                        ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.WantFullState, req.Device,
+                    )
+                    if err != nil {
+                        req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
+                        return
+                    }
+                    req.Response.Rooms.Peek[peek.RoomID] = *jr
+                }
+            })
+        }
+    */
 
     return to
 }
@@ -149,218 +97,10 @@ func (p *PDUStreamProvider) IncrementalSync(
     }
     newPos = to
 
-    var err error
-    var stateDeltas []types.StateDelta
-    var joinedRooms []string
-
-    stateFilter := req.Filter.Room.State
-    eventFilter := req.Filter.Room.Timeline
-
-    if req.WantFullState {
-        if stateDeltas, joinedRooms, err = p.DB.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
-            req.Log.WithError(err).Error("p.DB.GetStateDeltasForFullStateSync failed")
-            return
-        }
-    } else {
-        if stateDeltas, joinedRooms, err = p.DB.GetStateDeltas(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
-            req.Log.WithError(err).Error("p.DB.GetStateDeltas failed")
-            return
-        }
-    }
-
-    for _, roomID := range joinedRooms {
-        req.Rooms[roomID] = gomatrixserverlib.Join
-    }
-
-    for _, delta := range stateDeltas {
-        if err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil {
-            req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
-            return newPos
-        }
-    }
+    if err := p.DB.PDUIncrementalSync(ctx, req, r, from, to); err != nil {
+        req.Log.WithError(err).Error("p.DB.PDUIncrementalSync failed")
+        return from
+    }
 
     return r.To
 }
-
-func (p *PDUStreamProvider) addRoomDeltaToResponse(
-    ctx context.Context,
-    device *userapi.Device,
-    r types.Range,
-    delta types.StateDelta,
-    eventFilter *gomatrixserverlib.RoomEventFilter,
-    res *types.Response,
-) error {
-    if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave {
-        // make sure we don't leak recent events after the leave event.
-        // TODO: History visibility makes this somewhat complex to handle correctly. For example:
-        // TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
-        // TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
-        //       in a single /sync request
-        // This is all "okay" assuming history_visibility == "shared" which it is by default.
-        r.To = delta.MembershipPos
-    }
-    recentStreamEvents, limited, err := p.DB.RecentEvents(
-        ctx, delta.RoomID, r,
-        eventFilter, true, true,
-    )
-    if err != nil {
-        return err
-    }
-    recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
-    delta.StateEvents = removeDuplicates(delta.StateEvents, recentEvents) // roll back
-    prevBatch, err := p.DB.GetBackwardTopologyPos(ctx, recentStreamEvents)
-    if err != nil {
-        return err
-    }
-
-    // XXX: should we ever get this far if we have no recent events or state in this room?
-    // in practice we do for peeks, but possibly not joins?
-    if len(recentEvents) == 0 && len(delta.StateEvents) == 0 {
-        return nil
-    }
-
-    switch delta.Membership {
-    case gomatrixserverlib.Join:
-        jr := types.NewJoinResponse()
-        jr.Timeline.PrevBatch = &prevBatch
-        jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
-        jr.Timeline.Limited = limited
-        jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
-        res.Rooms.Join[delta.RoomID] = *jr
-
-    case gomatrixserverlib.Peek:
-        jr := types.NewJoinResponse()
-        jr.Timeline.PrevBatch = &prevBatch
-        jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
-        jr.Timeline.Limited = limited
-        jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
-        res.Rooms.Peek[delta.RoomID] = *jr
-
-    case gomatrixserverlib.Leave:
-        fallthrough // transitions to leave are the same as ban
-
-    case gomatrixserverlib.Ban:
-        // TODO: recentEvents may contain events that this user is not allowed to see because they are
-        //       no longer in the room.
-        lr := types.NewLeaveResponse()
-        lr.Timeline.PrevBatch = &prevBatch
-        lr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
-        lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
-        lr.State.Events = gomatrixserverlib.HeaderedToClientEvents(delta.StateEvents, gomatrixserverlib.FormatSync)
-        res.Rooms.Leave[delta.RoomID] = *lr
-    }
-
-    return nil
-}
-
-func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
-    ctx context.Context,
-    roomID string,
-    r types.Range,
-    stateFilter *gomatrixserverlib.StateFilter,
-    eventFilter *gomatrixserverlib.RoomEventFilter,
-    wantFullState bool,
-    device *userapi.Device,
-) (jr *types.JoinResponse, err error) {
-    // TODO: When filters are added, we may need to call this multiple times to get enough events.
-    // See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
-    recentStreamEvents, limited, err := p.DB.RecentEvents(
-        ctx, roomID, r, eventFilter, true, true,
-    )
-    if err != nil {
-        return
-    }
-
-    // Get the event IDs of the stream events we fetched. There's no point in us
-    var excludingEventIDs []string
-    if !wantFullState {
-        excludingEventIDs = make([]string, 0, len(recentStreamEvents))
-        for _, event := range recentStreamEvents {
-            if event.StateKey() != nil {
-                excludingEventIDs = append(excludingEventIDs, event.EventID())
-            }
-        }
-    }
-
-    stateEvents, err := p.DB.CurrentState(ctx, roomID, stateFilter, excludingEventIDs)
-    if err != nil {
-        return
-    }
-
-    // TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the
-    // user shouldn't see, we check the recent events and remove any prior to the join event of the user
-    // which is equiv to history_visibility: joined
-    joinEventIndex := -1
-    for i := len(recentStreamEvents) - 1; i >= 0; i-- {
-        ev := recentStreamEvents[i]
-        if ev.Type() == gomatrixserverlib.MRoomMember && ev.StateKeyEquals(device.UserID) {
-            membership, _ := ev.Membership()
-            if membership == "join" {
-                joinEventIndex = i
-                if i > 0 {
-                    // the create event happens before the first join, so we should cut it at that point instead
-                    if recentStreamEvents[i-1].Type() == gomatrixserverlib.MRoomCreate && recentStreamEvents[i-1].StateKeyEquals("") {
-                        joinEventIndex = i - 1
-                        break
-                    }
-                }
-                break
-            }
-        }
-    }
-    if joinEventIndex != -1 {
-        // cut all events earlier than the join (but not the join itself)
-        recentStreamEvents = recentStreamEvents[joinEventIndex:]
-        limited = false // so clients know not to try to backpaginate
-    }
-
-    // Retrieve the backward topology position, i.e. the position of the
-    // oldest event in the room's topology.
-    var prevBatch *types.TopologyToken
-    if len(recentStreamEvents) > 0 {
-        var backwardTopologyPos, backwardStreamPos types.StreamPosition
-        backwardTopologyPos, backwardStreamPos, err = p.DB.PositionInTopology(ctx, recentStreamEvents[0].EventID())
-        if err != nil {
-            return
-        }
-        prevBatch = &types.TopologyToken{
-            Depth:       backwardTopologyPos,
-            PDUPosition: backwardStreamPos,
-        }
-        prevBatch.Decrement()
-    }
-
-    // We don't include a device here as we don't need to send down
-    // transaction IDs for complete syncs, but we do it anyway because Sytest demands it for:
-    // "Can sync a room with a message with a transaction id" - which does a complete sync to check.
-    recentEvents := p.DB.StreamEventsToEvents(device, recentStreamEvents)
-    stateEvents = removeDuplicates(stateEvents, recentEvents)
-    jr = types.NewJoinResponse()
-    jr.Timeline.PrevBatch = prevBatch
-    jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
-    jr.Timeline.Limited = limited
-    jr.State.Events = gomatrixserverlib.HeaderedToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
-    return jr, nil
-}
-
-func removeDuplicates(stateEvents, recentEvents []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
-    for _, recentEv := range recentEvents {
-        if recentEv.StateKey() == nil {
-            continue // not a state event
-        }
-        // TODO: This is a linear scan over all the current state events in this room. This will
-        //       be slow for big rooms. We should instead sort the state events by event ID (ORDER BY)
-        //       then do a binary search to find matching events, similar to what roomserver does.
-        for j := 0; j < len(stateEvents); j++ {
-            if stateEvents[j].EventID() == recentEv.EventID() {
-                // overwrite the element to remove with the last element then pop the last element.
-                // This is orders of magnitude faster than re-slicing, but doesn't preserve ordering
-                // (we don't care about the order of stateEvents)
-                stateEvents[j] = stateEvents[len(stateEvents)-1]
-                stateEvents = stateEvents[:len(stateEvents)-1]
-                break // there shouldn't be multiple events with the same event ID
-            }
-        }
-    }
-    return stateEvents
-}
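The deleted code above is the other half of the story: CompleteSync used to fan each joined room out to a bounded pool of worker goroutines and merge the per-room results under a mutex, roughly like the sketch below (simplified; the real code reused up to PDU_STREAM_WORKERS workers rather than spawning one goroutine per room).

    package syncexample

    import "sync"

    // fanOut sketches the removed pattern: compute each room's response
    // concurrently, then merge the results under a shared mutex.
    func fanOut(roomIDs []string, compute func(roomID string) string) map[string]string {
        var (
            mu      sync.Mutex
            wg      sync.WaitGroup
            results = make(map[string]string, len(roomIDs))
        )
        wg.Add(len(roomIDs))
        for _, room := range roomIDs {
            roomID := room // per-iteration copy, as the old code also took
            go func() {
                defer wg.Done()
                jr := compute(roomID) // each worker needs its own DB view
                mu.Lock()
                results[roomID] = jr
                mu.Unlock()
            }()
        }
        wg.Wait()
        return results
    }

The commit trades that parallelism for consistency: a single *sql.Tx cannot usefully be shared across goroutines, so the sequential pass inside the storage layer is what makes the one-snapshot-per-response design possible, and it also removes the sync, time, userapi and go.uber.org/atomic dependencies from this file.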