mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-07-31 05:12:46 +00:00
Implement history visibility on /messages
, /context
, /sync
(#2511)
* Add possibility to set history_visibility and user AccountType * Add new DB queries * Add actual history_visibility changes for /messages * Add passing tests * Extract check function * Cleanup * Cleanup * Fix build on 386 * Move ApplyHistoryVisibilityFilter to internal * Move queries to topology table * Add filtering to /sync and /context Some cleanup * Add passing tests; Remove failing tests :( * Re-add passing tests * Move filtering to own function to avoid duplication * Re-add passing test * Use newly added GMSL HistoryVisibility * Update gomatrixserverlib * Set the visibility when creating events * Default to shared history visibility * Remove unused query * Update history visibility checks to use gmsl Update tests * Remove unused statement * Update migrations to set "correct" history visibility * Add method to fetch the membership at a given event * Tweaks and logging * Use actual internal rsAPI, default to shared visibility in tests * Revert "Move queries to topology table" This reverts commit 4f0d41be9c194a46379796435ce73e79203edbd6. * Remove noise/unneeded code * More cleanup * Try to optimize database requests * Fix imports * PR peview fixes/changes * Move setting history visibility to own migration, be more restrictive * Fix unit tests * Lint * Fix missing entries * Tweaks for incremental syncs * Adapt generic changes Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com> Co-authored-by: kegsay <kegan@matrix.org>
This commit is contained in:
parent
371336c6b5
commit
05cafbd197
31 changed files with 1043 additions and 224 deletions
|
@ -161,6 +161,10 @@ type Database interface {
|
|||
|
||||
IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error)
|
||||
UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error
|
||||
// SelectMembershipForUser returns the membership of the user before and including the given position. If no membership can be found
|
||||
// returns "leave", the topological position and no error. If an error occurs, other than sql.ErrNoRows, returns that and an empty
|
||||
// string as the membership.
|
||||
SelectMembershipForUser(ctx context.Context, roomID, userID string, pos int64) (membership string, topologicalPos int, err error)
|
||||
}
|
||||
|
||||
type Presence interface {
|
||||
|
|
|
@ -17,7 +17,10 @@ package deltas
|
|||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
func UpAddHistoryVisibilityColumnOutputRoomEvents(ctx context.Context, tx *sql.Tx) error {
|
||||
|
@ -31,6 +34,27 @@ func UpAddHistoryVisibilityColumnOutputRoomEvents(ctx context.Context, tx *sql.T
|
|||
return nil
|
||||
}
|
||||
|
||||
// UpSetHistoryVisibility sets the history visibility for already stored events.
|
||||
// Requires current_room_state and output_room_events to be created.
|
||||
func UpSetHistoryVisibility(ctx context.Context, tx *sql.Tx) error {
|
||||
// get the current room history visibilities
|
||||
historyVisibilities, err := currentHistoryVisibilities(ctx, tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// update the history visibility
|
||||
for roomID, hisVis := range historyVisibilities {
|
||||
_, err = tx.ExecContext(ctx, `UPDATE syncapi_output_room_events SET history_visibility = $1
|
||||
WHERE type IN ('m.room.message', 'm.room.encrypted') AND room_id = $2 AND history_visibility <> $1`, hisVis, roomID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update history visibility: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func UpAddHistoryVisibilityColumnCurrentRoomState(ctx context.Context, tx *sql.Tx) error {
|
||||
_, err := tx.ExecContext(ctx, `
|
||||
ALTER TABLE syncapi_current_room_state ADD COLUMN IF NOT EXISTS history_visibility SMALLINT NOT NULL DEFAULT 2;
|
||||
|
@ -39,9 +63,40 @@ func UpAddHistoryVisibilityColumnCurrentRoomState(ctx context.Context, tx *sql.T
|
|||
if err != nil {
|
||||
return fmt.Errorf("failed to execute upgrade: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// currentHistoryVisibilities returns a map from roomID to current history visibility.
|
||||
// If the history visibility was changed after room creation, defaults to joined.
|
||||
func currentHistoryVisibilities(ctx context.Context, tx *sql.Tx) (map[string]gomatrixserverlib.HistoryVisibility, error) {
|
||||
rows, err := tx.QueryContext(ctx, `SELECT DISTINCT room_id, headered_event_json FROM syncapi_current_room_state
|
||||
WHERE type = 'm.room.history_visibility' AND state_key = '';
|
||||
`)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query current room state: %w", err)
|
||||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
var eventBytes []byte
|
||||
var roomID string
|
||||
var event gomatrixserverlib.HeaderedEvent
|
||||
var hisVis gomatrixserverlib.HistoryVisibility
|
||||
historyVisibilities := make(map[string]gomatrixserverlib.HistoryVisibility)
|
||||
for rows.Next() {
|
||||
if err = rows.Scan(&roomID, &eventBytes); err != nil {
|
||||
return nil, fmt.Errorf("failed to scan row: %w", err)
|
||||
}
|
||||
if err = json.Unmarshal(eventBytes, &event); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal event: %w", err)
|
||||
}
|
||||
historyVisibilities[roomID] = gomatrixserverlib.HistoryVisibilityJoined
|
||||
if hisVis, err = event.HistoryVisibility(); err == nil && event.Depth() < 10 {
|
||||
historyVisibilities[roomID] = hisVis
|
||||
}
|
||||
}
|
||||
return historyVisibilities, nil
|
||||
}
|
||||
|
||||
func DownAddHistoryVisibilityColumn(ctx context.Context, tx *sql.Tx) error {
|
||||
_, err := tx.ExecContext(ctx, `
|
||||
ALTER TABLE syncapi_output_room_events DROP COLUMN IF EXISTS history_visibility;
|
||||
|
|
|
@ -66,10 +66,14 @@ const selectMembershipCountSQL = "" +
|
|||
const selectHeroesSQL = "" +
|
||||
"SELECT DISTINCT user_id FROM syncapi_memberships WHERE room_id = $1 AND user_id != $2 AND membership = ANY($3) LIMIT 5"
|
||||
|
||||
const selectMembershipBeforeSQL = "" +
|
||||
"SELECT membership, topological_pos FROM syncapi_memberships WHERE room_id = $1 and user_id = $2 AND topological_pos <= $3 ORDER BY topological_pos DESC LIMIT 1"
|
||||
|
||||
type membershipsStatements struct {
|
||||
upsertMembershipStmt *sql.Stmt
|
||||
selectMembershipCountStmt *sql.Stmt
|
||||
selectHeroesStmt *sql.Stmt
|
||||
upsertMembershipStmt *sql.Stmt
|
||||
selectMembershipCountStmt *sql.Stmt
|
||||
selectHeroesStmt *sql.Stmt
|
||||
selectMembershipForUserStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
|
||||
|
@ -82,6 +86,7 @@ func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
|
|||
{&s.upsertMembershipStmt, upsertMembershipSQL},
|
||||
{&s.selectMembershipCountStmt, selectMembershipCountSQL},
|
||||
{&s.selectHeroesStmt, selectHeroesSQL},
|
||||
{&s.selectMembershipForUserStmt, selectMembershipBeforeSQL},
|
||||
}.Prepare(db)
|
||||
}
|
||||
|
||||
|
@ -132,3 +137,20 @@ func (s *membershipsStatements) SelectHeroes(
|
|||
}
|
||||
return heroes, rows.Err()
|
||||
}
|
||||
|
||||
// SelectMembershipForUser returns the membership of the user before and including the given position. If no membership can be found
|
||||
// returns "leave", the topological position and no error. If an error occurs, other than sql.ErrNoRows, returns that and an empty
|
||||
// string as the membership.
|
||||
func (s *membershipsStatements) SelectMembershipForUser(
|
||||
ctx context.Context, txn *sql.Tx, roomID, userID string, pos int64,
|
||||
) (membership string, topologyPos int, err error) {
|
||||
stmt := sqlutil.TxStmt(txn, s.selectMembershipForUserStmt)
|
||||
err = stmt.QueryRowContext(ctx, roomID, userID, pos).Scan(&membership, &topologyPos)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return "leave", 0, nil
|
||||
}
|
||||
return "", 0, err
|
||||
}
|
||||
return membership, topologyPos, nil
|
||||
}
|
||||
|
|
|
@ -191,10 +191,12 @@ func NewPostgresEventsTable(db *sql.DB) (tables.Events, error) {
|
|||
}
|
||||
|
||||
m := sqlutil.NewMigrator(db)
|
||||
m.AddMigrations(sqlutil.Migration{
|
||||
Version: "syncapi: add history visibility column (output_room_events)",
|
||||
Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents,
|
||||
})
|
||||
m.AddMigrations(
|
||||
sqlutil.Migration{
|
||||
Version: "syncapi: add history visibility column (output_room_events)",
|
||||
Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents,
|
||||
},
|
||||
)
|
||||
err = m.Up(context.Background())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/setup/base"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/postgres/deltas"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/shared"
|
||||
)
|
||||
|
||||
|
@ -97,6 +98,20 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// apply migrations which need multiple tables
|
||||
m := sqlutil.NewMigrator(d.db)
|
||||
m.AddMigrations(
|
||||
sqlutil.Migration{
|
||||
Version: "syncapi: set history visibility for existing events",
|
||||
Up: deltas.UpSetHistoryVisibility, // Requires current_room_state and output_room_events to be created.
|
||||
},
|
||||
)
|
||||
err = m.Up(base.Context())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
d.Database = shared.Database{
|
||||
DB: d.db,
|
||||
Writer: d.writer,
|
||||
|
|
|
@ -231,7 +231,7 @@ func (d *Database) AddPeek(
|
|||
return
|
||||
}
|
||||
|
||||
// DeletePeeks tracks the fact that a user has stopped peeking from the specified
|
||||
// DeletePeek tracks the fact that a user has stopped peeking from the specified
|
||||
// device. If the peeks was successfully deleted this returns the stream ID it was
|
||||
// stored at. Returns an error if there was a problem communicating with the database.
|
||||
func (d *Database) DeletePeek(
|
||||
|
@ -372,6 +372,7 @@ func (d *Database) WriteEvent(
|
|||
) (pduPosition types.StreamPosition, returnErr error) {
|
||||
returnErr = d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
|
||||
var err error
|
||||
ev.Visibility = historyVisibility
|
||||
pos, err := d.OutputEvents.InsertEvent(
|
||||
ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync, historyVisibility,
|
||||
)
|
||||
|
@ -563,7 +564,7 @@ func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, reda
|
|||
return err
|
||||
}
|
||||
|
||||
// Retrieve the backward topology position, i.e. the position of the
|
||||
// GetBackwardTopologyPos retrieves the backward topology position, i.e. the position of the
|
||||
// oldest event in the room's topology.
|
||||
func (d *Database) GetBackwardTopologyPos(
|
||||
ctx context.Context,
|
||||
|
@ -674,7 +675,7 @@ func (d *Database) fetchMissingStateEvents(
|
|||
return events, nil
|
||||
}
|
||||
|
||||
// getStateDeltas returns the state deltas between fromPos and toPos,
|
||||
// GetStateDeltas returns the state deltas between fromPos and toPos,
|
||||
// exclusive of oldPos, inclusive of newPos, for the rooms in which
|
||||
// the user has new membership events.
|
||||
// A list of joined room IDs is also returned in case the caller needs it.
|
||||
|
@ -812,7 +813,7 @@ func (d *Database) GetStateDeltas(
|
|||
return deltas, joinedRoomIDs, nil
|
||||
}
|
||||
|
||||
// getStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync
|
||||
// GetStateDeltasForFullStateSync is a variant of getStateDeltas used for /sync
|
||||
// requests with full_state=true.
|
||||
// Fetches full state for all joined rooms and uses selectStateInRange to get
|
||||
// updates for other rooms.
|
||||
|
@ -1039,37 +1040,41 @@ func (d *Database) GetUserUnreadNotificationCounts(ctx context.Context, userID s
|
|||
return d.NotificationData.SelectUserUnreadCounts(ctx, userID, from, to)
|
||||
}
|
||||
|
||||
func (s *Database) SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) {
|
||||
return s.OutputEvents.SelectContextEvent(ctx, nil, roomID, eventID)
|
||||
func (d *Database) SelectContextEvent(ctx context.Context, roomID, eventID string) (int, gomatrixserverlib.HeaderedEvent, error) {
|
||||
return d.OutputEvents.SelectContextEvent(ctx, nil, roomID, eventID)
|
||||
}
|
||||
|
||||
func (s *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
||||
return s.OutputEvents.SelectContextBeforeEvent(ctx, nil, id, roomID, filter)
|
||||
func (d *Database) SelectContextBeforeEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
||||
return d.OutputEvents.SelectContextBeforeEvent(ctx, nil, id, roomID, filter)
|
||||
}
|
||||
func (s *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) {
|
||||
return s.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter)
|
||||
func (d *Database) SelectContextAfterEvent(ctx context.Context, id int, roomID string, filter *gomatrixserverlib.RoomEventFilter) (int, []*gomatrixserverlib.HeaderedEvent, error) {
|
||||
return d.OutputEvents.SelectContextAfterEvent(ctx, nil, id, roomID, filter)
|
||||
}
|
||||
|
||||
func (s *Database) IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error) {
|
||||
return s.Ignores.SelectIgnores(ctx, userID)
|
||||
func (d *Database) IgnoresForUser(ctx context.Context, userID string) (*types.IgnoredUsers, error) {
|
||||
return d.Ignores.SelectIgnores(ctx, userID)
|
||||
}
|
||||
|
||||
func (s *Database) UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error {
|
||||
return s.Ignores.UpsertIgnores(ctx, userID, ignores)
|
||||
func (d *Database) UpdateIgnoresForUser(ctx context.Context, userID string, ignores *types.IgnoredUsers) error {
|
||||
return d.Ignores.UpsertIgnores(ctx, userID, ignores)
|
||||
}
|
||||
|
||||
func (s *Database) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) {
|
||||
return s.Presence.UpsertPresence(ctx, nil, userID, statusMsg, presence, lastActiveTS, fromSync)
|
||||
func (d *Database) UpdatePresence(ctx context.Context, userID string, presence types.Presence, statusMsg *string, lastActiveTS gomatrixserverlib.Timestamp, fromSync bool) (types.StreamPosition, error) {
|
||||
return d.Presence.UpsertPresence(ctx, nil, userID, statusMsg, presence, lastActiveTS, fromSync)
|
||||
}
|
||||
|
||||
func (s *Database) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) {
|
||||
return s.Presence.GetPresenceForUser(ctx, nil, userID)
|
||||
func (d *Database) GetPresence(ctx context.Context, userID string) (*types.PresenceInternal, error) {
|
||||
return d.Presence.GetPresenceForUser(ctx, nil, userID)
|
||||
}
|
||||
|
||||
func (s *Database) PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error) {
|
||||
return s.Presence.GetPresenceAfter(ctx, nil, after, filter)
|
||||
func (d *Database) PresenceAfter(ctx context.Context, after types.StreamPosition, filter gomatrixserverlib.EventFilter) (map[string]*types.PresenceInternal, error) {
|
||||
return d.Presence.GetPresenceAfter(ctx, nil, after, filter)
|
||||
}
|
||||
|
||||
func (s *Database) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) {
|
||||
return s.Presence.GetMaxPresenceID(ctx, nil)
|
||||
func (d *Database) MaxStreamPositionForPresence(ctx context.Context) (types.StreamPosition, error) {
|
||||
return d.Presence.GetMaxPresenceID(ctx, nil)
|
||||
}
|
||||
|
||||
func (d *Database) SelectMembershipForUser(ctx context.Context, roomID, userID string, pos int64) (membership string, topologicalPos int, err error) {
|
||||
return d.Memberships.SelectMembershipForUser(ctx, nil, roomID, userID, pos)
|
||||
}
|
||||
|
|
|
@ -17,7 +17,10 @@ package deltas
|
|||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
func UpAddHistoryVisibilityColumnOutputRoomEvents(ctx context.Context, tx *sql.Tx) error {
|
||||
|
@ -37,6 +40,27 @@ func UpAddHistoryVisibilityColumnOutputRoomEvents(ctx context.Context, tx *sql.T
|
|||
return nil
|
||||
}
|
||||
|
||||
// UpSetHistoryVisibility sets the history visibility for already stored events.
|
||||
// Requires current_room_state and output_room_events to be created.
|
||||
func UpSetHistoryVisibility(ctx context.Context, tx *sql.Tx) error {
|
||||
// get the current room history visibilities
|
||||
historyVisibilities, err := currentHistoryVisibilities(ctx, tx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// update the history visibility
|
||||
for roomID, hisVis := range historyVisibilities {
|
||||
_, err = tx.ExecContext(ctx, `UPDATE syncapi_output_room_events SET history_visibility = $1
|
||||
WHERE type IN ('m.room.message', 'm.room.encrypted') AND room_id = $2 AND history_visibility <> $1`, hisVis, roomID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update history visibility: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func UpAddHistoryVisibilityColumnCurrentRoomState(ctx context.Context, tx *sql.Tx) error {
|
||||
// SQLite doesn't have "if exists", so check if the column exists. If the query doesn't return an error, it already exists.
|
||||
// Required for unit tests, as otherwise a duplicate column error will show up.
|
||||
|
@ -51,9 +75,40 @@ func UpAddHistoryVisibilityColumnCurrentRoomState(ctx context.Context, tx *sql.T
|
|||
if err != nil {
|
||||
return fmt.Errorf("failed to execute upgrade: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// currentHistoryVisibilities returns a map from roomID to current history visibility.
|
||||
// If the history visibility was changed after room creation, defaults to joined.
|
||||
func currentHistoryVisibilities(ctx context.Context, tx *sql.Tx) (map[string]gomatrixserverlib.HistoryVisibility, error) {
|
||||
rows, err := tx.QueryContext(ctx, `SELECT DISTINCT room_id, headered_event_json FROM syncapi_current_room_state
|
||||
WHERE type = 'm.room.history_visibility' AND state_key = '';
|
||||
`)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query current room state: %w", err)
|
||||
}
|
||||
defer rows.Close() // nolint: errcheck
|
||||
var eventBytes []byte
|
||||
var roomID string
|
||||
var event gomatrixserverlib.HeaderedEvent
|
||||
var hisVis gomatrixserverlib.HistoryVisibility
|
||||
historyVisibilities := make(map[string]gomatrixserverlib.HistoryVisibility)
|
||||
for rows.Next() {
|
||||
if err = rows.Scan(&roomID, &eventBytes); err != nil {
|
||||
return nil, fmt.Errorf("failed to scan row: %w", err)
|
||||
}
|
||||
if err = json.Unmarshal(eventBytes, &event); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal event: %w", err)
|
||||
}
|
||||
historyVisibilities[roomID] = gomatrixserverlib.HistoryVisibilityJoined
|
||||
if hisVis, err = event.HistoryVisibility(); err == nil && event.Depth() < 10 {
|
||||
historyVisibilities[roomID] = hisVis
|
||||
}
|
||||
}
|
||||
return historyVisibilities, nil
|
||||
}
|
||||
|
||||
func DownAddHistoryVisibilityColumn(ctx context.Context, tx *sql.Tx) error {
|
||||
// SQLite doesn't have "if exists", so check if the column exists.
|
||||
_, err := tx.QueryContext(ctx, "SELECT history_visibility FROM syncapi_output_room_events LIMIT 1")
|
||||
|
|
|
@ -66,11 +66,15 @@ const selectMembershipCountSQL = "" +
|
|||
const selectHeroesSQL = "" +
|
||||
"SELECT DISTINCT user_id FROM syncapi_memberships WHERE room_id = $1 AND user_id != $2 AND membership IN ($3) LIMIT 5"
|
||||
|
||||
const selectMembershipBeforeSQL = "" +
|
||||
"SELECT membership, topological_pos FROM syncapi_memberships WHERE room_id = $1 and user_id = $2 AND topological_pos <= $3 ORDER BY topological_pos DESC LIMIT 1"
|
||||
|
||||
type membershipsStatements struct {
|
||||
db *sql.DB
|
||||
upsertMembershipStmt *sql.Stmt
|
||||
selectMembershipCountStmt *sql.Stmt
|
||||
//selectHeroesStmt *sql.Stmt - prepared at runtime due to variadic
|
||||
selectMembershipForUserStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) {
|
||||
|
@ -84,6 +88,7 @@ func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) {
|
|||
return s, sqlutil.StatementList{
|
||||
{&s.upsertMembershipStmt, upsertMembershipSQL},
|
||||
{&s.selectMembershipCountStmt, selectMembershipCountSQL},
|
||||
{&s.selectMembershipForUserStmt, selectMembershipBeforeSQL},
|
||||
// {&s.selectHeroesStmt, selectHeroesSQL}, - prepared at runtime due to variadic
|
||||
}.Prepare(db)
|
||||
}
|
||||
|
@ -148,3 +153,20 @@ func (s *membershipsStatements) SelectHeroes(
|
|||
}
|
||||
return heroes, rows.Err()
|
||||
}
|
||||
|
||||
// SelectMembershipForUser returns the membership of the user before and including the given position. If no membership can be found
|
||||
// returns "leave", the topological position and no error. If an error occurs, other than sql.ErrNoRows, returns that and an empty
|
||||
// string as the membership.
|
||||
func (s *membershipsStatements) SelectMembershipForUser(
|
||||
ctx context.Context, txn *sql.Tx, roomID, userID string, pos int64,
|
||||
) (membership string, topologyPos int, err error) {
|
||||
stmt := sqlutil.TxStmt(txn, s.selectMembershipForUserStmt)
|
||||
err = stmt.QueryRowContext(ctx, roomID, userID, pos).Scan(&membership, &topologyPos)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return "leave", 0, nil
|
||||
}
|
||||
return "", 0, err
|
||||
}
|
||||
return membership, topologyPos, nil
|
||||
}
|
||||
|
|
|
@ -139,10 +139,12 @@ func NewSqliteEventsTable(db *sql.DB, streamID *StreamIDStatements) (tables.Even
|
|||
}
|
||||
|
||||
m := sqlutil.NewMigrator(db)
|
||||
m.AddMigrations(sqlutil.Migration{
|
||||
Version: "syncapi: add history visibility column (output_room_events)",
|
||||
Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents,
|
||||
})
|
||||
m.AddMigrations(
|
||||
sqlutil.Migration{
|
||||
Version: "syncapi: add history visibility column (output_room_events)",
|
||||
Up: deltas.UpAddHistoryVisibilityColumnOutputRoomEvents,
|
||||
},
|
||||
)
|
||||
err = m.Up(context.Background())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -16,12 +16,14 @@
|
|||
package sqlite3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/setup/base"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/shared"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/sqlite3/deltas"
|
||||
)
|
||||
|
||||
// SyncServerDatasource represents a sync server datasource which manages
|
||||
|
@ -41,13 +43,13 @@ func NewDatabase(base *base.BaseDendrite, dbProperties *config.DatabaseOptions)
|
|||
if d.db, d.writer, err = base.DatabaseConnection(dbProperties, sqlutil.NewExclusiveWriter()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = d.prepare(); err != nil {
|
||||
if err = d.prepare(base.Context()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &d, nil
|
||||
}
|
||||
|
||||
func (d *SyncServerDatasource) prepare() (err error) {
|
||||
func (d *SyncServerDatasource) prepare(ctx context.Context) (err error) {
|
||||
if err = d.streamID.Prepare(d.db); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -107,6 +109,19 @@ func (d *SyncServerDatasource) prepare() (err error) {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// apply migrations which need multiple tables
|
||||
m := sqlutil.NewMigrator(d.db)
|
||||
m.AddMigrations(
|
||||
sqlutil.Migration{
|
||||
Version: "syncapi: set history visibility for existing events",
|
||||
Up: deltas.UpSetHistoryVisibility, // Requires current_room_state and output_room_events to be created.
|
||||
},
|
||||
)
|
||||
err = m.Up(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.Database = shared.Database{
|
||||
DB: d.db,
|
||||
Writer: d.writer,
|
||||
|
|
|
@ -12,20 +12,22 @@ import (
|
|||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/dendrite/test"
|
||||
"github.com/matrix-org/dendrite/test/testrig"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
var ctx = context.Background()
|
||||
|
||||
func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func()) {
|
||||
func MustCreateDatabase(t *testing.T, dbType test.DBType) (storage.Database, func(), func()) {
|
||||
connStr, close := test.PrepareDBConnectionString(t, dbType)
|
||||
db, err := storage.NewSyncServerDatasource(nil, &config.DatabaseOptions{
|
||||
base, closeBase := testrig.CreateBaseDendrite(t, dbType)
|
||||
db, err := storage.NewSyncServerDatasource(base, &config.DatabaseOptions{
|
||||
ConnectionString: config.DataSource(connStr),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("NewSyncServerDatasource returned %s", err)
|
||||
}
|
||||
return db, close
|
||||
return db, close, closeBase
|
||||
}
|
||||
|
||||
func MustWriteEvents(t *testing.T, db storage.Database, events []*gomatrixserverlib.HeaderedEvent) (positions []types.StreamPosition) {
|
||||
|
@ -51,8 +53,9 @@ func TestWriteEvents(t *testing.T) {
|
|||
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
||||
alice := test.NewUser(t)
|
||||
r := test.NewRoom(t, alice)
|
||||
db, close := MustCreateDatabase(t, dbType)
|
||||
db, close, closeBase := MustCreateDatabase(t, dbType)
|
||||
defer close()
|
||||
defer closeBase()
|
||||
MustWriteEvents(t, db, r.Events())
|
||||
})
|
||||
}
|
||||
|
@ -60,8 +63,9 @@ func TestWriteEvents(t *testing.T) {
|
|||
// These tests assert basic functionality of RecentEvents for PDUs
|
||||
func TestRecentEventsPDU(t *testing.T) {
|
||||
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
||||
db, close := MustCreateDatabase(t, dbType)
|
||||
db, close, closeBase := MustCreateDatabase(t, dbType)
|
||||
defer close()
|
||||
defer closeBase()
|
||||
alice := test.NewUser(t)
|
||||
// dummy room to make sure SQL queries are filtering on room ID
|
||||
MustWriteEvents(t, db, test.NewRoom(t, alice).Events())
|
||||
|
@ -163,8 +167,9 @@ func TestRecentEventsPDU(t *testing.T) {
|
|||
// The purpose of this test is to ensure that backfill does indeed go backwards, using a topology token
|
||||
func TestGetEventsInRangeWithTopologyToken(t *testing.T) {
|
||||
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
||||
db, close := MustCreateDatabase(t, dbType)
|
||||
db, close, closeBase := MustCreateDatabase(t, dbType)
|
||||
defer close()
|
||||
defer closeBase()
|
||||
alice := test.NewUser(t)
|
||||
r := test.NewRoom(t, alice)
|
||||
for i := 0; i < 10; i++ {
|
||||
|
@ -404,8 +409,9 @@ func TestSendToDeviceBehaviour(t *testing.T) {
|
|||
bob := test.NewUser(t)
|
||||
deviceID := "one"
|
||||
test.WithAllDatabases(t, func(t *testing.T, dbType test.DBType) {
|
||||
db, close := MustCreateDatabase(t, dbType)
|
||||
db, close, closeBase := MustCreateDatabase(t, dbType)
|
||||
defer close()
|
||||
defer closeBase()
|
||||
// At this point there should be no messages. We haven't sent anything
|
||||
// yet.
|
||||
_, events, err := db.SendToDeviceUpdatesForSync(ctx, alice.ID, deviceID, 0, 100)
|
||||
|
|
|
@ -185,6 +185,7 @@ type Memberships interface {
|
|||
UpsertMembership(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error
|
||||
SelectMembershipCount(ctx context.Context, txn *sql.Tx, roomID, membership string, pos types.StreamPosition) (count int, err error)
|
||||
SelectHeroes(ctx context.Context, txn *sql.Tx, roomID, userID string, memberships []string) (heroes []string, err error)
|
||||
SelectMembershipForUser(ctx context.Context, txn *sql.Tx, roomID, userID string, pos int64) (membership string, topologicalPos int, err error)
|
||||
}
|
||||
|
||||
type NotificationData interface {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue