Merge branch 'master' into matthew/peeking

This commit is contained in:
Matthew Hodgson 2020-09-03 21:01:30 +01:00
commit 8712ea337a
79 changed files with 4073 additions and 1928 deletions

View file

@ -342,8 +342,7 @@ func createRoom(
}
// send events to the room server
_, err = roomserverAPI.SendEvents(req.Context(), rsAPI, builtEvents, cfg.Matrix.ServerName, nil)
if err != nil {
if err = roomserverAPI.SendEvents(req.Context(), rsAPI, builtEvents, cfg.Matrix.ServerName, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}

View file

@ -41,15 +41,13 @@ type flows struct {
}
type flow struct {
Type string `json:"type"`
Stages []string `json:"stages"`
Type string `json:"type"`
}
func passwordLogin() flows {
f := flows{}
s := flow{
Type: "m.login.password",
Stages: []string{"m.login.password"},
Type: "m.login.password",
}
f.Flows = append(f.Flows, s)
return f

View file

@ -75,13 +75,12 @@ func sendMembership(ctx context.Context, accountDB accounts.Database, device *us
return jsonerror.InternalServerError()
}
_, err = roomserverAPI.SendEvents(
if err = roomserverAPI.SendEvents(
ctx, rsAPI,
[]gomatrixserverlib.HeaderedEvent{event.Event.Headered(roomVer)},
cfg.Matrix.ServerName,
nil,
)
if err != nil {
); err != nil {
util.GetLogger(ctx).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
@ -270,7 +269,7 @@ func buildMembershipEvent(
return nil, err
}
return eventutil.BuildEvent(ctx, &builder, cfg.Matrix, evTime, rsAPI, nil)
return eventutil.QueryAndBuildEvent(ctx, &builder, cfg.Matrix, evTime, rsAPI, nil)
}
// loadProfile lookups the profile of a given user from the database and returns

View file

@ -171,7 +171,7 @@ func SetAvatarURL(
return jsonerror.InternalServerError()
}
if _, err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil {
if err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
@ -289,7 +289,7 @@ func SetDisplayName(
return jsonerror.InternalServerError()
}
if _, err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil {
if err := api.SendEvents(req.Context(), rsAPI, events, cfg.Matrix.ServerName, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
@ -375,7 +375,7 @@ func buildMembershipEvents(
return nil, err
}
event, err := eventutil.BuildEvent(ctx, &builder, cfg.Matrix, evTime, rsAPI, nil)
event, err := eventutil.QueryAndBuildEvent(ctx, &builder, cfg.Matrix, evTime, rsAPI, nil)
if err != nil {
return nil, err
}

View file

@ -0,0 +1,99 @@
package routing
import (
"net/http"
"sync"
"time"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/util"
)
type rateLimits struct {
limits map[string]chan struct{}
limitsMutex sync.RWMutex
enabled bool
requestThreshold int64
cooloffDuration time.Duration
}
func newRateLimits(cfg *config.RateLimiting) *rateLimits {
l := &rateLimits{
limits: make(map[string]chan struct{}),
enabled: cfg.Enabled,
requestThreshold: cfg.Threshold,
cooloffDuration: time.Duration(cfg.CooloffMS) * time.Millisecond,
}
if l.enabled {
go l.clean()
}
return l
}
func (l *rateLimits) clean() {
for {
// On a 30 second interval, we'll take an exclusive write
// lock of the entire map and see if any of the channels are
// empty. If they are then we will close and delete them,
// freeing up memory.
time.Sleep(time.Second * 30)
l.limitsMutex.Lock()
for k, c := range l.limits {
if len(c) == 0 {
close(c)
delete(l.limits, k)
}
}
l.limitsMutex.Unlock()
}
}
func (l *rateLimits) rateLimit(req *http.Request) *util.JSONResponse {
// If rate limiting is disabled then do nothing.
if !l.enabled {
return nil
}
// Lock the map long enough to check for rate limiting. We hold it
// for longer here than we really need to but it makes sure that we
// also don't conflict with the cleaner goroutine which might clean
// up a channel after we have retrieved it otherwise.
l.limitsMutex.RLock()
defer l.limitsMutex.RUnlock()
// First of all, work out if X-Forwarded-For was sent to us. If not
// then we'll just use the IP address of the caller.
caller := req.RemoteAddr
if forwardedFor := req.Header.Get("X-Forwarded-For"); forwardedFor != "" {
caller = forwardedFor
}
// Look up the caller's channel, if they have one. If they don't then
// let's create one.
rateLimit, ok := l.limits[caller]
if !ok {
l.limits[caller] = make(chan struct{}, l.requestThreshold)
rateLimit = l.limits[caller]
}
// Check if the user has got free resource slots for this request.
// If they don't then we'll return an error.
select {
case rateLimit <- struct{}{}:
default:
// We hit the rate limit. Tell the client to back off.
return &util.JSONResponse{
Code: http.StatusTooManyRequests,
JSON: jsonerror.LimitExceeded("You are sending too many requests too quickly!", l.cooloffDuration.Milliseconds()),
}
}
// After the time interval, drain a resource from the rate limiting
// channel. This will free up space in the channel for new requests.
go func() {
<-time.After(l.cooloffDuration)
<-rateLimit
}()
return nil
}

View file

@ -115,15 +115,14 @@ func SendRedaction(
}
var queryRes api.QueryLatestEventsAndStateResponse
e, err := eventutil.BuildEvent(req.Context(), &builder, cfg.Matrix, time.Now(), rsAPI, &queryRes)
e, err := eventutil.QueryAndBuildEvent(req.Context(), &builder, cfg.Matrix, time.Now(), rsAPI, &queryRes)
if err == eventutil.ErrRoomNoExists {
return util.JSONResponse{
Code: http.StatusNotFound,
JSON: jsonerror.NotFound("Room does not exist"),
}
}
_, err = roomserverAPI.SendEvents(context.Background(), rsAPI, []gomatrixserverlib.HeaderedEvent{*e}, cfg.Matrix.ServerName, nil)
if err != nil {
if err = roomserverAPI.SendEvents(context.Background(), rsAPI, []gomatrixserverlib.HeaderedEvent{*e}, cfg.Matrix.ServerName, nil); err != nil {
util.GetLogger(req.Context()).WithError(err).Errorf("failed to SendEvents")
return jsonerror.InternalServerError()
}

View file

@ -60,6 +60,7 @@ func Setup(
keyAPI keyserverAPI.KeyInternalAPI,
extRoomsProvider api.ExtraPublicRoomsProvider,
) {
rateLimits := newRateLimits(&cfg.RateLimiting)
userInteractiveAuth := auth.NewUserInteractive(accountDB.GetAccountByPassword, cfg)
publicAPIMux.Handle("/versions",
@ -92,6 +93,9 @@ func Setup(
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/join/{roomIDOrAlias}",
httputil.MakeAuthAPI(gomatrixserverlib.Join, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
@ -119,6 +123,9 @@ func Setup(
).Methods(http.MethodGet, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/join",
httputil.MakeAuthAPI(gomatrixserverlib.Join, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
@ -130,6 +137,9 @@ func Setup(
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/leave",
httputil.MakeAuthAPI("membership", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
@ -150,6 +160,9 @@ func Setup(
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/invite",
httputil.MakeAuthAPI("membership", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
@ -264,14 +277,23 @@ func Setup(
).Methods(http.MethodPut, http.MethodOptions)
r0mux.Handle("/register", httputil.MakeExternalAPI("register", func(req *http.Request) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
return *r
}
return Register(req, userAPI, accountDB, cfg)
})).Methods(http.MethodPost, http.MethodOptions)
v1mux.Handle("/register", httputil.MakeExternalAPI("register", func(req *http.Request) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
return *r
}
return LegacyRegister(req, userAPI, cfg)
})).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/register/available", httputil.MakeExternalAPI("registerAvailable", func(req *http.Request) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
return *r
}
return RegisterAvailable(req, cfg, accountDB)
})).Methods(http.MethodGet, http.MethodOptions)
@ -343,6 +365,9 @@ func Setup(
r0mux.Handle("/rooms/{roomID}/typing/{userID}",
httputil.MakeAuthAPI("rooms_typing", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
@ -396,6 +421,9 @@ func Setup(
r0mux.Handle("/account/whoami",
httputil.MakeAuthAPI("whoami", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
return *r
}
return Whoami(req, device)
}),
).Methods(http.MethodGet, http.MethodOptions)
@ -404,6 +432,9 @@ func Setup(
r0mux.Handle("/login",
httputil.MakeExternalAPI("login", func(req *http.Request) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
return *r
}
return Login(req, accountDB, userAPI, cfg)
}),
).Methods(http.MethodGet, http.MethodPost, http.MethodOptions)
@ -458,6 +489,9 @@ func Setup(
r0mux.Handle("/profile/{userID}/avatar_url",
httputil.MakeAuthAPI("profile_avatar_url", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
@ -480,6 +514,9 @@ func Setup(
r0mux.Handle("/profile/{userID}/displayname",
httputil.MakeAuthAPI("profile_displayname", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
if err != nil {
return util.ErrorResponse(err)
@ -517,6 +554,9 @@ func Setup(
// Riot logs get flooded unless this is handled
r0mux.Handle("/presence/{userID}/status",
httputil.MakeExternalAPI("presence", func(req *http.Request) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
return *r
}
// TODO: Set presence (probably the responsibility of a presence server not clientapi)
return util.JSONResponse{
Code: http.StatusOK,
@ -527,6 +567,9 @@ func Setup(
r0mux.Handle("/voip/turnServer",
httputil.MakeAuthAPI("turn_server", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
return *r
}
return RequestTurnServer(req, device, cfg)
}),
).Methods(http.MethodGet, http.MethodOptions)
@ -593,6 +636,9 @@ func Setup(
r0mux.Handle("/user_directory/search",
httputil.MakeAuthAPI("userdirectory_search", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
return *r
}
postContent := struct {
SearchString string `json:"search_term"`
Limit int `json:"limit"`
@ -634,6 +680,9 @@ func Setup(
r0mux.Handle("/rooms/{roomID}/read_markers",
httputil.MakeExternalAPI("rooms_read_markers", func(req *http.Request) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
return *r
}
// TODO: return the read_markers.
return util.JSONResponse{Code: http.StatusOK, JSON: struct{}{}}
}),
@ -732,6 +781,9 @@ func Setup(
r0mux.Handle("/capabilities",
httputil.MakeAuthAPI("capabilities", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
return *r
}
return GetCapabilities(req, rsAPI)
}),
).Methods(http.MethodGet)

View file

@ -90,27 +90,26 @@ func SendEvent(
// pass the new event to the roomserver and receive the correct event ID
// event ID in case of duplicate transaction is discarded
eventID, err := api.SendEvents(
if err := api.SendEvents(
req.Context(), rsAPI,
[]gomatrixserverlib.HeaderedEvent{
e.Headered(verRes.RoomVersion),
},
cfg.Matrix.ServerName,
txnAndSessionID,
)
if err != nil {
); err != nil {
util.GetLogger(req.Context()).WithError(err).Error("SendEvents failed")
return jsonerror.InternalServerError()
}
util.GetLogger(req.Context()).WithFields(logrus.Fields{
"event_id": eventID,
"event_id": e.EventID(),
"room_id": roomID,
"room_version": verRes.RoomVersion,
}).Info("Sent event to roomserver")
res := util.JSONResponse{
Code: http.StatusOK,
JSON: sendEventResponse{eventID},
JSON: sendEventResponse{e.EventID()},
}
// Add response to transactionsCache
if txnID != nil {
@ -158,7 +157,7 @@ func generateSendEvent(
}
var queryRes api.QueryLatestEventsAndStateResponse
e, err := eventutil.BuildEvent(req.Context(), &builder, cfg.Matrix, evTime, rsAPI, &queryRes)
e, err := eventutil.QueryAndBuildEvent(req.Context(), &builder, cfg.Matrix, evTime, rsAPI, &queryRes)
if err == eventutil.ErrRoomNoExists {
return nil, &util.JSONResponse{
Code: http.StatusNotFound,

View file

@ -354,12 +354,12 @@ func emit3PIDInviteEvent(
}
queryRes := api.QueryLatestEventsAndStateResponse{}
event, err := eventutil.BuildEvent(ctx, builder, cfg.Matrix, evTime, rsAPI, &queryRes)
event, err := eventutil.QueryAndBuildEvent(ctx, builder, cfg.Matrix, evTime, rsAPI, &queryRes)
if err != nil {
return err
}
_, err = api.SendEvents(
return api.SendEvents(
ctx, rsAPI,
[]gomatrixserverlib.HeaderedEvent{
(*event).Headered(queryRes.RoomVersion),
@ -367,5 +367,4 @@ func emit3PIDInviteEvent(
cfg.Matrix.ServerName,
nil,
)
return err
}