mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-07-31 13:22:46 +00:00
Remove QueryBulkStateContent from current state server (#1404)
* Remove QueryBulkStateContent from current state server Expected fail due to db impl not existing * Implement query bulk state content * Fix up rejecting invites over federation * Fix bulk content marshalling
This commit is contained in:
parent
895ead8048
commit
7913759921
19 changed files with 198 additions and 239 deletions
|
@ -133,7 +133,7 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
|
|||
func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) {
|
||||
// work out who we are now sharing rooms with which we previously were not and notify them about the joining
|
||||
// users keys:
|
||||
changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, s.stateAPI, *ev.StateKey(), []string{ev.RoomID()}, nil)
|
||||
changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, *ev.StateKey(), []string{ev.RoomID()}, nil)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("OnJoinEvent: failed to work out changed users")
|
||||
return
|
||||
|
@ -146,7 +146,7 @@ func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.Headere
|
|||
|
||||
func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) {
|
||||
// work out who we are no longer sharing any rooms with and notify them about the leaving user
|
||||
_, left, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, s.stateAPI, *ev.StateKey(), nil, []string{ev.RoomID()})
|
||||
_, left, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, *ev.StateKey(), nil, []string{ev.RoomID()})
|
||||
if err != nil {
|
||||
log.WithError(err).Error("OnLeaveEvent: failed to work out left users")
|
||||
return
|
||||
|
|
|
@ -19,7 +19,6 @@ import (
|
|||
"strings"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
"github.com/matrix-org/dendrite/keyserver/api"
|
||||
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||
|
@ -50,7 +49,6 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID,
|
|||
// nolint:gocyclo
|
||||
func DeviceListCatchup(
|
||||
ctx context.Context, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||
stateAPI currentstateAPI.CurrentStateInternalAPI,
|
||||
userID string, res *types.Response, from, to types.StreamingToken,
|
||||
) (hasNew bool, err error) {
|
||||
|
||||
|
@ -58,7 +56,7 @@ func DeviceListCatchup(
|
|||
newlyJoinedRooms := joinedRooms(res, userID)
|
||||
newlyLeftRooms := leftRooms(res)
|
||||
if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
|
||||
changed, left, err := TrackChangedUsers(ctx, rsAPI, stateAPI, userID, newlyJoinedRooms, newlyLeftRooms)
|
||||
changed, left, err := TrackChangedUsers(ctx, rsAPI, userID, newlyJoinedRooms, newlyLeftRooms)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
@ -144,7 +142,7 @@ func DeviceListCatchup(
|
|||
// TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response.
|
||||
// nolint:gocyclo
|
||||
func TrackChangedUsers(
|
||||
ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string,
|
||||
ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string,
|
||||
) (changed, left []string, err error) {
|
||||
// process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users.
|
||||
|
||||
|
@ -161,8 +159,8 @@ func TrackChangedUsers(
|
|||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
var stateRes currentstateAPI.QueryBulkStateContentResponse
|
||||
err = stateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{
|
||||
var stateRes roomserverAPI.QueryBulkStateContentResponse
|
||||
err = rsAPI.QueryBulkStateContent(ctx, &roomserverAPI.QueryBulkStateContentRequest{
|
||||
RoomIDs: newlyLeftRooms,
|
||||
StateTuples: []gomatrixserverlib.StateKeyTuple{
|
||||
{
|
||||
|
@ -202,7 +200,7 @@ func TrackChangedUsers(
|
|||
if err != nil {
|
||||
return nil, left, err
|
||||
}
|
||||
err = stateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{
|
||||
err = rsAPI.QueryBulkStateContent(ctx, &roomserverAPI.QueryBulkStateContentRequest{
|
||||
RoomIDs: newlyJoinedRooms,
|
||||
StateTuples: []gomatrixserverlib.StateKeyTuple{
|
||||
{
|
||||
|
|
|
@ -7,7 +7,6 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
stateapi "github.com/matrix-org/dendrite/currentstateserver/api"
|
||||
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
|
@ -108,25 +107,6 @@ func (s *mockRoomserverAPI) QuerySharedUsers(ctx context.Context, req *api.Query
|
|||
return nil
|
||||
}
|
||||
|
||||
type mockStateAPI struct {
|
||||
rsAPI *mockRoomserverAPI
|
||||
}
|
||||
|
||||
// QueryBulkStateContent does a bulk query for state event content in the given rooms.
|
||||
func (s *mockStateAPI) QueryBulkStateContent(ctx context.Context, req *stateapi.QueryBulkStateContentRequest, res *stateapi.QueryBulkStateContentResponse) error {
|
||||
var res2 api.QueryBulkStateContentResponse
|
||||
err := s.rsAPI.QueryBulkStateContent(ctx, &api.QueryBulkStateContentRequest{
|
||||
RoomIDs: req.RoomIDs,
|
||||
AllowWildcards: req.AllowWildcards,
|
||||
StateTuples: req.StateTuples,
|
||||
}, &res2)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
res.Rooms = res2.Rooms
|
||||
return nil
|
||||
}
|
||||
|
||||
type wantCatchup struct {
|
||||
hasNew bool
|
||||
changed []string
|
||||
|
@ -200,7 +180,7 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
|
|||
"!another:room": {syncingUser},
|
||||
},
|
||||
}
|
||||
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
|
||||
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
|
||||
if err != nil {
|
||||
t.Fatalf("DeviceListCatchup returned an error: %s", err)
|
||||
}
|
||||
|
@ -223,7 +203,7 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
|
|||
"!another:room": {syncingUser},
|
||||
},
|
||||
}
|
||||
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
|
||||
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
|
||||
if err != nil {
|
||||
t.Fatalf("DeviceListCatchup returned an error: %s", err)
|
||||
}
|
||||
|
@ -246,7 +226,7 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
|
|||
"!another:room": {syncingUser, existingUser},
|
||||
},
|
||||
}
|
||||
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
|
||||
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
|
||||
if err != nil {
|
||||
t.Fatalf("Catchup returned an error: %s", err)
|
||||
}
|
||||
|
@ -268,7 +248,7 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
|
|||
"!another:room": {syncingUser, existingUser},
|
||||
},
|
||||
}
|
||||
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
|
||||
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
|
||||
if err != nil {
|
||||
t.Fatalf("DeviceListCatchup returned an error: %s", err)
|
||||
}
|
||||
|
@ -327,7 +307,7 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
|
|||
roomID: {syncingUser, existingUser},
|
||||
},
|
||||
}
|
||||
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
|
||||
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
|
||||
if err != nil {
|
||||
t.Fatalf("DeviceListCatchup returned an error: %s", err)
|
||||
}
|
||||
|
@ -355,7 +335,7 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) {
|
|||
"!another:room": {syncingUser},
|
||||
},
|
||||
}
|
||||
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
|
||||
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
|
||||
if err != nil {
|
||||
t.Fatalf("Catchup returned an error: %s", err)
|
||||
}
|
||||
|
@ -441,7 +421,7 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
|
|||
},
|
||||
}
|
||||
hasNew, err := DeviceListCatchup(
|
||||
context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken,
|
||||
context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("DeviceListCatchup returned an error: %s", err)
|
||||
|
|
|
@ -267,7 +267,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
|
|||
func (rp *RequestPool) appendDeviceLists(
|
||||
data *types.Response, userID string, since, to types.StreamingToken,
|
||||
) (*types.Response, error) {
|
||||
_, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.rsAPI, rp.stateAPI, userID, data, since, to)
|
||||
_, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.rsAPI, userID, data, since, to)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("internal.DeviceListCatchup: %w", err)
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue