2022-05-09 16:23:02 +00:00
package syncapi
import (
"context"
"encoding/json"
2022-08-02 15:00:16 +00:00
"fmt"
2022-05-09 16:23:02 +00:00
"net/http"
"net/http/httptest"
2022-08-02 15:00:16 +00:00
"reflect"
2022-05-09 16:23:02 +00:00
"testing"
"time"
2022-08-18 06:56:57 +00:00
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/tidwall/gjson"
2022-08-02 15:00:16 +00:00
"github.com/matrix-org/dendrite/clientapi/producers"
2022-05-09 16:23:02 +00:00
keyapi "github.com/matrix-org/dendrite/keyserver/api"
2022-08-11 16:23:35 +00:00
"github.com/matrix-org/dendrite/roomserver"
2022-05-09 16:23:02 +00:00
"github.com/matrix-org/dendrite/roomserver/api"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
2022-05-11 12:44:32 +00:00
"github.com/matrix-org/dendrite/setup/base"
2022-05-09 16:23:02 +00:00
"github.com/matrix-org/dendrite/setup/jetstream"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/dendrite/test"
2022-05-17 12:23:35 +00:00
"github.com/matrix-org/dendrite/test/testrig"
2022-05-09 16:23:02 +00:00
userapi "github.com/matrix-org/dendrite/userapi/api"
)
type syncRoomserverAPI struct {
rsapi . SyncRoomserverAPI
rooms [ ] * test . Room
}
func ( s * syncRoomserverAPI ) QueryLatestEventsAndState ( ctx context . Context , req * rsapi . QueryLatestEventsAndStateRequest , res * rsapi . QueryLatestEventsAndStateResponse ) error {
var room * test . Room
for _ , r := range s . rooms {
if r . ID == req . RoomID {
room = r
break
}
}
if room == nil {
res . RoomExists = false
return nil
}
res . RoomVersion = room . Version
return nil // TODO: return state
}
2022-05-11 12:44:32 +00:00
func ( s * syncRoomserverAPI ) QuerySharedUsers ( ctx context . Context , req * rsapi . QuerySharedUsersRequest , res * rsapi . QuerySharedUsersResponse ) error {
res . UserIDsToCount = make ( map [ string ] int )
return nil
}
func ( s * syncRoomserverAPI ) QueryBulkStateContent ( ctx context . Context , req * rsapi . QueryBulkStateContentRequest , res * rsapi . QueryBulkStateContentResponse ) error {
return nil
}
2022-08-11 16:23:35 +00:00
func ( s * syncRoomserverAPI ) QueryMembershipForUser ( ctx context . Context , req * rsapi . QueryMembershipForUserRequest , res * rsapi . QueryMembershipForUserResponse ) error {
res . IsRoomForgotten = false
res . RoomExists = true
return nil
}
func ( s * syncRoomserverAPI ) QueryMembershipAtEvent ( ctx context . Context , req * rsapi . QueryMembershipAtEventRequest , res * rsapi . QueryMembershipAtEventResponse ) error {
return nil
}
2022-05-09 16:23:02 +00:00
type syncUserAPI struct {
userapi . SyncUserAPI
accounts [ ] userapi . Device
}
func ( s * syncUserAPI ) QueryAccessToken ( ctx context . Context , req * userapi . QueryAccessTokenRequest , res * userapi . QueryAccessTokenResponse ) error {
for _ , acc := range s . accounts {
if acc . AccessToken == req . AccessToken {
res . Device = & acc
return nil
}
}
res . Err = "unknown user"
return nil
}
func ( s * syncUserAPI ) PerformLastSeenUpdate ( ctx context . Context , req * userapi . PerformLastSeenUpdateRequest , res * userapi . PerformLastSeenUpdateResponse ) error {
return nil
}
type syncKeyAPI struct {
2022-05-11 12:44:32 +00:00
keyapi . SyncKeyAPI
}
2022-08-11 14:29:33 +00:00
func ( s * syncKeyAPI ) QueryKeyChanges ( ctx context . Context , req * keyapi . QueryKeyChangesRequest , res * keyapi . QueryKeyChangesResponse ) error {
return nil
2022-05-11 12:44:32 +00:00
}
2022-08-11 14:29:33 +00:00
func ( s * syncKeyAPI ) QueryOneTimeKeys ( ctx context . Context , req * keyapi . QueryOneTimeKeysRequest , res * keyapi . QueryOneTimeKeysResponse ) error {
return nil
2022-05-09 16:23:02 +00:00
}
2022-05-11 12:44:32 +00:00
func TestSyncAPIAccessTokens ( t * testing . T ) {
2022-05-09 16:23:02 +00:00
test . WithAllDatabases ( t , func ( t * testing . T , dbType test . DBType ) {
2022-05-11 12:44:32 +00:00
testSyncAccessTokens ( t , dbType )
2022-05-09 16:23:02 +00:00
} )
}
2022-05-11 12:44:32 +00:00
func testSyncAccessTokens ( t * testing . T , dbType test . DBType ) {
2022-05-17 12:23:35 +00:00
user := test . NewUser ( t )
2022-05-09 16:23:02 +00:00
room := test . NewRoom ( t , user )
alice := userapi . Device {
ID : "ALICEID" ,
UserID : user . ID ,
AccessToken : "ALICE_BEARER_TOKEN" ,
DisplayName : "Alice" ,
AccountType : userapi . AccountTypeUser ,
}
2022-05-17 12:23:35 +00:00
base , close := testrig . CreateBaseDendrite ( t , dbType )
2022-05-09 16:23:02 +00:00
defer close ( )
jsctx , _ := base . NATS . Prepare ( base . ProcessContext , & base . Cfg . Global . JetStream )
defer jetstream . DeleteAllStreams ( jsctx , & base . Cfg . Global . JetStream )
2022-08-11 16:23:35 +00:00
msgs := toNATSMsgs ( t , base , room . Events ( ) ... )
2022-05-09 16:23:02 +00:00
AddPublicRoutes ( base , & syncUserAPI { accounts : [ ] userapi . Device { alice } } , & syncRoomserverAPI { rooms : [ ] * test . Room { room } } , & syncKeyAPI { } )
2022-05-17 12:23:35 +00:00
testrig . MustPublishMsgs ( t , jsctx , msgs ... )
2022-05-09 16:23:02 +00:00
testCases := [ ] struct {
name string
req * http . Request
wantCode int
wantJoinedRooms [ ] string
} {
{
name : "missing access token" ,
req : test . NewRequest ( t , "GET" , "/_matrix/client/v3/sync" , test . WithQueryParams ( map [ string ] string {
"timeout" : "0" ,
} ) ) ,
wantCode : 401 ,
} ,
{
name : "unknown access token" ,
req : test . NewRequest ( t , "GET" , "/_matrix/client/v3/sync" , test . WithQueryParams ( map [ string ] string {
"access_token" : "foo" ,
"timeout" : "0" ,
} ) ) ,
wantCode : 401 ,
} ,
{
name : "valid access token" ,
req : test . NewRequest ( t , "GET" , "/_matrix/client/v3/sync" , test . WithQueryParams ( map [ string ] string {
"access_token" : alice . AccessToken ,
"timeout" : "0" ,
} ) ) ,
wantCode : 200 ,
wantJoinedRooms : [ ] string { room . ID } ,
} ,
}
2022-08-19 09:03:55 +00:00
syncUntil ( t , base , alice . AccessToken , false , func ( syncBody string ) bool {
// wait for the last sent eventID to come down sync
path := fmt . Sprintf ( ` rooms.join.%s.timeline.events.#(event_id=="%s") ` , room . ID , room . Events ( ) [ len ( room . Events ( ) ) - 1 ] . EventID ( ) )
return gjson . Get ( syncBody , path ) . Exists ( )
} )
2022-05-09 16:23:02 +00:00
for _ , tc := range testCases {
w := httptest . NewRecorder ( )
base . PublicClientAPIMux . ServeHTTP ( w , tc . req )
if w . Code != tc . wantCode {
t . Fatalf ( "%s: got HTTP %d want %d" , tc . name , w . Code , tc . wantCode )
}
if tc . wantJoinedRooms != nil {
var res types . Response
if err := json . NewDecoder ( w . Body ) . Decode ( & res ) ; err != nil {
t . Fatalf ( "%s: failed to decode response body: %s" , tc . name , err )
}
if len ( res . Rooms . Join ) != len ( tc . wantJoinedRooms ) {
t . Errorf ( "%s: got %v joined rooms, want %v.\nResponse: %+v" , tc . name , len ( res . Rooms . Join ) , len ( tc . wantJoinedRooms ) , res )
}
t . Logf ( "res: %+v" , res . Rooms . Join [ room . ID ] )
gotEventIDs := make ( [ ] string , len ( res . Rooms . Join [ room . ID ] . Timeline . Events ) )
for i , ev := range res . Rooms . Join [ room . ID ] . Timeline . Events {
gotEventIDs [ i ] = ev . EventID
}
test . AssertEventIDsEqual ( t , gotEventIDs , room . Events ( ) )
}
}
}
2022-05-11 12:44:32 +00:00
// Tests what happens when we create a room and then /sync before all events from /createRoom have
// been sent to the syncapi
func TestSyncAPICreateRoomSyncEarly ( t * testing . T ) {
test . WithAllDatabases ( t , func ( t * testing . T , dbType test . DBType ) {
testSyncAPICreateRoomSyncEarly ( t , dbType )
} )
}
func testSyncAPICreateRoomSyncEarly ( t * testing . T , dbType test . DBType ) {
2022-08-25 12:42:47 +00:00
t . Skip ( "Skipped, possibly fixed" )
2022-05-17 12:23:35 +00:00
user := test . NewUser ( t )
2022-05-11 12:44:32 +00:00
room := test . NewRoom ( t , user )
alice := userapi . Device {
ID : "ALICEID" ,
UserID : user . ID ,
AccessToken : "ALICE_BEARER_TOKEN" ,
DisplayName : "Alice" ,
AccountType : userapi . AccountTypeUser ,
}
2022-05-17 12:23:35 +00:00
base , close := testrig . CreateBaseDendrite ( t , dbType )
2022-05-11 12:44:32 +00:00
defer close ( )
jsctx , _ := base . NATS . Prepare ( base . ProcessContext , & base . Cfg . Global . JetStream )
defer jetstream . DeleteAllStreams ( jsctx , & base . Cfg . Global . JetStream )
// order is:
// m.room.create
// m.room.member
// m.room.power_levels
// m.room.join_rules
// m.room.history_visibility
2022-08-11 16:23:35 +00:00
msgs := toNATSMsgs ( t , base , room . Events ( ) ... )
2022-05-11 12:44:32 +00:00
sinceTokens := make ( [ ] string , len ( msgs ) )
AddPublicRoutes ( base , & syncUserAPI { accounts : [ ] userapi . Device { alice } } , & syncRoomserverAPI { rooms : [ ] * test . Room { room } } , & syncKeyAPI { } )
for i , msg := range msgs {
2022-05-17 12:23:35 +00:00
testrig . MustPublishMsgs ( t , jsctx , msg )
2022-05-12 09:11:46 +00:00
time . Sleep ( 100 * time . Millisecond )
2022-05-11 12:44:32 +00:00
w := httptest . NewRecorder ( )
base . PublicClientAPIMux . ServeHTTP ( w , test . NewRequest ( t , "GET" , "/_matrix/client/v3/sync" , test . WithQueryParams ( map [ string ] string {
"access_token" : alice . AccessToken ,
"timeout" : "0" ,
} ) ) )
if w . Code != 200 {
t . Errorf ( "got HTTP %d want 200" , w . Code )
continue
}
var res types . Response
if err := json . NewDecoder ( w . Body ) . Decode ( & res ) ; err != nil {
t . Errorf ( "failed to decode response body: %s" , err )
}
sinceTokens [ i ] = res . NextBatch . String ( )
if i == 0 { // create event does not produce a room section
if len ( res . Rooms . Join ) != 0 {
t . Fatalf ( "i=%v got %d joined rooms, want 0" , i , len ( res . Rooms . Join ) )
}
} else { // we should have that room somewhere
if len ( res . Rooms . Join ) != 1 {
t . Fatalf ( "i=%v got %d joined rooms, want 1" , i , len ( res . Rooms . Join ) )
}
}
}
// sync with no token "" and with the penultimate token and this should neatly return room events in the timeline block
sinceTokens = append ( [ ] string { "" } , sinceTokens [ : len ( sinceTokens ) - 1 ] ... )
t . Logf ( "waited for events to be consumed; syncing with %v" , sinceTokens )
for i , since := range sinceTokens {
w := httptest . NewRecorder ( )
base . PublicClientAPIMux . ServeHTTP ( w , test . NewRequest ( t , "GET" , "/_matrix/client/v3/sync" , test . WithQueryParams ( map [ string ] string {
"access_token" : alice . AccessToken ,
"timeout" : "0" ,
"since" : since ,
} ) ) )
if w . Code != 200 {
t . Errorf ( "since=%s got HTTP %d want 200" , since , w . Code )
}
var res types . Response
if err := json . NewDecoder ( w . Body ) . Decode ( & res ) ; err != nil {
t . Errorf ( "failed to decode response body: %s" , err )
}
if len ( res . Rooms . Join ) != 1 {
t . Fatalf ( "since=%s got %d joined rooms, want 1" , since , len ( res . Rooms . Join ) )
}
t . Logf ( "since=%s res state:%+v res timeline:%+v" , since , res . Rooms . Join [ room . ID ] . State . Events , res . Rooms . Join [ room . ID ] . Timeline . Events )
gotEventIDs := make ( [ ] string , len ( res . Rooms . Join [ room . ID ] . Timeline . Events ) )
for j , ev := range res . Rooms . Join [ room . ID ] . Timeline . Events {
gotEventIDs [ j ] = ev . EventID
}
test . AssertEventIDsEqual ( t , gotEventIDs , room . Events ( ) [ i : ] )
}
}
2022-05-17 14:53:08 +00:00
// Test that if we hit /sync we get back presence: online, regardless of whether messages get delivered
// via NATS. Regression test for a flakey test "User sees their own presence in a sync"
func TestSyncAPIUpdatePresenceImmediately ( t * testing . T ) {
test . WithAllDatabases ( t , func ( t * testing . T , dbType test . DBType ) {
testSyncAPIUpdatePresenceImmediately ( t , dbType )
} )
}
func testSyncAPIUpdatePresenceImmediately ( t * testing . T , dbType test . DBType ) {
user := test . NewUser ( t )
alice := userapi . Device {
ID : "ALICEID" ,
UserID : user . ID ,
AccessToken : "ALICE_BEARER_TOKEN" ,
DisplayName : "Alice" ,
AccountType : userapi . AccountTypeUser ,
}
base , close := testrig . CreateBaseDendrite ( t , dbType )
base . Cfg . Global . Presence . EnableOutbound = true
base . Cfg . Global . Presence . EnableInbound = true
defer close ( )
jsctx , _ := base . NATS . Prepare ( base . ProcessContext , & base . Cfg . Global . JetStream )
defer jetstream . DeleteAllStreams ( jsctx , & base . Cfg . Global . JetStream )
AddPublicRoutes ( base , & syncUserAPI { accounts : [ ] userapi . Device { alice } } , & syncRoomserverAPI { } , & syncKeyAPI { } )
w := httptest . NewRecorder ( )
base . PublicClientAPIMux . ServeHTTP ( w , test . NewRequest ( t , "GET" , "/_matrix/client/v3/sync" , test . WithQueryParams ( map [ string ] string {
"access_token" : alice . AccessToken ,
"timeout" : "0" ,
"set_presence" : "online" ,
} ) ) )
if w . Code != 200 {
t . Fatalf ( "got HTTP %d want %d" , w . Code , 200 )
}
var res types . Response
if err := json . NewDecoder ( w . Body ) . Decode ( & res ) ; err != nil {
t . Errorf ( "failed to decode response body: %s" , err )
}
if len ( res . Presence . Events ) != 1 {
t . Fatalf ( "expected 1 presence events, got: %+v" , res . Presence . Events )
}
if res . Presence . Events [ 0 ] . Sender != alice . UserID {
t . Errorf ( "sender: got %v want %v" , res . Presence . Events [ 0 ] . Sender , alice . UserID )
}
if res . Presence . Events [ 0 ] . Type != "m.presence" {
t . Errorf ( "type: got %v want %v" , res . Presence . Events [ 0 ] . Type , "m.presence" )
}
if gjson . ParseBytes ( res . Presence . Events [ 0 ] . Content ) . Get ( "presence" ) . Str != "online" {
t . Errorf ( "content: not online, got %v" , res . Presence . Events [ 0 ] . Content )
}
}
2022-08-11 16:23:35 +00:00
// This is mainly what Sytest is doing in "test_history_visibility"
func TestMessageHistoryVisibility ( t * testing . T ) {
test . WithAllDatabases ( t , func ( t * testing . T , dbType test . DBType ) {
testHistoryVisibility ( t , dbType )
} )
}
func testHistoryVisibility ( t * testing . T , dbType test . DBType ) {
type result struct {
seeWithoutJoin bool
seeBeforeJoin bool
seeAfterInvite bool
}
// create the users
alice := test . NewUser ( t )
2022-08-19 09:03:55 +00:00
aliceDev := userapi . Device {
ID : "ALICEID" ,
UserID : alice . ID ,
AccessToken : "ALICE_BEARER_TOKEN" ,
DisplayName : "ALICE" ,
}
2022-08-11 16:23:35 +00:00
bob := test . NewUser ( t )
bobDev := userapi . Device {
ID : "BOBID" ,
UserID : bob . ID ,
AccessToken : "BOD_BEARER_TOKEN" ,
DisplayName : "BOB" ,
}
ctx := context . Background ( )
// check guest and normal user accounts
for _ , accType := range [ ] userapi . AccountType { userapi . AccountTypeGuest , userapi . AccountTypeUser } {
testCases := [ ] struct {
historyVisibility gomatrixserverlib . HistoryVisibility
wantResult result
} {
{
historyVisibility : gomatrixserverlib . HistoryVisibilityWorldReadable ,
wantResult : result {
seeWithoutJoin : true ,
seeBeforeJoin : true ,
seeAfterInvite : true ,
} ,
} ,
{
historyVisibility : gomatrixserverlib . HistoryVisibilityShared ,
wantResult : result {
seeWithoutJoin : false ,
seeBeforeJoin : true ,
seeAfterInvite : true ,
} ,
} ,
{
historyVisibility : gomatrixserverlib . HistoryVisibilityInvited ,
wantResult : result {
seeWithoutJoin : false ,
seeBeforeJoin : false ,
seeAfterInvite : true ,
} ,
} ,
{
historyVisibility : gomatrixserverlib . HistoryVisibilityJoined ,
wantResult : result {
seeWithoutJoin : false ,
seeBeforeJoin : false ,
seeAfterInvite : false ,
} ,
} ,
}
bobDev . AccountType = accType
userType := "guest"
if accType == userapi . AccountTypeUser {
userType = "real user"
}
base , close := testrig . CreateBaseDendrite ( t , dbType )
defer close ( )
jsctx , _ := base . NATS . Prepare ( base . ProcessContext , & base . Cfg . Global . JetStream )
defer jetstream . DeleteAllStreams ( jsctx , & base . Cfg . Global . JetStream )
// Use the actual internal roomserver API
rsAPI := roomserver . NewInternalAPI ( base )
rsAPI . SetFederationAPI ( nil , nil )
2022-08-19 09:03:55 +00:00
AddPublicRoutes ( base , & syncUserAPI { accounts : [ ] userapi . Device { aliceDev , bobDev } } , rsAPI , & syncKeyAPI { } )
2022-08-11 16:23:35 +00:00
for _ , tc := range testCases {
testname := fmt . Sprintf ( "%s - %s" , tc . historyVisibility , userType )
t . Run ( testname , func ( t * testing . T ) {
// create a room with the given visibility
room := test . NewRoom ( t , alice , test . RoomHistoryVisibility ( tc . historyVisibility ) )
// send the events/messages to NATS to create the rooms
2022-08-19 09:03:55 +00:00
beforeJoinBody := fmt . Sprintf ( "Before invite in a %s room" , tc . historyVisibility )
beforeJoinEv := room . CreateAndInsert ( t , alice , "m.room.message" , map [ string ] interface { } { "body" : beforeJoinBody } )
2022-08-11 16:23:35 +00:00
eventsToSend := append ( room . Events ( ) , beforeJoinEv )
if err := api . SendEvents ( ctx , rsAPI , api . KindNew , eventsToSend , "test" , "test" , nil , false ) ; err != nil {
t . Fatalf ( "failed to send events: %v" , err )
}
2022-08-19 09:03:55 +00:00
syncUntil ( t , base , aliceDev . AccessToken , false ,
func ( syncBody string ) bool {
path := fmt . Sprintf ( ` rooms.join.%s.timeline.events.#(content.body=="%s") ` , room . ID , beforeJoinBody )
return gjson . Get ( syncBody , path ) . Exists ( )
} ,
)
2022-08-11 16:23:35 +00:00
// There is only one event, we expect only to be able to see this, if the room is world_readable
w := httptest . NewRecorder ( )
base . PublicClientAPIMux . ServeHTTP ( w , test . NewRequest ( t , "GET" , fmt . Sprintf ( "/_matrix/client/v3/rooms/%s/messages" , room . ID ) , test . WithQueryParams ( map [ string ] string {
"access_token" : bobDev . AccessToken ,
"dir" : "b" ,
} ) ) )
if w . Code != 200 {
t . Logf ( "%s" , w . Body . String ( ) )
t . Fatalf ( "got HTTP %d want %d" , w . Code , 200 )
}
// We only care about the returned events at this point
var res struct {
Chunk [ ] gomatrixserverlib . ClientEvent ` json:"chunk" `
}
if err := json . NewDecoder ( w . Body ) . Decode ( & res ) ; err != nil {
t . Errorf ( "failed to decode response body: %s" , err )
}
verifyEventVisible ( t , tc . wantResult . seeWithoutJoin , beforeJoinEv , res . Chunk )
// Create invite, a message, join the room and create another message.
inviteEv := room . CreateAndInsert ( t , alice , "m.room.member" , map [ string ] interface { } { "membership" : "invite" } , test . WithStateKey ( bob . ID ) )
afterInviteEv := room . CreateAndInsert ( t , alice , "m.room.message" , map [ string ] interface { } { "body" : fmt . Sprintf ( "After invite in a %s room" , tc . historyVisibility ) } )
joinEv := room . CreateAndInsert ( t , bob , "m.room.member" , map [ string ] interface { } { "membership" : "join" } , test . WithStateKey ( bob . ID ) )
2022-08-19 09:03:55 +00:00
afterJoinBody := fmt . Sprintf ( "After join in a %s room" , tc . historyVisibility )
msgEv := room . CreateAndInsert ( t , alice , "m.room.message" , map [ string ] interface { } { "body" : afterJoinBody } )
2022-08-11 16:23:35 +00:00
eventsToSend = append ( [ ] * gomatrixserverlib . HeaderedEvent { } , inviteEv , afterInviteEv , joinEv , msgEv )
if err := api . SendEvents ( ctx , rsAPI , api . KindNew , eventsToSend , "test" , "test" , nil , false ) ; err != nil {
t . Fatalf ( "failed to send events: %v" , err )
}
2022-08-19 09:03:55 +00:00
syncUntil ( t , base , aliceDev . AccessToken , false ,
func ( syncBody string ) bool {
path := fmt . Sprintf ( ` rooms.join.%s.timeline.events.#(content.body=="%s") ` , room . ID , afterJoinBody )
return gjson . Get ( syncBody , path ) . Exists ( )
} ,
)
2022-08-11 16:23:35 +00:00
// Verify the messages after/before invite are visible or not
w = httptest . NewRecorder ( )
base . PublicClientAPIMux . ServeHTTP ( w , test . NewRequest ( t , "GET" , fmt . Sprintf ( "/_matrix/client/v3/rooms/%s/messages" , room . ID ) , test . WithQueryParams ( map [ string ] string {
"access_token" : bobDev . AccessToken ,
"dir" : "b" ,
} ) ) )
if w . Code != 200 {
t . Logf ( "%s" , w . Body . String ( ) )
t . Fatalf ( "got HTTP %d want %d" , w . Code , 200 )
}
if err := json . NewDecoder ( w . Body ) . Decode ( & res ) ; err != nil {
t . Errorf ( "failed to decode response body: %s" , err )
}
// verify results
verifyEventVisible ( t , tc . wantResult . seeBeforeJoin , beforeJoinEv , res . Chunk )
verifyEventVisible ( t , tc . wantResult . seeAfterInvite , afterInviteEv , res . Chunk )
} )
}
}
}
func verifyEventVisible ( t * testing . T , wantVisible bool , wantVisibleEvent * gomatrixserverlib . HeaderedEvent , chunk [ ] gomatrixserverlib . ClientEvent ) {
t . Helper ( )
if wantVisible {
for _ , ev := range chunk {
if ev . EventID == wantVisibleEvent . EventID ( ) {
return
}
}
t . Fatalf ( "expected to see event %s but didn't: %+v" , wantVisibleEvent . EventID ( ) , chunk )
} else {
for _ , ev := range chunk {
if ev . EventID == wantVisibleEvent . EventID ( ) {
t . Fatalf ( "expected not to see event %s: %+v" , wantVisibleEvent . EventID ( ) , string ( ev . Content ) )
}
}
}
}
2022-08-02 15:00:16 +00:00
func TestSendToDevice ( t * testing . T ) {
test . WithAllDatabases ( t , testSendToDevice )
}
func testSendToDevice ( t * testing . T , dbType test . DBType ) {
user := test . NewUser ( t )
alice := userapi . Device {
ID : "ALICEID" ,
UserID : user . ID ,
AccessToken : "ALICE_BEARER_TOKEN" ,
DisplayName : "Alice" ,
AccountType : userapi . AccountTypeUser ,
}
2022-08-19 09:03:55 +00:00
base , baseClose := testrig . CreateBaseDendrite ( t , dbType )
defer baseClose ( )
2022-08-02 15:00:16 +00:00
jsctx , _ := base . NATS . Prepare ( base . ProcessContext , & base . Cfg . Global . JetStream )
defer jetstream . DeleteAllStreams ( jsctx , & base . Cfg . Global . JetStream )
AddPublicRoutes ( base , & syncUserAPI { accounts : [ ] userapi . Device { alice } } , & syncRoomserverAPI { } , & syncKeyAPI { } )
producer := producers . SyncAPIProducer {
TopicSendToDeviceEvent : base . Cfg . Global . JetStream . Prefixed ( jetstream . OutputSendToDeviceEvent ) ,
JetStream : jsctx ,
}
msgCounter := 0
testCases := [ ] struct {
name string
since string
want [ ] string
sendMessagesCount int
} {
{
name : "initial sync, no messages" ,
want : [ ] string { } ,
} ,
{
name : "initial sync, one new message" ,
sendMessagesCount : 1 ,
want : [ ] string {
"message 1" ,
} ,
} ,
{
name : "initial sync, two new messages" , // we didn't advance the since token, so we'll receive two messages
sendMessagesCount : 1 ,
want : [ ] string {
"message 1" ,
"message 2" ,
} ,
} ,
{
name : "incremental sync, one message" , // this deletes message 1, as we advanced the since token
since : types . StreamingToken { SendToDevicePosition : 1 } . String ( ) ,
want : [ ] string {
"message 2" ,
} ,
} ,
{
name : "failed incremental sync, one message" , // didn't advance since, so still the same message
since : types . StreamingToken { SendToDevicePosition : 1 } . String ( ) ,
want : [ ] string {
"message 2" ,
} ,
} ,
{
name : "incremental sync, no message" , // this should delete message 2
since : types . StreamingToken { SendToDevicePosition : 2 } . String ( ) , // next_batch from previous sync
want : [ ] string { } ,
} ,
{
name : "incremental sync, three new messages" ,
since : types . StreamingToken { SendToDevicePosition : 2 } . String ( ) ,
sendMessagesCount : 3 ,
want : [ ] string {
"message 3" , // message 2 was deleted in the previous test
"message 4" ,
"message 5" ,
} ,
} ,
{
name : "initial sync, three messages" , // we expect three messages, as we didn't go beyond "2"
want : [ ] string {
"message 3" ,
"message 4" ,
"message 5" ,
} ,
} ,
{
name : "incremental sync, no messages" , // advance the sync token, no new messages
since : types . StreamingToken { SendToDevicePosition : 5 } . String ( ) ,
want : [ ] string { } ,
} ,
}
ctx := context . Background ( )
for _ , tc := range testCases {
// Send to-device messages of type "m.dendrite.test" with content `{"dummy":"message $counter"}`
for i := 0 ; i < tc . sendMessagesCount ; i ++ {
msgCounter ++
msg := map [ string ] string {
"dummy" : fmt . Sprintf ( "message %d" , msgCounter ) ,
}
if err := producer . SendToDevice ( ctx , user . ID , user . ID , alice . ID , "m.dendrite.test" , msg ) ; err != nil {
t . Fatalf ( "unable to send to device message: %v" , err )
}
}
2022-08-19 09:03:55 +00:00
syncUntil ( t , base , alice . AccessToken ,
len ( tc . want ) == 0 ,
func ( body string ) bool {
return gjson . Get ( body , fmt . Sprintf ( ` to_device.events.#(content.dummy=="message %d") ` , msgCounter ) ) . Exists ( )
} ,
)
2022-08-02 15:00:16 +00:00
// Execute a /sync request, recording the response
w := httptest . NewRecorder ( )
base . PublicClientAPIMux . ServeHTTP ( w , test . NewRequest ( t , "GET" , "/_matrix/client/v3/sync" , test . WithQueryParams ( map [ string ] string {
"access_token" : alice . AccessToken ,
"since" : tc . since ,
} ) ) )
// Extract the to_device.events, # gets all values of an array, in this case a string slice with "message $counter" entries
events := gjson . Get ( w . Body . String ( ) , "to_device.events.#.content.dummy" ) . Array ( )
got := make ( [ ] string , len ( events ) )
for i := range events {
got [ i ] = events [ i ] . String ( )
}
// Ensure the messages we received are as we expect them to be
if ! reflect . DeepEqual ( got , tc . want ) {
t . Logf ( "[%s|since=%s]: Sync: %s" , tc . name , tc . since , w . Body . String ( ) )
t . Fatalf ( "[%s|since=%s]: got: %+v, want: %+v" , tc . name , tc . since , got , tc . want )
}
}
}
2022-08-19 09:03:55 +00:00
func syncUntil ( t * testing . T ,
base * base . BaseDendrite , accessToken string ,
skip bool ,
checkFunc func ( syncBody string ) bool ,
) {
if checkFunc == nil {
t . Fatalf ( "No checkFunc defined" )
}
if skip {
return
}
// loop on /sync until we receive the last send message or timeout after 5 seconds, since we don't know if the message made it
// to the syncAPI when hitting /sync
done := make ( chan bool )
defer close ( done )
go func ( ) {
for {
w := httptest . NewRecorder ( )
base . PublicClientAPIMux . ServeHTTP ( w , test . NewRequest ( t , "GET" , "/_matrix/client/v3/sync" , test . WithQueryParams ( map [ string ] string {
"access_token" : accessToken ,
"timeout" : "1000" ,
} ) ) )
if checkFunc ( w . Body . String ( ) ) {
done <- true
return
}
}
} ( )
select {
case <- done :
case <- time . After ( time . Second * 5 ) :
t . Fatalf ( "Timed out waiting for messages" )
}
}
2022-08-11 16:23:35 +00:00
func toNATSMsgs ( t * testing . T , base * base . BaseDendrite , input ... * gomatrixserverlib . HeaderedEvent ) [ ] * nats . Msg {
2022-05-11 12:44:32 +00:00
result := make ( [ ] * nats . Msg , len ( input ) )
for i , ev := range input {
var addsStateIDs [ ] string
if ev . StateKey ( ) != nil {
addsStateIDs = append ( addsStateIDs , ev . EventID ( ) )
}
2022-05-17 12:23:35 +00:00
result [ i ] = testrig . NewOutputEventMsg ( t , base , ev . RoomID ( ) , api . OutputEvent {
2022-05-11 12:44:32 +00:00
Type : rsapi . OutputTypeNewRoomEvent ,
NewRoomEvent : & rsapi . OutputNewRoomEvent {
Event : ev ,
AddsStateEventIDs : addsStateIDs ,
2022-08-11 16:23:35 +00:00
HistoryVisibility : ev . Visibility ,
2022-05-11 12:44:32 +00:00
} ,
} )
}
return result
}