add (broken) postgres; advance streampos whenever sync output changes

This commit is contained in:
Matthew Hodgson 2020-09-08 00:56:50 +01:00
parent 843b7a7d04
commit 55c7f2c892
5 changed files with 252 additions and 15 deletions

View file

@ -408,6 +408,9 @@ func (d *Database) syncPositionTx(
ctx context.Context, txn *sql.Tx,
) (sp types.StreamingToken, err error) {
// XXX: it seems very inefficient to be doing all these aggregate selects
// every time we call /sync...
maxEventID, err := d.OutputEvents.SelectMaxEventID(ctx, txn)
if err != nil {
return sp, err
@ -426,6 +429,13 @@ func (d *Database) syncPositionTx(
if maxInviteID > maxEventID {
maxEventID = maxInviteID
}
maxPeekID, err := d.Peeks.SelectMaxPeekID(ctx, txn)
if err != nil {
return sp, err
}
if maxPeekID > maxEventID {
maxEventID = maxPeekID
}
sp = types.NewStreamToken(types.StreamPosition(maxEventID), types.StreamPosition(d.EDUCache.GetLatestSyncPosition()), nil)
return
}
@ -1045,7 +1055,8 @@ func (d *Database) getStateDeltas(
if newPeeks {
err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error {
return d.Peeks.MarkPeeksAsOld(ctx, txn, userID, device.ID)
_, err := d.Peeks.MarkPeeksAsOld(ctx, txn, userID, device.ID)
return err
})
if err != nil {
return nil, nil, err
@ -1150,7 +1161,8 @@ func (d *Database) getStateDeltasForFullStateSync(
if newPeeks {
err = d.Writer.Do(d.DB, txn, func(txn *sql.Tx) error {
return d.Peeks.MarkPeeksAsOld(ctx, txn, userID, device.ID)
_, err := d.Peeks.MarkPeeksAsOld(ctx, txn, userID, device.ID)
return err
})
if err != nil {
return nil, nil, err