mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-07-31 13:22:46 +00:00
Add possibility to ignore users (#2329)
* Add ignore users * Ignore users in pushrules Add passing tests * Update sytest lists * Store ignore knowledge in the sync API * Fix copyrights Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
This commit is contained in:
parent
99ef547295
commit
60ee7eef4c
23 changed files with 307 additions and 20 deletions
|
@ -119,6 +119,15 @@ func (s *OutputClientDataConsumer) onMessage(ctx context.Context, msg *nats.Msg)
|
|||
return false
|
||||
}
|
||||
|
||||
if output.IgnoredUsers != nil {
|
||||
if err := s.db.UpdateIgnoresForUser(ctx, userID, output.IgnoredUsers); err != nil {
|
||||
log.WithError(err).WithFields(logrus.Fields{
|
||||
"user_id": userID,
|
||||
}).Errorf("Failed to update ignored users")
|
||||
sentry.CaptureException(err)
|
||||
}
|
||||
}
|
||||
|
||||
s.stream.Advance(streamPos)
|
||||
s.notifier.OnNewAccountData(userID, types.StreamingToken{AccountDataPosition: streamPos})
|
||||
|
||||
|
|
|
@ -150,6 +150,9 @@ type Database interface {
|
|||
SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error)
|
||||
|
||||
StreamToTopologicalPosition(ctx context.Context, roomID string, streamPos types.StreamPosition, backwardOrdering bool) (types.TopologyToken, error)
|
||||
|
||||
IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error)
|
||||
UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error
|
||||
}
|
||||
|
||||
type Presence interface {
|
||||
|
|
87
syncapi/storage/postgres/ignores_table.go
Normal file
87
syncapi/storage/postgres/ignores_table.go
Normal file
|
@ -0,0 +1,87 @@
|
|||
// Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
)
|
||||
|
||||
const ignoresSchema = `
|
||||
-- Stores data about ignoress
|
||||
CREATE TABLE IF NOT EXISTS syncapi_ignores (
|
||||
-- The user ID whose ignore list this belongs to.
|
||||
user_id TEXT NOT NULL,
|
||||
ignores_json TEXT NOT NULL,
|
||||
PRIMARY KEY(user_id)
|
||||
);
|
||||
`
|
||||
|
||||
const selectIgnoresSQL = "" +
|
||||
"SELECT ignores_json FROM syncapi_ignores WHERE user_id = $1"
|
||||
|
||||
const upsertIgnoresSQL = "" +
|
||||
"INSERT INTO syncapi_ignores (user_id, ignores_json) VALUES ($1, $2)" +
|
||||
" ON CONFLICT (user_id) DO UPDATE set ignores_json = $2"
|
||||
|
||||
type ignoresStatements struct {
|
||||
selectIgnoresStmt *sql.Stmt
|
||||
upsertIgnoresStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func NewPostgresIgnoresTable(db *sql.DB) (tables.Ignores, error) {
|
||||
_, err := db.Exec(ignoresSchema)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s := &ignoresStatements{}
|
||||
if s.selectIgnoresStmt, err = db.Prepare(selectIgnoresSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.upsertIgnoresStmt, err = db.Prepare(upsertIgnoresSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *ignoresStatements) SelectIgnores(
|
||||
ctx context.Context, userID string,
|
||||
) (*types.IgnoredUsers, error) {
|
||||
var ignoresData []byte
|
||||
err := s.selectIgnoresStmt.QueryRowContext(ctx, userID).Scan(&ignoresData)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var ignores types.IgnoredUsers
|
||||
if err = json.Unmarshal(ignoresData, &ignores); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &ignores, nil
|
||||
}
|
||||
|
||||
func (s *ignoresStatements) UpsertIgnores(
|
||||
ctx context.Context, userID string, ignores *types.IgnoredUsers,
|
||||
) error {
|
||||
ignoresJSON, err := json.Marshal(ignores)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = s.upsertIgnoresStmt.ExecContext(ctx, userID, ignoresJSON)
|
||||
return err
|
||||
}
|
|
@ -90,6 +90,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ignores, err := NewPostgresIgnoresTable(d.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
presence, err := NewPostgresPresenceTable(d.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -115,6 +119,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
|
|||
Receipts: receipts,
|
||||
Memberships: memberships,
|
||||
NotificationData: notificationData,
|
||||
Ignores: ignores,
|
||||
Presence: presence,
|
||||
}
|
||||
return &d, nil
|
||||
|
|
|
@ -48,6 +48,7 @@ type Database struct {
|
|||
Receipts tables.Receipts
|
||||
Memberships tables.Memberships
|
||||
NotificationData tables.NotificationData
|
||||
Ignores tables.Ignores
|
||||
Presence tables.Presence
|
||||
}
|
||||
|
||||
|
@ -1004,6 +1005,14 @@ func (s *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID s
|
|||
return s.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter)
|
||||
}
|
||||
|
||||
func (s *Database) IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error) {
|
||||
return s.Ignores.SelectIgnores(ctx, userID)
|
||||
}
|
||||
|
||||
func (s *Database) UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error {
|
||||
return s.Ignores.UpsertIgnores(ctx, userID, ignores)
|
||||
}
|
||||
|
||||
func (s *Database) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) {
|
||||
return s.Presence.UpsertPresence(ctx, nil, userID, statusMsg, presence, lastActiveTS, fromSync)
|
||||
}
|
||||
|
|
87
syncapi/storage/sqlite3/ignores_table.go
Normal file
87
syncapi/storage/sqlite3/ignores_table.go
Normal file
|
@ -0,0 +1,87 @@
|
|||
// Copyright 2022 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package sqlite3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
)
|
||||
|
||||
const ignoresSchema = `
|
||||
-- Stores data about ignoress
|
||||
CREATE TABLE IF NOT EXISTS syncapi_ignores (
|
||||
-- The user ID whose ignore list this belongs to.
|
||||
user_id TEXT NOT NULL,
|
||||
ignores_json TEXT NOT NULL,
|
||||
PRIMARY KEY(user_id)
|
||||
);
|
||||
`
|
||||
|
||||
const selectIgnoresSQL = "" +
|
||||
"SELECT ignores_json FROM syncapi_ignores WHERE user_id = $1"
|
||||
|
||||
const upsertIgnoresSQL = "" +
|
||||
"INSERT INTO syncapi_ignores (user_id, ignores_json) VALUES ($1, $2)" +
|
||||
" ON CONFLICT DO UPDATE set ignores_json = $2"
|
||||
|
||||
type ignoresStatements struct {
|
||||
selectIgnoresStmt *sql.Stmt
|
||||
upsertIgnoresStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func NewSqliteIgnoresTable(db *sql.DB) (tables.Ignores, error) {
|
||||
_, err := db.Exec(ignoresSchema)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s := &ignoresStatements{}
|
||||
if s.selectIgnoresStmt, err = db.Prepare(selectIgnoresSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.upsertIgnoresStmt, err = db.Prepare(upsertIgnoresSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *ignoresStatements) SelectIgnores(
|
||||
ctx context.Context, userID string,
|
||||
) (*types.IgnoredUsers, error) {
|
||||
var ignoresData []byte
|
||||
err := s.selectIgnoresStmt.QueryRowContext(ctx, userID).Scan(&ignoresData)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var ignores types.IgnoredUsers
|
||||
if err = json.Unmarshal(ignoresData, &ignores); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &ignores, nil
|
||||
}
|
||||
|
||||
func (s *ignoresStatements) UpsertIgnores(
|
||||
ctx context.Context, userID string, ignores *types.IgnoredUsers,
|
||||
) error {
|
||||
ignoresJSON, err := json.Marshal(ignores)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = s.upsertIgnoresStmt.ExecContext(ctx, userID, ignoresJSON)
|
||||
return err
|
||||
}
|
|
@ -100,6 +100,10 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ignores, err := NewSqliteIgnoresTable(d.db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
presence, err := NewSqlitePresenceTable(d.db, &d.streamID)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -125,6 +129,7 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
|
|||
Receipts: receipts,
|
||||
Memberships: memberships,
|
||||
NotificationData: notificationData,
|
||||
Ignores: ignores,
|
||||
Presence: presence,
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -183,6 +183,11 @@ type NotificationData interface {
|
|||
SelectMaxID(ctx context.Context) (int64, error)
|
||||
}
|
||||
|
||||
type Ignores interface {
|
||||
SelectIgnores(ctx context.Context, userID string) (*types.IgnoredUsers, error)
|
||||
UpsertIgnores(ctx context.Context, userID string, ignores *types.IgnoredUsers) error
|
||||
}
|
||||
|
||||
type Presence interface {
|
||||
UpsertPresence(ctx context.Context, txn *sql.Tx, userID string, statusMsg *string, presence types.Presence, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (pos types.StreamPosition, err error)
|
||||
GetPresenceForUser(ctx context.Context, txn *sql.Tx, userID string) (presence *types.PresenceInternal, err error)
|
||||
|
|
|
@ -54,6 +54,10 @@ func (p *InviteStreamProvider) IncrementalSync(
|
|||
}
|
||||
|
||||
for roomID, inviteEvent := range invites {
|
||||
// skip ignored user events
|
||||
if _, ok := req.IgnoredUsers.List[inviteEvent.Sender()]; ok {
|
||||
continue
|
||||
}
|
||||
ir := types.NewInviteResponse(inviteEvent)
|
||||
req.Response.Rooms.Invite[roomID] = *ir
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package streams
|
|||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -25,6 +26,7 @@ type PDUStreamProvider struct {
|
|||
|
||||
tasks chan func()
|
||||
workers atomic.Int32
|
||||
userAPI userapi.UserInternalAPI
|
||||
}
|
||||
|
||||
func (p *PDUStreamProvider) worker() {
|
||||
|
@ -87,6 +89,10 @@ func (p *PDUStreamProvider) CompleteSync(
|
|||
stateFilter := req.Filter.Room.State
|
||||
eventFilter := req.Filter.Room.Timeline
|
||||
|
||||
if err = p.addIgnoredUsersToFilter(ctx, req, &eventFilter); err != nil {
|
||||
req.Log.WithError(err).Error("unable to update event filter with ignored users")
|
||||
}
|
||||
|
||||
// Build up a /sync response. Add joined rooms.
|
||||
var reqMutex sync.Mutex
|
||||
var reqWaitGroup sync.WaitGroup
|
||||
|
@ -175,6 +181,10 @@ func (p *PDUStreamProvider) IncrementalSync(
|
|||
return to
|
||||
}
|
||||
|
||||
if err = p.addIgnoredUsersToFilter(ctx, req, &eventFilter); err != nil {
|
||||
req.Log.WithError(err).Error("unable to update event filter with ignored users")
|
||||
}
|
||||
|
||||
newPos = from
|
||||
for _, delta := range stateDeltas {
|
||||
var pos types.StreamPosition
|
||||
|
@ -402,6 +412,23 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
|
|||
return jr, nil
|
||||
}
|
||||
|
||||
// addIgnoredUsersToFilter adds ignored users to the eventfilter and
|
||||
// the syncreq itself for further use in streams.
|
||||
func (p *PDUStreamProvider) addIgnoredUsersToFilter(ctx context.Context, req *types.SyncRequest, eventFilter *gomatrixserverlib.RoomEventFilter) error {
|
||||
ignores, err := p.DB.IgnoresForUser(ctx, req.Device.UserID)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
req.IgnoredUsers = *ignores
|
||||
for userID := range ignores.List {
|
||||
eventFilter.NotSenders = append(eventFilter.NotSenders, userID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func removeDuplicates(stateEvents, recentEvents []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
|
||||
for _, recentEv := range recentEvents {
|
||||
if recentEv.StateKey() == nil {
|
||||
|
|
|
@ -54,6 +54,10 @@ func (p *ReceiptStreamProvider) IncrementalSync(
|
|||
// Group receipts by room, so we can create one ClientEvent for every room
|
||||
receiptsByRoom := make(map[string][]types.OutputReceiptEvent)
|
||||
for _, receipt := range receipts {
|
||||
// skip ignored user events
|
||||
if _, ok := req.IgnoredUsers.List[receipt.UserID]; ok {
|
||||
continue
|
||||
}
|
||||
receiptsByRoom[receipt.RoomID] = append(receiptsByRoom[receipt.RoomID], receipt)
|
||||
}
|
||||
|
||||
|
|
|
@ -48,6 +48,10 @@ func (p *SendToDeviceStreamProvider) IncrementalSync(
|
|||
|
||||
// Add the updates into the sync response.
|
||||
for _, event := range events {
|
||||
// skip ignored user events
|
||||
if _, ok := req.IgnoredUsers.List[event.Sender]; ok {
|
||||
continue
|
||||
}
|
||||
req.Response.ToDevice.Events = append(req.Response.ToDevice.Events, event.SendToDeviceEvent)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -40,11 +40,18 @@ func (p *TypingStreamProvider) IncrementalSync(
|
|||
if users, updated := p.EDUCache.GetTypingUsersIfUpdatedAfter(
|
||||
roomID, int64(from),
|
||||
); updated {
|
||||
typingUsers := make([]string, 0, len(users))
|
||||
for i := range users {
|
||||
// skip ignored user events
|
||||
if _, ok := req.IgnoredUsers.List[users[i]]; !ok {
|
||||
typingUsers = append(typingUsers, users[i])
|
||||
}
|
||||
}
|
||||
ev := gomatrixserverlib.ClientEvent{
|
||||
Type: gomatrixserverlib.MTyping,
|
||||
}
|
||||
ev.Content, err = json.Marshal(map[string]interface{}{
|
||||
"user_ids": users,
|
||||
"user_ids": typingUsers,
|
||||
})
|
||||
if err != nil {
|
||||
req.Log.WithError(err).Error("json.Marshal failed")
|
||||
|
|
|
@ -32,6 +32,7 @@ func NewSyncStreamProviders(
|
|||
streams := &Streams{
|
||||
PDUStreamProvider: &PDUStreamProvider{
|
||||
StreamProvider: StreamProvider{DB: d},
|
||||
userAPI: userAPI,
|
||||
},
|
||||
TypingStreamProvider: &TypingStreamProvider{
|
||||
StreamProvider: StreamProvider{DB: d},
|
||||
|
|
|
@ -21,6 +21,8 @@ type SyncRequest struct {
|
|||
|
||||
// Updated by the PDU stream.
|
||||
Rooms map[string]string
|
||||
// Updated by the PDU stream.
|
||||
IgnoredUsers IgnoredUsers
|
||||
}
|
||||
|
||||
type StreamProvider interface {
|
||||
|
|
|
@ -518,3 +518,7 @@ type OutputSendToDeviceEvent struct {
|
|||
DeviceID string `json:"device_id"`
|
||||
gomatrixserverlib.SendToDeviceEvent
|
||||
}
|
||||
|
||||
type IgnoredUsers struct {
|
||||
List map[string]interface{} `json:"ignored_users"`
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue