Use a read-only snapshot transaction for calculating sync responses (#236)

* Use a read-only snapshot transaction for calculating sync responses

* gb vendor update github.com/lib/pq
This commit is contained in:
Mark Haines 2017-09-19 16:22:02 +01:00 committed by GitHub
parent 08b9940dde
commit fbc4477be0
20 changed files with 1374 additions and 527 deletions

View file

@ -187,114 +187,145 @@ func (d *SyncServerDatabase) IncrementalSync(
userID string,
fromPos, toPos types.StreamPosition,
numRecentEventsPerRoom int,
) (res *types.Response, returnErr error) {
returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error {
// Work out which rooms to return in the response. This is done by getting not only the currently
// joined rooms, but also which rooms have membership transitions for this user between the 2 stream positions.
// This works out what the 'state' key should be for each room as well as which membership block
// to put the room into.
deltas, err := d.getStateDeltas(ctx, txn, fromPos, toPos, userID)
) (*types.Response, error) {
txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot)
if err != nil {
return nil, err
}
var succeeded bool
defer common.EndTransaction(txn, &succeeded)
// Work out which rooms to return in the response. This is done by getting not only the currently
// joined rooms, but also which rooms have membership transitions for this user between the 2 stream positions.
// This works out what the 'state' key should be for each room as well as which membership block
// to put the room into.
deltas, err := d.getStateDeltas(ctx, txn, fromPos, toPos, userID)
if err != nil {
return nil, err
}
res := types.NewResponse(toPos)
for _, delta := range deltas {
endPos := toPos
if delta.membershipPos > 0 && delta.membership == "leave" {
// make sure we don't leak recent events after the leave event.
// TODO: History visibility makes this somewhat complex to handle correctly. For example:
// TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
// TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
// in a single /sync request
// This is all "okay" assuming history_visibility == "shared" which it is by default.
endPos = delta.membershipPos
}
var recentStreamEvents []streamEvent
recentStreamEvents, err = d.events.selectRecentEvents(
ctx, txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom,
)
if err != nil {
return err
return nil, err
}
recentEvents := streamEventsToEvents(recentStreamEvents)
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
res = types.NewResponse(toPos)
for _, delta := range deltas {
endPos := toPos
if delta.membershipPos > 0 && delta.membership == "leave" {
// make sure we don't leak recent events after the leave event.
// TODO: History visibility makes this somewhat complex to handle correctly. For example:
// TODO: This doesn't work for join -> leave in a single /sync request (see events prior to join).
// TODO: This will fail on join -> leave -> sensitive msg -> join -> leave
// in a single /sync request
// This is all "okay" assuming history_visibility == "shared" which it is by default.
endPos = delta.membershipPos
}
recentStreamEvents, err := d.events.selectRecentEvents(
ctx, txn, delta.roomID, fromPos, endPos, numRecentEventsPerRoom,
)
if err != nil {
return err
}
recentEvents := streamEventsToEvents(recentStreamEvents)
delta.stateEvents = removeDuplicates(delta.stateEvents, recentEvents) // roll back
switch delta.membership {
case "join":
jr := types.NewJoinResponse()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.roomID] = *jr
case "leave":
fallthrough // transitions to leave are the same as ban
case "ban":
// TODO: recentEvents may contain events that this user is not allowed to see because they are
// no longer in the room.
lr := types.NewLeaveResponse()
lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Leave[delta.roomID] = *lr
}
switch delta.membership {
case "join":
jr := types.NewJoinResponse()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
jr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[delta.roomID] = *jr
case "leave":
fallthrough // transitions to leave are the same as ban
case "ban":
// TODO: recentEvents may contain events that this user is not allowed to see because they are
// no longer in the room.
lr := types.NewLeaveResponse()
lr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
lr.Timeline.Limited = false // TODO: if len(events) >= numRecents + 1 and then set limited:true
lr.State.Events = gomatrixserverlib.ToClientEvents(delta.stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Leave[delta.roomID] = *lr
}
}
// TODO: This should be done in getStateDeltas
return d.addInvitesToResponse(ctx, txn, userID, res)
})
return
// TODO: This should be done in getStateDeltas
if err = d.addInvitesToResponse(ctx, txn, userID, res); err != nil {
return nil, err
}
succeeded = true
return res, nil
}
// CompleteSync a complete /sync API response for the given user.
func (d *SyncServerDatabase) CompleteSync(
ctx context.Context, userID string, numRecentEventsPerRoom int,
) (res *types.Response, returnErr error) {
) (*types.Response, error) {
// This needs to be all done in a transaction as we need to do multiple SELECTs, and we need to have
// a consistent view of the database throughout. This includes extracting the sync stream position.
// This does have the unfortunate side-effect that all the matrixy logic resides in this function,
// but it's better to not hide the fact that this is being done in a transaction.
returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error {
// Get the current stream position which we will base the sync response on.
id, err := d.events.selectMaxID(ctx, txn)
txn, err := d.db.BeginTx(ctx, &txReadOnlySnapshot)
if err != nil {
return nil, err
}
var succeeded bool
defer common.EndTransaction(txn, &succeeded)
// Get the current stream position which we will base the sync response on.
id, err := d.events.selectMaxID(ctx, txn)
if err != nil {
return nil, err
}
pos := types.StreamPosition(id)
// Extract room state and recent events for all rooms the user is joined to.
roomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join")
if err != nil {
return nil, err
}
// Build up a /sync response. Add joined rooms.
res := types.NewResponse(pos)
for _, roomID := range roomIDs {
var stateEvents []gomatrixserverlib.Event
stateEvents, err = d.roomstate.selectCurrentState(ctx, txn, roomID)
if err != nil {
return err
return nil, err
}
pos := types.StreamPosition(id)
// Extract room state and recent events for all rooms the user is joined to.
roomIDs, err := d.roomstate.selectRoomIDsWithMembership(ctx, txn, userID, "join")
// TODO: When filters are added, we may need to call this multiple times to get enough events.
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
var recentStreamEvents []streamEvent
recentStreamEvents, err = d.events.selectRecentEvents(
ctx, txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom,
)
if err != nil {
return err
return nil, err
}
recentEvents := streamEventsToEvents(recentStreamEvents)
// Build up a /sync response. Add joined rooms.
res = types.NewResponse(pos)
for _, roomID := range roomIDs {
stateEvents, err := d.roomstate.selectCurrentState(ctx, txn, roomID)
if err != nil {
return err
}
// TODO: When filters are added, we may need to call this multiple times to get enough events.
// See: https://github.com/matrix-org/synapse/blob/v0.19.3/synapse/handlers/sync.py#L316
recentStreamEvents, err := d.events.selectRecentEvents(
ctx, txn, roomID, types.StreamPosition(0), pos, numRecentEventsPerRoom,
)
if err != nil {
return err
}
recentEvents := streamEventsToEvents(recentStreamEvents)
stateEvents = removeDuplicates(stateEvents, recentEvents)
jr := types.NewJoinResponse()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = true
jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[roomID] = *jr
}
stateEvents = removeDuplicates(stateEvents, recentEvents)
jr := types.NewJoinResponse()
jr.Timeline.Events = gomatrixserverlib.ToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = true
jr.State.Events = gomatrixserverlib.ToClientEvents(stateEvents, gomatrixserverlib.FormatSync)
res.Rooms.Join[roomID] = *jr
}
if err = d.addInvitesToResponse(ctx, txn, userID, res); err != nil {
return nil, err
}
return d.addInvitesToResponse(ctx, txn, userID, res)
})
return
succeeded = true
return res, err
}
var txReadOnlySnapshot = sql.TxOptions{
// Set the isolation level so that we see a snapshot of the database.
// In PostgreSQL repeatable read transactions will see a snapshot taken
// at the first query, and since the transaction is read-only it can't
// run into any serialisation errors.
// https://www.postgresql.org/docs/9.5/static/transaction-iso.html#XACT-REPEATABLE-READ
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
}
// GetAccountDataInRange returns all account data for a given user inserted or