mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-08-01 13:52:46 +00:00
Send presence to joined hosts only (#2858)
Send presence events only to rooms the user is participating, not all servers we know about. Should fix #2752
This commit is contained in:
parent
eeabe892a9
commit
b13cb43785
4 changed files with 21 additions and 3 deletions
|
@ -22,6 +22,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/federationapi/queue"
|
||||
"github.com/matrix-org/dendrite/federationapi/storage"
|
||||
fedTypes "github.com/matrix-org/dendrite/federationapi/types"
|
||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
|
@ -39,6 +40,7 @@ type OutputPresenceConsumer struct {
|
|||
db storage.Database
|
||||
queues *queue.OutgoingQueues
|
||||
isLocalServerName func(gomatrixserverlib.ServerName) bool
|
||||
rsAPI roomserverAPI.FederationRoomserverAPI
|
||||
topic string
|
||||
outboundPresenceEnabled bool
|
||||
}
|
||||
|
@ -50,6 +52,7 @@ func NewOutputPresenceConsumer(
|
|||
js nats.JetStreamContext,
|
||||
queues *queue.OutgoingQueues,
|
||||
store storage.Database,
|
||||
rsAPI roomserverAPI.FederationRoomserverAPI,
|
||||
) *OutputPresenceConsumer {
|
||||
return &OutputPresenceConsumer{
|
||||
ctx: process.Context(),
|
||||
|
@ -60,6 +63,7 @@ func NewOutputPresenceConsumer(
|
|||
durable: cfg.Matrix.JetStream.Durable("FederationAPIPresenceConsumer"),
|
||||
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputPresenceEvent),
|
||||
outboundPresenceEnabled: cfg.Matrix.Presence.EnableOutbound,
|
||||
rsAPI: rsAPI,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -89,6 +93,16 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg
|
|||
return true
|
||||
}
|
||||
|
||||
var queryRes roomserverAPI.QueryRoomsForUserResponse
|
||||
err = t.rsAPI.QueryRoomsForUser(t.ctx, &roomserverAPI.QueryRoomsForUserRequest{
|
||||
UserID: userID,
|
||||
WantMembership: "join",
|
||||
}, &queryRes)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("failed to calculate joined rooms for user")
|
||||
return true
|
||||
}
|
||||
|
||||
presence := msg.Header.Get("presence")
|
||||
|
||||
ts, err := strconv.Atoi(msg.Header.Get("last_active_ts"))
|
||||
|
@ -96,11 +110,13 @@ func (t *OutputPresenceConsumer) onMessage(ctx context.Context, msgs []*nats.Msg
|
|||
return true
|
||||
}
|
||||
|
||||
joined, err := t.db.GetAllJoinedHosts(ctx)
|
||||
// send this presence to all servers who share rooms with this user.
|
||||
joined, err := t.db.GetJoinedHostsForRooms(t.ctx, queryRes.RoomIDs, true)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("failed to get joined hosts")
|
||||
return true
|
||||
}
|
||||
|
||||
if len(joined) == 0 {
|
||||
return true
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue