Merge branch 'master' into matthew/peeking

This commit is contained in:
Neil Alexander 2020-09-04 14:15:51 +01:00
commit 2b8f0b8f59
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
29 changed files with 113 additions and 715 deletions

View file

@ -53,14 +53,6 @@ type mockCurrentStateAPI struct {
roomIDToJoinedMembers map[string][]string
}
func (s *mockCurrentStateAPI) QueryCurrentState(ctx context.Context, req *api.QueryCurrentStateRequest, res *api.QueryCurrentStateResponse) error {
return nil
}
func (s *mockCurrentStateAPI) QueryKnownUsers(ctx context.Context, req *api.QueryKnownUsersRequest, res *api.QueryKnownUsersResponse) error {
return nil
}
// QueryRoomsForUser retrieves a list of room IDs matching the given query.
func (s *mockCurrentStateAPI) QueryRoomsForUser(ctx context.Context, req *api.QueryRoomsForUserRequest, res *api.QueryRoomsForUserResponse) error {
return nil
@ -114,10 +106,6 @@ func (s *mockCurrentStateAPI) QuerySharedUsers(ctx context.Context, req *api.Que
return nil
}
func (t *mockCurrentStateAPI) QueryServerBannedFromRoom(ctx context.Context, req *api.QueryServerBannedFromRoomRequest, res *api.QueryServerBannedFromRoomResponse) error {
return nil
}
type wantCatchup struct {
hasNew bool
changed []string

View file

@ -698,9 +698,9 @@ func (d *Database) getResponseWithPDUsForCompleteSync(
return //res, toPos, joinedRoomIDs, err
}
func (d* Database) getJoinResponseForCompleteSync(
func (d *Database) getJoinResponseForCompleteSync(
ctx context.Context, txn *sql.Tx,
roomID string,
roomID string,
r types.Range,
stateFilter *gomatrixserverlib.StateFilter,
numRecentEventsPerRoom int,
@ -798,8 +798,10 @@ func (d *Database) addInvitesToResponse(
res.Rooms.Invite[roomID] = *ir
}
for roomID := range retiredInvites {
lr := types.NewLeaveResponse()
res.Rooms.Leave[roomID] = *lr
if _, ok := res.Rooms.Join[roomID]; !ok {
lr := types.NewLeaveResponse()
res.Rooms.Leave[roomID] = *lr
}
}
return nil
}
@ -1112,37 +1114,20 @@ func (d *Database) getStateDeltas(
// requests with full_state=true.
// Fetches full state for all joined rooms and uses selectStateInRange to get
// updates for other rooms.
// nolint:gocyclo
func (d *Database) getStateDeltasForFullStateSync(
ctx context.Context, device *userapi.Device, txn *sql.Tx,
r types.Range, userID string,
stateFilter *gomatrixserverlib.StateFilter,
) ([]stateDelta, []string, error) {
joinedRoomIDs, err := d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
if err != nil {
return nil, nil, err
}
// Use a reasonable initial capacity
deltas := make(map[string]stateDelta)
peeks, err := d.Peeks.SelectPeeks(ctx, txn, userID, device.ID)
if err != nil {
return nil, nil, err
}
// Use a reasonable initial capacity
deltas := make([]stateDelta, 0, len(joinedRoomIDs) + len(peeks))
// Add full states for all joined rooms
for _, joinedRoomID := range joinedRoomIDs {
s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID, stateFilter)
if stateErr != nil {
return nil, nil, stateErr
}
deltas = append(deltas, stateDelta{
membership: gomatrixserverlib.Join,
stateEvents: d.StreamEventsToEvents(device, s),
roomID: joinedRoomID,
})
}
// Add full states for all peeking rooms
newPeeks := false
for _, peek := range peeks {
@ -1153,16 +1138,16 @@ func (d *Database) getStateDeltasForFullStateSync(
if stateErr != nil {
return nil, nil, stateErr
}
deltas = append(deltas, stateDelta{
deltas[peek.RoomID] = stateDelta{
membership: gomatrixserverlib.Peek,
stateEvents: d.StreamEventsToEvents(device, s),
roomID: peek.RoomID,
})
}
}
if newPeeks {
err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error {
return d.Peeks.MarkPeeksAsOld(ctx, txn, userID, device.ID)
return d.Peeks.MarkPeeksAsOld(ctx, txn, userID, device.ID)
})
if err != nil {
return nil, nil, err
@ -1183,12 +1168,12 @@ func (d *Database) getStateDeltasForFullStateSync(
for _, ev := range stateStreamEvents {
if membership := getMembershipFromEvent(&ev.Event, userID); membership != "" {
if membership != gomatrixserverlib.Join { // We've already added full state for all joined rooms above.
deltas = append(deltas, stateDelta{
deltas[roomID] = stateDelta{
membership: membership,
membershipPos: ev.StreamPosition,
stateEvents: d.StreamEventsToEvents(device, stateStreamEvents),
roomID: roomID,
})
}
}
break
@ -1196,7 +1181,33 @@ func (d *Database) getStateDeltasForFullStateSync(
}
}
return deltas, joinedRoomIDs, nil
joinedRoomIDs, err := d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, txn, userID, gomatrixserverlib.Join)
if err != nil {
return nil, nil, err
}
// Add full states for all joined rooms
for _, joinedRoomID := range joinedRoomIDs {
s, stateErr := d.currentStateStreamEventsForRoom(ctx, txn, joinedRoomID, stateFilter)
if stateErr != nil {
return nil, nil, stateErr
}
deltas[joinedRoomID] = stateDelta{
membership: gomatrixserverlib.Join,
stateEvents: d.StreamEventsToEvents(device, s),
roomID: joinedRoomID,
}
}
// Create a response array.
result := make([]stateDelta, len(deltas))
i := 0
for _, delta := range deltas {
result[i] = delta
i++
}
return result, joinedRoomIDs, nil
}
func (d *Database) currentStateStreamEventsForRoom(