// Copyright 2020 The Matrix.org Foundation C.I.C. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package sqlite3 import ( "context" "database/sql" "math" "github.com/Shopify/sarama" "github.com/matrix-org/dendrite/internal" "github.com/matrix-org/dendrite/keyserver/storage/tables" ) var keyChangesSchema = ` -- Stores key change information about users. Used to determine when to send updated device lists to clients. CREATE TABLE IF NOT EXISTS keyserver_key_changes ( change_id INTEGER PRIMARY KEY AUTOINCREMENT, -- The key owner user_id TEXT NOT NULL, UNIQUE (user_id) ); ` // Replace based on user ID. We don't care how many times the user's keys have changed, only that they // have changed, hence we can just keep bumping the change ID for this user. const upsertKeyChangeSQL = "" + "INSERT INTO keyserver_key_changes (user_id)" + " VALUES ($1)" + " ON CONFLICT" + // this only works because we rely on a single writer " DO UPDATE SET change_id = change_id + 1" + " RETURNING change_id" // select the highest offset for each user in the range. The grouping by user gives distinct entries and then we just // take the max offset value as the latest offset. const selectKeyChangesSQL = "" + "SELECT user_id, MAX(change_id) FROM keyserver_key_changes WHERE change_id > $1 AND change_id <= $2 GROUP BY user_id" type keyChangesStatements struct { db *sql.DB upsertKeyChangeStmt *sql.Stmt selectKeyChangesStmt *sql.Stmt } func NewSqliteKeyChangesTable(db *sql.DB) (tables.KeyChanges, error) { s := &keyChangesStatements{ db: db, } _, err := db.Exec(keyChangesSchema) if err != nil { return nil, err } if s.upsertKeyChangeStmt, err = db.Prepare(upsertKeyChangeSQL); err != nil { return nil, err } if s.selectKeyChangesStmt, err = db.Prepare(selectKeyChangesSQL); err != nil { return nil, err } return s, nil } func (s *keyChangesStatements) InsertKeyChange(ctx context.Context, userID string) (changeID int64, err error) { err = s.upsertKeyChangeStmt.QueryRowContext(ctx, userID).Scan(&changeID) return } func (s *keyChangesStatements) SelectKeyChanges( ctx context.Context, fromOffset, toOffset int64, ) (userIDs []string, latestOffset int64, err error) { if toOffset == sarama.OffsetNewest { toOffset = math.MaxInt64 } latestOffset = fromOffset rows, err := s.selectKeyChangesStmt.QueryContext(ctx, fromOffset, toOffset) if err != nil { return nil, 0, err } defer internal.CloseAndLogIfError(ctx, rows, "selectKeyChangesStmt: rows.close() failed") for rows.Next() { var userID string var offset int64 if err := rows.Scan(&userID, &offset); err != nil { return nil, 0, err } if offset > latestOffset { latestOffset = offset } userIDs = append(userIDs, userID) } return }