Send-to-device support (#1072)
* Groundwork for send-to-device messaging
* Update sample config
* Add unstable routing for now
* Send to device consumer in sync API
* Start the send-to-device consumer
* fix indentation in dendrite-config.yaml
* Create send-to-device database tables, other tweaks
* Add some logic for send-to-device messages, add them into sync stream
* Handle incoming send-to-device messages, count them with EDU stream pos
* Undo changes to test
* pq.Array
* Fix sync
* Logging
* Fix a couple of transaction things, fix client API
* Add send-to-device test, hopefully fix bugs
* Comments
* Refactor a bit
* Fix schema
* Fix queries
* Debug logging
* Fix storing and retrieving of send-to-device messages
* Try to avoid database locks
* Update sync position
* Use latest sync position
* Jiggle about sync a bit
* Fix tests
* Break out the retrieval from the update/delete behaviour
* Comments
* nolint on getResponseWithPDUsForCompleteSync
* Try to line up sync tokens again
* Implement wildcard
* Add all send-to-device tests to whitelist, what could possibly go wrong?
* Only care about wildcard when targeted locally
* Deduplicate transactions
* Handle tokens properly, return immediately if waiting send-to-device messages
* Fix sync
* Update sytest-whitelist
* Fix copyright notice (need to do more of this)
* Comments, copyrights
* Return errors from Do, fix dendritejs
* Review comments
* Comments
* Constructor for TransactionWriter
* defletions
* Update gomatrixserverlib, sytest-blacklist
2020-06-01 16:50:19 +00:00
|
|
|
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
2020-05-13 16:28:42 +00:00
|
|
|
package tables
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"database/sql"
|
|
|
|
|
2020-05-14 08:53:55 +00:00
|
|
|
"github.com/matrix-org/dendrite/roomserver/api"
|
2020-05-13 16:28:42 +00:00
|
|
|
"github.com/matrix-org/dendrite/syncapi/types"
|
|
|
|
"github.com/matrix-org/gomatrixserverlib"
|
|
|
|
)
|
|
|
|
|
2020-05-14 08:53:55 +00:00
|
|
|
type AccountData interface {
|
|
|
|
InsertAccountData(ctx context.Context, txn *sql.Tx, userID, roomID, dataType string) (pos types.StreamPosition, err error)
|
2020-05-15 08:41:12 +00:00
|
|
|
// SelectAccountDataInRange returns a map of room ID to a list of `dataType`.
|
|
|
|
SelectAccountDataInRange(ctx context.Context, userID string, r types.Range, accountDataEventFilter *gomatrixserverlib.EventFilter) (data map[string][]string, err error)
|
2020-05-14 08:53:55 +00:00
|
|
|
SelectMaxAccountDataID(ctx context.Context, txn *sql.Tx) (id int64, err error)
|
|
|
|
}
|
|
|
|
|
2020-05-13 16:28:42 +00:00
|
|
|
type Invites interface {
|
|
|
|
InsertInviteEvent(ctx context.Context, txn *sql.Tx, inviteEvent gomatrixserverlib.HeaderedEvent) (streamPos types.StreamPosition, err error)
|
2020-06-26 10:07:52 +00:00
|
|
|
DeleteInviteEvent(ctx context.Context, inviteEventID string) (types.StreamPosition, error)
|
2020-05-15 08:41:12 +00:00
|
|
|
// SelectInviteEventsInRange returns a map of room ID to invite events.
|
2020-06-26 10:07:52 +00:00
|
|
|
SelectInviteEventsInRange(ctx context.Context, txn *sql.Tx, targetUserID string, r types.Range) (invites map[string]gomatrixserverlib.HeaderedEvent, retired map[string]gomatrixserverlib.HeaderedEvent, err error)
|
2020-05-13 16:28:42 +00:00
|
|
|
SelectMaxInviteID(ctx context.Context, txn *sql.Tx) (id int64, err error)
|
|
|
|
}
|
2020-05-14 08:53:55 +00:00
|
|
|
|
|
|
|
type Events interface {
|
2020-05-15 08:41:12 +00:00
|
|
|
SelectStateInRange(ctx context.Context, txn *sql.Tx, r types.Range, stateFilter *gomatrixserverlib.StateFilter) (map[string]map[string]bool, map[string]types.StreamEvent, error)
|
2020-05-14 08:53:55 +00:00
|
|
|
SelectMaxEventID(ctx context.Context, txn *sql.Tx) (id int64, err error)
|
|
|
|
InsertEvent(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, addState, removeState []string, transactionID *api.TransactionID, excludeFromSync bool) (streamPos types.StreamPosition, err error)
|
2020-05-15 08:41:12 +00:00
|
|
|
// SelectRecentEvents returns events between the two stream positions: exclusive of low and inclusive of high.
|
2020-05-14 16:30:16 +00:00
|
|
|
// If onlySyncEvents has a value of true, only returns the events that aren't marked as to exclude from sync.
|
|
|
|
// Returns up to `limit` events.
|
2020-05-15 08:41:12 +00:00
|
|
|
SelectRecentEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, error)
|
|
|
|
// SelectEarlyEvents returns the earliest events in the given room.
|
|
|
|
SelectEarlyEvents(ctx context.Context, txn *sql.Tx, roomID string, r types.Range, limit int) ([]types.StreamEvent, error)
|
2020-05-14 08:53:55 +00:00
|
|
|
SelectEvents(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error)
|
|
|
|
}
|
2020-05-14 15:11:37 +00:00
|
|
|
|
2020-05-14 16:30:16 +00:00
|
|
|
// Topology keeps track of the depths and stream positions for all events.
|
|
|
|
// These positions are used as types.TopologyToken when backfilling events locally.
|
2020-05-14 15:11:37 +00:00
|
|
|
type Topology interface {
|
2020-05-14 16:30:16 +00:00
|
|
|
// InsertEventInTopology inserts the given event in the room's topology, based on the event's depth.
|
|
|
|
// `pos` is the stream position of this event in the events table, and is used to order events which have the same depth.
|
2020-05-14 15:11:37 +00:00
|
|
|
InsertEventInTopology(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, pos types.StreamPosition) (err error)
|
2020-05-14 16:30:16 +00:00
|
|
|
// SelectEventIDsInRange selects the IDs of events whose depths are within a given range in a given room's topological order.
|
|
|
|
// Events with `minDepth` are *exclusive*, as is the event which has exactly `minDepth`,`maxStreamPos`.
|
|
|
|
// `maxStreamPos` is only used when events have the same depth as `maxDepth`, which results in events less than `maxStreamPos` being returned.
|
2020-05-14 15:11:37 +00:00
|
|
|
// Returns an empty slice if no events match the given range.
|
2020-05-14 16:30:16 +00:00
|
|
|
SelectEventIDsInRange(ctx context.Context, txn *sql.Tx, roomID string, minDepth, maxDepth, maxStreamPos types.StreamPosition, limit int, chronologicalOrder bool) (eventIDs []string, err error)
|
|
|
|
// SelectPositionInTopology returns the depth and stream position of a given event in the topology of the room it belongs to.
|
|
|
|
SelectPositionInTopology(ctx context.Context, txn *sql.Tx, eventID string) (depth, spos types.StreamPosition, err error)
|
|
|
|
// SelectMaxPositionInTopology returns the event which has the highest depth, and if there are multiple, the event with the highest stream position.
|
|
|
|
SelectMaxPositionInTopology(ctx context.Context, txn *sql.Tx, roomID string) (depth types.StreamPosition, spos types.StreamPosition, err error)
|
2020-05-14 15:11:37 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
type CurrentRoomState interface {
|
|
|
|
SelectStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error)
|
|
|
|
SelectEventsWithEventIDs(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]types.StreamEvent, error)
|
|
|
|
UpsertRoomState(ctx context.Context, txn *sql.Tx, event gomatrixserverlib.HeaderedEvent, membership *string, addedAt types.StreamPosition) error
|
|
|
|
DeleteRoomStateByEventID(ctx context.Context, txn *sql.Tx, eventID string) error
|
|
|
|
// SelectCurrentState returns all the current state events for the given room.
|
|
|
|
SelectCurrentState(ctx context.Context, txn *sql.Tx, roomID string, stateFilter *gomatrixserverlib.StateFilter) ([]gomatrixserverlib.HeaderedEvent, error)
|
|
|
|
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
|
|
|
|
SelectRoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membership string) ([]string, error)
|
|
|
|
// SelectJoinedUsers returns a map of room ID to a list of joined user IDs.
|
|
|
|
SelectJoinedUsers(ctx context.Context) (map[string][]string, error)
|
|
|
|
}
|
|
|
|
|
|
|
|
// BackwardsExtremities keeps track of backwards extremities for a room.
// Backwards extremities are the earliest (DAG-wise) known events for which we have
// the entire event JSON. These event IDs are used in federation requests to fetch
// even earlier events.
//
// We persist the previous event IDs as well, one per row, so when we do fetch even
// earlier events we can simply delete rows which referenced it. Consider the graph:
//       A
//       |   Event C has 1 prev_event ID: A.
//   B   C
//   |___|   Event D has 2 prev_event IDs: B and C.
//     |
//     D
// The earliest known event we have is D, so this table has 2 rows.
// A backfill request gives us C but not B. We delete rows where prev_event=C. This
// still means that D is a backwards extremity as we do not have event B. However, event
// C is *also* a backwards extremity at this point as we do not have event A. Later,
// when we fetch event B, we delete rows where prev_event=B. This then removes D as
// a backwards extremity because there are no more rows with event_id=D.
type BackwardsExtremities interface {
	// InsertsBackwardExtremity inserts a new backwards extremity.
	InsertsBackwardExtremity(ctx context.Context, txn *sql.Tx, roomID, eventID string, prevEventID string) (err error)
	// SelectBackwardExtremitiesForRoom retrieves all backwards extremities for the room, as a map of event_id to list of prev_event_ids.
	SelectBackwardExtremitiesForRoom(ctx context.Context, roomID string) (bwExtrems map[string][]string, err error)
	// DeleteBackwardExtremity removes a backwards extremity for a room, if one existed.
	DeleteBackwardExtremity(ctx context.Context, txn *sql.Tx, roomID, knownEventID string) (err error)
}
|
Send-to-device support (#1072)
* Groundwork for send-to-device messaging
* Update sample config
* Add unstable routing for now
* Send to device consumer in sync API
* Start the send-to-device consumer
* fix indentation in dendrite-config.yaml
* Create send-to-device database tables, other tweaks
* Add some logic for send-to-device messages, add them into sync stream
* Handle incoming send-to-device messages, count them with EDU stream pos
* Undo changes to test
* pq.Array
* Fix sync
* Logging
* Fix a couple of transaction things, fix client API
* Add send-to-device test, hopefully fix bugs
* Comments
* Refactor a bit
* Fix schema
* Fix queries
* Debug logging
* Fix storing and retrieving of send-to-device messages
* Try to avoid database locks
* Update sync position
* Use latest sync position
* Jiggle about sync a bit
* Fix tests
* Break out the retrieval from the update/delete behaviour
* Comments
* nolint on getResponseWithPDUsForCompleteSync
* Try to line up sync tokens again
* Implement wildcard
* Add all send-to-device tests to whitelist, what could possibly go wrong?
* Only care about wildcard when targeted locally
* Deduplicate transactions
* Handle tokens properly, return immediately if waiting send-to-device messages
* Fix sync
* Update sytest-whitelist
* Fix copyright notice (need to do more of this)
* Comments, copyrights
* Return errors from Do, fix dendritejs
* Review comments
* Comments
* Constructor for TransactionWriter
* defletions
* Update gomatrixserverlib, sytest-blacklist
2020-06-01 16:50:19 +00:00
|
|
|
|
|
|
|
// SendToDevice tracks send-to-device messages which are sent to individual
|
|
|
|
// clients. Each message gets inserted into this table at the point that we
|
|
|
|
// receive it from the EDU server.
|
|
|
|
//
|
|
|
|
// We're supposed to try and do our best to deliver send-to-device messages
|
|
|
|
// once, but the only way that we can really guarantee that they have been
|
|
|
|
// delivered is if the client successfully requests the next sync as given
|
|
|
|
// in the next_batch. Each time the device syncs, we will request all of the
|
|
|
|
// updates that either haven't been sent yet, along with all updates that we
|
|
|
|
// *have* sent but we haven't confirmed to have been received yet. If it's the
|
|
|
|
// first time we're sending a given update then we update the table to say
|
|
|
|
// what the "since" parameter was when we tried to send it.
|
|
|
|
//
|
|
|
|
// When the client syncs again, if their "since" parameter is *later* than
|
|
|
|
// the recorded one, we drop the entry from the DB as it's "sent". If the
|
|
|
|
// sync parameter isn't later then we will keep including the updates in the
|
|
|
|
// sync response, as the client is seemingly trying to repeat the same /sync.
|
|
|
|
type SendToDevice interface {
|
|
|
|
InsertSendToDeviceMessage(ctx context.Context, txn *sql.Tx, userID, deviceID, content string) (err error)
|
|
|
|
SelectSendToDeviceMessages(ctx context.Context, txn *sql.Tx, userID, deviceID string) (events []types.SendToDeviceEvent, err error)
|
|
|
|
UpdateSentSendToDeviceMessages(ctx context.Context, txn *sql.Tx, token string, nids []types.SendToDeviceNID) (err error)
|
|
|
|
DeleteSendToDeviceMessages(ctx context.Context, txn *sql.Tx, nids []types.SendToDeviceNID) (err error)
|
|
|
|
CountSendToDeviceMessages(ctx context.Context, txn *sql.Tx, userID, deviceID string) (count int, err error)
|
|
|
|
}
|