Rename FS queue package to internal (#997)

This commit is contained in:
Neil Alexander 2020-05-01 13:01:50 +01:00 committed by GitHub
parent 17e046f18f
commit 908108c23e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 6 additions and 6 deletions

View file

@ -0,0 +1,97 @@
package internal
import (
"encoding/json"
"net/http"
"github.com/matrix-org/dendrite/common"
"github.com/matrix-org/dendrite/common/config"
"github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/federationsender/producers"
"github.com/matrix-org/dendrite/federationsender/storage"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
// FederationSenderInternalAPI is an implementation of api.FederationSenderInternalAPI
type FederationSenderInternalAPI struct {
api.FederationSenderInternalAPI
db storage.Database
cfg *config.Dendrite
producer *producers.RoomserverProducer
federation *gomatrixserverlib.FederationClient
keyRing *gomatrixserverlib.KeyRing
}
func NewFederationSenderInternalAPI(
db storage.Database, cfg *config.Dendrite,
producer *producers.RoomserverProducer,
federation *gomatrixserverlib.FederationClient,
keyRing *gomatrixserverlib.KeyRing,
) *FederationSenderInternalAPI {
return &FederationSenderInternalAPI{
db: db,
cfg: cfg,
producer: producer,
federation: federation,
keyRing: keyRing,
}
}
// SetupHTTP adds the FederationSenderInternalAPI handlers to the http.ServeMux.
func (f *FederationSenderInternalAPI) SetupHTTP(servMux *http.ServeMux) {
servMux.Handle(
api.FederationSenderQueryJoinedHostsInRoomPath,
common.MakeInternalAPI("QueryJoinedHostsInRoom", func(req *http.Request) util.JSONResponse {
var request api.QueryJoinedHostsInRoomRequest
var response api.QueryJoinedHostsInRoomResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := f.QueryJoinedHostsInRoom(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
servMux.Handle(
api.FederationSenderQueryJoinedHostServerNamesInRoomPath,
common.MakeInternalAPI("QueryJoinedHostServerNamesInRoom", func(req *http.Request) util.JSONResponse {
var request api.QueryJoinedHostServerNamesInRoomRequest
var response api.QueryJoinedHostServerNamesInRoomResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.ErrorResponse(err)
}
if err := f.QueryJoinedHostServerNamesInRoom(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
servMux.Handle(api.FederationSenderPerformJoinRequestPath,
common.MakeInternalAPI("PerformJoinRequest", func(req *http.Request) util.JSONResponse {
var request api.PerformJoinRequest
var response api.PerformJoinResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := f.PerformJoin(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
servMux.Handle(api.FederationSenderPerformLeaveRequestPath,
common.MakeInternalAPI("PerformLeaveRequest", func(req *http.Request) util.JSONResponse {
var request api.PerformLeaveRequest
var response api.PerformLeaveResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := f.PerformLeave(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
}

View file

@ -0,0 +1,120 @@
package internal
import (
"context"
"fmt"
"time"
"github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/federationsender/internal/perform"
"github.com/matrix-org/dendrite/roomserver/version"
"github.com/matrix-org/gomatrixserverlib"
)
// PerformJoinRequest implements api.FederationSenderInternalAPI
func (r *FederationSenderInternalAPI) PerformJoin(
ctx context.Context,
request *api.PerformJoinRequest,
response *api.PerformJoinResponse,
) (err error) {
// Look up the supported room versions.
var supportedVersions []gomatrixserverlib.RoomVersion
for version := range version.SupportedRoomVersions() {
supportedVersions = append(supportedVersions, version)
}
// Try to perform a make_join using the information supplied in the
// request.
respMakeJoin, err := r.federation.MakeJoin(
ctx,
request.ServerName,
request.RoomID,
request.UserID,
supportedVersions,
)
if err != nil {
// TODO: Check if the user was not allowed to join the room.
return fmt.Errorf("r.federation.MakeJoin: %w", err)
}
// Set all the fields to be what they should be, this should be a no-op
// but it's possible that the remote server returned us something "odd"
respMakeJoin.JoinEvent.Type = "m.room.member"
respMakeJoin.JoinEvent.Sender = request.UserID
respMakeJoin.JoinEvent.StateKey = &request.UserID
respMakeJoin.JoinEvent.RoomID = request.RoomID
respMakeJoin.JoinEvent.Redacts = ""
if request.Content == nil {
request.Content = map[string]interface{}{}
}
request.Content["membership"] = "join"
if err = respMakeJoin.JoinEvent.SetContent(request.Content); err != nil {
return fmt.Errorf("respMakeJoin.JoinEvent.SetContent: %w", err)
}
if err = respMakeJoin.JoinEvent.SetUnsigned(struct{}{}); err != nil {
return fmt.Errorf("respMakeJoin.JoinEvent.SetUnsigned: %w", err)
}
// Work out if we support the room version that has been supplied in
// the make_join response.
if respMakeJoin.RoomVersion == "" {
respMakeJoin.RoomVersion = gomatrixserverlib.RoomVersionV1
}
if _, err = respMakeJoin.RoomVersion.EventFormat(); err != nil {
return fmt.Errorf("respMakeJoin.RoomVersion.EventFormat: %w", err)
}
// Build the join event.
event, err := respMakeJoin.JoinEvent.Build(
time.Now(),
r.cfg.Matrix.ServerName,
r.cfg.Matrix.KeyID,
r.cfg.Matrix.PrivateKey,
respMakeJoin.RoomVersion,
)
if err != nil {
return fmt.Errorf("respMakeJoin.JoinEvent.Build: %w", err)
}
// Try to perform a send_join using the newly built event.
respSendJoin, err := r.federation.SendJoin(
ctx,
request.ServerName,
event,
respMakeJoin.RoomVersion,
)
if err != nil {
return fmt.Errorf("r.federation.SendJoin: %w", err)
}
// Check that the send_join response was valid.
joinCtx := perform.JoinContext(r.federation, r.keyRing)
if err = joinCtx.CheckSendJoinResponse(
ctx, event, request.ServerName, respMakeJoin, respSendJoin,
); err != nil {
return fmt.Errorf("perform.JoinRequest.CheckSendJoinResponse: %w", err)
}
// If we successfully performed a send_join above then the other
// server now thinks we're a part of the room. Send the newly
// returned state to the roomserver to update our local view.
if err = r.producer.SendEventWithState(
ctx,
respSendJoin.ToRespState(),
event.Headered(respMakeJoin.RoomVersion),
); err != nil {
return fmt.Errorf("r.producer.SendEventWithState: %w", err)
}
// Everything went to plan.
return nil
}
// PerformLeaveRequest implements api.FederationSenderInternalAPI
func (r *FederationSenderInternalAPI) PerformLeave(
ctx context.Context,
request *api.PerformLeaveRequest,
response *api.PerformLeaveResponse,
) (err error) {
return nil
}

View file

@ -0,0 +1,70 @@
package perform
import (
"context"
"fmt"
"github.com/matrix-org/gomatrixserverlib"
)
// This file contains helpers for the PerformJoin function.
type joinContext struct {
federation *gomatrixserverlib.FederationClient
keyRing *gomatrixserverlib.KeyRing
}
// Returns a new join context.
func JoinContext(f *gomatrixserverlib.FederationClient, k *gomatrixserverlib.KeyRing) *joinContext {
return &joinContext{
federation: f,
keyRing: k,
}
}
// checkSendJoinResponse checks that all of the signatures are correct
// and that the join is allowed by the supplied state.
func (r joinContext) CheckSendJoinResponse(
ctx context.Context,
event gomatrixserverlib.Event,
server gomatrixserverlib.ServerName,
respMakeJoin gomatrixserverlib.RespMakeJoin,
respSendJoin gomatrixserverlib.RespSendJoin,
) error {
// A list of events that we have retried, if they were not included in
// the auth events supplied in the send_join.
retries := map[string]bool{}
retryCheck:
// TODO: Can we expand Check here to return a list of missing auth
// events rather than failing one at a time?
if err := respSendJoin.Check(ctx, r.keyRing, event); err != nil {
switch e := err.(type) {
case gomatrixserverlib.MissingAuthEventError:
// Check that we haven't already retried for this event, prevents
// us from ending up in endless loops
if !retries[e.AuthEventID] {
// Ask the server that we're talking to right now for the event
tx, txerr := r.federation.GetEvent(ctx, server, e.AuthEventID)
if txerr != nil {
return fmt.Errorf("r.federation.GetEvent: %w", txerr)
}
// For each event returned, add it to the auth events.
for _, pdu := range tx.PDUs {
ev, everr := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, respMakeJoin.RoomVersion)
if everr != nil {
return fmt.Errorf("gomatrixserverlib.NewEventFromUntrustedJSON: %w", everr)
}
respSendJoin.AuthEvents = append(respSendJoin.AuthEvents, ev)
}
// Mark the event as retried and then give the check another go.
retries[e.AuthEventID] = true
goto retryCheck
}
return fmt.Errorf("respSendJoin (after retries): %w", e)
default:
return fmt.Errorf("respSendJoin: %w", err)
}
}
return nil
}

View file

@ -0,0 +1,39 @@
package internal
import (
"context"
"github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/gomatrixserverlib"
)
// QueryJoinedHostsInRoom implements api.FederationSenderInternalAPI
func (f *FederationSenderInternalAPI) QueryJoinedHostsInRoom(
ctx context.Context,
request *api.QueryJoinedHostsInRoomRequest,
response *api.QueryJoinedHostsInRoomResponse,
) (err error) {
response.JoinedHosts, err = f.db.GetJoinedHosts(ctx, request.RoomID)
return
}
// QueryJoinedHostServerNamesInRoom implements api.FederationSenderInternalAPI
func (f *FederationSenderInternalAPI) QueryJoinedHostServerNamesInRoom(
ctx context.Context,
request *api.QueryJoinedHostServerNamesInRoomRequest,
response *api.QueryJoinedHostServerNamesInRoomResponse,
) (err error) {
joinedHosts, err := f.db.GetJoinedHosts(ctx, request.RoomID)
if err != nil {
return
}
response.ServerNames = make([]gomatrixserverlib.ServerName, 0, len(joinedHosts))
for _, host := range joinedHosts {
response.ServerNames = append(response.ServerNames, host.ServerName)
}
// TODO: remove duplicates?
return
}