mirror of
https://github.com/hoernschen/dendrite.git
synced 2024-12-27 07:28:27 +00:00
Add metrics for the various ways of calculating room state (#49)
This commit is contained in:
parent
e347aa05fe
commit
e3f3eb8f3d
2 changed files with 126 additions and 11 deletions
|
@ -5,6 +5,7 @@ import (
|
|||
"github.com/matrix-org/dendrite/roomserver/input"
|
||||
"github.com/matrix-org/dendrite/roomserver/query"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
sarama "gopkg.in/Shopify/sarama.v1"
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
|
@ -71,6 +72,8 @@ func main() {
|
|||
|
||||
queryAPI.SetupHTTP(http.DefaultServeMux)
|
||||
|
||||
http.DefaultServeMux.Handle("/metrics", prometheus.Handler())
|
||||
|
||||
fmt.Println("Started roomserver")
|
||||
|
||||
// TODO: Implement clean shutdown.
|
||||
|
|
|
@ -6,9 +6,106 @@ import (
|
|||
"github.com/matrix-org/dendrite/roomserver/types"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/matrix-org/util"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"sort"
|
||||
"time"
|
||||
)
|
||||
|
||||
var calculateStateDurations = prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Namespace: "dendrite",
|
||||
Subsystem: "roomserver",
|
||||
Name: "calculate_state_duration_microseconds",
|
||||
Help: "How long it takes to calculate the state after a list of events",
|
||||
},
|
||||
// Takes two labels:
|
||||
// algorithm:
|
||||
// The algorithm used to calculate the state or the step it failed on if it failed.
|
||||
// Labels starting with "_" are used to indicate when the algorithm fails halfway.
|
||||
// outcome:
|
||||
// Whether the state was successfully calculated.
|
||||
//
|
||||
// The possible values for algorithm are:
|
||||
// empty_state -> The list of events was empty so the state is empty.
|
||||
// no_change -> The state hasn't changed.
|
||||
// single_delta -> There was a single event added to the state in a way that can be encoded as a single delta
|
||||
// full_state_no_conflicts -> We created a new copy of the full room state, but didn't enounter any conflicts
|
||||
// while doing so.
|
||||
// full_state_with_conflicts -> We created a new copy of the full room state and had to resolve conflicts to do so.
|
||||
// _load_state_block_nids -> Failed loading the state block nids for a single previous state.
|
||||
// _load_combined_state -> Failed to load the combined state.
|
||||
// _resolve_conflicts -> Failed to resolve conflicts.
|
||||
[]string{"algorithm", "outcome"},
|
||||
)
|
||||
|
||||
var calculateStatePrevEventLength = prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Namespace: "dendrite",
|
||||
Subsystem: "roomserver",
|
||||
Name: "calculate_state_prev_event_length",
|
||||
Help: "The length of the list of events to calculate the state after",
|
||||
},
|
||||
[]string{"algorithm", "outcome"},
|
||||
)
|
||||
|
||||
var calculateStateFullStateLength = prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Namespace: "dendrite",
|
||||
Subsystem: "roomserver",
|
||||
Name: "calculate_state_full_state_length",
|
||||
Help: "The length of the full room state.",
|
||||
},
|
||||
[]string{"algorithm", "outcome"},
|
||||
)
|
||||
|
||||
var calculateStateConflictLength = prometheus.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Namespace: "dendrite",
|
||||
Subsystem: "roomserver",
|
||||
Name: "calculate_state_conflict_state_length",
|
||||
Help: "The length of the conflicted room state.",
|
||||
},
|
||||
[]string{"algorithm", "outcome"},
|
||||
)
|
||||
|
||||
type calculateStateMetrics struct {
|
||||
algorithm string
|
||||
startTime time.Time
|
||||
prevEventLength int
|
||||
fullStateLength int
|
||||
conflictLength int
|
||||
}
|
||||
|
||||
func (c *calculateStateMetrics) stop(stateNID types.StateSnapshotNID, err error) (types.StateSnapshotNID, error) {
|
||||
var outcome string
|
||||
if err == nil {
|
||||
outcome = "success"
|
||||
} else {
|
||||
outcome = "failure"
|
||||
}
|
||||
endTime := time.Now()
|
||||
calculateStateDurations.WithLabelValues(c.algorithm, outcome).Observe(
|
||||
float64(endTime.Sub(c.startTime).Nanoseconds()) / 1000.,
|
||||
)
|
||||
calculateStatePrevEventLength.WithLabelValues(c.algorithm, outcome).Observe(
|
||||
float64(c.prevEventLength),
|
||||
)
|
||||
calculateStateFullStateLength.WithLabelValues(c.algorithm, outcome).Observe(
|
||||
float64(c.fullStateLength),
|
||||
)
|
||||
calculateStateConflictLength.WithLabelValues(c.algorithm, outcome).Observe(
|
||||
float64(c.conflictLength),
|
||||
)
|
||||
return stateNID, err
|
||||
}
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(
|
||||
calculateStateDurations, calculateStatePrevEventLength,
|
||||
calculateStateFullStateLength, calculateStateConflictLength,
|
||||
)
|
||||
}
|
||||
|
||||
// calculateAndStoreState calculates a snapshot of the state of a room before an event.
|
||||
// Stores the snapshot of the state in the database.
|
||||
// Returns a numeric ID for the snapshot of the state before the event.
|
||||
|
@ -34,10 +131,13 @@ func calculateAndStoreStateBeforeEvent(
|
|||
// calculateAndStoreStateAfterEvents finds the room state after the given events.
|
||||
// Stores the resulting state in the database and returns a numeric ID for that snapshot.
|
||||
func calculateAndStoreStateAfterEvents(db RoomEventDatabase, roomNID types.RoomNID, prevStates []types.StateAtEvent) (types.StateSnapshotNID, error) {
|
||||
metrics := calculateStateMetrics{startTime: time.Now(), prevEventLength: len(prevStates)}
|
||||
|
||||
if len(prevStates) == 0 {
|
||||
// 2) There weren't any prev_events for this event so the state is
|
||||
// empty.
|
||||
return db.AddState(roomNID, nil, nil)
|
||||
metrics.algorithm = "empty_state"
|
||||
return metrics.stop(db.AddState(roomNID, nil, nil))
|
||||
}
|
||||
|
||||
if len(prevStates) == 1 {
|
||||
|
@ -47,26 +147,30 @@ func calculateAndStoreStateAfterEvents(db RoomEventDatabase, roomNID types.RoomN
|
|||
// have the same state, so this event has exactly the same state
|
||||
// as the previous events.
|
||||
// This should be the common case.
|
||||
return prevState.BeforeStateSnapshotNID, nil
|
||||
metrics.algorithm = "no_change"
|
||||
return metrics.stop(prevState.BeforeStateSnapshotNID, nil)
|
||||
}
|
||||
// The previous event was a state event so we need to store a copy
|
||||
// of the previous state updated with that event.
|
||||
stateBlockNIDLists, err := db.StateBlockNIDs([]types.StateSnapshotNID{prevState.BeforeStateSnapshotNID})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
metrics.algorithm = "_load_state_blocks"
|
||||
return metrics.stop(0, err)
|
||||
}
|
||||
stateBlockNIDs := stateBlockNIDLists[0].StateBlockNIDs
|
||||
if len(stateBlockNIDs) < maxStateBlockNIDs {
|
||||
// 4) The number of state data blocks is small enough that we can just
|
||||
// add the state event as a block of size one to the end of the blocks.
|
||||
return db.AddState(
|
||||
metrics.algorithm = "single_delta"
|
||||
return metrics.stop(db.AddState(
|
||||
roomNID, stateBlockNIDs, []types.StateEntry{prevState.StateEntry},
|
||||
)
|
||||
))
|
||||
}
|
||||
// If there are too many deltas then we need to calculate the full state
|
||||
// So fall through to calculateAndStoreStateAfterManyEvents
|
||||
}
|
||||
return calculateAndStoreStateAfterManyEvents(db, roomNID, prevStates)
|
||||
|
||||
return calculateAndStoreStateAfterManyEvents(db, roomNID, prevStates, metrics)
|
||||
}
|
||||
|
||||
// maxStateBlockNIDs is the maximum number of state data blocks to use to encode a snapshot of room state.
|
||||
|
@ -79,12 +183,15 @@ const maxStateBlockNIDs = 64
|
|||
// calculateAndStoreStateAfterManyEvents finds the room state after the given events.
|
||||
// This handles the slow path of calculateAndStoreStateAfterEvents for when there is more than one event.
|
||||
// Stores the resulting state and returns a numeric ID for the snapshot.
|
||||
func calculateAndStoreStateAfterManyEvents(db RoomEventDatabase, roomNID types.RoomNID, prevStates []types.StateAtEvent) (types.StateSnapshotNID, error) {
|
||||
func calculateAndStoreStateAfterManyEvents(
|
||||
db RoomEventDatabase, roomNID types.RoomNID, prevStates []types.StateAtEvent, metrics calculateStateMetrics,
|
||||
) (types.StateSnapshotNID, error) {
|
||||
// Conflict resolution.
|
||||
// First stage: load the state after each of the prev events.
|
||||
combined, err := state.LoadCombinedStateAfterEvents(db, prevStates)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
metrics.algorithm = "_load_combined_state"
|
||||
return metrics.stop(0, err)
|
||||
}
|
||||
|
||||
// Collect all the entries with the same type and key together.
|
||||
|
@ -98,6 +205,8 @@ func calculateAndStoreStateAfterManyEvents(db RoomEventDatabase, roomNID types.R
|
|||
|
||||
var state []types.StateEntry
|
||||
if len(conflicts) > 0 {
|
||||
metrics.conflictLength = len(conflicts)
|
||||
|
||||
// 5) There are conflicting state events, for each conflict workout
|
||||
// what the appropriate state event is.
|
||||
|
||||
|
@ -111,17 +220,21 @@ func calculateAndStoreStateAfterManyEvents(db RoomEventDatabase, roomNID types.R
|
|||
|
||||
resolved, err := resolveConflicts(db, notConflicted, conflicts)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
metrics.algorithm = "_resolve_conflicts"
|
||||
return metrics.stop(0, err)
|
||||
}
|
||||
metrics.algorithm = "full_state_with_conflicts"
|
||||
state = resolved
|
||||
} else {
|
||||
metrics.algorithm = "full_state_no_conflicts"
|
||||
// 6) There weren't any conflicts
|
||||
state = combined
|
||||
}
|
||||
metrics.fullStateLength = len(state)
|
||||
|
||||
// TODO: Check if we can encode the new state as a delta against the
|
||||
// previous state.
|
||||
return db.AddState(roomNID, nil, state)
|
||||
return metrics.stop(db.AddState(roomNID, nil, state))
|
||||
}
|
||||
|
||||
// loadStateEvents loads the matrix events for a list of state entries.
|
||||
|
@ -191,7 +304,6 @@ func resolveConflicts(db RoomEventDatabase, notConflicted, conflicted []types.St
|
|||
}
|
||||
|
||||
// Resolve the conflicts.
|
||||
fmt.Println("Resolving", len(conflicted), "conflicts with", len(authEvents), "authEvents")
|
||||
resolvedEvents := gomatrixserverlib.ResolveStateConflicts(conflictedEvents, authEvents)
|
||||
|
||||
// Map from the full events back to numeric state entries.
|
||||
|
|
Loading…
Reference in a new issue