mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-08-03 06:32:47 +00:00
Remodel how device list change IDs are created
Previously we made them using the offset Kafka supplied. We don't run Kafka anymore, so now we make the SQL table assign the change ID via an AUTOINCREMENTing ID. Redesign the `keyserver_key_changes` table to have `UNIQUE(user_id)` so we don't accumulate key changes forevermore, we now have at most 1 row per user which contains the highest change ID. This needs a SQL migration.
This commit is contained in:
parent
31f1810814
commit
5dc360481a
10 changed files with 109 additions and 91 deletions
|
@ -18,43 +18,47 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
eduapi "github.com/matrix-org/dendrite/eduserver/api"
|
||||
"github.com/matrix-org/dendrite/keyserver/api"
|
||||
"github.com/matrix-org/dendrite/keyserver/storage"
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// KeyChange produces key change events for the sync API and federation sender to consume
|
||||
type KeyChange struct {
|
||||
Topic string
|
||||
Producer sarama.SyncProducer
|
||||
DB storage.Database
|
||||
Topic string
|
||||
JetStream nats.JetStreamContext
|
||||
DB storage.Database
|
||||
}
|
||||
|
||||
// ProduceKeyChanges creates new change events for each key
|
||||
func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceMessage) error {
|
||||
userToDeviceCount := make(map[string]int)
|
||||
for _, key := range keys {
|
||||
var m sarama.ProducerMessage
|
||||
|
||||
id, err := p.DB.StoreKeyChange(context.Background(), key.UserID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
key.DeviceChangeID = id
|
||||
value, err := json.Marshal(key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.Topic = string(p.Topic)
|
||||
m.Key = sarama.StringEncoder(key.UserID)
|
||||
m.Value = sarama.ByteEncoder(value)
|
||||
m := &nats.Msg{
|
||||
Subject: p.Topic,
|
||||
Header: nats.Header{},
|
||||
}
|
||||
m.Header.Set(jetstream.UserID, key.UserID)
|
||||
m.Data = value
|
||||
|
||||
partition, offset, err := p.Producer.SendMessage(&m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = p.DB.StoreKeyChange(context.Background(), partition, offset, key.UserID)
|
||||
_, err = p.JetStream.PublishMsg(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
userToDeviceCount[key.UserID]++
|
||||
}
|
||||
for userID, count := range userToDeviceCount {
|
||||
|
@ -67,7 +71,6 @@ func (p *KeyChange) ProduceKeyChanges(keys []api.DeviceMessage) error {
|
|||
}
|
||||
|
||||
func (p *KeyChange) ProduceSigningKeyUpdate(key eduapi.CrossSigningKeyUpdate) error {
|
||||
var m sarama.ProducerMessage
|
||||
output := &api.DeviceMessage{
|
||||
Type: api.TypeCrossSigningUpdate,
|
||||
OutputCrossSigningKeyUpdate: &eduapi.OutputCrossSigningKeyUpdate{
|
||||
|
@ -75,20 +78,25 @@ func (p *KeyChange) ProduceSigningKeyUpdate(key eduapi.CrossSigningKeyUpdate) er
|
|||
},
|
||||
}
|
||||
|
||||
id, err := p.DB.StoreKeyChange(context.Background(), key.UserID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
output.DeviceChangeID = id
|
||||
|
||||
value, err := json.Marshal(output)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.Topic = string(p.Topic)
|
||||
m.Key = sarama.StringEncoder(key.UserID)
|
||||
m.Value = sarama.ByteEncoder(value)
|
||||
|
||||
partition, offset, err := p.Producer.SendMessage(&m)
|
||||
if err != nil {
|
||||
return err
|
||||
m := &nats.Msg{
|
||||
Subject: p.Topic,
|
||||
Header: nats.Header{},
|
||||
}
|
||||
err = p.DB.StoreKeyChange(context.Background(), partition, offset, key.UserID)
|
||||
m.Header.Set(jetstream.UserID, key.UserID)
|
||||
m.Data = value
|
||||
|
||||
_, err = p.JetStream.PublishMsg(m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue