mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-07-31 05:12:46 +00:00
Use sync API database in filterSharedUsers
(#2572)
* Add function to the sync API storage package for filtering shared users * Use the database instead of asking the RS API * Fix unit tests * Fix map handling in `filterSharedUsers`
This commit is contained in:
parent
69c86295f7
commit
90bf01d8b1
9 changed files with 118 additions and 27 deletions
|
@ -27,6 +27,8 @@ import (
|
|||
|
||||
type Database interface {
|
||||
Presence
|
||||
SharedUsers
|
||||
|
||||
MaxStreamPositionForPDUs(ctx context.Context) (types.StreamPosition, error)
|
||||
MaxStreamPositionForReceipts(ctx context.Context) (types.StreamPosition, error)
|
||||
MaxStreamPositionForInvites(ctx context.Context) (types.StreamPosition, error)
|
||||
|
@ -165,3 +167,8 @@ type Presence interface {
|
|||
PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error)
|
||||
MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error)
|
||||
}
|
||||
|
||||
type SharedUsers interface {
|
||||
// SharedUsers returns a subset of otherUserIDs that share a room with userID.
|
||||
SharedUsers(ctx context.Context, userID string, otherUserIDs []string) ([]string, error)
|
||||
}
|
||||
|
|
|
@ -107,6 +107,11 @@ const selectEventsWithEventIDsSQL = "" +
|
|||
"SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" +
|
||||
" FROM syncapi_current_room_state WHERE event_id = ANY($1)"
|
||||
|
||||
const selectSharedUsersSQL = "" +
|
||||
"SELECT state_key FROM syncapi_current_room_state WHERE room_id = ANY(" +
|
||||
" SELECT room_id FROM syncapi_current_room_state WHERE state_key = $1 AND membership='join'" +
|
||||
") AND state_key = ANY($2) AND membership='join';"
|
||||
|
||||
type currentRoomStateStatements struct {
|
||||
upsertRoomStateStmt *sql.Stmt
|
||||
deleteRoomStateByEventIDStmt *sql.Stmt
|
||||
|
@ -118,6 +123,7 @@ type currentRoomStateStatements struct {
|
|||
selectJoinedUsersInRoomStmt *sql.Stmt
|
||||
selectEventsWithEventIDsStmt *sql.Stmt
|
||||
selectStateEventStmt *sql.Stmt
|
||||
selectSharedUsersStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) {
|
||||
|
@ -156,6 +162,9 @@ func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, erro
|
|||
if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectSharedUsersStmt, err = db.Prepare(selectSharedUsersSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
|
@ -379,3 +388,24 @@ func (s *currentRoomStateStatements) SelectStateEvent(
|
|||
}
|
||||
return &ev, err
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) SelectSharedUsers(
|
||||
ctx context.Context, txn *sql.Tx, userID string, otherUserIDs []string,
|
||||
) ([]string, error) {
|
||||
stmt := sqlutil.TxStmt(txn, s.selectSharedUsersStmt)
|
||||
rows, err := stmt.QueryContext(ctx, userID, otherUserIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "selectSharedUsersStmt: rows.close() failed")
|
||||
|
||||
var stateKey string
|
||||
result := make([]string, 0, len(otherUserIDs))
|
||||
for rows.Next() {
|
||||
if err := rows.Scan(&stateKey); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, stateKey)
|
||||
}
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
|
|
@ -176,6 +176,10 @@ func (d *Database) AllPeekingDevicesInRooms(ctx context.Context) (map[string][]t
|
|||
return d.Peeks.SelectPeekingDevices(ctx)
|
||||
}
|
||||
|
||||
func (d *Database) SharedUsers(ctx context.Context, userID string, otherUserIDs []string) ([]string, error) {
|
||||
return d.CurrentRoomState.SelectSharedUsers(ctx, nil, userID, otherUserIDs)
|
||||
}
|
||||
|
||||
func (d *Database) GetStateEvent(
|
||||
ctx context.Context, roomID, evType, stateKey string,
|
||||
) (*gomatrixserverlib.HeaderedEvent, error) {
|
||||
|
|
|
@ -91,6 +91,11 @@ const selectEventsWithEventIDsSQL = "" +
|
|||
"SELECT event_id, added_at, headered_event_json, 0 AS session_id, false AS exclude_from_sync, '' AS transaction_id" +
|
||||
" FROM syncapi_current_room_state WHERE event_id IN ($1)"
|
||||
|
||||
const selectSharedUsersSQL = "" +
|
||||
"SELECT state_key FROM syncapi_current_room_state WHERE room_id = ANY(" +
|
||||
" SELECT room_id FROM syncapi_current_room_state WHERE state_key = $1 AND membership='join'" +
|
||||
") AND state_key IN ($2) AND membership='join';"
|
||||
|
||||
type currentRoomStateStatements struct {
|
||||
db *sql.DB
|
||||
streamIDStatements *StreamIDStatements
|
||||
|
@ -100,8 +105,9 @@ type currentRoomStateStatements struct {
|
|||
selectRoomIDsWithMembershipStmt *sql.Stmt
|
||||
selectRoomIDsWithAnyMembershipStmt *sql.Stmt
|
||||
selectJoinedUsersStmt *sql.Stmt
|
||||
//selectJoinedUsersInRoomStmt *sql.Stmt - prepared at runtime due to variadic
|
||||
//selectJoinedUsersInRoomStmt *sql.Stmt - prepared at runtime due to variadic
|
||||
selectStateEventStmt *sql.Stmt
|
||||
//selectSharedUsersSQL *sql.Stmt - prepared at runtime due to variadic
|
||||
}
|
||||
|
||||
func NewSqliteCurrentRoomStateTable(db *sql.DB, streamID *StreamIDStatements) (tables.CurrentRoomState, error) {
|
||||
|
@ -396,3 +402,29 @@ func (s *currentRoomStateStatements) SelectStateEvent(
|
|||
}
|
||||
return &ev, err
|
||||
}
|
||||
|
||||
func (s *currentRoomStateStatements) SelectSharedUsers(
|
||||
ctx context.Context, txn *sql.Tx, userID string, otherUserIDs []string,
|
||||
) ([]string, error) {
|
||||
query := strings.Replace(selectSharedUsersSQL, "($2)", sqlutil.QueryVariadicOffset(len(otherUserIDs), 1), 1)
|
||||
stmt, err := s.db.Prepare(query)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("SelectSharedUsers s.db.Prepare: %w", err)
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, stmt, "SelectSharedUsers: stmt.close() failed")
|
||||
rows, err := sqlutil.TxStmt(txn, stmt).QueryContext(ctx, userID, otherUserIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer internal.CloseAndLogIfError(ctx, rows, "selectSharedUsersStmt: rows.close() failed")
|
||||
|
||||
var stateKey string
|
||||
result := make([]string, 0, len(otherUserIDs))
|
||||
for rows.Next() {
|
||||
if err := rows.Scan(&stateKey); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result = append(result, stateKey)
|
||||
}
|
||||
return result, rows.Err()
|
||||
}
|
||||
|
|
|
@ -104,6 +104,8 @@ type CurrentRoomState interface {
|
|||
SelectJoinedUsers(ctx context.Context) (map[string][]string, error)
|
||||
// SelectJoinedUsersInRoom returns a map of room ID to a list of joined user IDs for a given room.
|
||||
SelectJoinedUsersInRoom(ctx context.Context, roomIDs []string) (map[string][]string, error)
|
||||
// SelectSharedUsers returns a subset of otherUserIDs that share a room with userID.
|
||||
SelectSharedUsers(ctx context.Context, txn *sql.Tx, userID string, otherUserIDs []string) ([]string, error)
|
||||
}
|
||||
|
||||
// BackwardsExtremities keeps track of backwards extremities for a room.
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue