diff --git a/clientapi/producers/eduserver.go b/clientapi/producers/eduserver.go index 14414ec6..30c40fb7 100644 --- a/clientapi/producers/eduserver.go +++ b/clientapi/producers/eduserver.go @@ -20,7 +20,7 @@ import ( "github.com/matrix-org/gomatrixserverlib" ) -// EDUServerProducer produces events for the typing server to consume +// EDUServerProducer produces events for the EDU server to consume type EDUServerProducer struct { InputAPI api.EDUServerInputAPI } @@ -35,13 +35,13 @@ func NewEDUServerProducer(inputAPI api.EDUServerInputAPI) *EDUServerProducer { // SendTyping sends a typing event to EDU server func (p *EDUServerProducer) SendTyping( ctx context.Context, userID, roomID string, - typing bool, timeout int64, + typing bool, timeoutMS int64, ) error { requestData := api.InputTypingEvent{ UserID: userID, RoomID: roomID, Typing: typing, - Timeout: timeout, + TimeoutMS: timeoutMS, OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()), } diff --git a/cmd/dendrite-federation-api-server/main.go b/cmd/dendrite-federation-api-server/main.go index 367f5dc0..d18926a6 100644 --- a/cmd/dendrite-federation-api-server/main.go +++ b/cmd/dendrite-federation-api-server/main.go @@ -15,8 +15,11 @@ package main import ( + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/keydb" + "github.com/matrix-org/dendrite/eduserver" + "github.com/matrix-org/dendrite/eduserver/cache" "github.com/matrix-org/dendrite/federationapi" ) @@ -34,10 +37,12 @@ func main() { alias, input, query := base.CreateHTTPRoomserverAPIs() asQuery := base.CreateHTTPAppServiceAPIs() + eduInputAPI := eduserver.SetupEDUServerComponent(base, cache.New()) + eduProducer := producers.NewEDUServerProducer(eduInputAPI) federationapi.SetupFederationAPIComponent( base, accountDB, deviceDB, federation, &keyRing, - alias, input, query, asQuery, federationSender, + alias, input, query, asQuery, federationSender, eduProducer, ) base.SetupAndServeHTTP(string(base.Cfg.Bind.FederationAPI), string(base.Cfg.Listen.FederationAPI)) diff --git a/cmd/dendrite-monolith-server/main.go b/cmd/dendrite-monolith-server/main.go index 0aceef02..9f6531ed 100644 --- a/cmd/dendrite-monolith-server/main.go +++ b/cmd/dendrite-monolith-server/main.go @@ -20,6 +20,7 @@ import ( "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/clientapi" + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/keydb" @@ -67,7 +68,8 @@ func main() { federation, &keyRing, alias, input, query, eduInputAPI, asQuery, transactions.New(), fedSenderAPI, ) - federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI) + eduProducer := producers.NewEDUServerProducer(eduInputAPI) + federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer) mediaapi.SetupMediaAPIComponent(base, deviceDB) publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, query, federation, nil) syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, federation, cfg) diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 0f72dc1e..05802725 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -23,6 +23,7 @@ import ( "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/clientapi" + "github.com/matrix-org/dendrite/clientapi/producers" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/common/config" @@ -133,7 +134,8 @@ func main() { federation, &keyRing, alias, input, query, eduInputAPI, asQuery, transactions.New(), fedSenderAPI, ) - federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI) + eduProducer := producers.NewEDUServerProducer(eduInputAPI) + federationapi.SetupFederationAPIComponent(base, accountDB, deviceDB, federation, &keyRing, alias, input, query, asQuery, fedSenderAPI, eduProducer) mediaapi.SetupMediaAPIComponent(base, deviceDB) publicroomsapi.SetupPublicRoomsAPIComponent(base, deviceDB, query, federation, p2pPublicRoomProvider) syncapi.SetupSyncAPIComponent(base, deviceDB, accountDB, query, federation, cfg) diff --git a/eduserver/api/input.go b/eduserver/api/input.go index c95acaf1..ad3f1ed5 100644 --- a/eduserver/api/input.go +++ b/eduserver/api/input.go @@ -30,8 +30,8 @@ type InputTypingEvent struct { RoomID string `json:"room_id"` // Typing is true if the user is typing, false if they have stopped. Typing bool `json:"typing"` - // Timeout is the interval for which the user should be marked as typing. - Timeout int64 `json:"timeout"` + // Timeout is the interval in milliseconds for which the user should be marked as typing. + TimeoutMS int64 `json:"timeout"` // OriginServerTS when the server received the update. OriginServerTS gomatrixserverlib.Timestamp `json:"origin_server_ts"` } diff --git a/eduserver/input/input.go b/eduserver/input/input.go index e0cc6922..84590945 100644 --- a/eduserver/input/input.go +++ b/eduserver/input/input.go @@ -46,7 +46,7 @@ func (t *EDUServerInputAPI) InputTypingEvent( if ite.Typing { // user is typing, update our current state of users typing. expireTime := ite.OriginServerTS.Time().Add( - time.Duration(ite.Timeout) * time.Millisecond, + time.Duration(ite.TimeoutMS) * time.Millisecond, ) t.Cache.AddTypingUser(ite.UserID, ite.RoomID, &expireTime) } else { @@ -69,7 +69,7 @@ func (t *EDUServerInputAPI) sendEvent(ite *api.InputTypingEvent) error { if ev.Typing { expireTime := ite.OriginServerTS.Time().Add( - time.Duration(ite.Timeout) * time.Millisecond, + time.Duration(ite.TimeoutMS) * time.Millisecond, ) ote.ExpireTime = &expireTime } diff --git a/federationapi/federationapi.go b/federationapi/federationapi.go index 90db95b3..ed96322b 100644 --- a/federationapi/federationapi.go +++ b/federationapi/federationapi.go @@ -41,12 +41,13 @@ func SetupFederationAPIComponent( queryAPI roomserverAPI.RoomserverQueryAPI, asAPI appserviceAPI.AppServiceQueryAPI, federationSenderAPI federationSenderAPI.FederationSenderQueryAPI, + eduProducer *producers.EDUServerProducer, ) { roomserverProducer := producers.NewRoomserverProducer(inputAPI, queryAPI) routing.Setup( base.APIMux, base.Cfg, queryAPI, aliasAPI, asAPI, - roomserverProducer, federationSenderAPI, *keyRing, + roomserverProducer, eduProducer, federationSenderAPI, *keyRing, federation, accountsDB, deviceDB, ) } diff --git a/federationapi/routing/routing.go b/federationapi/routing/routing.go index b5c8e53d..9ac53576 100644 --- a/federationapi/routing/routing.go +++ b/federationapi/routing/routing.go @@ -48,6 +48,7 @@ func Setup( aliasAPI roomserverAPI.RoomserverAliasAPI, asAPI appserviceAPI.AppServiceQueryAPI, producer *producers.RoomserverProducer, + eduProducer *producers.EDUServerProducer, federationSenderAPI federationSenderAPI.FederationSenderQueryAPI, keys gomatrixserverlib.KeyRing, federation *gomatrixserverlib.FederationClient, @@ -79,7 +80,7 @@ func Setup( } return Send( httpReq, request, gomatrixserverlib.TransactionID(vars["txnID"]), - cfg, query, producer, keys, federation, + cfg, query, producer, eduProducer, keys, federation, ) }, )).Methods(http.MethodPut, http.MethodOptions) diff --git a/federationapi/routing/send.go b/federationapi/routing/send.go index 4c92c7e5..1013a44c 100644 --- a/federationapi/routing/send.go +++ b/federationapi/routing/send.go @@ -36,20 +36,22 @@ func Send( cfg *config.Dendrite, query api.RoomserverQueryAPI, producer *producers.RoomserverProducer, + eduProducer *producers.EDUServerProducer, keys gomatrixserverlib.KeyRing, federation *gomatrixserverlib.FederationClient, ) util.JSONResponse { t := txnReq{ - context: httpReq.Context(), - query: query, - producer: producer, - keys: keys, - federation: federation, + context: httpReq.Context(), + query: query, + producer: producer, + eduProducer: eduProducer, + keys: keys, + federation: federation, } var txnEvents struct { - PDUs []json.RawMessage `json:"pdus"` - EDUs []json.RawMessage `json:"edus"` + PDUs []json.RawMessage `json:"pdus"` + EDUs []gomatrixserverlib.EDU `json:"edus"` } if err := json.Unmarshal(request.Content(), &txnEvents); err != nil { @@ -59,7 +61,9 @@ func Send( } } + // TODO: Really we should have a function to convert FederationRequest to txnReq t.PDUs = txnEvents.PDUs + t.EDUs = txnEvents.EDUs t.Origin = request.Origin() t.TransactionID = txnID t.Destination = cfg.Matrix.ServerName @@ -80,11 +84,12 @@ func Send( type txnReq struct { gomatrixserverlib.Transaction - context context.Context - query api.RoomserverQueryAPI - producer *producers.RoomserverProducer - keys gomatrixserverlib.KeyRing - federation *gomatrixserverlib.FederationClient + context context.Context + query api.RoomserverQueryAPI + producer *producers.RoomserverProducer + eduProducer *producers.EDUServerProducer + keys gomatrixserverlib.KeyRing + federation *gomatrixserverlib.FederationClient } func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { @@ -152,7 +157,7 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) { } } - // TODO: Process the EDUs. + t.processEDUs(t.EDUs) util.GetLogger(t.context).Infof("Processed %d PDUs from transaction %q", len(results), t.TransactionID) return &gomatrixserverlib.RespSend{PDUs: results}, nil } @@ -163,6 +168,29 @@ type unknownRoomError struct { func (e unknownRoomError) Error() string { return fmt.Sprintf("unknown room %q", e.roomID) } +func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) { + for _, e := range edus { + switch e.Type { + case gomatrixserverlib.MTyping: + // https://matrix.org/docs/spec/server_server/latest#typing-notifications + var typingPayload struct { + RoomID string `json:"room_id"` + UserID string `json:"user_id"` + Typing bool `json:"typing"` + } + if err := json.Unmarshal(e.Content, &typingPayload); err != nil { + util.GetLogger(t.context).WithError(err).Error("Failed to unmarshal typing event") + continue + } + if err := t.eduProducer.SendTyping(t.context, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil { + util.GetLogger(t.context).WithError(err).Error("Failed to send typing event to edu server") + } + default: + util.GetLogger(t.context).WithField("type", e.Type).Warn("unhandled edu") + } + } +} + func (t *txnReq) processEvent(e gomatrixserverlib.Event) error { prevEventIDs := e.PrevEventIDs() diff --git a/federationsender/consumers/eduserver.go b/federationsender/consumers/eduserver.go index ba45db7f..4d2445f3 100644 --- a/federationsender/consumers/eduserver.go +++ b/federationsender/consumers/eduserver.go @@ -73,6 +73,17 @@ func (t *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error return nil } + // only send typing events which originated from us + _, typingServerName, err := gomatrixserverlib.SplitID('@', ote.Event.UserID) + if err != nil { + log.WithError(err).WithField("user_id", ote.Event.UserID).Error("Failed to extract domain from typing sender") + return nil + } + if typingServerName != t.ServerName { + log.WithField("other_server", typingServerName).Info("Suppressing typing notif: originated elsewhere") + return nil + } + joined, err := t.db.GetJoinedHosts(context.TODO(), ote.Event.RoomID) if err != nil { return err