mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-08-02 22:22:46 +00:00
Merge branch 'master' into neilalexander/onoldroomevent
This commit is contained in:
commit
125d63ffe6
121 changed files with 3172 additions and 648 deletions
|
@ -22,6 +22,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
"github.com/matrix-org/dendrite/syncapi/notifier"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
|
@ -38,14 +39,15 @@ type OutputClientDataConsumer struct {
|
|||
|
||||
// NewOutputClientDataConsumer creates a new OutputClientData consumer. Call Start() to begin consuming from room servers.
|
||||
func NewOutputClientDataConsumer(
|
||||
process *process.ProcessContext,
|
||||
cfg *config.SyncAPI,
|
||||
kafkaConsumer sarama.Consumer,
|
||||
store storage.Database,
|
||||
notifier *notifier.Notifier,
|
||||
stream types.StreamProvider,
|
||||
) *OutputClientDataConsumer {
|
||||
|
||||
consumer := internal.ContinualConsumer{
|
||||
Process: process,
|
||||
ComponentName: "syncapi/clientapi",
|
||||
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputClientData)),
|
||||
Consumer: kafkaConsumer,
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/eduserver/api"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
"github.com/matrix-org/dendrite/syncapi/notifier"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
|
@ -39,6 +40,7 @@ type OutputReceiptEventConsumer struct {
|
|||
// NewOutputReceiptEventConsumer creates a new OutputReceiptEventConsumer.
|
||||
// Call Start() to begin consuming from the EDU server.
|
||||
func NewOutputReceiptEventConsumer(
|
||||
process *process.ProcessContext,
|
||||
cfg *config.SyncAPI,
|
||||
kafkaConsumer sarama.Consumer,
|
||||
store storage.Database,
|
||||
|
@ -47,6 +49,7 @@ func NewOutputReceiptEventConsumer(
|
|||
) *OutputReceiptEventConsumer {
|
||||
|
||||
consumer := internal.ContinualConsumer{
|
||||
Process: process,
|
||||
ComponentName: "syncapi/eduserver/receipt",
|
||||
Topic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent),
|
||||
Consumer: kafkaConsumer,
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/eduserver/api"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
"github.com/matrix-org/dendrite/syncapi/notifier"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
|
@ -42,6 +43,7 @@ type OutputSendToDeviceEventConsumer struct {
|
|||
// NewOutputSendToDeviceEventConsumer creates a new OutputSendToDeviceEventConsumer.
|
||||
// Call Start() to begin consuming from the EDU server.
|
||||
func NewOutputSendToDeviceEventConsumer(
|
||||
process *process.ProcessContext,
|
||||
cfg *config.SyncAPI,
|
||||
kafkaConsumer sarama.Consumer,
|
||||
store storage.Database,
|
||||
|
@ -50,6 +52,7 @@ func NewOutputSendToDeviceEventConsumer(
|
|||
) *OutputSendToDeviceEventConsumer {
|
||||
|
||||
consumer := internal.ContinualConsumer{
|
||||
Process: process,
|
||||
ComponentName: "syncapi/eduserver/sendtodevice",
|
||||
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent)),
|
||||
Consumer: kafkaConsumer,
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
"github.com/matrix-org/dendrite/syncapi/notifier"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
|
@ -39,6 +40,7 @@ type OutputTypingEventConsumer struct {
|
|||
// NewOutputTypingEventConsumer creates a new OutputTypingEventConsumer.
|
||||
// Call Start() to begin consuming from the EDU server.
|
||||
func NewOutputTypingEventConsumer(
|
||||
process *process.ProcessContext,
|
||||
cfg *config.SyncAPI,
|
||||
kafkaConsumer sarama.Consumer,
|
||||
store storage.Database,
|
||||
|
@ -48,6 +50,7 @@ func NewOutputTypingEventConsumer(
|
|||
) *OutputTypingEventConsumer {
|
||||
|
||||
consumer := internal.ContinualConsumer{
|
||||
Process: process,
|
||||
ComponentName: "syncapi/eduserver/typing",
|
||||
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent)),
|
||||
Consumer: kafkaConsumer,
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/keyserver/api"
|
||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
"github.com/matrix-org/dendrite/syncapi/notifier"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
|
@ -46,6 +47,7 @@ type OutputKeyChangeEventConsumer struct {
|
|||
// NewOutputKeyChangeEventConsumer creates a new OutputKeyChangeEventConsumer.
|
||||
// Call Start() to begin consuming from the key server.
|
||||
func NewOutputKeyChangeEventConsumer(
|
||||
process *process.ProcessContext,
|
||||
serverName gomatrixserverlib.ServerName,
|
||||
topic string,
|
||||
kafkaConsumer sarama.Consumer,
|
||||
|
@ -57,6 +59,7 @@ func NewOutputKeyChangeEventConsumer(
|
|||
) *OutputKeyChangeEventConsumer {
|
||||
|
||||
consumer := internal.ContinualConsumer{
|
||||
Process: process,
|
||||
ComponentName: "syncapi/keychange",
|
||||
Topic: topic,
|
||||
Consumer: kafkaConsumer,
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
"github.com/matrix-org/dendrite/syncapi/notifier"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
|
@ -43,6 +44,7 @@ type OutputRoomEventConsumer struct {
|
|||
|
||||
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call Start() to begin consuming from room servers.
|
||||
func NewOutputRoomEventConsumer(
|
||||
process *process.ProcessContext,
|
||||
cfg *config.SyncAPI,
|
||||
kafkaConsumer sarama.Consumer,
|
||||
store storage.Database,
|
||||
|
@ -53,6 +55,7 @@ func NewOutputRoomEventConsumer(
|
|||
) *OutputRoomEventConsumer {
|
||||
|
||||
consumer := internal.ContinualConsumer{
|
||||
Process: process,
|
||||
ComponentName: "syncapi/roomserver",
|
||||
Topic: string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent)),
|
||||
Consumer: kafkaConsumer,
|
||||
|
@ -173,6 +176,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
|
|||
if err != nil {
|
||||
// panic rather than continue with an inconsistent database
|
||||
log.WithFields(log.Fields{
|
||||
"event_id": ev.EventID(),
|
||||
"event": string(ev.JSON()),
|
||||
log.ErrorKey: err,
|
||||
"add": msg.AddsStateEventIDs,
|
||||
|
@ -215,6 +219,7 @@ func (s *OutputRoomEventConsumer) onOldRoomEvent(
|
|||
if err != nil {
|
||||
// panic rather than continue with an inconsistent database
|
||||
log.WithFields(log.Fields{
|
||||
"event_id": ev.EventID(),
|
||||
"event": string(ev.JSON()),
|
||||
log.ErrorKey: err,
|
||||
}).Panicf("roomserver output log: write old event failure")
|
||||
|
@ -271,6 +276,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
|
|||
if err != nil {
|
||||
// panic rather than continue with an inconsistent database
|
||||
log.WithFields(log.Fields{
|
||||
"event_id": msg.Event.EventID(),
|
||||
"event": string(msg.Event.JSON()),
|
||||
"pdupos": pduPos,
|
||||
log.ErrorKey: err,
|
||||
|
|
|
@ -367,7 +367,6 @@ func newTestSyncRequest(userID, deviceID string, since types.StreamingToken) typ
|
|||
Timeout: 1 * time.Minute,
|
||||
Since: since,
|
||||
WantFullState: false,
|
||||
Limit: 20,
|
||||
Log: util.GetLogger(context.TODO()),
|
||||
Context: context.TODO(),
|
||||
}
|
||||
|
|
|
@ -235,12 +235,15 @@ func (r *messagesReq) retrieveEvents() (
|
|||
clientEvents []gomatrixserverlib.ClientEvent, start,
|
||||
end types.TopologyToken, err error,
|
||||
) {
|
||||
eventFilter := gomatrixserverlib.DefaultRoomEventFilter()
|
||||
eventFilter.Limit = r.limit
|
||||
|
||||
// Retrieve the events from the local database.
|
||||
var streamEvents []types.StreamEvent
|
||||
if r.fromStream != nil {
|
||||
toStream := r.to.StreamToken()
|
||||
streamEvents, err = r.db.GetEventsInStreamingRange(
|
||||
r.ctx, r.fromStream, &toStream, r.roomID, r.limit, r.backwardOrdering,
|
||||
r.ctx, r.fromStream, &toStream, r.roomID, &eventFilter, r.backwardOrdering,
|
||||
)
|
||||
} else {
|
||||
streamEvents, err = r.db.GetEventsInTopologicalRange(
|
||||
|
|
|
@ -40,7 +40,7 @@ type Database interface {
|
|||
GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
|
||||
RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error)
|
||||
|
||||
RecentEvents(ctx context.Context, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
|
||||
RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
|
||||
|
||||
GetBackwardTopologyPos(ctx context.Context, events []types.StreamEvent) (types.TopologyToken, error)
|
||||
PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error)
|
||||
|
@ -105,7 +105,7 @@ type Database interface {
|
|||
// Returns an error if there was a problem communicating with the database.
|
||||
DeletePeeks(ctx context.Context, RoomID, UserID string) (types.StreamPosition, error)
|
||||
// GetEventsInStreamingRange retrieves all of the events on a given ordering using the given extremities and limit.
|
||||
GetEventsInStreamingRange(ctx context.Context, from, to *types.StreamingToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error)
|
||||
GetEventsInStreamingRange(ctx context.Context, from, to *types.StreamingToken, roomID string, eventFilter *gomatrixserverlib.RoomEventFilter, backwardOrdering bool) (events []types.StreamEvent, err error)
|
||||
// GetEventsInTopologicalRange retrieves all of the events on a given ordering using the given extremities and limit.
|
||||
GetEventsInTopologicalRange(ctx context.Context, from, to *types.TopologyToken, roomID string, limit int, backwardOrdering bool) (events []types.StreamEvent, err error)
|
||||
// EventPositionInTopology returns the depth and stream position of the given event.
|
||||
|
|
|
@ -83,7 +83,7 @@ func (s *filterStatements) SelectFilter(
|
|||
}
|
||||
|
||||
// Unmarshal JSON into Filter struct
|
||||
var filter gomatrixserverlib.Filter
|
||||
filter := gomatrixserverlib.DefaultFilter()
|
||||
if err = json.Unmarshal(filterData, &filter); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
111
syncapi/storage/postgres/memberships_table.go
Normal file
111
syncapi/storage/postgres/memberships_table.go
Normal file
|
@ -0,0 +1,111 @@
|
|||
// Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
// The memberships table is designed to track the last time that
|
||||
// the user was a given state. This allows us to find out the
|
||||
// most recent time that a user was invited to, joined or left
|
||||
// a room, either by choice or otherwise. This is important for
|
||||
// building history visibility.
|
||||
|
||||
const membershipsSchema = `
|
||||
CREATE TABLE IF NOT EXISTS syncapi_memberships (
|
||||
-- The 'room_id' key for the state event.
|
||||
room_id TEXT NOT NULL,
|
||||
-- The state event ID
|
||||
user_id TEXT NOT NULL,
|
||||
-- The status of the membership
|
||||
membership TEXT NOT NULL,
|
||||
-- The event ID that last changed the membership
|
||||
event_id TEXT NOT NULL,
|
||||
-- The stream position of the change
|
||||
stream_pos BIGINT NOT NULL,
|
||||
-- The topological position of the change in the room
|
||||
topological_pos BIGINT NOT NULL,
|
||||
-- Unique index
|
||||
CONSTRAINT syncapi_memberships_unique UNIQUE (room_id, user_id, membership)
|
||||
);
|
||||
`
|
||||
|
||||
const upsertMembershipSQL = "" +
|
||||
"INSERT INTO syncapi_memberships (room_id, user_id, membership, event_id, stream_pos, topological_pos)" +
|
||||
" VALUES ($1, $2, $3, $4, $5, $6)" +
|
||||
" ON CONFLICT ON CONSTRAINT syncapi_memberships_unique" +
|
||||
" DO UPDATE SET event_id = $4, stream_pos = $5, topological_pos = $6"
|
||||
|
||||
const selectMembershipSQL = "" +
|
||||
"SELECT event_id, stream_pos, topological_pos FROM syncapi_memberships" +
|
||||
" WHERE room_id = $1 AND user_id = $2 AND membership = ANY($3)" +
|
||||
" ORDER BY stream_pos DESC" +
|
||||
" LIMIT 1"
|
||||
|
||||
type membershipsStatements struct {
|
||||
upsertMembershipStmt *sql.Stmt
|
||||
selectMembershipStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
|
||||
s := &membershipsStatements{}
|
||||
_, err := db.Exec(membershipsSchema)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.upsertMembershipStmt, err = db.Prepare(upsertMembershipSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectMembershipStmt, err = db.Prepare(selectMembershipSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *membershipsStatements) UpsertMembership(
|
||||
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent,
|
||||
streamPos, topologicalPos types.StreamPosition,
|
||||
) error {
|
||||
membership, err := event.Membership()
|
||||
if err != nil {
|
||||
return fmt.Errorf("event.Membership: %w", err)
|
||||
}
|
||||
_, err = sqlutil.TxStmt(txn, s.upsertMembershipStmt).ExecContext(
|
||||
ctx,
|
||||
event.RoomID(),
|
||||
*event.StateKey(),
|
||||
membership,
|
||||
event.EventID(),
|
||||
streamPos,
|
||||
topologicalPos,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *membershipsStatements) SelectMembership(
|
||||
ctx context.Context, txn *sql.Tx, roomID, userID, memberships []string,
|
||||
) (eventID string, streamPos, topologyPos types.StreamPosition, err error) {
|
||||
stmt := sqlutil.TxStmt(txn, s.selectMembershipStmt)
|
||||
err = stmt.QueryRowContext(ctx, roomID, userID, memberships).Scan(&eventID, &streamPos, &topologyPos)
|
||||
return
|
||||
}
|
|
@ -84,17 +84,29 @@ const selectEventsSQL = "" +
|
|||
const selectRecentEventsSQL = "" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
||||
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
|
||||
" ORDER BY id DESC LIMIT $4"
|
||||
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
||||
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
||||
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
|
||||
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
|
||||
" ORDER BY id DESC LIMIT $8"
|
||||
|
||||
const selectRecentEventsForSyncSQL = "" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
||||
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" +
|
||||
" ORDER BY id DESC LIMIT $4"
|
||||
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
||||
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
||||
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
|
||||
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
|
||||
" ORDER BY id DESC LIMIT $8"
|
||||
|
||||
const selectEarlyEventsSQL = "" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
||||
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
|
||||
" ORDER BY id ASC LIMIT $4"
|
||||
" AND ( $4::text[] IS NULL OR sender = ANY($4) )" +
|
||||
" AND ( $5::text[] IS NULL OR NOT(sender = ANY($5)) )" +
|
||||
" AND ( $6::text[] IS NULL OR type LIKE ANY($6) )" +
|
||||
" AND ( $7::text[] IS NULL OR NOT(type LIKE ANY($7)) )" +
|
||||
" ORDER BY id ASC LIMIT $8"
|
||||
|
||||
const selectMaxEventIDSQL = "" +
|
||||
"SELECT MAX(id) FROM syncapi_output_room_events"
|
||||
|
@ -322,7 +334,7 @@ func (s *outputRoomEventsStatements) InsertEvent(
|
|||
// from sync.
|
||||
func (s *outputRoomEventsStatements) SelectRecentEvents(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
roomID string, r types.Range, limit int,
|
||||
roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
|
||||
chronologicalOrder bool, onlySyncEvents bool,
|
||||
) ([]types.StreamEvent, bool, error) {
|
||||
var stmt *sql.Stmt
|
||||
|
@ -331,7 +343,14 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
|
|||
} else {
|
||||
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt)
|
||||
}
|
||||
rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit+1)
|
||||
rows, err := stmt.QueryContext(
|
||||
ctx, roomID, r.Low(), r.High(),
|
||||
pq.StringArray(eventFilter.Senders),
|
||||
pq.StringArray(eventFilter.NotSenders),
|
||||
pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.Types)),
|
||||
pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.NotTypes)),
|
||||
eventFilter.Limit+1,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
@ -350,7 +369,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
|
|||
}
|
||||
// we queried for 1 more than the limit, so if we returned one more mark limited=true
|
||||
limited := false
|
||||
if len(events) > limit {
|
||||
if len(events) > eventFilter.Limit {
|
||||
limited = true
|
||||
// re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last.
|
||||
if chronologicalOrder {
|
||||
|
@ -367,10 +386,17 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
|
|||
// from a given position, up to a maximum of 'limit'.
|
||||
func (s *outputRoomEventsStatements) SelectEarlyEvents(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
roomID string, r types.Range, limit int,
|
||||
roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
|
||||
) ([]types.StreamEvent, error) {
|
||||
stmt := sqlutil.TxStmt(txn, s.selectEarlyEventsStmt)
|
||||
rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit)
|
||||
rows, err := stmt.QueryContext(
|
||||
ctx, roomID, r.Low(), r.High(),
|
||||
pq.StringArray(eventFilter.Senders),
|
||||
pq.StringArray(eventFilter.NotSenders),
|
||||
pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.Types)),
|
||||
pq.StringArray(filterConvertTypeWildcardToSQL(eventFilter.NotTypes)),
|
||||
eventFilter.Limit,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -44,7 +44,8 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_topological_position_idx ON sync
|
|||
const insertEventInTopologySQL = "" +
|
||||
"INSERT INTO syncapi_output_room_events_topology (event_id, topological_position, room_id, stream_position)" +
|
||||
" VALUES ($1, $2, $3, $4)" +
|
||||
" ON CONFLICT (topological_position, stream_position, room_id) DO UPDATE SET event_id = $1"
|
||||
" ON CONFLICT (topological_position, stream_position, room_id) DO UPDATE SET event_id = $1" +
|
||||
" RETURNING topological_position"
|
||||
|
||||
const selectEventIDsInRangeASCSQL = "" +
|
||||
"SELECT event_id FROM syncapi_output_room_events_topology" +
|
||||
|
@ -115,10 +116,10 @@ func NewPostgresTopologyTable(db *sql.DB) (tables.Topology, error) {
|
|||
// on the event's depth.
|
||||
func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
|
||||
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition,
|
||||
) (err error) {
|
||||
_, err = s.insertEventInTopologyStmt.ExecContext(
|
||||
) (topoPos types.StreamPosition, err error) {
|
||||
err = sqlutil.TxStmt(txn, s.insertEventInTopologyStmt).QueryRowContext(
|
||||
ctx, event.EventID(), event.Depth(), event.RoomID(), pos,
|
||||
)
|
||||
).Scan(&topoPos)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -87,6 +87,10 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
memberships, err := NewPostgresMembershipsTable(d.db)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
m := sqlutil.NewMigrations()
|
||||
deltas.LoadFixSequences(m)
|
||||
deltas.LoadRemoveSendToDeviceSentColumn(m)
|
||||
|
@ -106,6 +110,7 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
|
|||
Filter: filter,
|
||||
SendToDevice: sendToDevice,
|
||||
Receipts: receipts,
|
||||
Memberships: memberships,
|
||||
}
|
||||
return &d, nil
|
||||
}
|
||||
|
|
|
@ -48,6 +48,7 @@ type Database struct {
|
|||
SendToDevice tables.SendToDevice
|
||||
Filter tables.Filter
|
||||
Receipts tables.Receipts
|
||||
Memberships tables.Memberships
|
||||
}
|
||||
|
||||
func (d *Database) readOnlySnapshot(ctx context.Context) (*sql.Tx, error) {
|
||||
|
@ -110,8 +111,8 @@ func (d *Database) RoomIDsWithMembership(ctx context.Context, userID string, mem
|
|||
return d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, nil, userID, membership)
|
||||
}
|
||||
|
||||
func (d *Database) RecentEvents(ctx context.Context, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) {
|
||||
return d.OutputEvents.SelectRecentEvents(ctx, nil, roomID, r, limit, chronologicalOrder, onlySyncEvents)
|
||||
func (d *Database) RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) {
|
||||
return d.OutputEvents.SelectRecentEvents(ctx, nil, roomID, r, eventFilter, chronologicalOrder, onlySyncEvents)
|
||||
}
|
||||
|
||||
func (d *Database) PositionInTopology(ctx context.Context, eventID string) (pos types.StreamPosition, spos types.StreamPosition, err error) {
|
||||
|
@ -151,7 +152,7 @@ func (d *Database) Events(ctx context.Context, eventIDs []string) ([]*gomatrixse
|
|||
func (d *Database) GetEventsInStreamingRange(
|
||||
ctx context.Context,
|
||||
from, to *types.StreamingToken,
|
||||
roomID string, limit int,
|
||||
roomID string, eventFilter *gomatrixserverlib.RoomEventFilter,
|
||||
backwardOrdering bool,
|
||||
) (events []types.StreamEvent, err error) {
|
||||
r := types.Range{
|
||||
|
@ -162,14 +163,14 @@ func (d *Database) GetEventsInStreamingRange(
|
|||
if backwardOrdering {
|
||||
// When using backward ordering, we want the most recent events first.
|
||||
if events, _, err = d.OutputEvents.SelectRecentEvents(
|
||||
ctx, nil, roomID, r, limit, false, false,
|
||||
ctx, nil, roomID, r, eventFilter, false, false,
|
||||
); err != nil {
|
||||
return
|
||||
}
|
||||
} else {
|
||||
// When using forward ordering, we want the least recent events first.
|
||||
if events, err = d.OutputEvents.SelectEarlyEvents(
|
||||
ctx, nil, roomID, r, limit,
|
||||
ctx, nil, roomID, r, eventFilter,
|
||||
); err != nil {
|
||||
return
|
||||
}
|
||||
|
@ -383,8 +384,8 @@ func (d *Database) WriteEvent(
|
|||
return fmt.Errorf("d.OutputEvents.InsertEvent: %w", err)
|
||||
}
|
||||
pduPosition = pos
|
||||
|
||||
if err = d.Topology.InsertEventInTopology(ctx, txn, ev, pos); err != nil {
|
||||
var topoPosition types.StreamPosition
|
||||
if topoPosition, err = d.Topology.InsertEventInTopology(ctx, txn, ev, pos); err != nil {
|
||||
return fmt.Errorf("d.Topology.InsertEventInTopology: %w", err)
|
||||
}
|
||||
|
||||
|
@ -397,7 +398,7 @@ func (d *Database) WriteEvent(
|
|||
return nil
|
||||
}
|
||||
|
||||
return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition)
|
||||
return d.updateRoomState(ctx, txn, removeStateEventIDs, addStateEvents, pduPosition, topoPosition)
|
||||
})
|
||||
|
||||
return pduPosition, returnErr
|
||||
|
@ -409,6 +410,7 @@ func (d *Database) updateRoomState(
|
|||
removedEventIDs []string,
|
||||
addedEvents []*gomatrixserverlib.HeaderedEvent,
|
||||
pduPosition types.StreamPosition,
|
||||
topoPosition types.StreamPosition,
|
||||
) error {
|
||||
// remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
|
||||
for _, eventID := range removedEventIDs {
|
||||
|
@ -429,6 +431,9 @@ func (d *Database) updateRoomState(
|
|||
return fmt.Errorf("event.Membership: %w", err)
|
||||
}
|
||||
membership = &value
|
||||
if err = d.Memberships.UpsertMembership(ctx, txn, event, pduPosition, topoPosition); err != nil {
|
||||
return fmt.Errorf("d.Memberships.UpsertMembership: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := d.CurrentRoomState.UpsertRoomState(ctx, txn, event, membership, pduPosition); err != nil {
|
||||
|
|
|
@ -82,7 +82,7 @@ func (s *accountDataStatements) InsertAccountData(
|
|||
ctx context.Context, txn *sql.Tx,
|
||||
userID, roomID, dataType string,
|
||||
) (pos types.StreamPosition, err error) {
|
||||
pos, err = s.streamIDStatements.nextStreamID(ctx, txn)
|
||||
pos, err = s.streamIDStatements.nextAccountDataID(ctx, txn)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
|
@ -66,13 +67,8 @@ const selectRoomIDsWithMembershipSQL = "" +
|
|||
"SELECT DISTINCT room_id FROM syncapi_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND membership = $2"
|
||||
|
||||
const selectCurrentStateSQL = "" +
|
||||
"SELECT event_id, headered_event_json FROM syncapi_current_room_state WHERE room_id = $1" +
|
||||
" AND ( $2 IS NULL OR sender IN ($2) )" +
|
||||
" AND ( $3 IS NULL OR NOT(sender IN ($3)) )" +
|
||||
" AND ( $4 IS NULL OR type IN ($4) )" +
|
||||
" AND ( $5 IS NULL OR NOT(type IN ($5)) )" +
|
||||
" AND ( $6 IS NULL OR contains_url = $6 )" +
|
||||
" LIMIT $7"
|
||||
"SELECT event_id, headered_event_json FROM syncapi_current_room_state WHERE room_id = $1"
|
||||
// WHEN, ORDER BY and LIMIT will be added by prepareWithFilter
|
||||
|
||||
const selectJoinedUsersSQL = "" +
|
||||
"SELECT room_id, state_key FROM syncapi_current_room_state WHERE type = 'm.room.member' AND membership = 'join'"
|
||||
|
@ -95,7 +91,6 @@ type currentRoomStateStatements struct {
|
|||
deleteRoomStateByEventIDStmt *sql.Stmt
|
||||
DeleteRoomStateForRoomStmt *sql.Stmt
|
||||
selectRoomIDsWithMembershipStmt *sql.Stmt
|
||||
selectCurrentStateStmt *sql.Stmt
|
||||
selectJoinedUsersStmt *sql.Stmt
|
||||
selectStateEventStmt *sql.Stmt
|
||||
}
|
||||
|
@ -121,9 +116,6 @@ func NewSqliteCurrentRoomStateTable(db *sql.DB, streamID *streamIDStatements) (t
|
|||
if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectCurrentStateStmt, err = db.Prepare(selectCurrentStateSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectJoinedUsersStmt, err = db.Prepare(selectJoinedUsersSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -185,17 +177,22 @@ func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(
|
|||
// CurrentState returns all the current state events for the given room.
|
||||
func (s *currentRoomStateStatements) SelectCurrentState(
|
||||
ctx context.Context, txn *sql.Tx, roomID string,
|
||||
stateFilterPart *gomatrixserverlib.StateFilter,
|
||||
stateFilter *gomatrixserverlib.StateFilter,
|
||||
) ([]*gomatrixserverlib.HeaderedEvent, error) {
|
||||
stmt := sqlutil.TxStmt(txn, s.selectCurrentStateStmt)
|
||||
rows, err := stmt.QueryContext(ctx, roomID,
|
||||
nil, // FIXME: pq.StringArray(stateFilterPart.Senders),
|
||||
nil, // FIXME: pq.StringArray(stateFilterPart.NotSenders),
|
||||
nil, // FIXME: pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)),
|
||||
nil, // FIXME: pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.NotTypes)),
|
||||
stateFilterPart.ContainsURL,
|
||||
stateFilterPart.Limit,
|
||||
stmt, params, err := prepareWithFilters(
|
||||
s.db, txn, selectCurrentStateSQL,
|
||||
[]interface{}{
|
||||
roomID,
|
||||
},
|
||||
stateFilter.Senders, stateFilter.NotSenders,
|
||||
stateFilter.Types, stateFilter.NotTypes,
|
||||
stateFilter.Limit, FilterOrderNone,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("s.prepareWithFilters: %w", err)
|
||||
}
|
||||
|
||||
rows, err := stmt.QueryContext(ctx, params...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -87,7 +87,7 @@ func (s *filterStatements) SelectFilter(
|
|||
}
|
||||
|
||||
// Unmarshal JSON into Filter struct
|
||||
var filter gomatrixserverlib.Filter
|
||||
filter := gomatrixserverlib.DefaultFilter()
|
||||
if err = json.Unmarshal(filterData, &filter); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
76
syncapi/storage/sqlite3/filtering.go
Normal file
76
syncapi/storage/sqlite3/filtering.go
Normal file
|
@ -0,0 +1,76 @@
|
|||
package sqlite3
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
)
|
||||
|
||||
type FilterOrder int
|
||||
|
||||
const (
|
||||
FilterOrderNone = iota
|
||||
FilterOrderAsc
|
||||
FilterOrderDesc
|
||||
)
|
||||
|
||||
// prepareWithFilters returns a prepared statement with the
|
||||
// relevant filters included. It also includes an []interface{}
|
||||
// list of all the relevant parameters to pass straight to
|
||||
// QueryContext, QueryRowContext etc.
|
||||
// We don't take the filter object directly here because the
|
||||
// fields might come from either a StateFilter or an EventFilter,
|
||||
// and it's easier just to have the caller extract the relevant
|
||||
// parts.
|
||||
func prepareWithFilters(
|
||||
db *sql.DB, txn *sql.Tx, query string, params []interface{},
|
||||
senders, notsenders, types, nottypes []string,
|
||||
limit int, order FilterOrder,
|
||||
) (*sql.Stmt, []interface{}, error) {
|
||||
offset := len(params)
|
||||
if count := len(senders); count > 0 {
|
||||
query += " AND sender IN " + sqlutil.QueryVariadicOffset(count, offset)
|
||||
for _, v := range senders {
|
||||
params, offset = append(params, v), offset+1
|
||||
}
|
||||
}
|
||||
if count := len(notsenders); count > 0 {
|
||||
query += " AND sender NOT IN " + sqlutil.QueryVariadicOffset(count, offset)
|
||||
for _, v := range notsenders {
|
||||
params, offset = append(params, v), offset+1
|
||||
}
|
||||
}
|
||||
if count := len(types); count > 0 {
|
||||
query += " AND type IN " + sqlutil.QueryVariadicOffset(count, offset)
|
||||
for _, v := range types {
|
||||
params, offset = append(params, v), offset+1
|
||||
}
|
||||
}
|
||||
if count := len(nottypes); count > 0 {
|
||||
query += " AND type NOT IN " + sqlutil.QueryVariadicOffset(count, offset)
|
||||
for _, v := range nottypes {
|
||||
params, offset = append(params, v), offset+1
|
||||
}
|
||||
}
|
||||
switch order {
|
||||
case FilterOrderAsc:
|
||||
query += " ORDER BY id ASC"
|
||||
case FilterOrderDesc:
|
||||
query += " ORDER BY id DESC"
|
||||
}
|
||||
query += fmt.Sprintf(" LIMIT $%d", offset+1)
|
||||
params = append(params, limit)
|
||||
|
||||
var stmt *sql.Stmt
|
||||
var err error
|
||||
if txn != nil {
|
||||
stmt, err = txn.Prepare(query)
|
||||
} else {
|
||||
stmt, err = db.Prepare(query)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("s.db.Prepare: %w", err)
|
||||
}
|
||||
return stmt, params, nil
|
||||
}
|
|
@ -93,7 +93,7 @@ func NewSqliteInvitesTable(db *sql.DB, streamID *streamIDStatements) (tables.Inv
|
|||
func (s *inviteEventsStatements) InsertInviteEvent(
|
||||
ctx context.Context, txn *sql.Tx, inviteEvent *gomatrixserverlib.HeaderedEvent,
|
||||
) (streamPos types.StreamPosition, err error) {
|
||||
streamPos, err = s.streamIDStatements.nextStreamID(ctx, txn)
|
||||
streamPos, err = s.streamIDStatements.nextInviteID(ctx, txn)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
@ -119,7 +119,7 @@ func (s *inviteEventsStatements) InsertInviteEvent(
|
|||
func (s *inviteEventsStatements) DeleteInviteEvent(
|
||||
ctx context.Context, txn *sql.Tx, inviteEventID string,
|
||||
) (types.StreamPosition, error) {
|
||||
streamPos, err := s.streamIDStatements.nextStreamID(ctx, txn)
|
||||
streamPos, err := s.streamIDStatements.nextInviteID(ctx, txn)
|
||||
if err != nil {
|
||||
return streamPos, err
|
||||
}
|
||||
|
|
119
syncapi/storage/sqlite3/memberships_table.go
Normal file
119
syncapi/storage/sqlite3/memberships_table.go
Normal file
|
@ -0,0 +1,119 @@
|
|||
// Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package sqlite3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal/sqlutil"
|
||||
"github.com/matrix-org/dendrite/syncapi/storage/tables"
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
// The memberships table is designed to track the last time that
|
||||
// the user was a given state. This allows us to find out the
|
||||
// most recent time that a user was invited to, joined or left
|
||||
// a room, either by choice or otherwise. This is important for
|
||||
// building history visibility.
|
||||
|
||||
const membershipsSchema = `
|
||||
CREATE TABLE IF NOT EXISTS syncapi_memberships (
|
||||
-- The 'room_id' key for the state event.
|
||||
room_id TEXT NOT NULL,
|
||||
-- The state event ID
|
||||
user_id TEXT NOT NULL,
|
||||
-- The status of the membership
|
||||
membership TEXT NOT NULL,
|
||||
-- The event ID that last changed the membership
|
||||
event_id TEXT NOT NULL,
|
||||
-- The stream position of the change
|
||||
stream_pos BIGINT NOT NULL,
|
||||
-- The topological position of the change in the room
|
||||
topological_pos BIGINT NOT NULL,
|
||||
-- Unique index
|
||||
UNIQUE (room_id, user_id, membership)
|
||||
);
|
||||
`
|
||||
|
||||
const upsertMembershipSQL = "" +
|
||||
"INSERT INTO syncapi_memberships (room_id, user_id, membership, event_id, stream_pos, topological_pos)" +
|
||||
" VALUES ($1, $2, $3, $4, $5, $6)" +
|
||||
" ON CONFLICT (room_id, user_id, membership)" +
|
||||
" DO UPDATE SET event_id = $4, stream_pos = $5, topological_pos = $6"
|
||||
|
||||
const selectMembershipSQL = "" +
|
||||
"SELECT event_id, stream_pos, topological_pos FROM syncapi_memberships" +
|
||||
" WHERE room_id = $1 AND user_id = $2 AND membership IN ($3)" +
|
||||
" ORDER BY stream_pos DESC" +
|
||||
" LIMIT 1"
|
||||
|
||||
type membershipsStatements struct {
|
||||
db *sql.DB
|
||||
upsertMembershipStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) {
|
||||
s := &membershipsStatements{
|
||||
db: db,
|
||||
}
|
||||
_, err := db.Exec(membershipsSchema)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.upsertMembershipStmt, err = db.Prepare(upsertMembershipSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *membershipsStatements) UpsertMembership(
|
||||
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent,
|
||||
streamPos, topologicalPos types.StreamPosition,
|
||||
) error {
|
||||
membership, err := event.Membership()
|
||||
if err != nil {
|
||||
return fmt.Errorf("event.Membership: %w", err)
|
||||
}
|
||||
_, err = sqlutil.TxStmt(txn, s.upsertMembershipStmt).ExecContext(
|
||||
ctx,
|
||||
event.RoomID(),
|
||||
*event.StateKey(),
|
||||
membership,
|
||||
event.EventID(),
|
||||
streamPos,
|
||||
topologicalPos,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *membershipsStatements) SelectMembership(
|
||||
ctx context.Context, txn *sql.Tx, roomID, userID, memberships []string,
|
||||
) (eventID string, streamPos, topologyPos types.StreamPosition, err error) {
|
||||
params := []interface{}{roomID, userID}
|
||||
for _, membership := range memberships {
|
||||
params = append(params, membership)
|
||||
}
|
||||
orig := strings.Replace(selectMembershipSQL, "($3)", sqlutil.QueryVariadicOffset(len(memberships), 2), 1)
|
||||
stmt, err := s.db.Prepare(orig)
|
||||
if err != nil {
|
||||
return "", 0, 0, err
|
||||
}
|
||||
err = sqlutil.TxStmt(txn, stmt).QueryRowContext(ctx, params...).Scan(&eventID, &streamPos, &topologyPos)
|
||||
return
|
||||
}
|
|
@ -19,6 +19,7 @@ import (
|
|||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sort"
|
||||
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
|
@ -60,18 +61,18 @@ const selectEventsSQL = "" +
|
|||
|
||||
const selectRecentEventsSQL = "" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
||||
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
|
||||
" ORDER BY id DESC LIMIT $4"
|
||||
" WHERE room_id = $1 AND id > $2 AND id <= $3"
|
||||
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
|
||||
|
||||
const selectRecentEventsForSyncSQL = "" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
||||
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE" +
|
||||
" ORDER BY id DESC LIMIT $4"
|
||||
" WHERE room_id = $1 AND id > $2 AND id <= $3 AND exclude_from_sync = FALSE"
|
||||
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
|
||||
|
||||
const selectEarlyEventsSQL = "" +
|
||||
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events" +
|
||||
" WHERE room_id = $1 AND id > $2 AND id <= $3" +
|
||||
" ORDER BY id ASC LIMIT $4"
|
||||
" WHERE room_id = $1 AND id > $2 AND id <= $3"
|
||||
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
|
||||
|
||||
const selectMaxEventIDSQL = "" +
|
||||
"SELECT MAX(id) FROM syncapi_output_room_events"
|
||||
|
@ -79,45 +80,24 @@ const selectMaxEventIDSQL = "" +
|
|||
const updateEventJSONSQL = "" +
|
||||
"UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2"
|
||||
|
||||
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
|
||||
/*
|
||||
$1 = oldPos,
|
||||
$2 = newPos,
|
||||
$3 = pq.StringArray(stateFilterPart.Senders),
|
||||
$4 = pq.StringArray(stateFilterPart.NotSenders),
|
||||
$5 = pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)),
|
||||
$6 = pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.NotTypes)),
|
||||
$7 = stateFilterPart.ContainsURL,
|
||||
$8 = stateFilterPart.Limit,
|
||||
*/
|
||||
const selectStateInRangeSQL = "" +
|
||||
"SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
|
||||
" FROM syncapi_output_room_events" +
|
||||
" WHERE (id > $1 AND id <= $2)" + // old/new pos
|
||||
" AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
|
||||
/* " AND ( $3 IS NULL OR sender IN ($3) )" + // sender
|
||||
" AND ( $4 IS NULL OR NOT(sender IN ($4)) )" + // not sender
|
||||
" AND ( $5 IS NULL OR type IN ($5) )" + // type
|
||||
" AND ( $6 IS NULL OR NOT(type IN ($6)) )" + // not type
|
||||
" AND ( $7 IS NULL OR contains_url = $7)" + // contains URL? */
|
||||
" ORDER BY id ASC" +
|
||||
" LIMIT $8" // limit
|
||||
" WHERE (id > $1 AND id <= $2)" +
|
||||
" AND ((add_state_ids IS NOT NULL AND add_state_ids != '') OR (remove_state_ids IS NOT NULL AND remove_state_ids != ''))"
|
||||
// WHEN, ORDER BY and LIMIT are appended by prepareWithFilters
|
||||
|
||||
const deleteEventsForRoomSQL = "" +
|
||||
"DELETE FROM syncapi_output_room_events WHERE room_id = $1"
|
||||
|
||||
type outputRoomEventsStatements struct {
|
||||
db *sql.DB
|
||||
streamIDStatements *streamIDStatements
|
||||
insertEventStmt *sql.Stmt
|
||||
selectEventsStmt *sql.Stmt
|
||||
selectMaxEventIDStmt *sql.Stmt
|
||||
selectRecentEventsStmt *sql.Stmt
|
||||
selectRecentEventsForSyncStmt *sql.Stmt
|
||||
selectEarlyEventsStmt *sql.Stmt
|
||||
selectStateInRangeStmt *sql.Stmt
|
||||
updateEventJSONStmt *sql.Stmt
|
||||
deleteEventsForRoomStmt *sql.Stmt
|
||||
db *sql.DB
|
||||
streamIDStatements *streamIDStatements
|
||||
insertEventStmt *sql.Stmt
|
||||
selectEventsStmt *sql.Stmt
|
||||
selectMaxEventIDStmt *sql.Stmt
|
||||
updateEventJSONStmt *sql.Stmt
|
||||
deleteEventsForRoomStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Events, error) {
|
||||
|
@ -138,18 +118,6 @@ func NewSqliteEventsTable(db *sql.DB, streamID *streamIDStatements) (tables.Even
|
|||
if s.selectMaxEventIDStmt, err = db.Prepare(selectMaxEventIDSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectRecentEventsStmt, err = db.Prepare(selectRecentEventsSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectRecentEventsForSyncStmt, err = db.Prepare(selectRecentEventsForSyncSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectEarlyEventsStmt, err = db.Prepare(selectEarlyEventsSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.selectStateInRangeStmt, err = db.Prepare(selectStateInRangeSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if s.updateEventJSONStmt, err = db.Prepare(updateEventJSONSQL); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -173,19 +141,22 @@ func (s *outputRoomEventsStatements) UpdateEventJSON(ctx context.Context, event
|
|||
// two positions, only the most recent state is returned.
|
||||
func (s *outputRoomEventsStatements) SelectStateInRange(
|
||||
ctx context.Context, txn *sql.Tx, r types.Range,
|
||||
stateFilterPart *gomatrixserverlib.StateFilter,
|
||||
stateFilter *gomatrixserverlib.StateFilter,
|
||||
) (map[string]map[string]bool, map[string]types.StreamEvent, error) {
|
||||
stmt := sqlutil.TxStmt(txn, s.selectStateInRangeStmt)
|
||||
|
||||
rows, err := stmt.QueryContext(
|
||||
ctx, r.Low(), r.High(),
|
||||
/*pq.StringArray(stateFilterPart.Senders),
|
||||
pq.StringArray(stateFilterPart.NotSenders),
|
||||
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.Types)),
|
||||
pq.StringArray(filterConvertTypeWildcardToSQL(stateFilterPart.NotTypes)),
|
||||
stateFilterPart.ContainsURL,*/
|
||||
stateFilterPart.Limit,
|
||||
stmt, params, err := prepareWithFilters(
|
||||
s.db, txn, selectStateInRangeSQL,
|
||||
[]interface{}{
|
||||
r.Low(), r.High(),
|
||||
},
|
||||
stateFilter.Senders, stateFilter.NotSenders,
|
||||
stateFilter.Types, stateFilter.NotTypes,
|
||||
stateFilter.Limit, FilterOrderAsc,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("s.prepareWithFilters: %w", err)
|
||||
}
|
||||
|
||||
rows, err := stmt.QueryContext(ctx, params...)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
@ -298,16 +269,21 @@ func (s *outputRoomEventsStatements) InsertEvent(
|
|||
return 0, err
|
||||
}
|
||||
|
||||
addStateJSON, err := json.Marshal(addState)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
var addStateJSON, removeStateJSON []byte
|
||||
if len(addState) > 0 {
|
||||
addStateJSON, err = json.Marshal(addState)
|
||||
}
|
||||
removeStateJSON, err := json.Marshal(removeState)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return 0, fmt.Errorf("json.Marshal(addState): %w", err)
|
||||
}
|
||||
if len(removeState) > 0 {
|
||||
removeStateJSON, err = json.Marshal(removeState)
|
||||
}
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("json.Marshal(removeState): %w", err)
|
||||
}
|
||||
|
||||
streamPos, err := s.streamIDStatements.nextStreamID(ctx, txn)
|
||||
streamPos, err := s.streamIDStatements.nextPDUID(ctx, txn)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
@ -333,17 +309,30 @@ func (s *outputRoomEventsStatements) InsertEvent(
|
|||
|
||||
func (s *outputRoomEventsStatements) SelectRecentEvents(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
roomID string, r types.Range, limit int,
|
||||
roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
|
||||
chronologicalOrder bool, onlySyncEvents bool,
|
||||
) ([]types.StreamEvent, bool, error) {
|
||||
var stmt *sql.Stmt
|
||||
var query string
|
||||
if onlySyncEvents {
|
||||
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsForSyncStmt)
|
||||
query = selectRecentEventsForSyncSQL
|
||||
} else {
|
||||
stmt = sqlutil.TxStmt(txn, s.selectRecentEventsStmt)
|
||||
query = selectRecentEventsSQL
|
||||
}
|
||||
|
||||
rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit+1)
|
||||
stmt, params, err := prepareWithFilters(
|
||||
s.db, txn, query,
|
||||
[]interface{}{
|
||||
roomID, r.Low(), r.High(),
|
||||
},
|
||||
eventFilter.Senders, eventFilter.NotSenders,
|
||||
eventFilter.Types, eventFilter.NotTypes,
|
||||
eventFilter.Limit+1, FilterOrderDesc,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, false, fmt.Errorf("s.prepareWithFilters: %w", err)
|
||||
}
|
||||
|
||||
rows, err := stmt.QueryContext(ctx, params...)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
@ -362,7 +351,7 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
|
|||
}
|
||||
// we queried for 1 more than the limit, so if we returned one more mark limited=true
|
||||
limited := false
|
||||
if len(events) > limit {
|
||||
if len(events) > eventFilter.Limit {
|
||||
limited = true
|
||||
// re-slice the extra (oldest) event out: in chronological order this is the first entry, else the last.
|
||||
if chronologicalOrder {
|
||||
|
@ -376,10 +365,21 @@ func (s *outputRoomEventsStatements) SelectRecentEvents(
|
|||
|
||||
func (s *outputRoomEventsStatements) SelectEarlyEvents(
|
||||
ctx context.Context, txn *sql.Tx,
|
||||
roomID string, r types.Range, limit int,
|
||||
roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter,
|
||||
) ([]types.StreamEvent, error) {
|
||||
stmt := sqlutil.TxStmt(txn, s.selectEarlyEventsStmt)
|
||||
rows, err := stmt.QueryContext(ctx, roomID, r.Low(), r.High(), limit)
|
||||
stmt, params, err := prepareWithFilters(
|
||||
s.db, txn, selectEarlyEventsSQL,
|
||||
[]interface{}{
|
||||
roomID, r.Low(), r.High(),
|
||||
},
|
||||
eventFilter.Senders, eventFilter.NotSenders,
|
||||
eventFilter.Types, eventFilter.NotTypes,
|
||||
eventFilter.Limit, FilterOrderAsc,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("s.prepareWithFilters: %w", err)
|
||||
}
|
||||
rows, err := stmt.QueryContext(ctx, params...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -111,12 +111,11 @@ func NewSqliteTopologyTable(db *sql.DB) (tables.Topology, error) {
|
|||
// on the event's depth.
|
||||
func (s *outputRoomEventsTopologyStatements) InsertEventInTopology(
|
||||
ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition,
|
||||
) (err error) {
|
||||
stmt := sqlutil.TxStmt(txn, s.insertEventInTopologyStmt)
|
||||
_, err = stmt.ExecContext(
|
||||
) (types.StreamPosition, error) {
|
||||
_, err := sqlutil.TxStmt(txn, s.insertEventInTopologyStmt).ExecContext(
|
||||
ctx, event.EventID(), event.Depth(), event.RoomID(), pos,
|
||||
)
|
||||
return
|
||||
return types.StreamPosition(event.Depth()), err
|
||||
}
|
||||
|
||||
func (s *outputRoomEventsTopologyStatements) SelectEventIDsInRange(
|
||||
|
|
|
@ -108,7 +108,7 @@ func NewSqlitePeeksTable(db *sql.DB, streamID *streamIDStatements) (tables.Peeks
|
|||
func (s *peekStatements) InsertPeek(
|
||||
ctx context.Context, txn *sql.Tx, roomID, userID, deviceID string,
|
||||
) (streamPos types.StreamPosition, err error) {
|
||||
streamPos, err = s.streamIDStatements.nextStreamID(ctx, txn)
|
||||
streamPos, err = s.streamIDStatements.nextPDUID(ctx, txn)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
@ -120,7 +120,7 @@ func (s *peekStatements) InsertPeek(
|
|||
func (s *peekStatements) DeletePeek(
|
||||
ctx context.Context, txn *sql.Tx, roomID, userID, deviceID string,
|
||||
) (streamPos types.StreamPosition, err error) {
|
||||
streamPos, err = s.streamIDStatements.nextStreamID(ctx, txn)
|
||||
streamPos, err = s.streamIDStatements.nextPDUID(ctx, txn)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
@ -131,7 +131,7 @@ func (s *peekStatements) DeletePeek(
|
|||
func (s *peekStatements) DeletePeeks(
|
||||
ctx context.Context, txn *sql.Tx, roomID, userID string,
|
||||
) (types.StreamPosition, error) {
|
||||
streamPos, err := s.streamIDStatements.nextStreamID(ctx, txn)
|
||||
streamPos, err := s.streamIDStatements.nextPDUID(ctx, txn)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
|
|
@ -20,6 +20,10 @@ INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("global", 0)
|
|||
ON CONFLICT DO NOTHING;
|
||||
INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("receipt", 0)
|
||||
ON CONFLICT DO NOTHING;
|
||||
INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("accountdata", 0)
|
||||
ON CONFLICT DO NOTHING;
|
||||
INSERT INTO syncapi_stream_id (stream_name, stream_id) VALUES ("invite", 0)
|
||||
ON CONFLICT DO NOTHING;
|
||||
`
|
||||
|
||||
const increaseStreamIDStmt = "" +
|
||||
|
@ -49,7 +53,7 @@ func (s *streamIDStatements) prepare(db *sql.DB) (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func (s *streamIDStatements) nextStreamID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
|
||||
func (s *streamIDStatements) nextPDUID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
|
||||
increaseStmt := sqlutil.TxStmt(txn, s.increaseStreamIDStmt)
|
||||
selectStmt := sqlutil.TxStmt(txn, s.selectStreamIDStmt)
|
||||
if _, err = increaseStmt.ExecContext(ctx, "global"); err != nil {
|
||||
|
@ -68,3 +72,23 @@ func (s *streamIDStatements) nextReceiptID(ctx context.Context, txn *sql.Tx) (po
|
|||
err = selectStmt.QueryRowContext(ctx, "receipt").Scan(&pos)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *streamIDStatements) nextInviteID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
|
||||
increaseStmt := sqlutil.TxStmt(txn, s.increaseStreamIDStmt)
|
||||
selectStmt := sqlutil.TxStmt(txn, s.selectStreamIDStmt)
|
||||
if _, err = increaseStmt.ExecContext(ctx, "invite"); err != nil {
|
||||
return
|
||||
}
|
||||
err = selectStmt.QueryRowContext(ctx, "invite").Scan(&pos)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *streamIDStatements) nextAccountDataID(ctx context.Context, txn *sql.Tx) (pos types.StreamPosition, err error) {
|
||||
increaseStmt := sqlutil.TxStmt(txn, s.increaseStreamIDStmt)
|
||||
selectStmt := sqlutil.TxStmt(txn, s.selectStreamIDStmt)
|
||||
if _, err = increaseStmt.ExecContext(ctx, "accountdata"); err != nil {
|
||||
return
|
||||
}
|
||||
err = selectStmt.QueryRowContext(ctx, "accountdata").Scan(&pos)
|
||||
return
|
||||
}
|
||||
|
|
|
@ -100,6 +100,10 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
memberships, err := NewSqliteMembershipsTable(d.db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m := sqlutil.NewMigrations()
|
||||
deltas.LoadFixSequences(m)
|
||||
deltas.LoadRemoveSendToDeviceSentColumn(m)
|
||||
|
@ -119,6 +123,7 @@ func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (er
|
|||
Filter: filter,
|
||||
SendToDevice: sendToDevice,
|
||||
Receipts: receipts,
|
||||
Memberships: memberships,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -56,9 +56,9 @@ type Events interface {
|
|||
// SelectRecentEvents returns events between the two stream positions: exclusive of low and inclusive of high.
|
||||
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync.
|
||||
// Returns up to `limit` events. Returns `limited=true` if there are more events in this range but we hit the `limit`.
|
||||
SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
|
||||
SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)
|
||||
// SelectEarlyEvents returns the earliest events in the given room.
|
||||
SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int) ([]types.StreamEvent, error)
|
||||
SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter) ([]types.StreamEvent, error)
|
||||
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error)
|
||||
UpdateEventJSON(ctx context.Context, event *gomatrixserverlib.HeaderedEvent) error
|
||||
// DeleteEventsForRoom removes all event information for a room. This should only be done when removing the room entirely.
|
||||
|
@ -70,7 +70,7 @@ type Events interface {
|
|||
type Topology interface {
|
||||
// InsertEventInTopology inserts the given event in the room's topology, based on the event's depth.
|
||||
// `pos` is the stream position of this event in the events table, and is used to order events which have the same depth.
|
||||
InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) (err error)
|
||||
InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) (topoPos types.StreamPosition, err error)
|
||||
// SelectEventIDsInRange selects the IDs of events whose depths are within a given range in a given room's topological order.
|
||||
// Events with `minDepth` are *exclusive*, as is the event which has exactly `minDepth`,`maxStreamPos`.
|
||||
// `maxStreamPos` is only used when events have the same depth as `maxDepth`, which results in events less than `maxStreamPos` being returned.
|
||||
|
@ -162,3 +162,8 @@ type Receipts interface {
|
|||
SelectRoomReceiptsAfter(ctx context.Context, roomIDs []string, streamPos types.StreamPosition) (types.StreamPosition, []eduAPI.OutputReceiptEvent, error)
|
||||
SelectMaxReceiptID(ctx context.Context, txn *sql.Tx) (id int64, err error)
|
||||
}
|
||||
|
||||
type Memberships interface {
|
||||
UpsertMembership(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error
|
||||
SelectMembership(ctx context.Context, txn *sql.Tx, roomID, userID, memberships []string) (eventID string, streamPos, topologyPos types.StreamPosition, err error)
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@ func (p *DeviceListStreamProvider) CompleteSync(
|
|||
ctx context.Context,
|
||||
req *types.SyncRequest,
|
||||
) types.LogPosition {
|
||||
return p.IncrementalSync(ctx, req, types.LogPosition{}, p.LatestPosition(ctx))
|
||||
return p.LatestPosition(ctx)
|
||||
}
|
||||
|
||||
func (p *DeviceListStreamProvider) IncrementalSync(
|
||||
|
|
|
@ -2,18 +2,54 @@ package streams
|
|||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/syncapi/types"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"go.uber.org/atomic"
|
||||
)
|
||||
|
||||
// The max number of per-room goroutines to have running.
|
||||
// Too high and this will consume lots of CPU, too low and complete
|
||||
// sync responses will take longer to process.
|
||||
const PDU_STREAM_WORKERS = 256
|
||||
|
||||
// The maximum number of tasks that can be queued in total before
|
||||
// backpressure will build up and the rests will start to block.
|
||||
const PDU_STREAM_QUEUESIZE = PDU_STREAM_WORKERS * 8
|
||||
|
||||
type PDUStreamProvider struct {
|
||||
StreamProvider
|
||||
|
||||
tasks chan func()
|
||||
workers atomic.Int32
|
||||
}
|
||||
|
||||
func (p *PDUStreamProvider) worker() {
|
||||
defer p.workers.Dec()
|
||||
for {
|
||||
select {
|
||||
case f := <-p.tasks:
|
||||
f()
|
||||
case <-time.After(time.Second * 10):
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PDUStreamProvider) queue(f func()) {
|
||||
if p.workers.Load() < PDU_STREAM_WORKERS {
|
||||
p.workers.Inc()
|
||||
go p.worker()
|
||||
}
|
||||
p.tasks <- f
|
||||
}
|
||||
|
||||
func (p *PDUStreamProvider) Setup() {
|
||||
p.StreamProvider.Setup()
|
||||
p.tasks = make(chan func(), PDU_STREAM_QUEUESIZE)
|
||||
|
||||
p.latestMutex.Lock()
|
||||
defer p.latestMutex.Unlock()
|
||||
|
@ -48,22 +84,36 @@ func (p *PDUStreamProvider) CompleteSync(
|
|||
return from
|
||||
}
|
||||
|
||||
stateFilter := gomatrixserverlib.DefaultStateFilter() // TODO: use filter provided in request
|
||||
stateFilter := req.Filter.Room.State
|
||||
eventFilter := req.Filter.Room.Timeline
|
||||
|
||||
// Build up a /sync response. Add joined rooms.
|
||||
for _, roomID := range joinedRoomIDs {
|
||||
var jr *types.JoinResponse
|
||||
jr, err = p.getJoinResponseForCompleteSync(
|
||||
ctx, roomID, r, &stateFilter, req.Limit, req.Device,
|
||||
)
|
||||
if err != nil {
|
||||
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
|
||||
return from
|
||||
}
|
||||
req.Response.Rooms.Join[roomID] = *jr
|
||||
req.Rooms[roomID] = gomatrixserverlib.Join
|
||||
var reqMutex sync.Mutex
|
||||
var reqWaitGroup sync.WaitGroup
|
||||
reqWaitGroup.Add(len(joinedRoomIDs))
|
||||
for _, room := range joinedRoomIDs {
|
||||
roomID := room
|
||||
p.queue(func() {
|
||||
defer reqWaitGroup.Done()
|
||||
|
||||
var jr *types.JoinResponse
|
||||
jr, err = p.getJoinResponseForCompleteSync(
|
||||
ctx, roomID, r, &stateFilter, &eventFilter, req.Device,
|
||||
)
|
||||
if err != nil {
|
||||
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
|
||||
return
|
||||
}
|
||||
|
||||
reqMutex.Lock()
|
||||
defer reqMutex.Unlock()
|
||||
req.Response.Rooms.Join[roomID] = *jr
|
||||
req.Rooms[roomID] = gomatrixserverlib.Join
|
||||
})
|
||||
}
|
||||
|
||||
reqWaitGroup.Wait()
|
||||
|
||||
// Add peeked rooms.
|
||||
peeks, err := p.DB.PeeksInRange(ctx, req.Device.UserID, req.Device.ID, r)
|
||||
if err != nil {
|
||||
|
@ -74,7 +124,7 @@ func (p *PDUStreamProvider) CompleteSync(
|
|||
if !peek.Deleted {
|
||||
var jr *types.JoinResponse
|
||||
jr, err = p.getJoinResponseForCompleteSync(
|
||||
ctx, peek.RoomID, r, &stateFilter, req.Limit, req.Device,
|
||||
ctx, peek.RoomID, r, &stateFilter, &eventFilter, req.Device,
|
||||
)
|
||||
if err != nil {
|
||||
req.Log.WithError(err).Error("p.getJoinResponseForCompleteSync failed")
|
||||
|
@ -104,8 +154,8 @@ func (p *PDUStreamProvider) IncrementalSync(
|
|||
var stateDeltas []types.StateDelta
|
||||
var joinedRooms []string
|
||||
|
||||
// TODO: use filter provided in request
|
||||
stateFilter := gomatrixserverlib.DefaultStateFilter()
|
||||
stateFilter := req.Filter.Room.State
|
||||
eventFilter := req.Filter.Room.Timeline
|
||||
|
||||
if req.WantFullState {
|
||||
if stateDeltas, joinedRooms, err = p.DB.GetStateDeltasForFullStateSync(ctx, req.Device, r, req.Device.UserID, &stateFilter); err != nil {
|
||||
|
@ -124,7 +174,7 @@ func (p *PDUStreamProvider) IncrementalSync(
|
|||
}
|
||||
|
||||
for _, delta := range stateDeltas {
|
||||
if err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, req.Limit, req.Response); err != nil {
|
||||
if err = p.addRoomDeltaToResponse(ctx, req.Device, r, delta, &eventFilter, req.Response); err != nil {
|
||||
req.Log.WithError(err).Error("d.addRoomDeltaToResponse failed")
|
||||
return newPos
|
||||
}
|
||||
|
@ -138,7 +188,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
|||
device *userapi.Device,
|
||||
r types.Range,
|
||||
delta types.StateDelta,
|
||||
numRecentEventsPerRoom int,
|
||||
eventFilter *gomatrixserverlib.RoomEventFilter,
|
||||
res *types.Response,
|
||||
) error {
|
||||
if delta.MembershipPos > 0 && delta.Membership == gomatrixserverlib.Leave {
|
||||
|
@ -152,7 +202,7 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
|||
}
|
||||
recentStreamEvents, limited, err := p.DB.RecentEvents(
|
||||
ctx, delta.RoomID, r,
|
||||
numRecentEventsPerRoom, true, true,
|
||||
eventFilter, true, true,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -209,7 +259,8 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
|
|||
roomID string,
|
||||
r types.Range,
|
||||
stateFilter *gomatrixserverlib.StateFilter,
|
||||
numRecentEventsPerRoom int, device *userapi.Device,
|
||||
eventFilter *gomatrixserverlib.RoomEventFilter,
|
||||
device *userapi.Device,
|
||||
) (jr *types.JoinResponse, err error) {
|
||||
var stateEvents []*gomatrixserverlib.HeaderedEvent
|
||||
stateEvents, err = p.DB.CurrentState(ctx, roomID, stateFilter)
|
||||
|
@ -221,7 +272,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
|
|||
var recentStreamEvents []types.StreamEvent
|
||||
var limited bool
|
||||
recentStreamEvents, limited, err = p.DB.RecentEvents(
|
||||
ctx, roomID, r, numRecentEventsPerRoom, true, true,
|
||||
ctx, roomID, r, eventFilter, true, true,
|
||||
)
|
||||
if err != nil {
|
||||
return
|
||||
|
|
|
@ -16,6 +16,7 @@ package sync
|
|||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
@ -31,14 +32,6 @@ import (
|
|||
const defaultSyncTimeout = time.Duration(0)
|
||||
const DefaultTimelineLimit = 20
|
||||
|
||||
type filter struct {
|
||||
Room struct {
|
||||
Timeline struct {
|
||||
Limit *int `json:"limit"`
|
||||
} `json:"timeline"`
|
||||
} `json:"room"`
|
||||
}
|
||||
|
||||
func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Database) (*types.SyncRequest, error) {
|
||||
timeout := getTimeout(req.URL.Query().Get("timeout"))
|
||||
fullState := req.URL.Query().Get("full_state")
|
||||
|
@ -51,41 +44,37 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
|
|||
return nil, err
|
||||
}
|
||||
}
|
||||
timelineLimit := DefaultTimelineLimit
|
||||
// TODO: read from stored filters too
|
||||
filter := gomatrixserverlib.DefaultFilter()
|
||||
filterQuery := req.URL.Query().Get("filter")
|
||||
if filterQuery != "" {
|
||||
if filterQuery[0] == '{' {
|
||||
// attempt to parse the timeline limit at least
|
||||
var f filter
|
||||
err := json.Unmarshal([]byte(filterQuery), &f)
|
||||
if err == nil && f.Room.Timeline.Limit != nil {
|
||||
timelineLimit = *f.Room.Timeline.Limit
|
||||
// Parse the filter from the query string
|
||||
if err := json.Unmarshal([]byte(filterQuery), &filter); err != nil {
|
||||
return nil, fmt.Errorf("json.Unmarshal: %w", err)
|
||||
}
|
||||
} else {
|
||||
// attempt to load the filter ID
|
||||
// Try to load the filter from the database
|
||||
localpart, _, err := gomatrixserverlib.SplitID('@', device.UserID)
|
||||
if err != nil {
|
||||
util.GetLogger(req.Context()).WithError(err).Error("gomatrixserverlib.SplitID failed")
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("gomatrixserverlib.SplitID: %w", err)
|
||||
}
|
||||
f, err := syncDB.GetFilter(req.Context(), localpart, filterQuery)
|
||||
if err == nil {
|
||||
timelineLimit = f.Room.Timeline.Limit
|
||||
if f, err := syncDB.GetFilter(req.Context(), localpart, filterQuery); err != nil {
|
||||
util.GetLogger(req.Context()).WithError(err).Error("syncDB.GetFilter failed")
|
||||
return nil, fmt.Errorf("syncDB.GetFilter: %w", err)
|
||||
} else {
|
||||
filter = *f
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
filter := gomatrixserverlib.DefaultEventFilter()
|
||||
filter.Limit = timelineLimit
|
||||
// TODO: Additional query params: set_presence, filter
|
||||
|
||||
logger := util.GetLogger(req.Context()).WithFields(logrus.Fields{
|
||||
"user_id": device.UserID,
|
||||
"device_id": device.ID,
|
||||
"since": since,
|
||||
"timeout": timeout,
|
||||
"limit": timelineLimit,
|
||||
"limit": filter.Room.Timeline.Limit,
|
||||
})
|
||||
|
||||
return &types.SyncRequest{
|
||||
|
@ -96,7 +85,6 @@ func newSyncRequest(req *http.Request, device userapi.Device, syncDB storage.Dat
|
|||
Filter: filter, //
|
||||
Since: since, //
|
||||
Timeout: timeout, //
|
||||
Limit: timelineLimit, //
|
||||
Rooms: make(map[string]string), // Populated by the PDU stream
|
||||
WantFullState: wantFullState, //
|
||||
}, nil
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/dendrite/setup/kafka"
|
||||
"github.com/matrix-org/dendrite/setup/process"
|
||||
userapi "github.com/matrix-org/dendrite/userapi/api"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
|
||||
|
@ -39,6 +40,7 @@ import (
|
|||
// AddPublicRoutes sets up and registers HTTP handlers for the SyncAPI
|
||||
// component.
|
||||
func AddPublicRoutes(
|
||||
process *process.ProcessContext,
|
||||
router *mux.Router,
|
||||
userAPI userapi.UserInternalAPI,
|
||||
rsAPI api.RoomserverInternalAPI,
|
||||
|
@ -63,7 +65,7 @@ func AddPublicRoutes(
|
|||
requestPool := sync.NewRequestPool(syncDB, cfg, userAPI, keyAPI, rsAPI, streams, notifier)
|
||||
|
||||
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
|
||||
cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
|
||||
process, cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
|
||||
consumer, keyAPI, rsAPI, syncDB, notifier, streams.DeviceListStreamProvider,
|
||||
)
|
||||
if err = keyChangeConsumer.Start(); err != nil {
|
||||
|
@ -71,7 +73,7 @@ func AddPublicRoutes(
|
|||
}
|
||||
|
||||
roomConsumer := consumers.NewOutputRoomEventConsumer(
|
||||
cfg, consumer, syncDB, notifier, streams.PDUStreamProvider,
|
||||
process, cfg, consumer, syncDB, notifier, streams.PDUStreamProvider,
|
||||
streams.InviteStreamProvider, rsAPI,
|
||||
)
|
||||
if err = roomConsumer.Start(); err != nil {
|
||||
|
@ -79,28 +81,28 @@ func AddPublicRoutes(
|
|||
}
|
||||
|
||||
clientConsumer := consumers.NewOutputClientDataConsumer(
|
||||
cfg, consumer, syncDB, notifier, streams.AccountDataStreamProvider,
|
||||
process, cfg, consumer, syncDB, notifier, streams.AccountDataStreamProvider,
|
||||
)
|
||||
if err = clientConsumer.Start(); err != nil {
|
||||
logrus.WithError(err).Panicf("failed to start client data consumer")
|
||||
}
|
||||
|
||||
typingConsumer := consumers.NewOutputTypingEventConsumer(
|
||||
cfg, consumer, syncDB, eduCache, notifier, streams.TypingStreamProvider,
|
||||
process, cfg, consumer, syncDB, eduCache, notifier, streams.TypingStreamProvider,
|
||||
)
|
||||
if err = typingConsumer.Start(); err != nil {
|
||||
logrus.WithError(err).Panicf("failed to start typing consumer")
|
||||
}
|
||||
|
||||
sendToDeviceConsumer := consumers.NewOutputSendToDeviceEventConsumer(
|
||||
cfg, consumer, syncDB, notifier, streams.SendToDeviceStreamProvider,
|
||||
process, cfg, consumer, syncDB, notifier, streams.SendToDeviceStreamProvider,
|
||||
)
|
||||
if err = sendToDeviceConsumer.Start(); err != nil {
|
||||
logrus.WithError(err).Panicf("failed to start send-to-device consumer")
|
||||
}
|
||||
|
||||
receiptConsumer := consumers.NewOutputReceiptEventConsumer(
|
||||
cfg, consumer, syncDB, notifier, streams.ReceiptStreamProvider,
|
||||
process, cfg, consumer, syncDB, notifier, streams.ReceiptStreamProvider,
|
||||
)
|
||||
if err = receiptConsumer.Start(); err != nil {
|
||||
logrus.WithError(err).Panicf("failed to start receipts consumer")
|
||||
|
|
|
@ -14,9 +14,8 @@ type SyncRequest struct {
|
|||
Log *logrus.Entry
|
||||
Device *userapi.Device
|
||||
Response *Response
|
||||
Filter gomatrixserverlib.EventFilter
|
||||
Filter gomatrixserverlib.Filter
|
||||
Since StreamingToken
|
||||
Limit int
|
||||
Timeout time.Duration
|
||||
WantFullState bool
|
||||
|
||||
|
|
|
@ -372,7 +372,7 @@ type Response struct {
|
|||
Leave map[string]LeaveResponse `json:"leave"`
|
||||
} `json:"rooms"`
|
||||
ToDevice struct {
|
||||
Events []gomatrixserverlib.SendToDeviceEvent `json:"events,omitempty"`
|
||||
Events []gomatrixserverlib.SendToDeviceEvent `json:"events"`
|
||||
} `json:"to_device"`
|
||||
DeviceLists struct {
|
||||
Changed []string `json:"changed,omitempty"`
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue