mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-07-29 12:42:46 +00:00
Ristretto cache (#2563)
* Try Ristretto cache * Tweak * It's beautiful * Update GMSL * More strict keyable interface * Fix that some more * Make less panicky * Don't enforce mutability checks for now * Determine mutability using deep equality * Tweaks * Namespace keys * Make federation caches mutable * Update cost estimation, add metric * Update GMSL * Estimate cost for metrics better * Reduce counters a bit * Try caching events * Some guards * Try again * Try this * Use separate caches for hopefully better hash distribution * Fix bug with admitting events into cache * Try to fix bugs * Check nil * Try that again * Preserve order jeezo this is messy * thanks VS Code for doing exactly the wrong thing * Try this again * Be more specific * aaaaargh * One more time * That might be better * Stronger sorting * Cache expiries, async publishing of EDUs * Put it back * Use a shared cache again * Cost estimation fixes * Update ristretto * Reduce counters a bit * Clean up a bit * Update GMSL * 1GB * Configurable cache sizees * Tweaks * Add `config.DataUnit` for specifying friendly cache sizes * Various tweaks * Update GMSL * Add back some lazy loading caching * Include key in cost * Include key in cost * Tweak max age handling, config key name * Only register prometheus metrics if requested * Review comments @S7evinK * Don't return errors when creating caches (it is better just to crash since otherwise we'll `nil`-pointer exception everywhere) * Review comments * Update sample configs * Update GHA Workflow * Update Complement images to Go 1.18 * Remove the cache test from the federation API as we no longer guarantee immediate cache admission * Don't check the caches in the renewal test * Possibly fix the upgrade tests * Update to matrix-org/gomatrixserverlib#322 * Update documentation to refer to Go 1.18
This commit is contained in:
parent
eb8dc50a97
commit
3ea21273bc
38 changed files with 603 additions and 764 deletions
|
@ -319,11 +319,9 @@ func (r *Inputer) processRoomEvent(
|
|||
|
||||
// if storing this event results in it being redacted then do so.
|
||||
if !isRejected && redactedEventID == event.EventID() {
|
||||
r, rerr := eventutil.RedactEvent(redactionEvent, event)
|
||||
if rerr != nil {
|
||||
if err = eventutil.RedactEvent(redactionEvent, event); err != nil {
|
||||
return fmt.Errorf("eventutil.RedactEvent: %w", rerr)
|
||||
}
|
||||
event = r
|
||||
}
|
||||
|
||||
// For outliers we can stop after we've stored the event itself as it
|
||||
|
|
|
@ -48,10 +48,6 @@ func TestSingleTransactionOnInput(t *testing.T) {
|
|||
Kind: api.KindOutlier, // don't panic if we generate an output event
|
||||
Event: event.Headered(gomatrixserverlib.RoomVersionV6),
|
||||
}
|
||||
cache, err := caching.NewInMemoryLRUCache(false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
db, err := storage.Open(
|
||||
nil,
|
||||
&config.DatabaseOptions{
|
||||
|
@ -59,7 +55,7 @@ func TestSingleTransactionOnInput(t *testing.T) {
|
|||
MaxOpenConnections: 1,
|
||||
MaxIdleConnections: 1,
|
||||
},
|
||||
cache,
|
||||
caching.NewRistrettoCache(8*1024*1024, time.Hour, false),
|
||||
)
|
||||
if err != nil {
|
||||
t.Logf("PostgreSQL not available (%s), skipping", err)
|
||||
|
|
|
@ -593,12 +593,11 @@ func persistEvents(ctx context.Context, db storage.Database, events []*gomatrixs
|
|||
// redacted, which we don't care about since we aren't returning it in this backfill.
|
||||
if redactedEventID == ev.EventID() {
|
||||
eventToRedact := ev.Unwrap()
|
||||
redactedEvent, err := eventutil.RedactEvent(redactionEvent, eventToRedact)
|
||||
if err != nil {
|
||||
if err := eventutil.RedactEvent(redactionEvent, eventToRedact); err != nil {
|
||||
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to redact event")
|
||||
continue
|
||||
}
|
||||
ev = redactedEvent.Headered(ev.RoomVersion)
|
||||
ev = eventToRedact.Headered(ev.RoomVersion)
|
||||
events[j] = ev
|
||||
}
|
||||
backfilledEventMap[ev.EventID()] = types.Event{
|
||||
|
|
|
@ -1027,7 +1027,7 @@ func (v *StateResolution) loadStateEvents(
|
|||
|
||||
result := make([]*gomatrixserverlib.Event, 0, len(entries))
|
||||
eventEntries := make([]types.StateEntry, 0, len(entries))
|
||||
eventNIDs := make([]types.EventNID, 0, len(entries))
|
||||
eventNIDs := make(types.EventNIDs, 0, len(entries))
|
||||
for _, entry := range entries {
|
||||
if e, ok := v.events[entry.EventNID]; ok {
|
||||
result = append(result, e)
|
||||
|
|
|
@ -439,8 +439,18 @@ func (d *Database) Events(
|
|||
}
|
||||
|
||||
func (d *Database) events(
|
||||
ctx context.Context, txn *sql.Tx, eventNIDs []types.EventNID,
|
||||
ctx context.Context, txn *sql.Tx, inputEventNIDs types.EventNIDs,
|
||||
) ([]types.Event, error) {
|
||||
sort.Sort(inputEventNIDs)
|
||||
events := make(map[types.EventNID]*gomatrixserverlib.Event, len(inputEventNIDs))
|
||||
eventNIDs := make([]types.EventNID, 0, len(inputEventNIDs))
|
||||
for _, nid := range inputEventNIDs {
|
||||
if event, ok := d.Cache.GetRoomServerEvent(nid); ok && event != nil {
|
||||
events[nid] = event
|
||||
} else {
|
||||
eventNIDs = append(eventNIDs, nid)
|
||||
}
|
||||
}
|
||||
eventJSONs, err := d.EventJSONTable.BulkSelectEventJSON(ctx, txn, eventNIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -476,18 +486,29 @@ func (d *Database) events(
|
|||
for n, v := range dbRoomVersions {
|
||||
roomVersions[n] = v
|
||||
}
|
||||
results := make([]types.Event, len(eventJSONs))
|
||||
for i, eventJSON := range eventJSONs {
|
||||
result := &results[i]
|
||||
result.EventNID = eventJSON.EventNID
|
||||
roomNID := roomNIDs[result.EventNID]
|
||||
for _, eventJSON := range eventJSONs {
|
||||
roomNID := roomNIDs[eventJSON.EventNID]
|
||||
roomVersion := roomVersions[roomNID]
|
||||
result.Event, err = gomatrixserverlib.NewEventFromTrustedJSONWithEventID(
|
||||
events[eventJSON.EventNID], err = gomatrixserverlib.NewEventFromTrustedJSONWithEventID(
|
||||
eventIDs[eventJSON.EventNID], eventJSON.EventJSON, false, roomVersion,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if event := events[eventJSON.EventNID]; event != nil {
|
||||
d.Cache.StoreRoomServerEvent(eventJSON.EventNID, event)
|
||||
}
|
||||
}
|
||||
results := make([]types.Event, 0, len(inputEventNIDs))
|
||||
for _, nid := range inputEventNIDs {
|
||||
event, ok := events[nid]
|
||||
if !ok || event == nil {
|
||||
return nil, fmt.Errorf("event %d missing", nid)
|
||||
}
|
||||
results = append(results, types.Event{
|
||||
EventNID: nid,
|
||||
Event: event,
|
||||
})
|
||||
}
|
||||
if !redactionsArePermanent {
|
||||
d.applyRedactions(results)
|
||||
|
@ -854,7 +875,7 @@ func (d *Database) handleRedactions(
|
|||
|
||||
// mark the event as redacted
|
||||
if redactionsArePermanent {
|
||||
redactedEvent.Event = redactedEvent.Redact()
|
||||
redactedEvent.Redact()
|
||||
}
|
||||
|
||||
err = redactedEvent.SetUnsignedField("redacted_because", redactionEvent)
|
||||
|
@ -926,7 +947,7 @@ func (d *Database) loadRedactionPair(
|
|||
func (d *Database) applyRedactions(events []types.Event) {
|
||||
for i := range events {
|
||||
if result := gjson.GetBytes(events[i].Unsigned(), "redacted_because"); result.Exists() {
|
||||
events[i].Event = events[i].Redact()
|
||||
events[i].Redact()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue