Further room version wiring (#936)

* Room version 2 by default, other wiring updates, update gomatrixserverlib

* Fix nil pointer exception

* Fix some more nil pointer exceptions hopefully

* Update gomatrixserverlib

* Send all room versions when joining, not just stable ones

* Remove room version cquery

* Get room version when getting events from the roomserver database

* Reset default back to room version 2

* Don't generate event IDs unless needed

* Revert "Remove room version cquery"

This reverts commit a170d5873360dd059614460acc8b21ab2cda9767.

* Query room version in federation API, client API as needed

* Improvements to make_join send_join dance

* Make room server producers use headered events

* Lint tweaks

* Update gomatrixserverlib

* Versioned SendJoin

* Query room version in syncapi backfill

* Handle transaction marshalling/unmarshalling within Dendrite

* Sorta fix federation (kinda)

* whoops commit federation API too

* Use NewEventFromTrustedJSON when getting events from the database

* Update gomatrixserverlib

* Strip headers on federationapi endpoints

* Fix bug in clientapi profile room version query

* Update gomatrixserverlib

* Return more useful error if room version query doesn't find the room

* Update gomatrixserverlib

* Update gomatrixserverlib

* Maybe fix federation

* Fix formatting directive

* Update sytest whitelist and blacklist

* Temporarily disable room versions 3 and 4 until gmsl is fixed

* Fix count of EDUs in logging

* Update gomatrixserverlib

* Update gomatrixserverlib

* Update gomatrixserverlib

* Rely on EventBuilder in gmsl to generate the event IDs for us

* Some review comments fixed

* Move function out of common and into gmsl

* Comment in federationsender destinationqueue

* Update gomatrixserverlib
This commit is contained in:
Neil Alexander 2020-03-27 16:28:22 +00:00 committed by GitHub
parent 314da91f1d
commit 05e1ae8745
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
39 changed files with 532 additions and 407 deletions

View file

@ -42,7 +42,7 @@ func SetupFederationAPIComponent(
asAPI appserviceAPI.AppServiceQueryAPI,
federationSenderAPI federationSenderAPI.FederationSenderQueryAPI,
) {
roomserverProducer := producers.NewRoomserverProducer(inputAPI)
roomserverProducer := producers.NewRoomserverProducer(inputAPI, queryAPI)
routing.Setup(
base.APIMux, base.Cfg, queryAPI, aliasAPI, asAPI,

View file

@ -15,6 +15,7 @@
package routing
import (
"encoding/json"
"net/http"
"strconv"
"time"
@ -91,9 +92,14 @@ func Backfill(
}
}
var eventJSONs []json.RawMessage
for _, e := range evs {
eventJSONs = append(eventJSONs, e.JSON())
}
txn := gomatrixserverlib.Transaction{
Origin: cfg.Matrix.ServerName,
PDUs: evs,
PDUs: eventJSONs,
OriginServerTS: gomatrixserverlib.AsTimestamp(time.Now()),
}

View file

@ -15,12 +15,13 @@
package routing
import (
"encoding/json"
"context"
"net/http"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
@ -35,10 +36,19 @@ func Invite(
producer *producers.RoomserverProducer,
keys gomatrixserverlib.KeyRing,
) util.JSONResponse {
// Look up the room version for the room.
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
verRes := api.QueryRoomVersionForRoomResponse{}
if err := producer.QueryAPI.QueryRoomVersionForRoom(context.Background(), &verReq, &verRes); err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.UnsupportedRoomVersion(err.Error()),
}
}
// Decode the event JSON from the request.
var event gomatrixserverlib.Event
if err := json.Unmarshal(request.Content(), &event); err != nil {
event, err := gomatrixserverlib.NewEventFromUntrustedJSON(request.Content(), verRes.RoomVersion)
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.NotJSON("The request body could not be decoded into valid JSON. " + err.Error()),
@ -70,9 +80,10 @@ func Invite(
}
// Check that the event is signed by the server sending the request.
redacted := event.Redact()
verifyRequests := []gomatrixserverlib.VerifyJSONRequest{{
ServerName: event.Origin(),
Message: event.Redact().JSON(),
Message: redacted.JSON(),
AtTS: event.OriginServerTS(),
}}
verifyResults, err := keys.VerifyJSONs(httpReq.Context(), verifyRequests)

View file

@ -15,7 +15,6 @@
package routing
import (
"encoding/json"
"net/http"
"time"
@ -36,6 +35,15 @@ func MakeJoin(
query api.RoomserverQueryAPI,
roomID, userID string,
) util.JSONResponse {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
verRes := api.QueryRoomVersionForRoomResponse{}
if err := query.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil {
return util.JSONResponse{
Code: http.StatusInternalServerError,
JSON: jsonerror.InternalServerError(),
}
}
_, domain, err := gomatrixserverlib.SplitID('@', userID)
if err != nil {
return util.JSONResponse{
@ -63,7 +71,9 @@ func MakeJoin(
return jsonerror.InternalServerError()
}
var queryRes api.QueryLatestEventsAndStateResponse
queryRes := api.QueryLatestEventsAndStateResponse{
RoomVersion: verRes.RoomVersion,
}
event, err := common.BuildEvent(httpReq.Context(), &builder, cfg, time.Now(), query, &queryRes)
if err == common.ErrRoomNoExists {
return util.JSONResponse{
@ -80,6 +90,7 @@ func MakeJoin(
for i := range queryRes.StateEvents {
stateEvents[i] = &queryRes.StateEvents[i].Event
}
provider := gomatrixserverlib.NewAuthEvents(stateEvents)
if err = gomatrixserverlib.Allowed(*event, &provider); err != nil {
return util.JSONResponse{
@ -90,7 +101,10 @@ func MakeJoin(
return util.JSONResponse{
Code: http.StatusOK,
JSON: map[string]interface{}{"event": builder},
JSON: map[string]interface{}{
"event": builder,
"room_version": verRes.RoomVersion,
},
}
}
@ -104,8 +118,18 @@ func SendJoin(
keys gomatrixserverlib.KeyRing,
roomID, eventID string,
) util.JSONResponse {
var event gomatrixserverlib.Event
if err := json.Unmarshal(request.Content(), &event); err != nil {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
verRes := api.QueryRoomVersionForRoomResponse{}
if err := query.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("query.QueryRoomVersionForRoom failed")
return util.JSONResponse{
Code: http.StatusInternalServerError,
JSON: jsonerror.InternalServerError(),
}
}
event, err := gomatrixserverlib.NewEventFromUntrustedJSON(request.Content(), verRes.RoomVersion)
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.NotJSON("The request body could not be decoded into valid JSON. " + err.Error()),
@ -137,9 +161,10 @@ func SendJoin(
}
// Check that the event is signed by the server sending the request.
redacted := event.Redact()
verifyRequests := []gomatrixserverlib.VerifyJSONRequest{{
ServerName: event.Origin(),
Message: event.Redact().JSON(),
Message: redacted.JSON(),
AtTS: event.OriginServerTS(),
}}
verifyResults, err := keys.VerifyJSONs(httpReq.Context(), verifyRequests)
@ -150,7 +175,7 @@ func SendJoin(
if verifyResults[0].Error != nil {
return util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden("The join must be signed by the server it originated on"),
JSON: jsonerror.Forbidden("Signature check failed: " + verifyResults[0].Error.Error()),
}
}
@ -178,7 +203,12 @@ func SendJoin(
// We are responsible for notifying other servers that the user has joined
// the room, so set SendAsServer to cfg.Matrix.ServerName
_, err = producer.SendEvents(
httpReq.Context(), []gomatrixserverlib.Event{event}, cfg.Matrix.ServerName, nil,
httpReq.Context(),
[]gomatrixserverlib.HeaderedEvent{
event.Headered(stateAndAuthChainResponse.RoomVersion),
},
cfg.Matrix.ServerName,
nil,
)
if err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed")
@ -188,8 +218,8 @@ func SendJoin(
return util.JSONResponse{
Code: http.StatusOK,
JSON: map[string]interface{}{
"state": stateAndAuthChainResponse.StateEvents,
"auth_chain": stateAndAuthChainResponse.AuthChainEvents,
"state": gomatrixserverlib.UnwrapEventHeaders(stateAndAuthChainResponse.StateEvents),
"auth_chain": gomatrixserverlib.UnwrapEventHeaders(stateAndAuthChainResponse.AuthChainEvents),
},
}
}

View file

@ -13,7 +13,6 @@
package routing
import (
"encoding/json"
"net/http"
"time"
@ -101,8 +100,18 @@ func SendLeave(
keys gomatrixserverlib.KeyRing,
roomID, eventID string,
) util.JSONResponse {
var event gomatrixserverlib.Event
if err := json.Unmarshal(request.Content(), &event); err != nil {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
verRes := api.QueryRoomVersionForRoomResponse{}
if err := producer.QueryAPI.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.UnsupportedRoomVersion(err.Error()),
}
}
// Decode the event JSON from the request.
event, err := gomatrixserverlib.NewEventFromUntrustedJSON(request.Content(), verRes.RoomVersion)
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.NotJSON("The request body could not be decoded into valid JSON. " + err.Error()),
@ -134,9 +143,10 @@ func SendLeave(
}
// Check that the event is signed by the server sending the request.
redacted := event.Redact()
verifyRequests := []gomatrixserverlib.VerifyJSONRequest{{
ServerName: event.Origin(),
Message: event.Redact().JSON(),
Message: redacted.JSON(),
AtTS: event.OriginServerTS(),
}}
verifyResults, err := keys.VerifyJSONs(httpReq.Context(), verifyRequests)
@ -166,7 +176,14 @@ func SendLeave(
// Send the events to the room server.
// We are responsible for notifying other servers that the user has left
// the room, so set SendAsServer to cfg.Matrix.ServerName
_, err = producer.SendEvents(httpReq.Context(), []gomatrixserverlib.Event{event}, cfg.Matrix.ServerName, nil)
_, err = producer.SendEvents(
httpReq.Context(),
[]gomatrixserverlib.HeaderedEvent{
event.Headered(verRes.RoomVersion),
},
cfg.Matrix.ServerName,
nil,
)
if err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed")
return jsonerror.InternalServerError()

View file

@ -39,7 +39,6 @@ func Send(
keys gomatrixserverlib.KeyRing,
federation *gomatrixserverlib.FederationClient,
) util.JSONResponse {
t := txnReq{
context: httpReq.Context(),
query: query,
@ -47,17 +46,26 @@ func Send(
keys: keys,
federation: federation,
}
if err := json.Unmarshal(request.Content(), &t); err != nil {
var txnEvents struct {
PDUs []json.RawMessage `json:"pdus"`
EDUs []json.RawMessage `json:"edus"`
}
if err := json.Unmarshal(request.Content(), &txnEvents); err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.NotJSON("The request body could not be decoded into valid JSON. " + err.Error()),
}
}
t.PDUs = txnEvents.PDUs
t.Origin = request.Origin()
t.TransactionID = txnID
t.Destination = cfg.Matrix.ServerName
util.GetLogger(httpReq.Context()).Infof("Received transaction %q containing %d PDUs, %d EDUs", txnID, len(t.PDUs), len(t.EDUs))
resp, err := t.processTransaction()
if err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("t.processTransaction failed")
@ -80,15 +88,37 @@ type txnReq struct {
}
func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
// Check the event signatures
if err := gomatrixserverlib.VerifyAllEventSignatures(t.context, t.PDUs, t.keys); err != nil {
return nil, err
var pdus []gomatrixserverlib.HeaderedEvent
for _, pdu := range t.PDUs {
var header struct {
RoomID string `json:"room_id"`
}
if err := json.Unmarshal(pdu, &header); err != nil {
util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to extract room ID from event")
return nil, err
}
verReq := api.QueryRoomVersionForRoomRequest{RoomID: header.RoomID}
verRes := api.QueryRoomVersionForRoomResponse{}
if err := t.query.QueryRoomVersionForRoom(t.context, &verReq, &verRes); err != nil {
util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to query room version for room", verReq.RoomID)
return nil, err
}
event, err := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, verRes.RoomVersion)
if err != nil {
util.GetLogger(t.context).WithError(err).Warnf("Transaction: Failed to parse event JSON of event %q", event.EventID())
return nil, err
}
if err := gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil {
util.GetLogger(t.context).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID())
return nil, err
}
pdus = append(pdus, event.Headered(verRes.RoomVersion))
}
// Process the events.
results := map[string]gomatrixserverlib.PDUResult{}
for _, e := range t.PDUs {
err := t.processEvent(e)
for _, e := range pdus {
err := t.processEvent(e.Unwrap())
if err != nil {
// If the error is due to the event itself being bad then we skip
// it and move onto the next event. We report an error so that the
@ -123,7 +153,7 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
}
// TODO: Process the EDUs.
util.GetLogger(t.context).Infof("Processed %d PDUs from transaction %q", len(results), t.TransactionID)
return &gomatrixserverlib.RespSend{PDUs: results}, nil
}
@ -159,13 +189,13 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error {
}
if !stateResp.PrevEventsExist {
return t.processEventWithMissingState(e)
return t.processEventWithMissingState(e, stateResp.RoomVersion)
}
// Check that the event is allowed by the state at the event.
var events []gomatrixserverlib.Event
for _, headeredEvent := range stateResp.StateEvents {
events = append(events, headeredEvent.Event)
events = append(events, headeredEvent.Unwrap())
}
if err := checkAllowedByState(e, events); err != nil {
return err
@ -175,7 +205,14 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event) error {
// TODO: Check that the event is allowed by its auth_events.
// pass the event to the roomserver
_, err := t.producer.SendEvents(t.context, []gomatrixserverlib.Event{e}, api.DoNotSendToOtherServers, nil)
_, err := t.producer.SendEvents(
t.context,
[]gomatrixserverlib.HeaderedEvent{
e.Headered(stateResp.RoomVersion),
},
api.DoNotSendToOtherServers,
nil,
)
return err
}
@ -190,7 +227,7 @@ func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserver
return gomatrixserverlib.Allowed(e, &authUsingState)
}
func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event) error {
func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion) error {
// We are missing the previous events for this events.
// This means that there is a gap in our view of the history of the
// room. There two ways that we can handle such a gap:
@ -207,7 +244,7 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event) error {
// need to fallback to /state.
// TODO: Attempt to fill in the gap using /get_missing_events
// TODO: Attempt to fetch the state using /state_ids and /events
state, err := t.federation.LookupState(t.context, t.Origin, e.RoomID(), e.EventID())
state, err := t.federation.LookupState(t.context, t.Origin, e.RoomID(), e.EventID(), roomVersion)
if err != nil {
return err
}
@ -225,7 +262,7 @@ retryAllowedState:
if s.EventID() != missing.AuthEventID {
continue
}
err = t.processEventWithMissingState(s)
err = t.processEventWithMissingState(s, roomVersion)
// If there was no error retrieving the event from federation then
// we assume that it succeeded, so retry the original state check
if err == nil {
@ -236,6 +273,7 @@ retryAllowedState:
}
return err
}
// pass the event along with the state to the roomserver
return t.producer.SendEventWithState(t.context, state, e)
return t.producer.SendEventWithState(t.context, state, e.Headered(roomVersion))
}

View file

@ -129,17 +129,9 @@ func getState(
return nil, &util.JSONResponse{Code: http.StatusNotFound, JSON: nil}
}
var stateEvents, authEvents []gomatrixserverlib.Event
for _, headeredEvent := range response.StateEvents {
stateEvents = append(stateEvents, headeredEvent.Event)
}
for _, headeredEvent := range response.AuthChainEvents {
authEvents = append(authEvents, headeredEvent.Event)
}
return &gomatrixserverlib.RespState{
StateEvents: stateEvents,
AuthEvents: authEvents,
StateEvents: gomatrixserverlib.UnwrapEventHeaders(response.StateEvents),
AuthEvents: gomatrixserverlib.UnwrapEventHeaders(response.AuthChainEvents),
}, nil
}

View file

@ -18,7 +18,6 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"time"
@ -28,6 +27,7 @@ import (
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/roomserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
@ -68,8 +68,17 @@ func CreateInvitesFrom3PIDInvites(
return *reqErr
}
evs := []gomatrixserverlib.Event{}
evs := []gomatrixserverlib.HeaderedEvent{}
for _, inv := range body.Invites {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: inv.RoomID}
verRes := api.QueryRoomVersionForRoomResponse{}
if err := queryAPI.QueryRoomVersionForRoom(req.Context(), &verReq, &verRes); err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.UnsupportedRoomVersion(err.Error()),
}
}
event, err := createInviteFrom3PIDInvite(
req.Context(), queryAPI, asAPI, cfg, inv, federation, accountDB,
)
@ -78,7 +87,7 @@ func CreateInvitesFrom3PIDInvites(
return jsonerror.InternalServerError()
}
if event != nil {
evs = append(evs, *event)
evs = append(evs, (*event).Headered(verRes.RoomVersion))
}
}
@ -137,6 +146,15 @@ func ExchangeThirdPartyInvite(
}
}
verReq := api.QueryRoomVersionForRoomRequest{RoomID: roomID}
verRes := api.QueryRoomVersionForRoomResponse{}
if err = queryAPI.QueryRoomVersionForRoom(httpReq.Context(), &verReq, &verRes); err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.UnsupportedRoomVersion(err.Error()),
}
}
// Auth and build the event from what the remote server sent us
event, err := buildMembershipEvent(httpReq.Context(), &builder, queryAPI, cfg)
if err == errNotInRoom {
@ -159,7 +177,12 @@ func ExchangeThirdPartyInvite(
// Send the event to the roomserver
if _, err = producer.SendEvents(
httpReq.Context(), []gomatrixserverlib.Event{signedEvent.Event}, cfg.Matrix.ServerName, nil,
httpReq.Context(),
[]gomatrixserverlib.HeaderedEvent{
signedEvent.Event.Headered(verRes.RoomVersion),
},
cfg.Matrix.ServerName,
nil,
); err != nil {
util.GetLogger(httpReq.Context()).WithError(err).Error("producer.SendEvents failed")
return jsonerror.InternalServerError()
@ -181,6 +204,12 @@ func createInviteFrom3PIDInvite(
inv invite, federation *gomatrixserverlib.FederationClient,
accountDB accounts.Database,
) (*gomatrixserverlib.Event, error) {
verReq := api.QueryRoomVersionForRoomRequest{RoomID: inv.RoomID}
verRes := api.QueryRoomVersionForRoomResponse{}
if err := queryAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes); err != nil {
return nil, err
}
_, server, err := gomatrixserverlib.SplitID('@', inv.MXID)
if err != nil {
return nil, err
@ -280,9 +309,9 @@ func buildMembershipEvent(
}
builder.AuthEvents = refs
eventID := fmt.Sprintf("$%s:%s", util.RandomString(16), cfg.Matrix.ServerName)
event, err := builder.Build(
eventID, time.Now(), cfg.Matrix.ServerName, cfg.Matrix.KeyID, cfg.Matrix.PrivateKey,
time.Now(), cfg.Matrix.ServerName, cfg.Matrix.KeyID,
cfg.Matrix.PrivateKey, queryRes.RoomVersion,
)
return &event, err