Refactor appservices component (#2687)

This PR refactors the app services component. It makes the following changes:

* Each appservice now gets its own NATS JetStream consumer
* The appservice database is now removed entirely, since we just use JetStream as a data source instead
* The entire component is now much simpler and we deleted lots of lines of code 💅

The result is that it should be much lighter and hopefully much more performant.
This commit is contained in:
Neil Alexander 2022-09-01 09:20:40 +01:00 committed by GitHub
parent 175f65407a
commit ad6b902b84
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 231 additions and 1518 deletions

View file

@ -15,14 +15,18 @@
package consumers
import (
"bytes"
"context"
"encoding/json"
"fmt"
"math"
"net/http"
"net/url"
"time"
"github.com/matrix-org/gomatrixserverlib"
"github.com/nats-io/nats.go"
"github.com/matrix-org/dendrite/appservice/storage"
"github.com/matrix-org/dendrite/appservice/types"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/jetstream"
@ -33,178 +37,192 @@ import (
// OutputRoomEventConsumer consumes events that originated in the room server.
type OutputRoomEventConsumer struct {
ctx context.Context
jetstream nats.JetStreamContext
durable string
topic string
asDB storage.Database
rsAPI api.AppserviceRoomserverAPI
serverName string
workerStates []types.ApplicationServiceWorkerState
ctx context.Context
cfg *config.AppServiceAPI
client *http.Client
jetstream nats.JetStreamContext
topic string
rsAPI api.AppserviceRoomserverAPI
}
type appserviceState struct {
*config.ApplicationService
backoff int
}
// NewOutputRoomEventConsumer creates a new OutputRoomEventConsumer. Call
// Start() to begin consuming from room servers.
func NewOutputRoomEventConsumer(
process *process.ProcessContext,
cfg *config.Dendrite,
cfg *config.AppServiceAPI,
client *http.Client,
js nats.JetStreamContext,
appserviceDB storage.Database,
rsAPI api.AppserviceRoomserverAPI,
workerStates []types.ApplicationServiceWorkerState,
) *OutputRoomEventConsumer {
return &OutputRoomEventConsumer{
ctx: process.Context(),
jetstream: js,
durable: cfg.Global.JetStream.Durable("AppserviceRoomserverConsumer"),
topic: cfg.Global.JetStream.Prefixed(jetstream.OutputRoomEvent),
asDB: appserviceDB,
rsAPI: rsAPI,
serverName: string(cfg.Global.ServerName),
workerStates: workerStates,
ctx: process.Context(),
cfg: cfg,
client: client,
jetstream: js,
topic: cfg.Matrix.JetStream.Prefixed(jetstream.OutputRoomEvent),
rsAPI: rsAPI,
}
}
// Start consuming from room servers
func (s *OutputRoomEventConsumer) Start() error {
return jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic, s.durable, 1,
s.onMessage, nats.DeliverAll(), nats.ManualAck(),
)
for _, as := range s.cfg.Derived.ApplicationServices {
appsvc := as
state := &appserviceState{
ApplicationService: &appsvc,
}
token := jetstream.Tokenise(as.ID)
if err := jetstream.JetStreamConsumer(
s.ctx, s.jetstream, s.topic,
s.cfg.Matrix.JetStream.Durable("Appservice_"+token),
50, // maximum number of events to send in a single transaction
func(ctx context.Context, msgs []*nats.Msg) bool {
return s.onMessage(ctx, state, msgs)
},
nats.DeliverNew(), nats.ManualAck(),
); err != nil {
return fmt.Errorf("failed to create %q consumer: %w", token, err)
}
}
return nil
}
// onMessage is called when the appservice component receives a new event from
// the room server output log.
func (s *OutputRoomEventConsumer) onMessage(ctx context.Context, msgs []*nats.Msg) bool {
msg := msgs[0] // Guaranteed to exist if onMessage is called
// Parse out the event JSON
var output api.OutputEvent
if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return true
}
log.WithFields(log.Fields{
"type": output.Type,
}).Debug("Got a message in OutputRoomEventConsumer")
events := []*gomatrixserverlib.HeaderedEvent{}
if output.Type == api.OutputTypeNewRoomEvent && output.NewRoomEvent != nil {
newEventID := output.NewRoomEvent.Event.EventID()
events = append(events, output.NewRoomEvent.Event)
if len(output.NewRoomEvent.AddsStateEventIDs) > 0 {
eventsReq := &api.QueryEventsByIDRequest{
EventIDs: make([]string, 0, len(output.NewRoomEvent.AddsStateEventIDs)),
}
eventsRes := &api.QueryEventsByIDResponse{}
for _, eventID := range output.NewRoomEvent.AddsStateEventIDs {
if eventID != newEventID {
eventsReq.EventIDs = append(eventsReq.EventIDs, eventID)
}
}
if len(eventsReq.EventIDs) > 0 {
if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil {
log.WithError(err).Errorf("s.rsAPI.QueryEventsByID failed")
return false
}
events = append(events, eventsRes.Events...)
}
func (s *OutputRoomEventConsumer) onMessage(
ctx context.Context, state *appserviceState, msgs []*nats.Msg,
) bool {
log.WithField("appservice", state.ID).Tracef("Appservice worker received %d message(s) from roomserver", len(msgs))
events := make([]*gomatrixserverlib.HeaderedEvent, 0, len(msgs))
for _, msg := range msgs {
// Parse out the event JSON
var output api.OutputEvent
if err := json.Unmarshal(msg.Data, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithField("appservice", state.ID).WithError(err).Errorf("Appservice failed to parse message, ignoring")
continue
}
} else if output.Type == api.OutputTypeNewInviteEvent && output.NewInviteEvent != nil {
events = append(events, output.NewInviteEvent.Event)
} else {
log.WithFields(log.Fields{
"type": output.Type,
}).Debug("appservice OutputRoomEventConsumer ignoring event", string(msg.Data))
switch output.Type {
case api.OutputTypeNewRoomEvent:
if output.NewRoomEvent == nil || !s.appserviceIsInterestedInEvent(ctx, output.NewRoomEvent.Event, state.ApplicationService) {
continue
}
events = append(events, output.NewRoomEvent.Event)
if len(output.NewRoomEvent.AddsStateEventIDs) > 0 {
newEventID := output.NewRoomEvent.Event.EventID()
eventsReq := &api.QueryEventsByIDRequest{
EventIDs: make([]string, 0, len(output.NewRoomEvent.AddsStateEventIDs)),
}
eventsRes := &api.QueryEventsByIDResponse{}
for _, eventID := range output.NewRoomEvent.AddsStateEventIDs {
if eventID != newEventID {
eventsReq.EventIDs = append(eventsReq.EventIDs, eventID)
}
}
if len(eventsReq.EventIDs) > 0 {
if err := s.rsAPI.QueryEventsByID(s.ctx, eventsReq, eventsRes); err != nil {
log.WithError(err).Errorf("s.rsAPI.QueryEventsByID failed")
return false
}
events = append(events, eventsRes.Events...)
}
}
case api.OutputTypeNewInviteEvent:
if output.NewInviteEvent == nil {
continue
}
events = append(events, output.NewInviteEvent.Event)
default:
continue
}
}
// If there are no events selected for sending then we should
// ack the messages so that we don't get sent them again in the
// future.
if len(events) == 0 {
return true
}
// Send event to any relevant application services
if err := s.filterRoomserverEvents(context.TODO(), events); err != nil {
log.WithError(err).Errorf("roomserver output log: filter error")
return true
}
return true
// Send event to any relevant application services. If we hit
// an error here, return false, so that we negatively ack.
log.WithField("appservice", state.ID).Debugf("Appservice worker sending %d events(s) from roomserver", len(events))
return s.sendEvents(ctx, state, events) == nil
}
// filterRoomserverEvents takes in events and decides whether any of them need
// to be passed on to an external application service. It does this by checking
// each namespace of each registered application service, and if there is a
// match, adds the event to the queue for events to be sent to a particular
// application service.
func (s *OutputRoomEventConsumer) filterRoomserverEvents(
ctx context.Context,
// sendEvents passes events to the appservice by using the transactions
// endpoint. It will block for the backoff period if necessary.
func (s *OutputRoomEventConsumer) sendEvents(
ctx context.Context, state *appserviceState,
events []*gomatrixserverlib.HeaderedEvent,
) error {
for _, ws := range s.workerStates {
for _, event := range events {
// Check if this event is interesting to this application service
if s.appserviceIsInterestedInEvent(ctx, event, ws.AppService) {
// Queue this event to be sent off to the application service
if err := s.asDB.StoreEvent(ctx, ws.AppService.ID, event); err != nil {
log.WithError(err).Warn("failed to insert incoming event into appservices database")
return err
} else {
// Tell our worker to send out new messages by updating remaining message
// count and waking them up with a broadcast
ws.NotifyNewEvents()
}
}
}
// Create the transaction body.
transaction, err := json.Marshal(
gomatrixserverlib.ApplicationServiceTransaction{
Events: gomatrixserverlib.HeaderedToClientEvents(events, gomatrixserverlib.FormatAll),
},
)
if err != nil {
return err
}
// TODO: We should probably be more intelligent and pick something not
// in the control of the event. A NATS timestamp header or something maybe.
txnID := events[0].Event.OriginServerTS()
// Send the transaction to the appservice.
// https://matrix.org/docs/spec/application_service/r0.1.2#put-matrix-app-v1-transactions-txnid
address := fmt.Sprintf("%s/transactions/%d?access_token=%s", state.URL, txnID, url.QueryEscape(state.HSToken))
req, err := http.NewRequestWithContext(ctx, "PUT", address, bytes.NewBuffer(transaction))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
resp, err := s.client.Do(req)
if err != nil {
return state.backoffAndPause(err)
}
// If the response was fine then we can clear any backoffs in place and
// report that everything was OK. Otherwise, back off for a while.
switch resp.StatusCode {
case http.StatusOK:
state.backoff = 0
default:
return state.backoffAndPause(fmt.Errorf("received HTTP status code %d from appservice", resp.StatusCode))
}
return nil
}
// appserviceJoinedAtEvent returns a boolean depending on whether a given
// appservice has membership at the time a given event was created.
func (s *OutputRoomEventConsumer) appserviceJoinedAtEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice config.ApplicationService) bool {
// TODO: This is only checking the current room state, not the state at
// the event in question. Pretty sure this is what Synapse does too, but
// until we have a lighter way of checking the state before the event that
// doesn't involve state res, then this is probably OK.
membershipReq := &api.QueryMembershipsForRoomRequest{
RoomID: event.RoomID(),
JoinedOnly: true,
// backoff pauses the calling goroutine for a 2^some backoff exponent seconds
func (s *appserviceState) backoffAndPause(err error) error {
if s.backoff < 6 {
s.backoff++
}
membershipRes := &api.QueryMembershipsForRoomResponse{}
// XXX: This could potentially race if the state for the event is not known yet
// e.g. the event came over federation but we do not have the full state persisted.
if err := s.rsAPI.QueryMembershipsForRoom(ctx, membershipReq, membershipRes); err == nil {
for _, ev := range membershipRes.JoinEvents {
var membership gomatrixserverlib.MemberContent
if err = json.Unmarshal(ev.Content, &membership); err != nil || ev.StateKey == nil {
continue
}
if appservice.IsInterestedInUserID(*ev.StateKey) {
return true
}
}
} else {
log.WithFields(log.Fields{
"room_id": event.RoomID(),
}).WithError(err).Errorf("Unable to get membership for room")
}
return false
duration := time.Second * time.Duration(math.Pow(2, float64(s.backoff)))
log.WithField("appservice", s.ID).WithError(err).Errorf("Unable to send transaction to appservice, backing off for %s", duration.String())
time.Sleep(duration)
return err
}
// appserviceIsInterestedInEvent returns a boolean depending on whether a given
// event falls within one of a given application service's namespaces.
//
// TODO: This should be cached, see https://github.com/matrix-org/dendrite/issues/1682
func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice config.ApplicationService) bool {
// No reason to queue events if they'll never be sent to the application
// service
if appservice.URL == "" {
func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice *config.ApplicationService) bool {
switch {
case appservice.URL == "":
return false
}
// Check Room ID and Sender of the event
if appservice.IsInterestedInUserID(event.Sender()) ||
appservice.IsInterestedInRoomID(event.RoomID()) {
case appservice.IsInterestedInUserID(event.Sender()):
return true
case appservice.IsInterestedInRoomID(event.RoomID()):
return true
}
@ -225,10 +243,52 @@ func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Cont
}
} else {
log.WithFields(log.Fields{
"room_id": event.RoomID(),
"appservice": appservice.ID,
"room_id": event.RoomID(),
}).WithError(err).Errorf("Unable to get aliases for room")
}
// Check if any of the members in the room match the appservice
return s.appserviceJoinedAtEvent(ctx, event, appservice)
}
// appserviceJoinedAtEvent returns a boolean depending on whether a given
// appservice has membership at the time a given event was created.
func (s *OutputRoomEventConsumer) appserviceJoinedAtEvent(ctx context.Context, event *gomatrixserverlib.HeaderedEvent, appservice *config.ApplicationService) bool {
// TODO: This is only checking the current room state, not the state at
// the event in question. Pretty sure this is what Synapse does too, but
// until we have a lighter way of checking the state before the event that
// doesn't involve state res, then this is probably OK.
membershipReq := &api.QueryMembershipsForRoomRequest{
RoomID: event.RoomID(),
JoinedOnly: true,
}
membershipRes := &api.QueryMembershipsForRoomResponse{}
// XXX: This could potentially race if the state for the event is not known yet
// e.g. the event came over federation but we do not have the full state persisted.
if err := s.rsAPI.QueryMembershipsForRoom(ctx, membershipReq, membershipRes); err == nil {
for _, ev := range membershipRes.JoinEvents {
switch {
case ev.StateKey == nil:
continue
case ev.Type != gomatrixserverlib.MRoomMember:
continue
}
var membership gomatrixserverlib.MemberContent
err = json.Unmarshal(ev.Content, &membership)
switch {
case err != nil:
continue
case membership.Membership == gomatrixserverlib.Join:
return true
}
}
} else {
log.WithFields(log.Fields{
"appservice": appservice.ID,
"room_id": event.RoomID(),
}).WithError(err).Errorf("Unable to get membership for room")
}
return false
}