Update ACLs when received as outliers (#3008)

This should fix #3004 by making sure we also update our in-memory ACLs
after joining a new room.
Also makes use of more caching in `GetStateEvent`

Bonus: Adds some tests, as I was about to use `GetBulkStateContent`, but
turns out that `GetStateEvent` is basically doing the same, just that it
only gets the `eventTypeNID`/`eventStateKeyNID` once and not for every
call.
This commit is contained in:
Till 2023-11-22 15:38:04 +01:00 committed by GitHub
parent c4528b2de8
commit b8f91485b4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 155 additions and 63 deletions

View file

@ -125,7 +125,7 @@ func NewInternalAPI(
queues := queue.NewOutgoingQueues(
federationDB, processContext,
cfg.Matrix.DisableFederation,
cfg.Matrix.ServerName, federation, rsAPI, &stats,
cfg.Matrix.ServerName, federation, &stats,
signingInfo,
)

View file

@ -65,7 +65,7 @@ func TestFederationClientQueryKeys(t *testing.T) {
queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(),
false,
cfg.Matrix.ServerName, fedClient, nil, &stats,
cfg.Matrix.ServerName, fedClient, &stats,
nil,
)
fedapi := FederationInternalAPI{
@ -96,7 +96,7 @@ func TestFederationClientQueryKeysBlacklisted(t *testing.T) {
queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(),
false,
cfg.Matrix.ServerName, fedClient, nil, &stats,
cfg.Matrix.ServerName, fedClient, &stats,
nil,
)
fedapi := FederationInternalAPI{
@ -126,7 +126,7 @@ func TestFederationClientQueryKeysFailure(t *testing.T) {
queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(),
false,
cfg.Matrix.ServerName, fedClient, nil, &stats,
cfg.Matrix.ServerName, fedClient, &stats,
nil,
)
fedapi := FederationInternalAPI{
@ -156,7 +156,7 @@ func TestFederationClientClaimKeys(t *testing.T) {
queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(),
false,
cfg.Matrix.ServerName, fedClient, nil, &stats,
cfg.Matrix.ServerName, fedClient, &stats,
nil,
)
fedapi := FederationInternalAPI{
@ -187,7 +187,7 @@ func TestFederationClientClaimKeysBlacklisted(t *testing.T) {
queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(),
false,
cfg.Matrix.ServerName, fedClient, nil, &stats,
cfg.Matrix.ServerName, fedClient, &stats,
nil,
)
fedapi := FederationInternalAPI{

View file

@ -70,7 +70,7 @@ func TestPerformWakeupServers(t *testing.T) {
queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(),
false,
cfg.Matrix.ServerName, fedClient, nil, &stats,
cfg.Matrix.ServerName, fedClient, &stats,
nil,
)
fedAPI := NewFederationInternalAPI(
@ -116,7 +116,7 @@ func TestQueryRelayServers(t *testing.T) {
queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(),
false,
cfg.Matrix.ServerName, fedClient, nil, &stats,
cfg.Matrix.ServerName, fedClient, &stats,
nil,
)
fedAPI := NewFederationInternalAPI(
@ -157,7 +157,7 @@ func TestRemoveRelayServers(t *testing.T) {
queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(),
false,
cfg.Matrix.ServerName, fedClient, nil, &stats,
cfg.Matrix.ServerName, fedClient, &stats,
nil,
)
fedAPI := NewFederationInternalAPI(
@ -197,7 +197,7 @@ func TestPerformDirectoryLookup(t *testing.T) {
queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(),
false,
cfg.Matrix.ServerName, fedClient, nil, &stats,
cfg.Matrix.ServerName, fedClient, &stats,
nil,
)
fedAPI := NewFederationInternalAPI(
@ -236,7 +236,7 @@ func TestPerformDirectoryLookupRelaying(t *testing.T) {
queues := queue.NewOutgoingQueues(
testDB, process.NewProcessContext(),
false,
cfg.Matrix.ServerName, fedClient, nil, &stats,
cfg.Matrix.ServerName, fedClient, &stats,
nil,
)
fedAPI := NewFederationInternalAPI(

View file

@ -31,7 +31,6 @@ import (
"github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/matrix-org/dendrite/federationapi/storage"
"github.com/matrix-org/dendrite/federationapi/storage/shared/receipt"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/setup/process"
)
@ -53,7 +52,6 @@ type destinationQueue struct {
db storage.Database
process *process.ProcessContext
signing map[spec.ServerName]*fclient.SigningIdentity
rsAPI api.FederationRoomserverAPI
client fclient.FederationClient // federation client
origin spec.ServerName // origin of requests
destination spec.ServerName // destination of requests

View file

@ -27,12 +27,10 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
"github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/matrix-org/dendrite/federationapi/storage"
"github.com/matrix-org/dendrite/federationapi/storage/shared/receipt"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/setup/process"
)
@ -43,7 +41,6 @@ type OutgoingQueues struct {
db storage.Database
process *process.ProcessContext
disabled bool
rsAPI api.FederationRoomserverAPI
origin spec.ServerName
client fclient.FederationClient
statistics *statistics.Statistics
@ -90,7 +87,6 @@ func NewOutgoingQueues(
disabled bool,
origin spec.ServerName,
client fclient.FederationClient,
rsAPI api.FederationRoomserverAPI,
statistics *statistics.Statistics,
signing []*fclient.SigningIdentity,
) *OutgoingQueues {
@ -98,7 +94,6 @@ func NewOutgoingQueues(
disabled: disabled,
process: process,
db: db,
rsAPI: rsAPI,
origin: origin,
client: client,
statistics: statistics,
@ -162,7 +157,6 @@ func (oqs *OutgoingQueues) getQueue(destination spec.ServerName) *destinationQue
queues: oqs,
db: oqs.db,
process: oqs.process,
rsAPI: oqs.rsAPI,
origin: oqs.origin,
destination: destination,
client: oqs.client,
@ -213,18 +207,6 @@ func (oqs *OutgoingQueues) SendEvent(
delete(destmap, local)
}
// Check if any of the destinations are prohibited by server ACLs.
for destination := range destmap {
if api.IsServerBannedFromRoom(
oqs.process.Context(),
oqs.rsAPI,
ev.RoomID().String(),
destination,
) {
delete(destmap, destination)
}
}
// If there are no remaining destinations then give up.
if len(destmap) == 0 {
return nil
@ -303,24 +285,6 @@ func (oqs *OutgoingQueues) SendEDU(
delete(destmap, local)
}
// There is absolutely no guarantee that the EDU will have a room_id
// field, as it is not required by the spec. However, if it *does*
// (e.g. typing notifications) then we should try to make sure we don't
// bother sending them to servers that are prohibited by the server
// ACLs.
if result := gjson.GetBytes(e.Content, "room_id"); result.Exists() {
for destination := range destmap {
if api.IsServerBannedFromRoom(
oqs.process.Context(),
oqs.rsAPI,
result.Str,
destination,
) {
delete(destmap, destination)
}
}
}
// If there are no remaining destinations then give up.
if len(destmap) == 0 {
return nil

View file

@ -34,7 +34,6 @@ import (
"github.com/matrix-org/dendrite/federationapi/statistics"
"github.com/matrix-org/dendrite/federationapi/storage"
rsapi "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/setup/config"
"github.com/matrix-org/dendrite/setup/process"
@ -65,15 +64,6 @@ func mustCreateFederationDatabase(t *testing.T, dbType test.DBType, realDatabase
}
}
type stubFederationRoomServerAPI struct {
rsapi.FederationRoomserverAPI
}
func (r *stubFederationRoomServerAPI) QueryServerBannedFromRoom(ctx context.Context, req *rsapi.QueryServerBannedFromRoomRequest, res *rsapi.QueryServerBannedFromRoomResponse) error {
res.Banned = false
return nil
}
type stubFederationClient struct {
fclient.FederationClient
shouldTxSucceed bool
@ -126,7 +116,6 @@ func testSetup(failuresUntilBlacklist uint32, failuresUntilAssumedOffline uint32
txCount: *atomic.NewUint32(0),
txRelayCount: *atomic.NewUint32(0),
}
rs := &stubFederationRoomServerAPI{}
stats := statistics.NewStatistics(db, failuresUntilBlacklist, failuresUntilAssumedOffline)
signingInfo := []*fclient.SigningIdentity{
@ -136,7 +125,7 @@ func testSetup(failuresUntilBlacklist uint32, failuresUntilAssumedOffline uint32
ServerName: "localhost",
},
}
queues := NewOutgoingQueues(db, processContext, false, "localhost", fc, rs, &stats, signingInfo)
queues := NewOutgoingQueues(db, processContext, false, "localhost", fc, &stats, signingInfo)
return db, fc, queues, processContext, close
}