From 1165b49da78f4e4c92e9cbd5f5a633528a304068 Mon Sep 17 00:00:00 2001 From: Anant Prakash Date: Thu, 2 Aug 2018 22:52:44 +0530 Subject: [PATCH] Implement Typing server (#567) * update gomatrixserverlib * Make removeUser public * Implement api.TypingServerInputAPI * Integrate the typing server component, create kafka topic * Add typing server cmd for multiprocess dendrite --- dendrite-config.yaml | 1 + .../cmd/dendrite-monolith-server/main.go | 10 +- .../cmd/dendrite-typing-server/main.go | 36 +++++++ .../dendrite/common/config/config.go | 3 + .../dendrite/common/config/config_test.go | 1 + .../matrix-org/dendrite/common/test/config.go | 1 + .../dendrite/typingserver/api/output.go | 31 ++++++ .../dendrite/typingserver/cache/cache.go | 6 +- .../dendrite/typingserver/cache/cache_test.go | 4 +- .../dendrite/typingserver/input/input.go | 96 +++++++++++++++++++ .../dendrite/typingserver/typingserver.go | 15 ++- vendor/manifest | 28 +++--- .../matrix-org/gomatrixserverlib/edu.go | 23 +++++ .../matrix-org/gomatrixserverlib/eventauth.go | 2 + .../gomatrixserverlib/hooks/install.sh | 0 .../gomatrixserverlib/hooks/pre-commit | 0 .../matrix-org/gomatrixserverlib/linter.json | 2 +- .../gomatrixserverlib/transaction.go | 5 +- .../matrix-org/gomatrixserverlib/travis.sh | 0 19 files changed, 236 insertions(+), 28 deletions(-) create mode 100644 src/github.com/matrix-org/dendrite/cmd/dendrite-typing-server/main.go create mode 100644 src/github.com/matrix-org/dendrite/typingserver/api/output.go create mode 100644 src/github.com/matrix-org/dendrite/typingserver/input/input.go create mode 100644 vendor/src/github.com/matrix-org/gomatrixserverlib/edu.go mode change 100755 => 100644 vendor/src/github.com/matrix-org/gomatrixserverlib/hooks/install.sh mode change 100755 => 100644 vendor/src/github.com/matrix-org/gomatrixserverlib/hooks/pre-commit mode change 100755 => 100644 vendor/src/github.com/matrix-org/gomatrixserverlib/travis.sh diff --git a/dendrite-config.yaml b/dendrite-config.yaml index a838c1bb..d26e7477 100644 --- a/dendrite-config.yaml +++ b/dendrite-config.yaml @@ -86,6 +86,7 @@ kafka: topics: output_room_event: roomserverOutput output_client_data: clientapiOutput + output_typing_event: typingServerOutput user_updates: userUpdates # The postgres connection configs for connecting to the databases e.g a postgres:// URI diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go index c6623128..b1ad0910 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go @@ -18,20 +18,20 @@ import ( "flag" "net/http" - "github.com/matrix-org/dendrite/common/keydb" - "github.com/matrix-org/dendrite/common/transactions" - "github.com/matrix-org/dendrite/typingserver" - "github.com/matrix-org/dendrite/appservice" "github.com/matrix-org/dendrite/clientapi" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/common/basecomponent" + "github.com/matrix-org/dendrite/common/keydb" + "github.com/matrix-org/dendrite/common/transactions" "github.com/matrix-org/dendrite/federationapi" "github.com/matrix-org/dendrite/federationsender" "github.com/matrix-org/dendrite/mediaapi" "github.com/matrix-org/dendrite/publicroomsapi" "github.com/matrix-org/dendrite/roomserver" "github.com/matrix-org/dendrite/syncapi" + "github.com/matrix-org/dendrite/typingserver" + "github.com/matrix-org/dendrite/typingserver/cache" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/sirupsen/logrus" @@ -56,7 +56,7 @@ func main() { keyRing := keydb.CreateKeyRing(federation.Client, keyDB) alias, input, query := roomserver.SetupRoomServerComponent(base) - typingInputAPI := typingserver.SetupTypingServerComponent(base) + typingInputAPI := typingserver.SetupTypingServerComponent(base, cache.NewTypingCache()) clientapi.SetupClientAPIComponent( base, deviceDB, accountDB, diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-typing-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-typing-server/main.go new file mode 100644 index 00000000..4eb0823a --- /dev/null +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-typing-server/main.go @@ -0,0 +1,36 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + _ "net/http/pprof" + + "github.com/matrix-org/dendrite/common/basecomponent" + "github.com/matrix-org/dendrite/typingserver" + "github.com/matrix-org/dendrite/typingserver/cache" + "github.com/sirupsen/logrus" +) + +func main() { + cfg := basecomponent.ParseFlags() + base := basecomponent.NewBaseDendrite(cfg, "TypingServerAPI") + defer func() { + if err := base.Close(); err != nil { + logrus.WithError(err).Warn("BaseDendrite close failed") + } + }() + + typingserver.SetupTypingServerComponent(base, cache.NewTypingCache()) + + base.SetupAndServeHTTP(string(base.Cfg.Listen.TypingServer)) +} diff --git a/src/github.com/matrix-org/dendrite/common/config/config.go b/src/github.com/matrix-org/dendrite/common/config/config.go index f901e01f..16e50aea 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config.go +++ b/src/github.com/matrix-org/dendrite/common/config/config.go @@ -134,6 +134,8 @@ type Dendrite struct { OutputRoomEvent Topic `yaml:"output_room_event"` // Topic for sending account data from client API to sync API OutputClientData Topic `yaml:"output_client_data"` + // Topic for typingserver/api.OutputTypingEvent events. + OutputTypingEvent Topic `yaml:"output_typing_event"` // Topic for user updates (profile, presence) UserUpdates Topic `yaml:"user_updates"` } @@ -527,6 +529,7 @@ func (config *Dendrite) checkKafka(configErrs *configErrors, monolithic bool) { } checkNotEmpty(configErrs, "kafka.topics.output_room_event", string(config.Kafka.Topics.OutputRoomEvent)) checkNotEmpty(configErrs, "kafka.topics.output_client_data", string(config.Kafka.Topics.OutputClientData)) + checkNotEmpty(configErrs, "kafka.topics.output_typing_event", string(config.Kafka.Topics.OutputTypingEvent)) checkNotEmpty(configErrs, "kafka.topics.user_updates", string(config.Kafka.Topics.UserUpdates)) } diff --git a/src/github.com/matrix-org/dendrite/common/config/config_test.go b/src/github.com/matrix-org/dendrite/common/config/config_test.go index e91e03d6..acc4dbd1 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config_test.go +++ b/src/github.com/matrix-org/dendrite/common/config/config_test.go @@ -45,6 +45,7 @@ kafka: topics: output_room_event: output.room output_client_data: output.client + output_typing_event: output.typing user_updates: output.user database: media_api: "postgresql:///media_api" diff --git a/src/github.com/matrix-org/dendrite/common/test/config.go b/src/github.com/matrix-org/dendrite/common/test/config.go index 2c023b9a..08a1b398 100644 --- a/src/github.com/matrix-org/dendrite/common/test/config.go +++ b/src/github.com/matrix-org/dendrite/common/test/config.go @@ -83,6 +83,7 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con // Make this configurable somehow? cfg.Kafka.Topics.OutputRoomEvent = "test.room.output" cfg.Kafka.Topics.OutputClientData = "test.clientapi.output" + cfg.Kafka.Topics.OutputTypingEvent = "test.typing.output" cfg.Kafka.Topics.UserUpdates = "test.user.output" // TODO: Use different databases for the different schemas. diff --git a/src/github.com/matrix-org/dendrite/typingserver/api/output.go b/src/github.com/matrix-org/dendrite/typingserver/api/output.go new file mode 100644 index 00000000..08f83499 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/typingserver/api/output.go @@ -0,0 +1,31 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +// OutputTypingEvent is an entry in typing server output kafka log. +type OutputTypingEvent struct { + // The Event for the typing edu event. + Event TypingEvent `json:"event"` +} + +// TypingEvent represents a matrix edu event of type 'm.typing'. +type TypingEvent struct { + Type string `json:"type"` + RoomID string `json:"room_id"` + Content TypingEventContent `json:"content"` +} + +// TypingEventContent for TypingEvent +type TypingEventContent struct { + UserIDs []string `json:"user_ids"` +} diff --git a/src/github.com/matrix-org/dendrite/typingserver/cache/cache.go b/src/github.com/matrix-org/dendrite/typingserver/cache/cache.go index b1b9452a..739f60a2 100644 --- a/src/github.com/matrix-org/dendrite/typingserver/cache/cache.go +++ b/src/github.com/matrix-org/dendrite/typingserver/cache/cache.go @@ -85,12 +85,12 @@ func (t *TypingCache) addUser(userID, roomID string, expiryTimer *time.Timer) { // This removes the user. func (t *TypingCache) timeoutCallback(userID, roomID string) func() { return func() { - t.removeUser(userID, roomID) + t.RemoveUser(userID, roomID) } } -// removeUser with mutex lock & stop the timer. -func (t *TypingCache) removeUser(userID, roomID string) { +// RemoveUser with mutex lock & stop the timer. +func (t *TypingCache) RemoveUser(userID, roomID string) { t.Lock() defer t.Unlock() diff --git a/src/github.com/matrix-org/dendrite/typingserver/cache/cache_test.go b/src/github.com/matrix-org/dendrite/typingserver/cache/cache_test.go index 92baebaf..7aa73e92 100644 --- a/src/github.com/matrix-org/dendrite/typingserver/cache/cache_test.go +++ b/src/github.com/matrix-org/dendrite/typingserver/cache/cache_test.go @@ -33,7 +33,7 @@ func TestTypingCache(t *testing.T) { testGetTypingUsers(t, tCache) }) - t.Run("removeUser", func(t *testing.T) { + t.Run("RemoveUser", func(t *testing.T) { testRemoveUser(t, tCache) }) } @@ -90,7 +90,7 @@ func testRemoveUser(t *testing.T, tCache *TypingCache) { } length := len(tt.userIDs) - tCache.removeUser(tt.userIDs[length-1], tt.roomID) + tCache.RemoveUser(tt.userIDs[length-1], tt.roomID) expLeftUsers := tt.userIDs[:length-1] if leftUsers := tCache.GetTypingUsers(tt.roomID); !test.UnsortedStringSliceEqual(leftUsers, expLeftUsers) { t.Errorf("Response after removal is unexpected. Want = %s, got = %s", leftUsers, expLeftUsers) diff --git a/src/github.com/matrix-org/dendrite/typingserver/input/input.go b/src/github.com/matrix-org/dendrite/typingserver/input/input.go new file mode 100644 index 00000000..735c4da6 --- /dev/null +++ b/src/github.com/matrix-org/dendrite/typingserver/input/input.go @@ -0,0 +1,96 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package input + +import ( + "context" + "encoding/json" + "net/http" + "time" + + "github.com/matrix-org/dendrite/common" + "github.com/matrix-org/dendrite/typingserver/api" + "github.com/matrix-org/dendrite/typingserver/cache" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" + "gopkg.in/Shopify/sarama.v1" +) + +// TypingServerInputAPI implements api.TypingServerInputAPI +type TypingServerInputAPI struct { + // Cache to store the current typing members in each room. + Cache *cache.TypingCache + // The kafka topic to output new typing events to. + OutputTypingEventTopic string + // kafka producer + Producer sarama.SyncProducer +} + +// InputTypingEvent implements api.TypingServerInputAPI +func (t *TypingServerInputAPI) InputTypingEvent( + ctx context.Context, + request *api.InputTypingEventRequest, + response *api.InputTypingEventResponse, +) error { + ite := &request.InputTypingEvent + if ite.Typing { + // user is typing, update our current state of users typing. + expireTime := ite.OriginServerTS.Time().Add( + time.Duration(ite.Timeout) * time.Millisecond, + ) + t.Cache.AddTypingUser(ite.UserID, ite.RoomID, &expireTime) + } else { + t.Cache.RemoveUser(ite.UserID, ite.RoomID) + } + + return t.sendUpdateForRoom(ite.RoomID) +} + +func (t *TypingServerInputAPI) sendUpdateForRoom(roomID string) error { + userIDs := t.Cache.GetTypingUsers(roomID) + event := &api.TypingEvent{ + Type: gomatrixserverlib.MTyping, + RoomID: roomID, + Content: api.TypingEventContent{UserIDs: userIDs}, + } + eventJSON, err := json.Marshal(api.OutputTypingEvent{Event: *event}) + if err != nil { + return err + } + + m := &sarama.ProducerMessage{ + Topic: string(t.OutputTypingEventTopic), + Key: sarama.StringEncoder(roomID), + Value: sarama.ByteEncoder(eventJSON), + } + + _, _, err = t.Producer.SendMessage(m) + return err +} + +// SetupHTTP adds the TypingServerInputAPI handlers to the http.ServeMux. +func (t *TypingServerInputAPI) SetupHTTP(servMux *http.ServeMux) { + servMux.Handle(api.TypingServerInputTypingEventPath, + common.MakeInternalAPI("inputTypingEvents", func(req *http.Request) util.JSONResponse { + var request api.InputTypingEventRequest + var response api.InputTypingEventResponse + if err := json.NewDecoder(req.Body).Decode(&request); err != nil { + return util.MessageResponse(http.StatusBadRequest, err.Error()) + } + if err := t.InputTypingEvent(req.Context(), &request, &response); err != nil { + return util.ErrorResponse(err) + } + return util.JSONResponse{Code: http.StatusOK, JSON: &response} + }), + ) +} diff --git a/src/github.com/matrix-org/dendrite/typingserver/typingserver.go b/src/github.com/matrix-org/dendrite/typingserver/typingserver.go index d611d677..b43f72f7 100644 --- a/src/github.com/matrix-org/dendrite/typingserver/typingserver.go +++ b/src/github.com/matrix-org/dendrite/typingserver/typingserver.go @@ -13,8 +13,12 @@ package typingserver import ( + "net/http" + "github.com/matrix-org/dendrite/common/basecomponent" "github.com/matrix-org/dendrite/typingserver/api" + "github.com/matrix-org/dendrite/typingserver/cache" + "github.com/matrix-org/dendrite/typingserver/input" ) // SetupTypingServerComponent sets up and registers HTTP handlers for the @@ -23,7 +27,14 @@ import ( // APIs directly instead of having to use HTTP. func SetupTypingServerComponent( base *basecomponent.BaseDendrite, + typingCache *cache.TypingCache, ) api.TypingServerInputAPI { - // TODO: implement typing server - return base.CreateHTTPTypingServerAPIs() + inputAPI := &input.TypingServerInputAPI{ + Cache: typingCache, + Producer: base.KafkaProducer, + OutputTypingEventTopic: string(base.Cfg.Kafka.Topics.OutputTypingEvent), + } + + inputAPI.SetupHTTP(http.DefaultServeMux) + return inputAPI } diff --git a/vendor/manifest b/vendor/manifest index 71b834ee..a1504853 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -108,6 +108,19 @@ "revision": "392c28fe23e1c45ddba891b0320b3b5df220beea", "branch": "master" }, + { + "importpath": "github.com/jaegertracing/jaeger-client-go", + "repository": "https://github.com/jaegertracing/jaeger-client-go", + "revision": "3ad49a1d839b517923a6fdac36d81cbf7b744f37", + "branch": "master" + }, + { + "importpath": "github.com/jaegertracing/jaeger-lib/metrics", + "repository": "https://github.com/jaegertracing/jaeger-lib", + "revision": "21a3da6d66fe0e278072676fdc84cd4c9ccb9b67", + "branch": "master", + "path": "/metrics" + }, { "importpath": "github.com/klauspost/crc32", "repository": "https://github.com/klauspost/crc32", @@ -135,7 +148,7 @@ { "importpath": "github.com/matrix-org/gomatrixserverlib", "repository": "https://github.com/matrix-org/gomatrixserverlib", - "revision": "929828872b51e6733166553d6b1a20155b6ab829", + "revision": "677bbe93ffc9ad9ba5de615cd81185d0493f5d25", "branch": "master" }, { @@ -304,19 +317,6 @@ "revision": "54f72d32435d760d5604f17a82e2435b28dc4ba5", "branch": "master" }, - { - "importpath": "github.com/jaegertracing/jaeger-client-go", - "repository": "https://github.com/jaegertracing/jaeger-client-go", - "revision": "3ad49a1d839b517923a6fdac36d81cbf7b744f37", - "branch": "master" - }, - { - "importpath": "github.com/jaegertracing/jaeger-lib/metrics", - "repository": "https://github.com/jaegertracing/jaeger-lib", - "revision": "21a3da6d66fe0e278072676fdc84cd4c9ccb9b67", - "branch": "master", - "path": "/metrics" - }, { "importpath": "github.com/uber/tchannel-go", "repository": "https://github.com/uber/tchannel-go", diff --git a/vendor/src/github.com/matrix-org/gomatrixserverlib/edu.go b/vendor/src/github.com/matrix-org/gomatrixserverlib/edu.go new file mode 100644 index 00000000..8cd01e03 --- /dev/null +++ b/vendor/src/github.com/matrix-org/gomatrixserverlib/edu.go @@ -0,0 +1,23 @@ +/* Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gomatrixserverlib + +// EDU represents a EDU received via federation +// https://matrix.org/docs/spec/server_server/unstable.html#edus +type EDU struct { + Type string `json:"edu_type"` + Origin string `json:"origin"` + Destination string `json:"destination"` + Content RawJSON `json:"content"` +} diff --git a/vendor/src/github.com/matrix-org/gomatrixserverlib/eventauth.go b/vendor/src/github.com/matrix-org/gomatrixserverlib/eventauth.go index 428e9a0e..43f7b018 100644 --- a/vendor/src/github.com/matrix-org/gomatrixserverlib/eventauth.go +++ b/vendor/src/github.com/matrix-org/gomatrixserverlib/eventauth.go @@ -47,6 +47,8 @@ const ( MRoomHistoryVisibility = "m.room.history_visibility" // MRoomRedaction https://matrix.org/docs/spec/client_server/r0.2.0.html#id21 MRoomRedaction = "m.room.redaction" + // MTyping https://matrix.org/docs/spec/client_server/r0.3.0.html#m-typing + MTyping = "m.typing" ) // StateNeeded lists the event types and state_keys needed to authenticate an event. diff --git a/vendor/src/github.com/matrix-org/gomatrixserverlib/hooks/install.sh b/vendor/src/github.com/matrix-org/gomatrixserverlib/hooks/install.sh old mode 100755 new mode 100644 diff --git a/vendor/src/github.com/matrix-org/gomatrixserverlib/hooks/pre-commit b/vendor/src/github.com/matrix-org/gomatrixserverlib/hooks/pre-commit old mode 100755 new mode 100644 diff --git a/vendor/src/github.com/matrix-org/gomatrixserverlib/linter.json b/vendor/src/github.com/matrix-org/gomatrixserverlib/linter.json index aae74954..2309ddc4 100644 --- a/vendor/src/github.com/matrix-org/gomatrixserverlib/linter.json +++ b/vendor/src/github.com/matrix-org/gomatrixserverlib/linter.json @@ -11,7 +11,7 @@ "structcheck", "maligned", "ineffassign", - "gas", + "gosec", "misspell", "gosimple", "megacheck", diff --git a/vendor/src/github.com/matrix-org/gomatrixserverlib/transaction.go b/vendor/src/github.com/matrix-org/gomatrixserverlib/transaction.go index 918c18e5..b97de9ef 100644 --- a/vendor/src/github.com/matrix-org/gomatrixserverlib/transaction.go +++ b/vendor/src/github.com/matrix-org/gomatrixserverlib/transaction.go @@ -16,11 +16,14 @@ type Transaction struct { // the destination server. Multiple transactions can be sent by the origin // server to the destination server in parallel so there may be more than // one previous transaction. - PreviousIDs []TransactionID `json:"previous_ids"` + PreviousIDs []TransactionID `json:"previous_ids,omitempty"` // The room events pushed from the origin server to the destination server // by this transaction. The events should either be events that originate // on the origin server or be join m.room.member events. PDUs []Event `json:"pdus"` + // The ephemeral events pushed from origin server to destination server + // by this transaction. The events must orginate at the origin server. + EDUs []EDU `json:"edus,omitempty"` } // A TransactionID identifies a transaction sent by a matrix server to another diff --git a/vendor/src/github.com/matrix-org/gomatrixserverlib/travis.sh b/vendor/src/github.com/matrix-org/gomatrixserverlib/travis.sh old mode 100755 new mode 100644