2017-09-08 14:17:12 +00:00
|
|
|
// Copyright 2017 Vector Creations Ltd
|
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
2017-10-11 17:16:53 +00:00
|
|
|
package routing
|
2017-06-07 13:32:53 +00:00
|
|
|
|
|
|
|
import (
|
2017-09-13 10:03:41 +00:00
|
|
|
"context"
|
2017-06-07 13:32:53 +00:00
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
2017-08-23 14:13:47 +00:00
|
|
|
"net/http"
|
2020-09-28 10:32:59 +00:00
|
|
|
"sync"
|
2020-09-07 11:32:40 +00:00
|
|
|
"time"
|
2017-08-23 14:13:47 +00:00
|
|
|
|
2017-06-07 13:32:53 +00:00
|
|
|
"github.com/matrix-org/dendrite/clientapi/jsonerror"
|
2020-06-10 11:17:54 +00:00
|
|
|
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
|
2021-06-30 11:05:58 +00:00
|
|
|
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
|
2021-03-30 09:01:32 +00:00
|
|
|
"github.com/matrix-org/dendrite/internal"
|
2020-08-05 12:41:16 +00:00
|
|
|
keyapi "github.com/matrix-org/dendrite/keyserver/api"
|
2017-06-07 13:32:53 +00:00
|
|
|
"github.com/matrix-org/dendrite/roomserver/api"
|
2020-12-02 17:41:00 +00:00
|
|
|
"github.com/matrix-org/dendrite/setup/config"
|
2017-06-07 13:32:53 +00:00
|
|
|
"github.com/matrix-org/gomatrixserverlib"
|
|
|
|
"github.com/matrix-org/util"
|
2021-03-23 11:33:36 +00:00
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
2020-05-05 14:48:37 +00:00
|
|
|
"github.com/sirupsen/logrus"
|
2021-07-02 11:33:27 +00:00
|
|
|
"go.uber.org/atomic"
|
2017-06-07 13:32:53 +00:00
|
|
|
)
|
|
|
|
|
2021-03-23 15:22:00 +00:00
|
|
|
const (
|
|
|
|
// Event was passed to the roomserver
|
|
|
|
MetricsOutcomeOK = "ok"
|
|
|
|
// Event failed to be processed
|
|
|
|
MetricsOutcomeFail = "fail"
|
|
|
|
// Event failed auth checks
|
|
|
|
MetricsOutcomeRejected = "rejected"
|
|
|
|
// Terminated the transaction
|
|
|
|
MetricsOutcomeFatal = "fatal"
|
|
|
|
// The event has missing auth_events we need to fetch
|
|
|
|
MetricsWorkMissingAuthEvents = "missing_auth_events"
|
|
|
|
// No work had to be done as we had all prev/auth events
|
|
|
|
MetricsWorkDirect = "direct"
|
|
|
|
// The event has missing prev_events we need to call /g_m_e for
|
|
|
|
MetricsWorkMissingPrevEvents = "missing_prev_events"
|
|
|
|
)
|
|
|
|
|
2021-03-23 11:33:36 +00:00
|
|
|
var (
|
|
|
|
pduCountTotal = prometheus.NewCounterVec(
|
|
|
|
prometheus.CounterOpts{
|
|
|
|
Namespace: "dendrite",
|
|
|
|
Subsystem: "federationapi",
|
|
|
|
Name: "recv_pdus",
|
2021-03-23 15:22:00 +00:00
|
|
|
Help: "Number of incoming PDUs from remote servers with labels for success",
|
2021-03-23 11:33:36 +00:00
|
|
|
},
|
2021-03-23 15:22:00 +00:00
|
|
|
[]string{"status"}, // 'success' or 'total'
|
2021-03-23 11:33:36 +00:00
|
|
|
)
|
|
|
|
eduCountTotal = prometheus.NewCounter(
|
|
|
|
prometheus.CounterOpts{
|
|
|
|
Namespace: "dendrite",
|
|
|
|
Subsystem: "federationapi",
|
|
|
|
Name: "recv_edus",
|
2021-03-23 15:22:00 +00:00
|
|
|
Help: "Number of incoming EDUs from remote servers",
|
|
|
|
},
|
|
|
|
)
|
|
|
|
processEventSummary = prometheus.NewSummaryVec(
|
|
|
|
prometheus.SummaryOpts{
|
|
|
|
Namespace: "dendrite",
|
|
|
|
Subsystem: "federationapi",
|
|
|
|
Name: "process_event",
|
|
|
|
Help: "How long it takes to process an incoming event and what work had to be done for it",
|
2021-03-23 11:33:36 +00:00
|
|
|
},
|
2021-03-23 15:22:00 +00:00
|
|
|
[]string{"work", "outcome"},
|
2021-03-23 11:33:36 +00:00
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
func init() {
|
|
|
|
prometheus.MustRegister(
|
2021-03-23 15:22:00 +00:00
|
|
|
pduCountTotal, eduCountTotal, processEventSummary,
|
2021-03-23 11:33:36 +00:00
|
|
|
)
|
|
|
|
}
|
|
|
|
|
2021-07-02 11:33:27 +00:00
|
|
|
type sendFIFOQueue struct {
|
|
|
|
tasks []*inputTask
|
|
|
|
count int
|
|
|
|
mutex sync.Mutex
|
|
|
|
notifs chan struct{}
|
|
|
|
}
|
|
|
|
|
|
|
|
func newSendFIFOQueue() *sendFIFOQueue {
|
|
|
|
q := &sendFIFOQueue{
|
|
|
|
notifs: make(chan struct{}, 1),
|
|
|
|
}
|
|
|
|
return q
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *sendFIFOQueue) push(frame *inputTask) {
|
|
|
|
q.mutex.Lock()
|
|
|
|
defer q.mutex.Unlock()
|
|
|
|
q.tasks = append(q.tasks, frame)
|
|
|
|
q.count++
|
|
|
|
select {
|
|
|
|
case q.notifs <- struct{}{}:
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// pop returns the first item of the queue, if there is one.
|
|
|
|
// The second return value will indicate if a task was returned.
|
|
|
|
func (q *sendFIFOQueue) pop() (*inputTask, bool) {
|
|
|
|
q.mutex.Lock()
|
|
|
|
defer q.mutex.Unlock()
|
|
|
|
if q.count == 0 {
|
|
|
|
return nil, false
|
|
|
|
}
|
|
|
|
frame := q.tasks[0]
|
|
|
|
q.tasks[0] = nil
|
|
|
|
q.tasks = q.tasks[1:]
|
|
|
|
q.count--
|
|
|
|
if q.count == 0 {
|
|
|
|
// Force a GC of the underlying array, since it might have
|
|
|
|
// grown significantly if the queue was hammered for some reason
|
|
|
|
q.tasks = nil
|
|
|
|
}
|
|
|
|
return frame, true
|
|
|
|
}
|
|
|
|
|
|
|
|
type inputTask struct {
|
|
|
|
ctx context.Context
|
|
|
|
t *txnReq
|
2021-12-15 15:45:53 +00:00
|
|
|
event *gomatrixserverlib.HeaderedEvent
|
2021-07-02 11:33:27 +00:00
|
|
|
wg *sync.WaitGroup
|
|
|
|
err error // written back by worker, only safe to read when all tasks are done
|
|
|
|
duration time.Duration // written back by worker, only safe to read when all tasks are done
|
|
|
|
}
|
|
|
|
|
|
|
|
type inputWorker struct {
|
|
|
|
running atomic.Bool
|
|
|
|
input *sendFIFOQueue
|
|
|
|
}
|
|
|
|
|
2021-11-08 09:24:16 +00:00
|
|
|
var inFlightTxnsPerOrigin sync.Map // transaction ID -> chan util.JSONResponse
|
|
|
|
var inputWorkers sync.Map // room ID -> *inputWorker
|
2021-07-02 11:33:27 +00:00
|
|
|
|
2017-06-07 13:32:53 +00:00
|
|
|
// Send implements /_matrix/federation/v1/send/{txnID}
|
|
|
|
func Send(
|
2017-09-04 12:14:01 +00:00
|
|
|
httpReq *http.Request,
|
|
|
|
request *gomatrixserverlib.FederationRequest,
|
2017-06-07 13:32:53 +00:00
|
|
|
txnID gomatrixserverlib.TransactionID,
|
2020-08-10 13:18:04 +00:00
|
|
|
cfg *config.FederationAPI,
|
2020-05-01 09:48:17 +00:00
|
|
|
rsAPI api.RoomserverInternalAPI,
|
2020-06-10 11:17:54 +00:00
|
|
|
eduAPI eduserverAPI.EDUServerInputAPI,
|
2020-08-05 12:41:16 +00:00
|
|
|
keyAPI keyapi.KeyInternalAPI,
|
2020-06-15 15:57:59 +00:00
|
|
|
keys gomatrixserverlib.JSONVerifier,
|
2017-06-07 13:32:53 +00:00
|
|
|
federation *gomatrixserverlib.FederationClient,
|
2021-03-30 09:01:32 +00:00
|
|
|
mu *internal.MutexByRoom,
|
2021-06-30 11:05:58 +00:00
|
|
|
servers federationAPI.ServersInRoomProvider,
|
2017-06-07 13:32:53 +00:00
|
|
|
) util.JSONResponse {
|
2021-11-08 09:24:16 +00:00
|
|
|
// First we should check if this origin has already submitted this
|
|
|
|
// txn ID to us. If they have and the txnIDs map contains an entry,
|
|
|
|
// the transaction is still being worked on. The new client can wait
|
|
|
|
// for it to complete rather than creating more work.
|
|
|
|
index := string(request.Origin()) + "\000" + string(txnID)
|
|
|
|
v, ok := inFlightTxnsPerOrigin.LoadOrStore(index, make(chan util.JSONResponse, 1))
|
|
|
|
ch := v.(chan util.JSONResponse)
|
|
|
|
if ok {
|
|
|
|
// This origin already submitted this txn ID to us, and the work
|
|
|
|
// is still taking place, so we'll just wait for it to finish.
|
|
|
|
ctx, cancel := context.WithTimeout(httpReq.Context(), time.Minute*5)
|
|
|
|
defer cancel()
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
// If the caller gives up then return straight away. We don't
|
|
|
|
// want to attempt to process what they sent us any further.
|
|
|
|
return util.JSONResponse{Code: http.StatusRequestTimeout}
|
|
|
|
case res := <-ch:
|
|
|
|
// The original task just finished processing so let's return
|
|
|
|
// the result of it.
|
|
|
|
if res.Code == 0 {
|
|
|
|
return util.JSONResponse{Code: http.StatusAccepted}
|
|
|
|
}
|
|
|
|
return res
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// Otherwise, store that we're currently working on this txn from
|
|
|
|
// this origin. When we're done processing, close the channel.
|
|
|
|
defer close(ch)
|
|
|
|
defer inFlightTxnsPerOrigin.Delete(index)
|
|
|
|
|
2017-06-07 13:32:53 +00:00
|
|
|
t := txnReq{
|
2020-06-10 11:17:54 +00:00
|
|
|
rsAPI: rsAPI,
|
|
|
|
eduAPI: eduAPI,
|
|
|
|
keys: keys,
|
|
|
|
federation: federation,
|
2021-06-30 11:05:58 +00:00
|
|
|
servers: servers,
|
2020-08-05 12:41:16 +00:00
|
|
|
keyAPI: keyAPI,
|
2021-03-30 09:01:32 +00:00
|
|
|
roomsMu: mu,
|
2017-06-07 13:32:53 +00:00
|
|
|
}
|
2020-03-27 16:28:22 +00:00
|
|
|
|
|
|
|
var txnEvents struct {
|
2020-03-30 15:40:28 +00:00
|
|
|
PDUs []json.RawMessage `json:"pdus"`
|
|
|
|
EDUs []gomatrixserverlib.EDU `json:"edus"`
|
2020-03-27 16:28:22 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if err := json.Unmarshal(request.Content(), &txnEvents); err != nil {
|
2017-06-07 13:32:53 +00:00
|
|
|
return util.JSONResponse{
|
2018-03-13 15:55:45 +00:00
|
|
|
Code: http.StatusBadRequest,
|
2017-08-23 14:13:47 +00:00
|
|
|
JSON: jsonerror.NotJSON("The request body could not be decoded into valid JSON. " + err.Error()),
|
2017-06-07 13:32:53 +00:00
|
|
|
}
|
|
|
|
}
|
2020-06-23 12:15:15 +00:00
|
|
|
// Transactions are limited in size; they can have at most 50 PDUs and 100 EDUs.
|
|
|
|
// https://matrix.org/docs/spec/server_server/latest#transactions
|
|
|
|
if len(txnEvents.PDUs) > 50 || len(txnEvents.EDUs) > 100 {
|
|
|
|
return util.JSONResponse{
|
|
|
|
Code: http.StatusBadRequest,
|
|
|
|
JSON: jsonerror.BadJSON("max 50 pdus / 100 edus"),
|
|
|
|
}
|
|
|
|
}
|
2017-06-07 13:32:53 +00:00
|
|
|
|
2020-03-30 15:40:28 +00:00
|
|
|
// TODO: Really we should have a function to convert FederationRequest to txnReq
|
2020-03-27 16:28:22 +00:00
|
|
|
t.PDUs = txnEvents.PDUs
|
2020-03-30 15:40:28 +00:00
|
|
|
t.EDUs = txnEvents.EDUs
|
2017-06-07 13:32:53 +00:00
|
|
|
t.Origin = request.Origin()
|
|
|
|
t.TransactionID = txnID
|
2017-06-19 14:21:04 +00:00
|
|
|
t.Destination = cfg.Matrix.ServerName
|
2017-06-07 13:32:53 +00:00
|
|
|
|
2020-10-02 10:38:35 +00:00
|
|
|
util.GetLogger(httpReq.Context()).Infof("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, request.Origin(), len(t.PDUs), len(t.EDUs))
|
2020-03-27 16:28:22 +00:00
|
|
|
|
2021-11-08 09:24:16 +00:00
|
|
|
resp, jsonErr := t.processTransaction(context.Background())
|
2020-06-23 12:15:15 +00:00
|
|
|
if jsonErr != nil {
|
|
|
|
util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed")
|
|
|
|
return *jsonErr
|
2017-06-07 13:32:53 +00:00
|
|
|
}
|
2020-05-13 12:01:45 +00:00
|
|
|
|
|
|
|
// https://matrix.org/docs/spec/server_server/r0.1.3#put-matrix-federation-v1-send-txnid
|
|
|
|
// Status code 200:
|
|
|
|
// The result of processing the transaction. The server is to use this response
|
|
|
|
// even in the event of one or more PDUs failing to be processed.
|
2021-11-08 09:24:16 +00:00
|
|
|
res := util.JSONResponse{
|
2020-05-13 12:01:45 +00:00
|
|
|
Code: http.StatusOK,
|
|
|
|
JSON: resp,
|
2017-06-07 13:32:53 +00:00
|
|
|
}
|
2021-11-08 09:24:16 +00:00
|
|
|
ch <- res
|
|
|
|
return res
|
2017-06-07 13:32:53 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
type txnReq struct {
|
|
|
|
gomatrixserverlib.Transaction
|
2021-06-30 11:05:58 +00:00
|
|
|
rsAPI api.RoomserverInternalAPI
|
|
|
|
eduAPI eduserverAPI.EDUServerInputAPI
|
|
|
|
keyAPI keyapi.KeyInternalAPI
|
|
|
|
keys gomatrixserverlib.JSONVerifier
|
|
|
|
federation txnFederationClient
|
|
|
|
roomsMu *internal.MutexByRoom
|
2021-12-15 13:59:58 +00:00
|
|
|
servers federationAPI.ServersInRoomProvider
|
|
|
|
work string
|
2021-07-07 17:55:44 +00:00
|
|
|
}
|
|
|
|
|
2020-05-06 13:27:02 +00:00
|
|
|
// A subset of FederationClient functionality that txn requires. Useful for testing.
|
|
|
|
type txnFederationClient interface {
|
|
|
|
LookupState(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
|
|
|
|
res gomatrixserverlib.RespState, err error,
|
|
|
|
)
|
|
|
|
LookupStateIDs(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, eventID string) (res gomatrixserverlib.RespStateIDs, err error)
|
|
|
|
GetEvent(ctx context.Context, s gomatrixserverlib.ServerName, eventID string) (res gomatrixserverlib.Transaction, err error)
|
2020-05-12 15:24:28 +00:00
|
|
|
LookupMissingEvents(ctx context.Context, s gomatrixserverlib.ServerName, roomID string, missing gomatrixserverlib.MissingEvents,
|
|
|
|
roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMissingEvents, err error)
|
2017-06-07 13:32:53 +00:00
|
|
|
}
|
|
|
|
|
2020-09-07 11:32:40 +00:00
|
|
|
func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) {
|
2020-04-16 16:59:55 +00:00
|
|
|
results := make(map[string]gomatrixserverlib.PDUResult)
|
2021-07-02 11:33:27 +00:00
|
|
|
var wg sync.WaitGroup
|
|
|
|
var tasks []*inputTask
|
2020-04-16 16:59:55 +00:00
|
|
|
|
2020-03-27 16:28:22 +00:00
|
|
|
for _, pdu := range t.PDUs {
|
2021-03-23 11:33:36 +00:00
|
|
|
pduCountTotal.WithLabelValues("total").Inc()
|
2020-03-27 16:28:22 +00:00
|
|
|
var header struct {
|
|
|
|
RoomID string `json:"room_id"`
|
|
|
|
}
|
|
|
|
if err := json.Unmarshal(pdu, &header); err != nil {
|
2020-09-07 11:32:40 +00:00
|
|
|
util.GetLogger(ctx).WithError(err).Warn("Transaction: Failed to extract room ID from event")
|
2020-05-13 12:01:45 +00:00
|
|
|
// We don't know the event ID at this point so we can't return the
|
|
|
|
// failure in the PDU results
|
|
|
|
continue
|
2020-03-27 16:28:22 +00:00
|
|
|
}
|
|
|
|
verReq := api.QueryRoomVersionForRoomRequest{RoomID: header.RoomID}
|
|
|
|
verRes := api.QueryRoomVersionForRoomResponse{}
|
2020-09-07 11:32:40 +00:00
|
|
|
if err := t.rsAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes); err != nil {
|
|
|
|
util.GetLogger(ctx).WithError(err).Warn("Transaction: Failed to query room version for room", verReq.RoomID)
|
2020-05-13 12:01:45 +00:00
|
|
|
// We don't know the event ID at this point so we can't return the
|
|
|
|
// failure in the PDU results
|
|
|
|
continue
|
2020-03-27 16:28:22 +00:00
|
|
|
}
|
|
|
|
event, err := gomatrixserverlib.NewEventFromUntrustedJSON(pdu, verRes.RoomVersion)
|
|
|
|
if err != nil {
|
2020-06-23 12:15:15 +00:00
|
|
|
if _, ok := err.(gomatrixserverlib.BadJSONError); ok {
|
|
|
|
// Room version 6 states that homeservers should strictly enforce canonical JSON
|
|
|
|
// on PDUs.
|
|
|
|
//
|
|
|
|
// This enforces that the entire transaction is rejected if a single bad PDU is
|
|
|
|
// sent. It is unclear if this is the correct behaviour or not.
|
|
|
|
//
|
|
|
|
// See https://github.com/matrix-org/synapse/issues/7543
|
|
|
|
return nil, &util.JSONResponse{
|
|
|
|
Code: 400,
|
|
|
|
JSON: jsonerror.BadJSON("PDU contains bad JSON"),
|
|
|
|
}
|
2020-05-13 12:01:45 +00:00
|
|
|
}
|
2020-09-07 11:32:40 +00:00
|
|
|
util.GetLogger(ctx).WithError(err).Warnf("Transaction: Failed to parse event JSON of event %s", string(pdu))
|
2020-05-13 12:01:45 +00:00
|
|
|
continue
|
2020-03-27 16:28:22 +00:00
|
|
|
}
|
2020-09-07 11:32:40 +00:00
|
|
|
if api.IsServerBannedFromRoom(ctx, t.rsAPI, event.RoomID(), t.Origin) {
|
2020-08-11 17:19:11 +00:00
|
|
|
results[event.EventID()] = gomatrixserverlib.PDUResult{
|
|
|
|
Error: "Forbidden by server ACLs",
|
|
|
|
}
|
|
|
|
continue
|
|
|
|
}
|
2021-11-02 10:13:38 +00:00
|
|
|
if err = event.VerifyEventSignatures(ctx, t.keys); err != nil {
|
2020-09-07 11:32:40 +00:00
|
|
|
util.GetLogger(ctx).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID())
|
2020-05-13 12:01:45 +00:00
|
|
|
results[event.EventID()] = gomatrixserverlib.PDUResult{
|
|
|
|
Error: err.Error(),
|
|
|
|
}
|
|
|
|
continue
|
2020-03-27 16:28:22 +00:00
|
|
|
}
|
2021-07-02 11:33:27 +00:00
|
|
|
v, _ := inputWorkers.LoadOrStore(event.RoomID(), &inputWorker{
|
|
|
|
input: newSendFIFOQueue(),
|
|
|
|
})
|
|
|
|
worker := v.(*inputWorker)
|
|
|
|
wg.Add(1)
|
|
|
|
task := &inputTask{
|
|
|
|
ctx: ctx,
|
|
|
|
t: t,
|
2021-12-15 15:45:53 +00:00
|
|
|
event: event.Headered(verRes.RoomVersion),
|
2021-07-02 11:33:27 +00:00
|
|
|
wg: &wg,
|
|
|
|
}
|
|
|
|
tasks = append(tasks, task)
|
|
|
|
worker.input.push(task)
|
2021-07-05 11:14:31 +00:00
|
|
|
if worker.running.CAS(false, true) {
|
|
|
|
go worker.run()
|
|
|
|
}
|
2021-07-02 11:33:27 +00:00
|
|
|
}
|
|
|
|
|
2021-07-05 11:14:31 +00:00
|
|
|
t.processEDUs(ctx)
|
2021-07-02 11:33:27 +00:00
|
|
|
wg.Wait()
|
|
|
|
|
|
|
|
for _, task := range tasks {
|
|
|
|
if task.err != nil {
|
|
|
|
results[task.event.EventID()] = gomatrixserverlib.PDUResult{
|
2022-01-05 17:44:49 +00:00
|
|
|
// Error: task.err.Error(), TODO: this upsets tests if uncommented
|
2017-06-07 13:32:53 +00:00
|
|
|
}
|
|
|
|
} else {
|
2021-07-02 11:33:27 +00:00
|
|
|
results[task.event.EventID()] = gomatrixserverlib.PDUResult{}
|
2017-06-07 13:32:53 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-08-07 14:00:23 +00:00
|
|
|
if c := len(results); c > 0 {
|
2021-09-07 14:07:14 +00:00
|
|
|
util.GetLogger(ctx).Infof("Processed %d PDUs from %v in transaction %q", c, t.Origin, t.TransactionID)
|
2020-08-07 14:00:23 +00:00
|
|
|
}
|
2017-06-07 13:32:53 +00:00
|
|
|
return &gomatrixserverlib.RespSend{PDUs: results}, nil
|
|
|
|
}
|
|
|
|
|
2021-07-02 11:33:27 +00:00
|
|
|
func (t *inputWorker) run() {
|
|
|
|
defer t.running.Store(false)
|
|
|
|
for {
|
|
|
|
task, ok := t.input.pop()
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if task == nil {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
func() {
|
|
|
|
defer task.wg.Done()
|
|
|
|
select {
|
|
|
|
case <-task.ctx.Done():
|
|
|
|
task.err = context.DeadlineExceeded
|
2021-07-05 12:47:37 +00:00
|
|
|
pduCountTotal.WithLabelValues("expired").Inc()
|
2021-07-02 11:33:27 +00:00
|
|
|
return
|
|
|
|
default:
|
|
|
|
evStart := time.Now()
|
2021-07-05 11:14:31 +00:00
|
|
|
// TODO: Is 5 minutes too long?
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
|
|
|
|
task.err = task.t.processEvent(ctx, task.event)
|
|
|
|
cancel()
|
2021-07-02 11:33:27 +00:00
|
|
|
task.duration = time.Since(evStart)
|
|
|
|
if err := task.err; err != nil {
|
|
|
|
switch err.(type) {
|
|
|
|
case *gomatrixserverlib.NotAllowed:
|
|
|
|
processEventSummary.WithLabelValues(task.t.work, MetricsOutcomeRejected).Observe(
|
|
|
|
float64(time.Since(evStart).Nanoseconds()) / 1000.,
|
|
|
|
)
|
|
|
|
util.GetLogger(task.ctx).WithError(err).WithField("event_id", task.event.EventID()).WithField("rejected", true).Warn(
|
|
|
|
"Failed to process incoming federation event, skipping",
|
|
|
|
)
|
|
|
|
task.err = nil // make "rejected" failures silent
|
|
|
|
default:
|
|
|
|
processEventSummary.WithLabelValues(task.t.work, MetricsOutcomeFail).Observe(
|
|
|
|
float64(time.Since(evStart).Nanoseconds()) / 1000.,
|
|
|
|
)
|
|
|
|
util.GetLogger(task.ctx).WithError(err).WithField("event_id", task.event.EventID()).WithField("rejected", false).Warn(
|
|
|
|
"Failed to process incoming federation event, skipping",
|
|
|
|
)
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
pduCountTotal.WithLabelValues("success").Inc()
|
|
|
|
processEventSummary.WithLabelValues(task.t.work, MetricsOutcomeOK).Observe(
|
|
|
|
float64(time.Since(evStart).Nanoseconds()) / 1000.,
|
|
|
|
)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
2020-05-13 12:01:45 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-09-07 11:32:40 +00:00
|
|
|
func (t *txnReq) processEDUs(ctx context.Context) {
|
|
|
|
for _, e := range t.EDUs {
|
2021-03-23 11:33:36 +00:00
|
|
|
eduCountTotal.Inc()
|
2020-03-30 15:40:28 +00:00
|
|
|
switch e.Type {
|
|
|
|
case gomatrixserverlib.MTyping:
|
|
|
|
// https://matrix.org/docs/spec/server_server/latest#typing-notifications
|
|
|
|
var typingPayload struct {
|
|
|
|
RoomID string `json:"room_id"`
|
|
|
|
UserID string `json:"user_id"`
|
|
|
|
Typing bool `json:"typing"`
|
|
|
|
}
|
|
|
|
if err := json.Unmarshal(e.Content, &typingPayload); err != nil {
|
2020-09-07 11:32:40 +00:00
|
|
|
util.GetLogger(ctx).WithError(err).Error("Failed to unmarshal typing event")
|
2020-03-30 15:40:28 +00:00
|
|
|
continue
|
|
|
|
}
|
2020-10-14 15:49:25 +00:00
|
|
|
_, domain, err := gomatrixserverlib.SplitID('@', typingPayload.UserID)
|
|
|
|
if err != nil {
|
|
|
|
util.GetLogger(ctx).WithError(err).Error("Failed to split domain from typing event sender")
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if domain != t.Origin {
|
|
|
|
util.GetLogger(ctx).Warnf("Dropping typing event where sender domain (%q) doesn't match origin (%q)", domain, t.Origin)
|
|
|
|
continue
|
|
|
|
}
|
2020-09-07 11:32:40 +00:00
|
|
|
if err := eduserverAPI.SendTyping(ctx, t.eduAPI, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil {
|
|
|
|
util.GetLogger(ctx).WithError(err).Error("Failed to send typing event to edu server")
|
2020-03-30 15:40:28 +00:00
|
|
|
}
|
Send-to-device support (#1072)
* Groundwork for send-to-device messaging
* Update sample config
* Add unstable routing for now
* Send to device consumer in sync API
* Start the send-to-device consumer
* fix indentation in dendrite-config.yaml
* Create send-to-device database tables, other tweaks
* Add some logic for send-to-device messages, add them into sync stream
* Handle incoming send-to-device messages, count them with EDU stream pos
* Undo changes to test
* pq.Array
* Fix sync
* Logging
* Fix a couple of transaction things, fix client API
* Add send-to-device test, hopefully fix bugs
* Comments
* Refactor a bit
* Fix schema
* Fix queries
* Debug logging
* Fix storing and retrieving of send-to-device messages
* Try to avoid database locks
* Update sync position
* Use latest sync position
* Jiggle about sync a bit
* Fix tests
* Break out the retrieval from the update/delete behaviour
* Comments
* nolint on getResponseWithPDUsForCompleteSync
* Try to line up sync tokens again
* Implement wildcard
* Add all send-to-device tests to whitelist, what could possibly go wrong?
* Only care about wildcard when targeted locally
* Deduplicate transactions
* Handle tokens properly, return immediately if waiting send-to-device messages
* Fix sync
* Update sytest-whitelist
* Fix copyright notice (need to do more of this)
* Comments, copyrights
* Return errors from Do, fix dendritejs
* Review comments
* Comments
* Constructor for TransactionWriter
* defletions
* Update gomatrixserverlib, sytest-blacklist
2020-06-01 16:50:19 +00:00
|
|
|
case gomatrixserverlib.MDirectToDevice:
|
|
|
|
// https://matrix.org/docs/spec/server_server/r0.1.3#m-direct-to-device-schema
|
|
|
|
var directPayload gomatrixserverlib.ToDeviceMessage
|
|
|
|
if err := json.Unmarshal(e.Content, &directPayload); err != nil {
|
2020-09-07 11:32:40 +00:00
|
|
|
util.GetLogger(ctx).WithError(err).Error("Failed to unmarshal send-to-device events")
|
Send-to-device support (#1072)
* Groundwork for send-to-device messaging
* Update sample config
* Add unstable routing for now
* Send to device consumer in sync API
* Start the send-to-device consumer
* fix indentation in dendrite-config.yaml
* Create send-to-device database tables, other tweaks
* Add some logic for send-to-device messages, add them into sync stream
* Handle incoming send-to-device messages, count them with EDU stream pos
* Undo changes to test
* pq.Array
* Fix sync
* Logging
* Fix a couple of transaction things, fix client API
* Add send-to-device test, hopefully fix bugs
* Comments
* Refactor a bit
* Fix schema
* Fix queries
* Debug logging
* Fix storing and retrieving of send-to-device messages
* Try to avoid database locks
* Update sync position
* Use latest sync position
* Jiggle about sync a bit
* Fix tests
* Break out the retrieval from the update/delete behaviour
* Comments
* nolint on getResponseWithPDUsForCompleteSync
* Try to line up sync tokens again
* Implement wildcard
* Add all send-to-device tests to whitelist, what could possibly go wrong?
* Only care about wildcard when targeted locally
* Deduplicate transactions
* Handle tokens properly, return immediately if waiting send-to-device messages
* Fix sync
* Update sytest-whitelist
* Fix copyright notice (need to do more of this)
* Comments, copyrights
* Return errors from Do, fix dendritejs
* Review comments
* Comments
* Constructor for TransactionWriter
* defletions
* Update gomatrixserverlib, sytest-blacklist
2020-06-01 16:50:19 +00:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
for userID, byUser := range directPayload.Messages {
|
|
|
|
for deviceID, message := range byUser {
|
|
|
|
// TODO: check that the user and the device actually exist here
|
2020-09-07 11:32:40 +00:00
|
|
|
if err := eduserverAPI.SendToDevice(ctx, t.eduAPI, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil {
|
|
|
|
util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
|
Send-to-device support (#1072)
* Groundwork for send-to-device messaging
* Update sample config
* Add unstable routing for now
* Send to device consumer in sync API
* Start the send-to-device consumer
* fix indentation in dendrite-config.yaml
* Create send-to-device database tables, other tweaks
* Add some logic for send-to-device messages, add them into sync stream
* Handle incoming send-to-device messages, count them with EDU stream pos
* Undo changes to test
* pq.Array
* Fix sync
* Logging
* Fix a couple of transaction things, fix client API
* Add send-to-device test, hopefully fix bugs
* Comments
* Refactor a bit
* Fix schema
* Fix queries
* Debug logging
* Fix storing and retrieving of send-to-device messages
* Try to avoid database locks
* Update sync position
* Use latest sync position
* Jiggle about sync a bit
* Fix tests
* Break out the retrieval from the update/delete behaviour
* Comments
* nolint on getResponseWithPDUsForCompleteSync
* Try to line up sync tokens again
* Implement wildcard
* Add all send-to-device tests to whitelist, what could possibly go wrong?
* Only care about wildcard when targeted locally
* Deduplicate transactions
* Handle tokens properly, return immediately if waiting send-to-device messages
* Fix sync
* Update sytest-whitelist
* Fix copyright notice (need to do more of this)
* Comments, copyrights
* Return errors from Do, fix dendritejs
* Review comments
* Comments
* Constructor for TransactionWriter
* defletions
* Update gomatrixserverlib, sytest-blacklist
2020-06-01 16:50:19 +00:00
|
|
|
"sender": directPayload.Sender,
|
|
|
|
"user_id": userID,
|
|
|
|
"device_id": deviceID,
|
|
|
|
}).Error("Failed to send send-to-device event to edu server")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2020-08-05 12:41:16 +00:00
|
|
|
case gomatrixserverlib.MDeviceListUpdate:
|
2020-09-07 11:32:40 +00:00
|
|
|
t.processDeviceListUpdate(ctx, e)
|
2020-11-09 18:46:11 +00:00
|
|
|
case gomatrixserverlib.MReceipt:
|
|
|
|
// https://matrix.org/docs/spec/server_server/r0.1.4#receipts
|
|
|
|
payload := map[string]eduserverAPI.FederationReceiptMRead{}
|
|
|
|
|
|
|
|
if err := json.Unmarshal(e.Content, &payload); err != nil {
|
|
|
|
util.GetLogger(ctx).WithError(err).Error("Failed to unmarshal receipt event")
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
for roomID, receipt := range payload {
|
|
|
|
for userID, mread := range receipt.User {
|
|
|
|
_, domain, err := gomatrixserverlib.SplitID('@', userID)
|
|
|
|
if err != nil {
|
|
|
|
util.GetLogger(ctx).WithError(err).Error("Failed to split domain from receipt event sender")
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if t.Origin != domain {
|
|
|
|
util.GetLogger(ctx).Warnf("Dropping receipt event where sender domain (%q) doesn't match origin (%q)", domain, t.Origin)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if err := t.processReceiptEvent(ctx, userID, roomID, "m.read", mread.Data.TS, mread.EventIDs); err != nil {
|
|
|
|
util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
|
|
|
|
"sender": t.Origin,
|
|
|
|
"user_id": userID,
|
|
|
|
"room_id": roomID,
|
|
|
|
"events": mread.EventIDs,
|
|
|
|
}).Error("Failed to send receipt event to edu server")
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2021-08-17 12:44:30 +00:00
|
|
|
case eduserverAPI.MSigningKeyUpdate:
|
|
|
|
var updatePayload eduserverAPI.CrossSigningKeyUpdate
|
|
|
|
if err := json.Unmarshal(e.Content, &updatePayload); err != nil {
|
|
|
|
util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
|
|
|
|
"user_id": updatePayload.UserID,
|
|
|
|
}).Error("Failed to send signing key update to edu server")
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
inputReq := &eduserverAPI.InputCrossSigningKeyUpdateRequest{
|
|
|
|
CrossSigningKeyUpdate: updatePayload,
|
|
|
|
}
|
|
|
|
inputRes := &eduserverAPI.InputCrossSigningKeyUpdateResponse{}
|
|
|
|
if err := t.eduAPI.InputCrossSigningKeyUpdate(ctx, inputReq, inputRes); err != nil {
|
|
|
|
util.GetLogger(ctx).WithError(err).Error("Failed to unmarshal cross-signing update")
|
|
|
|
continue
|
|
|
|
}
|
2020-03-30 15:40:28 +00:00
|
|
|
default:
|
2020-09-07 11:32:40 +00:00
|
|
|
util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU")
|
2020-03-30 15:40:28 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-11-09 18:46:11 +00:00
|
|
|
// processReceiptEvent sends receipt events to the edu server
|
|
|
|
func (t *txnReq) processReceiptEvent(ctx context.Context,
|
|
|
|
userID, roomID, receiptType string,
|
|
|
|
timestamp gomatrixserverlib.Timestamp,
|
|
|
|
eventIDs []string,
|
|
|
|
) error {
|
|
|
|
// store every event
|
|
|
|
for _, eventID := range eventIDs {
|
|
|
|
req := eduserverAPI.InputReceiptEventRequest{
|
|
|
|
InputReceiptEvent: eduserverAPI.InputReceiptEvent{
|
|
|
|
UserID: userID,
|
|
|
|
RoomID: roomID,
|
|
|
|
EventID: eventID,
|
|
|
|
Type: receiptType,
|
|
|
|
Timestamp: timestamp,
|
|
|
|
},
|
|
|
|
}
|
|
|
|
resp := eduserverAPI.InputReceiptEventResponse{}
|
|
|
|
if err := t.eduAPI.InputReceiptEvent(ctx, &req, &resp); err != nil {
|
|
|
|
return fmt.Errorf("unable to set receipt event: %w", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-09-07 11:32:40 +00:00
|
|
|
func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverlib.EDU) {
|
2020-08-05 12:41:16 +00:00
|
|
|
var payload gomatrixserverlib.DeviceListUpdateEvent
|
|
|
|
if err := json.Unmarshal(e.Content, &payload); err != nil {
|
2020-09-07 11:32:40 +00:00
|
|
|
util.GetLogger(ctx).WithError(err).Error("Failed to unmarshal device list update event")
|
2020-08-05 12:41:16 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
var inputRes keyapi.InputDeviceListUpdateResponse
|
|
|
|
t.keyAPI.InputDeviceListUpdate(context.Background(), &keyapi.InputDeviceListUpdateRequest{
|
|
|
|
Event: payload,
|
|
|
|
}, &inputRes)
|
|
|
|
if inputRes.Error != nil {
|
2020-09-07 11:32:40 +00:00
|
|
|
util.GetLogger(ctx).WithError(inputRes.Error).WithField("user_id", payload.UserID).Error("failed to InputDeviceListUpdate")
|
2020-08-05 12:41:16 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-12-15 15:45:53 +00:00
|
|
|
func (t *txnReq) processEvent(_ context.Context, e *gomatrixserverlib.HeaderedEvent) error {
|
2021-03-23 15:22:00 +00:00
|
|
|
t.work = "" // reset from previous event
|
2017-06-07 13:32:53 +00:00
|
|
|
|
2021-12-15 15:45:53 +00:00
|
|
|
/*
|
|
|
|
// Ask the roomserver if we know about the room and/or if we're joined
|
|
|
|
// to it. If we aren't then we won't bother processing the event.
|
|
|
|
joinedReq := api.QueryServerJoinedToRoomRequest{
|
|
|
|
RoomID: e.RoomID(),
|
|
|
|
}
|
|
|
|
var joinedRes api.QueryServerJoinedToRoomResponse
|
|
|
|
if err := t.rsAPI.QueryServerJoinedToRoom(ctx, &joinedReq, &joinedRes); err != nil {
|
|
|
|
return fmt.Errorf("t.rsAPI.QueryServerJoinedToRoom: %w", err)
|
|
|
|
}
|
2021-07-09 15:36:45 +00:00
|
|
|
|
2021-12-15 15:45:53 +00:00
|
|
|
if !joinedRes.RoomExists || !joinedRes.IsInRoom {
|
|
|
|
// We don't believe we're a member of this room, therefore there's
|
|
|
|
// no point in wasting work trying to figure out what to do with
|
|
|
|
// missing auth or prev events. Drop the event.
|
|
|
|
return roomNotFoundError{e.RoomID()}
|
|
|
|
}
|
2021-07-09 15:36:45 +00:00
|
|
|
|
2021-12-15 15:45:53 +00:00
|
|
|
// Work out if the roomserver knows everything it needs to know to auth
|
|
|
|
// the event. This includes the prev_events and auth_events.
|
|
|
|
// NOTE! This is going to include prev_events that have an empty state
|
|
|
|
// snapshot. This is because we will need to re-request the event, and
|
|
|
|
// it's /state_ids, in order for it to exist in the roomserver correctly
|
|
|
|
// before the roomserver tries to work out
|
|
|
|
stateReq := api.QueryMissingAuthPrevEventsRequest{
|
|
|
|
RoomID: e.RoomID(),
|
|
|
|
AuthEventIDs: nil, //e.AuthEventIDs(),
|
|
|
|
PrevEventIDs: nil, //e.PrevEventIDs(),
|
|
|
|
}
|
|
|
|
var stateResp api.QueryMissingAuthPrevEventsResponse
|
|
|
|
if err := t.rsAPI.QueryMissingAuthPrevEvents(ctx, &stateReq, &stateResp); err != nil {
|
|
|
|
return fmt.Errorf("t.rsAPI.QueryMissingAuthPrevEvents: %w", err)
|
|
|
|
}
|
|
|
|
*/
|
2017-06-07 13:32:53 +00:00
|
|
|
|
2020-09-16 12:00:52 +00:00
|
|
|
// pass the event to the roomserver which will do auth checks
|
|
|
|
// If the event fail auth checks, gmsl.NotAllowed error will be returned which we be silently
|
|
|
|
// discarded by the caller of this function
|
2020-09-03 14:22:16 +00:00
|
|
|
return api.SendEvents(
|
2020-09-07 11:32:40 +00:00
|
|
|
context.Background(),
|
|
|
|
t.rsAPI,
|
2020-10-19 13:59:13 +00:00
|
|
|
api.KindNew,
|
2021-12-15 15:45:53 +00:00
|
|
|
[]*gomatrixserverlib.HeaderedEvent{e},
|
2021-12-15 13:59:58 +00:00
|
|
|
t.Origin,
|
2020-03-27 16:28:22 +00:00
|
|
|
api.DoNotSendToOtherServers,
|
|
|
|
nil,
|
2022-01-05 17:44:49 +00:00
|
|
|
false,
|
2020-03-27 16:28:22 +00:00
|
|
|
)
|
2017-06-07 13:32:53 +00:00
|
|
|
}
|