Merge branch 'master' into nats

This commit is contained in:
Neil Alexander 2021-12-03 17:35:36 +00:00
commit 9bc0731ec4
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
30 changed files with 281 additions and 123 deletions

View file

@ -23,6 +23,7 @@ import (
"net/url"
"strings"
"github.com/matrix-org/dendrite/userapi/api"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
)
@ -72,6 +73,9 @@ func PostJSON(
var errorBody struct {
Message string `json:"message"`
}
if _, ok := response.(*api.PerformKeyBackupResponse); ok { // TODO: remove this, once cross-boundary errors are a thing
return nil
}
if msgerr := json.NewDecoder(res.Body).Decode(&errorBody); msgerr == nil {
return fmt.Errorf("internal API: %d from %s: %s", res.StatusCode, apiURL, errorBody.Message)
}

View file

@ -0,0 +1,109 @@
package httputil
import (
"net/http"
"sync"
"time"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/util"
)
type RateLimits struct {
limits map[string]chan struct{}
limitsMutex sync.RWMutex
cleanMutex sync.RWMutex
enabled bool
requestThreshold int64
cooloffDuration time.Duration
}
func NewRateLimits(cfg *config.RateLimiting) *RateLimits {
l := &RateLimits{
limits: make(map[string]chan struct{}),
enabled: cfg.Enabled,
requestThreshold: cfg.Threshold,
cooloffDuration: time.Duration(cfg.CooloffMS) * time.Millisecond,
}
if l.enabled {
go l.clean()
}
return l
}
func (l *RateLimits) clean() {
for {
// On a 30 second interval, we'll take an exclusive write
// lock of the entire map and see if any of the channels are
// empty. If they are then we will close and delete them,
// freeing up memory.
time.Sleep(time.Second * 30)
l.cleanMutex.Lock()
l.limitsMutex.Lock()
for k, c := range l.limits {
if len(c) == 0 {
close(c)
delete(l.limits, k)
}
}
l.limitsMutex.Unlock()
l.cleanMutex.Unlock()
}
}
func (l *RateLimits) Limit(req *http.Request) *util.JSONResponse {
// If rate limiting is disabled then do nothing.
if !l.enabled {
return nil
}
// Take a read lock out on the cleaner mutex. The cleaner expects to
// be able to take a write lock, which isn't possible while there are
// readers, so this has the effect of blocking the cleaner goroutine
// from doing its work until there are no requests in flight.
l.cleanMutex.RLock()
defer l.cleanMutex.RUnlock()
// First of all, work out if X-Forwarded-For was sent to us. If not
// then we'll just use the IP address of the caller.
caller := req.RemoteAddr
if forwardedFor := req.Header.Get("X-Forwarded-For"); forwardedFor != "" {
caller = forwardedFor
}
// Look up the caller's channel, if they have one.
l.limitsMutex.RLock()
rateLimit, ok := l.limits[caller]
l.limitsMutex.RUnlock()
// If the caller doesn't have a channel, create one and write it
// back to the map.
if !ok {
rateLimit = make(chan struct{}, l.requestThreshold)
l.limitsMutex.Lock()
l.limits[caller] = rateLimit
l.limitsMutex.Unlock()
}
// Check if the user has got free resource slots for this request.
// If they don't then we'll return an error.
select {
case rateLimit <- struct{}{}:
default:
// We hit the rate limit. Tell the client to back off.
return &util.JSONResponse{
Code: http.StatusTooManyRequests,
JSON: jsonerror.LimitExceeded("You are sending too many requests too quickly!", l.cooloffDuration.Milliseconds()),
}
}
// After the time interval, drain a resource from the rate limiting
// channel. This will free up space in the channel for new requests.
go func() {
<-time.After(l.cooloffDuration)
<-rateLimit
}()
return nil
}