2020-02-13 17:27:33 +00:00
// Copyright 2017-2018 New Vector Ltd
// Copyright 2019-2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
Add peer-to-peer support into Dendrite via libp2p and fetch (#880)
* Use a fork of pq which supports userCurrent on wasm
* Use sqlite3_js driver when running in JS
* Add cmd/dendritejs to pull in sqlite3_js driver for wasm only
* Update to latest go-sqlite-js version
* Replace prometheus with a stub. sigh
* Hard-code a config and don't use opentracing
* Latest go-sqlite3-js version
* Generate a key for now
* Listen for fetch traffic rather than HTTP
* Latest hacks for js
* libp2p support
* More libp2p
* Fork gjson to allow us to enforce auth checks as before
Previously, all events would come down redacted because the hash
checks would fail. They would fail because sjson.DeleteBytes didn't
remove keys not used for hashing. This didn't work because of a build
tag which included a file which no-oped the index returned.
See https://github.com/tidwall/gjson/issues/157
When it's resolved, let's go back to mainline.
* Use gjson@1.6.0 as it fixes https://github.com/tidwall/gjson/issues/157
* Use latest gomatrixserverlib for sig checks
* Fix a bug which could cause exclude_from_sync to not be set
Caused when sending events over federation.
* Use query variadic to make lookups actually work!
* Latest gomatrixserverlib
* Add notes on getting p2p up and running
Partly so I don't forget myself!
* refactor: Move p2p specific stuff to cmd/dendritejs
This is important or else the normal build of dendrite will fail
because the p2p libraries depend on syscall/js which doesn't work
on normal builds.
Also, clean up main.go to read a bit better.
* Update ho-http-js-libp2p to return errors from RoundTrip
* Add an LRU cache around the key DB
We actually need this for P2P because otherwise we can *segfault*
with things like: "runtime: unexpected return pc for runtime.handleEvent"
where the event is a `syscall/js` event, caused by spamming sql.js
caused by "Checking event signatures for 14 events of room state" which
hammers the key DB repeatedly in quick succession.
Using a cache fixes this, though the underlying cause is probably a bug
in the version of Go I'm on (1.13.7)
* breaking: Add Tracing.Enabled to toggle whether we do opentracing
Defaults to false, which is why this is a breaking change. We need
this flag because WASM builds cannot do opentracing.
* Start adding conditional builds for wasm to handle lib/pq
The general idea here is to have the wasm build have a `NewXXXDatabase`
that doesn't import any postgres package and hence we never import
`lib/pq`, which doesn't work under WASM (undefined `userCurrent`).
* Remove lib/pq for wasm for syncapi
* Add conditional building to remaining storage APIs
* Update build script to set env vars correctly for dendritejs
* sqlite bug fixes
* Docs
* Add a no-op main for dendritejs when not building under wasm
* Use the real prometheus, even for WASM
Instead, the dendrite-sw.js must mock out `process.pid` and
`fs.stat` - which must invoke the callback with an error (e.g `EINVAL`)
in order for it to work:
```
global.process = {
pid: 1,
};
global.fs.stat = function(path, cb) {
cb({
code: "EINVAL",
});
}
```
* Linting
2020-03-06 10:23:55 +00:00
"encoding/json"
2023-05-24 10:14:42 +00:00
"errors"
2020-02-13 17:27:33 +00:00
"fmt"
2021-04-26 12:25:57 +00:00
"sort"
2020-02-13 17:27:33 +00:00
"strings"
2020-05-21 13:40:13 +00:00
"github.com/matrix-org/dendrite/internal"
2020-06-12 13:55:57 +00:00
"github.com/matrix-org/dendrite/internal/sqlutil"
2023-05-24 10:14:42 +00:00
"github.com/matrix-org/dendrite/roomserver/storage/sqlite3/deltas"
2020-05-26 15:45:28 +00:00
"github.com/matrix-org/dendrite/roomserver/storage/tables"
2020-02-13 17:27:33 +00:00
"github.com/matrix-org/dendrite/roomserver/types"
)
const eventsSchema = `
CREATE TABLE IF NOT EXISTS roomserver_events (
event_nid INTEGER PRIMARY KEY AUTOINCREMENT ,
room_nid INTEGER NOT NULL ,
event_type_nid INTEGER NOT NULL ,
event_state_key_nid INTEGER NOT NULL ,
sent_to_output BOOLEAN NOT NULL DEFAULT FALSE ,
state_snapshot_nid INTEGER NOT NULL DEFAULT 0 ,
depth INTEGER NOT NULL ,
event_id TEXT NOT NULL UNIQUE ,
2020-09-16 12:00:52 +00:00
auth_event_nids TEXT NOT NULL DEFAULT ' [ ] ' ,
is_rejected BOOLEAN NOT NULL DEFAULT FALSE
2020-02-13 17:27:33 +00:00
) ;
`
const insertEventSQL = `
2023-05-24 10:14:42 +00:00
INSERT INTO roomserver_events ( room_nid , event_type_nid , event_state_key_nid , event_id , auth_event_nids , depth , is_rejected )
VALUES ( $ 1 , $ 2 , $ 3 , $ 4 , $ 5 , $ 6 , $ 7 )
2022-02-08 13:45:48 +00:00
ON CONFLICT DO UPDATE
2023-05-24 10:14:42 +00:00
SET is_rejected = $ 7 WHERE is_rejected = 1
2021-12-09 15:03:26 +00:00
RETURNING event_nid , state_snapshot_nid ;
2020-02-13 17:27:33 +00:00
`
const selectEventSQL = "" +
"SELECT event_nid, state_snapshot_nid FROM roomserver_events WHERE event_id = $1"
2022-11-01 15:07:17 +00:00
const bulkSelectSnapshotsForEventIDsSQL = "" +
"SELECT event_id, state_snapshot_nid FROM roomserver_events WHERE event_id IN ($1)"
2020-02-13 17:27:33 +00:00
// Bulk lookup of events by string ID.
// Sort by the numeric IDs for event type and state key.
// This means we can use binary search to lookup entries by type and state key.
const bulkSelectStateEventByIDSQL = "" +
"SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" +
" WHERE event_id IN ($1)" +
" ORDER BY event_type_nid, event_state_key_nid ASC"
2022-08-18 16:06:13 +00:00
// Bulk lookup of events by string ID that aren't rejected.
// Sort by the numeric IDs for event type and state key.
// This means we can use binary search to lookup entries by type and state key.
const bulkSelectStateEventByIDExcludingRejectedSQL = "" +
"SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" +
" WHERE event_id IN ($1) AND is_rejected = 0" +
" ORDER BY event_type_nid, event_state_key_nid ASC"
2021-04-26 12:25:57 +00:00
const bulkSelectStateEventByNIDSQL = "" +
"SELECT event_type_nid, event_state_key_nid, event_nid FROM roomserver_events" +
" WHERE event_nid IN ($1)"
2022-05-09 13:30:32 +00:00
// Rest of query is built by BulkSelectStateEventByNID
2021-04-26 12:25:57 +00:00
2020-02-13 17:27:33 +00:00
const bulkSelectStateAtEventByIDSQL = "" +
2020-09-16 12:00:52 +00:00
"SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, is_rejected FROM roomserver_events" +
2020-02-13 17:27:33 +00:00
" WHERE event_id IN ($1)"
const updateEventStateSQL = "" +
"UPDATE roomserver_events SET state_snapshot_nid = $1 WHERE event_nid = $2"
const selectEventSentToOutputSQL = "" +
"SELECT sent_to_output FROM roomserver_events WHERE event_nid = $1"
const updateEventSentToOutputSQL = "" +
"UPDATE roomserver_events SET sent_to_output = TRUE WHERE event_nid = $1"
const selectEventIDSQL = "" +
"SELECT event_id FROM roomserver_events WHERE event_nid = $1"
const bulkSelectStateAtEventAndReferenceSQL = "" +
2023-05-24 10:14:42 +00:00
"SELECT event_type_nid, event_state_key_nid, event_nid, state_snapshot_nid, event_id" +
2020-02-13 17:27:33 +00:00
" FROM roomserver_events WHERE event_nid IN ($1)"
const bulkSelectEventIDSQL = "" +
"SELECT event_nid, event_id FROM roomserver_events WHERE event_nid IN ($1)"
const bulkSelectEventNIDSQL = "" +
2023-02-24 08:40:20 +00:00
"SELECT event_id, event_nid, room_nid FROM roomserver_events WHERE event_id IN ($1)"
2020-02-13 17:27:33 +00:00
2022-02-17 13:53:48 +00:00
const bulkSelectUnsentEventNIDSQL = "" +
2023-02-24 08:40:20 +00:00
"SELECT event_id, event_nid, room_nid FROM roomserver_events WHERE sent_to_output = 0 AND event_id IN ($1)"
2022-02-17 13:53:48 +00:00
2020-02-13 17:27:33 +00:00
const selectMaxEventDepthSQL = "" +
"SELECT COALESCE(MAX(depth) + 1, 0) FROM roomserver_events WHERE event_nid IN ($1)"
2020-12-16 10:33:28 +00:00
const selectRoomNIDsForEventNIDsSQL = "" +
"SELECT event_nid, room_nid FROM roomserver_events WHERE event_nid IN ($1)"
2020-03-27 16:28:22 +00:00
2022-08-18 09:37:47 +00:00
const selectEventRejectedSQL = "" +
"SELECT is_rejected FROM roomserver_events WHERE room_nid = $1 AND event_id = $2"
2020-02-13 17:27:33 +00:00
type eventStatements struct {
2022-08-18 16:06:13 +00:00
db * sql . DB
insertEventStmt * sql . Stmt
selectEventStmt * sql . Stmt
2022-11-01 15:07:17 +00:00
bulkSelectSnapshotsForEventIDsStmt * sql . Stmt
2022-08-18 16:06:13 +00:00
bulkSelectStateEventByIDStmt * sql . Stmt
bulkSelectStateEventByIDExcludingRejectedStmt * sql . Stmt
bulkSelectStateAtEventByIDStmt * sql . Stmt
updateEventStateStmt * sql . Stmt
selectEventSentToOutputStmt * sql . Stmt
updateEventSentToOutputStmt * sql . Stmt
selectEventIDStmt * sql . Stmt
bulkSelectStateAtEventAndReferenceStmt * sql . Stmt
bulkSelectEventIDStmt * sql . Stmt
selectEventRejectedStmt * sql . Stmt
2022-02-17 13:53:48 +00:00
//bulkSelectEventNIDStmt *sql.Stmt
//bulkSelectUnsentEventNIDStmt *sql.Stmt
//selectRoomNIDsForEventNIDsStmt *sql.Stmt
2020-02-13 17:27:33 +00:00
}
2022-05-09 13:30:32 +00:00
func CreateEventsTable ( db * sql . DB ) error {
2021-04-26 12:25:57 +00:00
_ , err := db . Exec ( eventsSchema )
2023-05-24 10:14:42 +00:00
if err != nil {
return err
}
// check if the column exists
var cName string
migrationName := "roomserver: drop column reference_sha from roomserver_events"
err = db . QueryRowContext ( context . Background ( ) , ` SELECT p.name FROM sqlite_master AS m JOIN pragma_table_info(m.name) AS p WHERE m.name = 'roomserver_events' AND p.name = 'reference_sha256' ` ) . Scan ( & cName )
if err != nil {
if errors . Is ( err , sql . ErrNoRows ) { // migration was already executed, as the column was removed
if err = sqlutil . InsertMigration ( context . Background ( ) , db , migrationName ) ; err != nil {
return fmt . Errorf ( "unable to manually insert migration '%s': %w" , migrationName , err )
}
return nil
}
return err
}
m := sqlutil . NewMigrator ( db )
m . AddMigrations ( [ ] sqlutil . Migration {
{
Version : migrationName ,
Up : deltas . UpDropEventReferenceSHA ,
} ,
} ... )
return m . Up ( context . Background ( ) )
2021-04-26 12:25:57 +00:00
}
2022-05-09 13:30:32 +00:00
func PrepareEventsTable ( db * sql . DB ) ( tables . Events , error ) {
2020-07-21 09:48:49 +00:00
s := & eventStatements {
2020-08-19 14:38:27 +00:00
db : db ,
2020-07-21 09:48:49 +00:00
}
2020-02-13 17:27:33 +00:00
2021-07-28 17:30:04 +00:00
return s , sqlutil . StatementList {
2020-02-13 17:27:33 +00:00
{ & s . insertEventStmt , insertEventSQL } ,
{ & s . selectEventStmt , selectEventSQL } ,
2022-11-01 15:07:17 +00:00
{ & s . bulkSelectSnapshotsForEventIDsStmt , bulkSelectSnapshotsForEventIDsSQL } ,
2020-02-13 17:27:33 +00:00
{ & s . bulkSelectStateEventByIDStmt , bulkSelectStateEventByIDSQL } ,
2022-08-18 16:06:13 +00:00
{ & s . bulkSelectStateEventByIDExcludingRejectedStmt , bulkSelectStateEventByIDExcludingRejectedSQL } ,
2020-02-13 17:27:33 +00:00
{ & s . bulkSelectStateAtEventByIDStmt , bulkSelectStateAtEventByIDSQL } ,
{ & s . updateEventStateStmt , updateEventStateSQL } ,
{ & s . updateEventSentToOutputStmt , updateEventSentToOutputSQL } ,
{ & s . selectEventSentToOutputStmt , selectEventSentToOutputSQL } ,
{ & s . selectEventIDStmt , selectEventIDSQL } ,
{ & s . bulkSelectStateAtEventAndReferenceStmt , bulkSelectStateAtEventAndReferenceSQL } ,
{ & s . bulkSelectEventIDStmt , bulkSelectEventIDSQL } ,
2022-02-17 13:53:48 +00:00
//{&s.bulkSelectEventNIDStmt, bulkSelectEventNIDSQL},
//{&s.bulkSelectUnsentEventNIDStmt, bulkSelectUnsentEventNIDSQL},
2020-12-16 10:33:28 +00:00
//{&s.selectRoomNIDForEventNIDStmt, selectRoomNIDForEventNIDSQL},
2022-08-18 09:37:47 +00:00
{ & s . selectEventRejectedStmt , selectEventRejectedSQL } ,
2020-05-27 10:03:47 +00:00
} . Prepare ( db )
2020-02-13 17:27:33 +00:00
}
2020-05-26 15:45:28 +00:00
func ( s * eventStatements ) InsertEvent (
2020-02-13 17:27:33 +00:00
ctx context . Context ,
txn * sql . Tx ,
roomNID types . RoomNID ,
eventTypeNID types . EventTypeNID ,
eventStateKeyNID types . EventStateKeyNID ,
eventID string ,
authEventNIDs [ ] types . EventNID ,
depth int64 ,
2020-09-16 12:00:52 +00:00
isRejected bool ,
2020-05-26 15:45:28 +00:00
) ( types . EventNID , types . StateSnapshotNID , error ) {
2020-04-29 17:41:45 +00:00
// attempt to insert: the last_row_id is the event NID
2020-07-21 09:48:49 +00:00
var eventNID int64
2021-12-09 15:03:26 +00:00
var stateNID int64
2020-08-19 14:38:27 +00:00
insertStmt := sqlutil . TxStmt ( txn , s . insertEventStmt )
2021-12-09 15:03:26 +00:00
err := insertStmt . QueryRowContext (
2020-08-19 14:38:27 +00:00
ctx , int64 ( roomNID ) , int64 ( eventTypeNID ) , int64 ( eventStateKeyNID ) ,
2023-05-24 10:14:42 +00:00
eventID , eventNIDsAsArray ( authEventNIDs ) , depth , isRejected ,
2021-12-09 15:03:26 +00:00
) . Scan ( & eventNID , & stateNID )
return types . EventNID ( eventNID ) , types . StateSnapshotNID ( stateNID ) , err
2020-02-13 17:27:33 +00:00
}
2020-05-26 15:45:28 +00:00
func ( s * eventStatements ) SelectEvent (
2020-02-13 17:27:33 +00:00
ctx context . Context , txn * sql . Tx , eventID string ,
) ( types . EventNID , types . StateSnapshotNID , error ) {
var eventNID int64
var stateNID int64
2020-06-12 13:55:57 +00:00
selectStmt := sqlutil . TxStmt ( txn , s . selectEventStmt )
2020-02-13 17:27:33 +00:00
err := selectStmt . QueryRowContext ( ctx , eventID ) . Scan ( & eventNID , & stateNID )
return types . EventNID ( eventNID ) , types . StateSnapshotNID ( stateNID ) , err
}
2022-11-01 15:07:17 +00:00
func ( s * eventStatements ) BulkSelectSnapshotsFromEventIDs (
ctx context . Context , txn * sql . Tx , eventIDs [ ] string ,
) ( map [ types . StateSnapshotNID ] [ ] string , error ) {
qry := strings . Replace ( bulkSelectSnapshotsForEventIDsSQL , "($1)" , sqlutil . QueryVariadic ( len ( eventIDs ) ) , 1 )
stmt , err := s . db . Prepare ( qry )
if err != nil {
return nil , err
}
defer internal . CloseAndLogIfError ( ctx , stmt , "BulkSelectSnapshotsFromEventIDs: stmt.close() failed" )
params := make ( [ ] interface { } , len ( eventIDs ) )
for i := range eventIDs {
params [ i ] = eventIDs [ i ]
}
rows , err := stmt . QueryContext ( ctx , params ... )
if err != nil {
return nil , err
}
defer internal . CloseAndLogIfError ( ctx , rows , "BulkSelectSnapshotsFromEventIDs: rows.close() failed" )
var eventID string
var stateNID types . StateSnapshotNID
result := make ( map [ types . StateSnapshotNID ] [ ] string )
for rows . Next ( ) {
if err := rows . Scan ( & eventID , & stateNID ) ; err != nil {
return nil , err
}
result [ stateNID ] = append ( result [ stateNID ] , eventID )
}
return result , rows . Err ( )
}
2020-02-13 17:27:33 +00:00
// bulkSelectStateEventByID lookups a list of state events by event ID.
2022-08-18 16:06:13 +00:00
// If not excluding rejected events, and any of the requested events are missing from
// the database it returns a types.MissingEventError. If excluding rejected events,
// the events will be silently omitted without error.
2020-05-26 15:45:28 +00:00
func ( s * eventStatements ) BulkSelectStateEventByID (
2022-08-18 16:06:13 +00:00
ctx context . Context , txn * sql . Tx , eventIDs [ ] string , excludeRejected bool ,
2020-02-13 17:27:33 +00:00
) ( [ ] types . StateEntry , error ) {
///////////////
2022-08-18 16:06:13 +00:00
var sql string
if excludeRejected {
sql = bulkSelectStateEventByIDExcludingRejectedSQL
} else {
sql = bulkSelectStateEventByIDSQL
}
2020-02-13 17:27:33 +00:00
iEventIDs := make ( [ ] interface { } , len ( eventIDs ) )
for k , v := range eventIDs {
iEventIDs [ k ] = v
}
2022-08-18 16:06:13 +00:00
selectOrig := strings . Replace ( sql , "($1)" , sqlutil . QueryVariadic ( len ( iEventIDs ) ) , 1 )
2022-03-04 15:05:42 +00:00
selectPrep , err := s . db . Prepare ( selectOrig )
2020-02-13 17:27:33 +00:00
if err != nil {
return nil , err
}
2022-03-04 15:05:42 +00:00
defer selectPrep . Close ( ) // nolint:errcheck
selectStmt := sqlutil . TxStmt ( txn , selectPrep )
2020-02-13 17:27:33 +00:00
///////////////
rows , err := selectStmt . QueryContext ( ctx , iEventIDs ... )
if err != nil {
return nil , err
}
2020-05-21 13:40:13 +00:00
defer internal . CloseAndLogIfError ( ctx , rows , "bulkSelectStateEventByID: rows.close() failed" )
2020-02-13 17:27:33 +00:00
// We know that we will only get as many results as event IDs
// because of the unique constraint on event IDs.
// So we can allocate an array of the correct size now.
// We might get fewer results than IDs so we adjust the length of the slice before returning it.
2022-08-18 16:06:13 +00:00
results := make ( [ ] types . StateEntry , 0 , len ( eventIDs ) )
2020-02-13 17:27:33 +00:00
i := 0
for ; rows . Next ( ) ; i ++ {
2022-08-18 16:06:13 +00:00
var result types . StateEntry
2020-02-13 17:27:33 +00:00
if err = rows . Scan (
& result . EventTypeNID ,
& result . EventStateKeyNID ,
& result . EventNID ,
) ; err != nil {
return nil , err
}
2022-08-18 16:06:13 +00:00
results = append ( results , result )
2020-02-13 17:27:33 +00:00
}
2023-11-09 07:42:33 +00:00
if err = rows . Err ( ) ; err != nil {
return nil , err
}
2022-08-18 16:06:13 +00:00
if ! excludeRejected && i != len ( eventIDs ) {
2020-02-13 17:27:33 +00:00
// If there are fewer rows returned than IDs then we were asked to lookup event IDs we don't have.
// We don't know which ones were missing because we don't return the string IDs in the query.
// However it should be possible debug this by replaying queries or entries from the input kafka logs.
// If this turns out to be impossible and we do need the debug information here, it would be better
2020-05-21 13:40:13 +00:00
// to do it as a separate query rather than slowing down/complicating the internal case.
2020-02-13 17:27:33 +00:00
return nil , types . MissingEventError (
fmt . Sprintf ( "storage: state event IDs missing from the database (%d != %d)" , i , len ( eventIDs ) ) ,
)
}
return results , err
}
2021-04-26 12:25:57 +00:00
// bulkSelectStateEventByID lookups a list of state events by event ID.
// If any of the requested events are missing from the database it returns a types.MissingEventError
func ( s * eventStatements ) BulkSelectStateEventByNID (
2022-02-04 10:39:34 +00:00
ctx context . Context , txn * sql . Tx , eventNIDs [ ] types . EventNID ,
2021-04-26 12:25:57 +00:00
stateKeyTuples [ ] types . StateKeyTuple ,
) ( [ ] types . StateEntry , error ) {
2022-05-16 17:33:16 +00:00
tuples := types . StateKeyTupleSorter ( stateKeyTuples )
2021-04-26 12:25:57 +00:00
sort . Sort ( tuples )
2022-05-16 17:33:16 +00:00
eventTypeNIDArray , eventStateKeyNIDArray := tuples . TypesAndStateKeysAsArrays ( )
2021-04-26 12:25:57 +00:00
params := make ( [ ] interface { } , 0 , len ( eventNIDs ) + len ( eventTypeNIDArray ) + len ( eventStateKeyNIDArray ) )
selectOrig := strings . Replace ( bulkSelectStateEventByNIDSQL , "($1)" , sqlutil . QueryVariadic ( len ( eventNIDs ) ) , 1 )
for _ , v := range eventNIDs {
params = append ( params , v )
}
if len ( eventTypeNIDArray ) > 0 {
selectOrig += " AND event_type_nid IN " + sqlutil . QueryVariadicOffset ( len ( eventTypeNIDArray ) , len ( params ) )
for _ , v := range eventTypeNIDArray {
params = append ( params , v )
}
}
if len ( eventStateKeyNIDArray ) > 0 {
selectOrig += " AND event_state_key_nid IN " + sqlutil . QueryVariadicOffset ( len ( eventStateKeyNIDArray ) , len ( params ) )
for _ , v := range eventStateKeyNIDArray {
params = append ( params , v )
}
}
selectOrig += " ORDER BY event_type_nid, event_state_key_nid ASC"
2022-03-04 15:05:42 +00:00
selectPrep , err := s . db . Prepare ( selectOrig )
2021-04-26 12:25:57 +00:00
if err != nil {
return nil , fmt . Errorf ( "s.db.Prepare: %w" , err )
}
2022-03-04 15:05:42 +00:00
defer selectPrep . Close ( ) // nolint:errcheck
selectStmt := sqlutil . TxStmt ( txn , selectPrep )
2021-04-26 12:25:57 +00:00
rows , err := selectStmt . QueryContext ( ctx , params ... )
if err != nil {
return nil , fmt . Errorf ( "selectStmt.QueryContext: %w" , err )
}
defer internal . CloseAndLogIfError ( ctx , rows , "bulkSelectStateEventByID: rows.close() failed" )
// We know that we will only get as many results as event IDs
// because of the unique constraint on event IDs.
// So we can allocate an array of the correct size now.
// We might get fewer results than IDs so we adjust the length of the slice before returning it.
results := make ( [ ] types . StateEntry , len ( eventNIDs ) )
i := 0
for ; rows . Next ( ) ; i ++ {
result := & results [ i ]
if err = rows . Scan (
& result . EventTypeNID ,
& result . EventStateKeyNID ,
& result . EventNID ,
) ; err != nil {
return nil , err
}
}
2023-11-09 07:42:33 +00:00
return results [ : i ] , rows . Err ( )
2021-04-26 12:25:57 +00:00
}
2020-02-13 17:27:33 +00:00
// bulkSelectStateAtEventByID lookups the state at a list of events by event ID.
// If any of the requested events are missing from the database it returns a types.MissingEventError.
// If we do not have the state for any of the requested events it returns a types.MissingEventError.
2020-05-26 15:45:28 +00:00
func ( s * eventStatements ) BulkSelectStateAtEventByID (
2022-02-04 10:39:34 +00:00
ctx context . Context , txn * sql . Tx , eventIDs [ ] string ,
2020-02-13 17:27:33 +00:00
) ( [ ] types . StateAtEvent , error ) {
///////////////
iEventIDs := make ( [ ] interface { } , len ( eventIDs ) )
for k , v := range eventIDs {
iEventIDs [ k ] = v
}
2020-06-12 13:55:57 +00:00
selectOrig := strings . Replace ( bulkSelectStateAtEventByIDSQL , "($1)" , sqlutil . QueryVariadic ( len ( iEventIDs ) ) , 1 )
2022-03-04 15:05:42 +00:00
selectPrep , err := s . db . Prepare ( selectOrig )
2020-02-13 17:27:33 +00:00
if err != nil {
return nil , err
}
2022-03-04 15:05:42 +00:00
defer selectPrep . Close ( ) // nolint:errcheck
selectStmt := sqlutil . TxStmt ( txn , selectPrep )
2020-02-13 17:27:33 +00:00
///////////////
rows , err := selectStmt . QueryContext ( ctx , iEventIDs ... )
if err != nil {
return nil , err
}
2020-05-21 13:40:13 +00:00
defer internal . CloseAndLogIfError ( ctx , rows , "bulkSelectStateAtEventByID: rows.close() failed" )
2020-02-13 17:27:33 +00:00
results := make ( [ ] types . StateAtEvent , len ( eventIDs ) )
i := 0
for ; rows . Next ( ) ; i ++ {
result := & results [ i ]
if err = rows . Scan (
& result . EventTypeNID ,
& result . EventStateKeyNID ,
& result . EventNID ,
& result . BeforeStateSnapshotNID ,
2020-09-16 12:00:52 +00:00
& result . IsRejected ,
2020-02-13 17:27:33 +00:00
) ; err != nil {
return nil , err
}
2022-01-27 14:29:14 +00:00
// Genuine create events are the only case where it's OK to have no previous state.
isCreate := result . EventTypeNID == types . MRoomCreateNID && result . EventStateKeyNID == 1
if result . BeforeStateSnapshotNID == 0 && ! isCreate {
2022-08-23 12:57:11 +00:00
return nil , types . MissingStateError (
2020-02-13 17:27:33 +00:00
fmt . Sprintf ( "storage: missing state for event NID %d" , result . EventNID ) ,
)
}
}
2023-11-09 07:42:33 +00:00
if err = rows . Err ( ) ; err != nil {
return nil , err
}
2020-02-13 17:27:33 +00:00
if i != len ( eventIDs ) {
return nil , types . MissingEventError (
fmt . Sprintf ( "storage: event IDs missing from the database (%d != %d)" , i , len ( eventIDs ) ) ,
)
}
return results , err
}
2020-05-26 15:45:28 +00:00
func ( s * eventStatements ) UpdateEventState (
2020-08-20 15:24:33 +00:00
ctx context . Context , txn * sql . Tx , eventNID types . EventNID , stateNID types . StateSnapshotNID ,
2020-02-13 17:27:33 +00:00
) error {
2020-08-20 15:24:33 +00:00
stmt := sqlutil . TxStmt ( txn , s . updateEventStateStmt )
_ , err := stmt . ExecContext ( ctx , int64 ( stateNID ) , int64 ( eventNID ) )
2020-08-19 14:38:27 +00:00
return err
2020-02-13 17:27:33 +00:00
}
2020-05-26 15:45:28 +00:00
func ( s * eventStatements ) SelectEventSentToOutput (
2020-02-13 17:27:33 +00:00
ctx context . Context , txn * sql . Tx , eventNID types . EventNID ,
) ( sentToOutput bool , err error ) {
2020-06-12 13:55:57 +00:00
selectStmt := sqlutil . TxStmt ( txn , s . selectEventSentToOutputStmt )
2020-02-13 17:27:33 +00:00
err = selectStmt . QueryRowContext ( ctx , int64 ( eventNID ) ) . Scan ( & sentToOutput )
return
}
2020-05-26 15:45:28 +00:00
func ( s * eventStatements ) UpdateEventSentToOutput ( ctx context . Context , txn * sql . Tx , eventNID types . EventNID ) error {
2020-08-19 14:38:27 +00:00
updateStmt := sqlutil . TxStmt ( txn , s . updateEventSentToOutputStmt )
_ , err := updateStmt . ExecContext ( ctx , int64 ( eventNID ) )
return err
2020-02-13 17:27:33 +00:00
}
2020-05-26 15:45:28 +00:00
func ( s * eventStatements ) SelectEventID (
2020-02-13 17:27:33 +00:00
ctx context . Context , txn * sql . Tx , eventNID types . EventNID ,
) ( eventID string , err error ) {
2020-06-12 13:55:57 +00:00
selectStmt := sqlutil . TxStmt ( txn , s . selectEventIDStmt )
2020-02-13 17:27:33 +00:00
err = selectStmt . QueryRowContext ( ctx , int64 ( eventNID ) ) . Scan ( & eventID )
return
}
2020-05-26 15:45:28 +00:00
func ( s * eventStatements ) BulkSelectStateAtEventAndReference (
2020-02-13 17:27:33 +00:00
ctx context . Context , txn * sql . Tx , eventNIDs [ ] types . EventNID ,
) ( [ ] types . StateAtEventAndReference , error ) {
///////////////
iEventNIDs := make ( [ ] interface { } , len ( eventNIDs ) )
for k , v := range eventNIDs {
iEventNIDs [ k ] = v
}
2020-06-12 13:55:57 +00:00
selectOrig := strings . Replace ( bulkSelectStateAtEventAndReferenceSQL , "($1)" , sqlutil . QueryVariadic ( len ( iEventNIDs ) ) , 1 )
2020-08-19 12:24:54 +00:00
selectPrep , err := s . db . Prepare ( selectOrig )
if err != nil {
return nil , err
}
2022-03-04 15:05:42 +00:00
defer selectPrep . Close ( ) // nolint:errcheck
selectStmt := sqlutil . TxStmt ( txn , selectPrep )
2020-02-13 17:27:33 +00:00
//////////////
2022-03-04 15:05:42 +00:00
rows , err := sqlutil . TxStmt ( txn , selectStmt ) . QueryContext ( ctx , iEventNIDs ... )
2020-02-13 17:27:33 +00:00
if err != nil {
2020-08-19 14:38:27 +00:00
return nil , fmt . Errorf ( "sqlutil.TxStmt.QueryContext: %w" , err )
2020-02-13 17:27:33 +00:00
}
2020-05-21 13:40:13 +00:00
defer internal . CloseAndLogIfError ( ctx , rows , "bulkSelectStateAtEventAndReference: rows.close() failed" )
2020-02-13 17:27:33 +00:00
results := make ( [ ] types . StateAtEventAndReference , len ( eventNIDs ) )
i := 0
2022-05-09 13:30:32 +00:00
var (
eventTypeNID int64
eventStateKeyNID int64
eventNID int64
stateSnapshotNID int64
eventID string
)
2020-02-13 17:27:33 +00:00
for ; rows . Next ( ) ; i ++ {
if err = rows . Scan (
2023-05-24 10:14:42 +00:00
& eventTypeNID , & eventStateKeyNID , & eventNID , & stateSnapshotNID , & eventID ,
2020-02-13 17:27:33 +00:00
) ; err != nil {
return nil , err
}
result := & results [ i ]
result . EventTypeNID = types . EventTypeNID ( eventTypeNID )
result . EventStateKeyNID = types . EventStateKeyNID ( eventStateKeyNID )
result . EventNID = types . EventNID ( eventNID )
result . BeforeStateSnapshotNID = types . StateSnapshotNID ( stateSnapshotNID )
result . EventID = eventID
}
2023-11-09 07:42:33 +00:00
if err = rows . Err ( ) ; err != nil {
return nil , err
}
2020-02-13 17:27:33 +00:00
if i != len ( eventNIDs ) {
return nil , fmt . Errorf ( "storage: event NIDs missing from the database (%d != %d)" , i , len ( eventNIDs ) )
}
return results , nil
}
// bulkSelectEventID returns a map from numeric event ID to string event ID.
2022-02-04 10:39:34 +00:00
func ( s * eventStatements ) BulkSelectEventID ( ctx context . Context , txn * sql . Tx , eventNIDs [ ] types . EventNID ) ( map [ types . EventNID ] string , error ) {
2020-02-13 17:27:33 +00:00
///////////////
iEventNIDs := make ( [ ] interface { } , len ( eventNIDs ) )
for k , v := range eventNIDs {
iEventNIDs [ k ] = v
}
2020-06-12 13:55:57 +00:00
selectOrig := strings . Replace ( bulkSelectEventIDSQL , "($1)" , sqlutil . QueryVariadic ( len ( iEventNIDs ) ) , 1 )
2022-03-04 15:05:42 +00:00
selectPrep , err := s . db . Prepare ( selectOrig )
2020-02-13 17:27:33 +00:00
if err != nil {
return nil , err
}
2022-03-04 15:05:42 +00:00
defer selectPrep . Close ( ) // nolint:errcheck
selectStmt := sqlutil . TxStmt ( txn , selectPrep )
2020-02-13 17:27:33 +00:00
///////////////
rows , err := selectStmt . QueryContext ( ctx , iEventNIDs ... )
if err != nil {
return nil , err
}
2020-05-21 13:40:13 +00:00
defer internal . CloseAndLogIfError ( ctx , rows , "bulkSelectEventID: rows.close() failed" )
2020-02-13 17:27:33 +00:00
results := make ( map [ types . EventNID ] string , len ( eventNIDs ) )
i := 0
2022-05-09 13:30:32 +00:00
var eventNID int64
var eventID string
2020-02-13 17:27:33 +00:00
for ; rows . Next ( ) ; i ++ {
if err = rows . Scan ( & eventNID , & eventID ) ; err != nil {
return nil , err
}
results [ types . EventNID ( eventNID ) ] = eventID
}
2023-11-09 07:42:33 +00:00
if err = rows . Err ( ) ; err != nil {
return nil , err
}
2020-02-13 17:27:33 +00:00
if i != len ( eventNIDs ) {
return nil , fmt . Errorf ( "storage: event NIDs missing from the database (%d != %d)" , i , len ( eventNIDs ) )
}
return results , nil
}
2022-02-17 13:53:48 +00:00
// BulkSelectEventNIDs returns a map from string event ID to numeric event ID.
2020-02-13 17:27:33 +00:00
// If an event ID is not in the database then it is omitted from the map.
2023-02-24 08:40:20 +00:00
func ( s * eventStatements ) BulkSelectEventNID ( ctx context . Context , txn * sql . Tx , eventIDs [ ] string ) ( map [ string ] types . EventMetadata , error ) {
2022-02-17 13:53:48 +00:00
return s . bulkSelectEventNID ( ctx , txn , eventIDs , false )
}
// BulkSelectEventNIDs returns a map from string event ID to numeric event ID
// only for events that haven't already been sent to the roomserver output.
// If an event ID is not in the database then it is omitted from the map.
2023-02-24 08:40:20 +00:00
func ( s * eventStatements ) BulkSelectUnsentEventNID ( ctx context . Context , txn * sql . Tx , eventIDs [ ] string ) ( map [ string ] types . EventMetadata , error ) {
2022-02-17 13:53:48 +00:00
return s . bulkSelectEventNID ( ctx , txn , eventIDs , true )
}
// bulkSelectEventNIDs returns a map from string event ID to numeric event ID.
// If an event ID is not in the database then it is omitted from the map.
2023-02-24 08:40:20 +00:00
func ( s * eventStatements ) bulkSelectEventNID ( ctx context . Context , txn * sql . Tx , eventIDs [ ] string , onlyUnsent bool ) ( map [ string ] types . EventMetadata , error ) {
2020-02-13 17:27:33 +00:00
///////////////
iEventIDs := make ( [ ] interface { } , len ( eventIDs ) )
for k , v := range eventIDs {
iEventIDs [ k ] = v
}
2022-02-17 13:53:48 +00:00
var selectOrig string
if onlyUnsent {
selectOrig = strings . Replace ( bulkSelectUnsentEventNIDSQL , "($1)" , sqlutil . QueryVariadic ( len ( iEventIDs ) ) , 1 )
} else {
selectOrig = strings . Replace ( bulkSelectEventNIDSQL , "($1)" , sqlutil . QueryVariadic ( len ( iEventIDs ) ) , 1 )
}
2022-03-04 15:05:42 +00:00
selectPrep , err := s . db . Prepare ( selectOrig )
2020-02-13 17:27:33 +00:00
if err != nil {
return nil , err
}
2022-03-04 15:05:42 +00:00
defer selectPrep . Close ( ) // nolint:errcheck
selectStmt := sqlutil . TxStmt ( txn , selectPrep )
2020-02-13 17:27:33 +00:00
///////////////
rows , err := selectStmt . QueryContext ( ctx , iEventIDs ... )
if err != nil {
return nil , err
}
2020-05-21 13:40:13 +00:00
defer internal . CloseAndLogIfError ( ctx , rows , "bulkSelectEventNID: rows.close() failed" )
2023-02-24 08:40:20 +00:00
results := make ( map [ string ] types . EventMetadata , len ( eventIDs ) )
2022-05-09 13:30:32 +00:00
var eventID string
var eventNID int64
2023-02-24 08:40:20 +00:00
var roomNID int64
2020-02-13 17:27:33 +00:00
for rows . Next ( ) {
2023-02-24 08:40:20 +00:00
if err = rows . Scan ( & eventID , & eventNID , & roomNID ) ; err != nil {
2020-02-13 17:27:33 +00:00
return nil , err
}
2023-02-24 08:40:20 +00:00
results [ eventID ] = types . EventMetadata {
EventNID : types . EventNID ( eventNID ) ,
RoomNID : types . RoomNID ( roomNID ) ,
}
2020-02-13 17:27:33 +00:00
}
2023-11-09 07:42:33 +00:00
return results , rows . Err ( )
2020-02-13 17:27:33 +00:00
}
2020-05-26 15:45:28 +00:00
func ( s * eventStatements ) SelectMaxEventDepth ( ctx context . Context , txn * sql . Tx , eventNIDs [ ] types . EventNID ) ( int64 , error ) {
2020-02-13 17:27:33 +00:00
var result int64
2020-03-06 14:31:12 +00:00
iEventIDs := make ( [ ] interface { } , len ( eventNIDs ) )
for i , v := range eventNIDs {
iEventIDs [ i ] = v
}
2020-06-12 13:55:57 +00:00
sqlStr := strings . Replace ( selectMaxEventDepthSQL , "($1)" , sqlutil . QueryVariadic ( len ( iEventIDs ) ) , 1 )
2020-08-19 12:24:54 +00:00
sqlPrep , err := s . db . Prepare ( sqlStr )
if err != nil {
return 0 , err
}
2022-03-24 10:03:22 +00:00
defer internal . CloseAndLogIfError ( ctx , sqlPrep , "sqlPrep.close() failed" )
2020-08-19 12:24:54 +00:00
err = sqlutil . TxStmt ( txn , sqlPrep ) . QueryRowContext ( ctx , iEventIDs ... ) . Scan ( & result )
2020-02-13 17:27:33 +00:00
if err != nil {
2020-08-19 14:38:27 +00:00
return 0 , fmt . Errorf ( "sqlutil.TxStmt.QueryRowContext: %w" , err )
2020-02-13 17:27:33 +00:00
}
return result , nil
}
2020-12-16 10:33:28 +00:00
func ( s * eventStatements ) SelectRoomNIDsForEventNIDs (
2022-02-04 10:39:34 +00:00
ctx context . Context , txn * sql . Tx , eventNIDs [ ] types . EventNID ,
2020-12-16 10:33:28 +00:00
) ( map [ types . EventNID ] types . RoomNID , error ) {
sqlStr := strings . Replace ( selectRoomNIDsForEventNIDsSQL , "($1)" , sqlutil . QueryVariadic ( len ( eventNIDs ) ) , 1 )
sqlPrep , err := s . db . Prepare ( sqlStr )
if err != nil {
return nil , err
}
2022-03-24 10:03:22 +00:00
defer internal . CloseAndLogIfError ( ctx , sqlPrep , "sqlPrep.close() failed" )
2022-03-04 15:05:42 +00:00
sqlStmt := sqlutil . TxStmt ( txn , sqlPrep )
2020-12-16 10:33:28 +00:00
iEventNIDs := make ( [ ] interface { } , len ( eventNIDs ) )
for i , v := range eventNIDs {
iEventNIDs [ i ] = v
}
2022-03-04 15:05:42 +00:00
rows , err := sqlStmt . QueryContext ( ctx , iEventNIDs ... )
2020-12-16 10:33:28 +00:00
if err != nil {
return nil , err
}
defer internal . CloseAndLogIfError ( ctx , rows , "selectRoomNIDsForEventNIDsStmt: rows.close() failed" )
result := make ( map [ types . EventNID ] types . RoomNID )
2022-05-09 13:30:32 +00:00
var eventNID types . EventNID
var roomNID types . RoomNID
2020-12-16 10:33:28 +00:00
for rows . Next ( ) {
if err = rows . Scan ( & eventNID , & roomNID ) ; err != nil {
return nil , err
}
result [ eventNID ] = roomNID
}
2023-11-09 07:42:33 +00:00
return result , rows . Err ( )
2020-03-27 16:28:22 +00:00
}
Add peer-to-peer support into Dendrite via libp2p and fetch (#880)
* Use a fork of pq which supports userCurrent on wasm
* Use sqlite3_js driver when running in JS
* Add cmd/dendritejs to pull in sqlite3_js driver for wasm only
* Update to latest go-sqlite-js version
* Replace prometheus with a stub. sigh
* Hard-code a config and don't use opentracing
* Latest go-sqlite3-js version
* Generate a key for now
* Listen for fetch traffic rather than HTTP
* Latest hacks for js
* libp2p support
* More libp2p
* Fork gjson to allow us to enforce auth checks as before
Previously, all events would come down redacted because the hash
checks would fail. They would fail because sjson.DeleteBytes didn't
remove keys not used for hashing. This didn't work because of a build
tag which included a file which no-oped the index returned.
See https://github.com/tidwall/gjson/issues/157
When it's resolved, let's go back to mainline.
* Use gjson@1.6.0 as it fixes https://github.com/tidwall/gjson/issues/157
* Use latest gomatrixserverlib for sig checks
* Fix a bug which could cause exclude_from_sync to not be set
Caused when sending events over federation.
* Use query variadic to make lookups actually work!
* Latest gomatrixserverlib
* Add notes on getting p2p up and running
Partly so I don't forget myself!
* refactor: Move p2p specific stuff to cmd/dendritejs
This is important or else the normal build of dendrite will fail
because the p2p libraries depend on syscall/js which doesn't work
on normal builds.
Also, clean up main.go to read a bit better.
* Update ho-http-js-libp2p to return errors from RoundTrip
* Add an LRU cache around the key DB
We actually need this for P2P because otherwise we can *segfault*
with things like: "runtime: unexpected return pc for runtime.handleEvent"
where the event is a `syscall/js` event, caused by spamming sql.js
caused by "Checking event signatures for 14 events of room state" which
hammers the key DB repeatedly in quick succession.
Using a cache fixes this, though the underlying cause is probably a bug
in the version of Go I'm on (1.13.7)
* breaking: Add Tracing.Enabled to toggle whether we do opentracing
Defaults to false, which is why this is a breaking change. We need
this flag because WASM builds cannot do opentracing.
* Start adding conditional builds for wasm to handle lib/pq
The general idea here is to have the wasm build have a `NewXXXDatabase`
that doesn't import any postgres package and hence we never import
`lib/pq`, which doesn't work under WASM (undefined `userCurrent`).
* Remove lib/pq for wasm for syncapi
* Add conditional building to remaining storage APIs
* Update build script to set env vars correctly for dendritejs
* sqlite bug fixes
* Docs
* Add a no-op main for dendritejs when not building under wasm
* Use the real prometheus, even for WASM
Instead, the dendrite-sw.js must mock out `process.pid` and
`fs.stat` - which must invoke the callback with an error (e.g `EINVAL`)
in order for it to work:
```
global.process = {
pid: 1,
};
global.fs.stat = function(path, cb) {
cb({
code: "EINVAL",
});
}
```
* Linting
2020-03-06 10:23:55 +00:00
func eventNIDsAsArray ( eventNIDs [ ] types . EventNID ) string {
2021-08-04 16:08:17 +00:00
if eventNIDs == nil {
eventNIDs = [ ] types . EventNID { } // don't store 'null' in the DB
}
Add peer-to-peer support into Dendrite via libp2p and fetch (#880)
* Use a fork of pq which supports userCurrent on wasm
* Use sqlite3_js driver when running in JS
* Add cmd/dendritejs to pull in sqlite3_js driver for wasm only
* Update to latest go-sqlite-js version
* Replace prometheus with a stub. sigh
* Hard-code a config and don't use opentracing
* Latest go-sqlite3-js version
* Generate a key for now
* Listen for fetch traffic rather than HTTP
* Latest hacks for js
* libp2p support
* More libp2p
* Fork gjson to allow us to enforce auth checks as before
Previously, all events would come down redacted because the hash
checks would fail. They would fail because sjson.DeleteBytes didn't
remove keys not used for hashing. This didn't work because of a build
tag which included a file which no-oped the index returned.
See https://github.com/tidwall/gjson/issues/157
When it's resolved, let's go back to mainline.
* Use gjson@1.6.0 as it fixes https://github.com/tidwall/gjson/issues/157
* Use latest gomatrixserverlib for sig checks
* Fix a bug which could cause exclude_from_sync to not be set
Caused when sending events over federation.
* Use query variadic to make lookups actually work!
* Latest gomatrixserverlib
* Add notes on getting p2p up and running
Partly so I don't forget myself!
* refactor: Move p2p specific stuff to cmd/dendritejs
This is important or else the normal build of dendrite will fail
because the p2p libraries depend on syscall/js which doesn't work
on normal builds.
Also, clean up main.go to read a bit better.
* Update ho-http-js-libp2p to return errors from RoundTrip
* Add an LRU cache around the key DB
We actually need this for P2P because otherwise we can *segfault*
with things like: "runtime: unexpected return pc for runtime.handleEvent"
where the event is a `syscall/js` event, caused by spamming sql.js
caused by "Checking event signatures for 14 events of room state" which
hammers the key DB repeatedly in quick succession.
Using a cache fixes this, though the underlying cause is probably a bug
in the version of Go I'm on (1.13.7)
* breaking: Add Tracing.Enabled to toggle whether we do opentracing
Defaults to false, which is why this is a breaking change. We need
this flag because WASM builds cannot do opentracing.
* Start adding conditional builds for wasm to handle lib/pq
The general idea here is to have the wasm build have a `NewXXXDatabase`
that doesn't import any postgres package and hence we never import
`lib/pq`, which doesn't work under WASM (undefined `userCurrent`).
* Remove lib/pq for wasm for syncapi
* Add conditional building to remaining storage APIs
* Update build script to set env vars correctly for dendritejs
* sqlite bug fixes
* Docs
* Add a no-op main for dendritejs when not building under wasm
* Use the real prometheus, even for WASM
Instead, the dendrite-sw.js must mock out `process.pid` and
`fs.stat` - which must invoke the callback with an error (e.g `EINVAL`)
in order for it to work:
```
global.process = {
pid: 1,
};
global.fs.stat = function(path, cb) {
cb({
code: "EINVAL",
});
}
```
* Linting
2020-03-06 10:23:55 +00:00
b , _ := json . Marshal ( eventNIDs )
return string ( b )
2020-02-13 17:27:33 +00:00
}
2022-08-18 09:37:47 +00:00
func ( s * eventStatements ) SelectEventRejected (
ctx context . Context , txn * sql . Tx , roomNID types . RoomNID , eventID string ,
) ( rejected bool , err error ) {
stmt := sqlutil . TxStmt ( txn , s . selectEventRejectedStmt )
err = stmt . QueryRowContext ( ctx , roomNID , eventID ) . Scan ( & rejected )
return
}