mirror of
https://github.com/hoernschen/dendrite.git
synced 2024-12-27 07:28:27 +00:00
Add sync API memberships table (#1726)
This commit is contained in:
parent
c08e38df2c
commit
8fe51019ad
8 changed files with 262 additions and 12 deletions
111
syncapi/storage/postgres/memberships_table.go
Normal file
111
syncapi/storage/postgres/memberships_table.go
Normal file
|
@ -0,0 +1,111 @@
|
||||||
|
// Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package postgres
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
)
|
||||||
|
|
||||||
|
// The memberships table is designed to track the last time that
|
||||||
|
// the user was a given state. This allows us to find out the
|
||||||
|
// most recent time that a user was invited to, joined or left
|
||||||
|
// a room, either by choice or otherwise. This is important for
|
||||||
|
// building history visibility.
|
||||||
|
|
||||||
|
const membershipsSchema = `
|
||||||
|
CREATE TABLE IF NOT EXISTS syncapi_memberships (
|
||||||
|
-- The 'room_id' key for the state event.
|
||||||
|
room_id TEXT NOT NULL,
|
||||||
|
-- The state event ID
|
||||||
|
user_id TEXT NOT NULL,
|
||||||
|
-- The status of the membership
|
||||||
|
membership TEXT NOT NULL,
|
||||||
|
-- The event ID that last changed the membership
|
||||||
|
event_id TEXT NOT NULL,
|
||||||
|
-- The stream position of the change
|
||||||
|
stream_pos BIGINT NOT NULL,
|
||||||
|
-- The topological position of the change in the room
|
||||||
|
topological_pos BIGINT NOT NULL,
|
||||||
|
-- Unique index
|
||||||
|
CONSTRAINT syncapi_memberships_unique UNIQUE (room_id, user_id, membership)
|
||||||
|
);
|
||||||
|
`
|
||||||
|
|
||||||
|
const upsertMembershipSQL = "" +
|
||||||
|
"INSERT INTO syncapi_memberships (room_id, user_id, membership, event_id, stream_pos, topological_pos)" +
|
||||||
|
" VALUES ($1, $2, $3, $4, $5, $6)" +
|
||||||
|
" ON CONFLICT ON CONSTRAINT syncapi_memberships_unique" +
|
||||||
|
" DO UPDATE SET event_id = $4, stream_pos = $5, topological_pos = $6"
|
||||||
|
|
||||||
|
const selectMembershipSQL = "" +
|
||||||
|
"SELECT event_id, stream_pos, topological_pos FROM syncapi_memberships" +
|
||||||
|
" WHERE room_id = $1 AND user_id = $2 AND membership = ANY($3)" +
|
||||||
|
" ORDER BY stream_pos DESC" +
|
||||||
|
" LIMIT 1"
|
||||||
|
|
||||||
|
type membershipsStatements struct {
|
||||||
|
upsertMembershipStmt *sql.Stmt
|
||||||
|
selectMembershipStmt *sql.Stmt
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
|
||||||
|
s := &membershipsStatements{}
|
||||||
|
_, err := db.Exec(membershipsSchema)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if s.upsertMembershipStmt, err = db.Prepare(upsertMembershipSQL); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if s.selectMembershipStmt, err = db.Prepare(selectMembershipSQL); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return s, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *membershipsStatements) UpsertMembership(
|
||||||
|
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent,
|
||||||
|
streamPos, topologicalPos types.StreamPosition,
|
||||||
|
) error {
|
||||||
|
membership, err := event.Membership()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("event.Membership: %w", err)
|
||||||
|
}
|
||||||
|
_, err = sqlutil.TxStmt(txn, s.upsertMembershipStmt).ExecContext(
|
||||||
|
ctx,
|
||||||
|
event.RoomID(),
|
||||||
|
*event.StateKey(),
|
||||||
|
membership,
|
||||||
|
event.EventID(),
|
||||||
|
streamPos,
|
||||||
|
topologicalPos,
|
||||||
|
)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *membershipsStatements) SelectMembership(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomID, userID, memberships []string,
|
||||||
|
) (eventID string, streamPos, topologyPos types.StreamPosition, err error) {
|
||||||
|
stmt := sqlutil.TxStmt(txn, s.selectMembershipStmt)
|
||||||
|
err = stmt.QueryRowContext(ctx, roomID, userID, memberships).Scan(&eventID, &streamPos, &topologyPos)
|
||||||
|
return
|
||||||
|
}
|
|
@ -44,7 +44,8 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_topological_position_idx ON sync
|
||||||
const insertEventInTopologySQL = "" +
|
const insertEventInTopologySQL = "" +
|
||||||
"INSERT INTO syncapi_output_room_events_topology (event_id, topological_position, room_id, stream_position)" +
|
"INSERT INTO syncapi_output_room_events_topology (event_id, topological_position, room_id, stream_position)" +
|
||||||
" VALUES ($1, $2, $3, $4)" +
|
" VALUES ($1, $2, $3, $4)" +
|
||||||
" ON CONFLICT (topological_position, stream_position, room_id) DO UPDATE SET event_id = $1"
|
" ON CONFLICT (topological_position, stream_position, room_id) DO UPDATE SET event_id = $1" +
|
||||||
|
" RETURNING topological_position"
|
||||||
|
|
||||||
const selectEventIDsInRangeASCSQL = "" +
|
const selectEventIDsInRangeASCSQL = "" +
|
||||||
"SELECT event_id FROM syncapi_output_room_events_topology" +
|
"SELECT event_id FROM syncapi_output_room_events_topology" +
|
||||||
|
@ -115,10 +116,10 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
|
||||||
// on the event's depth.
|
// on the event's depth.
|
||||||
func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
|
func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
|
||||||
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition,
|
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition,
|
||||||
) (err error) {
|
) (topoPos types.StreamPosition, err error) {
|
||||||
_, err = s.insertEventInTopologyStmt.ExecContext(
|
err = sqlutil.TxStmt(txn, s.insertEventInTopologyStmt).QueryRowContext(
|
||||||
ctx, event.EventID(), event.Depth(), event.RoomID(), pos,
|
ctx, event.EventID(), event.Depth(), event.RoomID(), pos,
|
||||||
)
|
).Scan(&topoPos)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -87,6 +87,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
memberships, err := NewPostgresMembershipsTable(d.db)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
m := sqlutil.NewMigrations()
|
m := sqlutil.NewMigrations()
|
||||||
deltas.LoadFixSequences(m)
|
deltas.LoadFixSequences(m)
|
||||||
deltas.LoadRemoveSendToDeviceSentColumn(m)
|
deltas.LoadRemoveSendToDeviceSentColumn(m)
|
||||||
|
@ -106,6 +110,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
|
||||||
Filter: filter,
|
Filter: filter,
|
||||||
SendToDevice: sendToDevice,
|
SendToDevice: sendToDevice,
|
||||||
Receipts: receipts,
|
Receipts: receipts,
|
||||||
|
Memberships: memberships,
|
||||||
}
|
}
|
||||||
return &d, nil
|
return &d, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,6 +48,7 @@ type Database struct {
|
||||||
SendToDevice tables.SendToDevice
|
SendToDevice tables.SendToDevice
|
||||||
Filter tables.Filter
|
Filter tables.Filter
|
||||||
Receipts tables.Receipts
|
Receipts tables.Receipts
|
||||||
|
Memberships tables.Memberships
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) {
|
func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) {
|
||||||
|
@ -383,8 +384,8 @@ func (d *Database) WriteEvent(
|
||||||
return fmt.Errorf("d.OutputEvents.InsertEvent: %w", err)
|
return fmt.Errorf("d.OutputEvents.InsertEvent: %w", err)
|
||||||
}
|
}
|
||||||
pduPosition = pos
|
pduPosition = pos
|
||||||
|
var topoPosition types.StreamPosition
|
||||||
if err = d.Topology.InsertEventInTopology(ctx, txn, ev, pos); err != nil {
|
if topoPosition, err = d.Topology.InsertEventInTopology(ctx, txn, ev, pos); err != nil {
|
||||||
return fmt.Errorf("d.Topology.InsertEventInTopology: %w", err)
|
return fmt.Errorf("d.Topology.InsertEventInTopology: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -397,7 +398,7 @@ func (d *Database) WriteEvent(
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition)
|
return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition, topoPosition)
|
||||||
})
|
})
|
||||||
|
|
||||||
return pduPosition, returnErr
|
return pduPosition, returnErr
|
||||||
|
@ -409,6 +410,7 @@ func (d *Database) updateRoomState(
|
||||||
removedEventIDs []string,
|
removedEventIDs []string,
|
||||||
addedEvents []*gomatrixserverlib.HeaderedEvent,
|
addedEvents []*gomatrixserverlib.HeaderedEvent,
|
||||||
pduPosition types.StreamPosition,
|
pduPosition types.StreamPosition,
|
||||||
|
topoPosition types.StreamPosition,
|
||||||
) error {
|
) error {
|
||||||
// remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
|
// remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
|
||||||
for _, eventID := range removedEventIDs {
|
for _, eventID := range removedEventIDs {
|
||||||
|
@ -429,6 +431,9 @@ func (d *Database) updateRoomState(
|
||||||
return fmt.Errorf("event.Membership: %w", err)
|
return fmt.Errorf("event.Membership: %w", err)
|
||||||
}
|
}
|
||||||
membership = &value
|
membership = &value
|
||||||
|
if err = d.Memberships.UpsertMembership(ctx, txn, event, pduPosition, topoPosition); err != nil {
|
||||||
|
return fmt.Errorf("d.Memberships.UpsertMembership: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := d.CurrentRoomState.UpsertRoomState(ctx, txn, event, membership, pduPosition); err != nil {
|
if err := d.CurrentRoomState.UpsertRoomState(ctx, txn, event, membership, pduPosition); err != nil {
|
||||||
|
|
119
syncapi/storage/sqlite3/memberships_table.go
Normal file
119
syncapi/storage/sqlite3/memberships_table.go
Normal file
|
@ -0,0 +1,119 @@
|
||||||
|
// Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package sqlite3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||||
|
"github.com/matrix-org/dendrite/syncapi/types"
|
||||||
|
"github.com/matrix-org/gomatrixserverlib"
|
||||||
|
)
|
||||||
|
|
||||||
|
// The memberships table is designed to track the last time that
|
||||||
|
// the user was a given state. This allows us to find out the
|
||||||
|
// most recent time that a user was invited to, joined or left
|
||||||
|
// a room, either by choice or otherwise. This is important for
|
||||||
|
// building history visibility.
|
||||||
|
|
||||||
|
const membershipsSchema = `
|
||||||
|
CREATE TABLE IF NOT EXISTS syncapi_memberships (
|
||||||
|
-- The 'room_id' key for the state event.
|
||||||
|
room_id TEXT NOT NULL,
|
||||||
|
-- The state event ID
|
||||||
|
user_id TEXT NOT NULL,
|
||||||
|
-- The status of the membership
|
||||||
|
membership TEXT NOT NULL,
|
||||||
|
-- The event ID that last changed the membership
|
||||||
|
event_id TEXT NOT NULL,
|
||||||
|
-- The stream position of the change
|
||||||
|
stream_pos BIGINT NOT NULL,
|
||||||
|
-- The topological position of the change in the room
|
||||||
|
topological_pos BIGINT NOT NULL,
|
||||||
|
-- Unique index
|
||||||
|
UNIQUE (room_id, user_id, membership)
|
||||||
|
);
|
||||||
|
`
|
||||||
|
|
||||||
|
const upsertMembershipSQL = "" +
|
||||||
|
"INSERT INTO syncapi_memberships (room_id, user_id, membership, event_id, stream_pos, topological_pos)" +
|
||||||
|
" VALUES ($1, $2, $3, $4, $5, $6)" +
|
||||||
|
" ON CONFLICT (room_id, user_id, membership)" +
|
||||||
|
" DO UPDATE SET event_id = $4, stream_pos = $5, topological_pos = $6"
|
||||||
|
|
||||||
|
const selectMembershipSQL = "" +
|
||||||
|
"SELECT event_id, stream_pos, topological_pos FROM syncapi_memberships" +
|
||||||
|
" WHERE room_id = $1 AND user_id = $2 AND membership IN ($3)" +
|
||||||
|
" ORDER BY stream_pos DESC" +
|
||||||
|
" LIMIT 1"
|
||||||
|
|
||||||
|
type membershipsStatements struct {
|
||||||
|
db *sql.DB
|
||||||
|
upsertMembershipStmt *sql.Stmt
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) {
|
||||||
|
s := &membershipsStatements{
|
||||||
|
db: db,
|
||||||
|
}
|
||||||
|
_, err := db.Exec(membershipsSchema)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if s.upsertMembershipStmt, err = db.Prepare(upsertMembershipSQL); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return s, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *membershipsStatements) UpsertMembership(
|
||||||
|
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent,
|
||||||
|
streamPos, topologicalPos types.StreamPosition,
|
||||||
|
) error {
|
||||||
|
membership, err := event.Membership()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("event.Membership: %w", err)
|
||||||
|
}
|
||||||
|
_, err = sqlutil.TxStmt(txn, s.upsertMembershipStmt).ExecContext(
|
||||||
|
ctx,
|
||||||
|
event.RoomID(),
|
||||||
|
*event.StateKey(),
|
||||||
|
membership,
|
||||||
|
event.EventID(),
|
||||||
|
streamPos,
|
||||||
|
topologicalPos,
|
||||||
|
)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *membershipsStatements) SelectMembership(
|
||||||
|
ctx context.Context, txn *sql.Tx, roomID, userID, memberships []string,
|
||||||
|
) (eventID string, streamPos, topologyPos types.StreamPosition, err error) {
|
||||||
|
params := []interface{}{roomID, userID}
|
||||||
|
for _, membership := range memberships {
|
||||||
|
params = append(params, membership)
|
||||||
|
}
|
||||||
|
orig := strings.Replace(selectMembershipSQL, "($3)", sqlutil.QueryVariadicOffset(len(memberships), 2), 1)
|
||||||
|
stmt, err := s.db.Prepare(orig)
|
||||||
|
if err != nil {
|
||||||
|
return "", 0, 0, err
|
||||||
|
}
|
||||||
|
err = sqlutil.TxStmt(txn, stmt).QueryRowContext(ctx, params...).Scan(&eventID, &streamPos, &topologyPos)
|
||||||
|
return
|
||||||
|
}
|
|
@ -111,12 +111,11 @@ func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
|
||||||
// on the event's depth.
|
// on the event's depth.
|
||||||
func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
|
func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
|
||||||
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition,
|
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition,
|
||||||
) (err error) {
|
) (types.StreamPosition, error) {
|
||||||
stmt := sqlutil.TxStmt(txn, s.insertEventInTopologyStmt)
|
_, err := sqlutil.TxStmt(txn, s.insertEventInTopologyStmt).ExecContext(
|
||||||
_, err = stmt.ExecContext(
|
|
||||||
ctx, event.EventID(), event.Depth(), event.RoomID(), pos,
|
ctx, event.EventID(), event.Depth(), event.RoomID(), pos,
|
||||||
)
|
)
|
||||||
return
|
return types.StreamPosition(event.Depth()), err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange(
|
func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange(
|
||||||
|
|
|
@ -100,6 +100,10 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
memberships, err := NewSqliteMembershipsTable(d.db)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
m := sqlutil.NewMigrations()
|
m := sqlutil.NewMigrations()
|
||||||
deltas.LoadFixSequences(m)
|
deltas.LoadFixSequences(m)
|
||||||
deltas.LoadRemoveSendToDeviceSentColumn(m)
|
deltas.LoadRemoveSendToDeviceSentColumn(m)
|
||||||
|
@ -119,6 +123,7 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
|
||||||
Filter: filter,
|
Filter: filter,
|
||||||
SendToDevice: sendToDevice,
|
SendToDevice: sendToDevice,
|
||||||
Receipts: receipts,
|
Receipts: receipts,
|
||||||
|
Memberships: memberships,
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,7 +70,7 @@ type Events interface {
|
||||||
type Topology interface {
|
type Topology interface {
|
||||||
// InsertEventInTopology inserts the given event in the room's topology, based on the event's depth.
|
// InsertEventInTopology inserts the given event in the room's topology, based on the event's depth.
|
||||||
// `pos` is the stream position of this event in the events table, and is used to order events which have the same depth.
|
// `pos` is the stream position of this event in the events table, and is used to order events which have the same depth.
|
||||||
InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) (err error)
|
InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) (topoPos types.StreamPosition, err error)
|
||||||
// SelectEventIDsInRange selects the IDs of events whose depths are within a given range in a given room's topological order.
|
// SelectEventIDsInRange selects the IDs of events whose depths are within a given range in a given room's topological order.
|
||||||
// Events with `minDepth` are *exclusive*, as is the event which has exactly `minDepth`,`maxStreamPos`.
|
// Events with `minDepth` are *exclusive*, as is the event which has exactly `minDepth`,`maxStreamPos`.
|
||||||
// `maxStreamPos` is only used when events have the same depth as `maxDepth`, which results in events less than `maxStreamPos` being returned.
|
// `maxStreamPos` is only used when events have the same depth as `maxDepth`, which results in events less than `maxStreamPos` being returned.
|
||||||
|
@ -162,3 +162,8 @@ type Receipts interface {
|
||||||
SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error)
|
SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error)
|
||||||
SelectMaxReceiptID(ctx context.Context, txn *sql.Tx) (id int64, err error)
|
SelectMaxReceiptID(ctx context.Context, txn *sql.Tx) (id int64, err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Memberships interface {
|
||||||
|
UpsertMembership(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error
|
||||||
|
SelectMembership(ctx context.Context, txn *sql.Tx, roomID, userID, memberships []string) (eventID string, streamPos, topologyPos types.StreamPosition, err error)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue