mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-08-02 14:12:47 +00:00
Merge branch 'master' into neilalexander/federationinput
This commit is contained in:
commit
ad19c2b81a
79 changed files with 1351 additions and 1714 deletions
|
@ -3,7 +3,6 @@ package internal
|
|||
import (
|
||||
"context"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/getsentry/sentry-go"
|
||||
asAPI "github.com/matrix-org/dendrite/appservice/api"
|
||||
fsAPI "github.com/matrix-org/dendrite/federationapi/api"
|
||||
|
@ -16,6 +15,8 @@ import (
|
|||
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// RoomserverInternalAPI is an implementation of api.RoomserverInternalAPI
|
||||
|
@ -33,20 +34,21 @@ type RoomserverInternalAPI struct {
|
|||
*perform.Forgetter
|
||||
DB storage.Database
|
||||
Cfg *config.RoomServer
|
||||
Producer sarama.SyncProducer
|
||||
Cache caching.RoomServerCaches
|
||||
ServerName gomatrixserverlib.ServerName
|
||||
KeyRing gomatrixserverlib.JSONVerifier
|
||||
ServerACLs *acls.ServerACLs
|
||||
fsAPI fsAPI.FederationInternalAPI
|
||||
asAPI asAPI.AppServiceQueryAPI
|
||||
OutputRoomEventTopic string // Kafka topic for new output room events
|
||||
JetStream nats.JetStreamContext
|
||||
InputRoomEventTopic string // JetStream topic for new input room events
|
||||
OutputRoomEventTopic string // JetStream topic for new output room events
|
||||
PerspectiveServerNames []gomatrixserverlib.ServerName
|
||||
}
|
||||
|
||||
func NewRoomserverAPI(
|
||||
cfg *config.RoomServer, roomserverDB storage.Database, producer sarama.SyncProducer,
|
||||
outputRoomEventTopic string, caches caching.RoomServerCaches,
|
||||
cfg *config.RoomServer, roomserverDB storage.Database, consumer nats.JetStreamContext,
|
||||
inputRoomEventTopic, outputRoomEventTopic string, caches caching.RoomServerCaches,
|
||||
perspectiveServerNames []gomatrixserverlib.ServerName,
|
||||
) *RoomserverInternalAPI {
|
||||
serverACLs := acls.NewServerACLs(roomserverDB)
|
||||
|
@ -56,8 +58,9 @@ func NewRoomserverAPI(
|
|||
Cache: caches,
|
||||
ServerName: cfg.Matrix.ServerName,
|
||||
PerspectiveServerNames: perspectiveServerNames,
|
||||
InputRoomEventTopic: inputRoomEventTopic,
|
||||
OutputRoomEventTopic: outputRoomEventTopic,
|
||||
Producer: producer,
|
||||
JetStream: consumer,
|
||||
ServerACLs: serverACLs,
|
||||
Queryer: &query.Queryer{
|
||||
DB: roomserverDB,
|
||||
|
@ -67,13 +70,17 @@ func NewRoomserverAPI(
|
|||
},
|
||||
Inputer: &input.Inputer{
|
||||
DB: roomserverDB,
|
||||
InputRoomEventTopic: inputRoomEventTopic,
|
||||
OutputRoomEventTopic: outputRoomEventTopic,
|
||||
Producer: producer,
|
||||
JetStream: consumer,
|
||||
ServerName: cfg.Matrix.ServerName,
|
||||
ACLs: serverACLs,
|
||||
},
|
||||
// perform-er structs get initialised when we have a federation sender to use
|
||||
}
|
||||
if err := a.Inputer.Start(); err != nil {
|
||||
logrus.WithError(err).Panic("failed to start roomserver input API")
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
|
@ -86,8 +93,9 @@ func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalA
|
|||
|
||||
r.Inputer = &input.Inputer{
|
||||
DB: r.DB,
|
||||
InputRoomEventTopic: r.InputRoomEventTopic,
|
||||
OutputRoomEventTopic: r.OutputRoomEventTopic,
|
||||
Producer: r.Producer,
|
||||
JetStream: r.JetStream,
|
||||
ServerName: r.Cfg.Matrix.ServerName,
|
||||
FSAPI: fsAPI,
|
||||
KeyRing: keyRing,
|
||||
|
|
|
@ -19,9 +19,8 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/Arceliar/phony"
|
||||
"github.com/getsentry/sentry-go"
|
||||
fedapi "github.com/matrix-org/dendrite/federationapi/api"
|
||||
"github.com/matrix-org/dendrite/internal/hooks"
|
||||
|
@ -29,11 +28,12 @@ import (
|
|||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/roomserver/internal/query"
|
||||
"github.com/matrix-org/dendrite/roomserver/storage"
|
||||
"github.com/matrix-org/dendrite/setup/jetstream"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/tidwall/gjson"
|
||||
"go.uber.org/atomic"
|
||||
)
|
||||
|
||||
var keyContentFields = map[string]string{
|
||||
|
@ -44,109 +44,165 @@ var keyContentFields = map[string]string{
|
|||
|
||||
type Inputer struct {
|
||||
DB storage.Database
|
||||
Producer sarama.SyncProducer
|
||||
JetStream nats.JetStreamContext
|
||||
ServerName gomatrixserverlib.ServerName
|
||||
FSAPI fedapi.FederationInternalAPI
|
||||
KeyRing gomatrixserverlib.JSONVerifier
|
||||
ACLs *acls.ServerACLs
|
||||
InputRoomEventTopic string
|
||||
OutputRoomEventTopic string
|
||||
workers sync.Map // room ID -> *inputWorker
|
||||
workers sync.Map // room ID -> *phony.Inbox
|
||||
|
||||
Queryer *query.Queryer
|
||||
}
|
||||
|
||||
type inputTask struct {
|
||||
ctx context.Context
|
||||
event *api.InputRoomEvent
|
||||
wg *sync.WaitGroup
|
||||
err error // written back by worker, only safe to read when all tasks are done
|
||||
// onMessage is called when a new event arrives in the roomserver input stream.
|
||||
func (r *Inputer) Start() error {
|
||||
_, err := r.JetStream.Subscribe(
|
||||
r.InputRoomEventTopic,
|
||||
// We specifically don't use jetstream.WithJetStreamMessage here because we
|
||||
// queue the task off to a room-specific queue and the ACK needs to be sent
|
||||
// later, possibly with an error response to the inputter if synchronous.
|
||||
func(msg *nats.Msg) {
|
||||
roomID := msg.Header.Get("room_id")
|
||||
defer roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Dec()
|
||||
var inputRoomEvent api.InputRoomEvent
|
||||
if err := json.Unmarshal(msg.Data, &inputRoomEvent); err != nil {
|
||||
_ = msg.Term()
|
||||
return
|
||||
}
|
||||
inbox, _ := r.workers.LoadOrStore(roomID, &phony.Inbox{})
|
||||
inbox.(*phony.Inbox).Act(nil, func() {
|
||||
if err := r.processRoomEvent(context.TODO(), &inputRoomEvent); err != nil {
|
||||
sentry.CaptureException(err)
|
||||
} else {
|
||||
hooks.Run(hooks.KindNewEventPersisted, inputRoomEvent.Event)
|
||||
}
|
||||
_ = msg.Ack()
|
||||
})
|
||||
},
|
||||
// NATS wants to acknowledge automatically by default when the message is
|
||||
// read from the stream, but we want to override that behaviour by making
|
||||
// sure that we only acknowledge when we're happy we've done everything we
|
||||
// can. This ensures we retry things when it makes sense to do so.
|
||||
nats.ManualAck(),
|
||||
// NATS will try to redeliver things to us automatically if we don't ack
|
||||
// or nak them within a certain amount of time. This stops that from
|
||||
// happening, so we don't end up doing a lot of unnecessary duplicate work.
|
||||
nats.MaxDeliver(0),
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
type inputWorker struct {
|
||||
r *Inputer
|
||||
running atomic.Bool
|
||||
input *fifoQueue
|
||||
}
|
||||
|
||||
// Guarded by a CAS on w.running
|
||||
func (w *inputWorker) start() {
|
||||
defer w.running.Store(false)
|
||||
for {
|
||||
select {
|
||||
case <-w.input.wait():
|
||||
task, ok := w.input.pop()
|
||||
if !ok {
|
||||
continue
|
||||
// InputRoomEvents implements api.RoomserverInternalAPI
|
||||
func (r *Inputer) InputRoomEvents(
|
||||
ctx context.Context,
|
||||
request *api.InputRoomEventsRequest,
|
||||
response *api.InputRoomEventsResponse,
|
||||
) {
|
||||
if request.Asynchronous {
|
||||
var err error
|
||||
for _, e := range request.InputRoomEvents {
|
||||
msg := &nats.Msg{
|
||||
Subject: r.InputRoomEventTopic,
|
||||
Header: nats.Header{},
|
||||
}
|
||||
roomserverInputBackpressure.With(prometheus.Labels{
|
||||
"room_id": task.event.Event.RoomID(),
|
||||
}).Dec()
|
||||
hooks.Run(hooks.KindNewEventReceived, task.event.Event)
|
||||
_, task.err = w.r.processRoomEvent(task.ctx, task.event)
|
||||
if task.err == nil {
|
||||
hooks.Run(hooks.KindNewEventPersisted, task.event.Event)
|
||||
} else {
|
||||
sentry.CaptureException(task.err)
|
||||
roomID := e.Event.RoomID()
|
||||
msg.Header.Set("room_id", roomID)
|
||||
msg.Data, err = json.Marshal(e)
|
||||
if err != nil {
|
||||
response.ErrMsg = err.Error()
|
||||
return
|
||||
}
|
||||
if _, err = r.JetStream.PublishMsg(msg); err != nil {
|
||||
return
|
||||
}
|
||||
roomserverInputBackpressure.With(prometheus.Labels{"room_id": roomID}).Inc()
|
||||
}
|
||||
} else {
|
||||
responses := make(chan error, len(request.InputRoomEvents))
|
||||
defer close(responses)
|
||||
for _, e := range request.InputRoomEvents {
|
||||
inputRoomEvent := e
|
||||
inbox, _ := r.workers.LoadOrStore(inputRoomEvent.Event.RoomID(), &phony.Inbox{})
|
||||
inbox.(*phony.Inbox).Act(nil, func() {
|
||||
err := r.processRoomEvent(context.TODO(), &inputRoomEvent)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
} else {
|
||||
hooks.Run(hooks.KindNewEventPersisted, inputRoomEvent.Event)
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
default:
|
||||
responses <- err
|
||||
}
|
||||
})
|
||||
}
|
||||
for i := 0; i < len(request.InputRoomEvents); i++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case err := <-responses:
|
||||
if err != nil {
|
||||
response.ErrMsg = err.Error()
|
||||
return
|
||||
}
|
||||
}
|
||||
task.wg.Done()
|
||||
case <-time.After(time.Second * 5):
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WriteOutputEvents implements OutputRoomEventWriter
|
||||
func (r *Inputer) WriteOutputEvents(roomID string, updates []api.OutputEvent) error {
|
||||
messages := make([]*sarama.ProducerMessage, len(updates))
|
||||
for i := range updates {
|
||||
value, err := json.Marshal(updates[i])
|
||||
var err error
|
||||
for _, update := range updates {
|
||||
msg := &nats.Msg{
|
||||
Subject: r.OutputRoomEventTopic,
|
||||
Header: nats.Header{},
|
||||
}
|
||||
msg.Header.Set(jetstream.RoomID, roomID)
|
||||
msg.Data, err = json.Marshal(update)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
logger := log.WithFields(log.Fields{
|
||||
"room_id": roomID,
|
||||
"type": updates[i].Type,
|
||||
"type": update.Type,
|
||||
})
|
||||
if updates[i].NewRoomEvent != nil {
|
||||
eventType := updates[i].NewRoomEvent.Event.Type()
|
||||
if update.NewRoomEvent != nil {
|
||||
eventType := update.NewRoomEvent.Event.Type()
|
||||
logger = logger.WithFields(log.Fields{
|
||||
"event_type": eventType,
|
||||
"event_id": updates[i].NewRoomEvent.Event.EventID(),
|
||||
"adds_state": len(updates[i].NewRoomEvent.AddsStateEventIDs),
|
||||
"removes_state": len(updates[i].NewRoomEvent.RemovesStateEventIDs),
|
||||
"send_as_server": updates[i].NewRoomEvent.SendAsServer,
|
||||
"sender": updates[i].NewRoomEvent.Event.Sender(),
|
||||
"event_id": update.NewRoomEvent.Event.EventID(),
|
||||
"adds_state": len(update.NewRoomEvent.AddsStateEventIDs),
|
||||
"removes_state": len(update.NewRoomEvent.RemovesStateEventIDs),
|
||||
"send_as_server": update.NewRoomEvent.SendAsServer,
|
||||
"sender": update.NewRoomEvent.Event.Sender(),
|
||||
})
|
||||
if updates[i].NewRoomEvent.Event.StateKey() != nil {
|
||||
logger = logger.WithField("state_key", *updates[i].NewRoomEvent.Event.StateKey())
|
||||
if update.NewRoomEvent.Event.StateKey() != nil {
|
||||
logger = logger.WithField("state_key", *update.NewRoomEvent.Event.StateKey())
|
||||
}
|
||||
contentKey := keyContentFields[eventType]
|
||||
if contentKey != "" {
|
||||
value := gjson.GetBytes(updates[i].NewRoomEvent.Event.Content(), contentKey)
|
||||
value := gjson.GetBytes(update.NewRoomEvent.Event.Content(), contentKey)
|
||||
if value.Exists() {
|
||||
logger = logger.WithField("content_value", value.String())
|
||||
}
|
||||
}
|
||||
|
||||
if eventType == "m.room.server_acl" && updates[i].NewRoomEvent.Event.StateKeyEquals("") {
|
||||
ev := updates[i].NewRoomEvent.Event.Unwrap()
|
||||
if eventType == "m.room.server_acl" && update.NewRoomEvent.Event.StateKeyEquals("") {
|
||||
ev := update.NewRoomEvent.Event.Unwrap()
|
||||
defer r.ACLs.OnServerACLUpdate(ev)
|
||||
}
|
||||
}
|
||||
logger.Infof("Producing to topic '%s'", r.OutputRoomEventTopic)
|
||||
messages[i] = &sarama.ProducerMessage{
|
||||
Topic: r.OutputRoomEventTopic,
|
||||
Key: sarama.StringEncoder(roomID),
|
||||
Value: sarama.ByteEncoder(value),
|
||||
logger.Tracef("Producing to topic '%s'", r.OutputRoomEventTopic)
|
||||
if _, err := r.JetStream.PublishMsg(msg); err != nil {
|
||||
logger.WithError(err).Errorf("Failed to produce to topic '%s': %s", r.OutputRoomEventTopic, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
errs := r.Producer.SendMessages(messages)
|
||||
if errs != nil {
|
||||
for _, err := range errs.(sarama.ProducerErrors) {
|
||||
log.WithError(err).WithField("message_bytes", err.Msg.Value.Length()).Error("Write to kafka failed")
|
||||
}
|
||||
}
|
||||
return errs
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
@ -162,67 +218,3 @@ var roomserverInputBackpressure = prometheus.NewGaugeVec(
|
|||
},
|
||||
[]string{"room_id"},
|
||||
)
|
||||
|
||||
// InputRoomEvents implements api.RoomserverInternalAPI
|
||||
func (r *Inputer) InputRoomEvents(
|
||||
_ context.Context,
|
||||
request *api.InputRoomEventsRequest,
|
||||
response *api.InputRoomEventsResponse,
|
||||
) {
|
||||
// Create a wait group. Each task that we dispatch will call Done on
|
||||
// this wait group so that we know when all of our events have been
|
||||
// processed.
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(len(request.InputRoomEvents))
|
||||
tasks := make([]*inputTask, len(request.InputRoomEvents))
|
||||
|
||||
for i, e := range request.InputRoomEvents {
|
||||
// Work out if we are running per-room workers or if we're just doing
|
||||
// it on a global basis (e.g. SQLite).
|
||||
roomID := "global"
|
||||
if r.DB.SupportsConcurrentRoomInputs() {
|
||||
roomID = e.Event.RoomID()
|
||||
}
|
||||
|
||||
// Look up the worker, or create it if it doesn't exist. This channel
|
||||
// is buffered to reduce the chance that we'll be blocked by another
|
||||
// room - the channel will be quite small as it's just pointer types.
|
||||
w, _ := r.workers.LoadOrStore(roomID, &inputWorker{
|
||||
r: r,
|
||||
input: newFIFOQueue(),
|
||||
})
|
||||
worker := w.(*inputWorker)
|
||||
|
||||
// Create a task. This contains the input event and a reference to
|
||||
// the wait group, so that the worker can notify us when this specific
|
||||
// task has been finished.
|
||||
tasks[i] = &inputTask{
|
||||
ctx: context.Background(),
|
||||
event: &request.InputRoomEvents[i],
|
||||
wg: wg,
|
||||
}
|
||||
|
||||
// Send the task to the worker.
|
||||
if worker.running.CAS(false, true) {
|
||||
go worker.start()
|
||||
}
|
||||
worker.input.push(tasks[i])
|
||||
roomserverInputBackpressure.With(prometheus.Labels{
|
||||
"room_id": roomID,
|
||||
}).Inc()
|
||||
}
|
||||
|
||||
// Wait for all of the workers to return results about our tasks.
|
||||
wg.Wait()
|
||||
|
||||
// If any of the tasks returned an error, we should probably report
|
||||
// that back to the caller.
|
||||
for _, task := range tasks {
|
||||
if task.err != nil {
|
||||
response.ErrMsg = task.err.Error()
|
||||
_, rejected := task.err.(*gomatrixserverlib.NotAllowed)
|
||||
response.NotAllowed = rejected
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -64,7 +64,7 @@ var processRoomEventDuration = prometheus.NewHistogramVec(
|
|||
func (r *Inputer) processRoomEvent(
|
||||
ctx context.Context,
|
||||
input *api.InputRoomEvent,
|
||||
) (string, error) {
|
||||
) (err error) {
|
||||
// Measure how long it takes to process this event.
|
||||
started := time.Now()
|
||||
defer func() {
|
||||
|
@ -95,11 +95,11 @@ func (r *Inputer) processRoomEvent(
|
|||
case gomatrixserverlib.EventIDFormatV1:
|
||||
if bytes.Equal(event.EventReference().EventSHA256, evs[0].EventReference().EventSHA256) {
|
||||
logger.Debugf("Already processed event; ignoring")
|
||||
return event.EventID(), nil
|
||||
return nil
|
||||
}
|
||||
default:
|
||||
logger.Debugf("Already processed event; ignoring")
|
||||
return event.EventID(), nil
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -114,7 +114,7 @@ func (r *Inputer) processRoomEvent(
|
|||
PrevEventIDs: event.PrevEventIDs(),
|
||||
}
|
||||
if err := r.Queryer.QueryMissingAuthPrevEvents(ctx, missingReq, missingRes); err != nil {
|
||||
return "", fmt.Errorf("r.Queryer.QueryMissingAuthPrevEvents: %w", err)
|
||||
return fmt.Errorf("r.Queryer.QueryMissingAuthPrevEvents: %w", err)
|
||||
}
|
||||
}
|
||||
if len(missingRes.MissingAuthEventIDs) > 0 || len(missingRes.MissingPrevEventIDs) > 0 {
|
||||
|
@ -122,7 +122,7 @@ func (r *Inputer) processRoomEvent(
|
|||
RoomID: event.RoomID(),
|
||||
}
|
||||
if err := r.FSAPI.QueryJoinedHostServerNamesInRoom(ctx, serverReq, serverRes); err != nil {
|
||||
return "", fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err)
|
||||
return fmt.Errorf("r.FSAPI.QueryJoinedHostServerNamesInRoom: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -133,7 +133,7 @@ func (r *Inputer) processRoomEvent(
|
|||
knownEvents := map[string]*types.Event{}
|
||||
logger.Println("Starting to check for missing auth events")
|
||||
if err := r.checkForMissingAuthEvents(ctx, logger, input.Event, &authEvents, knownEvents, serverRes.ServerNames); err != nil {
|
||||
return "", fmt.Errorf("r.checkForMissingAuthEvents: %w", err)
|
||||
return fmt.Errorf("r.checkForMissingAuthEvents: %w", err)
|
||||
}
|
||||
logger.Println("Checked for missing auth events")
|
||||
|
||||
|
@ -150,7 +150,7 @@ func (r *Inputer) processRoomEvent(
|
|||
authEventNIDs := make([]types.EventNID, 0, len(authEventIDs))
|
||||
for _, authEventID := range authEventIDs {
|
||||
if _, ok := knownEvents[authEventID]; !ok {
|
||||
return "", fmt.Errorf("missing auth event %s", authEventID)
|
||||
return fmt.Errorf("missing auth event %s", authEventID)
|
||||
}
|
||||
authEventNIDs = append(authEventNIDs, knownEvents[authEventID].EventNID)
|
||||
}
|
||||
|
@ -169,14 +169,14 @@ func (r *Inputer) processRoomEvent(
|
|||
// Store the event.
|
||||
_, _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("r.DB.StoreEvent: %w", err)
|
||||
return fmt.Errorf("r.DB.StoreEvent: %w", err)
|
||||
}
|
||||
|
||||
// if storing this event results in it being redacted then do so.
|
||||
if !isRejected && redactedEventID == event.EventID() {
|
||||
r, rerr := eventutil.RedactEvent(redactionEvent, event)
|
||||
if rerr != nil {
|
||||
return "", fmt.Errorf("eventutil.RedactEvent: %w", rerr)
|
||||
return fmt.Errorf("eventutil.RedactEvent: %w", rerr)
|
||||
}
|
||||
event = r
|
||||
}
|
||||
|
@ -186,15 +186,15 @@ func (r *Inputer) processRoomEvent(
|
|||
// notify anyone about it.
|
||||
if input.Kind == api.KindOutlier {
|
||||
logger.Debug("Stored outlier")
|
||||
return event.EventID(), nil
|
||||
return nil
|
||||
}
|
||||
|
||||
roomInfo, err := r.DB.RoomInfo(ctx, event.RoomID())
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("r.DB.RoomInfo: %w", err)
|
||||
return fmt.Errorf("r.DB.RoomInfo: %w", err)
|
||||
}
|
||||
if roomInfo == nil {
|
||||
return "", fmt.Errorf("r.DB.RoomInfo missing for room %s", event.RoomID())
|
||||
return fmt.Errorf("r.DB.RoomInfo missing for room %s", event.RoomID())
|
||||
}
|
||||
|
||||
if input.Origin == "" {
|
||||
|
@ -216,7 +216,7 @@ func (r *Inputer) processRoomEvent(
|
|||
haveEvents: map[string]*gomatrixserverlib.HeaderedEvent{},
|
||||
}
|
||||
if err = missingState.processEventWithMissingState(ctx, input.Event.Unwrap(), roomInfo.RoomVersion); err != nil {
|
||||
return "", fmt.Errorf("r.checkForMissingPrevEvents: %w", err)
|
||||
return fmt.Errorf("r.checkForMissingPrevEvents: %w", err)
|
||||
}
|
||||
logger.Println("Checked for missing prev events")
|
||||
}
|
||||
|
@ -226,14 +226,14 @@ func (r *Inputer) processRoomEvent(
|
|||
// Lets calculate one.
|
||||
err = r.calculateAndSetState(ctx, input, *roomInfo, &stateAtEvent, event, isRejected)
|
||||
if err != nil && input.Kind != api.KindOld {
|
||||
return "", fmt.Errorf("r.calculateAndSetState: %w", err)
|
||||
return fmt.Errorf("r.calculateAndSetState: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// We stop here if the event is rejected: We've stored it but won't update forward extremities or notify anyone about it.
|
||||
if isRejected || softfail {
|
||||
logger.WithField("soft_fail", softfail).Debug("Stored rejected event")
|
||||
return event.EventID(), rejectionErr
|
||||
return rejectionErr
|
||||
}
|
||||
|
||||
switch input.Kind {
|
||||
|
@ -247,7 +247,7 @@ func (r *Inputer) processRoomEvent(
|
|||
input.TransactionID, // transaction ID
|
||||
input.HasState, // rewrites state?
|
||||
); err != nil {
|
||||
return "", fmt.Errorf("r.updateLatestEvents: %w", err)
|
||||
return fmt.Errorf("r.updateLatestEvents: %w", err)
|
||||
}
|
||||
case api.KindOld:
|
||||
err = r.WriteOutputEvents(event.RoomID(), []api.OutputEvent{
|
||||
|
@ -259,7 +259,7 @@ func (r *Inputer) processRoomEvent(
|
|||
},
|
||||
})
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("r.WriteOutputEvents (old): %w", err)
|
||||
return fmt.Errorf("r.WriteOutputEvents (old): %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -278,12 +278,12 @@ func (r *Inputer) processRoomEvent(
|
|||
},
|
||||
})
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("r.WriteOutputEvents (redactions): %w", err)
|
||||
return fmt.Errorf("r.WriteOutputEvents (redactions): %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Update the extremities of the event graph for the room
|
||||
return event.EventID(), nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Inputer) checkForMissingAuthEvents(
|
||||
|
|
|
@ -1,64 +0,0 @@
|
|||
package input
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
type fifoQueue struct {
|
||||
tasks []*inputTask
|
||||
count int
|
||||
mutex sync.Mutex
|
||||
notifs chan struct{}
|
||||
}
|
||||
|
||||
func newFIFOQueue() *fifoQueue {
|
||||
q := &fifoQueue{
|
||||
notifs: make(chan struct{}, 1),
|
||||
}
|
||||
return q
|
||||
}
|
||||
|
||||
func (q *fifoQueue) push(frame *inputTask) {
|
||||
q.mutex.Lock()
|
||||
defer q.mutex.Unlock()
|
||||
q.tasks = append(q.tasks, frame)
|
||||
q.count++
|
||||
select {
|
||||
case q.notifs <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// pop returns the first item of the queue, if there is one.
|
||||
// The second return value will indicate if a task was returned.
|
||||
// You must check this value, even after calling wait().
|
||||
func (q *fifoQueue) pop() (*inputTask, bool) {
|
||||
q.mutex.Lock()
|
||||
defer q.mutex.Unlock()
|
||||
if q.count == 0 {
|
||||
return nil, false
|
||||
}
|
||||
frame := q.tasks[0]
|
||||
q.tasks[0] = nil
|
||||
q.tasks = q.tasks[1:]
|
||||
q.count--
|
||||
if q.count == 0 {
|
||||
// Force a GC of the underlying array, since it might have
|
||||
// grown significantly if the queue was hammered for some reason
|
||||
q.tasks = nil
|
||||
}
|
||||
return frame, true
|
||||
}
|
||||
|
||||
// wait returns a channel which can be used to detect when an
|
||||
// item is waiting in the queue.
|
||||
func (q *fifoQueue) wait() <-chan struct{} {
|
||||
q.mutex.Lock()
|
||||
defer q.mutex.Unlock()
|
||||
if q.count > 0 && len(q.notifs) == 0 {
|
||||
ch := make(chan struct{})
|
||||
close(ch)
|
||||
return ch
|
||||
}
|
||||
return q.notifs
|
||||
}
|
|
@ -151,7 +151,7 @@ func (t *missingStateReq) processEventWithMissingState(
|
|||
stateIDs = append(stateIDs, event.EventID())
|
||||
}
|
||||
|
||||
_, err = t.inputer.processRoomEvent(ctx, &api.InputRoomEvent{
|
||||
err = t.inputer.processRoomEvent(ctx, &api.InputRoomEvent{
|
||||
Kind: api.KindOld,
|
||||
Event: backwardsExtremity.Headered(roomVersion),
|
||||
Origin: t.origin,
|
||||
|
@ -169,7 +169,7 @@ func (t *missingStateReq) processEventWithMissingState(
|
|||
// they will automatically fast-forward based on the room state at the
|
||||
// extremity in the last step.
|
||||
for _, newEvent := range newEvents {
|
||||
_, err = t.inputer.processRoomEvent(ctx, &api.InputRoomEvent{
|
||||
err = t.inputer.processRoomEvent(ctx, &api.InputRoomEvent{
|
||||
Kind: api.KindOld,
|
||||
Event: newEvent.Headered(roomVersion),
|
||||
Origin: t.origin,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue