2017-09-08 14:17:12 +00:00
// Copyright 2017 Vector Creations Ltd
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
2017-10-11 17:16:53 +00:00
package routing
2017-06-07 13:32:53 +00:00
import (
2017-09-13 10:03:41 +00:00
"context"
2017-06-07 13:32:53 +00:00
"encoding/json"
2021-06-30 09:39:47 +00:00
"errors"
2017-06-07 13:32:53 +00:00
"fmt"
2017-08-23 14:13:47 +00:00
"net/http"
2020-09-28 10:32:59 +00:00
"sync"
2020-09-07 11:32:40 +00:00
"time"
2017-08-23 14:13:47 +00:00
2017-06-07 13:32:53 +00:00
"github.com/matrix-org/dendrite/clientapi/jsonerror"
2020-06-10 11:17:54 +00:00
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
2021-06-30 11:05:58 +00:00
federationAPI "github.com/matrix-org/dendrite/federationapi/api"
2021-03-30 09:01:32 +00:00
"github.com/matrix-org/dendrite/internal"
2020-08-05 12:41:16 +00:00
keyapi "github.com/matrix-org/dendrite/keyserver/api"
2017-06-07 13:32:53 +00:00
"github.com/matrix-org/dendrite/roomserver/api"
2020-12-02 17:41:00 +00:00
"github.com/matrix-org/dendrite/setup/config"
2017-06-07 13:32:53 +00:00
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
2021-03-23 11:33:36 +00:00
"github.com/prometheus/client_golang/prometheus"
2020-05-05 14:48:37 +00:00
"github.com/sirupsen/logrus"
2021-07-02 11:33:27 +00:00
"go.uber.org/atomic"
2017-06-07 13:32:53 +00:00
)
2021-03-23 15:22:00 +00:00
const (
// Event was passed to the roomserver
MetricsOutcomeOK = "ok"
// Event failed to be processed
MetricsOutcomeFail = "fail"
// Event failed auth checks
MetricsOutcomeRejected = "rejected"
// Terminated the transaction
MetricsOutcomeFatal = "fatal"
// The event has missing auth_events we need to fetch
MetricsWorkMissingAuthEvents = "missing_auth_events"
// No work had to be done as we had all prev/auth events
MetricsWorkDirect = "direct"
// The event has missing prev_events we need to call /g_m_e for
MetricsWorkMissingPrevEvents = "missing_prev_events"
)
2021-03-23 11:33:36 +00:00
var (
pduCountTotal = prometheus . NewCounterVec (
prometheus . CounterOpts {
Namespace : "dendrite" ,
Subsystem : "federationapi" ,
Name : "recv_pdus" ,
2021-03-23 15:22:00 +00:00
Help : "Number of incoming PDUs from remote servers with labels for success" ,
2021-03-23 11:33:36 +00:00
} ,
2021-03-23 15:22:00 +00:00
[ ] string { "status" } , // 'success' or 'total'
2021-03-23 11:33:36 +00:00
)
eduCountTotal = prometheus . NewCounter (
prometheus . CounterOpts {
Namespace : "dendrite" ,
Subsystem : "federationapi" ,
Name : "recv_edus" ,
2021-03-23 15:22:00 +00:00
Help : "Number of incoming EDUs from remote servers" ,
} ,
)
processEventSummary = prometheus . NewSummaryVec (
prometheus . SummaryOpts {
Namespace : "dendrite" ,
Subsystem : "federationapi" ,
Name : "process_event" ,
Help : "How long it takes to process an incoming event and what work had to be done for it" ,
2021-03-23 11:33:36 +00:00
} ,
2021-03-23 15:22:00 +00:00
[ ] string { "work" , "outcome" } ,
2021-03-23 11:33:36 +00:00
)
)
func init ( ) {
prometheus . MustRegister (
2021-03-23 15:22:00 +00:00
pduCountTotal , eduCountTotal , processEventSummary ,
2021-03-23 11:33:36 +00:00
)
}
2021-07-02 11:33:27 +00:00
type sendFIFOQueue struct {
tasks [ ] * inputTask
count int
mutex sync . Mutex
notifs chan struct { }
}
func newSendFIFOQueue ( ) * sendFIFOQueue {
q := & sendFIFOQueue {
notifs : make ( chan struct { } , 1 ) ,
}
return q
}
func ( q * sendFIFOQueue ) push ( frame * inputTask ) {
q . mutex . Lock ( )
defer q . mutex . Unlock ( )
q . tasks = append ( q . tasks , frame )
q . count ++
select {
case q . notifs <- struct { } { } :
default :
}
}
// pop returns the first item of the queue, if there is one.
// The second return value will indicate if a task was returned.
func ( q * sendFIFOQueue ) pop ( ) ( * inputTask , bool ) {
q . mutex . Lock ( )
defer q . mutex . Unlock ( )
if q . count == 0 {
return nil , false
}
frame := q . tasks [ 0 ]
q . tasks [ 0 ] = nil
q . tasks = q . tasks [ 1 : ]
q . count --
if q . count == 0 {
// Force a GC of the underlying array, since it might have
// grown significantly if the queue was hammered for some reason
q . tasks = nil
}
return frame , true
}
type inputTask struct {
ctx context . Context
t * txnReq
event * gomatrixserverlib . Event
wg * sync . WaitGroup
err error // written back by worker, only safe to read when all tasks are done
duration time . Duration // written back by worker, only safe to read when all tasks are done
}
type inputWorker struct {
running atomic . Bool
input * sendFIFOQueue
}
var inputWorkers sync . Map // room ID -> *inputWorker
2017-06-07 13:32:53 +00:00
// Send implements /_matrix/federation/v1/send/{txnID}
func Send (
2017-09-04 12:14:01 +00:00
httpReq * http . Request ,
request * gomatrixserverlib . FederationRequest ,
2017-06-07 13:32:53 +00:00
txnID gomatrixserverlib . TransactionID ,
2020-08-10 13:18:04 +00:00
cfg * config . FederationAPI ,
2020-05-01 09:48:17 +00:00
rsAPI api . RoomserverInternalAPI ,
2020-06-10 11:17:54 +00:00
eduAPI eduserverAPI . EDUServerInputAPI ,
2020-08-05 12:41:16 +00:00
keyAPI keyapi . KeyInternalAPI ,
2020-06-15 15:57:59 +00:00
keys gomatrixserverlib . JSONVerifier ,
2017-06-07 13:32:53 +00:00
federation * gomatrixserverlib . FederationClient ,
2021-03-30 09:01:32 +00:00
mu * internal . MutexByRoom ,
2021-06-30 11:05:58 +00:00
servers federationAPI . ServersInRoomProvider ,
2017-06-07 13:32:53 +00:00
) util . JSONResponse {
t := txnReq {
2020-06-10 11:17:54 +00:00
rsAPI : rsAPI ,
eduAPI : eduAPI ,
keys : keys ,
federation : federation ,
2021-06-30 09:01:56 +00:00
hadEvents : make ( map [ string ] bool ) ,
2020-06-10 11:17:54 +00:00
haveEvents : make ( map [ string ] * gomatrixserverlib . HeaderedEvent ) ,
2021-06-30 11:05:58 +00:00
servers : servers ,
2020-08-05 12:41:16 +00:00
keyAPI : keyAPI ,
2021-03-30 09:01:32 +00:00
roomsMu : mu ,
2017-06-07 13:32:53 +00:00
}
2020-03-27 16:28:22 +00:00
var txnEvents struct {
2020-03-30 15:40:28 +00:00
PDUs [ ] json . RawMessage ` json:"pdus" `
EDUs [ ] gomatrixserverlib . EDU ` json:"edus" `
2020-03-27 16:28:22 +00:00
}
if err := json . Unmarshal ( request . Content ( ) , & txnEvents ) ; err != nil {
2017-06-07 13:32:53 +00:00
return util . JSONResponse {
2018-03-13 15:55:45 +00:00
Code : http . StatusBadRequest ,
2017-08-23 14:13:47 +00:00
JSON : jsonerror . NotJSON ( "The request body could not be decoded into valid JSON. " + err . Error ( ) ) ,
2017-06-07 13:32:53 +00:00
}
}
2020-06-23 12:15:15 +00:00
// Transactions are limited in size; they can have at most 50 PDUs and 100 EDUs.
// https://matrix.org/docs/spec/server_server/latest#transactions
if len ( txnEvents . PDUs ) > 50 || len ( txnEvents . EDUs ) > 100 {
return util . JSONResponse {
Code : http . StatusBadRequest ,
JSON : jsonerror . BadJSON ( "max 50 pdus / 100 edus" ) ,
}
}
2017-06-07 13:32:53 +00:00
2020-03-30 15:40:28 +00:00
// TODO: Really we should have a function to convert FederationRequest to txnReq
2020-03-27 16:28:22 +00:00
t . PDUs = txnEvents . PDUs
2020-03-30 15:40:28 +00:00
t . EDUs = txnEvents . EDUs
2017-06-07 13:32:53 +00:00
t . Origin = request . Origin ( )
t . TransactionID = txnID
2017-06-19 14:21:04 +00:00
t . Destination = cfg . Matrix . ServerName
2017-06-07 13:32:53 +00:00
2020-10-02 10:38:35 +00:00
util . GetLogger ( httpReq . Context ( ) ) . Infof ( "Received transaction %q from %q containing %d PDUs, %d EDUs" , txnID , request . Origin ( ) , len ( t . PDUs ) , len ( t . EDUs ) )
2020-03-27 16:28:22 +00:00
2021-07-05 11:14:31 +00:00
resp , jsonErr := t . processTransaction ( httpReq . Context ( ) )
2020-06-23 12:15:15 +00:00
if jsonErr != nil {
util . GetLogger ( httpReq . Context ( ) ) . WithField ( "jsonErr" , jsonErr ) . Error ( "t.processTransaction failed" )
return * jsonErr
2017-06-07 13:32:53 +00:00
}
2020-05-13 12:01:45 +00:00
// https://matrix.org/docs/spec/server_server/r0.1.3#put-matrix-federation-v1-send-txnid
// Status code 200:
// The result of processing the transaction. The server is to use this response
// even in the event of one or more PDUs failing to be processed.
2017-06-07 13:32:53 +00:00
return util . JSONResponse {
2020-05-13 12:01:45 +00:00
Code : http . StatusOK ,
JSON : resp ,
2017-06-07 13:32:53 +00:00
}
}
type txnReq struct {
gomatrixserverlib . Transaction
2021-06-30 11:05:58 +00:00
rsAPI api . RoomserverInternalAPI
eduAPI eduserverAPI . EDUServerInputAPI
keyAPI keyapi . KeyInternalAPI
keys gomatrixserverlib . JSONVerifier
federation txnFederationClient
roomsMu * internal . MutexByRoom
// something that can tell us about which servers are in a room right now
servers federationAPI . ServersInRoomProvider
2021-06-30 09:01:56 +00:00
// a list of events from the auth and prev events which we already had
2021-07-07 17:55:44 +00:00
hadEvents map [ string ] bool
hadEventsMutex sync . Mutex
2020-05-12 15:24:28 +00:00
// local cache of events for auth checks, etc - this may include events
// which the roomserver is unaware of.
2021-06-30 11:32:20 +00:00
haveEvents map [ string ] * gomatrixserverlib . HeaderedEvent
haveEventsMutex sync . Mutex
work string // metrics
2020-05-06 13:27:02 +00:00
}
2021-07-07 17:55:44 +00:00
func ( t * txnReq ) hadEvent ( eventID string , had bool ) {
t . hadEventsMutex . Lock ( )
defer t . hadEventsMutex . Unlock ( )
t . hadEvents [ eventID ] = had
}
2020-05-06 13:27:02 +00:00
// A subset of FederationClient functionality that txn requires. Useful for testing.
type txnFederationClient interface {
LookupState ( ctx context . Context , s gomatrixserverlib . ServerName , roomID string , eventID string , roomVersion gomatrixserverlib . RoomVersion ) (
res gomatrixserverlib . RespState , err error ,
)
LookupStateIDs ( ctx context . Context , s gomatrixserverlib . ServerName , roomID string , eventID string ) ( res gomatrixserverlib . RespStateIDs , err error )
GetEvent ( ctx context . Context , s gomatrixserverlib . ServerName , eventID string ) ( res gomatrixserverlib . Transaction , err error )
2020-05-12 15:24:28 +00:00
LookupMissingEvents ( ctx context . Context , s gomatrixserverlib . ServerName , roomID string , missing gomatrixserverlib . MissingEvents ,
roomVersion gomatrixserverlib . RoomVersion ) ( res gomatrixserverlib . RespMissingEvents , err error )
2017-06-07 13:32:53 +00:00
}
2020-09-07 11:32:40 +00:00
func ( t * txnReq ) processTransaction ( ctx context . Context ) ( * gomatrixserverlib . RespSend , * util . JSONResponse ) {
2020-04-16 16:59:55 +00:00
results := make ( map [ string ] gomatrixserverlib . PDUResult )
2021-07-02 11:33:27 +00:00
var wg sync . WaitGroup
var tasks [ ] * inputTask
2020-04-16 16:59:55 +00:00
2020-03-27 16:28:22 +00:00
for _ , pdu := range t . PDUs {
2021-03-23 11:33:36 +00:00
pduCountTotal . WithLabelValues ( "total" ) . Inc ( )
2020-03-27 16:28:22 +00:00
var header struct {
RoomID string ` json:"room_id" `
}
if err := json . Unmarshal ( pdu , & header ) ; err != nil {
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . WithError ( err ) . Warn ( "Transaction: Failed to extract room ID from event" )
2020-05-13 12:01:45 +00:00
// We don't know the event ID at this point so we can't return the
// failure in the PDU results
continue
2020-03-27 16:28:22 +00:00
}
verReq := api . QueryRoomVersionForRoomRequest { RoomID : header . RoomID }
verRes := api . QueryRoomVersionForRoomResponse { }
2020-09-07 11:32:40 +00:00
if err := t . rsAPI . QueryRoomVersionForRoom ( ctx , & verReq , & verRes ) ; err != nil {
util . GetLogger ( ctx ) . WithError ( err ) . Warn ( "Transaction: Failed to query room version for room" , verReq . RoomID )
2020-05-13 12:01:45 +00:00
// We don't know the event ID at this point so we can't return the
// failure in the PDU results
continue
2020-03-27 16:28:22 +00:00
}
event , err := gomatrixserverlib . NewEventFromUntrustedJSON ( pdu , verRes . RoomVersion )
if err != nil {
2020-06-23 12:15:15 +00:00
if _ , ok := err . ( gomatrixserverlib . BadJSONError ) ; ok {
// Room version 6 states that homeservers should strictly enforce canonical JSON
// on PDUs.
//
// This enforces that the entire transaction is rejected if a single bad PDU is
// sent. It is unclear if this is the correct behaviour or not.
//
// See https://github.com/matrix-org/synapse/issues/7543
return nil , & util . JSONResponse {
Code : 400 ,
JSON : jsonerror . BadJSON ( "PDU contains bad JSON" ) ,
}
2020-05-13 12:01:45 +00:00
}
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . WithError ( err ) . Warnf ( "Transaction: Failed to parse event JSON of event %s" , string ( pdu ) )
2020-05-13 12:01:45 +00:00
continue
2020-03-27 16:28:22 +00:00
}
2020-09-07 11:32:40 +00:00
if api . IsServerBannedFromRoom ( ctx , t . rsAPI , event . RoomID ( ) , t . Origin ) {
2020-08-11 17:19:11 +00:00
results [ event . EventID ( ) ] = gomatrixserverlib . PDUResult {
Error : "Forbidden by server ACLs" ,
}
continue
}
2021-11-02 10:13:38 +00:00
if err = event . VerifyEventSignatures ( ctx , t . keys ) ; err != nil {
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . WithError ( err ) . Warnf ( "Transaction: Couldn't validate signature of event %q" , event . EventID ( ) )
2020-05-13 12:01:45 +00:00
results [ event . EventID ( ) ] = gomatrixserverlib . PDUResult {
Error : err . Error ( ) ,
}
continue
2020-03-27 16:28:22 +00:00
}
2021-07-02 11:33:27 +00:00
v , _ := inputWorkers . LoadOrStore ( event . RoomID ( ) , & inputWorker {
input : newSendFIFOQueue ( ) ,
} )
worker := v . ( * inputWorker )
wg . Add ( 1 )
task := & inputTask {
ctx : ctx ,
t : t ,
event : event ,
wg : & wg ,
}
tasks = append ( tasks , task )
worker . input . push ( task )
2021-07-05 11:14:31 +00:00
if worker . running . CAS ( false , true ) {
go worker . run ( )
}
2021-07-02 11:33:27 +00:00
}
2021-07-05 11:14:31 +00:00
t . processEDUs ( ctx )
2021-07-02 11:33:27 +00:00
wg . Wait ( )
for _ , task := range tasks {
if task . err != nil {
results [ task . event . EventID ( ) ] = gomatrixserverlib . PDUResult {
Error : task . err . Error ( ) ,
2017-06-07 13:32:53 +00:00
}
} else {
2021-07-02 11:33:27 +00:00
results [ task . event . EventID ( ) ] = gomatrixserverlib . PDUResult { }
2017-06-07 13:32:53 +00:00
}
}
2020-08-07 14:00:23 +00:00
if c := len ( results ) ; c > 0 {
2021-09-07 14:07:14 +00:00
util . GetLogger ( ctx ) . Infof ( "Processed %d PDUs from %v in transaction %q" , c , t . Origin , t . TransactionID )
2020-08-07 14:00:23 +00:00
}
2017-06-07 13:32:53 +00:00
return & gomatrixserverlib . RespSend { PDUs : results } , nil
}
2021-07-02 11:33:27 +00:00
func ( t * inputWorker ) run ( ) {
defer t . running . Store ( false )
for {
task , ok := t . input . pop ( )
if ! ok {
return
}
if task == nil {
continue
}
func ( ) {
defer task . wg . Done ( )
select {
case <- task . ctx . Done ( ) :
task . err = context . DeadlineExceeded
2021-07-05 12:47:37 +00:00
pduCountTotal . WithLabelValues ( "expired" ) . Inc ( )
2021-07-02 11:33:27 +00:00
return
default :
evStart := time . Now ( )
2021-07-05 11:14:31 +00:00
// TODO: Is 5 minutes too long?
ctx , cancel := context . WithTimeout ( context . Background ( ) , time . Minute * 5 )
task . err = task . t . processEvent ( ctx , task . event )
cancel ( )
2021-07-02 11:33:27 +00:00
task . duration = time . Since ( evStart )
if err := task . err ; err != nil {
switch err . ( type ) {
case * gomatrixserverlib . NotAllowed :
processEventSummary . WithLabelValues ( task . t . work , MetricsOutcomeRejected ) . Observe (
float64 ( time . Since ( evStart ) . Nanoseconds ( ) ) / 1000. ,
)
util . GetLogger ( task . ctx ) . WithError ( err ) . WithField ( "event_id" , task . event . EventID ( ) ) . WithField ( "rejected" , true ) . Warn (
"Failed to process incoming federation event, skipping" ,
)
task . err = nil // make "rejected" failures silent
default :
processEventSummary . WithLabelValues ( task . t . work , MetricsOutcomeFail ) . Observe (
float64 ( time . Since ( evStart ) . Nanoseconds ( ) ) / 1000. ,
)
util . GetLogger ( task . ctx ) . WithError ( err ) . WithField ( "event_id" , task . event . EventID ( ) ) . WithField ( "rejected" , false ) . Warn (
"Failed to process incoming federation event, skipping" ,
)
}
} else {
pduCountTotal . WithLabelValues ( "success" ) . Inc ( )
processEventSummary . WithLabelValues ( task . t . work , MetricsOutcomeOK ) . Observe (
float64 ( time . Since ( evStart ) . Nanoseconds ( ) ) / 1000. ,
)
}
}
} ( )
2020-05-13 12:01:45 +00:00
}
}
2020-04-16 16:59:55 +00:00
type roomNotFoundError struct {
2017-06-07 13:32:53 +00:00
roomID string
}
2020-04-16 16:59:55 +00:00
type verifySigError struct {
eventID string
err error
}
2020-05-12 15:24:28 +00:00
type missingPrevEventsError struct {
eventID string
err error
}
2017-06-07 13:32:53 +00:00
2020-04-16 16:59:55 +00:00
func ( e roomNotFoundError ) Error ( ) string { return fmt . Sprintf ( "room %q not found" , e . roomID ) }
func ( e verifySigError ) Error ( ) string {
return fmt . Sprintf ( "unable to verify signature of event %q: %s" , e . eventID , e . err )
}
2020-05-12 15:24:28 +00:00
func ( e missingPrevEventsError ) Error ( ) string {
return fmt . Sprintf ( "unable to get prev_events for event %q: %s" , e . eventID , e . err )
}
2020-09-07 11:32:40 +00:00
func ( t * txnReq ) processEDUs ( ctx context . Context ) {
for _ , e := range t . EDUs {
2021-03-23 11:33:36 +00:00
eduCountTotal . Inc ( )
2020-03-30 15:40:28 +00:00
switch e . Type {
case gomatrixserverlib . MTyping :
// https://matrix.org/docs/spec/server_server/latest#typing-notifications
var typingPayload struct {
RoomID string ` json:"room_id" `
UserID string ` json:"user_id" `
Typing bool ` json:"typing" `
}
if err := json . Unmarshal ( e . Content , & typingPayload ) ; err != nil {
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . WithError ( err ) . Error ( "Failed to unmarshal typing event" )
2020-03-30 15:40:28 +00:00
continue
}
2020-10-14 15:49:25 +00:00
_ , domain , err := gomatrixserverlib . SplitID ( '@' , typingPayload . UserID )
if err != nil {
util . GetLogger ( ctx ) . WithError ( err ) . Error ( "Failed to split domain from typing event sender" )
continue
}
if domain != t . Origin {
util . GetLogger ( ctx ) . Warnf ( "Dropping typing event where sender domain (%q) doesn't match origin (%q)" , domain , t . Origin )
continue
}
2020-09-07 11:32:40 +00:00
if err := eduserverAPI . SendTyping ( ctx , t . eduAPI , typingPayload . UserID , typingPayload . RoomID , typingPayload . Typing , 30 * 1000 ) ; err != nil {
util . GetLogger ( ctx ) . WithError ( err ) . Error ( "Failed to send typing event to edu server" )
2020-03-30 15:40:28 +00:00
}
Send-to-device support (#1072)
* Groundwork for send-to-device messaging
* Update sample config
* Add unstable routing for now
* Send to device consumer in sync API
* Start the send-to-device consumer
* fix indentation in dendrite-config.yaml
* Create send-to-device database tables, other tweaks
* Add some logic for send-to-device messages, add them into sync stream
* Handle incoming send-to-device messages, count them with EDU stream pos
* Undo changes to test
* pq.Array
* Fix sync
* Logging
* Fix a couple of transaction things, fix client API
* Add send-to-device test, hopefully fix bugs
* Comments
* Refactor a bit
* Fix schema
* Fix queries
* Debug logging
* Fix storing and retrieving of send-to-device messages
* Try to avoid database locks
* Update sync position
* Use latest sync position
* Jiggle about sync a bit
* Fix tests
* Break out the retrieval from the update/delete behaviour
* Comments
* nolint on getResponseWithPDUsForCompleteSync
* Try to line up sync tokens again
* Implement wildcard
* Add all send-to-device tests to whitelist, what could possibly go wrong?
* Only care about wildcard when targeted locally
* Deduplicate transactions
* Handle tokens properly, return immediately if waiting send-to-device messages
* Fix sync
* Update sytest-whitelist
* Fix copyright notice (need to do more of this)
* Comments, copyrights
* Return errors from Do, fix dendritejs
* Review comments
* Comments
* Constructor for TransactionWriter
* defletions
* Update gomatrixserverlib, sytest-blacklist
2020-06-01 16:50:19 +00:00
case gomatrixserverlib . MDirectToDevice :
// https://matrix.org/docs/spec/server_server/r0.1.3#m-direct-to-device-schema
var directPayload gomatrixserverlib . ToDeviceMessage
if err := json . Unmarshal ( e . Content , & directPayload ) ; err != nil {
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . WithError ( err ) . Error ( "Failed to unmarshal send-to-device events" )
Send-to-device support (#1072)
* Groundwork for send-to-device messaging
* Update sample config
* Add unstable routing for now
* Send to device consumer in sync API
* Start the send-to-device consumer
* fix indentation in dendrite-config.yaml
* Create send-to-device database tables, other tweaks
* Add some logic for send-to-device messages, add them into sync stream
* Handle incoming send-to-device messages, count them with EDU stream pos
* Undo changes to test
* pq.Array
* Fix sync
* Logging
* Fix a couple of transaction things, fix client API
* Add send-to-device test, hopefully fix bugs
* Comments
* Refactor a bit
* Fix schema
* Fix queries
* Debug logging
* Fix storing and retrieving of send-to-device messages
* Try to avoid database locks
* Update sync position
* Use latest sync position
* Jiggle about sync a bit
* Fix tests
* Break out the retrieval from the update/delete behaviour
* Comments
* nolint on getResponseWithPDUsForCompleteSync
* Try to line up sync tokens again
* Implement wildcard
* Add all send-to-device tests to whitelist, what could possibly go wrong?
* Only care about wildcard when targeted locally
* Deduplicate transactions
* Handle tokens properly, return immediately if waiting send-to-device messages
* Fix sync
* Update sytest-whitelist
* Fix copyright notice (need to do more of this)
* Comments, copyrights
* Return errors from Do, fix dendritejs
* Review comments
* Comments
* Constructor for TransactionWriter
* defletions
* Update gomatrixserverlib, sytest-blacklist
2020-06-01 16:50:19 +00:00
continue
}
for userID , byUser := range directPayload . Messages {
for deviceID , message := range byUser {
// TODO: check that the user and the device actually exist here
2020-09-07 11:32:40 +00:00
if err := eduserverAPI . SendToDevice ( ctx , t . eduAPI , directPayload . Sender , userID , deviceID , directPayload . Type , message ) ; err != nil {
util . GetLogger ( ctx ) . WithError ( err ) . WithFields ( logrus . Fields {
Send-to-device support (#1072)
* Groundwork for send-to-device messaging
* Update sample config
* Add unstable routing for now
* Send to device consumer in sync API
* Start the send-to-device consumer
* fix indentation in dendrite-config.yaml
* Create send-to-device database tables, other tweaks
* Add some logic for send-to-device messages, add them into sync stream
* Handle incoming send-to-device messages, count them with EDU stream pos
* Undo changes to test
* pq.Array
* Fix sync
* Logging
* Fix a couple of transaction things, fix client API
* Add send-to-device test, hopefully fix bugs
* Comments
* Refactor a bit
* Fix schema
* Fix queries
* Debug logging
* Fix storing and retrieving of send-to-device messages
* Try to avoid database locks
* Update sync position
* Use latest sync position
* Jiggle about sync a bit
* Fix tests
* Break out the retrieval from the update/delete behaviour
* Comments
* nolint on getResponseWithPDUsForCompleteSync
* Try to line up sync tokens again
* Implement wildcard
* Add all send-to-device tests to whitelist, what could possibly go wrong?
* Only care about wildcard when targeted locally
* Deduplicate transactions
* Handle tokens properly, return immediately if waiting send-to-device messages
* Fix sync
* Update sytest-whitelist
* Fix copyright notice (need to do more of this)
* Comments, copyrights
* Return errors from Do, fix dendritejs
* Review comments
* Comments
* Constructor for TransactionWriter
* defletions
* Update gomatrixserverlib, sytest-blacklist
2020-06-01 16:50:19 +00:00
"sender" : directPayload . Sender ,
"user_id" : userID ,
"device_id" : deviceID ,
} ) . Error ( "Failed to send send-to-device event to edu server" )
}
}
}
2020-08-05 12:41:16 +00:00
case gomatrixserverlib . MDeviceListUpdate :
2020-09-07 11:32:40 +00:00
t . processDeviceListUpdate ( ctx , e )
2020-11-09 18:46:11 +00:00
case gomatrixserverlib . MReceipt :
// https://matrix.org/docs/spec/server_server/r0.1.4#receipts
payload := map [ string ] eduserverAPI . FederationReceiptMRead { }
if err := json . Unmarshal ( e . Content , & payload ) ; err != nil {
util . GetLogger ( ctx ) . WithError ( err ) . Error ( "Failed to unmarshal receipt event" )
continue
}
for roomID , receipt := range payload {
for userID , mread := range receipt . User {
_ , domain , err := gomatrixserverlib . SplitID ( '@' , userID )
if err != nil {
util . GetLogger ( ctx ) . WithError ( err ) . Error ( "Failed to split domain from receipt event sender" )
continue
}
if t . Origin != domain {
util . GetLogger ( ctx ) . Warnf ( "Dropping receipt event where sender domain (%q) doesn't match origin (%q)" , domain , t . Origin )
continue
}
if err := t . processReceiptEvent ( ctx , userID , roomID , "m.read" , mread . Data . TS , mread . EventIDs ) ; err != nil {
util . GetLogger ( ctx ) . WithError ( err ) . WithFields ( logrus . Fields {
"sender" : t . Origin ,
"user_id" : userID ,
"room_id" : roomID ,
"events" : mread . EventIDs ,
} ) . Error ( "Failed to send receipt event to edu server" )
continue
}
}
}
2021-08-17 12:44:30 +00:00
case eduserverAPI . MSigningKeyUpdate :
var updatePayload eduserverAPI . CrossSigningKeyUpdate
if err := json . Unmarshal ( e . Content , & updatePayload ) ; err != nil {
util . GetLogger ( ctx ) . WithError ( err ) . WithFields ( logrus . Fields {
"user_id" : updatePayload . UserID ,
} ) . Error ( "Failed to send signing key update to edu server" )
continue
}
inputReq := & eduserverAPI . InputCrossSigningKeyUpdateRequest {
CrossSigningKeyUpdate : updatePayload ,
}
inputRes := & eduserverAPI . InputCrossSigningKeyUpdateResponse { }
if err := t . eduAPI . InputCrossSigningKeyUpdate ( ctx , inputReq , inputRes ) ; err != nil {
util . GetLogger ( ctx ) . WithError ( err ) . Error ( "Failed to unmarshal cross-signing update" )
continue
}
2020-03-30 15:40:28 +00:00
default :
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . WithField ( "type" , e . Type ) . Debug ( "Unhandled EDU" )
2020-03-30 15:40:28 +00:00
}
}
}
2020-11-09 18:46:11 +00:00
// processReceiptEvent sends receipt events to the edu server
func ( t * txnReq ) processReceiptEvent ( ctx context . Context ,
userID , roomID , receiptType string ,
timestamp gomatrixserverlib . Timestamp ,
eventIDs [ ] string ,
) error {
// store every event
for _ , eventID := range eventIDs {
req := eduserverAPI . InputReceiptEventRequest {
InputReceiptEvent : eduserverAPI . InputReceiptEvent {
UserID : userID ,
RoomID : roomID ,
EventID : eventID ,
Type : receiptType ,
Timestamp : timestamp ,
} ,
}
resp := eduserverAPI . InputReceiptEventResponse { }
if err := t . eduAPI . InputReceiptEvent ( ctx , & req , & resp ) ; err != nil {
return fmt . Errorf ( "unable to set receipt event: %w" , err )
}
}
return nil
}
2020-09-07 11:32:40 +00:00
func ( t * txnReq ) processDeviceListUpdate ( ctx context . Context , e gomatrixserverlib . EDU ) {
2020-08-05 12:41:16 +00:00
var payload gomatrixserverlib . DeviceListUpdateEvent
if err := json . Unmarshal ( e . Content , & payload ) ; err != nil {
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . WithError ( err ) . Error ( "Failed to unmarshal device list update event" )
2020-08-05 12:41:16 +00:00
return
}
var inputRes keyapi . InputDeviceListUpdateResponse
t . keyAPI . InputDeviceListUpdate ( context . Background ( ) , & keyapi . InputDeviceListUpdateRequest {
Event : payload ,
} , & inputRes )
if inputRes . Error != nil {
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . WithError ( inputRes . Error ) . WithField ( "user_id" , payload . UserID ) . Error ( "failed to InputDeviceListUpdate" )
2020-08-05 12:41:16 +00:00
}
}
2021-06-30 11:05:58 +00:00
func ( t * txnReq ) getServers ( ctx context . Context , roomID string , event * gomatrixserverlib . Event ) [ ] gomatrixserverlib . ServerName {
// The server that sent us the event should be sufficient to tell us about missing
// prev and auth events.
servers := [ ] gomatrixserverlib . ServerName { t . Origin }
// If the event origin is different to the transaction origin then we can use
// this as a last resort. The origin server that created the event would have
// had to know the auth and prev events.
if event != nil {
if origin := event . Origin ( ) ; origin != t . Origin {
servers = append ( servers , origin )
}
2021-06-29 08:37:28 +00:00
}
2021-06-30 11:05:58 +00:00
// If a specific room-to-server provider exists then use that. This will primarily
// be used for the P2P demos.
if t . servers != nil {
servers = append ( servers , t . servers . GetServersForRoom ( ctx , roomID , event ) ... )
2021-06-29 08:37:28 +00:00
}
2021-06-30 11:05:58 +00:00
return servers
2020-10-13 10:53:20 +00:00
}
2020-11-16 15:44:53 +00:00
func ( t * txnReq ) processEvent ( ctx context . Context , e * gomatrixserverlib . Event ) error {
2020-09-29 12:40:29 +00:00
logger := util . GetLogger ( ctx ) . WithField ( "event_id" , e . EventID ( ) ) . WithField ( "room_id" , e . RoomID ( ) )
2021-03-23 15:22:00 +00:00
t . work = "" // reset from previous event
2017-06-07 13:32:53 +00:00
2021-07-09 15:36:45 +00:00
// Ask the roomserver if we know about the room and/or if we're joined
// to it. If we aren't then we won't bother processing the event.
joinedReq := api . QueryServerJoinedToRoomRequest {
RoomID : e . RoomID ( ) ,
}
var joinedRes api . QueryServerJoinedToRoomResponse
if err := t . rsAPI . QueryServerJoinedToRoom ( ctx , & joinedReq , & joinedRes ) ; err != nil {
return fmt . Errorf ( "t.rsAPI.QueryServerJoinedToRoom: %w" , err )
}
if ! joinedRes . RoomExists || ! joinedRes . IsInRoom {
// We don't believe we're a member of this room, therefore there's
// no point in wasting work trying to figure out what to do with
// missing auth or prev events. Drop the event.
return roomNotFoundError { e . RoomID ( ) }
}
2020-09-29 12:40:29 +00:00
// Work out if the roomserver knows everything it needs to know to auth
2020-10-12 14:56:15 +00:00
// the event. This includes the prev_events and auth_events.
// NOTE! This is going to include prev_events that have an empty state
// snapshot. This is because we will need to re-request the event, and
// it's /state_ids, in order for it to exist in the roomserver correctly
// before the roomserver tries to work out
2020-09-29 12:40:29 +00:00
stateReq := api . QueryMissingAuthPrevEventsRequest {
2017-06-07 13:32:53 +00:00
RoomID : e . RoomID ( ) ,
2020-09-29 12:40:29 +00:00
AuthEventIDs : e . AuthEventIDs ( ) ,
PrevEventIDs : e . PrevEventIDs ( ) ,
2017-06-07 13:32:53 +00:00
}
2020-09-29 12:40:29 +00:00
var stateResp api . QueryMissingAuthPrevEventsResponse
if err := t . rsAPI . QueryMissingAuthPrevEvents ( ctx , & stateReq , & stateResp ) ; err != nil {
2020-10-13 10:53:20 +00:00
return fmt . Errorf ( "t.rsAPI.QueryMissingAuthPrevEvents: %w" , err )
2017-06-07 13:32:53 +00:00
}
2021-06-30 09:01:56 +00:00
// Prepare a map of all the events we already had before this point, so
// that we don't send them to the roomserver again.
for _ , eventID := range append ( e . AuthEventIDs ( ) , e . PrevEventIDs ( ) ... ) {
2021-07-07 17:55:44 +00:00
t . hadEvent ( eventID , true )
2021-06-30 09:01:56 +00:00
}
for _ , eventID := range append ( stateResp . MissingAuthEventIDs , stateResp . MissingPrevEventIDs ... ) {
2021-07-07 17:55:44 +00:00
t . hadEvent ( eventID , false )
2021-06-30 09:01:56 +00:00
}
2020-09-29 12:40:29 +00:00
if len ( stateResp . MissingAuthEventIDs ) > 0 {
2021-03-23 15:22:00 +00:00
t . work = MetricsWorkMissingAuthEvents
2020-09-29 12:40:29 +00:00
logger . Infof ( "Event refers to %d unknown auth_events" , len ( stateResp . MissingAuthEventIDs ) )
2020-10-13 10:53:20 +00:00
if err := t . retrieveMissingAuthEvents ( ctx , e , & stateResp ) ; err != nil {
return fmt . Errorf ( "t.retrieveMissingAuthEvents: %w" , err )
2020-09-29 12:40:29 +00:00
}
}
if len ( stateResp . MissingPrevEventIDs ) > 0 {
2021-03-23 15:22:00 +00:00
t . work = MetricsWorkMissingPrevEvents
2020-09-29 12:40:29 +00:00
logger . Infof ( "Event refers to %d unknown prev_events" , len ( stateResp . MissingPrevEventIDs ) )
2020-10-12 14:56:15 +00:00
return t . processEventWithMissingState ( ctx , e , stateResp . RoomVersion )
2017-06-07 13:32:53 +00:00
}
2021-03-23 15:22:00 +00:00
t . work = MetricsWorkDirect
2017-06-07 13:32:53 +00:00
2020-09-16 12:00:52 +00:00
// pass the event to the roomserver which will do auth checks
// If the event fail auth checks, gmsl.NotAllowed error will be returned which we be silently
// discarded by the caller of this function
2020-09-03 14:22:16 +00:00
return api . SendEvents (
2020-09-07 11:32:40 +00:00
context . Background ( ) ,
t . rsAPI ,
2020-10-19 13:59:13 +00:00
api . KindNew ,
2020-11-16 15:44:53 +00:00
[ ] * gomatrixserverlib . HeaderedEvent {
2020-03-27 16:28:22 +00:00
e . Headered ( stateResp . RoomVersion ) ,
} ,
api . DoNotSendToOtherServers ,
nil ,
)
2017-06-07 13:32:53 +00:00
}
2020-10-13 10:53:20 +00:00
func ( t * txnReq ) retrieveMissingAuthEvents (
2020-11-16 15:44:53 +00:00
ctx context . Context , e * gomatrixserverlib . Event , stateResp * api . QueryMissingAuthPrevEventsResponse ,
2020-10-13 10:53:20 +00:00
) error {
logger := util . GetLogger ( ctx ) . WithField ( "event_id" , e . EventID ( ) ) . WithField ( "room_id" , e . RoomID ( ) )
missingAuthEvents := make ( map [ string ] struct { } )
for _ , missingAuthEventID := range stateResp . MissingAuthEventIDs {
missingAuthEvents [ missingAuthEventID ] = struct { } { }
}
withNextEvent :
for missingAuthEventID := range missingAuthEvents {
withNextServer :
2021-06-30 11:05:58 +00:00
for _ , server := range t . getServers ( ctx , e . RoomID ( ) , e ) {
2020-10-13 10:53:20 +00:00
logger . Infof ( "Retrieving missing auth event %q from %q" , missingAuthEventID , server )
tx , err := t . federation . GetEvent ( ctx , server , missingAuthEventID )
if err != nil {
logger . WithError ( err ) . Warnf ( "Failed to retrieve auth event %q" , missingAuthEventID )
2021-06-30 09:39:47 +00:00
if errors . Is ( err , context . DeadlineExceeded ) {
return err
}
2020-10-13 10:53:20 +00:00
continue withNextServer
}
ev , err := gomatrixserverlib . NewEventFromUntrustedJSON ( tx . PDUs [ 0 ] , stateResp . RoomVersion )
if err != nil {
logger . WithError ( err ) . Warnf ( "Failed to unmarshal auth event %q" , missingAuthEventID )
continue withNextServer
}
if err = api . SendInputRoomEvents (
context . Background ( ) ,
t . rsAPI ,
[ ] api . InputRoomEvent {
{
Kind : api . KindOutlier ,
Event : ev . Headered ( stateResp . RoomVersion ) ,
AuthEventIDs : ev . AuthEventIDs ( ) ,
SendAsServer : api . DoNotSendToOtherServers ,
} ,
} ,
) ; err != nil {
return fmt . Errorf ( "api.SendEvents: %w" , err )
}
2021-07-07 17:55:44 +00:00
t . hadEvent ( ev . EventID ( ) , true ) // if the roomserver didn't know about the event before, it does now
2021-06-30 09:01:56 +00:00
t . cacheAndReturn ( ev . Headered ( stateResp . RoomVersion ) )
2020-10-13 10:53:20 +00:00
delete ( missingAuthEvents , missingAuthEventID )
continue withNextEvent
}
}
if missing := len ( missingAuthEvents ) ; missing > 0 {
2021-09-08 16:31:03 +00:00
return fmt . Errorf ( "event refers to %d auth_events which we failed to fetch" , missing )
2020-10-13 10:53:20 +00:00
}
return nil
}
2020-11-16 15:44:53 +00:00
func checkAllowedByState ( e * gomatrixserverlib . Event , stateEvents [ ] * gomatrixserverlib . Event ) error {
2017-06-07 13:32:53 +00:00
authUsingState := gomatrixserverlib . NewAuthEvents ( nil )
for i := range stateEvents {
2020-11-16 15:44:53 +00:00
err := authUsingState . AddEvent ( stateEvents [ i ] )
2017-09-20 09:59:19 +00:00
if err != nil {
return err
}
2017-06-07 13:32:53 +00:00
}
return gomatrixserverlib . Allowed ( e , & authUsingState )
}
2021-04-08 12:50:39 +00:00
func ( t * txnReq ) processEventWithMissingState (
ctx context . Context , e * gomatrixserverlib . Event , roomVersion gomatrixserverlib . RoomVersion ,
) error {
2017-06-07 13:32:53 +00:00
// We are missing the previous events for this events.
// This means that there is a gap in our view of the history of the
// room. There two ways that we can handle such a gap:
// 1) We can fill in the gap using /get_missing_events
// 2) We can leave the gap and request the state of the room at
// this event from the remote server using either /state_ids
// or /state.
// Synapse will attempt to do 1 and if that fails or if the gap is
// too large then it will attempt 2.
2017-06-12 17:30:47 +00:00
// Synapse will use /state_ids if possible since usually the state
2017-06-07 13:32:53 +00:00
// is largely unchanged and it is more efficient to fetch a list of
// event ids and then use /event to fetch the individual events.
// However not all version of synapse support /state_ids so you may
// need to fallback to /state.
2020-05-05 14:48:37 +00:00
2020-05-12 15:24:28 +00:00
// Attempt to fill in the gap using /get_missing_events
// This will either:
// - fill in the gap completely then process event `e` returning no backwards extremity
// - fail to fill in the gap and tell us to terminate the transaction err=not nil
// - fail to fill in the gap and tell us to fetch state at the new backwards extremity, and to not terminate the transaction
2021-07-02 11:33:27 +00:00
newEvents , err := t . getMissingEvents ( ctx , e , roomVersion )
2017-06-07 13:32:53 +00:00
if err != nil {
2020-05-12 15:24:28 +00:00
return err
}
2020-10-12 14:56:15 +00:00
if len ( newEvents ) == 0 {
2020-05-12 15:24:28 +00:00
return nil
}
2020-11-16 15:44:53 +00:00
backwardsExtremity := newEvents [ 0 ]
2020-10-12 14:56:15 +00:00
newEvents = newEvents [ 1 : ]
2020-10-15 11:08:49 +00:00
type respState struct {
// A snapshot is considered trustworthy if it came from our own roomserver.
// That's because the state will have been through state resolution once
// already in QueryStateAfterEvent.
trustworthy bool
* gomatrixserverlib . RespState
}
2020-05-12 15:24:28 +00:00
// at this point we know we're going to have a gap: we need to work out the room state at the new backwards extremity.
// Therefore, we cannot just query /state_ids with this event to get the state before. Instead, we need to query
2020-06-29 13:39:21 +00:00
// the state AFTER all the prev_events for this event, then apply state resolution to that to get the state before the event.
2020-10-15 11:08:49 +00:00
var states [ ] * respState
2020-05-12 15:24:28 +00:00
for _ , prevEventID := range backwardsExtremity . PrevEventIDs ( ) {
2020-10-12 14:56:15 +00:00
// Look up what the state is after the backward extremity. This will either
// come from the roomserver, if we know all the required events, or it will
// come from a remote server via /state_ids if not.
2021-07-02 11:33:27 +00:00
prevState , trustworthy , lerr := t . lookupStateAfterEvent ( ctx , roomVersion , backwardsExtremity . RoomID ( ) , prevEventID )
2020-10-15 11:08:49 +00:00
if lerr != nil {
util . GetLogger ( ctx ) . WithError ( lerr ) . Errorf ( "Failed to lookup state after prev_event: %s" , prevEventID )
return lerr
2020-05-05 14:48:37 +00:00
}
2020-10-12 14:56:15 +00:00
// Append the state onto the collected state. We'll run this through the
// state resolution next.
2020-10-15 11:08:49 +00:00
states = append ( states , & respState { trustworthy , prevState } )
2020-05-12 15:24:28 +00:00
}
2020-10-12 14:56:15 +00:00
// Now that we have collected all of the state from the prev_events, we'll
// run the state through the appropriate state resolution algorithm for the
2020-10-15 11:08:49 +00:00
// room if needed. This does a couple of things:
2020-10-12 14:56:15 +00:00
// 1. Ensures that the state is deduplicated fully for each state-key tuple
// 2. Ensures that we pick the latest events from both sets, in the case that
// one of the prev_events is quite a bit older than the others
2020-10-15 11:08:49 +00:00
resolvedState := & gomatrixserverlib . RespState { }
switch len ( states ) {
case 0 :
extremityIsCreate := backwardsExtremity . Type ( ) == gomatrixserverlib . MRoomCreate && backwardsExtremity . StateKeyEquals ( "" )
if ! extremityIsCreate {
// There are no previous states and this isn't the beginning of the
// room - this is an error condition!
util . GetLogger ( ctx ) . Errorf ( "Failed to lookup any state after prev_events" )
return fmt . Errorf ( "expected %d states but got %d" , len ( backwardsExtremity . PrevEventIDs ( ) ) , len ( states ) )
}
case 1 :
// There's only one previous state - if it's trustworthy (came from a
// local state snapshot which will already have been through state res),
// use it as-is. There's no point in resolving it again.
if states [ 0 ] . trustworthy {
resolvedState = states [ 0 ] . RespState
break
}
// Otherwise, if it isn't trustworthy (came from federation), run it through
// state resolution anyway for safety, in case there are duplicates.
fallthrough
default :
respStates := make ( [ ] * gomatrixserverlib . RespState , len ( states ) )
for i := range states {
respStates [ i ] = states [ i ] . RespState
}
// There's more than one previous state - run them all through state res
2021-04-13 10:13:07 +00:00
t . roomsMu . Lock ( e . RoomID ( ) )
2021-07-02 11:33:27 +00:00
resolvedState , err = t . resolveStatesAndCheck ( ctx , roomVersion , respStates , backwardsExtremity )
2021-04-13 10:13:07 +00:00
t . roomsMu . Unlock ( e . RoomID ( ) )
2020-10-15 11:08:49 +00:00
if err != nil {
util . GetLogger ( ctx ) . WithError ( err ) . Errorf ( "Failed to resolve state conflicts for event %s" , backwardsExtremity . EventID ( ) )
return err
}
2020-05-12 15:24:28 +00:00
}
2020-10-12 14:56:15 +00:00
// First of all, send the backward extremity into the roomserver with the
// newly resolved state. This marks the "oldest" point in the backfill and
2021-07-07 17:55:44 +00:00
// sets the baseline state for any new events after this. We'll make a
// copy of the hadEvents map so that it can be taken downstream without
// worrying about concurrent map reads/writes, since t.hadEvents is meant
// to be protected by a mutex.
hadEvents := map [ string ] bool { }
t . hadEventsMutex . Lock ( )
for k , v := range t . hadEvents {
hadEvents [ k ] = v
}
t . hadEventsMutex . Unlock ( )
2020-10-12 14:56:15 +00:00
err = api . SendEventWithState (
context . Background ( ) ,
t . rsAPI ,
2020-10-19 13:59:13 +00:00
api . KindOld ,
2020-10-12 14:56:15 +00:00
resolvedState ,
backwardsExtremity . Headered ( roomVersion ) ,
2021-07-07 17:55:44 +00:00
hadEvents ,
2020-10-12 14:56:15 +00:00
)
if err != nil {
return fmt . Errorf ( "api.SendEventWithState: %w" , err )
}
// Then send all of the newer backfilled events, of which will all be newer
// than the backward extremity, into the roomserver without state. This way
// they will automatically fast-forward based on the room state at the
// extremity in the last step.
2020-11-16 15:44:53 +00:00
headeredNewEvents := make ( [ ] * gomatrixserverlib . HeaderedEvent , len ( newEvents ) )
2020-10-12 14:56:15 +00:00
for i , newEvent := range newEvents {
headeredNewEvents [ i ] = newEvent . Headered ( roomVersion )
}
if err = api . SendEvents (
context . Background ( ) ,
t . rsAPI ,
2020-10-19 13:59:13 +00:00
api . KindOld ,
2020-10-12 14:56:15 +00:00
append ( headeredNewEvents , e . Headered ( roomVersion ) ) ,
api . DoNotSendToOtherServers ,
nil ,
) ; err != nil {
return fmt . Errorf ( "api.SendEvents: %w" , err )
}
return nil
2020-05-12 15:24:28 +00:00
}
// lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event)
// added into the mix.
2020-10-15 11:08:49 +00:00
func ( t * txnReq ) lookupStateAfterEvent ( ctx context . Context , roomVersion gomatrixserverlib . RoomVersion , roomID , eventID string ) ( * gomatrixserverlib . RespState , bool , error ) {
2020-05-12 15:24:28 +00:00
// try doing all this locally before we resort to querying federation
2020-10-14 11:39:37 +00:00
respState := t . lookupStateAfterEventLocally ( ctx , roomID , eventID )
2020-05-12 15:24:28 +00:00
if respState != nil {
2020-10-15 11:08:49 +00:00
return respState , true , nil
2020-05-12 15:24:28 +00:00
}
2020-09-07 11:32:40 +00:00
respState , err := t . lookupStateBeforeEvent ( ctx , roomVersion , roomID , eventID )
2020-05-12 15:24:28 +00:00
if err != nil {
2020-10-15 11:08:49 +00:00
return nil , false , fmt . Errorf ( "t.lookupStateBeforeEvent: %w" , err )
2020-10-13 10:53:20 +00:00
}
2020-05-12 15:24:28 +00:00
// fetch the event we're missing and add it to the pile
2021-02-16 17:12:17 +00:00
h , err := t . lookupEvent ( ctx , roomVersion , roomID , eventID , false )
2020-09-08 09:28:13 +00:00
switch err . ( type ) {
case verifySigError :
2020-10-15 11:08:49 +00:00
return respState , false , nil
2020-09-08 09:28:13 +00:00
case nil :
// do nothing
default :
2020-10-15 11:08:49 +00:00
return nil , false , fmt . Errorf ( "t.lookupEvent: %w" , err )
2020-05-12 15:24:28 +00:00
}
2021-06-30 09:01:56 +00:00
h = t . cacheAndReturn ( h )
2020-05-12 15:24:28 +00:00
if h . StateKey ( ) != nil {
addedToState := false
for i := range respState . StateEvents {
se := respState . StateEvents [ i ]
if se . Type ( ) == h . Type ( ) && se . StateKeyEquals ( * h . StateKey ( ) ) {
respState . StateEvents [ i ] = h . Unwrap ( )
addedToState = true
break
}
}
if ! addedToState {
respState . StateEvents = append ( respState . StateEvents , h . Unwrap ( ) )
}
}
2020-10-15 11:08:49 +00:00
return respState , false , nil
2020-05-12 15:24:28 +00:00
}
2021-04-08 12:50:39 +00:00
func ( t * txnReq ) cacheAndReturn ( ev * gomatrixserverlib . HeaderedEvent ) * gomatrixserverlib . HeaderedEvent {
2021-06-30 11:32:20 +00:00
t . haveEventsMutex . Lock ( )
defer t . haveEventsMutex . Unlock ( )
2021-04-08 12:50:39 +00:00
if cached , exists := t . haveEvents [ ev . EventID ( ) ] ; exists {
return cached
}
t . haveEvents [ ev . EventID ( ) ] = ev
return ev
}
2020-10-14 11:39:37 +00:00
func ( t * txnReq ) lookupStateAfterEventLocally ( ctx context . Context , roomID , eventID string ) * gomatrixserverlib . RespState {
2020-05-12 15:24:28 +00:00
var res api . QueryStateAfterEventsResponse
2020-09-07 11:32:40 +00:00
err := t . rsAPI . QueryStateAfterEvents ( ctx , & api . QueryStateAfterEventsRequest {
2020-05-12 15:24:28 +00:00
RoomID : roomID ,
PrevEventIDs : [ ] string { eventID } ,
} , & res )
if err != nil || ! res . PrevEventsExist {
2021-04-08 12:50:39 +00:00
util . GetLogger ( ctx ) . WithField ( "room_id" , roomID ) . WithError ( err ) . Warnf ( "failed to query state after %s locally, prev exists=%v" , eventID , res . PrevEventsExist )
2020-05-12 15:24:28 +00:00
return nil
}
2021-04-08 12:50:39 +00:00
stateEvents := make ( [ ] * gomatrixserverlib . HeaderedEvent , len ( res . StateEvents ) )
2020-05-12 15:24:28 +00:00
for i , ev := range res . StateEvents {
2021-04-08 12:50:39 +00:00
// set the event from the haveEvents cache - this means we will share pointers with other prev_event branches for this
// processEvent request, which is better for memory.
stateEvents [ i ] = t . cacheAndReturn ( ev )
2021-07-07 17:55:44 +00:00
t . hadEvent ( ev . EventID ( ) , true )
2020-05-12 15:24:28 +00:00
}
2021-04-08 12:50:39 +00:00
// we should never access res.StateEvents again so we delete it here to make GC faster
res . StateEvents = nil
2020-11-16 15:44:53 +00:00
var authEvents [ ] * gomatrixserverlib . Event
2021-02-16 17:12:17 +00:00
missingAuthEvents := map [ string ] bool { }
2021-04-08 12:50:39 +00:00
for _ , ev := range stateEvents {
2021-06-30 11:32:20 +00:00
t . haveEventsMutex . Lock ( )
2020-05-12 15:24:28 +00:00
for _ , ae := range ev . AuthEventIDs ( ) {
2021-02-16 17:12:17 +00:00
if aev , ok := t . haveEvents [ ae ] ; ok {
2020-05-12 15:24:28 +00:00
authEvents = append ( authEvents , aev . Unwrap ( ) )
} else {
missingAuthEvents [ ae ] = true
}
}
2021-06-30 11:32:20 +00:00
t . haveEventsMutex . Unlock ( )
2020-05-12 15:24:28 +00:00
}
// QueryStateAfterEvents does not return the auth events, so fetch them now. We know the roomserver has them else it wouldn't
// have stored the event.
2021-02-16 17:12:17 +00:00
if len ( missingAuthEvents ) > 0 {
var missingEventList [ ] string
for evID := range missingAuthEvents {
missingEventList = append ( missingEventList , evID )
}
queryReq := api . QueryEventsByIDRequest {
EventIDs : missingEventList ,
}
2021-03-23 15:22:00 +00:00
util . GetLogger ( ctx ) . WithField ( "count" , len ( missingEventList ) ) . Infof ( "Fetching missing auth events" )
2021-02-16 17:12:17 +00:00
var queryRes api . QueryEventsByIDResponse
if err = t . rsAPI . QueryEventsByID ( ctx , & queryReq , & queryRes ) ; err != nil {
return nil
}
2021-06-30 09:01:56 +00:00
for i , ev := range queryRes . Events {
2021-04-08 12:50:39 +00:00
authEvents = append ( authEvents , t . cacheAndReturn ( queryRes . Events [ i ] ) . Unwrap ( ) )
2021-07-07 17:55:44 +00:00
t . hadEvent ( ev . EventID ( ) , true )
2021-02-16 17:12:17 +00:00
}
2021-04-08 12:50:39 +00:00
queryRes . Events = nil
2020-05-12 15:24:28 +00:00
}
return & gomatrixserverlib . RespState {
2021-04-08 12:50:39 +00:00
StateEvents : gomatrixserverlib . UnwrapEventHeaders ( stateEvents ) ,
2020-05-12 15:24:28 +00:00
AuthEvents : authEvents ,
}
}
// lookuptStateBeforeEvent returns the room state before the event e, which is just /state_ids and/or /state depending on what
// the server supports.
2020-09-07 11:32:40 +00:00
func ( t * txnReq ) lookupStateBeforeEvent ( ctx context . Context , roomVersion gomatrixserverlib . RoomVersion , roomID , eventID string ) (
2020-09-28 09:03:18 +00:00
* gomatrixserverlib . RespState , error ) {
2020-05-12 15:24:28 +00:00
// Attempt to fetch the missing state using /state_ids and /events
2020-09-28 09:03:18 +00:00
return t . lookupMissingStateViaStateIDs ( ctx , roomID , eventID , roomVersion )
2020-05-12 15:24:28 +00:00
}
2020-05-05 14:48:37 +00:00
2020-09-07 11:32:40 +00:00
func ( t * txnReq ) resolveStatesAndCheck ( ctx context . Context , roomVersion gomatrixserverlib . RoomVersion , states [ ] * gomatrixserverlib . RespState , backwardsExtremity * gomatrixserverlib . Event ) ( * gomatrixserverlib . RespState , error ) {
2020-11-16 15:44:53 +00:00
var authEventList [ ] * gomatrixserverlib . Event
var stateEventList [ ] * gomatrixserverlib . Event
2020-05-12 15:24:28 +00:00
for _ , state := range states {
authEventList = append ( authEventList , state . AuthEvents ... )
stateEventList = append ( stateEventList , state . StateEvents ... )
}
resolvedStateEvents , err := gomatrixserverlib . ResolveConflicts ( roomVersion , stateEventList , authEventList )
if err != nil {
return nil , err
}
// apply the current event
2020-03-06 16:58:10 +00:00
retryAllowedState :
2020-11-16 15:44:53 +00:00
if err = checkAllowedByState ( backwardsExtremity , resolvedStateEvents ) ; err != nil {
2020-03-06 16:58:10 +00:00
switch missing := err . ( type ) {
case gomatrixserverlib . MissingAuthEventError :
2021-02-16 17:12:17 +00:00
h , err2 := t . lookupEvent ( ctx , roomVersion , backwardsExtremity . RoomID ( ) , missing . AuthEventID , true )
2020-09-08 09:28:13 +00:00
switch err2 . ( type ) {
case verifySigError :
return & gomatrixserverlib . RespState {
AuthEvents : authEventList ,
StateEvents : resolvedStateEvents ,
} , nil
case nil :
// do nothing
default :
2020-05-12 15:24:28 +00:00
return nil , fmt . Errorf ( "missing auth event %s and failed to look it up: %w" , missing . AuthEventID , err2 )
2020-03-06 16:58:10 +00:00
}
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . Infof ( "fetched event %s" , missing . AuthEventID )
2020-05-12 15:24:28 +00:00
resolvedStateEvents = append ( resolvedStateEvents , h . Unwrap ( ) )
goto retryAllowedState
2020-03-06 16:58:10 +00:00
default :
}
2020-05-12 15:24:28 +00:00
return nil , err
2017-06-07 13:32:53 +00:00
}
2020-05-12 15:24:28 +00:00
return & gomatrixserverlib . RespState {
AuthEvents : authEventList ,
StateEvents : resolvedStateEvents ,
} , nil
}
2020-03-27 16:28:22 +00:00
2020-11-16 15:44:53 +00:00
func ( t * txnReq ) getMissingEvents ( ctx context . Context , e * gomatrixserverlib . Event , roomVersion gomatrixserverlib . RoomVersion ) ( newEvents [ ] * gomatrixserverlib . Event , err error ) {
2020-09-07 11:32:40 +00:00
logger := util . GetLogger ( ctx ) . WithField ( "event_id" , e . EventID ( ) ) . WithField ( "room_id" , e . RoomID ( ) )
2020-11-16 15:44:53 +00:00
needed := gomatrixserverlib . StateNeededForAuth ( [ ] * gomatrixserverlib . Event { e } )
2020-05-12 15:24:28 +00:00
// query latest events (our trusted forward extremities)
req := api . QueryLatestEventsAndStateRequest {
RoomID : e . RoomID ( ) ,
StateToFetch : needed . Tuples ( ) ,
}
var res api . QueryLatestEventsAndStateResponse
2020-09-07 11:32:40 +00:00
if err = t . rsAPI . QueryLatestEventsAndState ( ctx , & req , & res ) ; err != nil {
2020-05-12 15:24:28 +00:00
logger . WithError ( err ) . Warn ( "Failed to query latest events" )
2020-10-12 14:56:15 +00:00
return nil , err
2020-05-12 15:24:28 +00:00
}
latestEvents := make ( [ ] string , len ( res . LatestEvents ) )
2021-06-30 09:01:56 +00:00
for i , ev := range res . LatestEvents {
2020-05-12 15:24:28 +00:00
latestEvents [ i ] = res . LatestEvents [ i ] . EventID
2021-07-07 17:55:44 +00:00
t . hadEvent ( ev . EventID , true )
2020-05-12 15:24:28 +00:00
}
2020-09-29 12:40:29 +00:00
var missingResp * gomatrixserverlib . RespMissingEvents
2021-06-30 11:05:58 +00:00
servers := t . getServers ( ctx , e . RoomID ( ) , e )
2020-09-29 12:40:29 +00:00
for _ , server := range servers {
var m gomatrixserverlib . RespMissingEvents
if m , err = t . federation . LookupMissingEvents ( ctx , server , e . RoomID ( ) , gomatrixserverlib . MissingEvents {
Limit : 20 ,
// The latest event IDs that the sender already has. These are skipped when retrieving the previous events of latest_events.
EarliestEvents : latestEvents ,
// The event IDs to retrieve the previous events for.
LatestEvents : [ ] string { e . EventID ( ) } ,
} , roomVersion ) ; err == nil {
missingResp = & m
break
} else {
logger . WithError ( err ) . Errorf ( "%s pushed us an event but %q did not respond to /get_missing_events" , t . Origin , server )
2021-06-30 09:39:47 +00:00
if errors . Is ( err , context . DeadlineExceeded ) {
break
}
2020-09-29 12:40:29 +00:00
}
}
if missingResp == nil {
logger . WithError ( err ) . Errorf (
"%s pushed us an event but %d server(s) couldn't give us details about prev_events via /get_missing_events - dropping this event until it can" ,
t . Origin , len ( servers ) ,
)
return nil , missingPrevEventsError {
eventID : e . EventID ( ) ,
err : err ,
}
}
2020-05-12 15:24:28 +00:00
// security: how we handle failures depends on whether or not this event will become the new forward extremity for the room.
// There's 2 scenarios to consider:
// - Case A: We got pushed an event and are now fetching missing prev_events. (isInboundTxn=true)
// - Case B: We are fetching missing prev_events already and now fetching some more (isInboundTxn=false)
// In Case B, we know for sure that the event we are currently processing will not become the new forward extremity for the room,
// as it was called in response to an inbound txn which had it as a prev_event.
// In Case A, the event is a forward extremity, and could eventually become the _only_ forward extremity in the room. This is bad
// because it means we would trust the state at that event to be the state for the entire room, and allows rooms to be hijacked.
// https://github.com/matrix-org/synapse/pull/3456
// https://github.com/matrix-org/synapse/blob/229eb81498b0fe1da81e9b5b333a0285acde9446/synapse/handlers/federation.py#L335
// For now, we do not allow Case B, so reject the event.
logger . Infof ( "get_missing_events returned %d events" , len ( missingResp . Events ) )
2021-06-30 09:01:56 +00:00
// Make sure events from the missingResp are using the cache - missing events
// will be added and duplicates will be removed.
for i , ev := range missingResp . Events {
missingResp . Events [ i ] = t . cacheAndReturn ( ev . Headered ( roomVersion ) ) . Unwrap ( )
}
2020-05-12 15:24:28 +00:00
// topologically sort and sanity check that we are making forward progress
2020-10-12 14:56:15 +00:00
newEvents = gomatrixserverlib . ReverseTopologicalOrdering ( missingResp . Events , gomatrixserverlib . TopologicalOrderByPrevEvents )
2020-05-12 15:24:28 +00:00
shouldHaveSomeEventIDs := e . PrevEventIDs ( )
hasPrevEvent := false
Event :
for _ , pe := range shouldHaveSomeEventIDs {
for _ , ev := range newEvents {
if ev . EventID ( ) == pe {
hasPrevEvent = true
break Event
}
}
}
if ! hasPrevEvent {
err = fmt . Errorf ( "called /get_missing_events but server %s didn't return any prev_events with IDs %v" , t . Origin , shouldHaveSomeEventIDs )
logger . WithError ( err ) . Errorf (
"%s pushed us an event but couldn't give us details about prev_events via /get_missing_events - dropping this event until it can" ,
t . Origin ,
)
return nil , missingPrevEventsError {
eventID : e . EventID ( ) ,
err : err ,
}
}
2020-10-12 14:56:15 +00:00
return newEvents , nil
2020-05-05 14:48:37 +00:00
}
2020-09-28 10:32:59 +00:00
func ( t * txnReq ) lookupMissingStateViaState ( ctx context . Context , roomID , eventID string , roomVersion gomatrixserverlib . RoomVersion ) (
respState * gomatrixserverlib . RespState , err error ) {
state , err := t . federation . LookupState ( ctx , t . Origin , roomID , eventID , roomVersion )
if err != nil {
return nil , err
}
// Check that the returned state is valid.
if err := state . Check ( ctx , t . keys , nil ) ; err != nil {
return nil , err
}
2021-06-30 09:01:56 +00:00
// Cache the results of this state lookup and deduplicate anything we already
// have in the cache, freeing up memory.
for i , ev := range state . AuthEvents {
state . AuthEvents [ i ] = t . cacheAndReturn ( ev . Headered ( roomVersion ) ) . Unwrap ( )
}
for i , ev := range state . StateEvents {
state . StateEvents [ i ] = t . cacheAndReturn ( ev . Headered ( roomVersion ) ) . Unwrap ( )
}
2020-09-28 10:32:59 +00:00
return & state , nil
}
2020-09-07 11:32:40 +00:00
func ( t * txnReq ) lookupMissingStateViaStateIDs ( ctx context . Context , roomID , eventID string , roomVersion gomatrixserverlib . RoomVersion ) (
2020-05-12 15:24:28 +00:00
* gomatrixserverlib . RespState , error ) {
2021-04-08 12:50:39 +00:00
util . GetLogger ( ctx ) . WithField ( "room_id" , roomID ) . Infof ( "lookupMissingStateViaStateIDs %s" , eventID )
2020-05-05 14:48:37 +00:00
// fetch the state event IDs at the time of the event
2020-09-07 11:32:40 +00:00
stateIDs , err := t . federation . LookupStateIDs ( ctx , t . Origin , roomID , eventID )
2020-05-05 14:48:37 +00:00
if err != nil {
2020-05-12 15:24:28 +00:00
return nil , err
2020-05-05 15:46:22 +00:00
}
2020-05-05 14:48:37 +00:00
// work out which auth/state IDs are missing
wantIDs := append ( stateIDs . StateEventIDs , stateIDs . AuthEventIDs ... )
missing := make ( map [ string ] bool )
2020-05-12 15:24:28 +00:00
var missingEventList [ ] string
2021-06-30 11:32:20 +00:00
t . haveEventsMutex . Lock ( )
2020-05-05 14:48:37 +00:00
for _ , sid := range wantIDs {
2020-05-12 15:24:28 +00:00
if _ , ok := t . haveEvents [ sid ] ; ! ok {
if ! missing [ sid ] {
missing [ sid ] = true
missingEventList = append ( missingEventList , sid )
}
}
}
2021-06-30 11:32:20 +00:00
t . haveEventsMutex . Unlock ( )
2020-05-12 15:24:28 +00:00
// fetch as many as we can from the roomserver
queryReq := api . QueryEventsByIDRequest {
EventIDs : missingEventList ,
}
var queryRes api . QueryEventsByIDResponse
2020-09-07 11:32:40 +00:00
if err = t . rsAPI . QueryEventsByID ( ctx , & queryReq , & queryRes ) ; err != nil {
2020-05-12 15:24:28 +00:00
return nil , err
}
2021-06-30 09:01:56 +00:00
for i , ev := range queryRes . Events {
queryRes . Events [ i ] = t . cacheAndReturn ( queryRes . Events [ i ] )
2021-07-07 17:55:44 +00:00
t . hadEvent ( ev . EventID ( ) , true )
2020-05-12 15:24:28 +00:00
evID := queryRes . Events [ i ] . EventID ( )
if missing [ evID ] {
delete ( missing , evID )
2020-05-05 14:48:37 +00:00
}
}
2021-04-08 12:50:39 +00:00
queryRes . Events = nil // allow it to be GCed
2020-05-12 15:24:28 +00:00
2020-09-28 10:32:59 +00:00
concurrentRequests := 8
missingCount := len ( missing )
2021-04-08 12:50:39 +00:00
util . GetLogger ( ctx ) . WithField ( "room_id" , roomID ) . WithField ( "event_id" , eventID ) . Infof ( "lookupMissingStateViaStateIDs missing %d/%d events" , missingCount , len ( wantIDs ) )
2020-09-28 10:32:59 +00:00
// If over 50% of the auth/state events from /state_ids are missing
// then we'll just call /state instead, otherwise we'll just end up
// hammering the remote side with /event requests unnecessarily.
if missingCount > concurrentRequests && missingCount > len ( wantIDs ) / 2 {
util . GetLogger ( ctx ) . WithFields ( logrus . Fields {
"missing" : missingCount ,
"event_id" : eventID ,
"room_id" : roomID ,
"total_state" : len ( stateIDs . StateEventIDs ) ,
"total_auth_events" : len ( stateIDs . AuthEventIDs ) ,
} ) . Info ( "Fetching all state at event" )
return t . lookupMissingStateViaState ( ctx , roomID , eventID , roomVersion )
}
2021-01-04 13:47:48 +00:00
if missingCount > 0 {
util . GetLogger ( ctx ) . WithFields ( logrus . Fields {
"missing" : missingCount ,
"event_id" : eventID ,
"room_id" : roomID ,
"total_state" : len ( stateIDs . StateEventIDs ) ,
"total_auth_events" : len ( stateIDs . AuthEventIDs ) ,
"concurrent_requests" : concurrentRequests ,
} ) . Info ( "Fetching missing state at event" )
// Create a queue containing all of the missing event IDs that we want
// to retrieve.
pending := make ( chan string , missingCount )
for missingEventID := range missing {
pending <- missingEventID
}
close ( pending )
2020-10-13 10:53:20 +00:00
2021-01-04 13:47:48 +00:00
// Define how many workers we should start to do this.
if missingCount < concurrentRequests {
concurrentRequests = missingCount
}
2020-09-28 10:32:59 +00:00
2021-01-04 13:47:48 +00:00
// Create the wait group.
var fetchgroup sync . WaitGroup
fetchgroup . Add ( concurrentRequests )
2020-09-28 10:32:59 +00:00
2021-01-04 13:47:48 +00:00
// This is the only place where we'll write to t.haveEvents from
// multiple goroutines, and everywhere else is blocked on this
// synchronous function anyway.
var haveEventsMutex sync . Mutex
2020-09-28 10:32:59 +00:00
2021-01-04 13:47:48 +00:00
// Define what we'll do in order to fetch the missing event ID.
fetch := func ( missingEventID string ) {
var h * gomatrixserverlib . HeaderedEvent
2021-02-16 17:12:17 +00:00
h , err = t . lookupEvent ( ctx , roomVersion , roomID , missingEventID , false )
2021-01-04 13:47:48 +00:00
switch err . ( type ) {
case verifySigError :
return
case nil :
break
default :
util . GetLogger ( ctx ) . WithFields ( logrus . Fields {
"event_id" : missingEventID ,
"room_id" : roomID ,
} ) . Info ( "Failed to fetch missing event" )
return
}
haveEventsMutex . Lock ( )
2021-04-08 12:50:39 +00:00
t . cacheAndReturn ( h )
2021-01-04 13:47:48 +00:00
haveEventsMutex . Unlock ( )
}
2020-09-28 10:32:59 +00:00
2021-01-04 13:47:48 +00:00
// Create the worker.
worker := func ( ch <- chan string ) {
defer fetchgroup . Done ( )
for missingEventID := range ch {
fetch ( missingEventID )
}
2020-05-05 14:48:37 +00:00
}
2020-09-28 10:32:59 +00:00
2021-01-04 13:47:48 +00:00
// Start the workers.
for i := 0 ; i < concurrentRequests ; i ++ {
go worker ( pending )
2020-09-28 10:32:59 +00:00
}
2021-01-04 13:47:48 +00:00
// Wait for the workers to finish.
fetchgroup . Wait ( )
2020-09-28 10:32:59 +00:00
}
2020-05-12 15:24:28 +00:00
resp , err := t . createRespStateFromStateIDs ( stateIDs )
return resp , err
2020-05-05 14:48:37 +00:00
}
2020-05-12 15:24:28 +00:00
func ( t * txnReq ) createRespStateFromStateIDs ( stateIDs gomatrixserverlib . RespStateIDs ) (
2020-10-13 10:53:20 +00:00
* gomatrixserverlib . RespState , error ) { // nolint:unparam
2021-06-30 11:32:20 +00:00
t . haveEventsMutex . Lock ( )
defer t . haveEventsMutex . Unlock ( )
2020-05-05 14:48:37 +00:00
// create a RespState response using the response to /state_ids as a guide
2020-10-13 10:53:20 +00:00
respState := gomatrixserverlib . RespState { }
2020-05-05 14:48:37 +00:00
for i := range stateIDs . StateEventIDs {
2020-05-12 15:24:28 +00:00
ev , ok := t . haveEvents [ stateIDs . StateEventIDs [ i ] ]
2020-05-05 14:48:37 +00:00
if ! ok {
2020-10-13 10:53:20 +00:00
logrus . Warnf ( "Missing state event in createRespStateFromStateIDs: %s" , stateIDs . StateEventIDs [ i ] )
continue
2020-05-05 14:48:37 +00:00
}
2020-10-13 10:53:20 +00:00
respState . StateEvents = append ( respState . StateEvents , ev . Unwrap ( ) )
2020-05-05 14:48:37 +00:00
}
for i := range stateIDs . AuthEventIDs {
2020-05-12 15:24:28 +00:00
ev , ok := t . haveEvents [ stateIDs . AuthEventIDs [ i ] ]
2020-05-05 14:48:37 +00:00
if ! ok {
2020-10-13 10:53:20 +00:00
logrus . Warnf ( "Missing auth event in createRespStateFromStateIDs: %s" , stateIDs . AuthEventIDs [ i ] )
continue
2020-05-05 14:48:37 +00:00
}
2020-10-13 10:53:20 +00:00
respState . AuthEvents = append ( respState . AuthEvents , ev . Unwrap ( ) )
2020-05-05 14:48:37 +00:00
}
2020-05-12 15:24:28 +00:00
// We purposefully do not do auth checks on the returned events, as they will still
// be processed in the exact same way, just as a 'rejected' event
// TODO: Add a field to HeaderedEvent to indicate if the event is rejected.
return & respState , nil
}
2021-02-16 17:12:17 +00:00
func ( t * txnReq ) lookupEvent ( ctx context . Context , roomVersion gomatrixserverlib . RoomVersion , roomID , missingEventID string , localFirst bool ) ( * gomatrixserverlib . HeaderedEvent , error ) {
2020-05-12 15:24:28 +00:00
if localFirst {
// fetch from the roomserver
queryReq := api . QueryEventsByIDRequest {
EventIDs : [ ] string { missingEventID } ,
}
var queryRes api . QueryEventsByIDResponse
2020-09-07 11:32:40 +00:00
if err := t . rsAPI . QueryEventsByID ( ctx , & queryReq , & queryRes ) ; err != nil {
util . GetLogger ( ctx ) . Warnf ( "Failed to query roomserver for missing event %s: %s - falling back to remote" , missingEventID , err )
2020-05-12 15:24:28 +00:00
} else if len ( queryRes . Events ) == 1 {
2020-11-16 15:44:53 +00:00
return queryRes . Events [ 0 ] , nil
2020-05-12 15:24:28 +00:00
}
}
2020-11-16 15:44:53 +00:00
var event * gomatrixserverlib . Event
2020-10-13 10:53:20 +00:00
found := false
2021-06-30 11:05:58 +00:00
servers := t . getServers ( ctx , roomID , nil )
2020-10-13 10:53:20 +00:00
for _ , serverName := range servers {
txn , err := t . federation . GetEvent ( ctx , serverName , missingEventID )
if err != nil || len ( txn . PDUs ) == 0 {
util . GetLogger ( ctx ) . WithError ( err ) . WithField ( "event_id" , missingEventID ) . Warn ( "Failed to get missing /event for event ID" )
2021-06-30 09:39:47 +00:00
if errors . Is ( err , context . DeadlineExceeded ) {
break
}
2020-10-13 10:53:20 +00:00
continue
}
event , err = gomatrixserverlib . NewEventFromUntrustedJSON ( txn . PDUs [ 0 ] , roomVersion )
if err != nil {
util . GetLogger ( ctx ) . WithError ( err ) . WithField ( "event_id" , missingEventID ) . Warnf ( "Transaction: Failed to parse event JSON of event" )
continue
}
found = true
break
}
if ! found {
util . GetLogger ( ctx ) . WithField ( "event_id" , missingEventID ) . Warnf ( "Failed to get missing /event for event ID from %d server(s)" , len ( servers ) )
return nil , fmt . Errorf ( "wasn't able to find event via %d server(s)" , len ( servers ) )
2020-05-12 15:24:28 +00:00
}
2021-11-02 10:13:38 +00:00
if err := event . VerifyEventSignatures ( ctx , t . keys ) ; err != nil {
2020-09-07 11:32:40 +00:00
util . GetLogger ( ctx ) . WithError ( err ) . Warnf ( "Transaction: Couldn't validate signature of event %q" , event . EventID ( ) )
2020-05-12 15:24:28 +00:00
return nil , verifySigError { event . EventID ( ) , err }
}
2021-06-30 09:01:56 +00:00
return t . cacheAndReturn ( event . Headered ( roomVersion ) ) , nil
2017-06-07 13:32:53 +00:00
}