From 8da05cc41394e404a4f9c96991ad8445117da320 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 Dec 2017 14:55:27 +0000 Subject: [PATCH 1/5] Add some basic docs about opentracing (#366) --- docs/opentracing.md | 112 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 docs/opentracing.md diff --git a/docs/opentracing.md b/docs/opentracing.md new file mode 100644 index 00000000..a2110bc0 --- /dev/null +++ b/docs/opentracing.md @@ -0,0 +1,112 @@ +Opentracing +=========== + +Dendrite extensively uses the [opentracing.io](http://opentracing.io) framework +to trace work across the different logical components. + +At its most basic opentracing tracks "spans" of work; recording start and end +times as well as any parent span that caused the piece of work. + +A typical example would be a new span being created on an incoming request that +finishes when the response is sent. When the code needs to hit out to a +different component a new span is created with the initial span as its parent. +This would end up looking roughly like: + +``` +Received request Sent response + |<───────────────────────────────────────>| + |<────────────────────>| + RPC call RPC call returns +``` + +This is useful to see where the time is being spent processing a request on a +component. However, opentracing allows tracking of spans across components. This +makes it possible to see exactly what work goes into processing a request: + + +``` +Component 1 |<─────────────────── HTTP ────────────────────>| + |<──────────────── RPC ─────────────────>| +Component 2 |<─ SQL ─>| |<── RPC ───>| +Component 3 |<─ SQL ─>| +``` + +This is achieved by serializing span information during all communication +between components. For HTTP requests, this is achieved by the sender +serializing the span into a HTTP header, and the receiver deserializing the span +on receipt. (Generally a new span is then immediately created with the +deserialized span as the parent). + +A collection of spans that are related is called a trace. + + +Spans are passed through the code via contexts, rather than manually. It is +therefore important that all spans that are created are immediately added to the +current context. Thankfully the opentracing library gives helper functions for +doing this: + +```golang +span, ctx := opentracing.StartSpanFromContext(ctx, spanName) +defer span.Finish() +``` + +This will create a new span, adding any span already in `ctx` as a parent to the +new span. + + +Adding Information +------------------ + +Opentracing allows adding information to a trace via three mechanisms: +- "tags" ─ A span can be tagged with a key/value pair. This is typically + information that relates to the span, e.g. for spans created for incoming HTTP + requests could include the request path and response codes as tags, spans for + SQL could include the query being executed. +- "logs" ─ Key/value pairs can be looged at a particular instance in a trace. + This can be useful to log e.g. any errors that happen. +- "baggage" ─ Arbitrary key/value pairs can be added to a span to which all + child spans have access. Baggage isn't saved and so isn't available when + inspecting the traces, but can be used to add context to logs or tags in child + spans. + + +See +[specification.md](https://github.com/opentracing/specification/blob/master/specification.md) +for some of the common tags and log fields used. + + +Span Relationships +------------------ + +Spans can be related to each other. The most common relation is `childOf`, which +indicates the child span somehow depends on the parent span ─ typically the +parent span cannot complete until all child spans are completed. + +A second relation type is `followsFrom`, where the parent has no dependence on +the child span. This usually indicates some sort of fire and forget behaviour, +e.g. adding a message to a pipeline or inserting into a kafka topic. + + +Jaeger +------ + +Opentracing is just a framework. We use +[jaeger](https://github.com/jaegertracing/jaeger) as the actual implementation. + +Jaeger is responsible for recording, sending and saving traces, as well as +giving a UI for viewing and interacting with traces. + +To enable jaeger a `Tracer` object must be instansiated from the config (as well +as having a jaeger server running somewhere, usually locally). A `Tracer` does +several things: +- Decides which traces to save and send to the server. There are multiple + schemes for doing this, with a simple example being to save a certain fraction + of traces. +- Communicating with the jaeger backend. If not explicitly specified uses the + default port on localhost. +- Associates a service name to all spans created by the tracer. This service + name equates to a logical component, e.g. spans created by clientapi will have + a different service name than ones created by the syncapi. Database access + will also typically use a different service name. + + This means that there is a tracer per service name/component. From bc3dd821f9006e2055e8d3487d5adf507067232c Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Tue, 5 Dec 2017 16:16:14 +0000 Subject: [PATCH 2/5] Implemented ReCaptcha registration method (#343) Signed-off-by: Andrew (anoa) --- .../clientapi/auth/authtypes/logintypes.go | 1 + .../dendrite/clientapi/routing/register.go | 113 ++++++++++++++++-- .../dendrite/common/config/config.go | 23 +++- 3 files changed, 123 insertions(+), 14 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/clientapi/auth/authtypes/logintypes.go b/src/github.com/matrix-org/dendrite/clientapi/auth/authtypes/logintypes.go index ca9c8b38..c4f7b046 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/auth/authtypes/logintypes.go +++ b/src/github.com/matrix-org/dendrite/clientapi/auth/authtypes/logintypes.go @@ -7,4 +7,5 @@ type LoginType string const ( LoginTypeDummy = "m.login.dummy" LoginTypeSharedSecret = "org.matrix.login.shared_secret" + LoginTypeRecaptcha = "m.login.recaptcha" ) diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/register.go b/src/github.com/matrix-org/dendrite/clientapi/routing/register.go index 6ef4ab05..7bd5820f 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/register.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/register.go @@ -19,12 +19,16 @@ import ( "context" "crypto/hmac" "crypto/sha1" + "encoding/json" "errors" "fmt" + "io/ioutil" "net/http" + "net/url" "regexp" "sort" "strings" + "time" "github.com/matrix-org/dendrite/common/config" @@ -74,6 +78,8 @@ type authDict struct { Session string `json:"session"` Mac gomatrixserverlib.HexString `json:"mac"` + // Recaptcha + Response string `json:"response"` // TODO: Lots of custom keys depending on the type } @@ -114,6 +120,14 @@ type registerResponse struct { DeviceID string `json:"device_id"` } +// recaptchaResponse represents the HTTP response from a Google Recaptcha server +type recaptchaResponse struct { + Success bool `json:"success"` + ChallengeTS time.Time `json:"challenge_ts"` + Hostname string `json:"hostname"` + ErrorCodes []int `json:"error-codes"` +} + // validateUserName returns an error response if the username is invalid func validateUserName(username string) *util.JSONResponse { // https://github.com/matrix-org/synapse/blob/v0.20.0/synapse/rest/client/v2_alpha/register.py#L161 @@ -153,6 +167,72 @@ func validatePassword(password string) *util.JSONResponse { return nil } +// validateRecaptcha returns an error response if the captcha response is invalid +func validateRecaptcha( + cfg *config.Dendrite, + response string, + clientip string, +) *util.JSONResponse { + if !cfg.Matrix.RecaptchaEnabled { + return &util.JSONResponse{ + Code: 400, + JSON: jsonerror.BadJSON("Captcha registration is disabled"), + } + } + + if response == "" { + return &util.JSONResponse{ + Code: 400, + JSON: jsonerror.BadJSON("Captcha response is required"), + } + } + + // Make a POST request to Google's API to check the captcha response + resp, err := http.PostForm(cfg.Matrix.RecaptchaSiteVerifyAPI, + url.Values{ + "secret": {cfg.Matrix.RecaptchaPrivateKey}, + "response": {response}, + "remoteip": {clientip}, + }, + ) + + if err != nil { + return &util.JSONResponse{ + Code: 500, + JSON: jsonerror.BadJSON("Error in requesting validation of captcha response"), + } + } + + // Close the request once we're finishing reading from it + defer resp.Body.Close() // nolint: errcheck + + // Grab the body of the response from the captcha server + var r recaptchaResponse + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return &util.JSONResponse{ + Code: 500, + JSON: jsonerror.BadJSON("Error in contacting captcha server" + err.Error()), + } + } + err = json.Unmarshal(body, &r) + if err != nil { + return &util.JSONResponse{ + Code: 500, + JSON: jsonerror.BadJSON("Error in unmarshaling captcha server's response: " + err.Error()), + } + } + + // Check that we received a "success" + if !r.Success { + return &util.JSONResponse{ + Code: 401, + JSON: jsonerror.BadJSON("Invalid captcha response. Please try again."), + } + } + return nil +} + // Register processes a /register request. http://matrix.org/speculator/spec/HEAD/client_server/unstable.html#post-matrix-client-unstable-register func Register( req *http.Request, @@ -221,26 +301,30 @@ func handleRegistrationFlow( // TODO: Handle loading of previous session parameters from database. // TODO: Handle mapping registrationRequest parameters into session parameters - // TODO: email / msisdn / recaptcha auth types. + // TODO: email / msisdn auth types. if cfg.Matrix.RegistrationDisabled && r.Auth.Type != authtypes.LoginTypeSharedSecret { return util.MessageResponse(403, "Registration has been disabled") } switch r.Auth.Type { - case authtypes.LoginTypeSharedSecret: - if cfg.Matrix.RegistrationSharedSecret == "" { - return util.MessageResponse(400, "Shared secret registration is disabled") + case authtypes.LoginTypeRecaptcha: + // Check given captcha response + resErr := validateRecaptcha(cfg, r.Auth.Response, req.RemoteAddr) + if resErr != nil { + return *resErr } - valid, err := isValidMacLogin(r.Username, r.Password, r.Admin, - r.Auth.Mac, cfg.Matrix.RegistrationSharedSecret) + // Add Recaptcha to the list of completed registration stages + sessions[sessionID] = append(sessions[sessionID], authtypes.LoginTypeRecaptcha) + + case authtypes.LoginTypeSharedSecret: + // Check shared secret against config + valid, err := isValidMacLogin(cfg, r.Username, r.Password, r.Admin, r.Auth.Mac) if err != nil { return httputil.LogThenError(req, err) - } - - if !valid { + } else if !valid { return util.MessageResponse(403, "HMAC incorrect") } @@ -303,7 +387,7 @@ func LegacyRegister( return util.MessageResponse(400, "Shared secret registration is disabled") } - valid, err := isValidMacLogin(r.Username, r.Password, r.Admin, r.Mac, cfg.Matrix.RegistrationSharedSecret) + valid, err := isValidMacLogin(cfg, r.Username, r.Password, r.Admin, r.Mac) if err != nil { return httputil.LogThenError(req, err) } @@ -412,11 +496,18 @@ func completeRegistration( // Used for shared secret registration. // Checks if the username, password and isAdmin flag matches the given mac. func isValidMacLogin( + cfg *config.Dendrite, username, password string, isAdmin bool, givenMac []byte, - sharedSecret string, ) (bool, error) { + sharedSecret := cfg.Matrix.RegistrationSharedSecret + + // Check that shared secret registration isn't disabled. + if cfg.Matrix.RegistrationSharedSecret == "" { + return false, errors.New("Shared secret registration is disabled") + } + // Double check that username/password don't contain the HMAC delimiters. We should have // already checked this. if strings.Contains(username, "\x00") { diff --git a/src/github.com/matrix-org/dendrite/common/config/config.go b/src/github.com/matrix-org/dendrite/common/config/config.go index 00217e46..d4a9a2c5 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config.go +++ b/src/github.com/matrix-org/dendrite/common/config/config.go @@ -83,6 +83,18 @@ type Dendrite struct { // If set, allows registration by anyone who also has the shared // secret, even if registration is otherwise disabled. RegistrationSharedSecret string `yaml:"registration_shared_secret"` + // This Home Server's ReCAPTCHA public key. + RecaptchaPublicKey string `yaml:"recaptcha_public_key"` + // This Home Server's ReCAPTCHA private key. + RecaptchaPrivateKey string `yaml:"recaptcha_private_key"` + // Boolean stating whether catpcha registration is enabled + // and required + RecaptchaEnabled bool `yaml:"enable_registration_captcha"` + // Secret used to bypass the captcha registration entirely + RecaptchaBypassSecret string `yaml:"captcha_bypass_secret"` + // HTTP API endpoint used to verify whether the captcha response + // was successful + RecaptchaSiteVerifyAPI string `yaml:"recaptcha_siteverify_api"` // If set disables new users from registering (except via shared // secrets) RegistrationDisabled bool `yaml:"registration_disabled"` @@ -339,10 +351,15 @@ func (config *Dendrite) derive() { // TODO: Add email auth type // TODO: Add MSISDN auth type - // TODO: Add Recaptcha auth type - config.Derived.Registration.Flows = append(config.Derived.Registration.Flows, - authtypes.Flow{[]authtypes.LoginType{authtypes.LoginTypeDummy}}) + if config.Matrix.RecaptchaEnabled { + config.Derived.Registration.Params[authtypes.LoginTypeRecaptcha] = map[string]string{"public_key": config.Matrix.RecaptchaPublicKey} + config.Derived.Registration.Flows = append(config.Derived.Registration.Flows, + authtypes.Flow{[]authtypes.LoginType{authtypes.LoginTypeRecaptcha}}) + } else { + config.Derived.Registration.Flows = append(config.Derived.Registration.Flows, + authtypes.Flow{[]authtypes.LoginType{authtypes.LoginTypeDummy}}) + } } // setDefaults sets default config values if they are not explicitly set. From 578d8cf49216d82a20c010828d674e7322298dd2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 6 Dec 2017 09:36:50 +0000 Subject: [PATCH 3/5] Add CORS headers to all responses including errors (#364) --- .../cmd/dendrite-client-api-server/main.go | 2 +- .../cmd/dendrite-media-api-server/main.go | 2 +- .../cmd/dendrite-monolith-server/main.go | 2 +- .../dendrite-public-rooms-api-server/main.go | 2 +- .../cmd/dendrite-sync-api-server/main.go | 2 +- .../matrix-org/dendrite/common/httpapi.go | 24 +++++++++++++++++-- 6 files changed, 27 insertions(+), 7 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go index 929fd3b5..8794107f 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-client-api-server/main.go @@ -121,7 +121,7 @@ func main() { queryAPI, aliasAPI, accountDB, deviceDB, federation, keyRing, userUpdateProducer, syncProducer, ) - common.SetupHTTPAPI(http.DefaultServeMux, api) + common.SetupHTTPAPI(http.DefaultServeMux, common.WrapHandlerInCORS(api)) log.Fatal(http.ListenAndServe(string(cfg.Listen.ClientAPI), nil)) } diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-media-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-media-api-server/main.go index 5092f427..bc16dee7 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-media-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-media-api-server/main.go @@ -70,7 +70,7 @@ func main() { api := mux.NewRouter() routing.Setup(api, cfg, db, deviceDB, client) - common.SetupHTTPAPI(http.DefaultServeMux, api) + common.SetupHTTPAPI(http.DefaultServeMux, common.WrapHandlerInCORS(api)) log.Fatal(http.ListenAndServe(string(cfg.Listen.MediaAPI), nil)) } diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go index 05fc4252..9ecfd60a 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-monolith-server/main.go @@ -103,7 +103,7 @@ func main() { // Expose the matrix APIs directly rather than putting them under a /api path. go func() { log.Info("Listening on ", *httpBindAddr) - log.Fatal(http.ListenAndServe(*httpBindAddr, m.api)) + log.Fatal(http.ListenAndServe(*httpBindAddr, common.WrapHandlerInCORS(m.api))) }() // Handle HTTPS if certificate and key are provided go func() { diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-public-rooms-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-public-rooms-api-server/main.go index 448ede7d..24aae0da 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-public-rooms-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-public-rooms-api-server/main.go @@ -85,7 +85,7 @@ func main() { api := mux.NewRouter() routing.Setup(api, deviceDB, db) - common.SetupHTTPAPI(http.DefaultServeMux, api) + common.SetupHTTPAPI(http.DefaultServeMux, common.WrapHandlerInCORS(api)) log.Fatal(http.ListenAndServe(string(cfg.Listen.PublicRoomsAPI), nil)) } diff --git a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go index e7f83a60..16ae228b 100644 --- a/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go +++ b/src/github.com/matrix-org/dendrite/cmd/dendrite-sync-api-server/main.go @@ -105,7 +105,7 @@ func main() { api := mux.NewRouter() routing.Setup(api, sync.NewRequestPool(db, n, adb), db, deviceDB) - common.SetupHTTPAPI(http.DefaultServeMux, api) + common.SetupHTTPAPI(http.DefaultServeMux, common.WrapHandlerInCORS(api)) log.Fatal(http.ListenAndServe(string(cfg.Listen.SyncAPI), nil)) } diff --git a/src/github.com/matrix-org/dendrite/common/httpapi.go b/src/github.com/matrix-org/dendrite/common/httpapi.go index 76182bf0..b2ef8959 100644 --- a/src/github.com/matrix-org/dendrite/common/httpapi.go +++ b/src/github.com/matrix-org/dendrite/common/httpapi.go @@ -4,7 +4,6 @@ import ( "net/http" "time" - "github.com/gorilla/mux" "github.com/matrix-org/dendrite/clientapi/auth" "github.com/matrix-org/dendrite/clientapi/auth/authtypes" "github.com/matrix-org/gomatrixserverlib" @@ -87,8 +86,29 @@ func MakeFedAPI( // SetupHTTPAPI registers an HTTP API mux under /api and sets up a metrics // listener. -func SetupHTTPAPI(servMux *http.ServeMux, apiMux *mux.Router) { +func SetupHTTPAPI(servMux *http.ServeMux, apiMux http.Handler) { // This is deprecated. servMux.Handle("/metrics", prometheus.Handler()) // nolint: megacheck, staticcheck servMux.Handle("/api/", http.StripPrefix("/api", apiMux)) } + +// WrapHandlerInCORS adds CORS headers to all responses, including all error +// responses. +// Handles OPTIONS requests directly. +func WrapHandlerInCORS(h http.Handler) http.HandlerFunc { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") + w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept, Authorization") + + if r.Method == "OPTIONS" && r.Header.Get("Access-Control-Request-Method") != "" { + // Its easiest just to always return a 200 OK for everything. Whether + // this is technically correct or not is a question, but in the end this + // is what a lot of other people do (including synapse) and the clients + // are perfectly happy with it. + w.WriteHeader(http.StatusOK) + } else { + h.ServeHTTP(w, r) + } + }) +} From 75aa316a6a722bf1d30d837db8f08a9d112b24a5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 6 Dec 2017 09:37:18 +0000 Subject: [PATCH 4/5] Write and read transaction id from sync DB (#367) --- .../dendrite/syncapi/consumers/roomserver.go | 1 + .../storage/output_room_events_table.go | 49 +++++++++++++++---- .../dendrite/syncapi/storage/syncserver.go | 8 ++- 3 files changed, 47 insertions(+), 11 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go index 677eeb42..273b6aea 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/consumers/roomserver.go @@ -132,6 +132,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent( addsStateEvents, msg.AddsStateEventIDs, msg.RemovesStateEventIDs, + msg.TransactionID, ) if err != nil { return err diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go index fb00ad84..333f608d 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/output_room_events_table.go @@ -18,6 +18,8 @@ import ( "context" "database/sql" + "github.com/matrix-org/dendrite/roomserver/api" + "github.com/lib/pq" "github.com/matrix-org/dendrite/common" "github.com/matrix-org/dendrite/syncapi/types" @@ -44,7 +46,9 @@ CREATE TABLE IF NOT EXISTS syncapi_output_room_events ( -- A list of event IDs which represent a delta of added/removed room state. This can be NULL -- if there is no delta. add_state_ids TEXT[], - remove_state_ids TEXT[] + remove_state_ids TEXT[], + device_id TEXT, -- The local device that sent the event, if any + transaction_id TEXT -- The transaction id used to send the event, if any ); -- for event selection CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_events(event_id); @@ -52,14 +56,14 @@ CREATE UNIQUE INDEX IF NOT EXISTS syncapi_event_id_idx ON syncapi_output_room_ev const insertEventSQL = "" + "INSERT INTO syncapi_output_room_events (" + - " room_id, event_id, event_json, add_state_ids, remove_state_ids" + - ") VALUES ($1, $2, $3, $4, $5) RETURNING id" + " room_id, event_id, event_json, add_state_ids, remove_state_ids, device_id, transaction_id" + + ") VALUES ($1, $2, $3, $4, $5, $6, $7) RETURNING id" const selectEventsSQL = "" + "SELECT id, event_json FROM syncapi_output_room_events WHERE event_id = ANY($1)" const selectRecentEventsSQL = "" + - "SELECT id, event_json FROM syncapi_output_room_events" + + "SELECT id, event_json, device_id, transaction_id FROM syncapi_output_room_events" + " WHERE room_id = $1 AND id > $2 AND id <= $3" + " ORDER BY id DESC LIMIT $4" @@ -164,7 +168,10 @@ func (s *outputRoomEventsStatements) selectStateInRange( } stateNeeded[ev.RoomID()] = needSet - eventIDToEvent[ev.EventID()] = streamEvent{ev, types.StreamPosition(streamPos)} + eventIDToEvent[ev.EventID()] = streamEvent{ + Event: ev, + streamPosition: types.StreamPosition(streamPos), + } } return stateNeeded, eventIDToEvent, nil @@ -190,7 +197,14 @@ func (s *outputRoomEventsStatements) selectMaxEventID( func (s *outputRoomEventsStatements) insertEvent( ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.Event, addState, removeState []string, + transactionID *api.TransactionID, ) (streamPos int64, err error) { + var deviceID, txnID *string + if transactionID != nil { + deviceID = &transactionID.DeviceID + txnID = &transactionID.TransactionID + } + stmt := common.TxStmt(txn, s.insertEventStmt) err = stmt.QueryRowContext( ctx, @@ -199,6 +213,8 @@ func (s *outputRoomEventsStatements) insertEvent( event.JSON(), pq.StringArray(addState), pq.StringArray(removeState), + deviceID, + txnID, ).Scan(&streamPos) return } @@ -241,10 +257,13 @@ func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) { var result []streamEvent for rows.Next() { var ( - streamPos int64 - eventBytes []byte + streamPos int64 + eventBytes []byte + deviceID *string + txnID *string + transactionID *api.TransactionID ) - if err := rows.Scan(&streamPos, &eventBytes); err != nil { + if err := rows.Scan(&streamPos, &eventBytes, &deviceID, &txnID); err != nil { return nil, err } // TODO: Handle redacted events @@ -252,7 +271,19 @@ func rowsToStreamEvents(rows *sql.Rows) ([]streamEvent, error) { if err != nil { return nil, err } - result = append(result, streamEvent{ev, types.StreamPosition(streamPos)}) + + if deviceID != nil && txnID != nil { + transactionID = &api.TransactionID{ + DeviceID: *deviceID, + TransactionID: *txnID, + } + } + + result = append(result, streamEvent{ + Event: ev, + streamPosition: types.StreamPosition(streamPos), + transactionID: transactionID, + }) } return result, nil } diff --git a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go index 1a18d937..8a5b9648 100644 --- a/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go +++ b/src/github.com/matrix-org/dendrite/syncapi/storage/syncserver.go @@ -18,6 +18,8 @@ import ( "context" "database/sql" "fmt" + + "github.com/matrix-org/dendrite/roomserver/api" // Import the postgres database driver. _ "github.com/lib/pq" "github.com/matrix-org/dendrite/common" @@ -38,6 +40,7 @@ type stateDelta struct { type streamEvent struct { gomatrixserverlib.Event streamPosition types.StreamPosition + transactionID *api.TransactionID } // SyncServerDatabase represents a sync server database @@ -100,10 +103,11 @@ func (d *SyncServerDatabase) WriteEvent( ev *gomatrixserverlib.Event, addStateEvents []gomatrixserverlib.Event, addStateEventIDs, removeStateEventIDs []string, + transactionID *api.TransactionID, ) (streamPos types.StreamPosition, returnErr error) { returnErr = common.WithTransaction(d.db, func(txn *sql.Tx) error { var err error - pos, err := d.events.insertEvent(ctx, txn, ev, addStateEventIDs, removeStateEventIDs) + pos, err := d.events.insertEvent(ctx, txn, ev, addStateEventIDs, removeStateEventIDs, transactionID) if err != nil { return err } @@ -565,7 +569,7 @@ func (d *SyncServerDatabase) getStateDeltas( } s := make([]streamEvent, len(allState)) for i := 0; i < len(s); i++ { - s[i] = streamEvent{allState[i], types.StreamPosition(0)} + s[i] = streamEvent{Event: allState[i], streamPosition: types.StreamPosition(0)} } state[roomID] = s continue // we'll add this room in when we do joined rooms From 16f593f7862c1bdedcf42c54dec1e8b2170e49c9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 6 Dec 2017 13:55:51 +0000 Subject: [PATCH 5/5] Fix some linting errors --- .../matrix-org/dendrite/clientapi/routing/register_test.go | 6 +++--- src/github.com/matrix-org/dendrite/common/config/config.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/clientapi/routing/register_test.go b/src/github.com/matrix-org/dendrite/clientapi/routing/register_test.go index de18c8d2..0fae27e9 100644 --- a/src/github.com/matrix-org/dendrite/clientapi/routing/register_test.go +++ b/src/github.com/matrix-org/dendrite/clientapi/routing/register_test.go @@ -22,15 +22,15 @@ import ( var ( // Registration Flows that the server allows. - allowedFlows []authtypes.Flow = []authtypes.Flow{ + allowedFlows = []authtypes.Flow{ { - []authtypes.LoginType{ + Stages: []authtypes.LoginType{ authtypes.LoginType("stage1"), authtypes.LoginType("stage2"), }, }, { - []authtypes.LoginType{ + Stages: []authtypes.LoginType{ authtypes.LoginType("stage1"), authtypes.LoginType("stage3"), }, diff --git a/src/github.com/matrix-org/dendrite/common/config/config.go b/src/github.com/matrix-org/dendrite/common/config/config.go index d4a9a2c5..f291a076 100644 --- a/src/github.com/matrix-org/dendrite/common/config/config.go +++ b/src/github.com/matrix-org/dendrite/common/config/config.go @@ -355,10 +355,10 @@ func (config *Dendrite) derive() { if config.Matrix.RecaptchaEnabled { config.Derived.Registration.Params[authtypes.LoginTypeRecaptcha] = map[string]string{"public_key": config.Matrix.RecaptchaPublicKey} config.Derived.Registration.Flows = append(config.Derived.Registration.Flows, - authtypes.Flow{[]authtypes.LoginType{authtypes.LoginTypeRecaptcha}}) + authtypes.Flow{Stages: []authtypes.LoginType{authtypes.LoginTypeRecaptcha}}) } else { config.Derived.Registration.Flows = append(config.Derived.Registration.Flows, - authtypes.Flow{[]authtypes.LoginType{authtypes.LoginTypeDummy}}) + authtypes.Flow{Stages: []authtypes.LoginType{authtypes.LoginTypeDummy}}) } }