Mirror of https://github.com/hoernschen/dendrite.git, synced 2025-07-30 21:12:45 +00:00
Some tweaks for the device list updater (#3251)
This makes the following changes:
- Adds two new metrics observing the usage of the `DeviceListUpdater` workers
- Makes the number of workers configurable
- Adds a 30s timeout for DB requests when receiving a device list update over federation
This commit is contained in:
parent 32f7c4b166
commit da7bca0224

18 changed files with 92 additions and 47 deletions
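The "configurable" worker count surfaces in the last hunk below as `dendriteCfg.UserAPI.WorkerCount`, replacing the previously hard-coded 8 workers. As a rough sketch of where such a value could live, the snippet shows a hypothetical user API config field; the struct name, YAML tag, and default are assumptions for illustration, not taken from this commit.

// Hypothetical config excerpt; only the existence of a WorkerCount value is implied by the diff below.
package config

type UserAPI struct {
	// WorkerCount controls how many DeviceListUpdater workers are started.
	WorkerCount int `yaml:"worker_count"`
}

// Defaults keeps the previous behaviour when the option is unset.
func (c *UserAPI) Defaults() {
	if c.WorkerCount == 0 {
		c.WorkerCount = 8 // the old hard-coded value
	}
}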
@@ -17,6 +17,7 @@ package consumers
 import (
 	"context"
 	"encoding/json"
+	"time"

 	"github.com/matrix-org/dendrite/userapi/internal"
 	"github.com/matrix-org/gomatrixserverlib"
@@ -82,7 +83,10 @@ func (t *DeviceListUpdateConsumer) onMessage(ctx context.Context, msgs []*nats.M
 		return true
 	}

-	err := t.updater.Update(ctx, m)
+	timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*30)
+	defer cancel()
+
+	err := t.updater.Update(timeoutCtx, m)
 	if err != nil {
 		logrus.WithFields(logrus.Fields{
 			"user_id": m.UserID,
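The hunk above bounds the previously unbounded `Update` call with a 30-second context, so a slow database cannot stall the consumer indefinitely. A minimal, self-contained sketch of the same pattern follows; `update` is a stand-in for a context-aware DB call, not Dendrite's API.

package main

import (
	"context"
	"fmt"
	"time"
)

// update stands in for a slow, context-aware operation such as a DB write.
func update(ctx context.Context) error {
	select {
	case <-time.After(50 * time.Millisecond): // pretend work
		return nil
	case <-ctx.Done():
		return ctx.Err() // context.DeadlineExceeded once the budget is spent
	}
}

func main() {
	// Derive a context that expires after 30 seconds; cancel releases the
	// timer's resources even when update returns early.
	timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second*30)
	defer cancel()

	if err := update(timeoutCtx); err != nil {
		fmt.Println("update failed:", err)
	}
}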
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"hash/fnv"
 	"net"
+	"strconv"
 	"sync"
 	"time"

@@ -142,13 +143,36 @@ type KeyChangeProducer interface {
 	ProduceKeyChanges(keys []api.DeviceMessage) error
 }

+var deviceListUpdaterBackpressure = prometheus.NewGaugeVec(
+	prometheus.GaugeOpts{
+		Namespace: "dendrite",
+		Subsystem: "keyserver",
+		Name:      "worker_backpressure",
+		Help:      "How many device list updater requests are queued",
+	},
+	[]string{"worker_id"},
+)
+var deviceListUpdaterServersRetrying = prometheus.NewGaugeVec(
+	prometheus.GaugeOpts{
+		Namespace: "dendrite",
+		Subsystem: "keyserver",
+		Name:      "worker_servers_retrying",
+		Help:      "How many servers are queued for retry",
+	},
+	[]string{"worker_id"},
+)
+
 // NewDeviceListUpdater creates a new updater which fetches fresh device lists when they go stale.
 func NewDeviceListUpdater(
 	process *process.ProcessContext, db DeviceListUpdaterDatabase,
 	api DeviceListUpdaterAPI, producer KeyChangeProducer,
 	fedClient fedsenderapi.KeyserverFederationAPI, numWorkers int,
 	rsAPI rsapi.KeyserverRoomserverAPI, thisServer spec.ServerName,
+	enableMetrics bool,
 ) *DeviceListUpdater {
+	if enableMetrics {
+		prometheus.MustRegister(deviceListUpdaterBackpressure, deviceListUpdaterServersRetrying)
+	}
 	return &DeviceListUpdater{
 		process:       process,
 		userIDToMutex: make(map[string]*sync.Mutex),
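Because client_golang joins namespace, subsystem, and name with underscores, the two gauges above are exported as `dendrite_keyserver_worker_backpressure` and `dendrite_keyserver_worker_servers_retrying`, each partitioned by a `worker_id` label. Registering them only when `enableMetrics` is set also lets tests construct the updater repeatedly without `MustRegister` panicking on duplicate registration. A minimal standalone sketch of declaring and updating such a labelled gauge follows; the metric has the same shape, but the surrounding program is made up.

package main

import (
	"fmt"
	"strconv"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

var backpressure = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "dendrite",
		Subsystem: "keyserver",
		Name:      "worker_backpressure",
		Help:      "How many device list updater requests are queued",
	},
	[]string{"worker_id"},
)

func main() {
	prometheus.MustRegister(backpressure)

	// One request queued for worker 0, then handed off again.
	worker0 := backpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(0)})
	worker0.Inc()
	fmt.Println(testutil.ToFloat64(worker0)) // 1
	worker0.Dec()
	fmt.Println(testutil.ToFloat64(worker0)) // 0
}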
@@ -173,7 +197,7 @@ func (u *DeviceListUpdater) Start() error {
 		// to stop (in this transaction) until key requests can be made.
 		ch := make(chan spec.ServerName, 10)
 		u.workerChans[i] = ch
-		go u.worker(ch)
+		go u.worker(ch, i)
 	}

 	staleLists, err := u.db.StaleDeviceLists(u.process.Context(), []spec.ServerName{})
@@ -343,6 +367,8 @@ func (u *DeviceListUpdater) notifyWorkers(userID string) {
 	index := int(int64(hash.Sum32()) % int64(len(u.workerChans)))

 	ch := u.assignChannel(userID)
+	deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(index)}).Inc()
+	defer deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(index)}).Dec()
 	u.workerChans[index] <- remoteServer
 	select {
 	case <-ch:
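`notifyWorkers` chooses the worker by hashing the user ID (hence the `hash/fnv` import above) and reducing it modulo the number of worker channels, so updates for a given user always land on the same worker and stay ordered. A small standalone sketch of that assignment follows; the diff does not show which FNV variant is used, so FNV-1a is assumed here, and the worker count is arbitrary.

package main

import (
	"fmt"
	"hash/fnv"
)

// workerIndex maps a user ID to a stable worker index.
func workerIndex(userID string, numWorkers int) int {
	h := fnv.New32a() // assumption: the updater may use a different FNV variant
	_, _ = h.Write([]byte(userID))
	return int(int64(h.Sum32()) % int64(numWorkers))
}

func main() {
	for _, id := range []string{"@alice:example.org", "@bob:example.org"} {
		fmt.Printf("%s -> worker %d\n", id, workerIndex(id, 8))
	}
}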
@@ -372,7 +398,7 @@ func (u *DeviceListUpdater) clearChannel(userID string) {
 	}
 }

-func (u *DeviceListUpdater) worker(ch chan spec.ServerName) {
+func (u *DeviceListUpdater) worker(ch chan spec.ServerName, workerID int) {
 	retries := make(map[spec.ServerName]time.Time)
 	retriesMu := &sync.Mutex{}
 	// restarter goroutine which will inject failed servers into ch when it is time
@@ -391,9 +417,12 @@ func (u *DeviceListUpdater) worker(ch chan spec.ServerName) {
 			for _, srv := range serversToRetry {
 				delete(retries, srv)
 			}
+			deviceListUpdaterServersRetrying.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Set(float64(len(retries)))
 			retriesMu.Unlock()
 			for _, srv := range serversToRetry {
+				deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Inc()
 				ch <- srv
+				deviceListUpdaterBackpressure.With(prometheus.Labels{"worker_id": strconv.Itoa(workerID)}).Dec()
 			}
 		}
 	}()
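The retry loop above increments the backpressure gauge just before each send on the worker channel and decrements it once the send is accepted, so a growing value indicates workers that cannot keep up. A toy illustration of that pattern with a deliberately slow consumer follows; the atomic counter stands in for the Prometheus gauge and every name is made up.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var backpressure atomic.Int64 // stand-in for the gauge
	ch := make(chan string, 2)

	// Slow consumer: drains one item every 100ms.
	go func() {
		for range ch {
			time.Sleep(100 * time.Millisecond)
		}
	}()

	// Producer: mark each item as queued until the channel accepts it.
	go func() {
		for i := 0; i < 5; i++ {
			backpressure.Add(1)           // Inc() before the send
			ch <- fmt.Sprintf("srv%d", i) // blocks once the buffer is full
			backpressure.Add(-1)          // Dec() after the send is accepted
		}
	}()

	time.Sleep(150 * time.Millisecond)
	fmt.Println("sends currently waiting:", backpressure.Load())
}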
@@ -27,6 +27,7 @@ import (
 	"testing"
 	"time"

+	"github.com/matrix-org/dendrite/internal/caching"
 	"github.com/matrix-org/dendrite/internal/sqlutil"
 	"github.com/matrix-org/gomatrixserverlib"
 	"github.com/matrix-org/gomatrixserverlib/fclient"
@@ -161,7 +162,7 @@ func TestUpdateHavePrevID(t *testing.T) {
 	}
 	ap := &mockDeviceListUpdaterAPI{}
 	producer := &mockKeyChangeProducer{}
-	updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, nil, 1, nil, "localhost")
+	updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, nil, 1, nil, "localhost", caching.DisableMetrics)
 	event := gomatrixserverlib.DeviceListUpdateEvent{
 		DeviceDisplayName: "Foo Bar",
 		Deleted:           false,
@@ -233,7 +234,7 @@ func TestUpdateNoPrevID(t *testing.T) {
 		`)),
 		}, nil
 	})
-	updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 2, nil, "example.test")
+	updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 2, nil, "example.test", caching.DisableMetrics)
 	if err := updater.Start(); err != nil {
 		t.Fatalf("failed to start updater: %s", err)
 	}
@@ -303,7 +304,7 @@ func TestDebounce(t *testing.T) {
 		close(incomingFedReq)
 		return <-fedCh, nil
 	})
-	updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 1, nil, "localhost")
+	updater := NewDeviceListUpdater(process.NewProcessContext(), db, ap, producer, fedClient, 1, nil, "localhost", caching.DisableMetrics)
 	if err := updater.Start(); err != nil {
 		t.Fatalf("failed to start updater: %s", err)
 	}
@@ -406,7 +407,7 @@ func TestDeviceListUpdater_CleanUp(t *testing.T) {

 		updater := NewDeviceListUpdater(processCtx, db, nil,
 			nil, nil,
-			0, rsAPI, "test")
+			0, rsAPI, "test", caching.DisableMetrics)
 		if err := updater.CleanUp(); err != nil {
 			t.Error(err)
 		}
@@ -46,6 +46,7 @@ func NewInternalAPI(
 	natsInstance *jetstream.NATSInstance,
 	rsAPI rsapi.UserRoomserverAPI,
 	fedClient fedsenderapi.KeyserverFederationAPI,
+	enableMetrics bool,
 ) *internal.UserInternalAPI {
 	js, _ := natsInstance.Prepare(processContext, &dendriteCfg.Global.JetStream)
 	appServices := dendriteCfg.Derived.ApplicationServices
@@ -99,7 +100,7 @@ func NewInternalAPI(
 		FedClient: fedClient,
 	}

-	updater := internal.NewDeviceListUpdater(processContext, keyDB, userAPI, keyChangeProducer, fedClient, 8, rsAPI, dendriteCfg.Global.ServerName) // 8 workers TODO: configurable
+	updater := internal.NewDeviceListUpdater(processContext, keyDB, userAPI, keyChangeProducer, fedClient, dendriteCfg.UserAPI.WorkerCount, rsAPI, dendriteCfg.Global.ServerName, enableMetrics)
 	userAPI.Updater = updater
 	// Remove users which we don't share a room with anymore
 	if err := updater.CleanUp(); err != nil {