From 837f50ac89c54416a3fc8a463055e58e5554b6b5 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Wed, 3 Nov 2021 09:53:37 +0000 Subject: [PATCH 1/2] Reduce CPU usage of SelectStateInRange (#2038) --- syncapi/storage/postgres/output_room_events_table.go | 9 +++++---- syncapi/storage/sqlite3/output_room_events_table.go | 9 +++++---- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/syncapi/storage/postgres/output_room_events_table.go b/syncapi/storage/postgres/output_room_events_table.go index bd7aa018..44de02c9 100644 --- a/syncapi/storage/postgres/output_room_events_table.go +++ b/syncapi/storage/postgres/output_room_events_table.go @@ -116,7 +116,7 @@ const updateEventJSONSQL = "" + // In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id). const selectStateInRangeSQL = "" + - "SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" + + "SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" + " FROM syncapi_output_room_events" + " WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" + " AND ( $3::text[] IS NULL OR sender = ANY($3) )" + @@ -221,13 +221,14 @@ func (s *outputRoomEventsStatements) SelectStateInRange( for rows.Next() { var ( + eventID string streamPos types.StreamPosition eventBytes []byte excludeFromSync bool addIDs pq.StringArray delIDs pq.StringArray ) - if err := rows.Scan(&streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs); err != nil { + if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs); err != nil { return nil, nil, err } // Sanity check for deleted state and whine if we see it. We don't need to do anything @@ -243,7 +244,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange( // TODO: Handle redacted events var ev gomatrixserverlib.HeaderedEvent - if err := json.Unmarshal(eventBytes, &ev); err != nil { + if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { return nil, nil, err } needSet := stateNeeded[ev.RoomID()] @@ -258,7 +259,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange( } stateNeeded[ev.RoomID()] = needSet - eventIDToEvent[ev.EventID()] = types.StreamEvent{ + eventIDToEvent[eventID] = types.StreamEvent{ HeaderedEvent: &ev, StreamPosition: streamPos, ExcludeFromSync: excludeFromSync, diff --git a/syncapi/storage/sqlite3/output_room_events_table.go b/syncapi/storage/sqlite3/output_room_events_table.go index 37f7ea00..afdbe55c 100644 --- a/syncapi/storage/sqlite3/output_room_events_table.go +++ b/syncapi/storage/sqlite3/output_room_events_table.go @@ -81,7 +81,7 @@ const updateEventJSONSQL = "" + "UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2" const selectStateInRangeSQL = "" + - "SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" + + "SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" + " FROM syncapi_output_room_events" + " WHERE (id > $1 AND id <= $2)" + " AND ((add_state_ids IS NOT NULL AND add_state_ids != '') OR (remove_state_ids IS NOT NULL AND remove_state_ids != ''))" @@ -173,13 +173,14 @@ func (s *outputRoomEventsStatements) SelectStateInRange( for rows.Next() { var ( + eventID string streamPos types.StreamPosition eventBytes []byte excludeFromSync bool addIDsJSON string delIDsJSON string ) - if err := rows.Scan(&streamPos, &eventBytes, &excludeFromSync, &addIDsJSON, &delIDsJSON); err != nil { + if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDsJSON, &delIDsJSON); err != nil { return nil, nil, err } @@ -201,7 +202,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange( // TODO: Handle redacted events var ev gomatrixserverlib.HeaderedEvent - if err := json.Unmarshal(eventBytes, &ev); err != nil { + if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil { return nil, nil, err } needSet := stateNeeded[ev.RoomID()] @@ -216,7 +217,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange( } stateNeeded[ev.RoomID()] = needSet - eventIDToEvent[ev.EventID()] = types.StreamEvent{ + eventIDToEvent[eventID] = types.StreamEvent{ HeaderedEvent: &ev, StreamPosition: streamPos, ExcludeFromSync: excludeFromSync, From 3c049c068f6162e3708da9a5375dff3b33ce008a Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Thu, 4 Nov 2021 09:09:26 +0000 Subject: [PATCH 2/2] State resolution v2 performance improvements (reduce allocs in Kahn's algorithm, update to matrix-org/gomatrixserverlib@91dadfb) --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 0a223f5a..686e9600 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 - github.com/matrix-org/gomatrixserverlib v0.0.0-20211102131912-13366e7985e1 + github.com/matrix-org/gomatrixserverlib v0.0.0-20211104090712-91dadfbef53b github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0 github.com/matrix-org/pinecone v0.0.0-20211022090602-08a50945ac89 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 diff --git a/go.sum b/go.sum index 21f4807b..23045726 100644 --- a/go.sum +++ b/go.sum @@ -993,8 +993,8 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d/go.mod h1 github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= -github.com/matrix-org/gomatrixserverlib v0.0.0-20211102131912-13366e7985e1 h1:Pv7+98sreiHltpamJ4em6RCX/WPVN1wl53Gli9Cz744= -github.com/matrix-org/gomatrixserverlib v0.0.0-20211102131912-13366e7985e1/go.mod h1:rB8tBUUUo1rzUqpzklRDSooxZ6YMhoaEPx4SO5fGeUc= +github.com/matrix-org/gomatrixserverlib v0.0.0-20211104090712-91dadfbef53b h1:D2Oyvn+OJOtAHcKDWu5bKo6H92f77dBDncFAFViMxrM= +github.com/matrix-org/gomatrixserverlib v0.0.0-20211104090712-91dadfbef53b/go.mod h1:rB8tBUUUo1rzUqpzklRDSooxZ6YMhoaEPx4SO5fGeUc= github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0 h1:HZCzy4oVzz55e+cOMiX/JtSF2UOY1evBl2raaE7ACcU= github.com/matrix-org/naffka v0.0.0-20210623111924-14ff508b58e0/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE= github.com/matrix-org/pinecone v0.0.0-20211022090602-08a50945ac89 h1:6JkIymZ1vxfI0shSpg6gNPTJaF4/95Evy34slPVZGKM=