mirror of
https://github.com/hoernschen/dendrite.git
synced 2024-12-27 07:28:27 +00:00
Remove QuerySharedUsers from current state server (#1396)
* Remove QuerySharedUsers from current state server * Bugfixes
This commit is contained in:
parent
81688d6bde
commit
ca8dcf46b7
13 changed files with 93 additions and 407 deletions
|
@ -25,18 +25,6 @@ type CurrentStateInternalAPI interface {
|
||||||
QueryRoomsForUser(ctx context.Context, req *QueryRoomsForUserRequest, res *QueryRoomsForUserResponse) error
|
QueryRoomsForUser(ctx context.Context, req *QueryRoomsForUserRequest, res *QueryRoomsForUserResponse) error
|
||||||
// QueryBulkStateContent does a bulk query for state event content in the given rooms.
|
// QueryBulkStateContent does a bulk query for state event content in the given rooms.
|
||||||
QueryBulkStateContent(ctx context.Context, req *QueryBulkStateContentRequest, res *QueryBulkStateContentResponse) error
|
QueryBulkStateContent(ctx context.Context, req *QueryBulkStateContentRequest, res *QueryBulkStateContentResponse) error
|
||||||
// QuerySharedUsers returns a list of users who share at least 1 room in common with the given user.
|
|
||||||
QuerySharedUsers(ctx context.Context, req *QuerySharedUsersRequest, res *QuerySharedUsersResponse) error
|
|
||||||
}
|
|
||||||
|
|
||||||
type QuerySharedUsersRequest struct {
|
|
||||||
UserID string
|
|
||||||
ExcludeRoomIDs []string
|
|
||||||
IncludeRoomIDs []string
|
|
||||||
}
|
|
||||||
|
|
||||||
type QuerySharedUsersResponse struct {
|
|
||||||
UserIDsToCount map[string]int
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type QueryRoomsForUserRequest struct {
|
type QueryRoomsForUserRequest struct {
|
||||||
|
|
|
@ -1,294 +0,0 @@
|
||||||
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package currentstateserver
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"crypto/ed25519"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"net/http"
|
|
||||||
"os"
|
|
||||||
"reflect"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/Shopify/sarama"
|
|
||||||
"github.com/gorilla/mux"
|
|
||||||
"github.com/matrix-org/dendrite/currentstateserver/api"
|
|
||||||
"github.com/matrix-org/dendrite/currentstateserver/internal"
|
|
||||||
"github.com/matrix-org/dendrite/currentstateserver/inthttp"
|
|
||||||
"github.com/matrix-org/dendrite/currentstateserver/storage"
|
|
||||||
"github.com/matrix-org/dendrite/internal/config"
|
|
||||||
"github.com/matrix-org/dendrite/internal/httputil"
|
|
||||||
"github.com/matrix-org/dendrite/internal/test"
|
|
||||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
|
||||||
"github.com/matrix-org/naffka"
|
|
||||||
naffkaStorage "github.com/matrix-org/naffka/storage"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
testRoomVersion = gomatrixserverlib.RoomVersionV1
|
|
||||||
testData = []json.RawMessage{
|
|
||||||
[]byte(`{"auth_events":[],"content":{"creator":"@userid:kaer.morhen"},"depth":0,"event_id":"$0ok8ynDp7kjc95e3:kaer.morhen","hashes":{"sha256":"17kPoH+h0Dk4Omn7Sus0qMb6+oGcf+CZFEgDhv7UKWs"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"jP4a04f5/F10Pw95FPpdCyKAO44JOwUQ/MZOOeA/RTU1Dn+AHPMzGSaZnuGjRr/xQuADt+I3ctb5ZQfLKNzHDw"}},"state_key":"","type":"m.room.create"}`),
|
|
||||||
[]byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}]],"content":{"membership":"join"},"depth":1,"event_id":"$LEwEu0kxrtu5fOiS:kaer.morhen","hashes":{"sha256":"B7M88PhXf3vd1LaFtjQutFu4x/w7fHD28XKZ4sAsJTo"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"p2vqmuJn7ZBRImctSaKbXCAxCcBlIjPH9JHte1ouIUGy84gpu4eLipOvSBCLL26hXfC0Zrm4WUto6Hr+ohdrCg"}},"state_key":"@userid:kaer.morhen","type":"m.room.member"}`),
|
|
||||||
[]byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"join_rule":"public"},"depth":2,"event_id":"$SMHlqUrNhhBBRLeN:kaer.morhen","hashes":{"sha256":"vIuJQvmMjrGxshAkj1SXe0C4RqvMbv4ZADDw9pFCWqQ"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"hBMsb3Qppo3RaqqAl4JyTgaiWEbW5hlckATky6PrHun+F3YM203TzG7w9clwuQU5F5pZoB1a6nw+to0hN90FAw"}},"state_key":"","type":"m.room.join_rules"}`),
|
|
||||||
[]byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"history_visibility":"shared"},"depth":3,"event_id":"$6F1yGIbO0J7TM93h:kaer.morhen","hashes":{"sha256":"Mr23GKSlZW7UCCYLgOWawI2Sg6KIoMjUWO2TDenuOgw"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$SMHlqUrNhhBBRLeN:kaer.morhen",{"sha256":"SylzE8U02I+6eyEHgL+FlU0L5YdqrVp8OOlxKS9VQW0"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"sHLKrFI3hKGrEJfpMVZSDS3LvLasQsy50CTsOwru9XTVxgRsPo6wozNtRVjxo1J3Rk18RC9JppovmQ5VR5EcDw"}},"state_key":"","type":"m.room.history_visibility"}`),
|
|
||||||
[]byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"ban":50,"events":null,"events_default":0,"invite":0,"kick":50,"redact":50,"state_default":50,"users":null,"users_default":0},"depth":4,"event_id":"$UKNe10XzYzG0TeA9:kaer.morhen","hashes":{"sha256":"ngbP3yja9U5dlckKerUs/fSOhtKxZMCVvsfhPURSS28"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$6F1yGIbO0J7TM93h:kaer.morhen",{"sha256":"A4CucrKSoWX4IaJXhq02mBg1sxIyZEftbC+5p3fZAvk"}]],"prev_state":[],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"zOmwlP01QL3yFchzuR9WHvogOoBZA3oVtNIF3lM0ZfDnqlSYZB9sns27G/4HVq0k7alaK7ZE3oGoCrVnMkPNCw"}},"state_key":"","type":"m.room.power_levels"}`),
|
|
||||||
// messages
|
|
||||||
[]byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"body":"Test Message"},"depth":5,"event_id":"$gl2T9l3qm0kUbiIJ:kaer.morhen","hashes":{"sha256":"Qx3nRMHLDPSL5hBAzuX84FiSSP0K0Kju2iFoBWH4Za8"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$UKNe10XzYzG0TeA9:kaer.morhen",{"sha256":"KtSRyMjt0ZSjsv2koixTRCxIRCGoOp6QrKscsW97XRo"}]],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"sqDgv3EG7ml5VREzmT9aZeBpS4gAPNIaIeJOwqjDhY0GPU/BcpX5wY4R7hYLrNe5cChgV+eFy/GWm1Zfg5FfDg"}},"type":"m.room.message"}`),
|
|
||||||
[]byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"body":"Test Message"},"depth":6,"event_id":"$MYSbs8m4rEbsCWXD:kaer.morhen","hashes":{"sha256":"kgbYM7v4Ud2YaBsjBTolM4ySg6rHcJNYI6nWhMSdFUA"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$gl2T9l3qm0kUbiIJ:kaer.morhen",{"sha256":"C/rD04h9wGxRdN2G/IBfrgoE1UovzLZ+uskwaKZ37/Q"}]],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"x0UoKh968jj/F5l1/R7Ew0T6CTKuew3PLNHASNxqck/bkNe8yYQiDHXRr+kZxObeqPZZTpaF1+EI+bLU9W8GDQ"}},"type":"m.room.message"}`),
|
|
||||||
[]byte(`{"auth_events":[["$0ok8ynDp7kjc95e3:kaer.morhen",{"sha256":"sWCi6Ckp9rDimQON+MrUlNRkyfZ2tjbPbWfg2NMB18Q"}],["$LEwEu0kxrtu5fOiS:kaer.morhen",{"sha256":"1aKajq6DWHru1R1HJjvdWMEavkJJHGaTmPvfuERUXaA"}]],"content":{"body":"Test Message"},"depth":7,"event_id":"$N5x9WJkl9ClPrAEg:kaer.morhen","hashes":{"sha256":"FWM8oz4yquTunRZ67qlW2gzPDzdWfBP6RPHXhK1I/x8"},"origin":"kaer.morhen","origin_server_ts":0,"prev_events":[["$MYSbs8m4rEbsCWXD:kaer.morhen",{"sha256":"fatqgW+SE8mb2wFn3UN+drmluoD4UJ/EcSrL6Ur9q1M"}]],"room_id":"!roomid:kaer.morhen","sender":"@userid:kaer.morhen","signatures":{"kaer.morhen":{"ed25519:auto":"Y+LX/xcyufoXMOIoqQBNOzy6lZfUGB1ffgXIrSugk6obMiyAsiRejHQN/pciZXsHKxMJLYRFAz4zSJoS/LGPAA"}},"type":"m.room.message"}`),
|
|
||||||
}
|
|
||||||
testEvents = []gomatrixserverlib.HeaderedEvent{}
|
|
||||||
testStateEvents = make(map[gomatrixserverlib.StateKeyTuple]gomatrixserverlib.HeaderedEvent)
|
|
||||||
|
|
||||||
kafkaPrefix = "Dendrite"
|
|
||||||
kafkaTopic = fmt.Sprintf("%s%s", kafkaPrefix, "OutputRoomEvent")
|
|
||||||
)
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
for _, j := range testData {
|
|
||||||
e, err := gomatrixserverlib.NewEventFromTrustedJSON(j, false, testRoomVersion)
|
|
||||||
if err != nil {
|
|
||||||
panic("cannot load test data: " + err.Error())
|
|
||||||
}
|
|
||||||
h := e.Headered(testRoomVersion)
|
|
||||||
testEvents = append(testEvents, h)
|
|
||||||
if e.StateKey() != nil {
|
|
||||||
testStateEvents[gomatrixserverlib.StateKeyTuple{
|
|
||||||
EventType: e.Type(),
|
|
||||||
StateKey: *e.StateKey(),
|
|
||||||
}] = h
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func waitForOffsetProcessed(t *testing.T, db storage.Database, offset int64) {
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
for {
|
|
||||||
poffsets, err := db.PartitionOffsets(ctx, kafkaTopic)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to PartitionOffsets: %s", err)
|
|
||||||
}
|
|
||||||
for _, partition := range poffsets {
|
|
||||||
if partition.Offset >= offset {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func MustWriteOutputEvent(t *testing.T, producer sarama.SyncProducer, out *roomserverAPI.OutputNewRoomEvent) int64 {
|
|
||||||
value, err := json.Marshal(roomserverAPI.OutputEvent{
|
|
||||||
Type: roomserverAPI.OutputTypeNewRoomEvent,
|
|
||||||
NewRoomEvent: out,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to marshal output event: %s", err)
|
|
||||||
}
|
|
||||||
_, offset, err := producer.SendMessage(&sarama.ProducerMessage{
|
|
||||||
Topic: kafkaTopic,
|
|
||||||
Key: sarama.StringEncoder(out.Event.RoomID()),
|
|
||||||
Value: sarama.ByteEncoder(value),
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to send message: %s", err)
|
|
||||||
}
|
|
||||||
return offset
|
|
||||||
}
|
|
||||||
|
|
||||||
func MustMakeInternalAPI(t *testing.T) (api.CurrentStateInternalAPI, storage.Database, sarama.SyncProducer, func()) {
|
|
||||||
cfg := &config.Dendrite{}
|
|
||||||
cfg.Defaults()
|
|
||||||
stateDBName := "test_state.db"
|
|
||||||
naffkaDBName := "test_naffka.db"
|
|
||||||
cfg.Global.ServerName = "kaer.morhen"
|
|
||||||
cfg.CurrentStateServer.Database.ConnectionString = config.DataSource("file:" + stateDBName)
|
|
||||||
cfg.Global.Kafka.TopicPrefix = kafkaPrefix
|
|
||||||
naffkaDB, err := naffkaStorage.NewDatabase("file:" + naffkaDBName)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to setup naffka database: %s", err)
|
|
||||||
}
|
|
||||||
naff, err := naffka.New(naffkaDB)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to create naffka consumer: %s", err)
|
|
||||||
}
|
|
||||||
stateAPI := NewInternalAPI(&cfg.CurrentStateServer, naff)
|
|
||||||
// type-cast to pull out the DB
|
|
||||||
stateAPIVal := stateAPI.(*internal.CurrentStateInternalAPI)
|
|
||||||
return stateAPI, stateAPIVal.DB, naff, func() {
|
|
||||||
os.Remove(naffkaDBName)
|
|
||||||
os.Remove(stateDBName)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func mustMakeMembershipEvent(t *testing.T, roomID, userID, membership string) *roomserverAPI.OutputNewRoomEvent {
|
|
||||||
eb := gomatrixserverlib.EventBuilder{
|
|
||||||
RoomID: roomID,
|
|
||||||
Sender: userID,
|
|
||||||
StateKey: &userID,
|
|
||||||
Type: "m.room.member",
|
|
||||||
Content: []byte(`{"membership":"` + membership + `"}`),
|
|
||||||
}
|
|
||||||
_, pkey, err := ed25519.GenerateKey(nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to make ed25519 key: %s", err)
|
|
||||||
}
|
|
||||||
roomVer := gomatrixserverlib.RoomVersionV5
|
|
||||||
ev, err := eb.Build(
|
|
||||||
time.Now(), gomatrixserverlib.ServerName("localhost"), gomatrixserverlib.KeyID("ed25519:test"),
|
|
||||||
pkey, roomVer,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("mustMakeMembershipEvent failed: %s", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &roomserverAPI.OutputNewRoomEvent{
|
|
||||||
Event: ev.Headered(roomVer),
|
|
||||||
AddsStateEventIDs: []string{ev.EventID()},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// This test makes sure that QuerySharedUsers is returning the correct users for a range of sets.
|
|
||||||
func TestQuerySharedUsers(t *testing.T) {
|
|
||||||
currStateAPI, db, producer, cancel := MustMakeInternalAPI(t)
|
|
||||||
defer cancel()
|
|
||||||
MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo:bar", "@alice:localhost", "join"))
|
|
||||||
MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo:bar", "@bob:localhost", "join"))
|
|
||||||
|
|
||||||
MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo2:bar", "@alice:localhost", "join"))
|
|
||||||
MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo2:bar", "@charlie:localhost", "join"))
|
|
||||||
|
|
||||||
MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo3:bar", "@alice:localhost", "join"))
|
|
||||||
MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo3:bar", "@bob:localhost", "join"))
|
|
||||||
MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo3:bar", "@dave:localhost", "leave"))
|
|
||||||
|
|
||||||
offset := MustWriteOutputEvent(t, producer, mustMakeMembershipEvent(t, "!foo4:bar", "@alice:localhost", "join"))
|
|
||||||
waitForOffsetProcessed(t, db, offset)
|
|
||||||
|
|
||||||
testCases := []struct {
|
|
||||||
req api.QuerySharedUsersRequest
|
|
||||||
wantRes api.QuerySharedUsersResponse
|
|
||||||
}{
|
|
||||||
// Simple case: sharing (A,B) (A,C) (A,B) (A) produces (A:4,B:2,C:1)
|
|
||||||
{
|
|
||||||
req: api.QuerySharedUsersRequest{
|
|
||||||
UserID: "@alice:localhost",
|
|
||||||
},
|
|
||||||
wantRes: api.QuerySharedUsersResponse{
|
|
||||||
UserIDsToCount: map[string]int{
|
|
||||||
"@alice:localhost": 4,
|
|
||||||
"@bob:localhost": 2,
|
|
||||||
"@charlie:localhost": 1,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
|
|
||||||
// Exclude (A,C): sharing (A,B) (A,B) (A) produces (A:3,B:2)
|
|
||||||
{
|
|
||||||
req: api.QuerySharedUsersRequest{
|
|
||||||
UserID: "@alice:localhost",
|
|
||||||
ExcludeRoomIDs: []string{"!foo2:bar"},
|
|
||||||
},
|
|
||||||
wantRes: api.QuerySharedUsersResponse{
|
|
||||||
UserIDsToCount: map[string]int{
|
|
||||||
"@alice:localhost": 3,
|
|
||||||
"@bob:localhost": 2,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
|
|
||||||
// Unknown user has no shared users
|
|
||||||
{
|
|
||||||
req: api.QuerySharedUsersRequest{
|
|
||||||
UserID: "@unknownuser:localhost",
|
|
||||||
},
|
|
||||||
wantRes: api.QuerySharedUsersResponse{
|
|
||||||
UserIDsToCount: map[string]int{},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
|
|
||||||
// left real user produces no shared users
|
|
||||||
{
|
|
||||||
req: api.QuerySharedUsersRequest{
|
|
||||||
UserID: "@dave:localhost",
|
|
||||||
},
|
|
||||||
wantRes: api.QuerySharedUsersResponse{
|
|
||||||
UserIDsToCount: map[string]int{},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
|
|
||||||
// left real user but with included room returns the included room member
|
|
||||||
{
|
|
||||||
req: api.QuerySharedUsersRequest{
|
|
||||||
UserID: "@dave:localhost",
|
|
||||||
IncludeRoomIDs: []string{"!foo:bar"},
|
|
||||||
},
|
|
||||||
wantRes: api.QuerySharedUsersResponse{
|
|
||||||
UserIDsToCount: map[string]int{
|
|
||||||
"@alice:localhost": 1,
|
|
||||||
"@bob:localhost": 1,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
|
|
||||||
// including a room more than once doesn't double counts
|
|
||||||
{
|
|
||||||
req: api.QuerySharedUsersRequest{
|
|
||||||
UserID: "@dave:localhost",
|
|
||||||
IncludeRoomIDs: []string{"!foo:bar", "!foo:bar", "!foo:bar"},
|
|
||||||
},
|
|
||||||
wantRes: api.QuerySharedUsersResponse{
|
|
||||||
UserIDsToCount: map[string]int{
|
|
||||||
"@alice:localhost": 1,
|
|
||||||
"@bob:localhost": 1,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
runCases := func(testAPI api.CurrentStateInternalAPI) {
|
|
||||||
for _, tc := range testCases {
|
|
||||||
var res api.QuerySharedUsersResponse
|
|
||||||
err := testAPI.QuerySharedUsers(context.Background(), &tc.req, &res)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("QuerySharedUsers returned error: %s", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if !reflect.DeepEqual(res.UserIDsToCount, tc.wantRes.UserIDsToCount) {
|
|
||||||
t.Errorf("QuerySharedUsers got users %+v want %+v", res.UserIDsToCount, tc.wantRes.UserIDsToCount)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Run("HTTP API", func(t *testing.T) {
|
|
||||||
router := mux.NewRouter().PathPrefix(httputil.InternalPathPrefix).Subrouter()
|
|
||||||
AddInternalRoutes(router, currStateAPI)
|
|
||||||
apiURL, cancel := test.ListenAndServe(t, router, false)
|
|
||||||
defer cancel()
|
|
||||||
httpAPI, err := inthttp.NewCurrentStateAPIClient(apiURL, &http.Client{})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("failed to create HTTP client")
|
|
||||||
}
|
|
||||||
runCases(httpAPI)
|
|
||||||
})
|
|
||||||
t.Run("Monolith", func(t *testing.T) {
|
|
||||||
runCases(currStateAPI)
|
|
||||||
})
|
|
||||||
}
|
|
|
@ -54,33 +54,3 @@ func (a *CurrentStateInternalAPI) QueryBulkStateContent(ctx context.Context, req
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *CurrentStateInternalAPI) QuerySharedUsers(ctx context.Context, req *api.QuerySharedUsersRequest, res *api.QuerySharedUsersResponse) error {
|
|
||||||
roomIDs, err := a.DB.GetRoomsByMembership(ctx, req.UserID, "join")
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
roomIDs = append(roomIDs, req.IncludeRoomIDs...)
|
|
||||||
excludeMap := make(map[string]bool)
|
|
||||||
for _, roomID := range req.ExcludeRoomIDs {
|
|
||||||
excludeMap[roomID] = true
|
|
||||||
}
|
|
||||||
// filter out excluded rooms
|
|
||||||
j := 0
|
|
||||||
for i := range roomIDs {
|
|
||||||
// move elements to include to the beginning of the slice
|
|
||||||
// then trim elements on the right
|
|
||||||
if !excludeMap[roomIDs[i]] {
|
|
||||||
roomIDs[j] = roomIDs[i]
|
|
||||||
j++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
roomIDs = roomIDs[:j]
|
|
||||||
|
|
||||||
users, err := a.DB.JoinedUsersSetInRooms(ctx, roomIDs)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
res.UserIDsToCount = users
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
|
@ -28,7 +28,6 @@ import (
|
||||||
const (
|
const (
|
||||||
QueryRoomsForUserPath = "/currentstateserver/queryRoomsForUser"
|
QueryRoomsForUserPath = "/currentstateserver/queryRoomsForUser"
|
||||||
QueryBulkStateContentPath = "/currentstateserver/queryBulkStateContent"
|
QueryBulkStateContentPath = "/currentstateserver/queryBulkStateContent"
|
||||||
QuerySharedUsersPath = "/currentstateserver/querySharedUsers"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewCurrentStateAPIClient creates a CurrentStateInternalAPI implemented by talking to a HTTP POST API.
|
// NewCurrentStateAPIClient creates a CurrentStateInternalAPI implemented by talking to a HTTP POST API.
|
||||||
|
@ -74,13 +73,3 @@ func (h *httpCurrentStateInternalAPI) QueryBulkStateContent(
|
||||||
apiURL := h.apiURL + QueryBulkStateContentPath
|
apiURL := h.apiURL + QueryBulkStateContentPath
|
||||||
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpCurrentStateInternalAPI) QuerySharedUsers(
|
|
||||||
ctx context.Context, req *api.QuerySharedUsersRequest, res *api.QuerySharedUsersResponse,
|
|
||||||
) error {
|
|
||||||
span, ctx := opentracing.StartSpanFromContext(ctx, "QuerySharedUsers")
|
|
||||||
defer span.Finish()
|
|
||||||
|
|
||||||
apiURL := h.apiURL + QuerySharedUsersPath
|
|
||||||
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
|
|
||||||
}
|
|
||||||
|
|
|
@ -51,17 +51,4 @@ func AddRoutes(internalAPIMux *mux.Router, intAPI api.CurrentStateInternalAPI) {
|
||||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
internalAPIMux.Handle(QuerySharedUsersPath,
|
|
||||||
httputil.MakeInternalAPI("querySharedUsers", func(req *http.Request) util.JSONResponse {
|
|
||||||
request := api.QuerySharedUsersRequest{}
|
|
||||||
response := api.QuerySharedUsersResponse{}
|
|
||||||
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
|
||||||
return util.MessageResponse(http.StatusBadRequest, err.Error())
|
|
||||||
}
|
|
||||||
if err := intAPI.QuerySharedUsers(req.Context(), &request, &response); err != nil {
|
|
||||||
return util.ErrorResponse(err)
|
|
||||||
}
|
|
||||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
|
||||||
}),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -79,10 +79,10 @@ const selectRoomIDsSQL = "" +
|
||||||
"SELECT room_id FROM roomserver_rooms"
|
"SELECT room_id FROM roomserver_rooms"
|
||||||
|
|
||||||
const bulkSelectRoomIDsSQL = "" +
|
const bulkSelectRoomIDsSQL = "" +
|
||||||
"SELECT room_id FROM roomserver_rooms WHERE room_nid IN ($1)"
|
"SELECT room_id FROM roomserver_rooms WHERE room_nid = ANY($1)"
|
||||||
|
|
||||||
const bulkSelectRoomNIDsSQL = "" +
|
const bulkSelectRoomNIDsSQL = "" +
|
||||||
"SELECT room_nid FROM roomserver_rooms WHERE room_id IN ($1)"
|
"SELECT room_nid FROM roomserver_rooms WHERE room_id = ANY($1)"
|
||||||
|
|
||||||
type roomStatements struct {
|
type roomStatements struct {
|
||||||
insertRoomNIDStmt *sql.Stmt
|
insertRoomNIDStmt *sql.Stmt
|
||||||
|
|
|
@ -774,15 +774,18 @@ func (d *Database) GetRoomsByMembership(ctx context.Context, userID, membership
|
||||||
}
|
}
|
||||||
stateKeyNID, err := d.EventStateKeysTable.SelectEventStateKeyNID(ctx, nil, userID)
|
stateKeyNID, err := d.EventStateKeysTable.SelectEventStateKeyNID(ctx, nil, userID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
return nil, fmt.Errorf("GetRoomsByMembership: cannot map user ID to state key NID: %w", err)
|
return nil, fmt.Errorf("GetRoomsByMembership: cannot map user ID to state key NID: %w", err)
|
||||||
}
|
}
|
||||||
roomNIDs, err := d.MembershipTable.SelectRoomsWithMembership(ctx, stateKeyNID, membershipState)
|
roomNIDs, err := d.MembershipTable.SelectRoomsWithMembership(ctx, stateKeyNID, membershipState)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, fmt.Errorf("GetRoomsByMembership: failed to SelectRoomsWithMembership: %w", err)
|
||||||
}
|
}
|
||||||
roomIDs, err := d.RoomsTable.BulkSelectRoomIDs(ctx, roomNIDs)
|
roomIDs, err := d.RoomsTable.BulkSelectRoomIDs(ctx, roomNIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, fmt.Errorf("GetRoomsByMembership: failed to lookup room nids: %w", err)
|
||||||
}
|
}
|
||||||
if len(roomIDs) != len(roomNIDs) {
|
if len(roomIDs) != len(roomNIDs) {
|
||||||
return nil, fmt.Errorf("GetRoomsByMembership: missing room IDs, got %d want %d", len(roomIDs), len(roomNIDs))
|
return nil, fmt.Errorf("GetRoomsByMembership: missing room IDs, got %d want %d", len(roomIDs), len(roomNIDs))
|
||||||
|
|
|
@ -41,7 +41,7 @@ const membershipSchema = `
|
||||||
`
|
`
|
||||||
|
|
||||||
var selectJoinedUsersSetForRoomsSQL = "" +
|
var selectJoinedUsersSetForRoomsSQL = "" +
|
||||||
"SELECT target_nid, COUNT(room_nid) FROM roomserver_membership WHERE room_nid = ANY($1) AND" +
|
"SELECT target_nid, COUNT(room_nid) FROM roomserver_membership WHERE room_nid IN ($1) AND" +
|
||||||
" membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " GROUP BY target_nid"
|
" membership_nid = " + fmt.Sprintf("%d", tables.MembershipStateJoin) + " GROUP BY target_nid"
|
||||||
|
|
||||||
// Insert a row in to membership table so that it can be locked by the
|
// Insert a row in to membership table so that it can be locked by the
|
||||||
|
|
|
@ -23,6 +23,7 @@ import (
|
||||||
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||||
"github.com/matrix-org/dendrite/internal"
|
"github.com/matrix-org/dendrite/internal"
|
||||||
"github.com/matrix-org/dendrite/keyserver/api"
|
"github.com/matrix-org/dendrite/keyserver/api"
|
||||||
|
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||||
syncinternal "github.com/matrix-org/dendrite/syncapi/internal"
|
syncinternal "github.com/matrix-org/dendrite/syncapi/internal"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
syncapi "github.com/matrix-org/dendrite/syncapi/sync"
|
syncapi "github.com/matrix-org/dendrite/syncapi/sync"
|
||||||
|
@ -36,7 +37,8 @@ type OutputKeyChangeEventConsumer struct {
|
||||||
keyChangeConsumer *internal.ContinualConsumer
|
keyChangeConsumer *internal.ContinualConsumer
|
||||||
db storage.Database
|
db storage.Database
|
||||||
serverName gomatrixserverlib.ServerName // our server name
|
serverName gomatrixserverlib.ServerName // our server name
|
||||||
currentStateAPI currentstateAPI.CurrentStateInternalAPI
|
rsAPI roomserverAPI.RoomserverInternalAPI
|
||||||
|
stateAPI currentstateAPI.CurrentStateInternalAPI
|
||||||
keyAPI api.KeyInternalAPI
|
keyAPI api.KeyInternalAPI
|
||||||
partitionToOffset map[int32]int64
|
partitionToOffset map[int32]int64
|
||||||
partitionToOffsetMu sync.Mutex
|
partitionToOffsetMu sync.Mutex
|
||||||
|
@ -51,7 +53,8 @@ func NewOutputKeyChangeEventConsumer(
|
||||||
kafkaConsumer sarama.Consumer,
|
kafkaConsumer sarama.Consumer,
|
||||||
n *syncapi.Notifier,
|
n *syncapi.Notifier,
|
||||||
keyAPI api.KeyInternalAPI,
|
keyAPI api.KeyInternalAPI,
|
||||||
currentStateAPI currentstateAPI.CurrentStateInternalAPI,
|
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||||
|
stateAPI currentstateAPI.CurrentStateInternalAPI,
|
||||||
store storage.Database,
|
store storage.Database,
|
||||||
) *OutputKeyChangeEventConsumer {
|
) *OutputKeyChangeEventConsumer {
|
||||||
|
|
||||||
|
@ -67,7 +70,8 @@ func NewOutputKeyChangeEventConsumer(
|
||||||
db: store,
|
db: store,
|
||||||
serverName: serverName,
|
serverName: serverName,
|
||||||
keyAPI: keyAPI,
|
keyAPI: keyAPI,
|
||||||
currentStateAPI: currentStateAPI,
|
rsAPI: rsAPI,
|
||||||
|
stateAPI: stateAPI,
|
||||||
partitionToOffset: make(map[int32]int64),
|
partitionToOffset: make(map[int32]int64),
|
||||||
partitionToOffsetMu: sync.Mutex{},
|
partitionToOffsetMu: sync.Mutex{},
|
||||||
notifier: n,
|
notifier: n,
|
||||||
|
@ -105,8 +109,8 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// work out who we need to notify about the new key
|
// work out who we need to notify about the new key
|
||||||
var queryRes currentstateAPI.QuerySharedUsersResponse
|
var queryRes roomserverAPI.QuerySharedUsersResponse
|
||||||
err := s.currentStateAPI.QuerySharedUsers(context.Background(), ¤tstateAPI.QuerySharedUsersRequest{
|
err := s.rsAPI.QuerySharedUsers(context.Background(), &roomserverAPI.QuerySharedUsersRequest{
|
||||||
UserID: output.UserID,
|
UserID: output.UserID,
|
||||||
}, &queryRes)
|
}, &queryRes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -115,7 +119,7 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
|
||||||
}
|
}
|
||||||
// TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams
|
// TODO: f.e queryRes.UserIDsToCount : notify users by waking up streams
|
||||||
posUpdate := types.NewStreamToken(0, 0, map[string]*types.LogPosition{
|
posUpdate := types.NewStreamToken(0, 0, map[string]*types.LogPosition{
|
||||||
syncinternal.DeviceListLogName: &types.LogPosition{
|
syncinternal.DeviceListLogName: {
|
||||||
Offset: msg.Offset,
|
Offset: msg.Offset,
|
||||||
Partition: msg.Partition,
|
Partition: msg.Partition,
|
||||||
},
|
},
|
||||||
|
@ -129,7 +133,7 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
|
||||||
func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) {
|
func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) {
|
||||||
// work out who we are now sharing rooms with which we previously were not and notify them about the joining
|
// work out who we are now sharing rooms with which we previously were not and notify them about the joining
|
||||||
// users keys:
|
// users keys:
|
||||||
changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.currentStateAPI, *ev.StateKey(), []string{ev.RoomID()}, nil)
|
changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, s.stateAPI, *ev.StateKey(), []string{ev.RoomID()}, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("OnJoinEvent: failed to work out changed users")
|
log.WithError(err).Error("OnJoinEvent: failed to work out changed users")
|
||||||
return
|
return
|
||||||
|
@ -142,7 +146,7 @@ func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.Headere
|
||||||
|
|
||||||
func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) {
|
func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) {
|
||||||
// work out who we are no longer sharing any rooms with and notify them about the leaving user
|
// work out who we are no longer sharing any rooms with and notify them about the leaving user
|
||||||
_, left, err := syncinternal.TrackChangedUsers(context.Background(), s.currentStateAPI, *ev.StateKey(), nil, []string{ev.RoomID()})
|
_, left, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, s.stateAPI, *ev.StateKey(), nil, []string{ev.RoomID()})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithError(err).Error("OnLeaveEvent: failed to work out left users")
|
log.WithError(err).Error("OnLeaveEvent: failed to work out left users")
|
||||||
return
|
return
|
||||||
|
|
|
@ -22,6 +22,7 @@ import (
|
||||||
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||||
"github.com/matrix-org/dendrite/keyserver/api"
|
"github.com/matrix-org/dendrite/keyserver/api"
|
||||||
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
||||||
|
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
|
@ -48,7 +49,8 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID,
|
||||||
// be already filled in with join/leave information.
|
// be already filled in with join/leave information.
|
||||||
// nolint:gocyclo
|
// nolint:gocyclo
|
||||||
func DeviceListCatchup(
|
func DeviceListCatchup(
|
||||||
ctx context.Context, keyAPI keyapi.KeyInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI,
|
ctx context.Context, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||||
|
stateAPI currentstateAPI.CurrentStateInternalAPI,
|
||||||
userID string, res *types.Response, from, to types.StreamingToken,
|
userID string, res *types.Response, from, to types.StreamingToken,
|
||||||
) (hasNew bool, err error) {
|
) (hasNew bool, err error) {
|
||||||
|
|
||||||
|
@ -56,7 +58,7 @@ func DeviceListCatchup(
|
||||||
newlyJoinedRooms := joinedRooms(res, userID)
|
newlyJoinedRooms := joinedRooms(res, userID)
|
||||||
newlyLeftRooms := leftRooms(res)
|
newlyLeftRooms := leftRooms(res)
|
||||||
if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
|
if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
|
||||||
changed, left, err := TrackChangedUsers(ctx, stateAPI, userID, newlyJoinedRooms, newlyLeftRooms)
|
changed, left, err := TrackChangedUsers(ctx, rsAPI, stateAPI, userID, newlyJoinedRooms, newlyLeftRooms)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
@ -97,7 +99,7 @@ func DeviceListCatchup(
|
||||||
}
|
}
|
||||||
// QueryKeyChanges gets ALL users who have changed keys, we want the ones who share rooms with the user.
|
// QueryKeyChanges gets ALL users who have changed keys, we want the ones who share rooms with the user.
|
||||||
var sharedUsersMap map[string]int
|
var sharedUsersMap map[string]int
|
||||||
sharedUsersMap, queryRes.UserIDs = filterSharedUsers(ctx, stateAPI, userID, queryRes.UserIDs)
|
sharedUsersMap, queryRes.UserIDs = filterSharedUsers(ctx, rsAPI, userID, queryRes.UserIDs)
|
||||||
util.GetLogger(ctx).Debugf(
|
util.GetLogger(ctx).Debugf(
|
||||||
"QueryKeyChanges request p=%d,off=%d,to=%d response p=%d off=%d uids=%v",
|
"QueryKeyChanges request p=%d,off=%d,to=%d response p=%d off=%d uids=%v",
|
||||||
partition, offset, toOffset, queryRes.Partition, queryRes.Offset, queryRes.UserIDs,
|
partition, offset, toOffset, queryRes.Partition, queryRes.Offset, queryRes.UserIDs,
|
||||||
|
@ -142,7 +144,7 @@ func DeviceListCatchup(
|
||||||
// TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response.
|
// TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response.
|
||||||
// nolint:gocyclo
|
// nolint:gocyclo
|
||||||
func TrackChangedUsers(
|
func TrackChangedUsers(
|
||||||
ctx context.Context, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string,
|
ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string,
|
||||||
) (changed, left []string, err error) {
|
) (changed, left []string, err error) {
|
||||||
// process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users.
|
// process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users.
|
||||||
|
|
||||||
|
@ -151,8 +153,8 @@ func TrackChangedUsers(
|
||||||
// - Get users in newly left room. - QueryCurrentState
|
// - Get users in newly left room. - QueryCurrentState
|
||||||
// - Loop set of users and decrement by 1 for each user in newly left room.
|
// - Loop set of users and decrement by 1 for each user in newly left room.
|
||||||
// - If count=0 then they share no more rooms so inform BOTH parties of this via 'left'=[...] in /sync.
|
// - If count=0 then they share no more rooms so inform BOTH parties of this via 'left'=[...] in /sync.
|
||||||
var queryRes currentstateAPI.QuerySharedUsersResponse
|
var queryRes roomserverAPI.QuerySharedUsersResponse
|
||||||
err = stateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{
|
err = rsAPI.QuerySharedUsers(ctx, &roomserverAPI.QuerySharedUsersRequest{
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
IncludeRoomIDs: newlyLeftRooms,
|
IncludeRoomIDs: newlyLeftRooms,
|
||||||
}, &queryRes)
|
}, &queryRes)
|
||||||
|
@ -193,7 +195,7 @@ func TrackChangedUsers(
|
||||||
// - Loop set of users in newly joined room, do they appear in the set of users prior to joining?
|
// - Loop set of users in newly joined room, do they appear in the set of users prior to joining?
|
||||||
// - If yes: then they already shared a room in common, do nothing.
|
// - If yes: then they already shared a room in common, do nothing.
|
||||||
// - If no: then they are a brand new user so inform BOTH parties of this via 'changed=[...]'
|
// - If no: then they are a brand new user so inform BOTH parties of this via 'changed=[...]'
|
||||||
err = stateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{
|
err = rsAPI.QuerySharedUsers(ctx, &roomserverAPI.QuerySharedUsersRequest{
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
ExcludeRoomIDs: newlyJoinedRooms,
|
ExcludeRoomIDs: newlyJoinedRooms,
|
||||||
}, &queryRes)
|
}, &queryRes)
|
||||||
|
@ -228,11 +230,11 @@ func TrackChangedUsers(
|
||||||
}
|
}
|
||||||
|
|
||||||
func filterSharedUsers(
|
func filterSharedUsers(
|
||||||
ctx context.Context, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, usersWithChangedKeys []string,
|
ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, userID string, usersWithChangedKeys []string,
|
||||||
) (map[string]int, []string) {
|
) (map[string]int, []string) {
|
||||||
var result []string
|
var result []string
|
||||||
var sharedUsersRes currentstateAPI.QuerySharedUsersResponse
|
var sharedUsersRes roomserverAPI.QuerySharedUsersResponse
|
||||||
err := stateAPI.QuerySharedUsers(ctx, ¤tstateAPI.QuerySharedUsersRequest{
|
err := rsAPI.QuerySharedUsers(ctx, &roomserverAPI.QuerySharedUsersRequest{
|
||||||
UserID: userID,
|
UserID: userID,
|
||||||
}, &sharedUsersRes)
|
}, &sharedUsersRes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -7,8 +7,9 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/Shopify/sarama"
|
"github.com/Shopify/sarama"
|
||||||
"github.com/matrix-org/dendrite/currentstateserver/api"
|
stateapi "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||||
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
||||||
|
"github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
@ -49,17 +50,18 @@ func (k *mockKeyAPI) InputDeviceListUpdate(ctx context.Context, req *keyapi.Inpu
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type mockCurrentStateAPI struct {
|
type mockRoomserverAPI struct {
|
||||||
|
api.RoomserverInternalAPITrace
|
||||||
roomIDToJoinedMembers map[string][]string
|
roomIDToJoinedMembers map[string][]string
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryRoomsForUser retrieves a list of room IDs matching the given query.
|
// QueryRoomsForUser retrieves a list of room IDs matching the given query.
|
||||||
func (s *mockCurrentStateAPI) QueryRoomsForUser(ctx context.Context, req *api.QueryRoomsForUserRequest, res *api.QueryRoomsForUserResponse) error {
|
func (s *mockRoomserverAPI) QueryRoomsForUser(ctx context.Context, req *api.QueryRoomsForUserRequest, res *api.QueryRoomsForUserResponse) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryBulkStateContent does a bulk query for state event content in the given rooms.
|
// QueryBulkStateContent does a bulk query for state event content in the given rooms.
|
||||||
func (s *mockCurrentStateAPI) QueryBulkStateContent(ctx context.Context, req *api.QueryBulkStateContentRequest, res *api.QueryBulkStateContentResponse) error {
|
func (s *mockRoomserverAPI) QueryBulkStateContent(ctx context.Context, req *api.QueryBulkStateContentRequest, res *api.QueryBulkStateContentResponse) error {
|
||||||
res.Rooms = make(map[string]map[gomatrixserverlib.StateKeyTuple]string)
|
res.Rooms = make(map[string]map[gomatrixserverlib.StateKeyTuple]string)
|
||||||
if req.AllowWildcards && len(req.StateTuples) == 1 && req.StateTuples[0].EventType == gomatrixserverlib.MRoomMember && req.StateTuples[0].StateKey == "*" {
|
if req.AllowWildcards && len(req.StateTuples) == 1 && req.StateTuples[0].EventType == gomatrixserverlib.MRoomMember && req.StateTuples[0].StateKey == "*" {
|
||||||
for _, roomID := range req.RoomIDs {
|
for _, roomID := range req.RoomIDs {
|
||||||
|
@ -76,7 +78,7 @@ func (s *mockCurrentStateAPI) QueryBulkStateContent(ctx context.Context, req *ap
|
||||||
}
|
}
|
||||||
|
|
||||||
// QuerySharedUsers returns a list of users who share at least 1 room in common with the given user.
|
// QuerySharedUsers returns a list of users who share at least 1 room in common with the given user.
|
||||||
func (s *mockCurrentStateAPI) QuerySharedUsers(ctx context.Context, req *api.QuerySharedUsersRequest, res *api.QuerySharedUsersResponse) error {
|
func (s *mockRoomserverAPI) QuerySharedUsers(ctx context.Context, req *api.QuerySharedUsersRequest, res *api.QuerySharedUsersResponse) error {
|
||||||
roomsToQuery := req.IncludeRoomIDs
|
roomsToQuery := req.IncludeRoomIDs
|
||||||
for roomID, members := range s.roomIDToJoinedMembers {
|
for roomID, members := range s.roomIDToJoinedMembers {
|
||||||
exclude := false
|
exclude := false
|
||||||
|
@ -106,6 +108,30 @@ func (s *mockCurrentStateAPI) QuerySharedUsers(ctx context.Context, req *api.Que
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type mockStateAPI struct {
|
||||||
|
rsAPI *mockRoomserverAPI
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryRoomsForUser retrieves a list of room IDs matching the given query.
|
||||||
|
func (s *mockStateAPI) QueryRoomsForUser(ctx context.Context, req *stateapi.QueryRoomsForUserRequest, res *stateapi.QueryRoomsForUserResponse) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryBulkStateContent does a bulk query for state event content in the given rooms.
|
||||||
|
func (s *mockStateAPI) QueryBulkStateContent(ctx context.Context, req *stateapi.QueryBulkStateContentRequest, res *stateapi.QueryBulkStateContentResponse) error {
|
||||||
|
var res2 api.QueryBulkStateContentResponse
|
||||||
|
err := s.rsAPI.QueryBulkStateContent(ctx, &api.QueryBulkStateContentRequest{
|
||||||
|
RoomIDs: req.RoomIDs,
|
||||||
|
AllowWildcards: req.AllowWildcards,
|
||||||
|
StateTuples: req.StateTuples,
|
||||||
|
}, &res2)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
res.Rooms = res2.Rooms
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
type wantCatchup struct {
|
type wantCatchup struct {
|
||||||
hasNew bool
|
hasNew bool
|
||||||
changed []string
|
changed []string
|
||||||
|
@ -173,12 +199,13 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
|
||||||
syncResponse := types.NewResponse()
|
syncResponse := types.NewResponse()
|
||||||
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
|
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
|
||||||
|
|
||||||
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
|
rsAPI := &mockRoomserverAPI{
|
||||||
roomIDToJoinedMembers: map[string][]string{
|
roomIDToJoinedMembers: map[string][]string{
|
||||||
newlyJoinedRoom: {syncingUser, newShareUser},
|
newlyJoinedRoom: {syncingUser, newShareUser},
|
||||||
"!another:room": {syncingUser},
|
"!another:room": {syncingUser},
|
||||||
},
|
},
|
||||||
}, syncingUser, syncResponse, emptyToken, newestToken)
|
}
|
||||||
|
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("DeviceListCatchup returned an error: %s", err)
|
t.Fatalf("DeviceListCatchup returned an error: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -195,12 +222,13 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
|
||||||
syncResponse := types.NewResponse()
|
syncResponse := types.NewResponse()
|
||||||
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
|
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
|
||||||
|
|
||||||
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
|
rsAPI := &mockRoomserverAPI{
|
||||||
roomIDToJoinedMembers: map[string][]string{
|
roomIDToJoinedMembers: map[string][]string{
|
||||||
newlyLeftRoom: {removeUser},
|
newlyLeftRoom: {removeUser},
|
||||||
"!another:room": {syncingUser},
|
"!another:room": {syncingUser},
|
||||||
},
|
},
|
||||||
}, syncingUser, syncResponse, emptyToken, newestToken)
|
}
|
||||||
|
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("DeviceListCatchup returned an error: %s", err)
|
t.Fatalf("DeviceListCatchup returned an error: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -217,12 +245,13 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
|
||||||
syncResponse := types.NewResponse()
|
syncResponse := types.NewResponse()
|
||||||
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
|
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
|
||||||
|
|
||||||
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
|
rsAPI := &mockRoomserverAPI{
|
||||||
roomIDToJoinedMembers: map[string][]string{
|
roomIDToJoinedMembers: map[string][]string{
|
||||||
newlyJoinedRoom: {syncingUser, existingUser},
|
newlyJoinedRoom: {syncingUser, existingUser},
|
||||||
"!another:room": {syncingUser, existingUser},
|
"!another:room": {syncingUser, existingUser},
|
||||||
},
|
},
|
||||||
}, syncingUser, syncResponse, emptyToken, newestToken)
|
}
|
||||||
|
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Catchup returned an error: %s", err)
|
t.Fatalf("Catchup returned an error: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -238,12 +267,13 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
|
||||||
syncResponse := types.NewResponse()
|
syncResponse := types.NewResponse()
|
||||||
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
|
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
|
||||||
|
|
||||||
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
|
rsAPI := &mockRoomserverAPI{
|
||||||
roomIDToJoinedMembers: map[string][]string{
|
roomIDToJoinedMembers: map[string][]string{
|
||||||
newlyLeftRoom: {existingUser},
|
newlyLeftRoom: {existingUser},
|
||||||
"!another:room": {syncingUser, existingUser},
|
"!another:room": {syncingUser, existingUser},
|
||||||
},
|
},
|
||||||
}, syncingUser, syncResponse, emptyToken, newestToken)
|
}
|
||||||
|
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("DeviceListCatchup returned an error: %s", err)
|
t.Fatalf("DeviceListCatchup returned an error: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -297,11 +327,12 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
|
||||||
jr.Timeline.Events = roomTimelineEvents
|
jr.Timeline.Events = roomTimelineEvents
|
||||||
syncResponse.Rooms.Join[roomID] = jr
|
syncResponse.Rooms.Join[roomID] = jr
|
||||||
|
|
||||||
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
|
rsAPI := &mockRoomserverAPI{
|
||||||
roomIDToJoinedMembers: map[string][]string{
|
roomIDToJoinedMembers: map[string][]string{
|
||||||
roomID: {syncingUser, existingUser},
|
roomID: {syncingUser, existingUser},
|
||||||
},
|
},
|
||||||
}, syncingUser, syncResponse, emptyToken, newestToken)
|
}
|
||||||
|
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("DeviceListCatchup returned an error: %s", err)
|
t.Fatalf("DeviceListCatchup returned an error: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -322,13 +353,14 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) {
|
||||||
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
|
syncResponse = joinResponseWithRooms(syncResponse, syncingUser, []string{newlyJoinedRoom})
|
||||||
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
|
syncResponse = leaveResponseWithRooms(syncResponse, syncingUser, []string{newlyLeftRoom})
|
||||||
|
|
||||||
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
|
rsAPI := &mockRoomserverAPI{
|
||||||
roomIDToJoinedMembers: map[string][]string{
|
roomIDToJoinedMembers: map[string][]string{
|
||||||
newlyJoinedRoom: {syncingUser, newShareUser, newShareUser2},
|
newlyJoinedRoom: {syncingUser, newShareUser, newShareUser2},
|
||||||
newlyLeftRoom: {newlyLeftUser, newlyLeftUser2},
|
newlyLeftRoom: {newlyLeftUser, newlyLeftUser2},
|
||||||
"!another:room": {syncingUser},
|
"!another:room": {syncingUser},
|
||||||
},
|
},
|
||||||
}, syncingUser, syncResponse, emptyToken, newestToken)
|
}
|
||||||
|
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Catchup returned an error: %s", err)
|
t.Fatalf("Catchup returned an error: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -407,12 +439,15 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
|
||||||
lr.Timeline.Events = roomEvents
|
lr.Timeline.Events = roomEvents
|
||||||
syncResponse.Rooms.Leave[roomID] = lr
|
syncResponse.Rooms.Leave[roomID] = lr
|
||||||
|
|
||||||
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, &mockCurrentStateAPI{
|
rsAPI := &mockRoomserverAPI{
|
||||||
roomIDToJoinedMembers: map[string][]string{
|
roomIDToJoinedMembers: map[string][]string{
|
||||||
roomID: {newShareUser, newShareUser2},
|
roomID: {newShareUser, newShareUser2},
|
||||||
"!another:room": {syncingUser},
|
"!another:room": {syncingUser},
|
||||||
},
|
},
|
||||||
}, syncingUser, syncResponse, emptyToken, newestToken)
|
}
|
||||||
|
hasNew, err := DeviceListCatchup(
|
||||||
|
context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken,
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("DeviceListCatchup returned an error: %s", err)
|
t.Fatalf("DeviceListCatchup returned an error: %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
||||||
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||||
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
||||||
|
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||||
"github.com/matrix-org/dendrite/syncapi/internal"
|
"github.com/matrix-org/dendrite/syncapi/internal"
|
||||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
@ -40,15 +41,16 @@ type RequestPool struct {
|
||||||
userAPI userapi.UserInternalAPI
|
userAPI userapi.UserInternalAPI
|
||||||
notifier *Notifier
|
notifier *Notifier
|
||||||
keyAPI keyapi.KeyInternalAPI
|
keyAPI keyapi.KeyInternalAPI
|
||||||
|
rsAPI roomserverAPI.RoomserverInternalAPI
|
||||||
stateAPI currentstateAPI.CurrentStateInternalAPI
|
stateAPI currentstateAPI.CurrentStateInternalAPI
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRequestPool makes a new RequestPool
|
// NewRequestPool makes a new RequestPool
|
||||||
func NewRequestPool(
|
func NewRequestPool(
|
||||||
db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI,
|
db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI,
|
||||||
stateAPI currentstateAPI.CurrentStateInternalAPI,
|
rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI,
|
||||||
) *RequestPool {
|
) *RequestPool {
|
||||||
return &RequestPool{db, userAPI, n, keyAPI, stateAPI}
|
return &RequestPool{db, userAPI, n, keyAPI, rsAPI, stateAPI}
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
|
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
|
||||||
|
@ -265,7 +267,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
|
||||||
func (rp *RequestPool) appendDeviceLists(
|
func (rp *RequestPool) appendDeviceLists(
|
||||||
data *types.Response, userID string, since, to types.StreamingToken,
|
data *types.Response, userID string, since, to types.StreamingToken,
|
||||||
) (*types.Response, error) {
|
) (*types.Response, error) {
|
||||||
_, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.stateAPI, userID, data, since, to)
|
_, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.rsAPI, rp.stateAPI, userID, data, since, to)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("internal.DeviceListCatchup: %w", err)
|
return nil, fmt.Errorf("internal.DeviceListCatchup: %w", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -62,11 +62,11 @@ func AddPublicRoutes(
|
||||||
logrus.WithError(err).Panicf("failed to start notifier")
|
logrus.WithError(err).Panicf("failed to start notifier")
|
||||||
}
|
}
|
||||||
|
|
||||||
requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, currentStateAPI)
|
requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, rsAPI, currentStateAPI)
|
||||||
|
|
||||||
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
|
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
|
||||||
cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
|
cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
|
||||||
consumer, notifier, keyAPI, currentStateAPI, syncDB,
|
consumer, notifier, keyAPI, rsAPI, currentStateAPI, syncDB,
|
||||||
)
|
)
|
||||||
if err = keyChangeConsumer.Start(); err != nil {
|
if err = keyChangeConsumer.Start(); err != nil {
|
||||||
logrus.WithError(err).Panicf("failed to start key change consumer")
|
logrus.WithError(err).Panicf("failed to start key change consumer")
|
||||||
|
|
Loading…
Reference in a new issue