mirror of
https://github.com/hoernschen/dendrite.git
synced 2024-12-27 23:48:27 +00:00
some more stuff
This commit is contained in:
parent
77f912721a
commit
087efc0e34
5 changed files with 145 additions and 2 deletions
|
@ -2,6 +2,7 @@ package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -61,6 +62,12 @@ type FederationInternalAPI interface {
|
||||||
response *QueryJoinedHostServerNamesInRoomResponse,
|
response *QueryJoinedHostServerNamesInRoomResponse,
|
||||||
) error
|
) error
|
||||||
|
|
||||||
|
QueryEventsFromFederation(
|
||||||
|
ctx context.Context,
|
||||||
|
request *QueryEventsFromFederationRequest,
|
||||||
|
response *QueryEventsFromFederationResponse,
|
||||||
|
) error
|
||||||
|
|
||||||
QueryEventAuthFromFederation(
|
QueryEventAuthFromFederation(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
request *QueryEventAuthFromFederationRequest,
|
request *QueryEventAuthFromFederationRequest,
|
||||||
|
@ -213,6 +220,15 @@ type QueryJoinedHostServerNamesInRoomResponse struct {
|
||||||
ServerNames []gomatrixserverlib.ServerName `json:"server_names"`
|
ServerNames []gomatrixserverlib.ServerName `json:"server_names"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type QueryEventsFromFederationRequest struct {
|
||||||
|
RoomID string `json:"room_id"`
|
||||||
|
EventIDs []string `json:"event_ids"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type QueryEventsFromFederationResponse struct {
|
||||||
|
Events []json.RawMessage `json:"events"`
|
||||||
|
}
|
||||||
|
|
||||||
type QueryEventAuthFromFederationRequest struct {
|
type QueryEventAuthFromFederationRequest struct {
|
||||||
RoomID string `json:"room_id"`
|
RoomID string `json:"room_id"`
|
||||||
EventID string `json:"event_id"`
|
EventID string `json:"event_id"`
|
||||||
|
|
|
@ -25,6 +25,47 @@ func (f *FederationInternalAPI) QueryJoinedHostServerNamesInRoom(
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *FederationInternalAPI) QueryEventsFromFederation(
|
||||||
|
ctx context.Context,
|
||||||
|
request *api.QueryEventsFromFederationRequest,
|
||||||
|
response *api.QueryEventsFromFederationResponse,
|
||||||
|
) error {
|
||||||
|
joinedHosts, err := f.db.GetJoinedHostsForRooms(ctx, []string{request.RoomID})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("f.db.GetJoinedHostsForRooms: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tryHost := func(serverName gomatrixserverlib.ServerName, eventID string) error {
|
||||||
|
reqctx, cancel := context.WithTimeout(ctx, time.Second*30)
|
||||||
|
defer cancel()
|
||||||
|
ires, err := f.doRequest(serverName, func() (interface{}, error) {
|
||||||
|
return f.federation.GetEvent(
|
||||||
|
reqctx,
|
||||||
|
serverName,
|
||||||
|
eventID,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("f.doRequest: %w", err)
|
||||||
|
}
|
||||||
|
tx := ires.(gomatrixserverlib.Transaction)
|
||||||
|
response.Events = append(response.Events, tx.PDUs...)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var lasterr error
|
||||||
|
for _, eventID := range request.EventIDs {
|
||||||
|
for _, host := range joinedHosts {
|
||||||
|
if lasterr = tryHost(host, eventID); lasterr != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return lasterr
|
||||||
|
}
|
||||||
|
|
||||||
func (f *FederationInternalAPI) QueryEventAuthFromFederation(
|
func (f *FederationInternalAPI) QueryEventAuthFromFederation(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
request *api.QueryEventAuthFromFederationRequest,
|
request *api.QueryEventAuthFromFederationRequest,
|
||||||
|
|
|
@ -17,6 +17,7 @@ import (
|
||||||
const (
|
const (
|
||||||
FederationAPIQueryJoinedHostServerNamesInRoomPath = "/federationapi/queryJoinedHostServerNamesInRoom"
|
FederationAPIQueryJoinedHostServerNamesInRoomPath = "/federationapi/queryJoinedHostServerNamesInRoom"
|
||||||
FederationAPIQueryServerKeysPath = "/federationapi/queryServerKeys"
|
FederationAPIQueryServerKeysPath = "/federationapi/queryServerKeys"
|
||||||
|
FederationAPIQueryEventsFromFederationPath = "/federationapi/queryEventsFromFederation"
|
||||||
FederationAPIQueryEventAuthFromFederationPath = "/federationapi/queryEventAuthFromFederation"
|
FederationAPIQueryEventAuthFromFederationPath = "/federationapi/queryEventAuthFromFederation"
|
||||||
FederationAPIQueryStateIDsFromFederationPath = "/federationapi/queryStateIDsFromFederation"
|
FederationAPIQueryStateIDsFromFederationPath = "/federationapi/queryStateIDsFromFederation"
|
||||||
FederationAPIQueryStateFromFederationPath = "/federationapi/queryStateFromFederation"
|
FederationAPIQueryStateFromFederationPath = "/federationapi/queryStateFromFederation"
|
||||||
|
@ -123,6 +124,19 @@ func (h *httpFederationInternalAPI) QueryJoinedHostServerNamesInRoom(
|
||||||
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QueryEventAuthFromFederation implements FederationInternalAPI
|
||||||
|
func (h *httpFederationInternalAPI) QueryEventsFromFederation(
|
||||||
|
ctx context.Context,
|
||||||
|
request *api.QueryEventsFromFederationRequest,
|
||||||
|
response *api.QueryEventsFromFederationResponse,
|
||||||
|
) error {
|
||||||
|
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryEventsFromFederation")
|
||||||
|
defer span.Finish()
|
||||||
|
|
||||||
|
apiURL := h.federationAPIURL + FederationAPIQueryEventsFromFederationPath
|
||||||
|
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
|
||||||
|
}
|
||||||
|
|
||||||
// QueryEventAuthFromFederation implements FederationInternalAPI
|
// QueryEventAuthFromFederation implements FederationInternalAPI
|
||||||
func (h *httpFederationInternalAPI) QueryEventAuthFromFederation(
|
func (h *httpFederationInternalAPI) QueryEventAuthFromFederation(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
|
|
|
@ -27,6 +27,20 @@ func AddRoutes(intAPI api.FederationInternalAPI, internalAPIMux *mux.Router) {
|
||||||
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
|
internalAPIMux.Handle(
|
||||||
|
FederationAPIQueryEventAuthFromFederationPath,
|
||||||
|
httputil.MakeInternalAPI("QueryEventsFromFederation", func(req *http.Request) util.JSONResponse {
|
||||||
|
var request api.QueryEventsFromFederationRequest
|
||||||
|
var response api.QueryEventsFromFederationResponse
|
||||||
|
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
|
||||||
|
return util.ErrorResponse(err)
|
||||||
|
}
|
||||||
|
if err := intAPI.QueryEventsFromFederation(req.Context(), &request, &response); err != nil {
|
||||||
|
return util.ErrorResponse(err)
|
||||||
|
}
|
||||||
|
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
|
||||||
|
}),
|
||||||
|
)
|
||||||
internalAPIMux.Handle(
|
internalAPIMux.Handle(
|
||||||
FederationAPIQueryEventAuthFromFederationPath,
|
FederationAPIQueryEventAuthFromFederationPath,
|
||||||
httputil.MakeInternalAPI("QueryEventAuthFromFederation", func(req *http.Request) util.JSONResponse {
|
httputil.MakeInternalAPI("QueryEventAuthFromFederation", func(req *http.Request) util.JSONResponse {
|
||||||
|
|
|
@ -125,7 +125,7 @@ func (r *Inputer) processRoomEvent(
|
||||||
|
|
||||||
// Then check if the prev events are known, which we need in order
|
// Then check if the prev events are known, which we need in order
|
||||||
// to calculate the state before the event.
|
// to calculate the state before the event.
|
||||||
if err := r.checkForMissingPrevEvents(ctx, input); err != nil {
|
if err := r.checkForMissingPrevEvents(ctx, input.Event); err != nil {
|
||||||
return "", fmt.Errorf("r.checkForMissingPrevEvents: %w", err)
|
return "", fmt.Errorf("r.checkForMissingPrevEvents: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -299,6 +299,14 @@ func (r *Inputer) checkForMissingAuthEvents(
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check the signatures of the event.
|
||||||
|
// TODO: It really makes sense for the federation API to be doing this,
|
||||||
|
// because then it can attempt another server if one serves up an event
|
||||||
|
// with an invalid signature. For now this will do.
|
||||||
|
if err := event.VerifyEventSignatures(ctx, r.FSAPI.KeyRing()); err != nil {
|
||||||
|
return fmt.Errorf("event.VerifyEventSignatures: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Otherwise, we need to store, and that means we need to know the
|
// Otherwise, we need to store, and that means we need to know the
|
||||||
// auth event NIDs. Let's see if we can find those.
|
// auth event NIDs. Let's see if we can find those.
|
||||||
authEventNIDs := make([]types.EventNID, 0, len(event.AuthEventIDs()))
|
authEventNIDs := make([]types.EventNID, 0, len(event.AuthEventIDs()))
|
||||||
|
@ -342,8 +350,58 @@ func (r *Inputer) checkForMissingAuthEvents(
|
||||||
|
|
||||||
func (r *Inputer) checkForMissingPrevEvents(
|
func (r *Inputer) checkForMissingPrevEvents(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
input *api.InputRoomEvent,
|
event *gomatrixserverlib.HeaderedEvent,
|
||||||
) error {
|
) error {
|
||||||
|
// First of all, determine if we know of the prev events.
|
||||||
|
prevEventIDs := event.PrevEventIDs()
|
||||||
|
prevEventNIDs, err := r.DB.EventNIDs(ctx, prevEventIDs)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("r.DB.EventNIDs: %w", err)
|
||||||
|
}
|
||||||
|
missingPrevEvents := make([]string, 0, len(prevEventIDs))
|
||||||
|
for _, eventID := range prevEventIDs {
|
||||||
|
if _, ok := prevEventNIDs[eventID]; !ok {
|
||||||
|
missingPrevEvents = append(prevEventIDs, eventID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If there are no missing prev events then we're all happy and there
|
||||||
|
// is nothing more to do here.
|
||||||
|
if len(missingPrevEvents) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("there are %d missing prev events", len(missingPrevEvents))
|
||||||
|
|
||||||
|
req := &fedapi.QueryEventsFromFederationRequest{
|
||||||
|
RoomID: event.RoomID(),
|
||||||
|
EventIDs: missingPrevEvents,
|
||||||
|
}
|
||||||
|
res := &fedapi.QueryEventsFromFederationResponse{}
|
||||||
|
if err := r.FSAPI.QueryEventsFromFederation(ctx, req, res); err != nil {
|
||||||
|
return fmt.Errorf("r.FSAPI.QueryEventsFromFederation: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, js := range res.Events {
|
||||||
|
ev, err := gomatrixserverlib.NewEventFromUntrustedJSON(js, event.RoomVersion)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("gomatrixserverlib.NewEventFromUntrustedJSON: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := ev.VerifyEventSignatures(ctx, r.FSAPI.KeyRing()); err != nil {
|
||||||
|
return fmt.Errorf("event.VerifyEventSignatures: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
req := &fedapi.QueryStateIDsFromFederationRequest{
|
||||||
|
RoomID: ev.RoomID(),
|
||||||
|
EventID: ev.EventID(),
|
||||||
|
}
|
||||||
|
res := &fedapi.QueryStateIDsFromFederationResponse{}
|
||||||
|
if err := r.FSAPI.QueryStateIDsFromFederation(ctx, req, res); err != nil {
|
||||||
|
return fmt.Errorf("r.FSAPI.QueryStateIDsFromFederation: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue