mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-08-01 05:42:46 +00:00
CS API: Support for /messages, fixes for /sync (#847)
* Merge forward * Tidy up a bit * TODO: What to do with NextBatch here? * Replace SyncPosition with PaginationToken throughout syncapi * Fix PaginationTokens * Fix lint errors * Add a couple of missing functions into the syncapi external storage interface * Some updates based on review comments from @babolivier * Some updates based on review comments from @babolivier * argh whitespacing * Fix opentracing span * Remove dead code * Don't overshadow err (fix lint issue) * Handle extremities after inserting event into topology * Try insert event topology as ON CONFLICT DO NOTHING * Prevent OOB error in addRoomDeltaToResponse * Thwarted by gocyclo again * Fix NewPaginationTokenFromString, define unit test for it * Update pagination token test * Update sytest-whitelist * Hopefully fix some of the sync batch tokens * Remove extraneous sync position func * Revert to topology tokens in addRoomDeltaToResponse etc * Fix typo * Remove prevPDUPos as dead now that backwardTopologyPos is used instead * Fix selectEventsWithEventIDsSQL * Update sytest-blacklist * Update sytest-whitelist
This commit is contained in:
parent
43ecf8d1f9
commit
49f760a30b
27 changed files with 1601 additions and 286 deletions
|
@ -36,7 +36,7 @@ type Notifier struct {
|
|||
// Protects currPos and userStreams.
|
||||
streamLock *sync.Mutex
|
||||
// The latest sync position
|
||||
currPos types.SyncPosition
|
||||
currPos types.PaginationToken
|
||||
// A map of user_id => UserStream which can be used to wake a given user's /sync request.
|
||||
userStreams map[string]*UserStream
|
||||
// The last time we cleaned out stale entries from the userStreams map
|
||||
|
@ -46,7 +46,7 @@ type Notifier struct {
|
|||
// NewNotifier creates a new notifier set to the given sync position.
|
||||
// In order for this to be of any use, the Notifier needs to be told all rooms and
|
||||
// the joined users within each of them by calling Notifier.Load(*storage.SyncServerDatabase).
|
||||
func NewNotifier(pos types.SyncPosition) *Notifier {
|
||||
func NewNotifier(pos types.PaginationToken) *Notifier {
|
||||
return &Notifier{
|
||||
currPos: pos,
|
||||
roomIDToJoinedUsers: make(map[string]userIDSet),
|
||||
|
@ -68,7 +68,7 @@ func NewNotifier(pos types.SyncPosition) *Notifier {
|
|||
// event type it handles, leaving other fields as 0.
|
||||
func (n *Notifier) OnNewEvent(
|
||||
ev *gomatrixserverlib.Event, roomID string, userIDs []string,
|
||||
posUpdate types.SyncPosition,
|
||||
posUpdate types.PaginationToken,
|
||||
) {
|
||||
// update the current position then notify relevant /sync streams.
|
||||
// This needs to be done PRIOR to waking up users as they will read this value.
|
||||
|
@ -151,7 +151,7 @@ func (n *Notifier) Load(ctx context.Context, db storage.Database) error {
|
|||
}
|
||||
|
||||
// CurrentPosition returns the current sync position
|
||||
func (n *Notifier) CurrentPosition() types.SyncPosition {
|
||||
func (n *Notifier) CurrentPosition() types.PaginationToken {
|
||||
n.streamLock.Lock()
|
||||
defer n.streamLock.Unlock()
|
||||
|
||||
|
@ -173,7 +173,7 @@ func (n *Notifier) setUsersJoinedToRooms(roomIDToUserIDs map[string][]string) {
|
|||
}
|
||||
}
|
||||
|
||||
func (n *Notifier) wakeupUsers(userIDs []string, newPos types.SyncPosition) {
|
||||
func (n *Notifier) wakeupUsers(userIDs []string, newPos types.PaginationToken) {
|
||||
for _, userID := range userIDs {
|
||||
stream := n.fetchUserStream(userID, false)
|
||||
if stream != nil {
|
||||
|
|
|
@ -32,11 +32,11 @@ var (
|
|||
randomMessageEvent gomatrixserverlib.Event
|
||||
aliceInviteBobEvent gomatrixserverlib.Event
|
||||
bobLeaveEvent gomatrixserverlib.Event
|
||||
syncPositionVeryOld types.SyncPosition
|
||||
syncPositionBefore types.SyncPosition
|
||||
syncPositionAfter types.SyncPosition
|
||||
syncPositionNewEDU types.SyncPosition
|
||||
syncPositionAfter2 types.SyncPosition
|
||||
syncPositionVeryOld types.PaginationToken
|
||||
syncPositionBefore types.PaginationToken
|
||||
syncPositionAfter types.PaginationToken
|
||||
syncPositionNewEDU types.PaginationToken
|
||||
syncPositionAfter2 types.PaginationToken
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -46,9 +46,9 @@ var (
|
|||
)
|
||||
|
||||
func init() {
|
||||
baseSyncPos := types.SyncPosition{
|
||||
PDUPosition: 0,
|
||||
TypingPosition: 0,
|
||||
baseSyncPos := types.PaginationToken{
|
||||
PDUPosition: 0,
|
||||
EDUTypingPosition: 0,
|
||||
}
|
||||
|
||||
syncPositionVeryOld = baseSyncPos
|
||||
|
@ -61,7 +61,7 @@ func init() {
|
|||
syncPositionAfter.PDUPosition = 12
|
||||
|
||||
syncPositionNewEDU = syncPositionAfter
|
||||
syncPositionNewEDU.TypingPosition = 1
|
||||
syncPositionNewEDU.EDUTypingPosition = 1
|
||||
|
||||
syncPositionAfter2 = baseSyncPos
|
||||
syncPositionAfter2.PDUPosition = 13
|
||||
|
@ -119,7 +119,7 @@ func TestImmediateNotification(t *testing.T) {
|
|||
t.Fatalf("TestImmediateNotification error: %s", err)
|
||||
}
|
||||
if pos != syncPositionBefore {
|
||||
t.Fatalf("TestImmediateNotification want %d, got %d", syncPositionBefore, pos)
|
||||
t.Fatalf("TestImmediateNotification want %v, got %v", syncPositionBefore, pos)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -138,7 +138,7 @@ func TestNewEventAndJoinedToRoom(t *testing.T) {
|
|||
t.Errorf("TestNewEventAndJoinedToRoom error: %s", err)
|
||||
}
|
||||
if pos != syncPositionAfter {
|
||||
t.Errorf("TestNewEventAndJoinedToRoom want %d, got %d", syncPositionAfter, pos)
|
||||
t.Errorf("TestNewEventAndJoinedToRoom want %v, got %v", syncPositionAfter, pos)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
|
@ -166,7 +166,7 @@ func TestNewInviteEventForUser(t *testing.T) {
|
|||
t.Errorf("TestNewInviteEventForUser error: %s", err)
|
||||
}
|
||||
if pos != syncPositionAfter {
|
||||
t.Errorf("TestNewInviteEventForUser want %d, got %d", syncPositionAfter, pos)
|
||||
t.Errorf("TestNewInviteEventForUser want %v, got %v", syncPositionAfter, pos)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
|
@ -194,7 +194,7 @@ func TestEDUWakeup(t *testing.T) {
|
|||
t.Errorf("TestNewInviteEventForUser error: %s", err)
|
||||
}
|
||||
if pos != syncPositionNewEDU {
|
||||
t.Errorf("TestNewInviteEventForUser want %d, got %d", syncPositionNewEDU, pos)
|
||||
t.Errorf("TestNewInviteEventForUser want %v, got %v", syncPositionNewEDU, pos)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
|
@ -222,7 +222,7 @@ func TestMultipleRequestWakeup(t *testing.T) {
|
|||
t.Errorf("TestMultipleRequestWakeup error: %s", err)
|
||||
}
|
||||
if pos != syncPositionAfter {
|
||||
t.Errorf("TestMultipleRequestWakeup want %d, got %d", syncPositionAfter, pos)
|
||||
t.Errorf("TestMultipleRequestWakeup want %v, got %v", syncPositionAfter, pos)
|
||||
}
|
||||
wg.Done()
|
||||
}
|
||||
|
@ -262,7 +262,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
|
|||
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err)
|
||||
}
|
||||
if pos != syncPositionAfter {
|
||||
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", syncPositionAfter, pos)
|
||||
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %v, got %v", syncPositionAfter, pos)
|
||||
}
|
||||
leaveWG.Done()
|
||||
}()
|
||||
|
@ -281,7 +281,7 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
|
|||
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom error: %s", err)
|
||||
}
|
||||
if pos != syncPositionAfter2 {
|
||||
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %d, got %d", syncPositionAfter2, pos)
|
||||
t.Errorf("TestNewEventAndWasPreviouslyJoinedToRoom want %v, got %v", syncPositionAfter2, pos)
|
||||
}
|
||||
aliceWG.Done()
|
||||
}()
|
||||
|
@ -305,14 +305,14 @@ func TestNewEventAndWasPreviouslyJoinedToRoom(t *testing.T) {
|
|||
time.Sleep(1 * time.Millisecond)
|
||||
}
|
||||
|
||||
func waitForEvents(n *Notifier, req syncRequest) (types.SyncPosition, error) {
|
||||
func waitForEvents(n *Notifier, req syncRequest) (types.PaginationToken, error) {
|
||||
listener := n.GetListener(req)
|
||||
defer listener.Close()
|
||||
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
return types.SyncPosition{}, fmt.Errorf(
|
||||
"waitForEvents timed out waiting for %s (pos=%d)", req.device.UserID, req.since,
|
||||
return types.PaginationToken{}, fmt.Errorf(
|
||||
"waitForEvents timed out waiting for %s (pos=%v)", req.device.UserID, req.since,
|
||||
)
|
||||
case <-listener.GetNotifyChannel(*req.since):
|
||||
p := listener.GetSyncPosition()
|
||||
|
@ -337,7 +337,7 @@ func lockedFetchUserStream(n *Notifier, userID string) *UserStream {
|
|||
return n.fetchUserStream(userID, true)
|
||||
}
|
||||
|
||||
func newTestSyncRequest(userID string, since types.SyncPosition) syncRequest {
|
||||
func newTestSyncRequest(userID string, since types.PaginationToken) syncRequest {
|
||||
return syncRequest{
|
||||
device: authtypes.Device{UserID: userID},
|
||||
timeout: 1 * time.Minute,
|
||||
|
|
|
@ -16,10 +16,8 @@ package sync
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
|
||||
|
@ -38,7 +36,7 @@ type syncRequest struct {
|
|||
device authtypes.Device
|
||||
limit int
|
||||
timeout time.Duration
|
||||
since *types.SyncPosition // nil means that no since token was supplied
|
||||
since *types.PaginationToken // nil means that no since token was supplied
|
||||
wantFullState bool
|
||||
log *log.Entry
|
||||
}
|
||||
|
@ -47,7 +45,7 @@ func newSyncRequest(req *http.Request, device authtypes.Device) (*syncRequest, e
|
|||
timeout := getTimeout(req.URL.Query().Get("timeout"))
|
||||
fullState := req.URL.Query().Get("full_state")
|
||||
wantFullState := fullState != "" && fullState != "false"
|
||||
since, err := getSyncStreamPosition(req.URL.Query().Get("since"))
|
||||
since, err := getPaginationToken(req.URL.Query().Get("since"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -75,41 +73,14 @@ func getTimeout(timeoutMS string) time.Duration {
|
|||
}
|
||||
|
||||
// getSyncStreamPosition tries to parse a 'since' token taken from the API to a
|
||||
// types.SyncPosition. If the string is empty then (nil, nil) is returned.
|
||||
// types.PaginationToken. If the string is empty then (nil, nil) is returned.
|
||||
// There are two forms of tokens: The full length form containing all PDU and EDU
|
||||
// positions separated by "_", and the short form containing only the PDU
|
||||
// position. Short form can be used for, e.g., `prev_batch` tokens.
|
||||
func getSyncStreamPosition(since string) (*types.SyncPosition, error) {
|
||||
func getPaginationToken(since string) (*types.PaginationToken, error) {
|
||||
if since == "" {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
posStrings := strings.Split(since, "_")
|
||||
if len(posStrings) != 2 && len(posStrings) != 1 {
|
||||
// A token can either be full length or short (PDU-only).
|
||||
return nil, errors.New("malformed batch token")
|
||||
}
|
||||
|
||||
positions := make([]int64, len(posStrings))
|
||||
for i, posString := range posStrings {
|
||||
pos, err := strconv.ParseInt(posString, 10, 64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
positions[i] = pos
|
||||
}
|
||||
|
||||
if len(positions) == 2 {
|
||||
// Full length token; construct SyncPosition with every entry in
|
||||
// `positions`. These entries must have the same order with the fields
|
||||
// in struct SyncPosition, so we disable the govet check below.
|
||||
return &types.SyncPosition{ //nolint:govet
|
||||
positions[0], positions[1],
|
||||
}, nil
|
||||
} else {
|
||||
// Token with PDU position only
|
||||
return &types.SyncPosition{
|
||||
PDUPosition: positions[0],
|
||||
}, nil
|
||||
}
|
||||
return types.NewPaginationTokenFromString(since)
|
||||
}
|
||||
|
|
|
@ -130,7 +130,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
|
|||
}
|
||||
}
|
||||
|
||||
func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.SyncPosition) (res *types.Response, err error) {
|
||||
func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.PaginationToken) (res *types.Response, err error) {
|
||||
// TODO: handle ignored users
|
||||
if req.since == nil {
|
||||
res, err = rp.db.CompleteSync(req.ctx, req.device.UserID, req.limit)
|
||||
|
@ -143,7 +143,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.SyncP
|
|||
}
|
||||
|
||||
accountDataFilter := gomatrix.DefaultFilterPart() // TODO: use filter provided in req instead
|
||||
res, err = rp.appendAccountData(res, req.device.UserID, req, latestPos.PDUPosition, &accountDataFilter)
|
||||
res, err = rp.appendAccountData(res, req.device.UserID, req, int64(latestPos.PDUPosition), &accountDataFilter)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -183,7 +183,11 @@ func (rp *RequestPool) appendAccountData(
|
|||
}
|
||||
|
||||
// Sync is not initial, get all account data since the latest sync
|
||||
dataTypes, err := rp.db.GetAccountDataInRange(req.ctx, userID, req.since.PDUPosition, currentPos, accountDataFilter)
|
||||
dataTypes, err := rp.db.GetAccountDataInRange(
|
||||
req.ctx, userID,
|
||||
types.StreamPosition(req.since.PDUPosition), types.StreamPosition(currentPos),
|
||||
accountDataFilter,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -35,7 +35,7 @@ type UserStream struct {
|
|||
// Closed when there is an update.
|
||||
signalChannel chan struct{}
|
||||
// The last sync position that there may have been an update for the user
|
||||
pos types.SyncPosition
|
||||
pos types.PaginationToken
|
||||
// The last time when we had some listeners waiting
|
||||
timeOfLastChannel time.Time
|
||||
// The number of listeners waiting
|
||||
|
@ -51,7 +51,7 @@ type UserStreamListener struct {
|
|||
}
|
||||
|
||||
// NewUserStream creates a new user stream
|
||||
func NewUserStream(userID string, currPos types.SyncPosition) *UserStream {
|
||||
func NewUserStream(userID string, currPos types.PaginationToken) *UserStream {
|
||||
return &UserStream{
|
||||
UserID: userID,
|
||||
timeOfLastChannel: time.Now(),
|
||||
|
@ -85,7 +85,7 @@ func (s *UserStream) GetListener(ctx context.Context) UserStreamListener {
|
|||
}
|
||||
|
||||
// Broadcast a new sync position for this user.
|
||||
func (s *UserStream) Broadcast(pos types.SyncPosition) {
|
||||
func (s *UserStream) Broadcast(pos types.PaginationToken) {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
|
@ -120,7 +120,7 @@ func (s *UserStream) TimeOfLastNonEmpty() time.Time {
|
|||
|
||||
// GetStreamPosition returns last sync position which the UserStream was
|
||||
// notified about
|
||||
func (s *UserStreamListener) GetSyncPosition() types.SyncPosition {
|
||||
func (s *UserStreamListener) GetSyncPosition() types.PaginationToken {
|
||||
s.userStream.lock.Lock()
|
||||
defer s.userStream.lock.Unlock()
|
||||
|
||||
|
@ -132,7 +132,7 @@ func (s *UserStreamListener) GetSyncPosition() types.SyncPosition {
|
|||
// sincePos specifies from which point we want to be notified about. If there
|
||||
// has already been an update after sincePos we'll return a closed channel
|
||||
// immediately.
|
||||
func (s *UserStreamListener) GetNotifyChannel(sincePos types.SyncPosition) <-chan struct{} {
|
||||
func (s *UserStreamListener) GetNotifyChannel(sincePos types.PaginationToken) <-chan struct{} {
|
||||
s.userStream.lock.Lock()
|
||||
defer s.userStream.lock.Unlock()
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue