Start implementing /join for room aliases for rooms the server is not in. (#115)

* Start implementing the join room API

* Hacks to get join room working

* Make the TLS fingerprint configurable

* Fix the client API proxy to handle '#' correctly

* Return a 200 OK response

* Write the join event along with current state to the room server

* Comment on the error handling

* Fix typos

* Fix tab

* Add TODO for moving authEventIDs to gomatrixserverlib

* Comment on why we ignore the key ID argument for local keys

* Avoid 'preceeded'

* Neaten the control flow

* Neaten the control flow even more

* Return pointers

* Rename err to lastErr for the loop
This commit is contained in:
Mark Haines 2017-05-25 16:08:28 +01:00 committed by GitHub
parent 445dce14ae
commit 84ad4ff9f6
15 changed files with 528 additions and 66 deletions

View file

@ -45,22 +45,61 @@ func NewRoomserverProducer(kafkaURIs []string, topic string) (*RoomserverProduce
func (c *RoomserverProducer) SendEvents(events []gomatrixserverlib.Event) error {
eventIDs := make([]string, len(events))
ires := make([]api.InputRoomEvent, len(events))
for i := range events {
var authEventIDs []string
for _, ref := range events[i].AuthEvents() {
authEventIDs = append(authEventIDs, ref.EventID)
}
ire := api.InputRoomEvent{
for i, event := range events {
ires[i] = api.InputRoomEvent{
Kind: api.KindNew,
Event: events[i].JSON(),
AuthEventIDs: authEventIDs,
Event: event.JSON(),
AuthEventIDs: authEventIDs(event),
}
ires[i] = ire
eventIDs[i] = events[i].EventID()
eventIDs[i] = event.EventID()
}
return c.SendInputRoomEvents(ires, eventIDs)
}
// SendEventWithState writes an event with KindNew to the roomserver input log
// with the state at the event as KindOutlier before it.
func (c *RoomserverProducer) SendEventWithState(state gomatrixserverlib.RespState, event gomatrixserverlib.Event) error {
outliers, err := state.Events()
if err != nil {
return err
}
eventIDs := make([]string, len(outliers)+1)
ires := make([]api.InputRoomEvent, len(outliers)+1)
for i, outlier := range outliers {
ires[i] = api.InputRoomEvent{
Kind: api.KindOutlier,
Event: outlier.JSON(),
AuthEventIDs: authEventIDs(outlier),
}
eventIDs[i] = outlier.EventID()
}
stateEventIDs := make([]string, len(state.StateEvents))
for i := range state.StateEvents {
stateEventIDs[i] = state.StateEvents[i].EventID()
}
ires[len(outliers)] = api.InputRoomEvent{
Kind: api.KindNew,
Event: event.JSON(),
AuthEventIDs: authEventIDs(event),
HasState: true,
StateEventIDs: stateEventIDs,
}
eventIDs[len(outliers)] = event.EventID()
return c.SendInputRoomEvents(ires, eventIDs)
}
// TODO Make this a method on gomatrixserverlib.Event
func authEventIDs(event gomatrixserverlib.Event) (ids []string) {
for _, ref := range event.AuthEvents() {
ids = append(ids, ref.EventID)
}
return
}
// SendInputRoomEvents writes the given input room events to the roomserver input log. The length of both
// arrays must match, and each element must correspond to the same event.
func (c *RoomserverProducer) SendInputRoomEvents(ires []api.InputRoomEvent, eventIDs []string) error {

View file

@ -28,6 +28,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/writers"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
)
@ -36,8 +37,14 @@ const pathPrefixR0 = "/_matrix/client/r0"
// Setup registers HTTP handlers with the given ServeMux. It also supplies the given http.Client
// to clients which need to make outbound HTTP requests.
func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI, producer *producers.RoomserverProducer,
queryAPI api.RoomserverQueryAPI, accountDB *accounts.Database, deviceDB *devices.Database) {
func Setup(
servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI,
producer *producers.RoomserverProducer, queryAPI api.RoomserverQueryAPI,
accountDB *accounts.Database,
deviceDB *devices.Database,
federation *gomatrixserverlib.FederationClient,
keyRing gomatrixserverlib.KeyRing,
) {
apiMux := mux.NewRouter()
r0mux := apiMux.PathPrefix(pathPrefixR0).Subrouter()
r0mux.Handle("/createRoom",
@ -45,6 +52,14 @@ func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg config.ClientAPI
return writers.CreateRoom(req, device, cfg, producer)
}),
)
r0mux.Handle("/join/{roomIDOrAlias}",
common.MakeAuthAPI("join", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)
return writers.JoinRoomByIDOrAlias(
req, device, vars["roomIDOrAlias"], cfg, federation, producer, queryAPI, keyRing,
)
}),
)
r0mux.Handle("/rooms/{roomID}/send/{eventType}/{txnID}",
common.MakeAuthAPI("send_message", deviceDB, func(req *http.Request, device *authtypes.Device) util.JSONResponse {
vars := mux.Vars(req)

View file

@ -0,0 +1,263 @@
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package writers
import (
"fmt"
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/config"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrix"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"net/http"
"strings"
"time"
)
// JoinRoomByIDOrAlias implements the "/join/{roomIDOrAlias}" API.
// https://matrix.org/docs/spec/client_server/r0.2.0.html#post-matrix-client-r0-join-roomidoralias
func JoinRoomByIDOrAlias(
req *http.Request,
device *authtypes.Device,
roomIDOrAlias string,
cfg config.ClientAPI,
federation *gomatrixserverlib.FederationClient,
producer *producers.RoomserverProducer,
queryAPI api.RoomserverQueryAPI,
keyRing gomatrixserverlib.KeyRing,
) util.JSONResponse {
var content map[string]interface{} // must be a JSON object
if resErr := httputil.UnmarshalJSONRequest(req, &content); resErr != nil {
return *resErr
}
content["membership"] = "join"
r := joinRoomReq{req, content, device.UserID, cfg, federation, producer, queryAPI, keyRing}
if strings.HasPrefix(roomIDOrAlias, "!") {
return r.joinRoomByID()
}
if strings.HasPrefix(roomIDOrAlias, "#") {
return r.joinRoomByAlias(roomIDOrAlias)
}
return util.JSONResponse{
Code: 400,
JSON: jsonerror.BadJSON("Invalid first character for room ID or alias"),
}
}
type joinRoomReq struct {
req *http.Request
content map[string]interface{}
userID string
cfg config.ClientAPI
federation *gomatrixserverlib.FederationClient
producer *producers.RoomserverProducer
queryAPI api.RoomserverQueryAPI
keyRing gomatrixserverlib.KeyRing
}
// joinRoomByID joins a room by room ID
func (r joinRoomReq) joinRoomByID() util.JSONResponse {
// TODO: Implement joining rooms by ID.
// A client should only join a room by room ID when it has an invite
// to the room. If the server is already in the room then we can
// lookup the invite and process the request as a normal state event.
// If the server is not in the room the we will need to look up the
// remote server the invite came from in order to request a join event
// from that server.
panic(fmt.Errorf("Joining rooms by ID is not implemented"))
}
// joinRoomByAlias joins a room using a room alias.
func (r joinRoomReq) joinRoomByAlias(roomAlias string) util.JSONResponse {
domain, err := domainFromID(roomAlias)
if err != nil {
return util.JSONResponse{
Code: 400,
JSON: jsonerror.BadJSON("Room alias must be in the form '#localpart:domain'"),
}
}
if domain == r.cfg.ServerName {
// TODO: Implement joining local room aliases.
panic(fmt.Errorf("Joining local room aliases is not implemented"))
} else {
return r.joinRoomByRemoteAlias(domain, roomAlias)
}
}
func (r joinRoomReq) joinRoomByRemoteAlias(
domain gomatrixserverlib.ServerName, roomAlias string,
) util.JSONResponse {
resp, err := r.federation.LookupRoomAlias(domain, roomAlias)
if err != nil {
switch x := err.(type) {
case gomatrix.HTTPError:
if x.Code == 404 {
return util.JSONResponse{
Code: 404,
JSON: jsonerror.NotFound("Room alias not found"),
}
}
}
return httputil.LogThenError(r.req, err)
}
return r.joinRoomUsingServers(resp.RoomID, resp.Servers)
}
func (r joinRoomReq) writeToBuilder(eb *gomatrixserverlib.EventBuilder, roomID string) {
eb.Type = "m.room.member"
eb.SetContent(r.content) // TODO: Set avatar_url / displayname
eb.SetUnsigned(struct{}{})
eb.Sender = r.userID
eb.StateKey = &r.userID
eb.RoomID = roomID
eb.Redacts = ""
}
func (r joinRoomReq) joinRoomUsingServers(
roomID string, servers []gomatrixserverlib.ServerName,
) util.JSONResponse {
var eb gomatrixserverlib.EventBuilder
r.writeToBuilder(&eb, roomID)
needed, err := gomatrixserverlib.StateNeededForEventBuilder(&eb)
if err != nil {
return httputil.LogThenError(r.req, err)
}
// Ask the roomserver for information about this room
queryReq := api.QueryLatestEventsAndStateRequest{
RoomID: roomID,
StateToFetch: needed.Tuples(),
}
var queryRes api.QueryLatestEventsAndStateResponse
if queryErr := r.queryAPI.QueryLatestEventsAndState(&queryReq, &queryRes); queryErr != nil {
return httputil.LogThenError(r.req, queryErr)
}
if queryRes.RoomExists {
// TODO: Implement joining rooms that already the server is already in.
// This should just fall through to the usual event sending code.
panic(fmt.Errorf("Joining rooms that the server already in is not implemented"))
}
if len(servers) == 0 {
return util.JSONResponse{
Code: 404,
JSON: jsonerror.NotFound("No candidate servers found for room"),
}
}
var lastErr error
for _, server := range servers {
var response *util.JSONResponse
response, lastErr = r.joinRoomUsingServer(roomID, server)
if lastErr != nil {
// There was a problem talking to one of the servers.
util.GetLogger(r.req.Context()).WithError(lastErr).WithField("server", server).Warn("Failed to join room using server")
// Try the next server.
continue
}
return *response
}
// Every server we tried to join through resulted in an error.
// We return the error from the last server.
// TODO: Generate the correct HTTP status code for all different
// kinds of errors that could have happened.
// The possible errors include:
// 1) We can't connect to the remote servers.
// 2) None of the servers we could connect to think we are allowed
// to join the room.
// 3) The remote server returned something invalid.
// 4) We couldn't fetch the public keys needed to verify the
// signatures on the state events.
// 5) ...
return httputil.LogThenError(r.req, lastErr)
}
// joinRoomUsingServer tries to join a remote room using a given matrix server.
// If there was a failure communicating with the server or the response from the
// server was invalid this returns an error.
// Otherwise this returns a JSONResponse.
func (r joinRoomReq) joinRoomUsingServer(roomID string, server gomatrixserverlib.ServerName) (*util.JSONResponse, error) {
respMakeJoin, err := r.federation.MakeJoin(server, roomID, r.userID)
if err != nil {
// TODO: Check if the user was not allowed to join the room.
return nil, err
}
// Set all the fields to be what they should be, this should be a no-op
// but it's possible that the remote server returned us something "odd"
r.writeToBuilder(&respMakeJoin.JoinEvent, roomID)
now := time.Now()
eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), r.cfg.ServerName)
event, err := respMakeJoin.JoinEvent.Build(
eventID, now, r.cfg.ServerName, r.cfg.KeyID, r.cfg.PrivateKey,
)
if err != nil {
res := httputil.LogThenError(r.req, err)
return &res, nil
}
respSendJoin, err := r.federation.SendJoin(server, event)
if err != nil {
return nil, err
}
if err = respSendJoin.Check(r.keyRing, event); err != nil {
return nil, err
}
if err = r.producer.SendEventWithState(
gomatrixserverlib.RespState(respSendJoin), event,
); err != nil {
res := httputil.LogThenError(r.req, err)
return &res, nil
}
return &util.JSONResponse{
Code: 200,
// TODO: Put the response struct somewhere common.
JSON: struct {
RoomID string `json:"room_id"`
}{roomID},
}, nil
}
// domainFromID returns everything after the first ":" character to extract
// the domain part of a matrix ID.
// TODO: duplicated from gomatrixserverlib.
func domainFromID(id string) (gomatrixserverlib.ServerName, error) {
// IDs have the format: SIGIL LOCALPART ":" DOMAIN
// Split on the first ":" character since the domain can contain ":"
// characters.
parts := strings.SplitN(id, ":", 2)
if len(parts) != 2 {
// The ID must have a ":" character.
return "", fmt.Errorf("invalid ID: %q", id)
}
// Return everything after the first ":" character.
return gomatrixserverlib.ServerName(parts[1]), nil
}

View file

@ -76,7 +76,11 @@ func makeProxy(targetURL string) (*httputil.ReverseProxy, error) {
"url": targetURL,
"method": req.Method,
}).Print("proxying request")
newURL, err := url.Parse(targetURL + path)
newURL, err := url.Parse(targetURL)
// Set the path separately as we need to preserve '#' characters
// that would otherwise be interpreted as being the start of a URL
// fragment.
newURL.Path += path
if err != nil {
// We already checked that we can parse the URL
// So this shouldn't ever get hit.

View file

@ -26,6 +26,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/routing"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/Sirupsen/logrus"
@ -81,6 +82,8 @@ func main() {
log.Panicf("Failed to setup kafka producers(%s): %s", cfg.KafkaProducerURIs, err)
}
federation := gomatrixserverlib.NewFederationClient(cfg.ServerName, cfg.KeyID, cfg.PrivateKey)
queryAPI := api.NewRoomserverQueryAPIHTTP(cfg.RoomserverURL, nil)
accountDB, err := accounts.NewDatabase(accountDataSource, serverName)
if err != nil {
@ -91,6 +94,32 @@ func main() {
log.Panicf("Failed to setup device database(%s): %s", accountDataSource, err.Error())
}
routing.Setup(http.DefaultServeMux, http.DefaultClient, cfg, roomserverProducer, queryAPI, accountDB, deviceDB)
keyRing := gomatrixserverlib.KeyRing{
KeyFetchers: []gomatrixserverlib.KeyFetcher{
// TODO: Use perspective key fetchers for production.
&gomatrixserverlib.DirectKeyFetcher{federation.Client},
},
KeyDatabase: &dummyKeyDatabase{},
}
routing.Setup(
http.DefaultServeMux, http.DefaultClient, cfg, roomserverProducer,
queryAPI, accountDB, deviceDB, federation, keyRing,
)
log.Fatal(http.ListenAndServe(bindAddr, nil))
}
// TODO: Implement a proper key database.
type dummyKeyDatabase struct{}
func (d *dummyKeyDatabase) FetchKeys(
requests map[gomatrixserverlib.PublicKeyRequest]gomatrixserverlib.Timestamp,
) (map[gomatrixserverlib.PublicKeyRequest]gomatrixserverlib.ServerKeys, error) {
return nil, nil
}
func (d *dummyKeyDatabase) StoreKeys(
map[gomatrixserverlib.PublicKeyRequest]gomatrixserverlib.ServerKeys,
) error {
return nil
}

View file

@ -15,6 +15,7 @@
package main
import (
"encoding/base64"
"net/http"
"os"
"time"
@ -32,6 +33,14 @@ var (
logDir = os.Getenv("LOG_DIR")
serverName = gomatrixserverlib.ServerName(os.Getenv("SERVER_NAME"))
serverKey = os.Getenv("SERVER_KEY")
// Base64 encoded SHA256 TLS fingerprint of the X509 certificate used by
// the public federation listener for this server.
// Can be generated from a PEM certificate called "server.crt" using:
//
// openssl x509 -noout -fingerprint -sha256 -inform pem -in server.crt |\
// python -c 'print raw_input()[19:].replace(":","").decode("hex").encode("base64").rstrip("=\n")'
//
tlsFingerprint = os.Getenv("TLS_FINGERPRINT")
)
func main() {
@ -44,6 +53,10 @@ func main() {
serverName = "localhost"
}
if tlsFingerprint == "" {
log.Panic("No TLS_FINGERPRINT environment variable found.")
}
cfg := config.FederationAPI{
ServerName: serverName,
// TODO: make the validity period configurable.
@ -56,6 +69,12 @@ func main() {
log.Panicf("Failed to load private key: %s", err)
}
var fingerprintSHA256 []byte
if fingerprintSHA256, err = base64.RawStdEncoding.DecodeString(tlsFingerprint); err != nil {
log.Panicf("Failed to load TLS fingerprint: %s", err)
}
cfg.TLSFingerPrints = []gomatrixserverlib.TLSFingerprint{{fingerprintSHA256}}
routing.Setup(http.DefaultServeMux, cfg)
log.Fatal(http.ListenAndServe(bindAddr, nil))
}

View file

@ -24,7 +24,7 @@ import (
)
const (
pathPrefixV2Keys = "/_matrix/keys/v2"
pathPrefixV2Keys = "/_matrix/key/v2"
)
// Setup registers HTTP handlers with the given ServeMux.
@ -32,11 +32,16 @@ func Setup(servMux *http.ServeMux, cfg config.FederationAPI) {
apiMux := mux.NewRouter()
v2keysmux := apiMux.PathPrefix(pathPrefixV2Keys).Subrouter()
v2keysmux.Handle("/server/",
makeAPI("localkeys", func(req *http.Request) util.JSONResponse {
return readers.LocalKeys(req, cfg)
}),
)
localKeys := makeAPI("localkeys", func(req *http.Request) util.JSONResponse {
return readers.LocalKeys(req, cfg)
})
// Ignore the {keyID} argument as we only have a single server key so we always
// return that key.
// Even if we had more than one server key, we would probably still ignore the
// {keyID} argument and always return a response containing all of the keys.
v2keysmux.Handle("/server/{keyID}", localKeys)
v2keysmux.Handle("/server/", localKeys)
servMux.Handle("/metrics", prometheus.Handler())
servMux.Handle("/api/", http.StripPrefix("/api", apiMux))