Mirror of https://github.com/hoernschen/dendrite.git (synced 2025-07-30)
Use IsBlacklistedOrBackingOff to determine if we should try to fetch devices (#3254)

Use `IsBlacklistedOrBackingOff` from the federation API to check if we should fetch devices. To reduce back pressure, we now only queue retrying servers if there's space in the channel.
parent 699f5ca8c1
commit 7863a405a5
20 changed files with 212 additions and 90 deletions
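The change hinges on one pattern: the device list updater now receives an isBlacklistedOrBackingOffFn callback and inspects the error it returns with errors.As, skipping a server whose error unwraps to a blacklisted FederationClientError. Below is a minimal, self-contained sketch of that pattern; blacklistError, shouldFetch and the server names are invented stand-ins for illustration, not Dendrite code.

package main

import (
    "errors"
    "fmt"
)

// blacklistError stands in for the federation API's FederationClientError;
// only the Blacklisted flag matters for this sketch.
type blacklistError struct{ Blacklisted bool }

func (e *blacklistError) Error() string { return "server is blacklisted" }

// shouldFetch mirrors the check added to notifyWorkers: ask the
// blacklist/backoff callback and skip the server only when the returned
// error unwraps to a blacklist error.
func shouldFetch(check func(server string) error, server string) bool {
    err := check(server)
    var blErr *blacklistError
    if errors.As(err, &blErr) && blErr.Blacklisted {
        return false
    }
    return true
}

func main() {
    check := func(server string) error {
        if server == "evil.example" {
            return fmt.Errorf("lookup failed: %w", &blacklistError{Blacklisted: true})
        }
        return nil
    }
    fmt.Println(shouldFetch(check, "matrix.org"))   // true: go fetch devices
    fmt.Println(shouldFetch(check, "evil.example")) // false: skip blacklisted server
}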
@@ -25,6 +25,7 @@ import (
     "sync"
     "time"
 
+    "github.com/matrix-org/dendrite/federationapi/statistics"
     rsapi "github.com/matrix-org/dendrite/roomserver/api"
     "github.com/matrix-org/gomatrixserverlib/fclient"
     "github.com/matrix-org/gomatrixserverlib/spec"
@@ -108,6 +109,8 @@ type DeviceListUpdater struct {
     userIDToChan   map[string]chan bool
     userIDToChanMu *sync.Mutex
     rsAPI          rsapi.KeyserverRoomserverAPI
+
+    isBlacklistedOrBackingOffFn func(s spec.ServerName) (*statistics.ServerStatistics, error)
 }
 
 // DeviceListUpdaterDatabase is the subset of functionality from storage.Database required for the updater.
@@ -167,25 +170,28 @@ func NewDeviceListUpdater(
     process *process.ProcessContext, db DeviceListUpdaterDatabase,
     api DeviceListUpdaterAPI, producer KeyChangeProducer,
     fedClient fedsenderapi.KeyserverFederationAPI, numWorkers int,
-    rsAPI rsapi.KeyserverRoomserverAPI, thisServer spec.ServerName,
+    rsAPI rsapi.KeyserverRoomserverAPI,
+    thisServer spec.ServerName,
     enableMetrics bool,
+    isBlacklistedOrBackingOffFn func(s spec.ServerName) (*statistics.ServerStatistics, error),
 ) *DeviceListUpdater {
     if enableMetrics {
         prometheus.MustRegister(deviceListUpdaterBackpressure, deviceListUpdaterServersRetrying)
     }
     return &DeviceListUpdater{
-        process:        process,
-        userIDToMutex:  make(map[string]*sync.Mutex),
-        mu:             &sync.Mutex{},
-        db:             db,
-        api:            api,
-        producer:       producer,
-        fedClient:      fedClient,
-        thisServer:     thisServer,
-        workerChans:    make([]chan spec.ServerName, numWorkers),
-        userIDToChan:   make(map[string]chan bool),
-        userIDToChanMu: &sync.Mutex{},
-        rsAPI:          rsAPI,
+        process:                     process,
+        userIDToMutex:               make(map[string]*sync.Mutex),
+        mu:                          &sync.Mutex{},
+        db:                          db,
+        api:                         api,
+        producer:                    producer,
+        fedClient:                   fedClient,
+        thisServer:                  thisServer,
+        workerChans:                 make([]chan spec.ServerName, numWorkers),
+        userIDToChan:                make(map[string]chan bool),
+        userIDToChanMu:              &sync.Mutex{},
+        rsAPI:                       rsAPI,
+        isBlacklistedOrBackingOffFn: isBlacklistedOrBackingOffFn,
     }
 }
 
@@ -362,13 +368,22 @@ func (u *DeviceListUpdater) notifyWorkers(userID string) {
     if err != nil {
         return
     }
+    _, err = u.isBlacklistedOrBackingOffFn(remoteServer)
+    var federationClientError *fedsenderapi.FederationClientError
+    if errors.As(err, &federationClientError) {
+        if federationClientError.Blacklisted {
+            return
+        }
+    }
+
     hash := fnv.New32a()
     _, _ = hash.Write([]byte(remoteServer))
     index := int(int64(hash.Sum32()) % int64(len(u.workerChans)))
 
     ch := u.assignChannel(userID)
+    // Since workerChans are buffered, we only increment here and let the worker
+    // decrement it once it is done processing.
     deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(index)}).Inc()
-    defer deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(index)}).Dec()
     u.workerChans[index] <- remoteServer
     select {
     case <-ch:
@@ -405,24 +420,38 @@ func (u *DeviceListUpdater) worker(ch chan spec.ServerName, workerID int) {
     go func() {
         var serversToRetry []spec.ServerName
         for {
-            serversToRetry = serversToRetry[:0] // reuse memory
-            time.Sleep(time.Second)
+            // nuke serversToRetry by re-slicing it to be "empty".
+            // The capacity of the slice is unchanged, which ensures we can reuse the memory.
+            serversToRetry = serversToRetry[:0]
+
+            deviceListUpdaterServersRetrying.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Set(float64(len(retries)))
+            time.Sleep(time.Second * 2)
+
+            // -2, so we have space for incoming device list updates over federation
+            maxServers := (cap(ch) - len(ch)) - 2
+            if maxServers <= 0 {
+                continue
+            }
+
             retriesMu.Lock()
             now := time.Now()
             for srv, retryAt := range retries {
                 if now.After(retryAt) {
                     serversToRetry = append(serversToRetry, srv)
+                    if maxServers == len(serversToRetry) {
+                        break
+                    }
                 }
             }
+
             for _, srv := range serversToRetry {
                 delete(retries, srv)
             }
-            deviceListUpdaterServersRetrying.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Set(float64(len(retries)))
             retriesMu.Unlock()
+
             for _, srv := range serversToRetry {
                 deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Inc()
                 ch <- srv
-                deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Dec()
             }
         }
     }()
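The retry loop above only re-queues as many servers as the buffered worker channel can still absorb, keeping two slots free for incoming device list updates over federation. A tiny, self-contained illustration of that arithmetic follows; the channel capacity and queue depth are invented numbers, not Dendrite's configuration.

package main

import "fmt"

func main() {
    // Buffered worker channel; the capacity here is an assumption for the example.
    ch := make(chan string, 500)

    // Pretend 497 servers are already queued for this worker.
    for i := 0; i < 497; i++ {
        ch <- fmt.Sprintf("server%d.example", i)
    }

    // Same arithmetic as the retry goroutine: keep two slots free for
    // incoming device list updates over federation.
    maxServers := (cap(ch) - len(ch)) - 2
    if maxServers <= 0 {
        fmt.Println("channel nearly full, skip queueing retries this tick")
        return
    }
    fmt.Printf("room to re-queue %d retrying servers\n", maxServers)
}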
@@ -430,8 +459,18 @@ func (u *DeviceListUpdater) worker(ch chan spec.ServerName, workerID int) {
         retriesMu.Lock()
         _, exists := retries[serverName]
         retriesMu.Unlock()
-        if exists {
-            // Don't retry a server that we're already waiting for.
+
+        // If the serverName is coming from retries, maybe it was
+        // blacklisted in the meantime.
+        _, err := u.isBlacklistedOrBackingOffFn(serverName)
+        var federationClientError *fedsenderapi.FederationClientError
+        // unwrap errors and check for FederationClientError, if found, federationClientError will be not nil
+        errors.As(err, &federationClientError)
+        isBlacklisted := federationClientError != nil && federationClientError.Blacklisted
+
+        // Don't retry a server that we're already waiting for or is blacklisted by now.
+        if exists || isBlacklisted {
+            deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Dec()
             continue
         }
         waitTime, shouldRetry := u.processServer(serverName)
@@ -442,6 +481,7 @@ func (u *DeviceListUpdater) worker(ch chan spec.ServerName, workerID int) {
             }
             retriesMu.Unlock()
         }
+        deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Dec()
     }
 }
 
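Read together, the last three hunks also move the backpressure bookkeeping: notifyWorkers and the retry loop only increment the gauge when they enqueue a server, and the worker decrements exactly once per server it takes off the channel, including the early skip for blacklisted or already-pending servers. The sketch below shows that producer/consumer accounting with a stand-in gauge; the metric name, label value and channel size are invented for the example.

package main

import (
    "fmt"
    "sync"

    "github.com/prometheus/client_golang/prometheus"
)

// Stand-in for Dendrite's deviceListUpdaterBackpressure gauge: queued work per worker.
var backpressure = prometheus.NewGaugeVec(
    prometheus.GaugeOpts{Name: "example_backpressure", Help: "queued servers per worker (example)"},
    []string{"worker_id"},
)

func main() {
    prometheus.MustRegister(backpressure)

    ch := make(chan string, 4) // invented capacity
    var wg sync.WaitGroup
    wg.Add(1)

    // Consumer: decrement only once an item has been fully processed,
    // mirroring the Dec the commit adds at the end of the worker loop.
    go func() {
        defer wg.Done()
        for srv := range ch {
            fmt.Println("processing", srv)
            backpressure.With(prometheus.Labels{"worker_id": "0"}).Dec()
        }
    }()

    // Producers: increment when enqueueing, as notifyWorkers and the
    // retry loop now do, without a matching Dec on this side.
    for _, srv := range []string{"a.example", "b.example"} {
        backpressure.With(prometheus.Labels{"worker_id": "0"}).Inc()
        ch <- srv
    }
    close(ch)
    wg.Wait()
}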
@@ -27,6 +27,8 @@ import (
     "testing"
     "time"
 
+    api2 "github.com/matrix-org/dendrite/federationapi/api"
+    "github.com/matrix-org/dendrite/federationapi/statistics"
     "github.com/matrix-org/dendrite/internal/caching"
     "github.com/matrix-org/dendrite/internal/sqlutil"
     "github.com/matrix-org/gomatrixserverlib"
@@ -129,6 +131,10 @@ type mockDeviceListUpdaterAPI struct {
 func (d *mockDeviceListUpdaterAPI) PerformUploadDeviceKeys(ctx context.Context, req *api.PerformUploadDeviceKeysRequest, res *api.PerformUploadDeviceKeysResponse) {
 }
 
+var testIsBlacklistedOrBackingOff = func(s spec.ServerName) (*statistics.ServerStatistics, error) {
+    return &statistics.ServerStatistics{}, nil
+}
+
 type roundTripper struct {
     fn func(*http.Request) (*http.Response, error)
 }
@@ -162,7 +168,7 @@ func TestUpdateHavePrevID(t *testing.T) {
     }
     ap := &mockDeviceListUpdaterAPI{}
     producer := &mockKeyChangeProducer{}
-    updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, nil, 1, nil, "localhost", caching.DisableMetrics)
+    updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, nil, 1, nil, "localhost", caching.DisableMetrics, testIsBlacklistedOrBackingOff)
     event := gomatrixserverlib.DeviceListUpdateEvent{
         DeviceDisplayName: "Foo Bar",
         Deleted:           false,
@@ -234,7 +240,7 @@ func TestUpdateNoPrevID(t *testing.T) {
             `)),
         }, nil
     })
-    updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 2, nil, "example.test", caching.DisableMetrics)
+    updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 2, nil, "example.test", caching.DisableMetrics, testIsBlacklistedOrBackingOff)
     if err := updater.Start(); err != nil {
         t.Fatalf("failed to start updater: %s", err)
     }
@@ -304,7 +310,7 @@ func TestDebounce(t *testing.T) {
         close(incomingFedReq)
         return <-fedCh, nil
     })
-    updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 1, nil, "localhost", caching.DisableMetrics)
+    updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 1, nil, "localhost", caching.DisableMetrics, testIsBlacklistedOrBackingOff)
     if err := updater.Start(); err != nil {
         t.Fatalf("failed to start updater: %s", err)
     }
@@ -407,7 +413,7 @@ func TestDeviceListUpdater_CleanUp(t *testing.T) {
 
     updater := NewDeviceListUpdater(processCtx, db, nil,
         nil, nil,
-        0, rsAPI, "test", caching.DisableMetrics)
+        0, rsAPI, "test", caching.DisableMetrics, testIsBlacklistedOrBackingOff)
     if err := updater.CleanUp(); err != nil {
         t.Error(err)
     }
@@ -475,3 +481,68 @@ func Test_dedupeStateList(t *testing.T) {
         })
     }
 }
+
+func TestDeviceListUpdaterIgnoreBlacklisted(t *testing.T) {
+    unreachableServer := spec.ServerName("notlocalhost")
+
+    updater := DeviceListUpdater{
+        workerChans: make([]chan spec.ServerName, 1),
+        isBlacklistedOrBackingOffFn: func(s spec.ServerName) (*statistics.ServerStatistics, error) {
+            switch s {
+            case unreachableServer:
+                return nil, &api2.FederationClientError{Blacklisted: true}
+            }
+            return nil, nil
+        },
+        mu:             &sync.Mutex{},
+        userIDToChanMu: &sync.Mutex{},
+        userIDToChan:   make(map[string]chan bool),
+        userIDToMutex:  make(map[string]*sync.Mutex),
+    }
+    workerCh := make(chan spec.ServerName)
+    defer close(workerCh)
+    updater.workerChans[0] = workerCh
+
+    // happy case
+    alice := "@alice:localhost"
+    aliceCh := updater.assignChannel(alice)
+    defer updater.clearChannel(alice)
+
+    // failing case
+    bob := "@bob:" + unreachableServer
+    bobCh := updater.assignChannel(string(bob))
+    defer updater.clearChannel(string(bob))
+
+    expectedServers := map[spec.ServerName]struct{}{
+        "localhost": {},
+    }
+    unexpectedServers := make(map[spec.ServerName]struct{})
+
+    go func() {
+        for serverName := range workerCh {
+            switch serverName {
+            case "localhost":
+                delete(expectedServers, serverName)
+                aliceCh <- true // unblock notifyWorkers
+            case unreachableServer: // this should not happen as it is "filtered" away by the blacklist
+                unexpectedServers[serverName] = struct{}{}
+                bobCh <- true
+            default:
+                unexpectedServers[serverName] = struct{}{}
+            }
+        }
+    }()
+
+    // alice is not blacklisted
+    updater.notifyWorkers(alice)
+    // bob is blacklisted
+    updater.notifyWorkers(string(bob))
+
+    for server := range expectedServers {
+        t.Errorf("Server still in expectedServers map: %s", server)
+    }
+
+    for server := range unexpectedServers {
+        t.Errorf("unexpected server in result: %s", server)
+    }
+}
@@ -18,10 +18,12 @@ import (
     "time"
 
     fedsenderapi "github.com/matrix-org/dendrite/federationapi/api"
+    "github.com/matrix-org/dendrite/federationapi/statistics"
     "github.com/matrix-org/dendrite/internal/pushgateway"
     "github.com/matrix-org/dendrite/internal/sqlutil"
     "github.com/matrix-org/dendrite/setup/config"
     "github.com/matrix-org/dendrite/setup/process"
+    "github.com/matrix-org/gomatrixserverlib/spec"
     "github.com/sirupsen/logrus"
 
     rsapi "github.com/matrix-org/dendrite/roomserver/api"
@@ -47,6 +49,7 @@ func NewInternalAPI(
     rsAPI rsapi.UserRoomserverAPI,
     fedClient fedsenderapi.KeyserverFederationAPI,
     enableMetrics bool,
+    blacklistedOrBackingOffFn func(s spec.ServerName) (*statistics.ServerStatistics, error),
 ) *internal.UserInternalAPI {
     js, _ := natsInstance.Prepare(processContext, &dendriteCfg.Global.JetStream)
     appServices := dendriteCfg.Derived.ApplicationServices
@@ -100,7 +103,7 @@ func NewInternalAPI(
         FedClient: fedClient,
     }
 
-    updater := internal.NewDeviceListUpdater(processContext, keyDB, userAPI, keyChangeProducer, fedClient, dendriteCfg.UserAPI.WorkerCount, rsAPI, dendriteCfg.Global.ServerName, enableMetrics)
+    updater := internal.NewDeviceListUpdater(processContext, keyDB, userAPI, keyChangeProducer, fedClient, dendriteCfg.UserAPI.WorkerCount, rsAPI, dendriteCfg.Global.ServerName, enableMetrics, blacklistedOrBackingOffFn)
     userAPI.Updater = updater
     // Remove users which we don't share a room with anymore
     if err := updater.CleanUp(); err != nil {