mirror of
https://github.com/hoernschen/dendrite.git
synced 2024-12-26 15:08:28 +00:00
Implement backfill over federation (#938)
* Implement history visibility checks for /backfill Required for p2p to show history correctly. * Add sytest * Logging * Fix two backfill bugs which prevented backfill from working correctly - When receiving backfill requests, do not send the event that was in the original request. - When storing backfill results, correctly update the backwards extremity for the room. * hack: make backfill work multiple times * add sqlite impl and remove logging * Linting
This commit is contained in:
parent
5a1a1ded1b
commit
6bac7e5efd
11 changed files with 322 additions and 202 deletions
6
go.sum
6
go.sum
|
@ -276,6 +276,8 @@ github.com/matrix-org/gomatrixserverlib v0.0.0-20200318145320-bc896516d72a h1:7+
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20200318145320-bc896516d72a/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20200318145320-bc896516d72a/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20200318150006-bc27294f9203 h1:7HkL6bF7/M2cYteNFVtvGW5qjD4wHIiR0HsdCm2Rqao=
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20200318150006-bc27294f9203 h1:7HkL6bF7/M2cYteNFVtvGW5qjD4wHIiR0HsdCm2Rqao=
|
||||||
github.com/matrix-org/gomatrixserverlib v0.0.0-20200318150006-bc27294f9203/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20200318150006-bc27294f9203/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
|
||||||
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20200318180716-bc4ff56961e2 h1:y4DOMbhgPAnATHJ4lNxTWxIlJG0SlIPhvukx1sQkty4=
|
||||||
|
github.com/matrix-org/gomatrixserverlib v0.0.0-20200318180716-bc4ff56961e2/go.mod h1:FsKa2pWE/bpQql9H7U4boOPXFoJX/QcqaZZ6ijLkaZI=
|
||||||
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk=
|
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1 h1:osLoFdOy+ChQqVUn2PeTDETFftVkl4w9t/OW18g3lnk=
|
||||||
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A=
|
github.com/matrix-org/naffka v0.0.0-20200127221512-0716baaabaf1/go.mod h1:cXoYQIENbdWIQHt1SyCo6Bl3C3raHwJ0wgVrXHSqf+A=
|
||||||
github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5 h1:W7l5CP4V7wPyPb4tYE11dbmeAOwtFQBTW0rf4OonOS8=
|
github.com/matrix-org/util v0.0.0-20171127121716-2e2df66af2f5 h1:W7l5CP4V7wPyPb4tYE11dbmeAOwtFQBTW0rf4OonOS8=
|
||||||
|
@ -470,6 +472,8 @@ go.uber.org/atomic v1.3.0 h1:vs7fgriifsPbGdK3bNuMWapNn3qnZhCRXc19NRdq010=
|
||||||
go.uber.org/atomic v1.3.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
|
go.uber.org/atomic v1.3.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
|
||||||
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
|
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
|
||||||
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
|
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
|
||||||
|
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
|
||||||
|
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
|
||||||
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
|
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
|
||||||
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
|
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
|
||||||
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||||
|
@ -491,6 +495,7 @@ golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL
|
||||||
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
|
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
|
||||||
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
|
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
|
||||||
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
|
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
|
||||||
|
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
|
||||||
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
|
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
|
||||||
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||||
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||||
|
@ -559,6 +564,7 @@ golang.org/x/tools v0.0.0-20190910044552-dd2b5c81c578/go.mod h1:b+2E5dAYhXwXZwtn
|
||||||
golang.org/x/tools v0.0.0-20190911230505-6bfd74cf029c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
golang.org/x/tools v0.0.0-20190911230505-6bfd74cf029c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||||
golang.org/x/tools v0.0.0-20190912215617-3720d1ec3678 h1:rM1Udd0CgtYI3KUIhu9ROz0QCqjW+n/ODp/hH7c60Xc=
|
golang.org/x/tools v0.0.0-20190912215617-3720d1ec3678 h1:rM1Udd0CgtYI3KUIhu9ROz0QCqjW+n/ODp/hH7c60Xc=
|
||||||
golang.org/x/tools v0.0.0-20190912215617-3720d1ec3678/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
golang.org/x/tools v0.0.0-20190912215617-3720d1ec3678/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||||
|
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
|
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
|
||||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||||
|
|
|
@ -12,18 +12,76 @@
|
||||||
|
|
||||||
package auth
|
package auth
|
||||||
|
|
||||||
import "github.com/matrix-org/gomatrixserverlib"
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
// IsServerAllowed returns true if there exists a event in authEvents
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
// which allows server to view this event. That is true when a client on the server
|
)
|
||||||
// can view the event. Otherwise returns false.
|
|
||||||
|
// TODO: This logic should live in gomatrixserverlib
|
||||||
|
|
||||||
|
// IsServerAllowed returns true if the server is allowed to see events in the room
|
||||||
|
// at this particular state. This function implements https://matrix.org/docs/spec/client_server/r0.6.0#id87
|
||||||
func IsServerAllowed(
|
func IsServerAllowed(
|
||||||
serverName gomatrixserverlib.ServerName,
|
serverName gomatrixserverlib.ServerName,
|
||||||
|
serverCurrentlyInRoom bool,
|
||||||
authEvents []gomatrixserverlib.Event,
|
authEvents []gomatrixserverlib.Event,
|
||||||
) bool {
|
) bool {
|
||||||
|
historyVisibility := historyVisibilityForRoom(authEvents)
|
||||||
|
|
||||||
|
// 1. If the history_visibility was set to world_readable, allow.
|
||||||
|
if historyVisibility == "world_readable" {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
// 2. If the user's membership was join, allow.
|
||||||
|
joinedUserExists := IsAnyUserOnServerWithMembership(serverName, authEvents, gomatrixserverlib.Join)
|
||||||
|
if joinedUserExists {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
// 3. If history_visibility was set to shared, and the user joined the room at any point after the event was sent, allow.
|
||||||
|
if historyVisibility == "shared" && serverCurrentlyInRoom {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
// 4. If the user's membership was invite, and the history_visibility was set to invited, allow.
|
||||||
|
invitedUserExists := IsAnyUserOnServerWithMembership(serverName, authEvents, gomatrixserverlib.Invite)
|
||||||
|
if invitedUserExists && historyVisibility == "invited" {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// 5. Otherwise, deny.
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func historyVisibilityForRoom(authEvents []gomatrixserverlib.Event) string {
|
||||||
|
// https://matrix.org/docs/spec/client_server/r0.6.0#id87
|
||||||
|
// By default if no history_visibility is set, or if the value is not understood, the visibility is assumed to be shared.
|
||||||
|
visibility := "shared"
|
||||||
|
knownStates := []string{"invited", "joined", "shared", "world_readable"}
|
||||||
|
for _, ev := range authEvents {
|
||||||
|
if ev.Type() != gomatrixserverlib.MRoomHistoryVisibility {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// TODO: This should be HistoryVisibilityContent to match things like 'MemberContent'. Do this when moving to GMSL
|
||||||
|
content := struct {
|
||||||
|
HistoryVisibility string `json:"history_visibility"`
|
||||||
|
}{}
|
||||||
|
if err := json.Unmarshal(ev.Content(), &content); err != nil {
|
||||||
|
break // value is not understood
|
||||||
|
}
|
||||||
|
for _, s := range knownStates {
|
||||||
|
if s == content.HistoryVisibility {
|
||||||
|
visibility = s
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return visibility
|
||||||
|
}
|
||||||
|
|
||||||
|
func IsAnyUserOnServerWithMembership(serverName gomatrixserverlib.ServerName, authEvents []gomatrixserverlib.Event, wantMembership string) bool {
|
||||||
for _, ev := range authEvents {
|
for _, ev := range authEvents {
|
||||||
membership, err := ev.Membership()
|
membership, err := ev.Membership()
|
||||||
if err != nil || membership != gomatrixserverlib.Join {
|
if err != nil || membership != wantMembership {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,7 +99,5 @@ func IsServerAllowed(
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Check if history visibility is shared and if the server is currently in the room
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
|
@ -447,14 +447,26 @@ func (r *RoomserverQueryAPI) QueryServerAllowedToSeeEvent(
|
||||||
request *api.QueryServerAllowedToSeeEventRequest,
|
request *api.QueryServerAllowedToSeeEventRequest,
|
||||||
response *api.QueryServerAllowedToSeeEventResponse,
|
response *api.QueryServerAllowedToSeeEventResponse,
|
||||||
) (err error) {
|
) (err error) {
|
||||||
|
events, err := r.DB.EventsFromIDs(ctx, []string{request.EventID})
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if len(events) == 0 {
|
||||||
|
response.AllowedToSeeEvent = false // event doesn't exist so not allowed to see
|
||||||
|
return
|
||||||
|
}
|
||||||
|
isServerInRoom, err := r.isServerCurrentlyInRoom(ctx, request.ServerName, events[0].RoomID())
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
response.AllowedToSeeEvent, err = r.checkServerAllowedToSeeEvent(
|
response.AllowedToSeeEvent, err = r.checkServerAllowedToSeeEvent(
|
||||||
ctx, request.EventID, request.ServerName,
|
ctx, request.EventID, request.ServerName, isServerInRoom,
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RoomserverQueryAPI) checkServerAllowedToSeeEvent(
|
func (r *RoomserverQueryAPI) checkServerAllowedToSeeEvent(
|
||||||
ctx context.Context, eventID string, serverName gomatrixserverlib.ServerName,
|
ctx context.Context, eventID string, serverName gomatrixserverlib.ServerName, isServerInRoom bool,
|
||||||
) (bool, error) {
|
) (bool, error) {
|
||||||
roomState := state.NewStateResolution(r.DB)
|
roomState := state.NewStateResolution(r.DB)
|
||||||
stateEntries, err := roomState.LoadStateAtEvent(ctx, eventID)
|
stateEntries, err := roomState.LoadStateAtEvent(ctx, eventID)
|
||||||
|
@ -469,7 +481,7 @@ func (r *RoomserverQueryAPI) checkServerAllowedToSeeEvent(
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return auth.IsServerAllowed(serverName, stateAtEvent), nil
|
return auth.IsServerAllowed(serverName, isServerInRoom, stateAtEvent), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryMissingEvents implements api.RoomserverQueryAPI
|
// QueryMissingEvents implements api.RoomserverQueryAPI
|
||||||
|
@ -564,17 +576,55 @@ func (r *RoomserverQueryAPI) QueryBackfill(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *RoomserverQueryAPI) isServerCurrentlyInRoom(ctx context.Context, serverName gomatrixserverlib.ServerName, roomID string) (bool, error) {
|
||||||
|
roomNID, err := r.DB.RoomNID(ctx, roomID)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
eventNIDs, err := r.DB.GetMembershipEventNIDsForRoom(ctx, roomNID, true)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
events, err := r.DB.Events(ctx, eventNIDs)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
gmslEvents := make([]gomatrixserverlib.Event, len(events))
|
||||||
|
for i := range events {
|
||||||
|
gmslEvents[i] = events[i].Event
|
||||||
|
}
|
||||||
|
return auth.IsAnyUserOnServerWithMembership(serverName, gmslEvents, gomatrixserverlib.Join), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Remove this when we have tests to assert correctness of this function
|
||||||
|
// nolint:gocyclo
|
||||||
func (r *RoomserverQueryAPI) scanEventTree(
|
func (r *RoomserverQueryAPI) scanEventTree(
|
||||||
ctx context.Context, front []string, visited map[string]bool, limit int,
|
ctx context.Context, front []string, visited map[string]bool, limit int,
|
||||||
serverName gomatrixserverlib.ServerName,
|
serverName gomatrixserverlib.ServerName,
|
||||||
) (resultNIDs []types.EventNID, err error) {
|
) ([]types.EventNID, error) {
|
||||||
|
var resultNIDs []types.EventNID
|
||||||
|
var err error
|
||||||
var allowed bool
|
var allowed bool
|
||||||
var events []types.Event
|
var events []types.Event
|
||||||
var next []string
|
var next []string
|
||||||
var pre string
|
var pre string
|
||||||
|
|
||||||
|
// TODO: add tests for this function to ensure it meets the contract that callers expect (and doc what that is supposed to be)
|
||||||
|
// Currently, callers like QueryBackfill will call scanEventTree with a pre-populated `visited` map, assuming that by doing
|
||||||
|
// so means that the events in that map will NOT be returned from this function. That is not currently true, resulting in
|
||||||
|
// duplicate events being sent in response to /backfill requests.
|
||||||
|
initialIgnoreList := make(map[string]bool, len(visited))
|
||||||
|
for k, v := range visited {
|
||||||
|
initialIgnoreList[k] = v
|
||||||
|
}
|
||||||
|
|
||||||
resultNIDs = make([]types.EventNID, 0, limit)
|
resultNIDs = make([]types.EventNID, 0, limit)
|
||||||
|
|
||||||
|
var checkedServerInRoom bool
|
||||||
|
var isServerInRoom bool
|
||||||
|
|
||||||
// Loop through the event IDs to retrieve the requested events and go
|
// Loop through the event IDs to retrieve the requested events and go
|
||||||
// through the whole tree (up to the provided limit) using the events'
|
// through the whole tree (up to the provided limit) using the events'
|
||||||
// "prev_event" key.
|
// "prev_event" key.
|
||||||
|
@ -587,7 +637,18 @@ BFSLoop:
|
||||||
// Retrieve the events to process from the database.
|
// Retrieve the events to process from the database.
|
||||||
events, err = r.DB.EventsFromIDs(ctx, front)
|
events, err = r.DB.EventsFromIDs(ctx, front)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return resultNIDs, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !checkedServerInRoom && len(events) > 0 {
|
||||||
|
// It's nasty that we have to extract the room ID from an event, but many federation requests
|
||||||
|
// only talk in event IDs, no room IDs at all (!!!)
|
||||||
|
ev := events[0]
|
||||||
|
isServerInRoom, err = r.isServerCurrentlyInRoom(ctx, serverName, ev.RoomID())
|
||||||
|
if err != nil {
|
||||||
|
util.GetLogger(ctx).WithError(err).Error("Failed to check if server is currently in room, assuming not.")
|
||||||
|
}
|
||||||
|
checkedServerInRoom = true
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, ev := range events {
|
for _, ev := range events {
|
||||||
|
@ -595,17 +656,23 @@ BFSLoop:
|
||||||
if len(resultNIDs) == limit {
|
if len(resultNIDs) == limit {
|
||||||
break BFSLoop
|
break BFSLoop
|
||||||
}
|
}
|
||||||
// Update the list of events to retrieve.
|
|
||||||
resultNIDs = append(resultNIDs, ev.EventNID)
|
if !initialIgnoreList[ev.EventID()] {
|
||||||
|
// Update the list of events to retrieve.
|
||||||
|
resultNIDs = append(resultNIDs, ev.EventNID)
|
||||||
|
}
|
||||||
// Loop through the event's parents.
|
// Loop through the event's parents.
|
||||||
for _, pre = range ev.PrevEventIDs() {
|
for _, pre = range ev.PrevEventIDs() {
|
||||||
// Only add an event to the list of next events to process if it
|
// Only add an event to the list of next events to process if it
|
||||||
// hasn't been seen before.
|
// hasn't been seen before.
|
||||||
if !visited[pre] {
|
if !visited[pre] {
|
||||||
visited[pre] = true
|
visited[pre] = true
|
||||||
allowed, err = r.checkServerAllowedToSeeEvent(ctx, pre, serverName)
|
allowed, err = r.checkServerAllowedToSeeEvent(ctx, pre, serverName, isServerInRoom)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
util.GetLogger(ctx).WithField("server", serverName).WithField("event_id", pre).WithError(err).Error(
|
||||||
|
"Error checking if allowed to see event",
|
||||||
|
)
|
||||||
|
return resultNIDs, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the event hasn't been seen before and the HS
|
// If the event hasn't been seen before and the HS
|
||||||
|
@ -613,6 +680,8 @@ BFSLoop:
|
||||||
// the list of events to retrieve.
|
// the list of events to retrieve.
|
||||||
if allowed {
|
if allowed {
|
||||||
next = append(next, pre)
|
next = append(next, pre)
|
||||||
|
} else {
|
||||||
|
util.GetLogger(ctx).WithField("server", serverName).WithField("event_id", pre).Info("Not allowed to see event")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -621,7 +690,7 @@ BFSLoop:
|
||||||
front = next
|
front = next
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return resultNIDs, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// QueryStateAndAuthChain implements api.RoomserverQueryAPI
|
// QueryStateAndAuthChain implements api.RoomserverQueryAPI
|
||||||
|
|
|
@ -28,7 +28,7 @@ import (
|
||||||
"github.com/matrix-org/dendrite/syncapi/types"
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
"github.com/matrix-org/gomatrixserverlib"
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
"github.com/matrix-org/util"
|
"github.com/matrix-org/util"
|
||||||
log "github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
type messagesReq struct {
|
type messagesReq struct {
|
||||||
|
@ -151,6 +151,14 @@ func OnIncomingMessagesRequest(
|
||||||
util.GetLogger(req.Context()).WithError(err).Error("mreq.retrieveEvents failed")
|
util.GetLogger(req.Context()).WithError(err).Error("mreq.retrieveEvents failed")
|
||||||
return jsonerror.InternalServerError()
|
return jsonerror.InternalServerError()
|
||||||
}
|
}
|
||||||
|
util.GetLogger(req.Context()).WithFields(logrus.Fields{
|
||||||
|
"from": from.String(),
|
||||||
|
"to": to.String(),
|
||||||
|
"limit": limit,
|
||||||
|
"backwards": backwardOrdering,
|
||||||
|
"return_start": start.String(),
|
||||||
|
"return_end": end.String(),
|
||||||
|
}).Info("Responding")
|
||||||
|
|
||||||
// Respond with the events.
|
// Respond with the events.
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
|
@ -302,8 +310,9 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
|
||||||
events []gomatrixserverlib.HeaderedEvent, err error,
|
events []gomatrixserverlib.HeaderedEvent, err error,
|
||||||
) {
|
) {
|
||||||
// Check if we have enough events.
|
// Check if we have enough events.
|
||||||
isSetLargeEnough := true
|
isSetLargeEnough := len(streamEvents) >= r.limit
|
||||||
if len(streamEvents) < r.limit {
|
if !isSetLargeEnough {
|
||||||
|
// it might be fine we don't have up to 'limit' events, let's find out
|
||||||
if r.backwardOrdering {
|
if r.backwardOrdering {
|
||||||
if r.wasToProvided {
|
if r.wasToProvided {
|
||||||
// The condition in the SQL query is a strict "greater than" so
|
// The condition in the SQL query is a strict "greater than" so
|
||||||
|
@ -343,54 +352,6 @@ func (r *messagesReq) handleNonEmptyEventsSlice(streamEvents []types.StreamEvent
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// containsBackwardExtremity checks if a slice of StreamEvent contains a
|
|
||||||
// backward extremity. It does so by selecting the earliest event in the slice
|
|
||||||
// and by checking the presence in the database of all of its parent events, and
|
|
||||||
// considers the event itself a backward extremity if at least one of the parent
|
|
||||||
// events doesn't exist in the database.
|
|
||||||
// Returns an error if there was an issue with talking to the database.
|
|
||||||
//
|
|
||||||
// This function is unused but currently set to nolint for now until we are
|
|
||||||
// absolutely sure that the changes in matrix-org/dendrite#847 are behaving
|
|
||||||
// properly.
|
|
||||||
// nolint:unused
|
|
||||||
func (r *messagesReq) containsBackwardExtremity(events []types.StreamEvent) (bool, error) {
|
|
||||||
// Select the earliest retrieved event.
|
|
||||||
var ev *types.StreamEvent
|
|
||||||
if r.backwardOrdering {
|
|
||||||
ev = &(events[len(events)-1])
|
|
||||||
} else {
|
|
||||||
ev = &(events[0])
|
|
||||||
}
|
|
||||||
// Get the earliest retrieved event's parents.
|
|
||||||
prevIDs := ev.PrevEventIDs()
|
|
||||||
prevs, err := r.db.Events(r.ctx, prevIDs)
|
|
||||||
if err != nil {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
// Check if we have all of the events we requested. If not, it means we've
|
|
||||||
// reached a backward extremity.
|
|
||||||
var eventInDB bool
|
|
||||||
var id string
|
|
||||||
// Iterate over the IDs we used in the request.
|
|
||||||
for _, id = range prevIDs {
|
|
||||||
eventInDB = false
|
|
||||||
// Iterate over the events we got in response.
|
|
||||||
for _, ev := range prevs {
|
|
||||||
if ev.EventID() == id {
|
|
||||||
eventInDB = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// One occurrence of one the event's parents not being present in the
|
|
||||||
// database is enough to say that the event is a backward extremity.
|
|
||||||
if !eventInDB {
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// backfill performs a backfill request over the federation on another
|
// backfill performs a backfill request over the federation on another
|
||||||
// homeserver in the room.
|
// homeserver in the room.
|
||||||
// See: https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid
|
// See: https://matrix.org/docs/spec/server_server/latest#get-matrix-federation-v1-backfill-roomid
|
||||||
|
@ -401,6 +362,48 @@ func (r *messagesReq) containsBackwardExtremity(events []types.StreamEvent) (boo
|
||||||
// Returns an error if there was an issue with retrieving the list of servers in
|
// Returns an error if there was an issue with retrieving the list of servers in
|
||||||
// the room or sending the request.
|
// the room or sending the request.
|
||||||
func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) {
|
func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserverlib.HeaderedEvent, error) {
|
||||||
|
srvToBackfillFrom, err := r.serverToBackfillFrom(fromEventIDs)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Cannot find server to backfill from: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
pdus := make([]gomatrixserverlib.HeaderedEvent, 0)
|
||||||
|
|
||||||
|
// If the roomserver responded with at least one server that isn't us,
|
||||||
|
// send it a request for backfill.
|
||||||
|
util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("limit", limit).Info("Backfilling from server")
|
||||||
|
txn, err := r.federation.Backfill(
|
||||||
|
r.ctx, srvToBackfillFrom, r.roomID, limit, fromEventIDs,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, p := range txn.PDUs {
|
||||||
|
pdus = append(pdus, p.Headered(gomatrixserverlib.RoomVersionV1))
|
||||||
|
}
|
||||||
|
util.GetLogger(r.ctx).WithField("server", srvToBackfillFrom).WithField("new_events", len(pdus)).Info("Storing new events from backfill")
|
||||||
|
|
||||||
|
// Store the events in the database, while marking them as unfit to show
|
||||||
|
// up in responses to sync requests.
|
||||||
|
for _, pdu := range pdus {
|
||||||
|
headered := pdu.Headered(gomatrixserverlib.RoomVersionV1)
|
||||||
|
if _, err = r.db.WriteEvent(
|
||||||
|
r.ctx,
|
||||||
|
&headered,
|
||||||
|
[]gomatrixserverlib.HeaderedEvent{},
|
||||||
|
[]string{},
|
||||||
|
[]string{},
|
||||||
|
nil, true,
|
||||||
|
); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return pdus, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *messagesReq) serverToBackfillFrom(fromEventIDs []string) (gomatrixserverlib.ServerName, error) {
|
||||||
// Query the list of servers in the room when one of the backward extremities
|
// Query the list of servers in the room when one of the backward extremities
|
||||||
// was sent.
|
// was sent.
|
||||||
var serversResponse api.QueryServersInRoomAtEventResponse
|
var serversResponse api.QueryServersInRoomAtEventResponse
|
||||||
|
@ -409,7 +412,33 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv
|
||||||
EventID: fromEventIDs[0],
|
EventID: fromEventIDs[0],
|
||||||
}
|
}
|
||||||
if err := r.queryAPI.QueryServersInRoomAtEvent(r.ctx, &serversRequest, &serversResponse); err != nil {
|
if err := r.queryAPI.QueryServersInRoomAtEvent(r.ctx, &serversRequest, &serversResponse); err != nil {
|
||||||
return nil, err
|
util.GetLogger(r.ctx).WithError(err).Warn("Failed to query servers in room at event, falling back to event sender")
|
||||||
|
// FIXME: We shouldn't be doing this but in situations where we have already backfilled once
|
||||||
|
// the query API doesn't work as backfilled events do not make it to the room server.
|
||||||
|
// This means QueryServersInRoomAtEvent returns an error as it doesn't have the event ID in question.
|
||||||
|
// We need to inject backfilled events into the room server and store them appropriately.
|
||||||
|
events, err := r.db.Events(r.ctx, fromEventIDs)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
if len(events) == 0 {
|
||||||
|
// should be impossible as these event IDs are backwards extremities
|
||||||
|
return "", fmt.Errorf("backfill: missing backwards extremities, event IDs: %s", fromEventIDs)
|
||||||
|
}
|
||||||
|
// The rationale here is that the last event was unlikely to be sent by us, so poke the server who sent it.
|
||||||
|
// We shouldn't be doing this really, but as a heuristic it should work pretty well for now.
|
||||||
|
for _, e := range events {
|
||||||
|
_, srv, srverr := gomatrixserverlib.SplitID('@', e.Sender())
|
||||||
|
if srverr != nil {
|
||||||
|
util.GetLogger(r.ctx).WithError(srverr).Warn("Failed to extract domain from event sender")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if srv != r.cfg.Matrix.ServerName {
|
||||||
|
return srv, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// no valid events which have a remote server, fail.
|
||||||
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Use the first server from the response, except if that server is us.
|
// Use the first server from the response, except if that server is us.
|
||||||
|
@ -423,45 +452,11 @@ func (r *messagesReq) backfill(fromEventIDs []string, limit int) ([]gomatrixserv
|
||||||
if len(serversResponse.Servers) > 1 {
|
if len(serversResponse.Servers) > 1 {
|
||||||
srvToBackfillFrom = serversResponse.Servers[1]
|
srvToBackfillFrom = serversResponse.Servers[1]
|
||||||
} else {
|
} else {
|
||||||
srvToBackfillFrom = gomatrixserverlib.ServerName("")
|
util.GetLogger(r.ctx).Info("Not enough servers to backfill from")
|
||||||
log.Warn("Not enough servers to backfill from")
|
return "", nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return srvToBackfillFrom, nil
|
||||||
pdus := make([]gomatrixserverlib.HeaderedEvent, 0)
|
|
||||||
|
|
||||||
// If the roomserver responded with at least one server that isn't us,
|
|
||||||
// send it a request for backfill.
|
|
||||||
if len(srvToBackfillFrom) > 0 {
|
|
||||||
txn, err := r.federation.Backfill(
|
|
||||||
r.ctx, srvToBackfillFrom, r.roomID, limit, fromEventIDs,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, p := range txn.PDUs {
|
|
||||||
pdus = append(pdus, p.Headered(gomatrixserverlib.RoomVersionV1))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Store the events in the database, while marking them as unfit to show
|
|
||||||
// up in responses to sync requests.
|
|
||||||
for _, pdu := range pdus {
|
|
||||||
headered := pdu.Headered(gomatrixserverlib.RoomVersionV1)
|
|
||||||
if _, err = r.db.WriteEvent(
|
|
||||||
r.ctx,
|
|
||||||
&headered,
|
|
||||||
[]gomatrixserverlib.HeaderedEvent{},
|
|
||||||
[]string{},
|
|
||||||
[]string{},
|
|
||||||
nil, true,
|
|
||||||
); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return pdus, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// setToDefault returns the default value for the "to" query parameter of a
|
// setToDefault returns the default value for the "to" query parameter of a
|
||||||
|
@ -475,7 +470,8 @@ func setToDefault(
|
||||||
roomID string,
|
roomID string,
|
||||||
) (to *types.PaginationToken, err error) {
|
) (to *types.PaginationToken, err error) {
|
||||||
if backwardOrdering {
|
if backwardOrdering {
|
||||||
to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 1, 0)
|
// go 1 earlier than the first event so we correctly fetch the earliest event
|
||||||
|
to = types.NewPaginationTokenFromTypeAndPosition(types.PaginationTokenTypeTopology, 0, 0)
|
||||||
} else {
|
} else {
|
||||||
var pos types.StreamPosition
|
var pos types.StreamPosition
|
||||||
pos, err = db.MaxTopologicalPosition(ctx, roomID)
|
pos, err = db.MaxTopologicalPosition(ctx, roomID)
|
||||||
|
|
|
@ -21,39 +21,53 @@ import (
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// The purpose of this table is to keep track of backwards extremities for a room.
|
||||||
|
// Backwards extremities are the earliest (DAG-wise) known events which we have
|
||||||
|
// the entire event JSON. These event IDs are used in federation requests to fetch
|
||||||
|
// even earlier events.
|
||||||
|
//
|
||||||
|
// We persist the previous event IDs as well, one per row, so when we do fetch even
|
||||||
|
// earlier events we can simply delete rows which referenced it. Consider the graph:
|
||||||
|
// A
|
||||||
|
// | Event C has 1 prev_event ID: A.
|
||||||
|
// B C
|
||||||
|
// |___| Event D has 2 prev_event IDs: B and C.
|
||||||
|
// |
|
||||||
|
// D
|
||||||
|
// The earliest known event we have is D, so this table has 2 rows.
|
||||||
|
// A backfill request gives us C but not B. We delete rows where prev_event=C. This
|
||||||
|
// still means that D is a backwards extremity as we do not have event B. However, event
|
||||||
|
// C is *also* a backwards extremity at this point as we do not have event A. Later,
|
||||||
|
// when we fetch event B, we delete rows where prev_event=B. This then removes D as
|
||||||
|
// a backwards extremity because there are no more rows with event_id=B.
|
||||||
const backwardExtremitiesSchema = `
|
const backwardExtremitiesSchema = `
|
||||||
-- Stores output room events received from the roomserver.
|
-- Stores output room events received from the roomserver.
|
||||||
CREATE TABLE IF NOT EXISTS syncapi_backward_extremities (
|
CREATE TABLE IF NOT EXISTS syncapi_backward_extremities (
|
||||||
-- The 'room_id' key for the event.
|
-- The 'room_id' key for the event.
|
||||||
room_id TEXT NOT NULL,
|
room_id TEXT NOT NULL,
|
||||||
-- The event ID for the event.
|
-- The event ID for the last known event. This is the backwards extremity.
|
||||||
event_id TEXT NOT NULL,
|
event_id TEXT NOT NULL,
|
||||||
|
-- The prev_events for the last known event. This is used to update extremities.
|
||||||
|
prev_event_id TEXT NOT NULL,
|
||||||
|
|
||||||
PRIMARY KEY(room_id, event_id)
|
PRIMARY KEY(room_id, event_id, prev_event_id)
|
||||||
);
|
);
|
||||||
`
|
`
|
||||||
|
|
||||||
const insertBackwardExtremitySQL = "" +
|
const insertBackwardExtremitySQL = "" +
|
||||||
"INSERT INTO syncapi_backward_extremities (room_id, event_id)" +
|
"INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" +
|
||||||
" VALUES ($1, $2)" +
|
" VALUES ($1, $2, $3)" +
|
||||||
" ON CONFLICT DO NOTHING"
|
" ON CONFLICT DO NOTHING"
|
||||||
|
|
||||||
const selectBackwardExtremitiesForRoomSQL = "" +
|
const selectBackwardExtremitiesForRoomSQL = "" +
|
||||||
"SELECT event_id FROM syncapi_backward_extremities WHERE room_id = $1"
|
"SELECT event_id FROM syncapi_backward_extremities WHERE room_id = $1"
|
||||||
|
|
||||||
const isBackwardExtremitySQL = "" +
|
|
||||||
"SELECT EXISTS (" +
|
|
||||||
" SELECT TRUE FROM syncapi_backward_extremities" +
|
|
||||||
" WHERE room_id = $1 AND event_id = $2" +
|
|
||||||
")"
|
|
||||||
|
|
||||||
const deleteBackwardExtremitySQL = "" +
|
const deleteBackwardExtremitySQL = "" +
|
||||||
"DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND event_id = $2"
|
"DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2"
|
||||||
|
|
||||||
type backwardExtremitiesStatements struct {
|
type backwardExtremitiesStatements struct {
|
||||||
insertBackwardExtremityStmt *sql.Stmt
|
insertBackwardExtremityStmt *sql.Stmt
|
||||||
selectBackwardExtremitiesForRoomStmt *sql.Stmt
|
selectBackwardExtremitiesForRoomStmt *sql.Stmt
|
||||||
isBackwardExtremityStmt *sql.Stmt
|
|
||||||
deleteBackwardExtremityStmt *sql.Stmt
|
deleteBackwardExtremityStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -68,9 +82,6 @@ func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) {
|
||||||
if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil {
|
if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if s.isBackwardExtremityStmt, err = db.Prepare(isBackwardExtremitySQL); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil {
|
if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -78,17 +89,15 @@ func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *backwardExtremitiesStatements) insertsBackwardExtremity(
|
func (s *backwardExtremitiesStatements) insertsBackwardExtremity(
|
||||||
ctx context.Context, roomID, eventID string,
|
ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string,
|
||||||
) (err error) {
|
) (err error) {
|
||||||
_, err = s.insertBackwardExtremityStmt.ExecContext(ctx, roomID, eventID)
|
_, err = txn.Stmt(s.insertBackwardExtremityStmt).ExecContext(ctx, roomID, eventID, prevEventID)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom(
|
func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom(
|
||||||
ctx context.Context, roomID string,
|
ctx context.Context, roomID string,
|
||||||
) (eventIDs []string, err error) {
|
) (eventIDs []string, err error) {
|
||||||
eventIDs = make([]string, 0)
|
|
||||||
|
|
||||||
rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID)
|
rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
|
@ -107,16 +116,9 @@ func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom(
|
||||||
return eventIDs, rows.Err()
|
return eventIDs, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *backwardExtremitiesStatements) isBackwardExtremity(
|
|
||||||
ctx context.Context, roomID, eventID string,
|
|
||||||
) (isBE bool, err error) {
|
|
||||||
err = s.isBackwardExtremityStmt.QueryRowContext(ctx, roomID, eventID).Scan(&isBE)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *backwardExtremitiesStatements) deleteBackwardExtremity(
|
func (s *backwardExtremitiesStatements) deleteBackwardExtremity(
|
||||||
ctx context.Context, roomID, eventID string,
|
ctx context.Context, txn *sql.Tx, roomID, knownEventID string,
|
||||||
) (err error) {
|
) (err error) {
|
||||||
_, err = s.insertBackwardExtremityStmt.ExecContext(ctx, roomID, eventID)
|
_, err = txn.Stmt(s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -305,7 +305,6 @@ func (s *outputRoomEventsStatements) selectRecentEvents(
|
||||||
} else {
|
} else {
|
||||||
stmt = common.TxStmt(txn, s.selectRecentEventsStmt)
|
stmt = common.TxStmt(txn, s.selectRecentEventsStmt)
|
||||||
}
|
}
|
||||||
|
|
||||||
rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
|
rows, err := stmt.QueryContext(ctx, roomID, fromPos, toPos, limit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
|
@ -111,22 +111,17 @@ func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([
|
||||||
return d.StreamEventsToEvents(nil, streamEvents), nil
|
return d.StreamEventsToEvents(nil, streamEvents), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, ev *gomatrixserverlib.HeaderedEvent) error {
|
// handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of
|
||||||
// If the event is already known as a backward extremity, don't consider
|
// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table
|
||||||
// it as such anymore now that we have it.
|
// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such.
|
||||||
isBackwardExtremity, err := d.backwardExtremities.isBackwardExtremity(ctx, ev.RoomID(), ev.EventID())
|
func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error {
|
||||||
if err != nil {
|
if err := d.backwardExtremities.deleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if isBackwardExtremity {
|
|
||||||
if err = d.backwardExtremities.deleteBackwardExtremity(ctx, ev.RoomID(), ev.EventID()); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if we have all of the event's previous events. If an event is
|
// Check if we have all of the event's previous events. If an event is
|
||||||
// missing, add it to the room's backward extremities.
|
// missing, add it to the room's backward extremities.
|
||||||
prevEvents, err := d.events.selectEvents(ctx, nil, ev.PrevEventIDs())
|
prevEvents, err := d.events.selectEvents(ctx, txn, ev.PrevEventIDs())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -141,7 +136,7 @@ func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, ev
|
||||||
|
|
||||||
// If the event is missing, consider it a backward extremity.
|
// If the event is missing, consider it a backward extremity.
|
||||||
if !found {
|
if !found {
|
||||||
if err = d.backwardExtremities.insertsBackwardExtremity(ctx, ev.RoomID(), ev.EventID()); err != nil {
|
if err = d.backwardExtremities.insertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -174,7 +169,7 @@ func (d *SyncServerDatasource) WriteEvent(
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = d.handleBackwardExtremities(ctx, ev); err != nil {
|
if err = d.handleBackwardExtremities(ctx, txn, ev); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -21,38 +21,53 @@ import (
|
||||||
"github.com/matrix-org/dendrite/common"
|
"github.com/matrix-org/dendrite/common"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// The purpose of this table is to keep track of backwards extremities for a room.
|
||||||
|
// Backwards extremities are the earliest (DAG-wise) known events which we have
|
||||||
|
// the entire event JSON. These event IDs are used in federation requests to fetch
|
||||||
|
// even earlier events.
|
||||||
|
//
|
||||||
|
// We persist the previous event IDs as well, one per row, so when we do fetch even
|
||||||
|
// earlier events we can simply delete rows which referenced it. Consider the graph:
|
||||||
|
// A
|
||||||
|
// | Event C has 1 prev_event ID: A.
|
||||||
|
// B C
|
||||||
|
// |___| Event D has 2 prev_event IDs: B and C.
|
||||||
|
// |
|
||||||
|
// D
|
||||||
|
// The earliest known event we have is D, so this table has 2 rows.
|
||||||
|
// A backfill request gives us C but not B. We delete rows where prev_event=C. This
|
||||||
|
// still means that D is a backwards extremity as we do not have event B. However, event
|
||||||
|
// C is *also* a backwards extremity at this point as we do not have event A. Later,
|
||||||
|
// when we fetch event B, we delete rows where prev_event=B. This then removes D as
|
||||||
|
// a backwards extremity because there are no more rows with event_id=B.
|
||||||
const backwardExtremitiesSchema = `
|
const backwardExtremitiesSchema = `
|
||||||
-- Stores output room events received from the roomserver.
|
-- Stores output room events received from the roomserver.
|
||||||
CREATE TABLE IF NOT EXISTS syncapi_backward_extremities (
|
CREATE TABLE IF NOT EXISTS syncapi_backward_extremities (
|
||||||
|
-- The 'room_id' key for the event.
|
||||||
room_id TEXT NOT NULL,
|
room_id TEXT NOT NULL,
|
||||||
|
-- The event ID for the last known event. This is the backwards extremity.
|
||||||
event_id TEXT NOT NULL,
|
event_id TEXT NOT NULL,
|
||||||
|
-- The prev_events for the last known event. This is used to update extremities.
|
||||||
|
prev_event_id TEXT NOT NULL,
|
||||||
|
|
||||||
PRIMARY KEY(room_id, event_id)
|
PRIMARY KEY(room_id, event_id, prev_event_id)
|
||||||
);
|
);
|
||||||
`
|
`
|
||||||
|
|
||||||
const insertBackwardExtremitySQL = "" +
|
const insertBackwardExtremitySQL = "" +
|
||||||
"INSERT INTO syncapi_backward_extremities (room_id, event_id)" +
|
"INSERT INTO syncapi_backward_extremities (room_id, event_id, prev_event_id)" +
|
||||||
" VALUES ($1, $2)" +
|
" VALUES ($1, $2, $3)" +
|
||||||
" ON CONFLICT (room_id, event_id) DO NOTHING"
|
" ON CONFLICT (room_id, event_id, prev_event_id) DO NOTHING"
|
||||||
|
|
||||||
const selectBackwardExtremitiesForRoomSQL = "" +
|
const selectBackwardExtremitiesForRoomSQL = "" +
|
||||||
"SELECT event_id FROM syncapi_backward_extremities WHERE room_id = $1"
|
"SELECT event_id FROM syncapi_backward_extremities WHERE room_id = $1"
|
||||||
|
|
||||||
const isBackwardExtremitySQL = "" +
|
|
||||||
"SELECT EXISTS (" +
|
|
||||||
" SELECT TRUE FROM syncapi_backward_extremities" +
|
|
||||||
" WHERE room_id = $1 AND event_id = $2" +
|
|
||||||
")"
|
|
||||||
|
|
||||||
const deleteBackwardExtremitySQL = "" +
|
const deleteBackwardExtremitySQL = "" +
|
||||||
"DELETE FROM syncapi_backward_extremities" +
|
"DELETE FROM syncapi_backward_extremities WHERE room_id = $1 AND prev_event_id = $2"
|
||||||
" WHERE room_id = $1 AND event_id = $2"
|
|
||||||
|
|
||||||
type backwardExtremitiesStatements struct {
|
type backwardExtremitiesStatements struct {
|
||||||
insertBackwardExtremityStmt *sql.Stmt
|
insertBackwardExtremityStmt *sql.Stmt
|
||||||
selectBackwardExtremitiesForRoomStmt *sql.Stmt
|
selectBackwardExtremitiesForRoomStmt *sql.Stmt
|
||||||
isBackwardExtremityStmt *sql.Stmt
|
|
||||||
deleteBackwardExtremityStmt *sql.Stmt
|
deleteBackwardExtremityStmt *sql.Stmt
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,9 +82,6 @@ func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) {
|
||||||
if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil {
|
if s.selectBackwardExtremitiesForRoomStmt, err = db.Prepare(selectBackwardExtremitiesForRoomSQL); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if s.isBackwardExtremityStmt, err = db.Prepare(isBackwardExtremitySQL); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil {
|
if s.deleteBackwardExtremityStmt, err = db.Prepare(deleteBackwardExtremitySQL); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -77,23 +89,20 @@ func (s *backwardExtremitiesStatements) prepare(db *sql.DB) (err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *backwardExtremitiesStatements) insertsBackwardExtremity(
|
func (s *backwardExtremitiesStatements) insertsBackwardExtremity(
|
||||||
ctx context.Context, txn *sql.Tx, roomID, eventID string,
|
ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string,
|
||||||
) (err error) {
|
) (err error) {
|
||||||
stmt := common.TxStmt(txn, s.insertBackwardExtremityStmt)
|
_, err = txn.Stmt(s.insertBackwardExtremityStmt).ExecContext(ctx, roomID, eventID, prevEventID)
|
||||||
_, err = stmt.ExecContext(ctx, roomID, eventID)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom(
|
func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom(
|
||||||
ctx context.Context, txn *sql.Tx, roomID string,
|
ctx context.Context, roomID string,
|
||||||
) (eventIDs []string, err error) {
|
) (eventIDs []string, err error) {
|
||||||
eventIDs = make([]string, 0)
|
rows, err := s.selectBackwardExtremitiesForRoomStmt.QueryContext(ctx, roomID)
|
||||||
|
|
||||||
stmt := common.TxStmt(txn, s.selectBackwardExtremitiesForRoomStmt)
|
|
||||||
rows, err := stmt.QueryContext(ctx, roomID)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
defer common.CloseAndLogIfError(ctx, rows, "selectBackwardExtremitiesForRoom: rows.close() failed")
|
||||||
|
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var eID string
|
var eID string
|
||||||
|
@ -104,21 +113,12 @@ func (s *backwardExtremitiesStatements) selectBackwardExtremitiesForRoom(
|
||||||
eventIDs = append(eventIDs, eID)
|
eventIDs = append(eventIDs, eID)
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return eventIDs, rows.Err()
|
||||||
}
|
|
||||||
|
|
||||||
func (s *backwardExtremitiesStatements) isBackwardExtremity(
|
|
||||||
ctx context.Context, txn *sql.Tx, roomID, eventID string,
|
|
||||||
) (isBE bool, err error) {
|
|
||||||
stmt := common.TxStmt(txn, s.isBackwardExtremityStmt)
|
|
||||||
err = stmt.QueryRowContext(ctx, roomID, eventID).Scan(&isBE)
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *backwardExtremitiesStatements) deleteBackwardExtremity(
|
func (s *backwardExtremitiesStatements) deleteBackwardExtremity(
|
||||||
ctx context.Context, txn *sql.Tx, roomID, eventID string,
|
ctx context.Context, txn *sql.Tx, roomID, knownEventID string,
|
||||||
) (err error) {
|
) (err error) {
|
||||||
stmt := common.TxStmt(txn, s.deleteBackwardExtremityStmt)
|
_, err = txn.Stmt(s.deleteBackwardExtremityStmt).ExecContext(ctx, roomID, knownEventID)
|
||||||
_, err = stmt.ExecContext(ctx, roomID, eventID)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -137,18 +137,13 @@ func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([
|
||||||
return d.StreamEventsToEvents(nil, streamEvents), nil
|
return d.StreamEventsToEvents(nil, streamEvents), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of
|
||||||
|
// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table
|
||||||
|
// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such.
|
||||||
func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error {
|
func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, txn *sql.Tx, ev *gomatrixserverlib.HeaderedEvent) error {
|
||||||
// If the event is already known as a backward extremity, don't consider
|
if err := d.backwardExtremities.deleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil {
|
||||||
// it as such anymore now that we have it.
|
|
||||||
isBackwardExtremity, err := d.backwardExtremities.isBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if isBackwardExtremity {
|
|
||||||
if err = d.backwardExtremities.deleteBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if we have all of the event's previous events. If an event is
|
// Check if we have all of the event's previous events. If an event is
|
||||||
// missing, add it to the room's backward extremities.
|
// missing, add it to the room's backward extremities.
|
||||||
|
@ -167,7 +162,7 @@ func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, tx
|
||||||
|
|
||||||
// If the event is missing, consider it a backward extremity.
|
// If the event is missing, consider it a backward extremity.
|
||||||
if !found {
|
if !found {
|
||||||
if err = d.backwardExtremities.insertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID()); err != nil {
|
if err = d.backwardExtremities.insertsBackwardExtremity(ctx, txn, ev.RoomID(), ev.EventID(), eID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -348,7 +343,7 @@ func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (tok types.Pagi
|
||||||
func (d *SyncServerDatasource) BackwardExtremitiesForRoom(
|
func (d *SyncServerDatasource) BackwardExtremitiesForRoom(
|
||||||
ctx context.Context, roomID string,
|
ctx context.Context, roomID string,
|
||||||
) (backwardExtremities []string, err error) {
|
) (backwardExtremities []string, err error) {
|
||||||
return d.backwardExtremities.selectBackwardExtremitiesForRoom(ctx, nil, roomID)
|
return d.backwardExtremities.selectBackwardExtremitiesForRoom(ctx, roomID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// MaxTopologicalPosition returns the highest topological position for a given
|
// MaxTopologicalPosition returns the highest topological position for a given
|
||||||
|
|
|
@ -47,7 +47,6 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
|
||||||
var syncData *types.Response
|
var syncData *types.Response
|
||||||
|
|
||||||
// Extract values from request
|
// Extract values from request
|
||||||
logger := util.GetLogger(req.Context())
|
|
||||||
userID := device.UserID
|
userID := device.UserID
|
||||||
syncReq, err := newSyncRequest(req, *device)
|
syncReq, err := newSyncRequest(req, *device)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -56,20 +55,21 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
|
||||||
JSON: jsonerror.Unknown(err.Error()),
|
JSON: jsonerror.Unknown(err.Error()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
logger.WithFields(log.Fields{
|
logger := util.GetLogger(req.Context()).WithFields(log.Fields{
|
||||||
"userID": userID,
|
"userID": userID,
|
||||||
"since": syncReq.since,
|
"since": syncReq.since,
|
||||||
"timeout": syncReq.timeout,
|
"timeout": syncReq.timeout,
|
||||||
}).Info("Incoming /sync request")
|
})
|
||||||
|
|
||||||
currPos := rp.notifier.CurrentPosition()
|
currPos := rp.notifier.CurrentPosition()
|
||||||
|
|
||||||
if shouldReturnImmediately(syncReq) {
|
if shouldReturnImmediately(syncReq) {
|
||||||
syncData, err = rp.currentSyncForUser(*syncReq, currPos)
|
syncData, err = rp.currentSyncForUser(*syncReq, currPos)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.GetLogger(req.Context()).WithError(err).Error("rp.currentSyncForUser failed")
|
logger.WithError(err).Error("rp.currentSyncForUser failed")
|
||||||
return jsonerror.InternalServerError()
|
return jsonerror.InternalServerError()
|
||||||
}
|
}
|
||||||
|
logger.WithField("next", syncData.NextBatch).Info("Responding immediately")
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusOK,
|
Code: http.StatusOK,
|
||||||
JSON: syncData,
|
JSON: syncData,
|
||||||
|
@ -107,7 +107,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
|
||||||
hasTimedOut = true
|
hasTimedOut = true
|
||||||
// Or for the request to be cancelled
|
// Or for the request to be cancelled
|
||||||
case <-req.Context().Done():
|
case <-req.Context().Done():
|
||||||
util.GetLogger(req.Context()).WithError(err).Error("request cancelled")
|
logger.WithError(err).Error("request cancelled")
|
||||||
return jsonerror.InternalServerError()
|
return jsonerror.InternalServerError()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -118,11 +118,12 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *authtype
|
||||||
|
|
||||||
syncData, err = rp.currentSyncForUser(*syncReq, currPos)
|
syncData, err = rp.currentSyncForUser(*syncReq, currPos)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.GetLogger(req.Context()).WithError(err).Error("rp.currentSyncForUser failed")
|
logger.WithError(err).Error("rp.currentSyncForUser failed")
|
||||||
return jsonerror.InternalServerError()
|
return jsonerror.InternalServerError()
|
||||||
}
|
}
|
||||||
|
|
||||||
if !syncData.IsEmpty() || hasTimedOut {
|
if !syncData.IsEmpty() || hasTimedOut {
|
||||||
|
logger.WithField("next", syncData.NextBatch).WithField("timed_out", hasTimedOut).Info("Responding")
|
||||||
return util.JSONResponse{
|
return util.JSONResponse{
|
||||||
Code: http.StatusOK,
|
Code: http.StatusOK,
|
||||||
JSON: syncData,
|
JSON: syncData,
|
||||||
|
|
|
@ -220,3 +220,4 @@ Regular users can add and delete aliases when m.room.aliases is restricted
|
||||||
GET /r0/capabilities is not public
|
GET /r0/capabilities is not public
|
||||||
GET /joined_rooms lists newly-created room
|
GET /joined_rooms lists newly-created room
|
||||||
/joined_rooms returns only joined rooms
|
/joined_rooms returns only joined rooms
|
||||||
|
Message history can be paginated over federation
|
||||||
|
|
Loading…
Reference in a new issue