mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-08-02 22:22:46 +00:00
Add context.Context to the federation client (#225)
* Add context.Context to the federation client * gb vendor update github.com/matrix-org/gomatrixserverlib
This commit is contained in:
parent
086683459f
commit
029e71828a
17 changed files with 139 additions and 72 deletions
|
@ -66,7 +66,7 @@ func DirectoryRoom(
|
|||
}
|
||||
}
|
||||
} else {
|
||||
resp, err = federation.LookupRoomAlias(domain, roomAlias)
|
||||
resp, err = federation.LookupRoomAlias(req.Context(), domain, roomAlias)
|
||||
if err != nil {
|
||||
switch x := err.(type) {
|
||||
case gomatrix.HTTPError:
|
||||
|
|
|
@ -136,7 +136,7 @@ func (r joinRoomReq) joinRoomByAlias(roomAlias string) util.JSONResponse {
|
|||
func (r joinRoomReq) joinRoomByRemoteAlias(
|
||||
domain gomatrixserverlib.ServerName, roomAlias string,
|
||||
) util.JSONResponse {
|
||||
resp, err := r.federation.LookupRoomAlias(domain, roomAlias)
|
||||
resp, err := r.federation.LookupRoomAlias(r.req.Context(), domain, roomAlias)
|
||||
if err != nil {
|
||||
switch x := err.(type) {
|
||||
case gomatrix.HTTPError:
|
||||
|
@ -226,7 +226,7 @@ func (r joinRoomReq) joinRoomUsingServers(
|
|||
// server was invalid this returns an error.
|
||||
// Otherwise this returns a JSONResponse.
|
||||
func (r joinRoomReq) joinRoomUsingServer(roomID string, server gomatrixserverlib.ServerName) (*util.JSONResponse, error) {
|
||||
respMakeJoin, err := r.federation.MakeJoin(server, roomID, r.userID)
|
||||
respMakeJoin, err := r.federation.MakeJoin(r.req.Context(), server, roomID, r.userID)
|
||||
if err != nil {
|
||||
// TODO: Check if the user was not allowed to join the room.
|
||||
return nil, err
|
||||
|
@ -246,12 +246,12 @@ func (r joinRoomReq) joinRoomUsingServer(roomID string, server gomatrixserverlib
|
|||
return &res, nil
|
||||
}
|
||||
|
||||
respSendJoin, err := r.federation.SendJoin(server, event)
|
||||
respSendJoin, err := r.federation.SendJoin(r.req.Context(), server, event)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = respSendJoin.Check(r.keyRing, event); err != nil {
|
||||
if err = respSendJoin.Check(r.req.Context(), r.keyRing, event); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
|
|
@ -15,7 +15,9 @@
|
|||
package keydb
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/matrix-org/gomatrixserverlib"
|
||||
)
|
||||
|
||||
|
@ -41,6 +43,7 @@ func NewDatabase(dataSourceName string) (*Database, error) {
|
|||
|
||||
// FetchKeys implements gomatrixserverlib.KeyDatabase
|
||||
func (d *Database) FetchKeys(
|
||||
ctx context.Context,
|
||||
requests map[gomatrixserverlib.PublicKeyRequest]gomatrixserverlib.Timestamp,
|
||||
) (map[gomatrixserverlib.PublicKeyRequest]gomatrixserverlib.ServerKeys, error) {
|
||||
return d.statements.bulkSelectServerKeys(requests)
|
||||
|
@ -48,6 +51,7 @@ func (d *Database) FetchKeys(
|
|||
|
||||
// StoreKeys implements gomatrixserverlib.KeyDatabase
|
||||
func (d *Database) StoreKeys(
|
||||
ctx context.Context,
|
||||
keyMap map[gomatrixserverlib.PublicKeyRequest]gomatrixserverlib.ServerKeys,
|
||||
) error {
|
||||
// TODO: Inserting all the keys within a single transaction may
|
||||
|
|
|
@ -76,7 +76,7 @@ func Invite(
|
|||
Message: event.Redact().JSON(),
|
||||
AtTS: event.OriginServerTS(),
|
||||
}}
|
||||
verifyResults, err := keys.VerifyJSONs(verifyRequests)
|
||||
verifyResults, err := keys.VerifyJSONs(httpReq.Context(), verifyRequests)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(httpReq, err)
|
||||
}
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
package writers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
@ -41,6 +42,7 @@ func Send(
|
|||
) util.JSONResponse {
|
||||
|
||||
t := txnReq{
|
||||
context: httpReq.Context(),
|
||||
query: query,
|
||||
producer: producer,
|
||||
keys: keys,
|
||||
|
@ -70,6 +72,7 @@ func Send(
|
|||
|
||||
type txnReq struct {
|
||||
gomatrixserverlib.Transaction
|
||||
context context.Context
|
||||
query api.RoomserverQueryAPI
|
||||
producer *producers.RoomserverProducer
|
||||
keys gomatrixserverlib.KeyRing
|
||||
|
@ -78,7 +81,7 @@ type txnReq struct {
|
|||
|
||||
func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
|
||||
// Check the event signatures
|
||||
if err := gomatrixserverlib.VerifyEventSignatures(t.PDUs, t.keys); err != nil {
|
||||
if err := gomatrixserverlib.VerifyEventSignatures(t.context, t.PDUs, t.keys); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -110,7 +113,9 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, error) {
|
|||
// our server so we should bail processing the transaction entirely.
|
||||
return nil, err
|
||||
}
|
||||
results[e.EventID()] = gomatrixserverlib.PDUResult{err.Error()}
|
||||
results[e.EventID()] = gomatrixserverlib.PDUResult{
|
||||
Error: err.Error(),
|
||||
}
|
||||
} else {
|
||||
results[e.EventID()] = gomatrixserverlib.PDUResult{}
|
||||
}
|
||||
|
@ -197,12 +202,12 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event) error {
|
|||
// need to fallback to /state.
|
||||
// TODO: Attempt to fill in the gap using /get_missing_events
|
||||
// TODO: Attempt to fetch the state using /state_ids and /events
|
||||
state, err := t.federation.LookupState(t.Origin, e.RoomID(), e.EventID())
|
||||
state, err := t.federation.LookupState(t.context, t.Origin, e.RoomID(), e.EventID())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Check that the returned state is valid.
|
||||
if err := state.Check(t.keys); err != nil {
|
||||
if err := state.Check(t.context, t.keys); err != nil {
|
||||
return err
|
||||
}
|
||||
// Check that the event is allowed by the state.
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
package writers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
@ -63,7 +64,9 @@ func CreateInvitesFrom3PIDInvites(
|
|||
|
||||
evs := []gomatrixserverlib.Event{}
|
||||
for _, inv := range body.Invites {
|
||||
event, err := createInviteFrom3PIDInvite(queryAPI, cfg, inv, federation)
|
||||
event, err := createInviteFrom3PIDInvite(
|
||||
req.Context(), queryAPI, cfg, inv, federation,
|
||||
)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(req, err)
|
||||
}
|
||||
|
@ -139,7 +142,7 @@ func ExchangeThirdPartyInvite(
|
|||
|
||||
// Ask the requesting server to sign the newly created event so we know it
|
||||
// acknowledged it
|
||||
signedEvent, err := federation.SendInvite(request.Origin(), *event)
|
||||
signedEvent, err := federation.SendInvite(httpReq.Context(), request.Origin(), *event)
|
||||
if err != nil {
|
||||
return httputil.LogThenError(httpReq, err)
|
||||
}
|
||||
|
@ -160,8 +163,8 @@ func ExchangeThirdPartyInvite(
|
|||
// Returns an error if there was a problem building the event or fetching the
|
||||
// necessary data to do so.
|
||||
func createInviteFrom3PIDInvite(
|
||||
queryAPI api.RoomserverQueryAPI, cfg config.Dendrite, inv invite,
|
||||
federation *gomatrixserverlib.FederationClient,
|
||||
ctx context.Context, queryAPI api.RoomserverQueryAPI, cfg config.Dendrite,
|
||||
inv invite, federation *gomatrixserverlib.FederationClient,
|
||||
) (*gomatrixserverlib.Event, error) {
|
||||
// Build the event
|
||||
builder := &gomatrixserverlib.EventBuilder{
|
||||
|
@ -185,7 +188,10 @@ func createInviteFrom3PIDInvite(
|
|||
|
||||
event, err := buildMembershipEvent(builder, queryAPI, cfg)
|
||||
if err == errNotInRoom {
|
||||
return nil, sendToRemoteServer(inv, federation, cfg, *builder)
|
||||
return nil, sendToRemoteServer(ctx, inv, federation, cfg, *builder)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return event, nil
|
||||
|
@ -253,7 +259,8 @@ func buildMembershipEvent(
|
|||
// Returns an error if it couldn't get the server names to reach or if all of
|
||||
// them responded with an error.
|
||||
func sendToRemoteServer(
|
||||
inv invite, federation *gomatrixserverlib.FederationClient, cfg config.Dendrite,
|
||||
ctx context.Context, inv invite,
|
||||
federation *gomatrixserverlib.FederationClient, cfg config.Dendrite,
|
||||
builder gomatrixserverlib.EventBuilder,
|
||||
) (err error) {
|
||||
remoteServers := make([]gomatrixserverlib.ServerName, 2)
|
||||
|
@ -269,7 +276,7 @@ func sendToRemoteServer(
|
|||
}
|
||||
|
||||
for _, server := range remoteServers {
|
||||
err = federation.ExchangeThirdPartyInvite(server, builder)
|
||||
err = federation.ExchangeThirdPartyInvite(ctx, server, builder)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
package queue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -65,7 +66,7 @@ func (oq *destinationQueue) backgroundSend() {
|
|||
// TODO: handle retries.
|
||||
// TODO: blacklist uncooperative servers.
|
||||
|
||||
_, err := oq.client.SendTransaction(*t)
|
||||
_, err := oq.client.SendTransaction(context.TODO(), *t)
|
||||
if err != nil {
|
||||
log.WithFields(log.Fields{
|
||||
"destination": oq.destination,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue