Add EDU server wiring

This commit is contained in:
Neil Alexander 2021-07-28 14:06:23 +01:00
parent dcadec88d9
commit 8683f7553f
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
9 changed files with 105 additions and 0 deletions

View file

@ -75,6 +75,18 @@ type InputReceiptEventRequest struct {
// InputReceiptEventResponse is a response to InputReceiptEventRequest
type InputReceiptEventResponse struct{}
type SigningKeyUpdate struct {
MasterKey gomatrixserverlib.CrossSigningKey `json:"master_key"`
SelfSigningKey gomatrixserverlib.CrossSigningKey `json:"cross_signing_key"`
UserID string `json:"user_id"`
}
type InputSigningKeyUpdateRequest struct {
SigningKeyUpdate
}
type InputSigningKeyUpdateResponse struct{}
// EDUServerInputAPI is used to write events to the typing server.
type EDUServerInputAPI interface {
InputTypingEvent(
@ -94,4 +106,10 @@ type EDUServerInputAPI interface {
request *InputReceiptEventRequest,
response *InputReceiptEventResponse,
) error
InputSigningKeyUpdate(
ctx context.Context,
request *InputSigningKeyUpdateRequest,
response *InputSigningKeyUpdateResponse,
) error
}

View file

@ -85,3 +85,9 @@ type FederationReceiptData struct {
Data ReceiptTS `json:"data"`
EventIDs []string `json:"event_ids"`
}
type OutputSigningKeyUpdate struct {
SigningKeyUpdate
Type string `json:"type"`
Timestamp gomatrixserverlib.Timestamp `json:"timestamp"`
}

View file

@ -86,3 +86,20 @@ func SendReceipt(
response := InputReceiptEventResponse{}
return eduAPI.InputReceiptEvent(ctx, &request, &response)
}
func SendSigningKeyUpdate(
ctx context.Context,
eduAPI EDUServerInputAPI,
userID string,
masterKey, selfSigningKey gomatrixserverlib.CrossSigningKey,
) error {
request := InputSigningKeyUpdateRequest{
SigningKeyUpdate: SigningKeyUpdate{
MasterKey: masterKey,
SelfSigningKey: selfSigningKey,
UserID: userID,
},
}
response := InputSigningKeyUpdateResponse{}
return eduAPI.InputSigningKeyUpdate(ctx, &request, &response)
}

View file

@ -52,6 +52,7 @@ func NewInternalAPI(
OutputTypingEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputTypingEvent),
OutputSendToDeviceEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputSendToDeviceEvent),
OutputReceiptEventTopic: cfg.Matrix.Kafka.TopicFor(config.TopicOutputReceiptEvent),
OutputSigningKeyUpdateTopic: cfg.Matrix.Kafka.TopicFor(config),
ServerName: cfg.Matrix.ServerName,
}
}

View file

@ -39,6 +39,8 @@ type EDUServerInputAPI struct {
OutputSendToDeviceEventTopic string
// The kafka topic to output new receipt events to
OutputReceiptEventTopic string
// The kafka topic to output new signing key updates to
OutputSigningKeyUpdateTopic string
// kafka producer
Producer sarama.SyncProducer
// Internal user query API
@ -203,3 +205,28 @@ func (t *EDUServerInputAPI) InputReceiptEvent(
_, _, err = t.Producer.SendMessage(m)
return err
}
// InputSigningKeyUpdate implements api.EDUServerInputAPI
func (t *EDUServerInputAPI) InputSigningKeyUpdate(
ctx context.Context,
request *api.InputSigningKeyUpdateRequest,
response *api.InputSigningKeyUpdateResponse,
) error {
logrus.WithFields(logrus.Fields{}).Infof("Producing to topic '%s'", t.OutputSigningKeyUpdateTopic)
output := &api.OutputSigningKeyUpdate{
SigningKeyUpdate: request.SigningKeyUpdate,
Type: "m.signing_key_update",
Timestamp: gomatrixserverlib.AsTimestamp(time.Now()),
}
js, err := json.Marshal(output)
if err != nil {
return err
}
m := &sarama.ProducerMessage{
Topic: t.OutputSigningKeyUpdateTopic,
Key: sarama.StringEncoder(request.UserID),
Value: sarama.ByteEncoder(js),
}
_, _, err = t.Producer.SendMessage(m)
return err
}

View file

@ -15,6 +15,7 @@ const (
EDUServerInputTypingEventPath = "/eduserver/input"
EDUServerInputSendToDeviceEventPath = "/eduserver/sendToDevice"
EDUServerInputReceiptEventPath = "/eduserver/receipt"
EDUServerInputSigningKeyUpdatePath = "/eduserver/signingKeyUpdate"
)
// NewEDUServerClient creates a EDUServerInputAPI implemented by talking to a HTTP POST API.
@ -68,3 +69,16 @@ func (h *httpEDUServerInputAPI) InputReceiptEvent(
apiURL := h.eduServerURL + EDUServerInputReceiptEventPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}
// InputSigningKeyUpdate implements EDUServerInputAPI
func (h *httpEDUServerInputAPI) InputSigningKeyUpdate(
ctx context.Context,
request *api.InputSigningKeyUpdateRequest,
response *api.InputSigningKeyUpdateResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "InputSigningKeyUpdate")
defer span.Finish()
apiURL := h.eduServerURL + EDUServerInputSigningKeyUpdatePath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}

View file

@ -51,4 +51,17 @@ func AddRoutes(t api.EDUServerInputAPI, internalAPIMux *mux.Router) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(EDUServerInputSigningKeyUpdatePath,
httputil.MakeInternalAPI("inputSigningKeyUpdate", func(req *http.Request) util.JSONResponse {
var request api.InputSigningKeyUpdateRequest
var response api.InputSigningKeyUpdateResponse
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := t.InputSigningKeyUpdate(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
}

View file

@ -84,6 +84,14 @@ func (o *testEDUProducer) InputReceiptEvent(
return nil
}
func (o *testEDUProducer) InputSigningKeyUpdate(
ctx context.Context,
request *eduAPI.InputSigningKeyUpdateRequest,
response *eduAPI.InputSigningKeyUpdateResponse,
) error {
return nil
}
type testRoomserverAPI struct {
api.RoomserverInternalAPITrace
inputRoomEvents []api.InputRoomEvent

View file

@ -10,6 +10,7 @@ const (
TopicOutputRoomEvent = "OutputRoomEvent"
TopicOutputClientData = "OutputClientData"
TopicOutputReceiptEvent = "OutputReceiptEvent"
TopicOutputSigningKeyUpdate = "OutputSigningKeyUpdate"
)
type Kafka struct {