2020-01-03 14:07:05 +00:00
|
|
|
// Copyright 2017-2018 New Vector Ltd
|
|
|
|
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
|
2017-04-20 22:40:52 +00:00
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
2020-01-03 14:07:05 +00:00
|
|
|
package postgres
|
2017-04-05 09:30:13 +00:00
|
|
|
|
|
|
|
import (
|
2017-09-18 15:52:22 +00:00
|
|
|
"context"
|
2017-04-05 09:30:13 +00:00
|
|
|
"database/sql"
|
2019-08-07 10:12:09 +00:00
|
|
|
"encoding/json"
|
2023-01-12 09:06:03 +00:00
|
|
|
"errors"
|
2017-08-07 10:51:46 +00:00
|
|
|
|
2017-06-07 15:35:41 +00:00
|
|
|
"github.com/lib/pq"
|
2020-05-21 13:40:13 +00:00
|
|
|
"github.com/matrix-org/dendrite/internal"
|
2020-06-12 13:55:57 +00:00
|
|
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
2023-04-27 11:54:20 +00:00
|
|
|
rstypes "github.com/matrix-org/dendrite/roomserver/types"
|
2022-07-25 09:39:22 +00:00
|
|
|
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
|
2020-05-14 15:11:37 +00:00
|
|
|
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
2023-04-04 17:16:53 +00:00
|
|
|
"github.com/matrix-org/dendrite/syncapi/synctypes"
|
2020-01-23 17:51:10 +00:00
|
|
|
"github.com/matrix-org/dendrite/syncapi/types"
|
2017-04-05 09:30:13 +00:00
|
|
|
"github.com/matrix-org/gomatrixserverlib"
|
2023-04-19 14:50:33 +00:00
|
|
|
"github.com/matrix-org/gomatrixserverlib/spec"
|
2017-04-05 09:30:13 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
const currentRoomStateSchema = `
|
|
|
|
-- Stores the current room state for every room.
|
2017-08-07 10:51:46 +00:00
|
|
|
CREATE TABLE IF NOT EXISTS syncapi_current_room_state (
|
2017-04-05 09:30:13 +00:00
|
|
|
-- The 'room_id' key for the state event.
|
|
|
|
room_id TEXT NOT NULL,
|
|
|
|
-- The state event ID
|
|
|
|
event_id TEXT NOT NULL,
|
|
|
|
-- The state event type e.g 'm.room.member'
|
|
|
|
type TEXT NOT NULL,
|
2019-08-07 10:12:09 +00:00
|
|
|
-- The 'sender' property of the event.
|
|
|
|
sender TEXT NOT NULL,
|
|
|
|
-- true if the event content contains a url key
|
|
|
|
contains_url BOOL NOT NULL,
|
2017-04-05 09:30:13 +00:00
|
|
|
-- The state_key value for this state event e.g ''
|
|
|
|
state_key TEXT NOT NULL,
|
|
|
|
-- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
|
2020-03-19 12:07:01 +00:00
|
|
|
headered_event_json TEXT NOT NULL,
|
2017-04-05 09:30:13 +00:00
|
|
|
-- The 'content.membership' value if this event is an m.room.member event. For other
|
|
|
|
-- events, this will be NULL.
|
|
|
|
membership TEXT,
|
2017-06-07 15:35:41 +00:00
|
|
|
-- The serial ID of the output_room_events table when this event became
|
|
|
|
-- part of the current state of the room.
|
|
|
|
added_at BIGINT,
|
2022-07-18 12:46:15 +00:00
|
|
|
history_visibility SMALLINT NOT NULL DEFAULT 2,
|
2017-04-05 09:30:13 +00:00
|
|
|
-- Clobber based on 3-uple of room_id, type and state_key
|
2017-08-07 10:51:46 +00:00
|
|
|
CONSTRAINT syncapi_room_state_unique UNIQUE (room_id, type, state_key)
|
2017-04-05 09:30:13 +00:00
|
|
|
);
|
|
|
|
-- for event deletion
|
2019-08-07 10:12:09 +00:00
|
|
|
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_current_room_state(event_id, room_id, type, sender, contains_url);
|
2017-04-13 15:56:46 +00:00
|
|
|
-- for querying membership states of users
|
2017-08-07 10:51:46 +00:00
|
|
|
CREATE INDEX IF NOT EXISTS syncapi_membership_idx ON syncapi_current_room_state(type, state_key, membership) WHERE membership IS NOT NULL AND membership != 'leave';
|
2020-12-16 18:16:39 +00:00
|
|
|
-- for querying state by event IDs
|
|
|
|
CREATE UNIQUE INDEX IF NOT EXISTS syncapi_current_room_state_eventid_idx ON syncapi_current_room_state(event_id);
|
Improve selectRoomIDsWithAnyMembershipSQL performance (#2738)
Recently I have observed that dendrite spends a lot of time (~390s) in
`selectRoomIDsWithAnyMembershipSQL` query
```
dendrite_syncapi=# select total_exec_time, left(query,100) from pg_stat_statements order by total_exec_time desc limit 5 ;
total_exec_time | left
--------------------+------------------------------------------------------------------------------------------------------
747826.5800519128 | SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_vis
389130.5490339942 | SELECT DISTINCT room_id, membership FROM syncapi_current_room_state WHERE type = $2 AND state_key =
376104.17514700035 | SELECT psd.datname, xact_commit, xact_rollback, blks_read, blks_hit, tup_returned, tup_fetched, tup_
363644.164092031 | SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events WHERE event_nid = ANY($
58570.48104699995 | SELECT event_id, headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND ( $2::te
(5 rows)
```
Explain analyze showed correct usage of `syncapi_room_state_unique`
index:
```
dendrite_syncapi=#
explain analyze SELECT distinct room_id, membership FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = '@qjfl:dendrite.stg.globekeeper.com';
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Unique (cost=2749.38..2749.56 rows=24 width=52) (actual time=2.933..2.956 rows=65 loops=1)
-> Sort (cost=2749.38..2749.44 rows=24 width=52) (actual time=2.932..2.937 rows=65 loops=1)
Sort Key: room_id, membership
Sort Method: quicksort Memory: 34kB
-> Index Scan using syncapi_room_state_unique on syncapi_current_room_state (cost=0.41..2748.83 rows=24 width=52) (actual time=0.030..2.890 rows=65 loops=1)
Index Cond: ((type = 'm.room.member'::text) AND (state_key = '@qjfl:dendrite.stg.globekeeper.com'::text))
Planning Time: 0.140 ms
Execution Time: 2.990 ms
(8 rows)
```
Multi-column indexes in Postgres shall perform well for leftmost
columns, but I gave it a try and created
`syncapi_current_room_state_type_state_key_idx` index. I could observe
significant performance improvement. Execution time dropped from 2.9 ms
to 0.24 ms:
```
explain analyze SELECT distinct room_id, membership FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = '@qjfl:dendrite.stg.globekeeper.com';
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
Unique (cost=96.46..96.64 rows=24 width=52) (actual time=0.199..0.218 rows=65 loops=1)
-> Sort (cost=96.46..96.52 rows=24 width=52) (actual time=0.199..0.202 rows=65 loops=1)
Sort Key: room_id, membership
Sort Method: quicksort Memory: 34kB
-> Bitmap Heap Scan on syncapi_current_room_state (cost=4.53..95.91 rows=24 width=52) (actual time=0.048..0.139 rows=65 loops=1)
Recheck Cond: ((type = 'm.room.member'::text) AND (state_key = '@qjfl:dendrite.stg.globekeeper.com'::text))
Heap Blocks: exact=59
-> Bitmap Index Scan on syncapi_current_room_state_type_state_key_idx (cost=0.00..4.53 rows=24 width=0) (actual time=0.037..0.037 rows=65 loops=1)
Index Cond: ((type = 'm.room.member'::text) AND (state_key = '@qjfl:dendrite.stg.globekeeper.com'::text))
Planning Time: 0.236 ms
Execution Time: 0.242 ms
(11 rows)
```
Next improvement is skipping DISTINCT and rely on map assignment in
`SelectRoomIDsWithAnyMembership`. Execution time drops by almost half:
```
explain analyze SELECT room_id, membership FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = '@qjfl:dendrite.stg.globekeeper.com';
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------
Bitmap Heap Scan on syncapi_current_room_state (cost=4.53..95.91 rows=24 width=52) (actual time=0.032..0.113 rows=65 loops=1)
Recheck Cond: ((type = 'm.room.member'::text) AND (state_key = '@qjfl:dendrite.stg.globekeeper.com'::text))
Heap Blocks: exact=59
-> Bitmap Index Scan on syncapi_current_room_state_type_state_key_idx (cost=0.00..4.53 rows=24 width=0) (actual time=0.021..0.021 rows=65 loops=1)
Index Cond: ((type = 'm.room.member'::text) AND (state_key = '@qjfl:dendrite.stg.globekeeper.com'::text))
Planning Time: 0.087 ms
Execution Time: 0.136 ms
(7 rows)
```
In our env we spend only 1s on inserting to table, so the write penalty
of creating an index should be small.
```
dendrite_syncapi=# select total_exec_time, left(query,100) from pg_stat_statements where query like '%INSERT%syncapi_current_room_state%' order by total_exec_time desc;
total_exec_time | left
--------------------+------------------------------------------------------------------------------------------------------
1139.9057619999971 | INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, he
(1 row)
```
This PR does not require test modifications.
### Pull Request Checklist
<!-- Please read docs/CONTRIBUTING.md before submitting your pull
request -->
* [x] I have added added tests for PR _or_ I have justified why this PR
doesn't need tests.
* [x] Pull request includes a [sign
off](https://github.com/matrix-org/dendrite/blob/main/docs/CONTRIBUTING.md#sign-off)
Signed-off-by: `Piotr Kozimor <p1996k@gmail.com>`
2022-09-27 08:41:36 +00:00
|
|
|
-- for improving selectRoomIDsWithAnyMembershipSQL
|
|
|
|
CREATE INDEX IF NOT EXISTS syncapi_current_room_state_type_state_key_idx ON syncapi_current_room_state(type, state_key);
|
2017-04-05 09:30:13 +00:00
|
|
|
`
|
|
|
|
|
|
|
|
const upsertRoomStateSQL = "" +
|
2022-07-18 12:46:15 +00:00
|
|
|
"INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, headered_event_json, membership, added_at, history_visibility)" +
|
|
|
|
" VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)" +
|
2017-08-07 10:51:46 +00:00
|
|
|
" ON CONFLICT ON CONSTRAINT syncapi_room_state_unique" +
|
2020-03-19 12:07:01 +00:00
|
|
|
" DO UPDATE SET event_id = $2, sender=$4, contains_url=$5, headered_event_json = $7, membership = $8, added_at = $9"
|
2017-04-05 09:30:13 +00:00
|
|
|
|
|
|
|
const deleteRoomStateByEventIDSQL = "" +
|
2017-08-07 10:51:46 +00:00
|
|
|
"DELETE FROM syncapi_current_room_state WHERE event_id = $1"
|
2017-04-05 09:30:13 +00:00
|
|
|
|
2022-03-16 10:25:50 +00:00
|
|
|
const deleteRoomStateForRoomSQL = "" +
|
|
|
|
"DELETE FROM syncapi_current_room_state WHERE room_id = $1"
|
2020-09-15 10:17:46 +00:00
|
|
|
|
2017-04-13 15:56:46 +00:00
|
|
|
const selectRoomIDsWithMembershipSQL = "" +
|
2020-08-21 08:57:52 +00:00
|
|
|
"SELECT DISTINCT room_id FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2"
|
2017-04-13 15:56:46 +00:00
|
|
|
|
2022-03-11 12:48:45 +00:00
|
|
|
const selectRoomIDsWithAnyMembershipSQL = "" +
|
Improve selectRoomIDsWithAnyMembershipSQL performance (#2738)
Recently I have observed that dendrite spends a lot of time (~390s) in
`selectRoomIDsWithAnyMembershipSQL` query
```
dendrite_syncapi=# select total_exec_time, left(query,100) from pg_stat_statements order by total_exec_time desc limit 5 ;
total_exec_time | left
--------------------+------------------------------------------------------------------------------------------------------
747826.5800519128 | SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id, history_vis
389130.5490339942 | SELECT DISTINCT room_id, membership FROM syncapi_current_room_state WHERE type = $2 AND state_key =
376104.17514700035 | SELECT psd.datname, xact_commit, xact_rollback, blks_read, blks_hit, tup_returned, tup_fetched, tup_
363644.164092031 | SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events WHERE event_nid = ANY($
58570.48104699995 | SELECT event_id, headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND ( $2::te
(5 rows)
```
Explain analyze showed correct usage of `syncapi_room_state_unique`
index:
```
dendrite_syncapi=#
explain analyze SELECT distinct room_id, membership FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = '@qjfl:dendrite.stg.globekeeper.com';
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Unique (cost=2749.38..2749.56 rows=24 width=52) (actual time=2.933..2.956 rows=65 loops=1)
-> Sort (cost=2749.38..2749.44 rows=24 width=52) (actual time=2.932..2.937 rows=65 loops=1)
Sort Key: room_id, membership
Sort Method: quicksort Memory: 34kB
-> Index Scan using syncapi_room_state_unique on syncapi_current_room_state (cost=0.41..2748.83 rows=24 width=52) (actual time=0.030..2.890 rows=65 loops=1)
Index Cond: ((type = 'm.room.member'::text) AND (state_key = '@qjfl:dendrite.stg.globekeeper.com'::text))
Planning Time: 0.140 ms
Execution Time: 2.990 ms
(8 rows)
```
Multi-column indexes in Postgres shall perform well for leftmost
columns, but I gave it a try and created
`syncapi_current_room_state_type_state_key_idx` index. I could observe
significant performance improvement. Execution time dropped from 2.9 ms
to 0.24 ms:
```
explain analyze SELECT distinct room_id, membership FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = '@qjfl:dendrite.stg.globekeeper.com';
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
Unique (cost=96.46..96.64 rows=24 width=52) (actual time=0.199..0.218 rows=65 loops=1)
-> Sort (cost=96.46..96.52 rows=24 width=52) (actual time=0.199..0.202 rows=65 loops=1)
Sort Key: room_id, membership
Sort Method: quicksort Memory: 34kB
-> Bitmap Heap Scan on syncapi_current_room_state (cost=4.53..95.91 rows=24 width=52) (actual time=0.048..0.139 rows=65 loops=1)
Recheck Cond: ((type = 'm.room.member'::text) AND (state_key = '@qjfl:dendrite.stg.globekeeper.com'::text))
Heap Blocks: exact=59
-> Bitmap Index Scan on syncapi_current_room_state_type_state_key_idx (cost=0.00..4.53 rows=24 width=0) (actual time=0.037..0.037 rows=65 loops=1)
Index Cond: ((type = 'm.room.member'::text) AND (state_key = '@qjfl:dendrite.stg.globekeeper.com'::text))
Planning Time: 0.236 ms
Execution Time: 0.242 ms
(11 rows)
```
Next improvement is skipping DISTINCT and rely on map assignment in
`SelectRoomIDsWithAnyMembership`. Execution time drops by almost half:
```
explain analyze SELECT room_id, membership FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = '@qjfl:dendrite.stg.globekeeper.com';
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------
Bitmap Heap Scan on syncapi_current_room_state (cost=4.53..95.91 rows=24 width=52) (actual time=0.032..0.113 rows=65 loops=1)
Recheck Cond: ((type = 'm.room.member'::text) AND (state_key = '@qjfl:dendrite.stg.globekeeper.com'::text))
Heap Blocks: exact=59
-> Bitmap Index Scan on syncapi_current_room_state_type_state_key_idx (cost=0.00..4.53 rows=24 width=0) (actual time=0.021..0.021 rows=65 loops=1)
Index Cond: ((type = 'm.room.member'::text) AND (state_key = '@qjfl:dendrite.stg.globekeeper.com'::text))
Planning Time: 0.087 ms
Execution Time: 0.136 ms
(7 rows)
```
In our env we spend only 1s on inserting to table, so the write penalty
of creating an index should be small.
```
dendrite_syncapi=# select total_exec_time, left(query,100) from pg_stat_statements where query like '%INSERT%syncapi_current_room_state%' order by total_exec_time desc;
total_exec_time | left
--------------------+------------------------------------------------------------------------------------------------------
1139.9057619999971 | INSERT INTO syncapi_current_room_state (room_id, event_id, type, sender, contains_url, state_key, he
(1 row)
```
This PR does not require test modifications.
### Pull Request Checklist
<!-- Please read docs/CONTRIBUTING.md before submitting your pull
request -->
* [x] I have added added tests for PR _or_ I have justified why this PR
doesn't need tests.
* [x] Pull request includes a [sign
off](https://github.com/matrix-org/dendrite/blob/main/docs/CONTRIBUTING.md#sign-off)
Signed-off-by: `Piotr Kozimor <p1996k@gmail.com>`
2022-09-27 08:41:36 +00:00
|
|
|
"SELECT room_id, membership FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = $1"
|
2022-03-11 12:48:45 +00:00
|
|
|
|
2017-04-13 15:56:46 +00:00
|
|
|
const selectCurrentStateSQL = "" +
|
2020-12-09 18:07:17 +00:00
|
|
|
"SELECT event_id, headered_event_json FROM syncapi_current_room_state WHERE room_id = $1" +
|
2019-08-07 10:12:09 +00:00
|
|
|
" AND ( $2::text[] IS NULL OR sender = ANY($2) )" +
|
|
|
|
" AND ( $3::text[] IS NULL OR NOT(sender = ANY($3)) )" +
|
|
|
|
" AND ( $4::text[] IS NULL OR type LIKE ANY($4) )" +
|
|
|
|
" AND ( $5::text[] IS NULL OR NOT(type LIKE ANY($5)) )" +
|
|
|
|
" AND ( $6::bool IS NULL OR contains_url = $6 )" +
|
2022-11-02 09:34:19 +00:00
|
|
|
" AND (event_id = ANY($7)) IS NOT TRUE"
|
2017-04-13 15:56:46 +00:00
|
|
|
|
2017-05-17 14:38:24 +00:00
|
|
|
const selectJoinedUsersSQL = "" +
|
2017-08-07 10:51:46 +00:00
|
|
|
"SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
|
2017-05-17 14:38:24 +00:00
|
|
|
|
2022-04-28 16:53:28 +00:00
|
|
|
const selectJoinedUsersInRoomSQL = "" +
|
|
|
|
"SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join' AND room_id = ANY($1)"
|
|
|
|
|
2017-07-25 15:10:59 +00:00
|
|
|
const selectStateEventSQL = "" +
|
2020-03-19 12:07:01 +00:00
|
|
|
"SELECT headered_event_json FROM syncapi_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
|
2017-07-25 15:10:59 +00:00
|
|
|
|
2017-06-07 15:35:41 +00:00
|
|
|
const selectEventsWithEventIDsSQL = "" +
|
2022-10-03 10:57:21 +00:00
|
|
|
"SELECT event_id, added_at, headered_event_json, history_visibility FROM syncapi_current_room_state WHERE event_id = ANY($1)"
|
2017-06-07 15:35:41 +00:00
|
|
|
|
2022-07-15 15:25:26 +00:00
|
|
|
const selectSharedUsersSQL = "" +
|
|
|
|
"SELECT state_key FROM syncapi_current_room_state WHERE room_id = ANY(" +
|
2022-09-09 12:50:50 +00:00
|
|
|
" SELECT DISTINCT room_id FROM syncapi_current_room_state WHERE state_key = $1 AND membership='join'" +
|
2022-09-09 13:18:45 +00:00
|
|
|
") AND type = 'm.room.member' AND state_key = ANY($2) AND membership IN ('join', 'invite');"
|
2022-07-15 15:25:26 +00:00
|
|
|
|
2023-01-12 09:06:03 +00:00
|
|
|
const selectMembershipCount = `SELECT count(*) FROM syncapi_current_room_state WHERE type = 'm.room.member' AND room_id = $1 AND membership = $2`
|
|
|
|
|
|
|
|
const selectRoomHeroes = `
|
|
|
|
SELECT state_key FROM syncapi_current_room_state
|
|
|
|
WHERE type = 'm.room.member' AND room_id = $1 AND membership = ANY($2) AND state_key != $3
|
|
|
|
ORDER BY added_at, state_key
|
|
|
|
LIMIT 5
|
|
|
|
`
|
|
|
|
|
2017-04-05 09:30:13 +00:00
|
|
|
type currentRoomStateStatements struct {
|
2022-03-11 12:48:45 +00:00
|
|
|
upsertRoomStateStmt *sql.Stmt
|
|
|
|
deleteRoomStateByEventIDStmt *sql.Stmt
|
2022-03-16 10:25:50 +00:00
|
|
|
deleteRoomStateForRoomStmt *sql.Stmt
|
2022-03-11 12:48:45 +00:00
|
|
|
selectRoomIDsWithMembershipStmt *sql.Stmt
|
|
|
|
selectRoomIDsWithAnyMembershipStmt *sql.Stmt
|
|
|
|
selectCurrentStateStmt *sql.Stmt
|
|
|
|
selectJoinedUsersStmt *sql.Stmt
|
2022-04-28 16:53:28 +00:00
|
|
|
selectJoinedUsersInRoomStmt *sql.Stmt
|
2022-03-11 12:48:45 +00:00
|
|
|
selectEventsWithEventIDsStmt *sql.Stmt
|
|
|
|
selectStateEventStmt *sql.Stmt
|
2022-07-15 15:25:26 +00:00
|
|
|
selectSharedUsersStmt *sql.Stmt
|
2023-01-12 09:06:03 +00:00
|
|
|
selectMembershipCountStmt *sql.Stmt
|
|
|
|
selectRoomHeroesStmt *sql.Stmt
|
2017-04-05 09:30:13 +00:00
|
|
|
}
|
|
|
|
|
2020-05-14 15:11:37 +00:00
|
|
|
func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) {
|
|
|
|
s := ¤tRoomStateStatements{}
|
|
|
|
_, err := db.Exec(currentRoomStateSchema)
|
2017-04-05 09:30:13 +00:00
|
|
|
if err != nil {
|
2020-05-14 15:11:37 +00:00
|
|
|
return nil, err
|
2017-04-05 09:30:13 +00:00
|
|
|
}
|
2022-07-25 09:39:22 +00:00
|
|
|
|
|
|
|
m := sqlutil.NewMigrator(db)
|
|
|
|
m.AddMigrations(sqlutil.Migration{
|
|
|
|
Version: "syncapi: add history visibility column (current_room_state)",
|
|
|
|
Up: deltas.UpAddHistoryVisibilityColumnCurrentRoomState,
|
|
|
|
})
|
|
|
|
err = m.Up(context.Background())
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2023-01-12 09:06:03 +00:00
|
|
|
return s, sqlutil.StatementList{
|
|
|
|
{&s.upsertRoomStateStmt, upsertRoomStateSQL},
|
|
|
|
{&s.deleteRoomStateByEventIDStmt, deleteRoomStateByEventIDSQL},
|
|
|
|
{&s.deleteRoomStateForRoomStmt, deleteRoomStateForRoomSQL},
|
|
|
|
{&s.selectRoomIDsWithMembershipStmt, selectRoomIDsWithMembershipSQL},
|
|
|
|
{&s.selectRoomIDsWithAnyMembershipStmt, selectRoomIDsWithAnyMembershipSQL},
|
|
|
|
{&s.selectCurrentStateStmt, selectCurrentStateSQL},
|
|
|
|
{&s.selectJoinedUsersStmt, selectJoinedUsersSQL},
|
|
|
|
{&s.selectJoinedUsersInRoomStmt, selectJoinedUsersInRoomSQL},
|
|
|
|
{&s.selectEventsWithEventIDsStmt, selectEventsWithEventIDsSQL},
|
|
|
|
{&s.selectStateEventStmt, selectStateEventSQL},
|
|
|
|
{&s.selectSharedUsersStmt, selectSharedUsersSQL},
|
|
|
|
{&s.selectMembershipCountStmt, selectMembershipCount},
|
|
|
|
{&s.selectRoomHeroesStmt, selectRoomHeroes},
|
|
|
|
}.Prepare(db)
|
2017-04-05 09:30:13 +00:00
|
|
|
}
|
|
|
|
|
2020-05-14 15:11:37 +00:00
|
|
|
// SelectJoinedUsers returns a map of room ID to a list of joined user IDs.
|
|
|
|
func (s *currentRoomStateStatements) SelectJoinedUsers(
|
2022-09-28 09:18:03 +00:00
|
|
|
ctx context.Context, txn *sql.Tx,
|
2017-09-18 15:52:22 +00:00
|
|
|
) (map[string][]string, error) {
|
2022-09-28 09:18:03 +00:00
|
|
|
rows, err := sqlutil.TxStmt(txn, s.selectJoinedUsersStmt).QueryContext(ctx)
|
2017-05-17 14:38:24 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-05-21 13:40:13 +00:00
|
|
|
defer internal.CloseAndLogIfError(ctx, rows, "selectJoinedUsers: rows.close() failed")
|
2017-05-17 14:38:24 +00:00
|
|
|
|
|
|
|
result := make(map[string][]string)
|
2022-04-28 16:53:28 +00:00
|
|
|
var roomID string
|
|
|
|
var userID string
|
|
|
|
for rows.Next() {
|
|
|
|
if err := rows.Scan(&roomID, &userID); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
users := result[roomID]
|
|
|
|
users = append(users, userID)
|
|
|
|
result[roomID] = users
|
|
|
|
}
|
|
|
|
return result, rows.Err()
|
|
|
|
}
|
|
|
|
|
|
|
|
// SelectJoinedUsersInRoom returns a map of room ID to a list of joined user IDs for a given room.
|
|
|
|
func (s *currentRoomStateStatements) SelectJoinedUsersInRoom(
|
2022-09-28 09:18:03 +00:00
|
|
|
ctx context.Context, txn *sql.Tx, roomIDs []string,
|
2022-04-28 16:53:28 +00:00
|
|
|
) (map[string][]string, error) {
|
2022-09-28 09:18:03 +00:00
|
|
|
rows, err := sqlutil.TxStmt(txn, s.selectJoinedUsersInRoomStmt).QueryContext(ctx, pq.StringArray(roomIDs))
|
2022-04-28 16:53:28 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
defer internal.CloseAndLogIfError(ctx, rows, "selectJoinedUsers: rows.close() failed")
|
|
|
|
|
|
|
|
result := make(map[string][]string)
|
|
|
|
var userID, roomID string
|
2017-05-17 14:38:24 +00:00
|
|
|
for rows.Next() {
|
|
|
|
if err := rows.Scan(&roomID, &userID); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
users := result[roomID]
|
|
|
|
users = append(users, userID)
|
|
|
|
result[roomID] = users
|
|
|
|
}
|
2020-02-11 14:12:21 +00:00
|
|
|
return result, rows.Err()
|
2017-05-17 14:38:24 +00:00
|
|
|
}
|
|
|
|
|
2017-04-13 15:56:46 +00:00
|
|
|
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
|
2020-05-14 15:11:37 +00:00
|
|
|
func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(
|
2017-09-20 14:36:41 +00:00
|
|
|
ctx context.Context,
|
|
|
|
txn *sql.Tx,
|
|
|
|
userID string,
|
|
|
|
membership string, // nolint: unparam
|
2017-09-18 15:52:22 +00:00
|
|
|
) ([]string, error) {
|
2020-06-12 13:55:57 +00:00
|
|
|
stmt := sqlutil.TxStmt(txn, s.selectRoomIDsWithMembershipStmt)
|
2017-09-18 15:52:22 +00:00
|
|
|
rows, err := stmt.QueryContext(ctx, userID, membership)
|
2017-04-13 15:56:46 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-05-21 13:40:13 +00:00
|
|
|
defer internal.CloseAndLogIfError(ctx, rows, "selectRoomIDsWithMembership: rows.close() failed")
|
2017-04-13 15:56:46 +00:00
|
|
|
|
|
|
|
var result []string
|
|
|
|
for rows.Next() {
|
|
|
|
var roomID string
|
|
|
|
if err := rows.Scan(&roomID); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
result = append(result, roomID)
|
|
|
|
}
|
2020-02-11 14:12:21 +00:00
|
|
|
return result, rows.Err()
|
2017-04-13 15:56:46 +00:00
|
|
|
}
|
|
|
|
|
2022-03-11 12:48:45 +00:00
|
|
|
// SelectRoomIDsWithAnyMembership returns a map of all memberships for the given user.
|
|
|
|
func (s *currentRoomStateStatements) SelectRoomIDsWithAnyMembership(
|
|
|
|
ctx context.Context,
|
|
|
|
txn *sql.Tx,
|
|
|
|
userID string,
|
|
|
|
) (map[string]string, error) {
|
|
|
|
stmt := sqlutil.TxStmt(txn, s.selectRoomIDsWithAnyMembershipStmt)
|
|
|
|
rows, err := stmt.QueryContext(ctx, userID)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
defer internal.CloseAndLogIfError(ctx, rows, "selectRoomIDsWithAnyMembership: rows.close() failed")
|
|
|
|
|
|
|
|
result := map[string]string{}
|
|
|
|
for rows.Next() {
|
|
|
|
var roomID string
|
|
|
|
var membership string
|
|
|
|
if err := rows.Scan(&roomID, &membership); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
result[roomID] = membership
|
|
|
|
}
|
|
|
|
return result, rows.Err()
|
|
|
|
}
|
|
|
|
|
2020-05-14 15:11:37 +00:00
|
|
|
// SelectCurrentState returns all the current state events for the given room.
|
|
|
|
func (s *currentRoomStateStatements) SelectCurrentState(
|
2017-09-18 15:52:22 +00:00
|
|
|
ctx context.Context, txn *sql.Tx, roomID string,
|
2023-04-04 17:16:53 +00:00
|
|
|
stateFilter *synctypes.StateFilter,
|
2021-02-04 12:20:37 +00:00
|
|
|
excludeEventIDs []string,
|
2023-04-27 11:54:20 +00:00
|
|
|
) ([]*rstypes.HeaderedEvent, error) {
|
2020-06-12 13:55:57 +00:00
|
|
|
stmt := sqlutil.TxStmt(txn, s.selectCurrentStateStmt)
|
2022-04-11 07:05:23 +00:00
|
|
|
senders, notSenders := getSendersStateFilterFilter(stateFilter)
|
2023-02-07 13:31:23 +00:00
|
|
|
// We're going to query members later, so remove them from this request
|
|
|
|
if stateFilter.LazyLoadMembers && !stateFilter.IncludeRedundantMembers {
|
2023-04-19 14:50:33 +00:00
|
|
|
notTypes := &[]string{spec.MRoomMember}
|
2023-02-07 13:31:23 +00:00
|
|
|
if stateFilter.NotTypes != nil {
|
2023-04-19 14:50:33 +00:00
|
|
|
*stateFilter.NotTypes = append(*stateFilter.NotTypes, spec.MRoomMember)
|
2023-02-07 13:31:23 +00:00
|
|
|
} else {
|
|
|
|
stateFilter.NotTypes = notTypes
|
|
|
|
}
|
|
|
|
}
|
2019-08-07 10:12:09 +00:00
|
|
|
rows, err := stmt.QueryContext(ctx, roomID,
|
2022-04-11 07:05:23 +00:00
|
|
|
pq.StringArray(senders),
|
|
|
|
pq.StringArray(notSenders),
|
Upgrade gomatrixserverlib dependency (#808)
* Upgrade gomatrixserverlib dependency
Signed-off-by: Thibaut CHARLES cromfr@gmail.com
* Added missing passing sytest
Signed-off-by: Thibaut CHARLES cromfr@gmail.com
* Fix login using identifier key
Not a full fix, it only really supports logging in with
the localpart of an mxid.
Signed-off-by: Serra Allgood <serra@allgood.dev>
* Replace deprecated prometheus.InstrumentHandler and unsafe time.Ticker
* goimports
* re-add temporarily missing deps?
* Refactor InstrumentHandlerCounter definition
* URL decode args
* Return server names (#833)
* Remove unnecessary map->array processing
* Return server names in room federation directory query
* Knock off a TODO
* Fix /send_join and /send_leave (#821)
Fix the /send_join and /send_leave endpoints, so that they use the v2 endpoints as mandated by MSC1802. Also comment out the SyTest tests that are failing because of lack of support for the v1 endpoints.
* Refuse /send_join without m.room.create (#824)
Signed-off-by: Abhishek Kumar <abhishekkumar2718@gmail.com>
* AS should use the v1 endpoint, rather than r0 (#827)
* docker: Passthrough parameters to dendrite-monolith-server
* Fix copy & paste error (#812)
* Use gomatrixserverlib.Transaction instead of local type (#590) (#811)
* Move files back if linting fails (#810)
* replaced gometalinter description with golangci-lint (#837)
* Amend syncapi SQL queries to return missing columns (#840)
* This commit updates a couple of the syncapi SQL queries to return additional columns that are required/expected by rowsToStreamEvents in output_room_events_table.go.
It's not exactly clear to me yet what transaction_id and session_id do, but these being added n #367 results in state events breaking the /sync endpoint.
This is a temporary fix. We need to come up with a better solution.
* gomatrix to gomatrixserverlib on some weird line change
* Tweaks from @babolivier review comments
* Implement storage interfaces (#841)
* Implement interfaces for federationsender storage
* Implement interfaces for mediaapi storage
* Implement interfaces for publicroomsapi storage
* Implement interfaces for roomserver storage
* Implement interfaces for syncapi storage
* Implement interfaces for keydb storage
* common.PartitionStorer in publicroomsapi interface
* Update copyright notices
* make cmd directory path absolute in build.sh (#830)
* Resync testfile with current sytest pass/fail (#832)
* Resync testfile with current sytest pass/fail
* Add displayname test
* Fall back to postgres when database connection string parsing fails (#842)
* Fall back to postgres when parsing the database connection string for a URI schema fails
* Fix behaviour so that it really tries postgres when URL parsing fails and it complains about unknown schema if it succeeds
* Fix #842
* Fix #842 - again...
* Federation fixes (#845)
* Update gomatrixserverlib to p2p commit 92c0338, other tweaks
* Update gomatrixserverlib to p2p commit e5dcc65
* Rewrite getAuthChain
* Update gomatrixserverlib in go.mod/go.sum
* Correct a couple of package refs for updated gmsl/gomatrix
* Update gomatrixserverlib ref in go.mod/go.sum
* Update getAuthChain comments following @babolivier review
* Add a Sytest blacklist file (#849)
* Add more passing tests to the testfile, add test blacklist file (#848)
* CS API: Support for /messages, fixes for /sync (#847)
* Merge forward
* Tidy up a bit
* TODO: What to do with NextBatch here?
* Replace SyncPosition with PaginationToken throughout syncapi
* Fix PaginationTokens
* Fix lint errors
* Add a couple of missing functions into the syncapi external storage interface
* Some updates based on review comments from @babolivier
* Some updates based on review comments from @babolivier
* argh whitespacing
* Fix opentracing span
* Remove dead code
* Don't overshadow err (fix lint issue)
* Handle extremities after inserting event into topology
* Try insert event topology as ON CONFLICT DO NOTHING
* Prevent OOB error in addRoomDeltaToResponse
* Thwarted by gocyclo again
* Fix NewPaginationTokenFromString, define unit test for it
* Update pagination token test
* Update sytest-whitelist
* Hopefully fix some of the sync batch tokens
* Remove extraneous sync position func
* Revert to topology tokens in addRoomDeltaToResponse etc
* Fix typo
* Remove prevPDUPos as dead now that backwardTopologyPos is used instead
* Fix selectEventsWithEventIDsSQL
* Update sytest-blacklist
* Update sytest-whitelist
* Some fixes for #847 (#850)
* Fix a couple of cases where backfilling events we already had causes panics, hopefully fix ordering of events, update GMSL dependency for backfill URL fixes
* Remove commented out lines from output_room_events_table schema
* Wire up publicroomsapi for roomserver events (#851)
* Wire up publicroomsapi to roomserver events
* Remove parameter that was incorrectly brought over from p2p work
* nolint containsBackwardExtremity for now
* Store our own keys in the keydb (#853)
* Store our own keys in the keydb
The DirectKeyFetcher makes the assumption that you can always reach the key/v2/server endpoint of any server, including our own. We previously haven't bothered to store our own keys in the keydb so this would mean we end up making key requests to ourselves.
In the libp2p world as an example, self-dialling is not possible, therefore this would render it impossible to get our own keys.
This commit adds our own keys into the keydb so that we don't create unnecessarily (and maybe impossible) requests.
* Use golang.org/x/crypto/ed25519 instead of crypto/ed25519 for pre-Go 1.13
* More sync fixes (#854)
* Further sync tweaks
* Remove unnecessary blank line
* getBackwardTopologyPos always returns a usable value
* Revert order fixing
* Implement GET endpoints for account_data in clientapi (#861)
* Implement GET endpoints for account_data in clientapi
* Fix accountDB parameter
* Remove fmt.Println
* Add empty push rules into account data on account creation (#862)
* Handle kind=guest query parameter on /register (#860)
* Handle kind=guest query parameter on /register
* Reorganized imports
* Pass device_id as nil
* Added tests to systest-whitelist
* Update sytest-whitelist
* Blacklist 'displayname updates affect room member events' (#859)
* Room version abstractions (#865)
* Rough first pass at adding room version abstractions
* Define newer room versions
* Update room version metadata
* Fix roomserver/versions
* Try to fix whitespace in roomsSchema
* Implement room version capabilities in CS API (#866)
* Add wiring for querying the roomserver for the default room version
* Try to implement /capabilities for room versions
* Update copyright notices
* Update sytests, add /capabilities endpoint into CS API
* Update sytest-whitelist
* Add GetDefaultRoomVersion
* Fix cases where state package was shadowed
* Fix version formatting
* Update Dockerfile to Go 1.13.6
* oh yes types I remember
* And fix the default too
* Update documentation for Go 1.13 (#867)
* Pass cfg by reference around the codebase (#819)
* Pass cfg by reference around the codebase
* Merge branch 'master' into pass-cfg-by-ref
Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
* Implement missing device management features (#835)
* Implement missing device management features
Signed-off-by: Till Faelligen <tfaelligen@gmail.com>
* Add a little more documentation
* Undo changes
* Use non-anonymous struct to decode devices list
* Update sytest-whitelist
* Update sytest-whitelist
* Update sytest-blacklist
Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
* Adding sslmode: disable to sytest server config (#813)
Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
* Fix AppService bind addrs in test (#805)
Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
* Always defer *sql.Rows.Close and consult with Err (#844)
* Always defer *sql.Rows.Close and consult with Err
database/sql.Rows.Next() makes sure to call Close only after exhausting
result rows which would NOT happen when returning early from a bad Scan.
Close being idempotent makes it a great candidate to get always deferred
regardless of what happens later on the result set.
This change also makes sure call Err() after exhausting Next() and
propagate non-nil results from it as the documentation advises.
Closes #764
Signed-off-by: Kiril Vladimiroff <kiril@vladimiroff.org>
* Override named result parameters in last returns
Signed-off-by: Kiril Vladimiroff <kiril@vladimiroff.org>
* Do the same over new changes that got merged
Signed-off-by: Kiril Vladimiroff <kiril@vladimiroff.org>
Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
* Clean up
Co-authored-by: Serra Allgood <serra@allgood.dev>
Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
Co-authored-by: Brendan Abolivier <github@brendanabolivier.com>
Co-authored-by: Abhishek Kumar <31231064+abhishekkumar2718@users.noreply.github.com>
Co-authored-by: Will Hunt <will@half-shot.uk>
Co-authored-by: S7evinK <tfaelligen@gmail.com>
Co-authored-by: Arshpreet <30545756+arsh-7@users.noreply.github.com>
Co-authored-by: Prateek Sachan <42961174+prateek2211@users.noreply.github.com>
Co-authored-by: Behouba Manassé <behouba@gmail.com>
Co-authored-by: aditsachde <23707194+aditsachde@users.noreply.github.com>
Co-authored-by: Kiril Vladimiroff <kiril@vladimiroff.org>
2020-02-11 15:46:51 +00:00
|
|
|
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.Types)),
|
|
|
|
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilter.NotTypes)),
|
|
|
|
stateFilter.ContainsURL,
|
2021-02-04 12:20:37 +00:00
|
|
|
pq.StringArray(excludeEventIDs),
|
2019-08-07 10:12:09 +00:00
|
|
|
)
|
2017-04-13 15:56:46 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-05-21 13:40:13 +00:00
|
|
|
defer internal.CloseAndLogIfError(ctx, rows, "selectCurrentState: rows.close() failed")
|
2017-04-13 15:56:46 +00:00
|
|
|
|
2017-06-07 15:35:41 +00:00
|
|
|
return rowsToEvents(rows)
|
|
|
|
}
|
|
|
|
|
2020-05-14 15:11:37 +00:00
|
|
|
func (s *currentRoomStateStatements) DeleteRoomStateByEventID(
|
2017-09-18 15:52:22 +00:00
|
|
|
ctx context.Context, txn *sql.Tx, eventID string,
|
|
|
|
) error {
|
2020-06-12 13:55:57 +00:00
|
|
|
stmt := sqlutil.TxStmt(txn, s.deleteRoomStateByEventIDStmt)
|
2017-09-18 15:52:22 +00:00
|
|
|
_, err := stmt.ExecContext(ctx, eventID)
|
2017-06-07 15:35:41 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-09-15 10:17:46 +00:00
|
|
|
func (s *currentRoomStateStatements) DeleteRoomStateForRoom(
|
|
|
|
ctx context.Context, txn *sql.Tx, roomID string,
|
|
|
|
) error {
|
2022-03-16 10:25:50 +00:00
|
|
|
stmt := sqlutil.TxStmt(txn, s.deleteRoomStateForRoomStmt)
|
2020-09-15 10:17:46 +00:00
|
|
|
_, err := stmt.ExecContext(ctx, roomID)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-05-14 15:11:37 +00:00
|
|
|
func (s *currentRoomStateStatements) UpsertRoomState(
|
2017-09-18 15:52:22 +00:00
|
|
|
ctx context.Context, txn *sql.Tx,
|
2023-04-27 11:54:20 +00:00
|
|
|
event *rstypes.HeaderedEvent, membership *string, addedAt types.StreamPosition,
|
2017-06-07 15:35:41 +00:00
|
|
|
) error {
|
2019-08-07 10:12:09 +00:00
|
|
|
// Parse content as JSON and search for an "url" key
|
|
|
|
containsURL := false
|
|
|
|
var content map[string]interface{}
|
|
|
|
if json.Unmarshal(event.Content(), &content) != nil {
|
|
|
|
// Set containsURL to true if url is present
|
|
|
|
_, containsURL = content["url"]
|
|
|
|
}
|
|
|
|
|
2020-03-19 12:07:01 +00:00
|
|
|
headeredJSON, err := json.Marshal(event)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2019-08-07 10:12:09 +00:00
|
|
|
// upsert state event
|
2020-06-12 13:55:57 +00:00
|
|
|
stmt := sqlutil.TxStmt(txn, s.upsertRoomStateStmt)
|
2020-03-19 12:07:01 +00:00
|
|
|
_, err = stmt.ExecContext(
|
2017-09-18 15:52:22 +00:00
|
|
|
ctx,
|
2023-09-15 14:39:06 +00:00
|
|
|
event.RoomID().String(),
|
2017-09-18 15:52:22 +00:00
|
|
|
event.EventID(),
|
|
|
|
event.Type(),
|
2023-06-28 18:29:49 +00:00
|
|
|
event.UserID.String(),
|
2019-08-07 10:12:09 +00:00
|
|
|
containsURL,
|
2023-06-28 18:29:49 +00:00
|
|
|
*event.StateKeyResolved,
|
2020-03-19 12:07:01 +00:00
|
|
|
headeredJSON,
|
2017-09-18 15:52:22 +00:00
|
|
|
membership,
|
|
|
|
addedAt,
|
2022-07-18 12:46:15 +00:00
|
|
|
event.Visibility,
|
2017-06-07 15:35:41 +00:00
|
|
|
)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2020-05-14 15:11:37 +00:00
|
|
|
func (s *currentRoomStateStatements) SelectEventsWithEventIDs(
|
2017-09-18 15:52:22 +00:00
|
|
|
ctx context.Context, txn *sql.Tx, eventIDs []string,
|
2020-01-23 17:51:10 +00:00
|
|
|
) ([]types.StreamEvent, error) {
|
2020-06-12 13:55:57 +00:00
|
|
|
stmt := sqlutil.TxStmt(txn, s.selectEventsWithEventIDsStmt)
|
2017-09-18 15:52:22 +00:00
|
|
|
rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs))
|
2017-06-07 15:35:41 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-05-21 13:40:13 +00:00
|
|
|
defer internal.CloseAndLogIfError(ctx, rows, "selectEventsWithEventIDs: rows.close() failed")
|
2022-10-03 10:57:21 +00:00
|
|
|
return currentRoomStateRowsToStreamEvents(rows)
|
|
|
|
}
|
|
|
|
|
|
|
|
func currentRoomStateRowsToStreamEvents(rows *sql.Rows) ([]types.StreamEvent, error) {
|
|
|
|
var events []types.StreamEvent
|
|
|
|
for rows.Next() {
|
|
|
|
var (
|
|
|
|
eventID string
|
|
|
|
streamPos types.StreamPosition
|
|
|
|
eventBytes []byte
|
|
|
|
historyVisibility gomatrixserverlib.HistoryVisibility
|
|
|
|
)
|
|
|
|
if err := rows.Scan(&eventID, &streamPos, &eventBytes, &historyVisibility); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
// TODO: Handle redacted events
|
2023-04-27 11:54:20 +00:00
|
|
|
var ev rstypes.HeaderedEvent
|
|
|
|
if err := json.Unmarshal(eventBytes, &ev); err != nil {
|
2022-10-03 10:57:21 +00:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
ev.Visibility = historyVisibility
|
|
|
|
|
|
|
|
events = append(events, types.StreamEvent{
|
|
|
|
HeaderedEvent: &ev,
|
|
|
|
StreamPosition: streamPos,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2023-11-09 07:42:33 +00:00
|
|
|
return events, rows.Err()
|
2017-06-07 15:35:41 +00:00
|
|
|
}
|
|
|
|
|
2023-04-27 11:54:20 +00:00
|
|
|
func rowsToEvents(rows *sql.Rows) ([]*rstypes.HeaderedEvent, error) {
|
|
|
|
result := []*rstypes.HeaderedEvent{}
|
2017-04-13 15:56:46 +00:00
|
|
|
for rows.Next() {
|
2020-12-09 18:07:17 +00:00
|
|
|
var eventID string
|
2017-04-13 15:56:46 +00:00
|
|
|
var eventBytes []byte
|
2020-12-09 18:07:17 +00:00
|
|
|
if err := rows.Scan(&eventID, &eventBytes); err != nil {
|
2017-04-13 15:56:46 +00:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
// TODO: Handle redacted events
|
2023-04-27 11:54:20 +00:00
|
|
|
var ev rstypes.HeaderedEvent
|
|
|
|
if err := json.Unmarshal(eventBytes, &ev); err != nil {
|
2017-04-13 15:56:46 +00:00
|
|
|
return nil, err
|
|
|
|
}
|
2020-11-16 15:44:53 +00:00
|
|
|
result = append(result, &ev)
|
2017-04-13 15:56:46 +00:00
|
|
|
}
|
2020-02-11 14:12:21 +00:00
|
|
|
return result, rows.Err()
|
2017-04-13 15:56:46 +00:00
|
|
|
}
|
2017-07-25 15:10:59 +00:00
|
|
|
|
2020-05-14 15:11:37 +00:00
|
|
|
func (s *currentRoomStateStatements) SelectStateEvent(
|
2022-09-28 09:18:03 +00:00
|
|
|
ctx context.Context, txn *sql.Tx, roomID, evType, stateKey string,
|
2023-04-27 11:54:20 +00:00
|
|
|
) (*rstypes.HeaderedEvent, error) {
|
2022-09-28 09:18:03 +00:00
|
|
|
stmt := sqlutil.TxStmt(txn, s.selectStateEventStmt)
|
2017-07-25 15:10:59 +00:00
|
|
|
var res []byte
|
2017-10-06 10:23:58 +00:00
|
|
|
err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res)
|
2017-09-18 15:52:22 +00:00
|
|
|
if err == sql.ErrNoRows {
|
2017-07-25 15:10:59 +00:00
|
|
|
return nil, nil
|
|
|
|
}
|
2017-09-18 15:52:22 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2023-04-27 11:54:20 +00:00
|
|
|
var ev rstypes.HeaderedEvent
|
2020-03-19 12:07:01 +00:00
|
|
|
if err = json.Unmarshal(res, &ev); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2017-07-25 15:10:59 +00:00
|
|
|
return &ev, err
|
|
|
|
}
|
2022-07-15 15:25:26 +00:00
|
|
|
|
|
|
|
func (s *currentRoomStateStatements) SelectSharedUsers(
|
|
|
|
ctx context.Context, txn *sql.Tx, userID string, otherUserIDs []string,
|
|
|
|
) ([]string, error) {
|
|
|
|
stmt := sqlutil.TxStmt(txn, s.selectSharedUsersStmt)
|
2022-08-03 16:35:17 +00:00
|
|
|
rows, err := stmt.QueryContext(ctx, userID, pq.Array(otherUserIDs))
|
2022-07-15 15:25:26 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
defer internal.CloseAndLogIfError(ctx, rows, "selectSharedUsersStmt: rows.close() failed")
|
|
|
|
|
|
|
|
var stateKey string
|
|
|
|
result := make([]string, 0, len(otherUserIDs))
|
|
|
|
for rows.Next() {
|
|
|
|
if err := rows.Scan(&stateKey); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
result = append(result, stateKey)
|
|
|
|
}
|
|
|
|
return result, rows.Err()
|
|
|
|
}
|
2023-01-12 09:06:03 +00:00
|
|
|
|
|
|
|
func (s *currentRoomStateStatements) SelectRoomHeroes(ctx context.Context, txn *sql.Tx, roomID, excludeUserID string, memberships []string) ([]string, error) {
|
|
|
|
stmt := sqlutil.TxStmt(txn, s.selectRoomHeroesStmt)
|
|
|
|
rows, err := stmt.QueryContext(ctx, roomID, pq.StringArray(memberships), excludeUserID)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
defer internal.CloseAndLogIfError(ctx, rows, "selectRoomHeroesStmt: rows.close() failed")
|
|
|
|
|
|
|
|
var stateKey string
|
|
|
|
result := make([]string, 0, 5)
|
|
|
|
for rows.Next() {
|
|
|
|
if err = rows.Scan(&stateKey); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
result = append(result, stateKey)
|
|
|
|
}
|
|
|
|
return result, rows.Err()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *currentRoomStateStatements) SelectMembershipCount(ctx context.Context, txn *sql.Tx, roomID, membership string) (count int, err error) {
|
|
|
|
stmt := sqlutil.TxStmt(txn, s.selectMembershipCountStmt)
|
|
|
|
err = stmt.QueryRowContext(ctx, roomID, membership).Scan(&count)
|
|
|
|
if err != nil {
|
|
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
|
|
return 0, nil
|
|
|
|
}
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
return count, nil
|
|
|
|
}
|