2017-04-20 22:40:52 +00:00
|
|
|
// Copyright 2017 Vector Creations Ltd
|
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
2017-04-12 15:06:26 +00:00
|
|
|
package types
|
2017-04-11 10:52:26 +00:00
|
|
|
|
|
|
|
import (
|
2023-09-15 15:25:09 +00:00
|
|
|
"context"
|
2017-09-22 10:34:54 +00:00
|
|
|
"encoding/json"
|
2022-01-20 15:26:45 +00:00
|
|
|
"errors"
|
2020-01-23 17:51:10 +00:00
|
|
|
"fmt"
|
2017-04-13 15:56:46 +00:00
|
|
|
"strconv"
|
2020-01-23 17:51:10 +00:00
|
|
|
"strings"
|
2017-04-19 15:04:01 +00:00
|
|
|
|
|
|
|
"github.com/matrix-org/gomatrixserverlib"
|
2023-04-19 14:50:33 +00:00
|
|
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
2020-04-24 15:30:25 +00:00
|
|
|
"github.com/tidwall/gjson"
|
2022-08-09 08:40:46 +00:00
|
|
|
|
|
|
|
"github.com/matrix-org/dendrite/roomserver/api"
|
2023-04-27 11:54:20 +00:00
|
|
|
"github.com/matrix-org/dendrite/roomserver/types"
|
2023-04-04 17:16:53 +00:00
|
|
|
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
2017-04-11 10:52:26 +00:00
|
|
|
)
|
|
|
|
|
2020-01-23 17:51:10 +00:00
|
|
|
// ErrMalformedSyncToken is the sentinel error reported when a sync token
// cannot be parsed. Callers can match on it to decide whether to answer the
// client with a 400 or a 401; a 401 is recommended because it forces a
// logout.
var ErrMalformedSyncToken = errors.New("malformed sync token")
|
|
|
|
|
2021-01-08 16:59:06 +00:00
|
|
|
type StateDelta struct {
|
|
|
|
RoomID string
|
2023-04-27 11:54:20 +00:00
|
|
|
StateEvents []*types.HeaderedEvent
|
2022-08-25 12:42:47 +00:00
|
|
|
NewlyJoined bool
|
2021-01-08 16:59:06 +00:00
|
|
|
Membership string
|
|
|
|
// The PDU stream position of the latest membership event for this user, if applicable.
|
|
|
|
// Can be 0 if there is no membership event in this delta.
|
|
|
|
MembershipPos StreamPosition
|
|
|
|
}
|
|
|
|
|
2020-01-23 17:51:10 +00:00
|
|
|
// StreamPosition is a client's offset in the sync stream, stored as a signed
// 64-bit integer.
type StreamPosition int64
|
|
|
|
|
2022-10-13 13:50:52 +00:00
|
|
|
func NewStreamPositionFromString(s string) (StreamPosition, error) {
|
|
|
|
n, err := strconv.Atoi(s)
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
return StreamPosition(n), nil
|
|
|
|
}
|
|
|
|
|
2020-05-13 11:14:50 +00:00
|
|
|
// StreamEvent is the same as gomatrixserverlib.Event but also has the PDU stream position for this event.
|
2020-01-23 17:51:10 +00:00
|
|
|
type StreamEvent struct {
|
2023-04-27 11:54:20 +00:00
|
|
|
*types.HeaderedEvent
|
2020-01-23 17:51:10 +00:00
|
|
|
StreamPosition StreamPosition
|
|
|
|
TransactionID *api.TransactionID
|
|
|
|
ExcludeFromSync bool
|
2019-07-12 14:59:53 +00:00
|
|
|
}
|
2017-04-11 10:52:26 +00:00
|
|
|
|
2023-02-07 13:31:23 +00:00
|
|
|
type RecentEvents struct {
|
|
|
|
Limited bool
|
|
|
|
Events []StreamEvent
|
|
|
|
}
|
|
|
|
|
2020-05-15 08:41:12 +00:00
|
|
|
// Range represents a range between two stream positions.
|
|
|
|
type Range struct {
|
|
|
|
// From is the position the client has already received.
|
|
|
|
From StreamPosition
|
|
|
|
// To is the position the client is going towards.
|
|
|
|
To StreamPosition
|
|
|
|
// True if the client is going backwards
|
|
|
|
Backwards bool
|
|
|
|
}
|
|
|
|
|
|
|
|
// Low returns the low number of the range.
|
|
|
|
// This represents the position the client already has and hence is exclusive.
|
|
|
|
func (r *Range) Low() StreamPosition {
|
|
|
|
if !r.Backwards {
|
|
|
|
return r.From
|
|
|
|
}
|
|
|
|
return r.To
|
|
|
|
}
|
|
|
|
|
|
|
|
// High returns the high number of the range
|
|
|
|
// This represents the position the client is going towards and hence is inclusive.
|
|
|
|
func (r *Range) High() StreamPosition {
|
|
|
|
if !r.Backwards {
|
|
|
|
return r.To
|
|
|
|
}
|
|
|
|
return r.From
|
|
|
|
}
|
|
|
|
|
2020-05-13 11:14:50 +00:00
|
|
|
// SyncTokenType is the one-letter prefix that identifies the kind of a sync
// token: "s" for a position in the whole stream of events, or "t" for a
// position in a room's topology/depth.
type SyncTokenType string
|
2020-01-23 17:51:10 +00:00
|
|
|
|
|
|
|
const (
|
2020-05-13 11:14:50 +00:00
|
|
|
// SyncTokenTypeStream represents a position in the server's whole
|
2020-01-23 17:51:10 +00:00
|
|
|
// stream of events
|
2020-05-13 11:14:50 +00:00
|
|
|
SyncTokenTypeStream SyncTokenType = "s"
|
|
|
|
// SyncTokenTypeTopology represents a position in a room's topology.
|
|
|
|
SyncTokenTypeTopology SyncTokenType = "t"
|
2020-01-23 17:51:10 +00:00
|
|
|
)
|
|
|
|
|
2020-05-13 11:14:50 +00:00
|
|
|
type StreamingToken struct {
|
2022-03-03 11:40:53 +00:00
|
|
|
PDUPosition StreamPosition
|
|
|
|
TypingPosition StreamPosition
|
|
|
|
ReceiptPosition StreamPosition
|
|
|
|
SendToDevicePosition StreamPosition
|
|
|
|
InvitePosition StreamPosition
|
|
|
|
AccountDataPosition StreamPosition
|
|
|
|
DeviceListPosition StreamPosition
|
|
|
|
NotificationDataPosition StreamPosition
|
2022-04-06 11:11:19 +00:00
|
|
|
PresencePosition StreamPosition
|
2020-05-13 11:14:50 +00:00
|
|
|
}
|
|
|
|
|
2020-12-18 11:11:21 +00:00
|
|
|
// This will be used as a fallback by json.Marshal.
|
|
|
|
func (s StreamingToken) MarshalText() ([]byte, error) {
|
|
|
|
return []byte(s.String()), nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// This will be used as a fallback by json.Unmarshal.
|
|
|
|
func (s *StreamingToken) UnmarshalText(text []byte) (err error) {
|
|
|
|
*s, err = NewStreamTokenFromString(string(text))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-12-10 18:57:10 +00:00
|
|
|
func (t StreamingToken) String() string {
|
|
|
|
posStr := fmt.Sprintf(
|
2022-04-06 11:11:19 +00:00
|
|
|
"s%d_%d_%d_%d_%d_%d_%d_%d_%d",
|
2020-12-10 18:57:10 +00:00
|
|
|
t.PDUPosition, t.TypingPosition,
|
|
|
|
t.ReceiptPosition, t.SendToDevicePosition,
|
2022-03-03 11:40:53 +00:00
|
|
|
t.InvitePosition, t.AccountDataPosition,
|
|
|
|
t.DeviceListPosition, t.NotificationDataPosition,
|
2022-04-06 11:11:19 +00:00
|
|
|
t.PresencePosition,
|
2020-12-10 18:57:10 +00:00
|
|
|
)
|
2020-12-15 15:09:10 +00:00
|
|
|
return posStr
|
2020-06-19 12:29:27 +00:00
|
|
|
}
|
2020-05-13 11:14:50 +00:00
|
|
|
|
|
|
|
// IsAfter returns true if ANY position in this token is greater than `other`.
|
|
|
|
func (t *StreamingToken) IsAfter(other StreamingToken) bool {
|
2020-12-10 18:57:10 +00:00
|
|
|
switch {
|
|
|
|
case t.PDUPosition > other.PDUPosition:
|
|
|
|
return true
|
|
|
|
case t.TypingPosition > other.TypingPosition:
|
|
|
|
return true
|
|
|
|
case t.ReceiptPosition > other.ReceiptPosition:
|
|
|
|
return true
|
|
|
|
case t.SendToDevicePosition > other.SendToDevicePosition:
|
|
|
|
return true
|
2020-12-18 11:11:21 +00:00
|
|
|
case t.InvitePosition > other.InvitePosition:
|
|
|
|
return true
|
2021-01-08 16:59:06 +00:00
|
|
|
case t.AccountDataPosition > other.AccountDataPosition:
|
|
|
|
return true
|
2022-01-20 15:26:45 +00:00
|
|
|
case t.DeviceListPosition > other.DeviceListPosition:
|
2020-12-15 15:09:10 +00:00
|
|
|
return true
|
2022-03-03 11:40:53 +00:00
|
|
|
case t.NotificationDataPosition > other.NotificationDataPosition:
|
|
|
|
return true
|
2022-04-06 11:11:19 +00:00
|
|
|
case t.PresencePosition > other.PresencePosition:
|
|
|
|
return true
|
2020-07-29 18:00:04 +00:00
|
|
|
}
|
2020-05-13 11:14:50 +00:00
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
2020-12-10 18:57:10 +00:00
|
|
|
func (t *StreamingToken) IsEmpty() bool {
|
2022-04-06 11:11:19 +00:00
|
|
|
return t == nil || t.PDUPosition+t.TypingPosition+t.ReceiptPosition+t.SendToDevicePosition+t.InvitePosition+t.AccountDataPosition+t.DeviceListPosition+t.NotificationDataPosition+t.PresencePosition == 0
|
2020-12-10 18:57:10 +00:00
|
|
|
}
|
|
|
|
|
2020-05-13 11:14:50 +00:00
|
|
|
// WithUpdates returns a copy of the StreamingToken with updates applied from another StreamingToken.
|
|
|
|
// If the latter StreamingToken contains a field that is not 0, it is considered an update,
|
|
|
|
// and its value will replace the corresponding value in the StreamingToken on which WithUpdates is called.
|
2020-07-29 18:00:04 +00:00
|
|
|
// If the other token has a log, they will replace any existing log on this token.
|
2020-12-18 11:11:21 +00:00
|
|
|
func (t *StreamingToken) WithUpdates(other StreamingToken) StreamingToken {
|
|
|
|
ret := *t
|
|
|
|
ret.ApplyUpdates(other)
|
2020-05-13 11:14:50 +00:00
|
|
|
return ret
|
|
|
|
}
|
|
|
|
|
2020-12-18 11:11:21 +00:00
|
|
|
// ApplyUpdates applies any changes from the supplied StreamingToken. If the supplied
|
|
|
|
// streaming token contains any positions that are not 0, they are considered updates
|
|
|
|
// and will overwrite the value in the token.
|
|
|
|
func (t *StreamingToken) ApplyUpdates(other StreamingToken) {
|
2021-01-09 11:25:09 +00:00
|
|
|
if other.PDUPosition > t.PDUPosition {
|
2020-12-18 11:11:21 +00:00
|
|
|
t.PDUPosition = other.PDUPosition
|
|
|
|
}
|
2021-01-09 11:25:09 +00:00
|
|
|
if other.TypingPosition > t.TypingPosition {
|
2020-12-18 11:11:21 +00:00
|
|
|
t.TypingPosition = other.TypingPosition
|
|
|
|
}
|
2021-01-09 11:25:09 +00:00
|
|
|
if other.ReceiptPosition > t.ReceiptPosition {
|
2020-12-18 11:11:21 +00:00
|
|
|
t.ReceiptPosition = other.ReceiptPosition
|
|
|
|
}
|
2021-01-09 11:25:09 +00:00
|
|
|
if other.SendToDevicePosition > t.SendToDevicePosition {
|
2020-12-18 11:11:21 +00:00
|
|
|
t.SendToDevicePosition = other.SendToDevicePosition
|
|
|
|
}
|
2021-01-09 11:25:09 +00:00
|
|
|
if other.InvitePosition > t.InvitePosition {
|
2020-12-18 11:11:21 +00:00
|
|
|
t.InvitePosition = other.InvitePosition
|
|
|
|
}
|
2021-01-09 11:25:09 +00:00
|
|
|
if other.AccountDataPosition > t.AccountDataPosition {
|
2021-01-08 16:59:06 +00:00
|
|
|
t.AccountDataPosition = other.AccountDataPosition
|
|
|
|
}
|
2022-01-20 15:26:45 +00:00
|
|
|
if other.DeviceListPosition > t.DeviceListPosition {
|
2020-12-18 11:11:21 +00:00
|
|
|
t.DeviceListPosition = other.DeviceListPosition
|
|
|
|
}
|
2022-03-03 11:40:53 +00:00
|
|
|
if other.NotificationDataPosition > t.NotificationDataPosition {
|
|
|
|
t.NotificationDataPosition = other.NotificationDataPosition
|
|
|
|
}
|
2022-04-06 11:11:19 +00:00
|
|
|
if other.PresencePosition > t.PresencePosition {
|
|
|
|
t.PresencePosition = other.PresencePosition
|
|
|
|
}
|
2020-12-18 11:11:21 +00:00
|
|
|
}
|
|
|
|
|
2020-05-13 11:14:50 +00:00
|
|
|
type TopologyToken struct {
|
2020-12-10 18:57:10 +00:00
|
|
|
Depth StreamPosition
|
|
|
|
PDUPosition StreamPosition
|
2020-05-13 11:14:50 +00:00
|
|
|
}
|
|
|
|
|
2020-12-18 11:11:21 +00:00
|
|
|
// This will be used as a fallback by json.Marshal.
|
|
|
|
func (t TopologyToken) MarshalText() ([]byte, error) {
|
|
|
|
return []byte(t.String()), nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// This will be used as a fallback by json.Unmarshal.
|
|
|
|
func (t *TopologyToken) UnmarshalText(text []byte) (err error) {
|
|
|
|
*t, err = NewTopologyTokenFromString(string(text))
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-05-13 11:14:50 +00:00
|
|
|
func (t *TopologyToken) StreamToken() StreamingToken {
|
2020-12-10 18:57:10 +00:00
|
|
|
return StreamingToken{
|
|
|
|
PDUPosition: t.PDUPosition,
|
|
|
|
}
|
2020-05-13 11:14:50 +00:00
|
|
|
}
|
2020-12-10 18:57:10 +00:00
|
|
|
|
|
|
|
func (t TopologyToken) String() string {
|
2022-10-25 10:39:10 +00:00
|
|
|
if t.Depth <= 0 && t.PDUPosition <= 0 {
|
|
|
|
return ""
|
|
|
|
}
|
2020-12-10 18:57:10 +00:00
|
|
|
return fmt.Sprintf("t%d_%d", t.Depth, t.PDUPosition)
|
2020-05-13 11:14:50 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Decrement the topology token to one event earlier.
|
|
|
|
func (t *TopologyToken) Decrement() {
|
2020-12-10 18:57:10 +00:00
|
|
|
depth := t.Depth
|
|
|
|
pduPos := t.PDUPosition
|
2020-05-13 11:14:50 +00:00
|
|
|
if depth-1 <= 0 {
|
2020-05-14 16:30:16 +00:00
|
|
|
// nothing can be lower than this
|
2020-05-13 11:14:50 +00:00
|
|
|
depth = 1
|
|
|
|
} else {
|
2020-05-14 16:30:16 +00:00
|
|
|
// this assumes that we will never have 1000 events all with the same
|
|
|
|
// depth. TODO: work out what the right PDU position is to use, probably needs a db hit.
|
2020-05-13 11:14:50 +00:00
|
|
|
depth--
|
|
|
|
pduPos += 1000
|
|
|
|
}
|
|
|
|
// The lowest token value is 1, therefore we need to manually set it to that
|
|
|
|
// value if we're below it.
|
|
|
|
if depth < 1 {
|
|
|
|
depth = 1
|
|
|
|
}
|
2020-12-10 18:57:10 +00:00
|
|
|
t.Depth = depth
|
|
|
|
t.PDUPosition = pduPos
|
2020-05-13 11:14:50 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func NewTopologyTokenFromString(tok string) (token TopologyToken, err error) {
|
2020-12-10 18:57:10 +00:00
|
|
|
if len(tok) < 1 {
|
|
|
|
err = fmt.Errorf("empty topology token")
|
2020-05-13 11:14:50 +00:00
|
|
|
return
|
|
|
|
}
|
2020-12-10 18:57:10 +00:00
|
|
|
if tok[0] != SyncTokenTypeTopology[0] {
|
|
|
|
err = fmt.Errorf("topology token must start with 't'")
|
2020-05-13 11:14:50 +00:00
|
|
|
return
|
|
|
|
}
|
2020-12-10 18:57:10 +00:00
|
|
|
parts := strings.Split(tok[1:], "_")
|
|
|
|
var positions [2]StreamPosition
|
|
|
|
for i, p := range parts {
|
|
|
|
if i > len(positions) {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
var pos int
|
|
|
|
pos, err = strconv.Atoi(p)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
positions[i] = StreamPosition(pos)
|
2020-07-29 18:00:04 +00:00
|
|
|
}
|
2020-12-10 18:57:10 +00:00
|
|
|
token = TopologyToken{
|
|
|
|
Depth: positions[0],
|
|
|
|
PDUPosition: positions[1],
|
2020-05-13 11:14:50 +00:00
|
|
|
}
|
2020-12-10 18:57:10 +00:00
|
|
|
return
|
2020-05-13 11:14:50 +00:00
|
|
|
}
|
2020-12-10 18:57:10 +00:00
|
|
|
|
2020-05-13 11:14:50 +00:00
|
|
|
func NewStreamTokenFromString(tok string) (token StreamingToken, err error) {
|
2020-12-10 18:57:10 +00:00
|
|
|
if len(tok) < 1 {
|
2022-01-20 15:26:45 +00:00
|
|
|
err = ErrMalformedSyncToken
|
2020-05-13 11:14:50 +00:00
|
|
|
return
|
2019-07-12 14:59:53 +00:00
|
|
|
}
|
2020-12-10 18:57:10 +00:00
|
|
|
if tok[0] != SyncTokenTypeStream[0] {
|
2022-01-20 15:26:45 +00:00
|
|
|
err = ErrMalformedSyncToken
|
2020-05-13 11:14:50 +00:00
|
|
|
return
|
|
|
|
}
|
2022-01-20 15:26:45 +00:00
|
|
|
// Migration: Remove everything after and including '.' - we previously had tokens like:
|
|
|
|
// s478_0_0_0_0_13.dl-0-2 but we have now removed partitioned stream positions
|
|
|
|
tok = strings.Split(tok, ".")[0]
|
|
|
|
parts := strings.Split(tok[1:], "_")
|
2022-04-06 11:11:19 +00:00
|
|
|
var positions [9]StreamPosition
|
2020-12-10 18:57:10 +00:00
|
|
|
for i, p := range parts {
|
2022-02-17 13:25:41 +00:00
|
|
|
if i >= len(positions) {
|
2020-12-10 18:57:10 +00:00
|
|
|
break
|
2020-07-29 18:00:04 +00:00
|
|
|
}
|
2020-12-10 18:57:10 +00:00
|
|
|
var pos int
|
|
|
|
pos, err = strconv.Atoi(p)
|
|
|
|
if err != nil {
|
2022-01-20 15:26:45 +00:00
|
|
|
err = ErrMalformedSyncToken
|
2020-12-10 18:57:10 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
positions[i] = StreamPosition(pos)
|
2020-07-29 18:00:04 +00:00
|
|
|
}
|
2020-12-10 18:57:10 +00:00
|
|
|
token = StreamingToken{
|
2022-03-03 11:40:53 +00:00
|
|
|
PDUPosition: positions[0],
|
|
|
|
TypingPosition: positions[1],
|
|
|
|
ReceiptPosition: positions[2],
|
|
|
|
SendToDevicePosition: positions[3],
|
|
|
|
InvitePosition: positions[4],
|
|
|
|
AccountDataPosition: positions[5],
|
|
|
|
DeviceListPosition: positions[6],
|
|
|
|
NotificationDataPosition: positions[7],
|
2022-04-06 11:11:19 +00:00
|
|
|
PresencePosition: positions[8],
|
2020-12-10 18:57:10 +00:00
|
|
|
}
|
|
|
|
return token, nil
|
2020-01-23 17:51:10 +00:00
|
|
|
}
|
|
|
|
|
2022-10-05 12:47:13 +00:00
|
|
|
// DeviceLists is the "device_lists" section of a sync response. Both lists
// are omitted from the JSON when empty.
type DeviceLists struct {
	// Changed is serialised under the "changed" key.
	Changed []string `json:"changed,omitempty"`
	// Left is serialised under the "left" key.
	Left []string `json:"left,omitempty"`
}
|
|
|
|
|
|
|
|
type RoomsResponse struct {
|
|
|
|
Join map[string]*JoinResponse `json:"join,omitempty"`
|
|
|
|
Peek map[string]*JoinResponse `json:"peek,omitempty"`
|
|
|
|
Invite map[string]*InviteResponse `json:"invite,omitempty"`
|
|
|
|
Leave map[string]*LeaveResponse `json:"leave,omitempty"`
|
|
|
|
}
|
|
|
|
|
|
|
|
type ToDeviceResponse struct {
|
|
|
|
Events []gomatrixserverlib.SendToDeviceEvent `json:"events,omitempty"`
|
|
|
|
}
|
|
|
|
|
2017-04-11 10:52:26 +00:00
|
|
|
// Response represents a /sync API response. See https://matrix.org/docs/spec/client_server/r0.2.0.html#get-matrix-client-r0-sync
|
|
|
|
type Response struct {
|
2022-10-05 12:47:13 +00:00
|
|
|
NextBatch StreamingToken `json:"next_batch"`
|
|
|
|
AccountData *ClientEvents `json:"account_data,omitempty"`
|
|
|
|
Presence *ClientEvents `json:"presence,omitempty"`
|
|
|
|
Rooms *RoomsResponse `json:"rooms,omitempty"`
|
|
|
|
ToDevice *ToDeviceResponse `json:"to_device,omitempty"`
|
|
|
|
DeviceLists *DeviceLists `json:"device_lists,omitempty"`
|
|
|
|
DeviceListsOTKCount map[string]int `json:"device_one_time_keys_count,omitempty"`
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r Response) MarshalJSON() ([]byte, error) {
|
|
|
|
type alias Response
|
|
|
|
a := alias(r)
|
|
|
|
if r.AccountData != nil && len(r.AccountData.Events) == 0 {
|
|
|
|
a.AccountData = nil
|
|
|
|
}
|
|
|
|
if r.Presence != nil && len(r.Presence.Events) == 0 {
|
|
|
|
a.Presence = nil
|
|
|
|
}
|
|
|
|
if r.DeviceLists != nil {
|
|
|
|
if len(r.DeviceLists.Left) == 0 && len(r.DeviceLists.Changed) == 0 {
|
|
|
|
a.DeviceLists = nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if r.Rooms != nil {
|
|
|
|
if len(r.Rooms.Join) == 0 && len(r.Rooms.Peek) == 0 &&
|
|
|
|
len(r.Rooms.Invite) == 0 && len(r.Rooms.Leave) == 0 {
|
|
|
|
a.Rooms = nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if r.ToDevice != nil && len(r.ToDevice.Events) == 0 {
|
|
|
|
a.ToDevice = nil
|
|
|
|
}
|
|
|
|
return json.Marshal(a)
|
2017-04-11 10:52:26 +00:00
|
|
|
}
|
|
|
|
|
2022-05-19 08:00:56 +00:00
|
|
|
func (r *Response) HasUpdates() bool {
|
|
|
|
// purposefully exclude DeviceListsOTKCount as we always include them
|
|
|
|
return (len(r.AccountData.Events) > 0 ||
|
|
|
|
len(r.Presence.Events) > 0 ||
|
|
|
|
len(r.Rooms.Invite) > 0 ||
|
|
|
|
len(r.Rooms.Join) > 0 ||
|
|
|
|
len(r.Rooms.Leave) > 0 ||
|
|
|
|
len(r.Rooms.Peek) > 0 ||
|
|
|
|
len(r.ToDevice.Events) > 0 ||
|
|
|
|
len(r.DeviceLists.Changed) > 0 ||
|
|
|
|
len(r.DeviceLists.Left) > 0)
|
|
|
|
}
|
|
|
|
|
2017-04-11 10:52:26 +00:00
|
|
|
// NewResponse creates an empty response with initialised maps.
|
Send-to-device support (#1072)
* Groundwork for send-to-device messaging
* Update sample config
* Add unstable routing for now
* Send to device consumer in sync API
* Start the send-to-device consumer
* fix indentation in dendrite-config.yaml
* Create send-to-device database tables, other tweaks
* Add some logic for send-to-device messages, add them into sync stream
* Handle incoming send-to-device messages, count them with EDU stream pos
* Undo changes to test
* pq.Array
* Fix sync
* Logging
* Fix a couple of transaction things, fix client API
* Add send-to-device test, hopefully fix bugs
* Comments
* Refactor a bit
* Fix schema
* Fix queries
* Debug logging
* Fix storing and retrieving of send-to-device messages
* Try to avoid database locks
* Update sync position
* Use latest sync position
* Jiggle about sync a bit
* Fix tests
* Break out the retrieval from the update/delete behaviour
* Comments
* nolint on getResponseWithPDUsForCompleteSync
* Try to line up sync tokens again
* Implement wildcard
* Add all send-to-device tests to whitelist, what could possibly go wrong?
* Only care about wildcard when targeted locally
* Deduplicate transactions
* Handle tokens properly, return immediately if waiting send-to-device messages
* Fix sync
* Update sytest-whitelist
* Fix copyright notice (need to do more of this)
* Comments, copyrights
* Return errors from Do, fix dendritejs
* Review comments
* Comments
* Constructor for TransactionWriter
* defletions
* Update gomatrixserverlib, sytest-blacklist
2020-06-01 16:50:19 +00:00
|
|
|
func NewResponse() *Response {
|
|
|
|
res := Response{}
|
2017-06-12 17:30:47 +00:00
|
|
|
// Pre-initialise the maps. Synapse will return {} even if there are no rooms under a specific section,
|
2017-04-11 10:52:26 +00:00
|
|
|
// so let's do the same thing. Bonus: this means we can't get dreaded 'assignment to entry in nil map' errors.
|
2022-10-05 12:47:13 +00:00
|
|
|
res.Rooms = &RoomsResponse{
|
|
|
|
Join: map[string]*JoinResponse{},
|
|
|
|
Peek: map[string]*JoinResponse{},
|
|
|
|
Invite: map[string]*InviteResponse{},
|
|
|
|
Leave: map[string]*LeaveResponse{},
|
|
|
|
}
|
2017-04-11 10:52:26 +00:00
|
|
|
|
|
|
|
// Also pre-intialise empty slices or else we'll insert 'null' instead of '[]' for the value.
|
|
|
|
// TODO: We really shouldn't have to do all this to coerce encoding/json to Do The Right Thing. We should
|
|
|
|
// really be using our own Marshal/Unmarshal implementations otherwise this may prove to be a CPU bottleneck.
|
|
|
|
// This also applies to NewJoinResponse, NewInviteResponse and NewLeaveResponse.
|
2022-10-05 12:47:13 +00:00
|
|
|
res.AccountData = &ClientEvents{}
|
|
|
|
res.Presence = &ClientEvents{}
|
|
|
|
res.DeviceLists = &DeviceLists{}
|
|
|
|
res.ToDevice = &ToDeviceResponse{}
|
2021-01-13 14:32:49 +00:00
|
|
|
res.DeviceListsOTKCount = map[string]int{}
|
2017-04-11 10:52:26 +00:00
|
|
|
|
|
|
|
return &res
|
|
|
|
}
|
|
|
|
|
2017-10-16 12:34:08 +00:00
|
|
|
// IsEmpty returns true if the response is empty, i.e. used to decided whether
|
|
|
|
// to return the response immediately to the client or to wait for more data.
|
|
|
|
func (r *Response) IsEmpty() bool {
|
|
|
|
return len(r.Rooms.Join) == 0 &&
|
|
|
|
len(r.Rooms.Invite) == 0 &&
|
|
|
|
len(r.Rooms.Leave) == 0 &&
|
|
|
|
len(r.AccountData.Events) == 0 &&
|
Send-to-device support (#1072)
* Groundwork for send-to-device messaging
* Update sample config
* Add unstable routing for now
* Send to device consumer in sync API
* Start the send-to-device consumer
* fix indentation in dendrite-config.yaml
* Create send-to-device database tables, other tweaks
* Add some logic for send-to-device messages, add them into sync stream
* Handle incoming send-to-device messages, count them with EDU stream pos
* Undo changes to test
* pq.Array
* Fix sync
* Logging
* Fix a couple of transaction things, fix client API
* Add send-to-device test, hopefully fix bugs
* Comments
* Refactor a bit
* Fix schema
* Fix queries
* Debug logging
* Fix storing and retrieving of send-to-device messages
* Try to avoid database locks
* Update sync position
* Use latest sync position
* Jiggle about sync a bit
* Fix tests
* Break out the retrieval from the update/delete behaviour
* Comments
* nolint on getResponseWithPDUsForCompleteSync
* Try to line up sync tokens again
* Implement wildcard
* Add all send-to-device tests to whitelist, what could possibly go wrong?
* Only care about wildcard when targeted locally
* Deduplicate transactions
* Handle tokens properly, return immediately if waiting send-to-device messages
* Fix sync
* Update sytest-whitelist
* Fix copyright notice (need to do more of this)
* Comments, copyrights
* Return errors from Do, fix dendritejs
* Review comments
* Comments
* Constructor for TransactionWriter
* defletions
* Update gomatrixserverlib, sytest-blacklist
2020-06-01 16:50:19 +00:00
|
|
|
len(r.Presence.Events) == 0 &&
|
|
|
|
len(r.ToDevice.Events) == 0
|
2017-10-16 12:34:08 +00:00
|
|
|
}
|
|
|
|
|
2022-09-27 13:01:34 +00:00
|
|
|
// UnreadNotifications holds the two notification counters of a room. Both
// are always serialised, even when zero.
type UnreadNotifications struct {
	// HighlightCount is serialised as "highlight_count".
	HighlightCount int `json:"highlight_count"`
	// NotificationCount is serialised as "notification_count".
	NotificationCount int `json:"notification_count"`
}
|
|
|
|
|
2022-10-05 12:47:13 +00:00
|
|
|
type ClientEvents struct {
|
2023-04-04 17:16:53 +00:00
|
|
|
Events []synctypes.ClientEvent `json:"events,omitempty"`
|
2022-10-05 12:47:13 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
type Timeline struct {
|
2023-04-04 17:16:53 +00:00
|
|
|
Events []synctypes.ClientEvent `json:"events"`
|
|
|
|
Limited bool `json:"limited"`
|
|
|
|
PrevBatch *TopologyToken `json:"prev_batch,omitempty"`
|
2022-10-05 12:47:13 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Summary is the "summary" section of a joined room. Every field is optional:
// the member counts are pointers so that an explicit zero can be told apart
// from "not set".
type Summary struct {
	// Heroes is serialised as "m.heroes".
	Heroes []string `json:"m.heroes,omitempty"`
	// JoinedMemberCount is serialised as "m.joined_member_count".
	JoinedMemberCount *int `json:"m.joined_member_count,omitempty"`
	// InvitedMemberCount is serialised as "m.invited_member_count".
	InvitedMemberCount *int `json:"m.invited_member_count,omitempty"`
}
|
|
|
|
|
2020-09-10 13:39:18 +00:00
|
|
|
// JoinResponse represents a /sync response for a room which is under the 'join' or 'peek' key.
|
2017-04-11 10:52:26 +00:00
|
|
|
type JoinResponse struct {
|
2022-10-05 12:47:13 +00:00
|
|
|
Summary *Summary `json:"summary,omitempty"`
|
|
|
|
State *ClientEvents `json:"state,omitempty"`
|
|
|
|
Timeline *Timeline `json:"timeline,omitempty"`
|
|
|
|
Ephemeral *ClientEvents `json:"ephemeral,omitempty"`
|
|
|
|
AccountData *ClientEvents `json:"account_data,omitempty"`
|
2022-09-27 13:01:34 +00:00
|
|
|
*UnreadNotifications `json:"unread_notifications,omitempty"`
|
2017-04-11 10:52:26 +00:00
|
|
|
}
|
|
|
|
|
2022-10-05 12:47:13 +00:00
|
|
|
func (jr JoinResponse) MarshalJSON() ([]byte, error) {
|
|
|
|
type alias JoinResponse
|
|
|
|
a := alias(jr)
|
|
|
|
if jr.State != nil && len(jr.State.Events) == 0 {
|
|
|
|
a.State = nil
|
|
|
|
}
|
|
|
|
if jr.Ephemeral != nil && len(jr.Ephemeral.Events) == 0 {
|
|
|
|
a.Ephemeral = nil
|
|
|
|
}
|
2022-11-29 14:46:28 +00:00
|
|
|
if jr.Ephemeral != nil {
|
|
|
|
// Remove the room_id from EDUs, as this seems to cause Element Web
|
|
|
|
// to trigger notifications - https://github.com/vector-im/element-web/issues/17263
|
|
|
|
for i := range jr.Ephemeral.Events {
|
|
|
|
jr.Ephemeral.Events[i].RoomID = ""
|
|
|
|
}
|
|
|
|
}
|
2022-10-05 12:47:13 +00:00
|
|
|
if jr.AccountData != nil && len(jr.AccountData.Events) == 0 {
|
|
|
|
a.AccountData = nil
|
|
|
|
}
|
|
|
|
if jr.Timeline != nil && len(jr.Timeline.Events) == 0 {
|
|
|
|
a.Timeline = nil
|
|
|
|
}
|
|
|
|
if jr.Summary != nil {
|
|
|
|
var nilPtr int
|
|
|
|
joinedEmpty := jr.Summary.JoinedMemberCount == nil || jr.Summary.JoinedMemberCount == &nilPtr
|
|
|
|
invitedEmpty := jr.Summary.InvitedMemberCount == nil || jr.Summary.InvitedMemberCount == &nilPtr
|
|
|
|
if joinedEmpty && invitedEmpty && len(jr.Summary.Heroes) == 0 {
|
|
|
|
a.Summary = nil
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
2022-10-14 08:38:12 +00:00
|
|
|
if jr.UnreadNotifications != nil {
|
|
|
|
// if everything else is nil, also remove UnreadNotifications
|
|
|
|
if a.State == nil && a.Ephemeral == nil && a.AccountData == nil && a.Timeline == nil && a.Summary == nil {
|
|
|
|
a.UnreadNotifications = nil
|
|
|
|
}
|
2022-10-05 12:47:13 +00:00
|
|
|
}
|
|
|
|
return json.Marshal(a)
|
|
|
|
}
|
|
|
|
|
2017-04-11 10:52:26 +00:00
|
|
|
// NewJoinResponse creates an empty response with initialised arrays.
|
|
|
|
func NewJoinResponse() *JoinResponse {
|
2022-10-05 12:47:13 +00:00
|
|
|
return &JoinResponse{
|
|
|
|
Summary: &Summary{},
|
|
|
|
State: &ClientEvents{},
|
|
|
|
Timeline: &Timeline{},
|
|
|
|
Ephemeral: &ClientEvents{},
|
|
|
|
AccountData: &ClientEvents{},
|
|
|
|
UnreadNotifications: &UnreadNotifications{},
|
|
|
|
}
|
2017-04-11 10:52:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// InviteResponse is the /sync payload for a single room listed under the
// 'invite' key.
type InviteResponse struct {
	// InviteState is serialised as "invite_state" and carries the events as
	// raw, already-encoded JSON.
	InviteState struct {
		Events []json.RawMessage `json:"events"`
	} `json:"invite_state"`
}
|
|
|
|
|
|
|
|
// NewInviteResponse creates an empty response with initialised arrays.
|
2023-09-15 15:25:09 +00:00
|
|
|
func NewInviteResponse(ctx context.Context, rsAPI api.QuerySenderIDAPI, event *types.HeaderedEvent, eventFormat synctypes.ClientEventFormat) (*InviteResponse, error) {
|
2017-04-11 10:52:26 +00:00
|
|
|
res := InviteResponse{}
|
2020-10-08 09:03:37 +00:00
|
|
|
res.InviteState.Events = []json.RawMessage{}
|
|
|
|
|
|
|
|
// First see if there's invite_room_state in the unsigned key of the invite.
|
|
|
|
// If there is then unmarshal it into the response. This will contain the
|
|
|
|
// partial room state such as join rules, room name etc.
|
2020-04-24 15:30:25 +00:00
|
|
|
if inviteRoomState := gjson.GetBytes(event.Unsigned(), "invite_room_state"); inviteRoomState.Exists() {
|
2023-09-15 15:25:09 +00:00
|
|
|
if event.Version() == gomatrixserverlib.RoomVersionPseudoIDs && eventFormat != synctypes.FormatSyncFederation {
|
|
|
|
updatedInvite, err := synctypes.GetUpdatedInviteRoomState(func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
|
|
|
|
return rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
|
|
|
|
}, inviteRoomState, event.PDU, event.RoomID(), eventFormat)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
_ = json.Unmarshal(updatedInvite, &res.InviteState.Events)
|
|
|
|
} else {
|
|
|
|
_ = json.Unmarshal([]byte(inviteRoomState.Raw), &res.InviteState.Events)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Clear unsigned so it doesn't have pseudoIDs converted during ToClientEvent
|
|
|
|
eventNoUnsigned, err := event.SetUnsigned(nil)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
2020-10-08 09:03:37 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Then we'll see if we can create a partial of the invite event itself.
|
|
|
|
// This is needed for clients to work out *who* sent the invite.
|
2023-09-15 15:25:09 +00:00
|
|
|
inviteEvent, err := synctypes.ToClientEvent(eventNoUnsigned, eventFormat, func(roomID spec.RoomID, senderID spec.SenderID) (*spec.UserID, error) {
|
|
|
|
return rsAPI.QueryUserIDForSender(ctx, roomID, senderID)
|
|
|
|
})
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Ensure unsigned field is empty so it isn't marshalled into the final JSON
|
2020-10-08 09:03:37 +00:00
|
|
|
inviteEvent.Unsigned = nil
|
2023-09-15 15:25:09 +00:00
|
|
|
|
|
|
|
if ev, err := json.Marshal(*inviteEvent); err == nil {
|
2020-10-08 09:03:37 +00:00
|
|
|
res.InviteState.Events = append(res.InviteState.Events, ev)
|
2020-04-24 15:30:25 +00:00
|
|
|
}
|
2020-10-08 09:03:37 +00:00
|
|
|
|
2023-09-15 15:25:09 +00:00
|
|
|
return &res, nil
|
2017-04-11 10:52:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// LeaveResponse represents a /sync response for a room which is under the 'leave' key.
|
|
|
|
type LeaveResponse struct {
|
2022-10-05 12:47:13 +00:00
|
|
|
State *ClientEvents `json:"state,omitempty"`
|
|
|
|
Timeline *Timeline `json:"timeline,omitempty"`
|
|
|
|
}
|
|
|
|
|
|
|
|
func (lr LeaveResponse) MarshalJSON() ([]byte, error) {
|
|
|
|
type alias LeaveResponse
|
|
|
|
a := alias(lr)
|
|
|
|
if lr.State != nil && len(lr.State.Events) == 0 {
|
|
|
|
a.State = nil
|
|
|
|
}
|
|
|
|
if lr.Timeline != nil && len(lr.Timeline.Events) == 0 {
|
|
|
|
a.Timeline = nil
|
|
|
|
}
|
|
|
|
return json.Marshal(a)
|
2017-04-11 10:52:26 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// NewLeaveResponse creates an empty response with initialised arrays.
|
|
|
|
func NewLeaveResponse() *LeaveResponse {
|
2022-10-05 12:47:13 +00:00
|
|
|
res := LeaveResponse{
|
|
|
|
State: &ClientEvents{},
|
|
|
|
Timeline: &Timeline{},
|
|
|
|
}
|
2017-04-11 10:52:26 +00:00
|
|
|
return &res
|
|
|
|
}
|
Send-to-device support (#1072)
* Groundwork for send-to-device messaging
* Update sample config
* Add unstable routing for now
* Send to device consumer in sync API
* Start the send-to-device consumer
* fix indentation in dendrite-config.yaml
* Create send-to-device database tables, other tweaks
* Add some logic for send-to-device messages, add them into sync stream
* Handle incoming send-to-device messages, count them with EDU stream pos
* Undo changes to test
* pq.Array
* Fix sync
* Logging
* Fix a couple of transaction things, fix client API
* Add send-to-device test, hopefully fix bugs
* Comments
* Refactor a bit
* Fix schema
* Fix queries
* Debug logging
* Fix storing and retrieving of send-to-device messages
* Try to avoid database locks
* Update sync position
* Use latest sync position
* Jiggle about sync a bit
* Fix tests
* Break out the retrieval from the update/delete behaviour
* Comments
* nolint on getResponseWithPDUsForCompleteSync
* Try to line up sync tokens again
* Implement wildcard
* Add all send-to-device tests to whitelist, what could possibly go wrong?
* Only care about wildcard when targeted locally
* Deduplicate transactions
* Handle tokens properly, return immediately if waiting send-to-device messages
* Fix sync
* Update sytest-whitelist
* Fix copyright notice (need to do more of this)
* Comments, copyrights
* Return errors from Do, fix dendritejs
* Review comments
* Comments
* Constructor for TransactionWriter
* defletions
* Update gomatrixserverlib, sytest-blacklist
2020-06-01 16:50:19 +00:00
|
|
|
|
|
|
|
type SendToDeviceEvent struct {
|
|
|
|
gomatrixserverlib.SendToDeviceEvent
|
2021-01-13 17:29:46 +00:00
|
|
|
ID StreamPosition
|
|
|
|
UserID string
|
|
|
|
DeviceID string
|
Send-to-device support (#1072)
* Groundwork for send-to-device messaging
* Update sample config
* Add unstable routing for now
* Send to device consumer in sync API
* Start the send-to-device consumer
* fix indentation in dendrite-config.yaml
* Create send-to-device database tables, other tweaks
* Add some logic for send-to-device messages, add them into sync stream
* Handle incoming send-to-device messages, count them with EDU stream pos
* Undo changes to test
* pq.Array
* Fix sync
* Logging
* Fix a couple of transaction things, fix client API
* Add send-to-device test, hopefully fix bugs
* Comments
* Refactor a bit
* Fix schema
* Fix queries
* Debug logging
* Fix storing and retrieving of send-to-device messages
* Try to avoid database locks
* Update sync position
* Use latest sync position
* Jiggle about sync a bit
* Fix tests
* Break out the retrieval from the update/delete behaviour
* Comments
* nolint on getResponseWithPDUsForCompleteSync
* Try to line up sync tokens again
* Implement wildcard
* Add all send-to-device tests to whitelist, what could possibly go wrong?
* Only care about wildcard when targeted locally
* Deduplicate transactions
* Handle tokens properly, return immediately if waiting send-to-device messages
* Fix sync
* Update sytest-whitelist
* Fix copyright notice (need to do more of this)
* Comments, copyrights
* Return errors from Do, fix dendritejs
* Review comments
* Comments
* Constructor for TransactionWriter
* defletions
* Update gomatrixserverlib, sytest-blacklist
2020-06-01 16:50:19 +00:00
|
|
|
}
|
2020-09-10 13:39:18 +00:00
|
|
|
|
|
|
|
// PeekingDevice pairs a user ID with a device ID.
// NOTE(review): presumably identifies a device that is peeking into a room —
// confirm against callers.
type PeekingDevice struct {
	UserID   string
	DeviceID string
}
|
|
|
|
|
|
|
|
// Peek carries a room ID together with New and Deleted flags.
// NOTE(review): presumably New and Deleted mark a peek that was just started
// or just removed — confirm against callers.
type Peek struct {
	RoomID  string
	New     bool
	Deleted bool
}
|
2022-03-03 11:40:53 +00:00
|
|
|
|
2022-03-29 12:14:35 +00:00
|
|
|
// OutputReceiptEvent is an entry in the receipt output kafka log
|
|
|
|
type OutputReceiptEvent struct {
|
2023-04-19 14:50:33 +00:00
|
|
|
UserID string `json:"user_id"`
|
|
|
|
RoomID string `json:"room_id"`
|
|
|
|
EventID string `json:"event_id"`
|
|
|
|
Type string `json:"type"`
|
|
|
|
Timestamp spec.Timestamp `json:"timestamp"`
|
2022-03-29 12:14:35 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// OutputSendToDeviceEvent is an entry in the send-to-device output kafka log.
|
|
|
|
// This contains the full event content, along with the user ID and device ID
|
|
|
|
// to which it is destined.
|
|
|
|
type OutputSendToDeviceEvent struct {
|
|
|
|
UserID string `json:"user_id"`
|
|
|
|
DeviceID string `json:"device_id"`
|
|
|
|
gomatrixserverlib.SendToDeviceEvent
|
|
|
|
}
|
2022-04-07 14:08:19 +00:00
|
|
|
|
|
|
|
// IgnoredUsers holds a string-keyed map that is serialised under the
// "ignored_users" JSON key.
// NOTE(review): presumably the keys are the IDs of the ignored users —
// confirm against callers.
type IgnoredUsers struct {
	List map[string]interface{} `json:"ignored_users"`
}
|
2022-10-13 13:50:52 +00:00
|
|
|
|
|
|
|
type RelationEntry struct {
|
|
|
|
Position StreamPosition
|
|
|
|
EventID string
|
|
|
|
}
|