Try to close rows more

This commit is contained in:
Neil Alexander 2021-04-19 14:09:27 +01:00
parent 4c2d2dbe03
commit f975caa1b6
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944

View file

@ -25,6 +25,13 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
type stateBlockData struct {
StateSnapshotNID types.StateSnapshotNID
RoomNID types.RoomNID
StateBlockNID types.StateBlockNID
EventNIDs types.EventNIDs
}
func LoadStateBlocksRefactor(m *sqlutil.Migrations) { func LoadStateBlocksRefactor(m *sqlutil.Migrations) {
m.AddMigration(UpStateBlocksRefactor, DownStateBlocksRefactor) m.AddMigration(UpStateBlocksRefactor, DownStateBlocksRefactor)
} }
@ -76,10 +83,10 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
var lastsnapshot types.StateSnapshotNID var lastsnapshot types.StateSnapshotNID
var newblocks types.StateBlockNIDs var newblocks types.StateBlockNIDs
var snapshots *sql.Rows var snapshotrows *sql.Rows
for ; batchoffset < snapshotcount; batchoffset += batchsize { for ; batchoffset < snapshotcount; batchoffset += batchsize {
snapshots, err = tx.Query(` snapshotrows, err = tx.Query(`
SELECT SELECT
state_snapshot_nid, state_snapshot_nid,
room_nid, room_nid,
@ -109,24 +116,29 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
return fmt.Errorf("tx.Query: %w", err) return fmt.Errorf("tx.Query: %w", err)
} }
for snapshots.Next() { var snapshots []stateBlockData
logrus.Warnf("Performing %d to %d...", batchoffset, batchoffset+batchsize)
var snapshot types.StateSnapshotNID for snapshotrows.Next() {
var room types.RoomNID logrus.Warnf("Performing %d to %d...", batchoffset, batchoffset+batchsize)
var block types.StateBlockNID var snapshot stateBlockData
var eventsarray pq.Int64Array var eventsarray pq.Int64Array
if err = snapshots.Scan(&snapshot, &room, &block, &eventsarray); err != nil { if err = snapshotrows.Scan(&snapshot.StateSnapshotNID, &snapshot.RoomNID, &snapshot.StateBlockNID, &eventsarray); err != nil {
return fmt.Errorf("rows.Scan: %w", err) return fmt.Errorf("rows.Scan: %w", err)
} }
var events types.EventNIDs
for _, e := range eventsarray { for _, e := range eventsarray {
events = append(events, types.EventNID(e)) snapshot.EventNIDs = append(snapshot.EventNIDs, types.EventNID(e))
} }
events = events[:util.SortAndUnique(events)] snapshot.EventNIDs = snapshot.EventNIDs[:util.SortAndUnique(snapshot.EventNIDs)]
eventsarray = eventsarray[:0] snapshots = append(snapshots, snapshot)
for _, e := range events { }
if err = snapshotrows.Close(); err != nil {
return fmt.Errorf("snapshots.Close: %w", err)
}
for _, snapshot := range snapshots {
var eventsarray pq.Int64Array
for _, e := range snapshot.EventNIDs {
eventsarray = append(eventsarray, int64(e)) eventsarray = append(eventsarray, int64(e))
} }
@ -142,14 +154,14 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
} }
newblocks = append(newblocks, blocknid) newblocks = append(newblocks, blocknid)
if snapshot != lastsnapshot { if snapshot.StateSnapshotNID != lastsnapshot {
var newsnapshot types.StateSnapshotNID var newsnapshot types.StateSnapshotNID
err = tx.QueryRow(` err = tx.QueryRow(`
INSERT INTO roomserver_state_snapshots (room_nid, state_block_nids) INSERT INTO roomserver_state_snapshots (room_nid, state_block_nids)
VALUES ($1, $2) VALUES ($1, $2)
ON CONFLICT (room_nid, state_block_nids) DO UPDATE SET room_nid=$3 ON CONFLICT (room_nid, state_block_nids) DO UPDATE SET room_nid=$1
RETURNING state_snapshot_nid RETURNING state_snapshot_nid
`, room, newblocks, room).Scan(&newsnapshot) `, snapshot.RoomNID, newblocks).Scan(&newsnapshot)
if err != nil { if err != nil {
return fmt.Errorf("tx.QueryRow.Scan (insert new snapshot): %w", err) return fmt.Errorf("tx.QueryRow.Scan (insert new snapshot): %w", err)
} }
@ -165,13 +177,9 @@ func UpStateBlocksRefactor(tx *sql.Tx) error {
fmt.Println("Rewrote snapshot", snapshot, "to", newsnapshot) fmt.Println("Rewrote snapshot", snapshot, "to", newsnapshot)
newblocks = newblocks[:0] newblocks = newblocks[:0]
lastsnapshot = snapshot lastsnapshot = snapshot.StateSnapshotNID
} }
} }
if err = snapshots.Close(); err != nil {
return fmt.Errorf("snapshots.Close: %w", err)
}
} }
if _, err = tx.Exec(`DROP TABLE _roomserver_state_snapshots;`); err != nil { if _, err = tx.Exec(`DROP TABLE _roomserver_state_snapshots;`); err != nil {