Merge branch 'master' into add-nats-support

This commit is contained in:
Neil Alexander 2021-11-16 12:35:50 +00:00
commit aac7e4187e
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
12 changed files with 115 additions and 24 deletions

View file

@ -1,5 +1,26 @@
# Changelog # Changelog
## Dendrite 0.5.1 (2021-11-16)
### Features
* Experimental (although incomplete) support for joining version 8 and 9 rooms
* State resolution v2 optimisations (close to 20% speed improvement thanks to reduced allocations)
* Optimisations made to the federation `/send` endpoint which avoids duplicate work, reduces CPU usage and smooths out incoming federation
* The sync API now consumes less CPU when generating sync responses (optimised `SelectStateInRange`)
* Support for serving the `.well-known/matrix/server` endpoint from within Dendrite itself (contributed by [twentybit](https://github.com/twentybit))
* Support for thumbnailing WebP media (contributed by [hacktivista](https://github.com/hacktivista))
### Fixes
* The `/publicRooms` handler now handles `POST` requests in addition to `GET` correctly
* Only valid canonical aliases will be returned in the `/publicRooms` response
* The media API now correctly handles `max_file_size_bytes` being configured to `0` (contributed by [database64128](https://github.com/database64128))
* Unverifiable auth events in `/send_join` responses no longer result in a panic
* Build issues on Windows are now resolved (contributed by [S7evinK](https://github.com/S7evinK))
* The default power levels in a room now set the invite level to 50, as per the spec
* A panic has been fixed when malformed messages are received in the key change consumers
## Dendrite 0.5.0 (2021-08-24) ## Dendrite 0.5.0 (2021-08-24)
### Features ### Features

View file

@ -148,7 +148,8 @@ type inputWorker struct {
input *sendFIFOQueue input *sendFIFOQueue
} }
var inputWorkers sync.Map // room ID -> *inputWorker var inFlightTxnsPerOrigin sync.Map // transaction ID -> chan util.JSONResponse
var inputWorkers sync.Map // room ID -> *inputWorker
// Send implements /_matrix/federation/v1/send/{txnID} // Send implements /_matrix/federation/v1/send/{txnID}
func Send( func Send(
@ -164,6 +165,37 @@ func Send(
mu *internal.MutexByRoom, mu *internal.MutexByRoom,
servers federationAPI.ServersInRoomProvider, servers federationAPI.ServersInRoomProvider,
) util.JSONResponse { ) util.JSONResponse {
// First we should check if this origin has already submitted this
// txn ID to us. If they have and the txnIDs map contains an entry,
// the transaction is still being worked on. The new client can wait
// for it to complete rather than creating more work.
index := string(request.Origin()) + "\000" + string(txnID)
v, ok := inFlightTxnsPerOrigin.LoadOrStore(index, make(chan util.JSONResponse, 1))
ch := v.(chan util.JSONResponse)
if ok {
// This origin already submitted this txn ID to us, and the work
// is still taking place, so we'll just wait for it to finish.
ctx, cancel := context.WithTimeout(httpReq.Context(), time.Minute*5)
defer cancel()
select {
case <-ctx.Done():
// If the caller gives up then return straight away. We don't
// want to attempt to process what they sent us any further.
return util.JSONResponse{Code: http.StatusRequestTimeout}
case res := <-ch:
// The original task just finished processing so let's return
// the result of it.
if res.Code == 0 {
return util.JSONResponse{Code: http.StatusAccepted}
}
return res
}
}
// Otherwise, store that we're currently working on this txn from
// this origin. When we're done processing, close the channel.
defer close(ch)
defer inFlightTxnsPerOrigin.Delete(index)
t := txnReq{ t := txnReq{
rsAPI: rsAPI, rsAPI: rsAPI,
eduAPI: eduAPI, eduAPI: eduAPI,
@ -205,7 +237,7 @@ func Send(
util.GetLogger(httpReq.Context()).Infof("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, request.Origin(), len(t.PDUs), len(t.EDUs)) util.GetLogger(httpReq.Context()).Infof("Received transaction %q from %q containing %d PDUs, %d EDUs", txnID, request.Origin(), len(t.PDUs), len(t.EDUs))
resp, jsonErr := t.processTransaction(httpReq.Context()) resp, jsonErr := t.processTransaction(context.Background())
if jsonErr != nil { if jsonErr != nil {
util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed") util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed")
return *jsonErr return *jsonErr
@ -215,10 +247,12 @@ func Send(
// Status code 200: // Status code 200:
// The result of processing the transaction. The server is to use this response // The result of processing the transaction. The server is to use this response
// even in the event of one or more PDUs failing to be processed. // even in the event of one or more PDUs failing to be processed.
return util.JSONResponse{ res := util.JSONResponse{
Code: http.StatusOK, Code: http.StatusOK,
JSON: resp, JSON: resp,
} }
ch <- res
return res
} }
type txnReq struct { type txnReq struct {

View file

@ -85,6 +85,11 @@ func (t *KeyChangeConsumer) onMessage(msg *sarama.ConsumerMessage) error {
logrus.WithError(err).Errorf("failed to read device message from key change topic") logrus.WithError(err).Errorf("failed to read device message from key change topic")
return nil return nil
} }
if m.DeviceKeys == nil && m.OutputCrossSigningKeyUpdate == nil {
// This probably shouldn't happen but stops us from panicking if we come
// across an update that doesn't satisfy either types.
return nil
}
switch m.Type { switch m.Type {
case api.TypeCrossSigningUpdate: case api.TypeCrossSigningUpdate:
return t.onCrossSigningMessage(m) return t.onCrossSigningMessage(m)

4
go.mod
View file

@ -36,8 +36,8 @@ require (
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
github.com/matrix-org/gomatrixserverlib v0.0.0-20211102131912-13366e7985e1 github.com/matrix-org/gomatrixserverlib v0.0.0-20211115192839-15a64d244aa2
github.com/matrix-org/pinecone v0.0.0-20211022090602-08a50945ac89 github.com/matrix-org/pinecone v0.0.0-20211116111603-febf3501584d
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.8 github.com/mattn/go-sqlite3 v1.14.8
github.com/morikuni/aec v1.0.0 // indirect github.com/morikuni/aec v1.0.0 // indirect

8
go.sum
View file

@ -987,10 +987,10 @@ github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d/go.mod h1
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0= github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5dLDCud4r0r55eP4j9FuUNpl60Gmntcop4=
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s= github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
github.com/matrix-org/gomatrixserverlib v0.0.0-20211102131912-13366e7985e1 h1:Pv7+98sreiHltpamJ4em6RCX/WPVN1wl53Gli9Cz744= github.com/matrix-org/gomatrixserverlib v0.0.0-20211115192839-15a64d244aa2 h1:RFsBN3509Ql6NJ7TDVkcKoN3bb/tmqUqzur5c0AwIHQ=
github.com/matrix-org/gomatrixserverlib v0.0.0-20211102131912-13366e7985e1/go.mod h1:rB8tBUUUo1rzUqpzklRDSooxZ6YMhoaEPx4SO5fGeUc= github.com/matrix-org/gomatrixserverlib v0.0.0-20211115192839-15a64d244aa2/go.mod h1:rB8tBUUUo1rzUqpzklRDSooxZ6YMhoaEPx4SO5fGeUc=
github.com/matrix-org/pinecone v0.0.0-20211022090602-08a50945ac89 h1:6JkIymZ1vxfI0shSpg6gNPTJaF4/95Evy34slPVZGKM= github.com/matrix-org/pinecone v0.0.0-20211116111603-febf3501584d h1:V1b6GZVvL95qTkjYSEWH9Pja6c0WcJKBt2MlAILlw+Q=
github.com/matrix-org/pinecone v0.0.0-20211022090602-08a50945ac89/go.mod h1:r6dsL+ylE0yXe/7zh8y/Bdh6aBYI1r+u4yZni9A4iyk= github.com/matrix-org/pinecone v0.0.0-20211116111603-febf3501584d/go.mod h1:r6dsL+ylE0yXe/7zh8y/Bdh6aBYI1r+u4yZni9A4iyk=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U= github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=

View file

@ -17,7 +17,7 @@ var build string
const ( const (
VersionMajor = 0 VersionMajor = 0
VersionMinor = 5 VersionMinor = 5
VersionPatch = 0 VersionPatch = 1
VersionTag = "" // example: "rc1" VersionTag = "" // example: "rc1"
) )

View file

@ -78,6 +78,11 @@ func (t *OutputCrossSigningKeyUpdateConsumer) onMessage(msg *sarama.ConsumerMess
logrus.WithError(err).Errorf("failed to read device message from key change topic") logrus.WithError(err).Errorf("failed to read device message from key change topic")
return nil return nil
} }
if m.OutputCrossSigningKeyUpdate == nil {
// This probably shouldn't happen but stops us from panicking if we come
// across an update that doesn't satisfy either types.
return nil
}
switch m.Type { switch m.Type {
case api.TypeCrossSigningUpdate: case api.TypeCrossSigningUpdate:
return t.onCrossSigningMessage(m) return t.onCrossSigningMessage(m)

View file

@ -778,7 +778,8 @@ func (v *StateResolution) resolveConflictsV2(
ctx context.Context, ctx context.Context,
notConflicted, conflicted []types.StateEntry, notConflicted, conflicted []types.StateEntry,
) ([]types.StateEntry, error) { ) ([]types.StateEntry, error) {
eventIDMap := make(map[string]types.StateEntry) estimate := len(conflicted) + len(notConflicted)
eventIDMap := make(map[string]types.StateEntry, estimate)
// Load the conflicted events // Load the conflicted events
conflictedEvents, conflictedEventMap, err := v.loadStateEvents(ctx, conflicted) conflictedEvents, conflictedEventMap, err := v.loadStateEvents(ctx, conflicted)
@ -800,18 +801,20 @@ func (v *StateResolution) resolveConflictsV2(
// For each conflicted event, we will add a new set of auth events. Auth // For each conflicted event, we will add a new set of auth events. Auth
// events may be duplicated across these sets but that's OK. // events may be duplicated across these sets but that's OK.
authSets := make(map[string][]*gomatrixserverlib.Event) authSets := make(map[string][]*gomatrixserverlib.Event, len(conflicted))
var authEvents []*gomatrixserverlib.Event authEvents := make([]*gomatrixserverlib.Event, 0, estimate*3)
var authDifference []*gomatrixserverlib.Event authDifference := make([]*gomatrixserverlib.Event, 0, estimate)
// For each conflicted event, let's try and get the needed auth events. // For each conflicted event, let's try and get the needed auth events.
neededStateKeys := make([]string, 16)
authEntries := make([]types.StateEntry, 16)
for _, conflictedEvent := range conflictedEvents { for _, conflictedEvent := range conflictedEvents {
// Work out which auth events we need to load. // Work out which auth events we need to load.
key := conflictedEvent.EventID() key := conflictedEvent.EventID()
needed := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{conflictedEvent}) needed := gomatrixserverlib.StateNeededForAuth([]*gomatrixserverlib.Event{conflictedEvent})
// Find the numeric IDs for the necessary state keys. // Find the numeric IDs for the necessary state keys.
var neededStateKeys []string neededStateKeys = neededStateKeys[:0]
neededStateKeys = append(neededStateKeys, needed.Member...) neededStateKeys = append(neededStateKeys, needed.Member...)
neededStateKeys = append(neededStateKeys, needed.ThirdPartyInvite...) neededStateKeys = append(neededStateKeys, needed.ThirdPartyInvite...)
stateKeyNIDMap, err := v.db.EventStateKeyNIDs(ctx, neededStateKeys) stateKeyNIDMap, err := v.db.EventStateKeyNIDs(ctx, neededStateKeys)
@ -821,7 +824,7 @@ func (v *StateResolution) resolveConflictsV2(
// Load the necessary auth events. // Load the necessary auth events.
tuplesNeeded := v.stateKeyTuplesNeeded(stateKeyNIDMap, needed) tuplesNeeded := v.stateKeyTuplesNeeded(stateKeyNIDMap, needed)
var authEntries []types.StateEntry authEntries = authEntries[:0]
for _, tuple := range tuplesNeeded { for _, tuple := range tuplesNeeded {
if eventNID, ok := stateEntryMap(notConflicted).lookup(tuple); ok { if eventNID, ok := stateEntryMap(notConflicted).lookup(tuple); ok {
authEntries = append(authEntries, types.StateEntry{ authEntries = append(authEntries, types.StateEntry{

View file

@ -109,6 +109,11 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
logrus.WithError(err).Errorf("failed to read device message from key change topic") logrus.WithError(err).Errorf("failed to read device message from key change topic")
return nil return nil
} }
if m.DeviceKeys == nil && m.OutputCrossSigningKeyUpdate == nil {
// This probably shouldn't happen but stops us from panicking if we come
// across an update that doesn't satisfy either types.
return nil
}
switch m.Type { switch m.Type {
case api.TypeCrossSigningUpdate: case api.TypeCrossSigningUpdate:
return s.onCrossSigningMessage(m, msg.Offset, msg.Partition) return s.onCrossSigningMessage(m, msg.Offset, msg.Partition)

View file

@ -116,7 +116,7 @@ const updateEventJSONSQL = "" +
// In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id). // In order for us to apply the state updates correctly, rows need to be ordered in the order they were received (id).
const selectStateInRangeSQL = "" + const selectStateInRangeSQL = "" +
"SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" + "SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
" FROM syncapi_output_room_events" + " FROM syncapi_output_room_events" +
" WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" + " WHERE (id > $1 AND id <= $2) AND (add_state_ids IS NOT NULL OR remove_state_ids IS NOT NULL)" +
" AND ( $3::text[] IS NULL OR sender = ANY($3) )" + " AND ( $3::text[] IS NULL OR sender = ANY($3) )" +
@ -221,13 +221,14 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
for rows.Next() { for rows.Next() {
var ( var (
eventID string
streamPos types.StreamPosition streamPos types.StreamPosition
eventBytes []byte eventBytes []byte
excludeFromSync bool excludeFromSync bool
addIDs pq.StringArray addIDs pq.StringArray
delIDs pq.StringArray delIDs pq.StringArray
) )
if err := rows.Scan(&streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs); err != nil { if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDs, &delIDs); err != nil {
return nil, nil, err return nil, nil, err
} }
// Sanity check for deleted state and whine if we see it. We don't need to do anything // Sanity check for deleted state and whine if we see it. We don't need to do anything
@ -243,7 +244,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
// TODO: Handle redacted events // TODO: Handle redacted events
var ev gomatrixserverlib.HeaderedEvent var ev gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(eventBytes, &ev); err != nil { if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
return nil, nil, err return nil, nil, err
} }
needSet := stateNeeded[ev.RoomID()] needSet := stateNeeded[ev.RoomID()]
@ -258,7 +259,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
} }
stateNeeded[ev.RoomID()] = needSet stateNeeded[ev.RoomID()] = needSet
eventIDToEvent[ev.EventID()] = types.StreamEvent{ eventIDToEvent[eventID] = types.StreamEvent{
HeaderedEvent: &ev, HeaderedEvent: &ev,
StreamPosition: streamPos, StreamPosition: streamPos,
ExcludeFromSync: excludeFromSync, ExcludeFromSync: excludeFromSync,

View file

@ -81,7 +81,7 @@ const updateEventJSONSQL = "" +
"UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2" "UPDATE syncapi_output_room_events SET headered_event_json=$1 WHERE event_id=$2"
const selectStateInRangeSQL = "" + const selectStateInRangeSQL = "" +
"SELECT id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" + "SELECT event_id, id, headered_event_json, exclude_from_sync, add_state_ids, remove_state_ids" +
" FROM syncapi_output_room_events" + " FROM syncapi_output_room_events" +
" WHERE (id > $1 AND id <= $2)" + " WHERE (id > $1 AND id <= $2)" +
" AND ((add_state_ids IS NOT NULL AND add_state_ids != '') OR (remove_state_ids IS NOT NULL AND remove_state_ids != ''))" " AND ((add_state_ids IS NOT NULL AND add_state_ids != '') OR (remove_state_ids IS NOT NULL AND remove_state_ids != ''))"
@ -173,13 +173,14 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
for rows.Next() { for rows.Next() {
var ( var (
eventID string
streamPos types.StreamPosition streamPos types.StreamPosition
eventBytes []byte eventBytes []byte
excludeFromSync bool excludeFromSync bool
addIDsJSON string addIDsJSON string
delIDsJSON string delIDsJSON string
) )
if err := rows.Scan(&streamPos, &eventBytes, &excludeFromSync, &addIDsJSON, &delIDsJSON); err != nil { if err := rows.Scan(&eventID, &streamPos, &eventBytes, &excludeFromSync, &addIDsJSON, &delIDsJSON); err != nil {
return nil, nil, err return nil, nil, err
} }
@ -201,7 +202,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
// TODO: Handle redacted events // TODO: Handle redacted events
var ev gomatrixserverlib.HeaderedEvent var ev gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(eventBytes, &ev); err != nil { if err := ev.UnmarshalJSONWithEventID(eventBytes, eventID); err != nil {
return nil, nil, err return nil, nil, err
} }
needSet := stateNeeded[ev.RoomID()] needSet := stateNeeded[ev.RoomID()]
@ -216,7 +217,7 @@ func (s *outputRoomEventsStatements) SelectStateInRange(
} }
stateNeeded[ev.RoomID()] = needSet stateNeeded[ev.RoomID()] = needSet
eventIDToEvent[ev.EventID()] = types.StreamEvent{ eventIDToEvent[eventID] = types.StreamEvent{
HeaderedEvent: &ev, HeaderedEvent: &ev,
StreamPosition: streamPos, StreamPosition: streamPos,
ExcludeFromSync: excludeFromSync, ExcludeFromSync: excludeFromSync,

View file

@ -571,3 +571,19 @@ A prev_batch token from incremental sync can be used in the v1 messages API
Inbound federation rejects invites which are not signed by the sender Inbound federation rejects invites which are not signed by the sender
Invited user can reject invite over federation several times Invited user can reject invite over federation several times
Test that we can be reinvited to a room we created Test that we can be reinvited to a room we created
User can create and send/receive messages in a room with version 8
local user can join room with version 8
User can invite local user to room with version 8
remote user can join room with version 8
User can invite remote user to room with version 8
Remote user can backfill in a room with version 8
Can reject invites over federation for rooms with version 8
Can receive redactions from regular users over federation in room version 8
User can create and send/receive messages in a room with version 9
local user can join room with version 9
User can invite local user to room with version 9
remote user can join room with version 9
User can invite remote user to room with version 9
Remote user can backfill in a room with version 9
Can reject invites over federation for rooms with version 9
Can receive redactions from regular users over federation in room version 9