mirror of
https://github.com/hoernschen/dendrite.git
synced 2024-12-27 07:28:27 +00:00
Merge branch 'master' into neilalexander/historyvis
This commit is contained in:
commit
9b2b7a6e28
9 changed files with 332 additions and 113 deletions
|
@ -239,33 +239,23 @@ func (w *walker) walk() *gomatrixserverlib.MSC2946SpacesResponse {
|
|||
// Mark this room as processed.
|
||||
processed[roomID] = true
|
||||
|
||||
// Is the caller currently joined to the room or is the room `world_readable`
|
||||
// If no, skip this room. If yes, continue.
|
||||
if !w.roomExists(roomID) || !w.authorised(roomID) {
|
||||
// attempt to query this room over federation, as either we've never heard of it before
|
||||
// or we've left it and hence are not authorised (but info may be exposed regardless)
|
||||
fedRes, err := w.federatedRoomInfo(roomID)
|
||||
// Collect rooms/events to send back (either locally or fetched via federation)
|
||||
var discoveredRooms []gomatrixserverlib.MSC2946Room
|
||||
var discoveredEvents []gomatrixserverlib.MSC2946StrippedEvent
|
||||
|
||||
// If we know about this room and the caller is authorised (joined/world_readable) then pull
|
||||
// events locally
|
||||
if w.roomExists(roomID) && w.authorised(roomID) {
|
||||
// Get all `m.space.child` and `m.space.parent` state events for the room. *In addition*, get
|
||||
// all `m.space.child` and `m.space.parent` state events which *point to* (via `state_key` or `content.room_id`)
|
||||
// this room. This requires servers to store reverse lookups.
|
||||
events, err := w.references(roomID)
|
||||
if err != nil {
|
||||
util.GetLogger(w.ctx).WithError(err).WithField("room_id", roomID).Errorf("failed to query federated spaces")
|
||||
util.GetLogger(w.ctx).WithError(err).WithField("room_id", roomID).Error("failed to extract references for room")
|
||||
continue
|
||||
}
|
||||
if fedRes != nil {
|
||||
res = combineResponses(res, *fedRes)
|
||||
}
|
||||
continue
|
||||
}
|
||||
// Get all `m.space.child` and `m.space.parent` state events for the room. *In addition*, get
|
||||
// all `m.space.child` and `m.space.parent` state events which *point to* (via `state_key` or `content.room_id`)
|
||||
// this room. This requires servers to store reverse lookups.
|
||||
refs, err := w.references(roomID)
|
||||
if err != nil {
|
||||
util.GetLogger(w.ctx).WithError(err).WithField("room_id", roomID).Error("failed to extract references for room")
|
||||
continue
|
||||
}
|
||||
discoveredEvents = events
|
||||
|
||||
// If this room has not ever been in `rooms` (across multiple requests), extract the
|
||||
// `PublicRoomsChunk` for this room.
|
||||
if !w.alreadySent(roomID) && !w.roomIsExcluded(roomID) {
|
||||
pubRoom := w.publicRoomsChunk(roomID)
|
||||
roomType := ""
|
||||
create := w.stateEvent(roomID, gomatrixserverlib.MRoomCreate, "")
|
||||
|
@ -275,12 +265,31 @@ func (w *walker) walk() *gomatrixserverlib.MSC2946SpacesResponse {
|
|||
}
|
||||
|
||||
// Add the total number of events to `PublicRoomsChunk` under `num_refs`. Add `PublicRoomsChunk` to `rooms`.
|
||||
res.Rooms = append(res.Rooms, gomatrixserverlib.MSC2946Room{
|
||||
discoveredRooms = append(discoveredRooms, gomatrixserverlib.MSC2946Room{
|
||||
PublicRoom: *pubRoom,
|
||||
NumRefs: refs.len(),
|
||||
NumRefs: len(discoveredEvents),
|
||||
RoomType: roomType,
|
||||
})
|
||||
w.markSent(roomID)
|
||||
} else {
|
||||
// attempt to query this room over federation, as either we've never heard of it before
|
||||
// or we've left it and hence are not authorised (but info may be exposed regardless)
|
||||
fedRes, err := w.federatedRoomInfo(roomID)
|
||||
if err != nil {
|
||||
util.GetLogger(w.ctx).WithError(err).WithField("room_id", roomID).Errorf("failed to query federated spaces")
|
||||
continue
|
||||
}
|
||||
if fedRes != nil {
|
||||
discoveredRooms = fedRes.Rooms
|
||||
discoveredEvents = fedRes.Events
|
||||
}
|
||||
}
|
||||
|
||||
// If this room has not ever been in `rooms` (across multiple requests), send it now
|
||||
for _, room := range discoveredRooms {
|
||||
if !w.alreadySent(room.RoomID) && !w.roomIsExcluded(room.RoomID) {
|
||||
res.Rooms = append(res.Rooms, room)
|
||||
w.markSent(room.RoomID)
|
||||
}
|
||||
}
|
||||
|
||||
uniqueRooms := make(set)
|
||||
|
@ -288,45 +297,37 @@ func (w *walker) walk() *gomatrixserverlib.MSC2946SpacesResponse {
|
|||
// If this is the root room from the original request, insert all these events into `events` if
|
||||
// they haven't been added before (across multiple requests).
|
||||
if w.rootRoomID == roomID {
|
||||
for _, ev := range refs.events() {
|
||||
if !w.alreadySent(ev.EventID()) {
|
||||
strip := stripped(ev.Event)
|
||||
if strip == nil {
|
||||
continue
|
||||
}
|
||||
res.Events = append(res.Events, *strip)
|
||||
uniqueRooms[ev.RoomID()] = true
|
||||
uniqueRooms[SpaceTarget(ev)] = true
|
||||
w.markSent(ev.EventID())
|
||||
for _, ev := range discoveredEvents {
|
||||
if !w.alreadySent(eventKey(&ev)) {
|
||||
res.Events = append(res.Events, ev)
|
||||
uniqueRooms[ev.RoomID] = true
|
||||
uniqueRooms[spaceTargetStripped(&ev)] = true
|
||||
w.markSent(eventKey(&ev))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Else add them to `events` honouring the `limit` and `max_rooms_per_space` values. If either
|
||||
// are exceeded, stop adding events. If the event has already been added, do not add it again.
|
||||
numAdded := 0
|
||||
for _, ev := range refs.events() {
|
||||
for _, ev := range discoveredEvents {
|
||||
if w.req.Limit > 0 && len(res.Events) >= w.req.Limit {
|
||||
break
|
||||
}
|
||||
if w.req.MaxRoomsPerSpace > 0 && numAdded >= w.req.MaxRoomsPerSpace {
|
||||
break
|
||||
}
|
||||
if w.alreadySent(ev.EventID()) {
|
||||
if w.alreadySent(eventKey(&ev)) {
|
||||
continue
|
||||
}
|
||||
// Skip the room if it's part of exclude_rooms but ONLY IF the source matches, as we still
|
||||
// want to catch arrows which point to excluded rooms.
|
||||
if w.roomIsExcluded(ev.RoomID()) {
|
||||
if w.roomIsExcluded(ev.RoomID) {
|
||||
continue
|
||||
}
|
||||
strip := stripped(ev.Event)
|
||||
if strip == nil {
|
||||
continue
|
||||
}
|
||||
res.Events = append(res.Events, *strip)
|
||||
uniqueRooms[ev.RoomID()] = true
|
||||
uniqueRooms[SpaceTarget(ev)] = true
|
||||
w.markSent(ev.EventID())
|
||||
res.Events = append(res.Events, ev)
|
||||
uniqueRooms[ev.RoomID] = true
|
||||
uniqueRooms[spaceTargetStripped(&ev)] = true
|
||||
w.markSent(eventKey(&ev))
|
||||
// we don't distinguish between child state events and parent state events for the purposes of
|
||||
// max_rooms_per_space, maybe we should?
|
||||
numAdded++
|
||||
|
@ -521,51 +522,27 @@ func (w *walker) authorisedUser(roomID string) bool {
|
|||
}
|
||||
|
||||
// references returns all references pointing to or from this room.
|
||||
func (w *walker) references(roomID string) (eventLookup, error) {
|
||||
func (w *walker) references(roomID string) ([]gomatrixserverlib.MSC2946StrippedEvent, error) {
|
||||
events, err := w.db.References(w.ctx, roomID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
el := make(eventLookup)
|
||||
el := make([]gomatrixserverlib.MSC2946StrippedEvent, 0, len(events))
|
||||
for _, ev := range events {
|
||||
// only return events that have a `via` key as per MSC1772
|
||||
// else we'll incorrectly walk redacted events (as the link
|
||||
// is in the state_key)
|
||||
if gjson.GetBytes(ev.Content(), "via").Exists() {
|
||||
el.set(ev)
|
||||
strip := stripped(ev.Event)
|
||||
if strip == nil {
|
||||
continue
|
||||
}
|
||||
el = append(el, *strip)
|
||||
}
|
||||
}
|
||||
return el, nil
|
||||
}
|
||||
|
||||
// state event lookup across multiple rooms keyed on event type
|
||||
// NOT THREAD SAFE
|
||||
type eventLookup map[string][]*gomatrixserverlib.HeaderedEvent
|
||||
|
||||
func (el eventLookup) set(ev *gomatrixserverlib.HeaderedEvent) {
|
||||
evs := el[ev.Type()]
|
||||
if evs == nil {
|
||||
evs = make([]*gomatrixserverlib.HeaderedEvent, 0)
|
||||
}
|
||||
evs = append(evs, ev)
|
||||
el[ev.Type()] = evs
|
||||
}
|
||||
|
||||
func (el eventLookup) len() int {
|
||||
sum := 0
|
||||
for _, evs := range el {
|
||||
sum += len(evs)
|
||||
}
|
||||
return sum
|
||||
}
|
||||
|
||||
func (el eventLookup) events() (events []*gomatrixserverlib.HeaderedEvent) {
|
||||
for _, evs := range el {
|
||||
events = append(events, evs...)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type set map[string]bool
|
||||
|
||||
func stripped(ev *gomatrixserverlib.Event) *gomatrixserverlib.MSC2946StrippedEvent {
|
||||
|
@ -581,27 +558,19 @@ func stripped(ev *gomatrixserverlib.Event) *gomatrixserverlib.MSC2946StrippedEve
|
|||
}
|
||||
}
|
||||
|
||||
func combineResponses(local, remote gomatrixserverlib.MSC2946SpacesResponse) gomatrixserverlib.MSC2946SpacesResponse {
|
||||
knownRooms := make(set)
|
||||
for _, room := range local.Rooms {
|
||||
knownRooms[room.RoomID] = true
|
||||
}
|
||||
knownEvents := make(set)
|
||||
for _, event := range local.Events {
|
||||
knownEvents[event.RoomID+event.Type+event.StateKey] = true
|
||||
}
|
||||
// mux in remote entries if and only if they aren't present already
|
||||
for _, room := range remote.Rooms {
|
||||
if knownRooms[room.RoomID] {
|
||||
continue
|
||||
}
|
||||
local.Rooms = append(local.Rooms, room)
|
||||
}
|
||||
for _, event := range remote.Events {
|
||||
if knownEvents[event.RoomID+event.Type+event.StateKey] {
|
||||
continue
|
||||
}
|
||||
local.Events = append(local.Events, event)
|
||||
}
|
||||
return local
|
||||
func eventKey(event *gomatrixserverlib.MSC2946StrippedEvent) string {
|
||||
return event.RoomID + "|" + event.Type + "|" + event.StateKey
|
||||
}
|
||||
|
||||
func spaceTargetStripped(event *gomatrixserverlib.MSC2946StrippedEvent) string {
|
||||
if event.StateKey == "" {
|
||||
return "" // no-op
|
||||
}
|
||||
switch event.Type {
|
||||
case ConstSpaceParentEventType:
|
||||
return event.StateKey
|
||||
case ConstSpaceChildEventType:
|
||||
return event.StateKey
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
|
111
syncapi/storage/postgres/memberships_table.go
Normal file
111
syncapi/storage/postgres/memberships_table.go
Normal file
|
@ -0,0 +1,111 @@
|
|||
// Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
// The memberships table is designed to track the last time that
|
||||
// the user was a given state. This allows us to find out the
|
||||
// most recent time that a user was invited to, joined or left
|
||||
// a room, either by choice or otherwise. This is important for
|
||||
// building history visibility.
|
||||
|
||||
const membershipsSchema = `
|
||||
CREATE TABLE IF NOT EXISTS syncapi_memberships (
|
||||
-- The 'room_id' key for the state event.
|
||||
room_id TEXT NOT NULL,
|
||||
-- The state event ID
|
||||
user_id TEXT NOT NULL,
|
||||
-- The status of the membership
|
||||
membership TEXT NOT NULL,
|
||||
-- The event ID that last changed the membership
|
||||
event_id TEXT NOT NULL,
|
||||
-- The stream position of the change
|
||||
stream_pos BIGINT NOT NULL,
|
||||
-- The topological position of the change in the room
|
||||
topological_pos BIGINT NOT NULL,
|
||||
-- Unique index
|
||||
CONSTRAINT syncapi_memberships_unique UNIQUE (room_id, user_id, membership)
|
||||
);
|
||||
`
|
||||
|
||||
const upsertMembershipSQL = "" +
|
||||
"INSERT INTO syncapi_memberships (room_id, user_id, membership, event_id, stream_pos, topological_pos)" +
|
||||
" VALUES ($1, $2, $3, $4, $5, $6)" +
|
||||
" ON CONFLICT ON CONSTRAINT syncapi_memberships_unique" +
|
||||
" DO UPDATE SET event_id = $4, stream_pos = $5, topological_pos = $6"
|
||||
|
||||
const selectMembershipSQL = "" +
|
||||
"SELECT event_id, stream_pos, topological_pos FROM syncapi_memberships" +
|
||||
" WHERE room_id = $1 AND user_id = $2 AND membership = ANY($3)" +
|
||||
" ORDER BY stream_pos DESC" +
|
||||
" LIMIT 1"
|
||||
|
||||
type membershipsStatements struct {
|
||||
upsertMembershipStmt *sql.Stmt
|
||||
selectMembershipStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
|
||||
s := &membershipsStatements{}
|
||||
_, err := db.Exec(membershipsSchema)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.upsertMembershipStmt, err = db.Prepare(upsertMembershipSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectMembershipStmt, err = db.Prepare(selectMembershipSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *membershipsStatements) UpsertMembership(
|
||||
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent,
|
||||
streamPos, topologicalPos types.StreamPosition,
|
||||
) error {
|
||||
membership, err := event.Membership()
|
||||
if err != nil {
|
||||
return fmt.Errorf("event.Membership: %w", err)
|
||||
}
|
||||
_, err = sqlutil.TxStmt(txn, s.upsertMembershipStmt).ExecContext(
|
||||
ctx,
|
||||
event.RoomID(),
|
||||
*event.StateKey(),
|
||||
membership,
|
||||
event.EventID(),
|
||||
streamPos,
|
||||
topologicalPos,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *membershipsStatements) SelectMembership(
|
||||
ctx context.Context, txn *sql.Tx, roomID, userID, memberships []string,
|
||||
) (eventID string, streamPos, topologyPos types.StreamPosition, err error) {
|
||||
stmt := sqlutil.TxStmt(txn, s.selectMembershipStmt)
|
||||
err = stmt.QueryRowContext(ctx, roomID, userID, memberships).Scan(&eventID, &streamPos, &topologyPos)
|
||||
return
|
||||
}
|
|
@ -44,7 +44,8 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_topological_position_idx ON sync
|
|||
const insertEventInTopologySQL = "" +
|
||||
"INSERT INTO syncapi_output_room_events_topology (event_id, topological_position, room_id, stream_position)" +
|
||||
" VALUES ($1, $2, $3, $4)" +
|
||||
" ON CONFLICT (topological_position, stream_position, room_id) DO UPDATE SET event_id = $1"
|
||||
" ON CONFLICT (topological_position, stream_position, room_id) DO UPDATE SET event_id = $1" +
|
||||
" RETURNING topological_position"
|
||||
|
||||
const selectEventIDsInRangeASCSQL = "" +
|
||||
"SELECT event_id FROM syncapi_output_room_events_topology" +
|
||||
|
@ -115,10 +116,10 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
|
|||
// on the event's depth.
|
||||
func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
|
||||
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition,
|
||||
) (err error) {
|
||||
_, err = s.insertEventInTopologyStmt.ExecContext(
|
||||
) (topoPos types.StreamPosition, err error) {
|
||||
err = sqlutil.TxStmt(txn, s.insertEventInTopologyStmt).QueryRowContext(
|
||||
ctx, event.EventID(), event.Depth(), event.RoomID(), pos,
|
||||
)
|
||||
).Scan(&topoPos)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -87,6 +87,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
memberships, err := NewPostgresMembershipsTable(d.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
m := sqlutil.NewMigrations()
|
||||
deltas.LoadFixSequences(m)
|
||||
deltas.LoadRemoveSendToDeviceSentColumn(m)
|
||||
|
@ -106,6 +110,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
|
|||
Filter: filter,
|
||||
SendToDevice: sendToDevice,
|
||||
Receipts: receipts,
|
||||
Memberships: memberships,
|
||||
}
|
||||
return &d, nil
|
||||
}
|
||||
|
|
|
@ -48,6 +48,7 @@ type Database struct {
|
|||
SendToDevice tables.SendToDevice
|
||||
Filter tables.Filter
|
||||
Receipts tables.Receipts
|
||||
Memberships tables.Memberships
|
||||
}
|
||||
|
||||
func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) {
|
||||
|
@ -383,8 +384,8 @@ func (d *Database) WriteEvent(
|
|||
return fmt.Errorf("d.OutputEvents.InsertEvent: %w", err)
|
||||
}
|
||||
pduPosition = pos
|
||||
|
||||
if err = d.Topology.InsertEventInTopology(ctx, txn, ev, pos); err != nil {
|
||||
var topoPosition types.StreamPosition
|
||||
if topoPosition, err = d.Topology.InsertEventInTopology(ctx, txn, ev, pos); err != nil {
|
||||
return fmt.Errorf("d.Topology.InsertEventInTopology: %w", err)
|
||||
}
|
||||
|
||||
|
@ -397,7 +398,7 @@ func (d *Database) WriteEvent(
|
|||
return nil
|
||||
}
|
||||
|
||||
return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition)
|
||||
return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition, topoPosition)
|
||||
})
|
||||
|
||||
return pduPosition, returnErr
|
||||
|
@ -409,6 +410,7 @@ func (d *Database) updateRoomState(
|
|||
removedEventIDs []string,
|
||||
addedEvents []*gomatrixserverlib.HeaderedEvent,
|
||||
pduPosition types.StreamPosition,
|
||||
topoPosition types.StreamPosition,
|
||||
) error {
|
||||
// remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
|
||||
for _, eventID := range removedEventIDs {
|
||||
|
@ -429,6 +431,9 @@ func (d *Database) updateRoomState(
|
|||
return fmt.Errorf("event.Membership: %w", err)
|
||||
}
|
||||
membership = &value
|
||||
if err = d.Memberships.UpsertMembership(ctx, txn, event, pduPosition, topoPosition); err != nil {
|
||||
return fmt.Errorf("d.Memberships.UpsertMembership: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := d.CurrentRoomState.UpsertRoomState(ctx, txn, event, membership, pduPosition); err != nil {
|
||||
|
|
119
syncapi/storage/sqlite3/memberships_table.go
Normal file
119
syncapi/storage/sqlite3/memberships_table.go
Normal file
|
@ -0,0 +1,119 @@
|
|||
// Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package sqlite3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
// The memberships table is designed to track the last time that
|
||||
// the user was a given state. This allows us to find out the
|
||||
// most recent time that a user was invited to, joined or left
|
||||
// a room, either by choice or otherwise. This is important for
|
||||
// building history visibility.
|
||||
|
||||
const membershipsSchema = `
|
||||
CREATE TABLE IF NOT EXISTS syncapi_memberships (
|
||||
-- The 'room_id' key for the state event.
|
||||
room_id TEXT NOT NULL,
|
||||
-- The state event ID
|
||||
user_id TEXT NOT NULL,
|
||||
-- The status of the membership
|
||||
membership TEXT NOT NULL,
|
||||
-- The event ID that last changed the membership
|
||||
event_id TEXT NOT NULL,
|
||||
-- The stream position of the change
|
||||
stream_pos BIGINT NOT NULL,
|
||||
-- The topological position of the change in the room
|
||||
topological_pos BIGINT NOT NULL,
|
||||
-- Unique index
|
||||
UNIQUE (room_id, user_id, membership)
|
||||
);
|
||||
`
|
||||
|
||||
const upsertMembershipSQL = "" +
|
||||
"INSERT INTO syncapi_memberships (room_id, user_id, membership, event_id, stream_pos, topological_pos)" +
|
||||
" VALUES ($1, $2, $3, $4, $5, $6)" +
|
||||
" ON CONFLICT (room_id, user_id, membership)" +
|
||||
" DO UPDATE SET event_id = $4, stream_pos = $5, topological_pos = $6"
|
||||
|
||||
const selectMembershipSQL = "" +
|
||||
"SELECT event_id, stream_pos, topological_pos FROM syncapi_memberships" +
|
||||
" WHERE room_id = $1 AND user_id = $2 AND membership IN ($3)" +
|
||||
" ORDER BY stream_pos DESC" +
|
||||
" LIMIT 1"
|
||||
|
||||
type membershipsStatements struct {
|
||||
db *sql.DB
|
||||
upsertMembershipStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) {
|
||||
s := &membershipsStatements{
|
||||
db: db,
|
||||
}
|
||||
_, err := db.Exec(membershipsSchema)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.upsertMembershipStmt, err = db.Prepare(upsertMembershipSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *membershipsStatements) UpsertMembership(
|
||||
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent,
|
||||
streamPos, topologicalPos types.StreamPosition,
|
||||
) error {
|
||||
membership, err := event.Membership()
|
||||
if err != nil {
|
||||
return fmt.Errorf("event.Membership: %w", err)
|
||||
}
|
||||
_, err = sqlutil.TxStmt(txn, s.upsertMembershipStmt).ExecContext(
|
||||
ctx,
|
||||
event.RoomID(),
|
||||
*event.StateKey(),
|
||||
membership,
|
||||
event.EventID(),
|
||||
streamPos,
|
||||
topologicalPos,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *membershipsStatements) SelectMembership(
|
||||
ctx context.Context, txn *sql.Tx, roomID, userID, memberships []string,
|
||||
) (eventID string, streamPos, topologyPos types.StreamPosition, err error) {
|
||||
params := []interface{}{roomID, userID}
|
||||
for _, membership := range memberships {
|
||||
params = append(params, membership)
|
||||
}
|
||||
orig := strings.Replace(selectMembershipSQL, "($3)", sqlutil.QueryVariadicOffset(len(memberships), 2), 1)
|
||||
stmt, err := s.db.Prepare(orig)
|
||||
if err != nil {
|
||||
return "", 0, 0, err
|
||||
}
|
||||
err = sqlutil.TxStmt(txn, stmt).QueryRowContext(ctx, params...).Scan(&eventID, &streamPos, &topologyPos)
|
||||
return
|
||||
}
|
|
@ -111,12 +111,11 @@ func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
|
|||
// on the event's depth.
|
||||
func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
|
||||
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition,
|
||||
) (err error) {
|
||||
stmt := sqlutil.TxStmt(txn, s.insertEventInTopologyStmt)
|
||||
_, err = stmt.ExecContext(
|
||||
) (types.StreamPosition, error) {
|
||||
_, err := sqlutil.TxStmt(txn, s.insertEventInTopologyStmt).ExecContext(
|
||||
ctx, event.EventID(), event.Depth(), event.RoomID(), pos,
|
||||
)
|
||||
return
|
||||
return types.StreamPosition(event.Depth()), err
|
||||
}
|
||||
|
||||
func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange(
|
||||
|
|
|
@ -100,6 +100,10 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
memberships, err := NewSqliteMembershipsTable(d.db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m := sqlutil.NewMigrations()
|
||||
deltas.LoadFixSequences(m)
|
||||
deltas.LoadRemoveSendToDeviceSentColumn(m)
|
||||
|
@ -119,6 +123,7 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
|
|||
Filter: filter,
|
||||
SendToDevice: sendToDevice,
|
||||
Receipts: receipts,
|
||||
Memberships: memberships,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -71,7 +71,7 @@ type Events interface {
|
|||
type Topology interface {
|
||||
// InsertEventInTopology inserts the given event in the room's topology, based on the event's depth.
|
||||
// `pos` is the stream position of this event in the events table, and is used to order events which have the same depth.
|
||||
InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) (err error)
|
||||
InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) (topoPos types.StreamPosition, err error)
|
||||
// SelectEventIDsInRange selects the IDs of events whose depths are within a given range in a given room's topological order.
|
||||
// Events with `minDepth` are *exclusive*, as is the event which has exactly `minDepth`,`maxStreamPos`.
|
||||
// `maxStreamPos` is only used when events have the same depth as `maxDepth`, which results in events less than `maxStreamPos` being returned.
|
||||
|
@ -163,3 +163,8 @@ type Receipts interface {
|
|||
SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error)
|
||||
SelectMaxReceiptID(ctx context.Context, txn *sql.Tx) (id int64, err error)
|
||||
}
|
||||
|
||||
type Memberships interface {
|
||||
UpsertMembership(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error
|
||||
SelectMembership(ctx context.Context, txn *sql.Tx, roomID, userID, memberships []string) (eventID string, streamPos, topologyPos types.StreamPosition, err error)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue