mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-07-29 12:42:46 +00:00
Tweak AS registration check and AS component HTTP clients (#1785)
* Tweak AS registration check * Check appservice usernames using correct function * Update sytest-whitelist * Use gomatrixserverlib.Client since that allows us to disable TLS validation using the config * Add appservice-specific client and ability to control TLS validation for appservices only * Set timeout on appservice client * Review comments * Remove dead code * Enforce LoginTypeApplicationService after all * Check correct auth type field
This commit is contained in:
parent
9557ccada4
commit
1ad96e2e2d
9 changed files with 64 additions and 47 deletions
|
@ -16,9 +16,7 @@ package appservice
|
|||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
|
||||
|
@ -48,6 +46,7 @@ func NewInternalAPI(
|
|||
userAPI userapi.UserInternalAPI,
|
||||
rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||
) appserviceAPI.AppServiceQueryAPI {
|
||||
client := base.CreateAppserviceClient()
|
||||
consumer, _ := kafka.SetupConsumerProducer(&base.Cfg.Global.Kafka)
|
||||
|
||||
// Create a connection to the appservice postgres DB
|
||||
|
@ -79,10 +78,8 @@ func NewInternalAPI(
|
|||
// Create appserivce query API with an HTTP client that will be used for all
|
||||
// outbound and inbound requests (inbound only for the internal API)
|
||||
appserviceQueryAPI := &query.AppServiceQueryAPI{
|
||||
HTTPClient: &http.Client{
|
||||
Timeout: time.Second * 30,
|
||||
},
|
||||
Cfg: base.Cfg,
|
||||
HTTPClient: client,
|
||||
Cfg: base.Cfg,
|
||||
}
|
||||
|
||||
// Only consume if we actually have ASes to track, else we'll just chew cycles needlessly.
|
||||
|
@ -98,7 +95,7 @@ func NewInternalAPI(
|
|||
}
|
||||
|
||||
// Create application service transaction workers
|
||||
if err := workers.SetupTransactionWorkers(appserviceDB, workerStates); err != nil {
|
||||
if err := workers.SetupTransactionWorkers(client, appserviceDB, workerStates); err != nil {
|
||||
logrus.WithError(err).Panicf("failed to start app service transaction workers")
|
||||
}
|
||||
return appserviceQueryAPI
|
||||
|
|
|
@ -20,10 +20,10 @@ import (
|
|||
"context"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/matrix-org/dendrite/appservice/api"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
opentracing "github.com/opentracing/opentracing-go"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
@ -33,7 +33,7 @@ const userIDExistsPath = "/users/"
|
|||
|
||||
// AppServiceQueryAPI is an implementation of api.AppServiceQueryAPI
|
||||
type AppServiceQueryAPI struct {
|
||||
HTTPClient *http.Client
|
||||
HTTPClient *gomatrixserverlib.Client
|
||||
Cfg *config.Dendrite
|
||||
}
|
||||
|
||||
|
@ -47,11 +47,6 @@ func (a *AppServiceQueryAPI) RoomAliasExists(
|
|||
span, ctx := opentracing.StartSpanFromContext(ctx, "ApplicationServiceRoomAlias")
|
||||
defer span.Finish()
|
||||
|
||||
// Create an HTTP client if one does not already exist
|
||||
if a.HTTPClient == nil {
|
||||
a.HTTPClient = makeHTTPClient()
|
||||
}
|
||||
|
||||
// Determine which application service should handle this request
|
||||
for _, appservice := range a.Cfg.Derived.ApplicationServices {
|
||||
if appservice.URL != "" && appservice.IsInterestedInRoomAlias(request.Alias) {
|
||||
|
@ -68,7 +63,7 @@ func (a *AppServiceQueryAPI) RoomAliasExists(
|
|||
}
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
resp, err := a.HTTPClient.Do(req)
|
||||
resp, err := a.HTTPClient.DoHTTPRequest(ctx, req)
|
||||
if resp != nil {
|
||||
defer func() {
|
||||
err = resp.Body.Close()
|
||||
|
@ -115,11 +110,6 @@ func (a *AppServiceQueryAPI) UserIDExists(
|
|||
span, ctx := opentracing.StartSpanFromContext(ctx, "ApplicationServiceUserID")
|
||||
defer span.Finish()
|
||||
|
||||
// Create an HTTP client if one does not already exist
|
||||
if a.HTTPClient == nil {
|
||||
a.HTTPClient = makeHTTPClient()
|
||||
}
|
||||
|
||||
// Determine which application service should handle this request
|
||||
for _, appservice := range a.Cfg.Derived.ApplicationServices {
|
||||
if appservice.URL != "" && appservice.IsInterestedInUserID(request.UserID) {
|
||||
|
@ -134,7 +124,7 @@ func (a *AppServiceQueryAPI) UserIDExists(
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, err := a.HTTPClient.Do(req.WithContext(ctx))
|
||||
resp, err := a.HTTPClient.DoHTTPRequest(ctx, req)
|
||||
if resp != nil {
|
||||
defer func() {
|
||||
err = resp.Body.Close()
|
||||
|
@ -169,10 +159,3 @@ func (a *AppServiceQueryAPI) UserIDExists(
|
|||
response.UserIDExists = false
|
||||
return nil
|
||||
}
|
||||
|
||||
// makeHTTPClient creates an HTTP client with certain options that will be used for all query requests to application services
|
||||
func makeHTTPClient() *http.Client {
|
||||
return &http.Client{
|
||||
Timeout: time.Second * 30,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,8 +34,6 @@ import (
|
|||
var (
|
||||
// Maximum size of events sent in each transaction.
|
||||
transactionBatchSize = 50
|
||||
// Timeout for sending a single transaction to an application service.
|
||||
transactionTimeout = time.Second * 60
|
||||
)
|
||||
|
||||
// SetupTransactionWorkers spawns a separate goroutine for each application
|
||||
|
@ -44,6 +42,7 @@ var (
|
|||
// size), then send that off to the AS's /transactions/{txnID} endpoint. It also
|
||||
// handles exponentially backing off in case the AS isn't currently available.
|
||||
func SetupTransactionWorkers(
|
||||
client *gomatrixserverlib.Client,
|
||||
appserviceDB storage.Database,
|
||||
workerStates []types.ApplicationServiceWorkerState,
|
||||
) error {
|
||||
|
@ -51,7 +50,7 @@ func SetupTransactionWorkers(
|
|||
for _, workerState := range workerStates {
|
||||
// Don't create a worker if this AS doesn't want to receive events
|
||||
if workerState.AppService.URL != "" {
|
||||
go worker(appserviceDB, workerState)
|
||||
go worker(client, appserviceDB, workerState)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -59,17 +58,12 @@ func SetupTransactionWorkers(
|
|||
|
||||
// worker is a goroutine that sends any queued events to the application service
|
||||
// it is given.
|
||||
func worker(db storage.Database, ws types.ApplicationServiceWorkerState) {
|
||||
func worker(client *gomatrixserverlib.Client, db storage.Database, ws types.ApplicationServiceWorkerState) {
|
||||
log.WithFields(log.Fields{
|
||||
"appservice": ws.AppService.ID,
|
||||
}).Info("Starting application service")
|
||||
ctx := context.Background()
|
||||
|
||||
// Create a HTTP client for sending requests to app services
|
||||
client := &http.Client{
|
||||
Timeout: transactionTimeout,
|
||||
}
|
||||
|
||||
// Initial check for any leftover events to send from last time
|
||||
eventCount, err := db.CountEventsWithAppServiceID(ctx, ws.AppService.ID)
|
||||
if err != nil {
|
||||
|
@ -206,7 +200,7 @@ func createTransaction(
|
|||
// send sends events to an application service. Returns an error if an OK was not
|
||||
// received back from the application service or the request timed out.
|
||||
func send(
|
||||
client *http.Client,
|
||||
client *gomatrixserverlib.Client,
|
||||
appservice config.ApplicationService,
|
||||
txnID int,
|
||||
transaction []byte,
|
||||
|
@ -219,7 +213,7 @@ func send(
|
|||
return err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
resp, err := client.Do(req)
|
||||
resp, err := client.DoHTTPRequest(context.TODO(), req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue