Factor out account data and events table (#1031)

* Factor out account data

* Factor out events table and EDU cache

* linting

* fix npe
This commit is contained in:
Kegsay 2020-05-14 09:53:55 +01:00 committed by GitHub
parent a25d477cdb
commit 9ed68a3125
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 300 additions and 362 deletions

View file

@ -22,7 +22,6 @@ import (
"errors"
"fmt"
"net/url"
"time"
"github.com/sirupsen/logrus"
@ -53,16 +52,13 @@ type stateDelta struct {
// SyncServerDatasource represents a sync server datasource which manages
// both the database for PDUs and caches for EDUs.
type SyncServerDatasource struct {
shared.Database
db *sql.DB
common.PartitionOffsetStatements
streamID streamIDStatements
accountData accountDataStatements
events outputRoomEventsStatements
roomstate currentRoomStateStatements
eduCache *cache.EDUCache
topology outputRoomEventsTopologyStatements
backwardExtremities tables.BackwardsExtremities
shared *shared.Database
}
// NewSyncServerDatasource creates a new sync server database
@ -87,7 +83,6 @@ func NewSyncServerDatasource(dataSourceName string) (*SyncServerDatasource, erro
if err = d.prepare(); err != nil {
return nil, err
}
d.eduCache = cache.New()
return &d, nil
}
@ -98,10 +93,12 @@ func (d *SyncServerDatasource) prepare() (err error) {
if err = d.streamID.prepare(d.db); err != nil {
return err
}
if err = d.accountData.prepare(d.db, &d.streamID); err != nil {
accountData, err := NewSqliteAccountDataTable(d.db, &d.streamID)
if err != nil {
return err
}
if err = d.events.prepare(d.db, &d.streamID); err != nil {
events, err := NewSqliteEventsTable(d.db, &d.streamID)
if err != nil {
return err
}
if err = d.roomstate.prepare(d.db, &d.streamID); err != nil {
@ -118,9 +115,12 @@ func (d *SyncServerDatasource) prepare() (err error) {
if err != nil {
return err
}
d.shared = &shared.Database{
DB: d.db,
Invites: invites,
d.Database = shared.Database{
DB: d.db,
Invites: invites,
AccountData: accountData,
OutputEvents: events,
EDUCache: cache.New(),
}
return nil
}
@ -130,22 +130,6 @@ func (d *SyncServerDatasource) AllJoinedUsersInRooms(ctx context.Context) (map[s
return d.roomstate.selectJoinedUsers(ctx)
}
// Events lookups a list of event by their event ID.
// Returns a list of events matching the requested IDs found in the database.
// If an event is not found in the database then it will be omitted from the list.
// Returns an error if there was a problem talking with the database.
// Does not include any transaction IDs in the returned events.
func (d *SyncServerDatasource) Events(ctx context.Context, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error) {
streamEvents, err := d.events.selectEvents(ctx, nil, eventIDs)
if err != nil {
return nil, err
}
// We don't include a device here as we only include transaction IDs in
// incremental syncs.
return d.StreamEventsToEvents(nil, streamEvents), nil
}
// handleBackwardExtremities adds this event as a backwards extremity if and only if we do not have all of
// the events listed in the event's 'prev_events'. This function also updates the backwards extremities table
// to account for the fact that the given event is no longer a backwards extremity, but may be marked as such.
@ -156,7 +140,7 @@ func (d *SyncServerDatasource) handleBackwardExtremities(ctx context.Context, tx
// Check if we have all of the event's previous events. If an event is
// missing, add it to the room's backward extremities.
prevEvents, err := d.events.selectEvents(ctx, txn, ev.PrevEventIDs())
prevEvents, err := d.Database.OutputEvents.SelectEvents(ctx, txn, ev.PrevEventIDs())
if err != nil {
return err
}
@ -192,7 +176,7 @@ func (d *SyncServerDatasource) WriteEvent(
) (pduPosition types.StreamPosition, returnErr error) {
returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error {
var err error
pos, err := d.events.insertEvent(
pos, err := d.Database.OutputEvents.InsertEvent(
ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID, excludeFromSync,
)
if err != nil {
@ -253,6 +237,19 @@ func (d *SyncServerDatasource) updateRoomState(
return nil
}
// SyncPosition returns the latest positions for syncing.
func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (tok types.StreamingToken, err error) {
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
pos, err := d.syncPositionTx(ctx, txn)
if err != nil {
return err
}
tok = *pos
return nil
})
return
}
// GetStateEvent returns the Matrix state event of a given type for a given room with a given state key
// If no event could be found, returns nil
// If there was an issue during the retrieval, returns an error
@ -309,46 +306,7 @@ func (d *SyncServerDatasource) GetEventsInTopologicalRange(
}
// Retrieve the events' contents using their IDs.
events, err = d.events.selectEvents(ctx, nil, eIDs)
return
}
// GetEventsInStreamingRange retrieves all of the events on a given ordering using the
// given extremities and limit.
func (d *SyncServerDatasource) GetEventsInStreamingRange(
ctx context.Context,
from, to *types.StreamingToken,
roomID string, limit int,
backwardOrdering bool,
) (events []types.StreamEvent, err error) {
if backwardOrdering {
// When using backward ordering, we want the most recent events first.
if events, err = d.events.selectRecentEvents(
ctx, nil, roomID, to.PDUPosition(), from.PDUPosition(), limit, false, false,
); err != nil {
return
}
} else {
// When using forward ordering, we want the least recent events first.
if events, err = d.events.selectEarlyEvents(
ctx, nil, roomID, from.PDUPosition(), to.PDUPosition(), limit,
); err != nil {
return
}
}
return events, err
}
// SyncPosition returns the latest positions for syncing.
func (d *SyncServerDatasource) SyncPosition(ctx context.Context) (tok types.StreamingToken, err error) {
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
pos, err := d.syncPositionTx(ctx, txn)
if err != nil {
return err
}
tok = *pos
return nil
})
events, err = d.Database.OutputEvents.SelectEvents(ctx, nil, eIDs)
return
}
@ -378,7 +336,7 @@ func (d *SyncServerDatasource) EventsAtTopologicalPosition(
return nil, err
}
return d.events.selectEvents(ctx, nil, eIDs)
return d.Database.OutputEvents.SelectEvents(ctx, nil, eIDs)
}
func (d *SyncServerDatasource) EventPositionInTopology(
@ -399,18 +357,18 @@ func (d *SyncServerDatasource) SyncStreamPosition(ctx context.Context) (pos type
func (d *SyncServerDatasource) syncStreamPositionTx(
ctx context.Context, txn *sql.Tx,
) (types.StreamPosition, error) {
maxID, err := d.events.selectMaxEventID(ctx, txn)
maxID, err := d.Database.OutputEvents.SelectMaxEventID(ctx, txn)
if err != nil {
return 0, err
}
maxAccountDataID, err := d.accountData.selectMaxAccountDataID(ctx, txn)
maxAccountDataID, err := d.Database.AccountData.SelectMaxAccountDataID(ctx, txn)
if err != nil {
return 0, err
}
if maxAccountDataID > maxID {
maxID = maxAccountDataID
}
maxInviteID, err := d.shared.Invites.SelectMaxInviteID(ctx, txn)
maxInviteID, err := d.Database.Invites.SelectMaxInviteID(ctx, txn)
if err != nil {
return 0, err
}
@ -424,18 +382,18 @@ func (d *SyncServerDatasource) syncPositionTx(
ctx context.Context, txn *sql.Tx,
) (*types.StreamingToken, error) {
maxEventID, err := d.events.selectMaxEventID(ctx, txn)
maxEventID, err := d.Database.OutputEvents.SelectMaxEventID(ctx, txn)
if err != nil {
return nil, err
}
maxAccountDataID, err := d.accountData.selectMaxAccountDataID(ctx, txn)
maxAccountDataID, err := d.Database.AccountData.SelectMaxAccountDataID(ctx, txn)
if err != nil {
return nil, err
}
if maxAccountDataID > maxEventID {
maxEventID = maxAccountDataID
}
maxInviteID, err := d.shared.Invites.SelectMaxInviteID(ctx, txn)
maxInviteID, err := d.Database.Invites.SelectMaxInviteID(ctx, txn)
if err != nil {
return nil, err
}
@ -444,7 +402,7 @@ func (d *SyncServerDatasource) syncPositionTx(
}
sp := types.NewStreamToken(
types.StreamPosition(maxEventID),
types.StreamPosition(d.eduCache.GetLatestSyncPosition()),
types.StreamPosition(d.Database.EDUCache.GetLatestSyncPosition()),
)
return &sp, nil
}
@ -518,7 +476,7 @@ func (d *SyncServerDatasource) addTypingDeltaToResponse(
var ok bool
var err error
for _, roomID := range joinedRoomIDs {
if typingUsers, updated := d.eduCache.GetTypingUsersIfUpdatedAfter(
if typingUsers, updated := d.Database.EDUCache.GetTypingUsersIfUpdatedAfter(
roomID, int64(since.EDUPosition()),
); updated {
ev := gomatrixserverlib.ClientEvent{
@ -654,7 +612,7 @@ func (d *SyncServerDatasource) getResponseWithPDUsForCompleteSync(
// TODO: When filters are added, we may need to call this multiple times to get enough events.
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
var recentStreamEvents []types.StreamEvent
recentStreamEvents, err = d.events.selectRecentEvents(
recentStreamEvents, err = d.Database.OutputEvents.SelectRecentEvents(
ctx, txn, roomID, types.StreamPosition(0), toPos.PDUPosition(),
numRecentEventsPerRoom, true, true,
)
@ -729,78 +687,13 @@ var txReadOnlySnapshot = sql.TxOptions{
ReadOnly: true,
}
// GetAccountDataInRange returns all account data for a given user inserted or
// updated between two given positions
// Returns a map following the format data[roomID] = []dataTypes
// If no data is retrieved, returns an empty map
// If there was an issue with the retrieval, returns an error
func (d *SyncServerDatasource) GetAccountDataInRange(
ctx context.Context, userID string, oldPos, newPos types.StreamPosition,
accountDataFilterPart *gomatrixserverlib.EventFilter,
) (map[string][]string, error) {
return d.accountData.selectAccountDataInRange(ctx, userID, oldPos, newPos, accountDataFilterPart)
}
// UpsertAccountData keeps track of new or updated account data, by saving the type
// of the new/updated data, and the user ID and room ID the data is related to (empty)
// room ID means the data isn't specific to any room)
// If no data with the given type, user ID and room ID exists in the database,
// creates a new row, else update the existing one
// Returns an error if there was an issue with the upsert
func (d *SyncServerDatasource) UpsertAccountData(
ctx context.Context, userID, roomID, dataType string,
) (sp types.StreamPosition, err error) {
err = common.WithTransaction(d.db, func(txn *sql.Tx) error {
sp, err = d.accountData.insertAccountData(ctx, txn, userID, roomID, dataType)
return err
})
return
}
// AddInviteEvent stores a new invite event for a user.
// If the invite was successfully stored this returns the stream ID it was stored at.
// Returns an error if there was a problem communicating with the database.
func (d *SyncServerDatasource) AddInviteEvent(
ctx context.Context, inviteEvent gomatrixserverlib.HeaderedEvent,
) (sp types.StreamPosition, err error) {
return d.shared.AddInviteEvent(ctx, inviteEvent)
}
// RetireInviteEvent removes an old invite event from the database.
// Returns an error if there was a problem communicating with the database.
func (d *SyncServerDatasource) RetireInviteEvent(
ctx context.Context, inviteEventID string,
) error {
return d.shared.RetireInviteEvent(ctx, inviteEventID)
}
func (d *SyncServerDatasource) SetTypingTimeoutCallback(fn cache.TimeoutCallbackFn) {
d.eduCache.SetTimeoutCallback(fn)
}
// AddTypingUser adds a typing user to the typing cache.
// Returns the newly calculated sync position for typing notifications.
func (d *SyncServerDatasource) AddTypingUser(
userID, roomID string, expireTime *time.Time,
) types.StreamPosition {
return types.StreamPosition(d.eduCache.AddTypingUser(userID, roomID, expireTime))
}
// RemoveTypingUser removes a typing user from the typing cache.
// Returns the newly calculated sync position for typing notifications.
func (d *SyncServerDatasource) RemoveTypingUser(
userID, roomID string,
) types.StreamPosition {
return types.StreamPosition(d.eduCache.RemoveUser(userID, roomID))
}
func (d *SyncServerDatasource) addInvitesToResponse(
ctx context.Context, txn *sql.Tx,
userID string,
fromPos, toPos types.StreamPosition,
res *types.Response,
) error {
invites, err := d.shared.Invites.SelectInviteEventsInRange(
invites, err := d.Database.Invites.SelectInviteEventsInRange(
ctx, txn, userID, fromPos, toPos,
)
if err != nil {
@ -853,7 +746,7 @@ func (d *SyncServerDatasource) addRoomDeltaToResponse(
// This is all "okay" assuming history_visibility == "shared" which it is by default.
endPos = delta.membershipPos
}
recentStreamEvents, err := d.events.selectRecentEvents(
recentStreamEvents, err := d.Database.OutputEvents.SelectRecentEvents(
ctx, txn, delta.roomID, types.StreamPosition(fromPos), types.StreamPosition(endPos),
numRecentEventsPerRoom, true, true,
)
@ -943,7 +836,7 @@ func (d *SyncServerDatasource) fetchMissingStateEvents(
) ([]types.StreamEvent, error) {
// Fetch from the events table first so we pick up the stream ID for the
// event.
events, err := d.events.selectEvents(ctx, txn, eventIDs)
events, err := d.Database.OutputEvents.SelectEvents(ctx, txn, eventIDs)
if err != nil {
return nil, err
}
@ -997,7 +890,7 @@ func (d *SyncServerDatasource) getStateDeltas(
var deltas []stateDelta
// get all the state events ever between these two positions
stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart)
stateNeeded, eventMap, err := d.Database.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart)
if err != nil {
return nil, nil, err
}
@ -1083,7 +976,7 @@ func (d *SyncServerDatasource) getStateDeltasForFullStateSync(
}
// Get all the state events ever between these two positions
stateNeeded, eventMap, err := d.events.selectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart)
stateNeeded, eventMap, err := d.Database.OutputEvents.SelectStateInRange(ctx, txn, fromPos, toPos, stateFilterPart)
if err != nil {
return nil, nil, err
}