diff --git a/syncapi/syncapi_test.go b/syncapi/syncapi_test.go index 089bdafa..8b33c5e4 100644 --- a/syncapi/syncapi_test.go +++ b/syncapi/syncapi_test.go @@ -154,8 +154,12 @@ func testSyncAccessTokens(t *testing.T, dbType test.DBType) { wantJoinedRooms: []string{room.ID}, }, } - // TODO: find a better way - time.Sleep(500 * time.Millisecond) + + syncUntil(t, base, alice.AccessToken, false, func(syncBody string) bool { + // wait for the last sent eventID to come down sync + path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(event_id=="%s")`, room.ID, room.Events()[len(room.Events())-1].EventID()) + return gjson.Get(syncBody, path).Exists() + }) for _, tc := range testCases { w := httptest.NewRecorder() @@ -343,6 +347,13 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) { // create the users alice := test.NewUser(t) + aliceDev := userapi.Device{ + ID: "ALICEID", + UserID: alice.ID, + AccessToken: "ALICE_BEARER_TOKEN", + DisplayName: "ALICE", + } + bob := test.NewUser(t) bobDev := userapi.Device{ @@ -409,7 +420,7 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) { rsAPI := roomserver.NewInternalAPI(base) rsAPI.SetFederationAPI(nil, nil) - AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{bobDev}}, rsAPI, &syncKeyAPI{}) + AddPublicRoutes(base, &syncUserAPI{accounts: []userapi.Device{aliceDev, bobDev}}, rsAPI, &syncKeyAPI{}) for _, tc := range testCases { testname := fmt.Sprintf("%s - %s", tc.historyVisibility, userType) @@ -418,12 +429,18 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) { room := test.NewRoom(t, alice, test.RoomHistoryVisibility(tc.historyVisibility)) // send the events/messages to NATS to create the rooms - beforeJoinEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("Before invite in a %s room", tc.historyVisibility)}) + beforeJoinBody := fmt.Sprintf("Before invite in a %s room", tc.historyVisibility) + beforeJoinEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": beforeJoinBody}) eventsToSend := append(room.Events(), beforeJoinEv) if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil { t.Fatalf("failed to send events: %v", err) } - time.Sleep(100 * time.Millisecond) // TODO: find a better way + syncUntil(t, base, aliceDev.AccessToken, false, + func(syncBody string) bool { + path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, beforeJoinBody) + return gjson.Get(syncBody, path).Exists() + }, + ) // There is only one event, we expect only to be able to see this, if the room is world_readable w := httptest.NewRecorder() @@ -449,14 +466,20 @@ func testHistoryVisibility(t *testing.T, dbType test.DBType) { inviteEv := room.CreateAndInsert(t, alice, "m.room.member", map[string]interface{}{"membership": "invite"}, test.WithStateKey(bob.ID)) afterInviteEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("After invite in a %s room", tc.historyVisibility)}) joinEv := room.CreateAndInsert(t, bob, "m.room.member", map[string]interface{}{"membership": "join"}, test.WithStateKey(bob.ID)) - msgEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": fmt.Sprintf("After join in a %s room", tc.historyVisibility)}) + afterJoinBody := fmt.Sprintf("After join in a %s room", tc.historyVisibility) + msgEv := room.CreateAndInsert(t, alice, "m.room.message", map[string]interface{}{"body": afterJoinBody}) eventsToSend = append([]*gomatrixserverlib.HeaderedEvent{}, inviteEv, afterInviteEv, joinEv, msgEv) if err := api.SendEvents(ctx, rsAPI, api.KindNew, eventsToSend, "test", "test", nil, false); err != nil { t.Fatalf("failed to send events: %v", err) } - time.Sleep(100 * time.Millisecond) // TODO: find a better way + syncUntil(t, base, aliceDev.AccessToken, false, + func(syncBody string) bool { + path := fmt.Sprintf(`rooms.join.%s.timeline.events.#(content.body=="%s")`, room.ID, afterJoinBody) + return gjson.Get(syncBody, path).Exists() + }, + ) // Verify the messages after/before invite are visible or not w = httptest.NewRecorder() @@ -511,8 +534,8 @@ func testSendToDevice(t *testing.T, dbType test.DBType) { AccountType: userapi.AccountTypeUser, } - base, close := testrig.CreateBaseDendrite(t, dbType) - defer close() + base, baseClose := testrig.CreateBaseDendrite(t, dbType) + defer baseClose() jsctx, _ := base.NATS.Prepare(base.ProcessContext, &base.Cfg.Global.JetStream) defer jetstream.DeleteAllStreams(jsctx, &base.Cfg.Global.JetStream) @@ -607,7 +630,14 @@ func testSendToDevice(t *testing.T, dbType test.DBType) { t.Fatalf("unable to send to device message: %v", err) } } - time.Sleep((time.Millisecond * 15) * time.Duration(tc.sendMessagesCount)) // wait a bit, so the messages can be processed + + syncUntil(t, base, alice.AccessToken, + len(tc.want) == 0, + func(body string) bool { + return gjson.Get(body, fmt.Sprintf(`to_device.events.#(content.dummy=="message %d")`, msgCounter)).Exists() + }, + ) + // Execute a /sync request, recording the response w := httptest.NewRecorder() base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ @@ -630,6 +660,42 @@ func testSendToDevice(t *testing.T, dbType test.DBType) { } } +func syncUntil(t *testing.T, + base *base.BaseDendrite, accessToken string, + skip bool, + checkFunc func(syncBody string) bool, +) { + if checkFunc == nil { + t.Fatalf("No checkFunc defined") + } + if skip { + return + } + // loop on /sync until we receive the last send message or timeout after 5 seconds, since we don't know if the message made it + // to the syncAPI when hitting /sync + done := make(chan bool) + defer close(done) + go func() { + for { + w := httptest.NewRecorder() + base.PublicClientAPIMux.ServeHTTP(w, test.NewRequest(t, "GET", "/_matrix/client/v3/sync", test.WithQueryParams(map[string]string{ + "access_token": accessToken, + "timeout": "1000", + }))) + if checkFunc(w.Body.String()) { + done <- true + return + } + } + }() + + select { + case <-done: + case <-time.After(time.Second * 5): + t.Fatalf("Timed out waiting for messages") + } +} + func toNATSMsgs(t *testing.T, base *base.BaseDendrite, input ...*gomatrixserverlib.HeaderedEvent) []*nats.Msg { result := make([]*nats.Msg, len(input)) for i, ev := range input {