Merge branch 'master' into matthew/peeking

This commit is contained in:
Neil Alexander 2020-09-08 14:07:06 +01:00
commit bcbe6512a4
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
66 changed files with 363 additions and 1960 deletions

View file

@ -133,17 +133,6 @@ client_api:
turn_username: ""
turn_password: ""
# Configuration for the Current State Server.
current_state_server:
internal_api:
listen: http://0.0.0.0:7782
connect: http://current_state_server:7782
database:
connection_string: postgresql://dendrite:itsasecret@postgres/dendrite_currentstate?sslmode=disable
max_open_conns: 100
max_idle_conns: 2
conn_max_lifetime: -1
# Configuration for the EDU server.
edu_server:
internal_api:

View file

@ -43,17 +43,6 @@ services:
networks:
- internal
current_state_server:
hostname: current_state_server
image: matrixdotorg/dendrite:currentstateserver
command: [
"--config=dendrite.yaml"
]
volumes:
- ./config:/etc/dendrite
networks:
- internal
sync_api:
hostname: sync_api
image: matrixdotorg/dendrite:syncapi

View file

@ -15,7 +15,6 @@ docker build -t matrixdotorg/dendrite:federationsender --build-arg component=de
docker build -t matrixdotorg/dendrite:federationproxy --build-arg component=federation-api-proxy -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite:keyserver --build-arg component=dendrite-key-server -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite:mediaapi --build-arg component=dendrite-media-api-server -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite:currentstateserver --build-arg component=dendrite-current-state-server -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite:roomserver --build-arg component=dendrite-room-server -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite:syncapi --build-arg component=dendrite-sync-api-server -f build/docker/Dockerfile.component .
docker build -t matrixdotorg/dendrite:serverkeyapi --build-arg component=dendrite-server-key-api-server -f build/docker/Dockerfile.component .

View file

@ -11,7 +11,6 @@ docker pull matrixdotorg/dendrite:federationsender
docker pull matrixdotorg/dendrite:federationproxy
docker pull matrixdotorg/dendrite:keyserver
docker pull matrixdotorg/dendrite:mediaapi
docker pull matrixdotorg/dendrite:currentstateserver
docker pull matrixdotorg/dendrite:roomserver
docker pull matrixdotorg/dendrite:syncapi
docker pull matrixdotorg/dendrite:userapi

View file

@ -11,7 +11,6 @@ docker push matrixdotorg/dendrite:federationsender
docker push matrixdotorg/dendrite:federationproxy
docker push matrixdotorg/dendrite:keyserver
docker push matrixdotorg/dendrite:mediaapi
docker push matrixdotorg/dendrite:currentstateserver
docker push matrixdotorg/dendrite:roomserver
docker push matrixdotorg/dendrite:syncapi
docker push matrixdotorg/dendrite:serverkeyapi

View file

@ -1,5 +1,5 @@
#!/bin/bash
for db in account device mediaapi syncapi roomserver serverkey keyserver federationsender currentstate appservice e2ekey naffka; do
for db in account device mediaapi syncapi roomserver serverkey keyserver federationsender appservice e2ekey naffka; do
createdb -U dendrite -O dendrite dendrite_$db
done

View file

@ -13,7 +13,6 @@ import (
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggrooms"
"github.com/matrix-org/dendrite/currentstateserver"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationsender"
@ -99,7 +98,6 @@ func (m *DendriteMonolith) Start() {
cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-keyserver.db", m.StorageDirectory))
cfg.FederationSender.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-federationsender.db", m.StorageDirectory))
cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-appservice.db", m.StorageDirectory))
cfg.CurrentStateServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-currentstate.db", m.StorageDirectory))
cfg.MediaAPI.BasePath = config.Path(fmt.Sprintf("%s/tmp", m.StorageDirectory))
cfg.MediaAPI.AbsBasePath = config.Path(fmt.Sprintf("%s/tmp", m.StorageDirectory))
cfg.FederationSender.FederationMaxRetries = 8
@ -128,7 +126,6 @@ func (m *DendriteMonolith) Start() {
)
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer)
fsAPI := federationsender.NewInternalAPI(
base, federation, rsAPI, keyRing,
)
@ -163,7 +160,6 @@ func (m *DendriteMonolith) Start() {
FederationSenderAPI: fsAPI,
RoomserverAPI: rsAPI,
UserAPI: userAPI,
StateAPI: stateAPI,
KeyAPI: keyAPI,
ExtPublicRoomsProvider: yggrooms.NewYggdrasilRoomProvider(
ygg, fsAPI, federation,

View file

@ -21,7 +21,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/api"
"github.com/matrix-org/dendrite/clientapi/producers"
"github.com/matrix-org/dendrite/clientapi/routing"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/config"
@ -43,7 +42,6 @@ func AddPublicRoutes(
rsAPI roomserverAPI.RoomserverInternalAPI,
eduInputAPI eduServerAPI.EDUServerInputAPI,
asAPI appserviceAPI.AppServiceQueryAPI,
stateAPI currentstateAPI.CurrentStateInternalAPI,
transactionsCache *transactions.Cache,
fsAPI federationSenderAPI.FederationSenderInternalAPI,
userAPI userapi.UserInternalAPI,
@ -58,6 +56,6 @@ func AddPublicRoutes(
routing.Setup(
router, cfg, eduInputAPI, rsAPI, asAPI,
accountsDB, userAPI, federation,
syncProducer, transactionsCache, fsAPI, stateAPI, keyAPI, extRoomsProvider,
syncProducer, transactionsCache, fsAPI, keyAPI, extRoomsProvider,
)
}

View file

@ -20,7 +20,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/config"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
@ -270,7 +269,7 @@ func GetVisibility(
// SetVisibility implements PUT /directory/list/room/{roomID}
// TODO: Allow admin users to edit the room visibility
func SetVisibility(
req *http.Request, stateAPI currentstateAPI.CurrentStateInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI, dev *userapi.Device,
req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI, dev *userapi.Device,
roomID string,
) util.JSONResponse {
resErr := checkMemberInRoom(req.Context(), rsAPI, dev.UserID, roomID)

View file

@ -26,7 +26,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/api"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/internal/config"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
@ -51,7 +50,7 @@ type filter struct {
// GetPostPublicRooms implements GET and POST /publicRooms
func GetPostPublicRooms(
req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI,
req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI,
extRoomsProvider api.ExtraPublicRoomsProvider,
federation *gomatrixserverlib.FederationClient,
cfg *config.ClientAPI,
@ -75,7 +74,7 @@ func GetPostPublicRooms(
}
}
response, err := publicRooms(req.Context(), request, rsAPI, stateAPI, extRoomsProvider)
response, err := publicRooms(req.Context(), request, rsAPI, extRoomsProvider)
if err != nil {
util.GetLogger(req.Context()).WithError(err).Errorf("failed to work out public rooms")
return jsonerror.InternalServerError()
@ -86,8 +85,8 @@ func GetPostPublicRooms(
}
}
func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI.RoomserverInternalAPI,
stateAPI currentstateAPI.CurrentStateInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider,
func publicRooms(
ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI.RoomserverInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider,
) (*gomatrixserverlib.RespPublicRooms, error) {
response := gomatrixserverlib.RespPublicRooms{
@ -110,7 +109,7 @@ func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI
var rooms []gomatrixserverlib.PublicRoom
if request.Since == "" {
rooms = refreshPublicRoomCache(ctx, rsAPI, extRoomsProvider, stateAPI)
rooms = refreshPublicRoomCache(ctx, rsAPI, extRoomsProvider)
} else {
rooms = getPublicRoomsFromCache()
}
@ -226,7 +225,6 @@ func sliceInto(slice []gomatrixserverlib.PublicRoom, since int64, limit int16) (
func refreshPublicRoomCache(
ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider,
stateAPI currentstateAPI.CurrentStateInternalAPI,
) []gomatrixserverlib.PublicRoom {
cacheMu.Lock()
defer cacheMu.Unlock()
@ -241,7 +239,7 @@ func refreshPublicRoomCache(
util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed")
return publicRoomsCache
}
pubRooms, err := currentstateAPI.PopulatePublicRooms(ctx, queryRes.RoomIDs, stateAPI)
pubRooms, err := roomserverAPI.PopulatePublicRooms(ctx, queryRes.RoomIDs, rsAPI)
if err != nil {
util.GetLogger(ctx).WithError(err).Error("PopulatePublicRooms failed")
return publicRoomsCache

View file

@ -23,7 +23,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth/authtypes"
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/roomserver/api"
@ -94,7 +93,7 @@ func GetAvatarURL(
// SetAvatarURL implements PUT /profile/{userID}/avatar_url
// nolint:gocyclo
func SetAvatarURL(
req *http.Request, accountDB accounts.Database, stateAPI currentstateAPI.CurrentStateInternalAPI,
req *http.Request, accountDB accounts.Database,
device *userapi.Device, userID string, cfg *config.ClientAPI, rsAPI api.RoomserverInternalAPI,
) util.JSONResponse {
if userID != device.UserID {
@ -212,7 +211,7 @@ func GetDisplayName(
// SetDisplayName implements PUT /profile/{userID}/displayname
// nolint:gocyclo
func SetDisplayName(
req *http.Request, accountDB accounts.Database, stateAPI currentstateAPI.CurrentStateInternalAPI,
req *http.Request, accountDB accounts.Database,
device *userapi.Device, userID string, cfg *config.ClientAPI, rsAPI api.RoomserverInternalAPI,
) util.JSONResponse {
if userID != device.UserID {

View file

@ -25,7 +25,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/auth"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/producers"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/config"
@ -56,7 +55,6 @@ func Setup(
syncProducer *producers.SyncAPIProducer,
transactionsCache *transactions.Cache,
federationSender federationSenderAPI.FederationSenderInternalAPI,
stateAPI currentstateAPI.CurrentStateInternalAPI,
keyAPI keyserverAPI.KeyInternalAPI,
extRoomsProvider api.ExtraPublicRoomsProvider,
) {
@ -342,12 +340,12 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
return SetVisibility(req, stateAPI, rsAPI, device, vars["roomID"])
return SetVisibility(req, rsAPI, device, vars["roomID"])
}),
).Methods(http.MethodPut, http.MethodOptions)
r0mux.Handle("/publicRooms",
httputil.MakeExternalAPI("public_rooms", func(req *http.Request) util.JSONResponse {
return GetPostPublicRooms(req, rsAPI, stateAPI, extRoomsProvider, federation, cfg)
return GetPostPublicRooms(req, rsAPI, extRoomsProvider, federation, cfg)
}),
).Methods(http.MethodGet, http.MethodPost, http.MethodOptions)
@ -505,7 +503,7 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
return SetAvatarURL(req, accountDB, stateAPI, device, vars["userID"], cfg, rsAPI)
return SetAvatarURL(req, accountDB, device, vars["userID"], cfg, rsAPI)
}),
).Methods(http.MethodPut, http.MethodOptions)
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows
@ -530,7 +528,7 @@ func Setup(
if err != nil {
return util.ErrorResponse(err)
}
return SetDisplayName(req, accountDB, stateAPI, device, vars["userID"], cfg, rsAPI)
return SetDisplayName(req, accountDB, device, vars["userID"], cfg, rsAPI)
}),
).Methods(http.MethodPut, http.MethodOptions)
// Browsers use the OPTIONS HTTP method to check if the CORS policy allows

View file

@ -34,12 +34,11 @@ func main() {
fsAPI := base.FederationSenderHTTPClient()
eduInputAPI := base.EDUServerClient()
userAPI := base.UserAPIClient()
stateAPI := base.CurrentStateAPIClient()
keyAPI := base.KeyServerHTTPClient()
clientapi.AddPublicRoutes(
base.PublicClientAPIMux, &base.Cfg.ClientAPI, base.KafkaProducer, accountDB, federation,
rsAPI, eduInputAPI, asQuery, stateAPI, transactions.New(), fsAPI, userAPI, keyAPI, nil,
rsAPI, eduInputAPI, asQuery, transactions.New(), fsAPI, userAPI, keyAPI, nil,
)
base.SetupAndServeHTTP(

View file

@ -1,36 +0,0 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"github.com/matrix-org/dendrite/currentstateserver"
"github.com/matrix-org/dendrite/internal/setup"
)
func main() {
cfg := setup.ParseFlags(false)
base := setup.NewBaseDendrite(cfg, "CurrentStateServer", true)
defer base.Close() // nolint: errcheck
stateAPI := currentstateserver.NewInternalAPI(&cfg.CurrentStateServer, base.KafkaConsumer)
currentstateserver.AddInternalRoutes(base.InternalAPIMux, stateAPI)
base.SetupAndServeHTTP(
base.Cfg.CurrentStateServer.InternalAPI.Listen,
setup.NoExternalListener,
nil, nil,
)
}

View file

@ -29,7 +29,6 @@ import (
p2pdisc "github.com/libp2p/go-libp2p/p2p/discovery"
"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/embed"
"github.com/matrix-org/dendrite/currentstateserver"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/federationsender"
"github.com/matrix-org/dendrite/internal/config"
@ -129,7 +128,6 @@ func main() {
cfg.ServerKeyAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-serverkey.db", *instanceName))
cfg.FederationSender.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName))
cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName))
cfg.CurrentStateServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-currentstate.db", *instanceName))
cfg.Global.Kafka.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName))
cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-e2ekey.db", *instanceName))
if err = cfg.Derive(); err != nil {
@ -153,7 +151,6 @@ func main() {
base, serverKeyAPI,
)
stateAPI := currentstateserver.NewInternalAPI(&base.Base.Cfg.CurrentStateServer, base.Base.KafkaConsumer)
rsAPI := roomserver.NewInternalAPI(
&base.Base, keyRing,
)
@ -165,7 +162,7 @@ func main() {
&base.Base, federation, rsAPI, keyRing,
)
rsAPI.SetFederationSenderAPI(fsAPI)
provider := newPublicRoomsProvider(base.LibP2PPubsub, rsAPI, stateAPI)
provider := newPublicRoomsProvider(base.LibP2PPubsub, rsAPI)
err = provider.Start()
if err != nil {
panic("failed to create new public rooms provider: " + err.Error())
@ -185,7 +182,6 @@ func main() {
FederationSenderAPI: fsAPI,
RoomserverAPI: rsAPI,
ServerKeyAPI: serverKeyAPI,
StateAPI: stateAPI,
UserAPI: userAPI,
KeyAPI: keyAPI,
ExtPublicRoomsProvider: provider,

View file

@ -22,7 +22,6 @@ import (
"sync/atomic"
"time"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
pubsub "github.com/libp2p/go-libp2p-pubsub"
@ -46,15 +45,13 @@ type publicRoomsProvider struct {
maintenanceTimer *time.Timer //
roomsAdvertised atomic.Value // stores int
rsAPI roomserverAPI.RoomserverInternalAPI
stateAPI currentstateAPI.CurrentStateInternalAPI
}
func newPublicRoomsProvider(ps *pubsub.PubSub, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI) *publicRoomsProvider {
func newPublicRoomsProvider(ps *pubsub.PubSub, rsAPI roomserverAPI.RoomserverInternalAPI) *publicRoomsProvider {
return &publicRoomsProvider{
foundRooms: make(map[string]discoveredRoom),
pubsub: ps,
rsAPI: rsAPI,
stateAPI: stateAPI,
}
}
@ -106,7 +103,7 @@ func (p *publicRoomsProvider) AdvertiseRooms() error {
util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed")
return err
}
ourRooms, err := currentstateAPI.PopulatePublicRooms(ctx, queryRes.RoomIDs, p.stateAPI)
ourRooms, err := roomserverAPI.PopulatePublicRooms(ctx, queryRes.RoomIDs, p.rsAPI)
if err != nil {
util.GetLogger(ctx).WithError(err).Error("PopulatePublicRooms failed")
return err

View file

@ -29,7 +29,6 @@ import (
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/signing"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggconn"
"github.com/matrix-org/dendrite/cmd/dendrite-demo-yggdrasil/yggrooms"
"github.com/matrix-org/dendrite/currentstateserver"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationsender"
@ -84,7 +83,6 @@ func main() {
cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-keyserver.db", *instanceName))
cfg.FederationSender.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-federationsender.db", *instanceName))
cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-appservice.db", *instanceName))
cfg.CurrentStateServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-currentstate.db", *instanceName))
cfg.Global.Kafka.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s-naffka.db", *instanceName))
if err = cfg.Derive(); err != nil {
panic(err)
@ -113,7 +111,6 @@ func main() {
)
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer)
fsAPI := federationsender.NewInternalAPI(
base, federation, rsAPI, keyRing,
)
@ -146,7 +143,6 @@ func main() {
FederationSenderAPI: fsAPI,
RoomserverAPI: rsAPI,
UserAPI: userAPI,
StateAPI: stateAPI,
KeyAPI: keyAPI,
ExtPublicRoomsProvider: yggrooms.NewYggdrasilRoomProvider(
ygg, fsAPI, federation,

View file

@ -35,7 +35,7 @@ func main() {
federationapi.AddPublicRoutes(
base.PublicFederationAPIMux, base.PublicKeyAPIMux,
&base.Cfg.FederationAPI, userAPI, federation, keyRing,
rsAPI, fsAPI, base.EDUServerClient(), base.CurrentStateAPIClient(), keyAPI,
rsAPI, fsAPI, base.EDUServerClient(), keyAPI,
)
base.SetupAndServeHTTP(

View file

@ -19,7 +19,6 @@ import (
"os"
"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/currentstateserver"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationsender"
@ -54,7 +53,6 @@ func main() {
// itself.
cfg.AppServiceAPI.InternalAPI.Connect = httpAddr
cfg.ClientAPI.InternalAPI.Connect = httpAddr
cfg.CurrentStateServer.InternalAPI.Connect = httpAddr
cfg.EDUServer.InternalAPI.Connect = httpAddr
cfg.FederationAPI.InternalAPI.Connect = httpAddr
cfg.FederationSender.InternalAPI.Connect = httpAddr
@ -95,8 +93,6 @@ func main() {
}
}
stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer)
fsAPI := federationsender.NewInternalAPI(
base, federation, rsAPI, keyRing,
)
@ -140,7 +136,6 @@ func main() {
FederationSenderAPI: fsAPI,
RoomserverAPI: rsAPI,
ServerKeyAPI: serverKeyAPI,
StateAPI: stateAPI,
UserAPI: userAPI,
KeyAPI: keyAPI,
}

View file

@ -31,7 +31,7 @@ func main() {
syncapi.AddPublicRoutes(
base.PublicClientAPIMux, base.KafkaConsumer, userAPI, rsAPI,
base.KeyServerHTTPClient(), base.CurrentStateAPIClient(),
base.KeyServerHTTPClient(),
federation, &cfg.SyncAPI,
)

View file

@ -23,7 +23,6 @@ import (
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/appservice"
"github.com/matrix-org/dendrite/currentstateserver"
"github.com/matrix-org/dendrite/eduserver"
"github.com/matrix-org/dendrite/eduserver/cache"
"github.com/matrix-org/dendrite/federationsender"
@ -171,7 +170,6 @@ func main() {
cfg.RoomServer.Database.ConnectionString = "file:/idb/dendritejs_roomserver.db"
cfg.ServerKeyAPI.Database.ConnectionString = "file:/idb/dendritejs_serverkey.db"
cfg.SyncAPI.Database.ConnectionString = "file:/idb/dendritejs_syncapi.db"
cfg.CurrentStateServer.Database.ConnectionString = "file:/idb/dendritejs_currentstate.db"
cfg.KeyServer.Database.ConnectionString = "file:/idb/dendritejs_e2ekey.db"
cfg.Global.Kafka.UseNaffka = true
cfg.Global.Kafka.Database.ConnectionString = "file:/idb/dendritejs_naffka.db"
@ -204,7 +202,6 @@ func main() {
KeyDatabase: fetcher,
}
stateAPI := currentstateserver.NewInternalAPI(&base.Cfg.CurrentStateServer, base.KafkaConsumer)
rsAPI := roomserver.NewInternalAPI(base, keyRing)
eduInputAPI := eduserver.NewInternalAPI(base, cache.New(), userAPI)
asQuery := appservice.NewInternalAPI(
@ -227,7 +224,6 @@ func main() {
EDUInternalAPI: eduInputAPI,
FederationSenderAPI: fedSenderAPI,
RoomserverAPI: rsAPI,
StateAPI: stateAPI,
UserAPI: userAPI,
KeyAPI: keyAPI,
//ServerKeyAPI: serverKeyAPI,

View file

@ -1,50 +0,0 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api
import (
"context"
"github.com/matrix-org/gomatrixserverlib"
)
type CurrentStateInternalAPI interface {
// QueryBulkStateContent does a bulk query for state event content in the given rooms.
QueryBulkStateContent(ctx context.Context, req *QueryBulkStateContentRequest, res *QueryBulkStateContentResponse) error
}
type QueryBulkStateContentRequest struct {
// Returns state events in these rooms
RoomIDs []string
// If true, treats the '*' StateKey as "all state events of this type" rather than a literal value of '*'
AllowWildcards bool
// The state events to return. Only a small subset of tuples are allowed in this request as only certain events
// have their content fields extracted. Specifically, the tuple Type must be one of:
// m.room.avatar
// m.room.create
// m.room.canonical_alias
// m.room.guest_access
// m.room.history_visibility
// m.room.join_rules
// m.room.member
// m.room.name
// m.room.topic
// Any other tuple type will result in the query failing.
StateTuples []gomatrixserverlib.StateKeyTuple
}
type QueryBulkStateContentResponse struct {
// map of room ID -> tuple -> content_value
Rooms map[string]map[gomatrixserverlib.StateKeyTuple]string
}

View file

@ -1,89 +0,0 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package api
import (
"context"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
)
// PopulatePublicRooms extracts PublicRoom information for all the provided room IDs. The IDs are not checked to see if they are visible in the
// published room directory.
// due to lots of switches
// nolint:gocyclo
func PopulatePublicRooms(ctx context.Context, roomIDs []string, stateAPI CurrentStateInternalAPI) ([]gomatrixserverlib.PublicRoom, error) {
avatarTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.avatar", StateKey: ""}
nameTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.name", StateKey: ""}
canonicalTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomCanonicalAlias, StateKey: ""}
topicTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.topic", StateKey: ""}
guestTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.guest_access", StateKey: ""}
visibilityTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomHistoryVisibility, StateKey: ""}
joinRuleTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomJoinRules, StateKey: ""}
var stateRes QueryBulkStateContentResponse
err := stateAPI.QueryBulkStateContent(ctx, &QueryBulkStateContentRequest{
RoomIDs: roomIDs,
AllowWildcards: true,
StateTuples: []gomatrixserverlib.StateKeyTuple{
nameTuple, canonicalTuple, topicTuple, guestTuple, visibilityTuple, joinRuleTuple, avatarTuple,
{EventType: gomatrixserverlib.MRoomMember, StateKey: "*"},
},
}, &stateRes)
if err != nil {
util.GetLogger(ctx).WithError(err).Error("QueryBulkStateContent failed")
return nil, err
}
chunk := make([]gomatrixserverlib.PublicRoom, len(roomIDs))
i := 0
for roomID, data := range stateRes.Rooms {
pub := gomatrixserverlib.PublicRoom{
RoomID: roomID,
}
joinCount := 0
var joinRule, guestAccess string
for tuple, contentVal := range data {
if tuple.EventType == gomatrixserverlib.MRoomMember && contentVal == "join" {
joinCount++
continue
}
switch tuple {
case avatarTuple:
pub.AvatarURL = contentVal
case nameTuple:
pub.Name = contentVal
case topicTuple:
pub.Topic = contentVal
case canonicalTuple:
pub.CanonicalAlias = contentVal
case visibilityTuple:
pub.WorldReadable = contentVal == "world_readable"
// need both of these to determine whether guests can join
case joinRuleTuple:
joinRule = contentVal
case guestTuple:
guestAccess = contentVal
}
}
if joinRule == gomatrixserverlib.Public && guestAccess == "can_join" {
pub.GuestCanJoin = true
}
pub.JoinedMembersCount = joinCount
chunk[i] = pub
i++
}
return chunk, nil
}

View file

@ -1,147 +0,0 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package consumers
import (
"context"
"encoding/json"
"github.com/Shopify/sarama"
"github.com/matrix-org/dendrite/currentstateserver/storage"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
"github.com/matrix-org/gomatrixserverlib"
log "github.com/sirupsen/logrus"
)
type OutputRoomEventConsumer struct {
rsConsumer *internal.ContinualConsumer
db storage.Database
}
func NewOutputRoomEventConsumer(topicName string, kafkaConsumer sarama.Consumer, store storage.Database) *OutputRoomEventConsumer {
consumer := &internal.ContinualConsumer{
ComponentName: "currentstateserver/roomserver",
Topic: topicName,
Consumer: kafkaConsumer,
PartitionStore: store,
}
s := &OutputRoomEventConsumer{
rsConsumer: consumer,
db: store,
}
consumer.ProcessMessage = s.onMessage
return s
}
func (c *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error {
// Parse out the event JSON
var output api.OutputEvent
if err := json.Unmarshal(msg.Value, &output); err != nil {
// If the message was invalid, log it and move on to the next message in the stream
log.WithError(err).Errorf("roomserver output log: message parse failure")
return nil
}
switch output.Type {
case api.OutputTypeNewRoomEvent:
return c.onNewRoomEvent(context.TODO(), *output.NewRoomEvent)
case api.OutputTypeNewInviteEvent:
case api.OutputTypeRetireInviteEvent:
case api.OutputTypeRedactedEvent:
return c.onRedactEvent(context.Background(), *output.RedactedEvent)
default:
log.WithField("type", output.Type).Debug(
"roomserver output log: ignoring unknown output type",
)
}
return nil
}
func (c *OutputRoomEventConsumer) onNewRoomEvent(
ctx context.Context, msg api.OutputNewRoomEvent,
) error {
ev := msg.Event
addsStateEvents := msg.AddsState()
ev, err := c.updateStateEvent(ev)
if err != nil {
return err
}
for i := range addsStateEvents {
addsStateEvents[i], err = c.updateStateEvent(addsStateEvents[i])
if err != nil {
return err
}
}
err = c.db.StoreStateEvents(
ctx,
addsStateEvents,
msg.RemovesStateEventIDs,
)
if err != nil {
// panic rather than continue with an inconsistent database
log.WithFields(log.Fields{
"event": string(ev.JSON()),
log.ErrorKey: err,
"add": msg.AddsStateEventIDs,
"del": msg.RemovesStateEventIDs,
}).Panicf("roomserver output log: write event failure")
}
return nil
}
func (c *OutputRoomEventConsumer) onRedactEvent(
ctx context.Context, msg api.OutputRedactedEvent,
) error {
return c.db.RedactEvent(ctx, msg.RedactedEventID, msg.RedactedBecause)
}
// Start consuming from room servers
func (c *OutputRoomEventConsumer) Start() error {
return c.rsConsumer.Start()
}
func (c *OutputRoomEventConsumer) updateStateEvent(event gomatrixserverlib.HeaderedEvent) (gomatrixserverlib.HeaderedEvent, error) {
stateKey := ""
if event.StateKey() != nil {
stateKey = *event.StateKey()
}
prevEvent, err := c.db.GetStateEvent(
context.TODO(), event.RoomID(), event.Type(), stateKey,
)
if err != nil {
return event, err
}
if prevEvent == nil {
return event, nil
}
prev := types.PrevEventRef{
PrevContent: prevEvent.Content(),
ReplacesState: prevEvent.EventID(),
PrevSender: prevEvent.Sender(),
}
event.Event, err = event.SetUnsigned(prev)
return event, err
}

View file

@ -1,51 +0,0 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package currentstateserver
import (
"github.com/Shopify/sarama"
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/currentstateserver/consumers"
"github.com/matrix-org/dendrite/currentstateserver/internal"
"github.com/matrix-org/dendrite/currentstateserver/inthttp"
"github.com/matrix-org/dendrite/currentstateserver/storage"
"github.com/matrix-org/dendrite/internal/config"
"github.com/sirupsen/logrus"
)
// AddInternalRoutes registers HTTP handlers for the internal API. Invokes functions
// on the given input API.
func AddInternalRoutes(router *mux.Router, intAPI api.CurrentStateInternalAPI) {
inthttp.AddRoutes(router, intAPI)
}
// NewInternalAPI returns a concrete implementation of the internal API. Callers
// can call functions directly on the returned API or via an HTTP interface using AddInternalRoutes.
func NewInternalAPI(cfg *config.CurrentStateServer, consumer sarama.Consumer) api.CurrentStateInternalAPI {
csDB, err := storage.NewDatabase(&cfg.Database)
if err != nil {
logrus.WithError(err).Panicf("failed to open database")
}
roomConsumer := consumers.NewOutputRoomEventConsumer(
cfg.Matrix.Kafka.TopicFor(config.TopicOutputRoomEvent), consumer, csDB,
)
if err = roomConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start room server consumer")
}
return &internal.CurrentStateInternalAPI{
DB: csDB,
}
}

View file

@ -1,47 +0,0 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal
import (
"context"
"github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/currentstateserver/storage"
"github.com/matrix-org/gomatrixserverlib"
)
type CurrentStateInternalAPI struct {
DB storage.Database
}
func (a *CurrentStateInternalAPI) QueryBulkStateContent(ctx context.Context, req *api.QueryBulkStateContentRequest, res *api.QueryBulkStateContentResponse) error {
events, err := a.DB.GetBulkStateContent(ctx, req.RoomIDs, req.StateTuples, req.AllowWildcards)
if err != nil {
return err
}
res.Rooms = make(map[string]map[gomatrixserverlib.StateKeyTuple]string)
for _, ev := range events {
if res.Rooms[ev.RoomID] == nil {
res.Rooms[ev.RoomID] = make(map[gomatrixserverlib.StateKeyTuple]string)
}
room := res.Rooms[ev.RoomID]
room[gomatrixserverlib.StateKeyTuple{
EventType: ev.EventType,
StateKey: ev.StateKey,
}] = ev.ContentValue
res.Rooms[ev.RoomID] = room
}
return nil
}

View file

@ -1,63 +0,0 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package inthttp
import (
"context"
"errors"
"net/http"
"github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/opentracing/opentracing-go"
)
// HTTP paths for the internal HTTP APIs
const (
QueryRoomsForUserPath = "/currentstateserver/queryRoomsForUser"
QueryBulkStateContentPath = "/currentstateserver/queryBulkStateContent"
)
// NewCurrentStateAPIClient creates a CurrentStateInternalAPI implemented by talking to a HTTP POST API.
// If httpClient is nil an error is returned
func NewCurrentStateAPIClient(
apiURL string,
httpClient *http.Client,
) (api.CurrentStateInternalAPI, error) {
if httpClient == nil {
return nil, errors.New("NewCurrentStateAPIClient: httpClient is <nil>")
}
return &httpCurrentStateInternalAPI{
apiURL: apiURL,
httpClient: httpClient,
}, nil
}
type httpCurrentStateInternalAPI struct {
apiURL string
httpClient *http.Client
}
func (h *httpCurrentStateInternalAPI) QueryBulkStateContent(
ctx context.Context,
request *api.QueryBulkStateContentRequest,
response *api.QueryBulkStateContentResponse,
) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryBulkStateContent")
defer span.Finish()
apiURL := h.apiURL + QueryBulkStateContentPath
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, request, response)
}

View file

@ -1,41 +0,0 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package inthttp
import (
"encoding/json"
"net/http"
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/util"
)
func AddRoutes(internalAPIMux *mux.Router, intAPI api.CurrentStateInternalAPI) {
internalAPIMux.Handle(QueryBulkStateContentPath,
httputil.MakeInternalAPI("queryBulkStateContent", func(req *http.Request) util.JSONResponse {
request := api.QueryBulkStateContentRequest{}
response := api.QueryBulkStateContentResponse{}
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
if err := intAPI.QueryBulkStateContent(req.Context(), &request, &response); err != nil {
return util.ErrorResponse(err)
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
}

View file

@ -1,46 +0,0 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"context"
"github.com/matrix-org/dendrite/currentstateserver/storage/tables"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/gomatrixserverlib"
)
type Database interface {
internal.PartitionStorer
// StoreStateEvents updates the database with new events from the roomserver.
StoreStateEvents(ctx context.Context, addStateEvents []gomatrixserverlib.HeaderedEvent, removeStateEventIDs []string) error
// GetStateEvent returns the state event of a given type for a given room with a given state key
// If no event could be found, returns nil
// If there was an issue during the retrieval, returns an error
GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error)
// GetRoomsByMembership returns a list of room IDs matching the provided membership and user ID (as state_key).
GetRoomsByMembership(ctx context.Context, userID, membership string) ([]string, error)
// GetBulkStateContent returns all state events which match a given room ID and a given state key tuple. Both must be satisfied for a match.
// If a tuple has the StateKey of '*' and allowWildcards=true then all state events with the EventType should be returned.
GetBulkStateContent(ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool) ([]tables.StrippedEvent, error)
// Redact a state event
RedactEvent(ctx context.Context, redactedEventID string, redactedBecause gomatrixserverlib.HeaderedEvent) error
// JoinedUsersSetInRooms returns all joined users in the rooms given, along with the count of how many times they appear.
JoinedUsersSetInRooms(ctx context.Context, roomIDs []string) (map[string]int, error)
// GetKnownUsers searches all users that userID knows about.
GetKnownUsers(ctx context.Context, userID, searchString string, limit int) ([]string, error)
// GetKnownRooms returns a list of all rooms we know about.
GetKnownRooms(ctx context.Context) ([]string, error)
}

View file

@ -1,351 +0,0 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package postgres
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"github.com/lib/pq"
"github.com/matrix-org/dendrite/currentstateserver/storage/tables"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
)
var currentRoomStateSchema = `
-- Stores the current room state for every room.
CREATE TABLE IF NOT EXISTS currentstate_current_room_state (
-- The 'room_id' key for the state event.
room_id TEXT NOT NULL,
-- The state event ID
event_id TEXT NOT NULL,
-- The state event type e.g 'm.room.member'
type TEXT NOT NULL,
-- The 'sender' property of the event.
sender TEXT NOT NULL,
-- The state_key value for this state event e.g ''
state_key TEXT NOT NULL,
-- The JSON for the event. Stored as TEXT because this should be valid UTF-8.
headered_event_json TEXT NOT NULL,
-- A piece of extracted content e.g membership for m.room.member events
content_value TEXT NOT NULL DEFAULT '',
-- Clobber based on 3-uple of room_id, type and state_key
CONSTRAINT currentstate_current_room_state_unique UNIQUE (room_id, type, state_key)
);
-- for event deletion
CREATE UNIQUE INDEX IF NOT EXISTS currentstate_event_id_idx ON currentstate_current_room_state(event_id, room_id, type, sender);
-- for querying membership states of users
CREATE INDEX IF NOT EXISTS currentstate_membership_idx ON currentstate_current_room_state(type, state_key, content_value)
WHERE type='m.room.member' AND content_value IS NOT NULL AND content_value != 'leave';
`
const upsertRoomStateSQL = "" +
"INSERT INTO currentstate_current_room_state (room_id, event_id, type, sender, state_key, headered_event_json, content_value)" +
" VALUES ($1, $2, $3, $4, $5, $6, $7)" +
" ON CONFLICT ON CONSTRAINT currentstate_current_room_state_unique" +
" DO UPDATE SET event_id = $2, sender=$4, headered_event_json = $6, content_value = $7"
const deleteRoomStateByEventIDSQL = "" +
"DELETE FROM currentstate_current_room_state WHERE event_id = $1"
const selectRoomIDsWithMembershipSQL = "" +
"SELECT room_id FROM currentstate_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND content_value = $2"
const selectStateEventSQL = "" +
"SELECT headered_event_json FROM currentstate_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
const selectEventsWithEventIDsSQL = "" +
"SELECT headered_event_json FROM currentstate_current_room_state WHERE event_id = ANY($1)"
const selectBulkStateContentSQL = "" +
"SELECT room_id, type, state_key, content_value FROM currentstate_current_room_state WHERE room_id = ANY($1) AND type = ANY($2) AND state_key = ANY($3)"
const selectBulkStateContentWildSQL = "" +
"SELECT room_id, type, state_key, content_value FROM currentstate_current_room_state WHERE room_id = ANY($1) AND type = ANY($2)"
const selectJoinedUsersSetForRoomsSQL = "" +
"SELECT state_key, COUNT(room_id) FROM currentstate_current_room_state WHERE room_id = ANY($1) AND" +
" type = 'm.room.member' and content_value = 'join' GROUP BY state_key"
const selectKnownRoomsSQL = "" +
"SELECT DISTINCT room_id FROM currentstate_current_room_state"
// selectKnownUsersSQL uses a sub-select statement here to find rooms that the user is
// joined to. Since this information is used to populate the user directory, we will
// only return users that the user would ordinarily be able to see anyway.
const selectKnownUsersSQL = "" +
"SELECT DISTINCT state_key FROM currentstate_current_room_state WHERE room_id = ANY(" +
" SELECT DISTINCT room_id FROM currentstate_current_room_state WHERE state_key=$1 AND TYPE='m.room.member' AND content_value='join'" +
") AND TYPE='m.room.member' AND content_value='join' AND state_key LIKE $2 LIMIT $3"
type currentRoomStateStatements struct {
upsertRoomStateStmt *sql.Stmt
deleteRoomStateByEventIDStmt *sql.Stmt
selectRoomIDsWithMembershipStmt *sql.Stmt
selectEventsWithEventIDsStmt *sql.Stmt
selectStateEventStmt *sql.Stmt
selectBulkStateContentStmt *sql.Stmt
selectBulkStateContentWildStmt *sql.Stmt
selectJoinedUsersSetForRoomsStmt *sql.Stmt
selectKnownRoomsStmt *sql.Stmt
selectKnownUsersStmt *sql.Stmt
}
func NewPostgresCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) {
s := &currentRoomStateStatements{}
_, err := db.Exec(currentRoomStateSchema)
if err != nil {
return nil, err
}
if s.upsertRoomStateStmt, err = db.Prepare(upsertRoomStateSQL); err != nil {
return nil, err
}
if s.deleteRoomStateByEventIDStmt, err = db.Prepare(deleteRoomStateByEventIDSQL); err != nil {
return nil, err
}
if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil {
return nil, err
}
if s.selectEventsWithEventIDsStmt, err = db.Prepare(selectEventsWithEventIDsSQL); err != nil {
return nil, err
}
if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil {
return nil, err
}
if s.selectBulkStateContentStmt, err = db.Prepare(selectBulkStateContentSQL); err != nil {
return nil, err
}
if s.selectBulkStateContentWildStmt, err = db.Prepare(selectBulkStateContentWildSQL); err != nil {
return nil, err
}
if s.selectJoinedUsersSetForRoomsStmt, err = db.Prepare(selectJoinedUsersSetForRoomsSQL); err != nil {
return nil, err
}
if s.selectKnownRoomsStmt, err = db.Prepare(selectKnownRoomsSQL); err != nil {
return nil, err
}
if s.selectKnownUsersStmt, err = db.Prepare(selectKnownUsersSQL); err != nil {
return nil, err
}
return s, nil
}
func (s *currentRoomStateStatements) SelectJoinedUsersSetForRooms(ctx context.Context, roomIDs []string) (map[string]int, error) {
rows, err := s.selectJoinedUsersSetForRoomsStmt.QueryContext(ctx, pq.StringArray(roomIDs))
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectJoinedUsersSetForRooms: rows.close() failed")
result := make(map[string]int)
for rows.Next() {
var userID string
var count int
if err := rows.Scan(&userID, &count); err != nil {
return nil, err
}
result[userID] = count
}
return result, rows.Err()
}
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(
ctx context.Context,
txn *sql.Tx,
userID string,
contentVal string,
) ([]string, error) {
stmt := sqlutil.TxStmt(txn, s.selectRoomIDsWithMembershipStmt)
rows, err := stmt.QueryContext(ctx, userID, contentVal)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectRoomIDsWithMembership: rows.close() failed")
var result []string
for rows.Next() {
var roomID string
if err := rows.Scan(&roomID); err != nil {
return nil, err
}
result = append(result, roomID)
}
return result, rows.Err()
}
func (s *currentRoomStateStatements) DeleteRoomStateByEventID(
ctx context.Context, txn *sql.Tx, eventID string,
) error {
stmt := sqlutil.TxStmt(txn, s.deleteRoomStateByEventIDStmt)
_, err := stmt.ExecContext(ctx, eventID)
return err
}
func (s *currentRoomStateStatements) UpsertRoomState(
ctx context.Context, txn *sql.Tx,
event gomatrixserverlib.HeaderedEvent, contentVal string,
) error {
headeredJSON, err := json.Marshal(event)
if err != nil {
return err
}
// upsert state event
stmt := sqlutil.TxStmt(txn, s.upsertRoomStateStmt)
_, err = stmt.ExecContext(
ctx,
event.RoomID(),
event.EventID(),
event.Type(),
event.Sender(),
*event.StateKey(),
headeredJSON,
contentVal,
)
return err
}
func (s *currentRoomStateStatements) SelectEventsWithEventIDs(
ctx context.Context, txn *sql.Tx, eventIDs []string,
) ([]gomatrixserverlib.HeaderedEvent, error) {
stmt := sqlutil.TxStmt(txn, s.selectEventsWithEventIDsStmt)
rows, err := stmt.QueryContext(ctx, pq.StringArray(eventIDs))
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectEventsWithEventIDs: rows.close() failed")
result := []gomatrixserverlib.HeaderedEvent{}
for rows.Next() {
var eventBytes []byte
if err := rows.Scan(&eventBytes); err != nil {
return nil, err
}
var ev gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
result = append(result, ev)
}
return result, rows.Err()
}
func (s *currentRoomStateStatements) SelectStateEvent(
ctx context.Context, roomID, evType, stateKey string,
) (*gomatrixserverlib.HeaderedEvent, error) {
stmt := s.selectStateEventStmt
var res []byte
err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
var ev gomatrixserverlib.HeaderedEvent
if err = json.Unmarshal(res, &ev); err != nil {
return nil, err
}
return &ev, err
}
func (s *currentRoomStateStatements) SelectBulkStateContent(
ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool,
) ([]tables.StrippedEvent, error) {
hasWildcards := false
eventTypeSet := make(map[string]bool)
stateKeySet := make(map[string]bool)
var eventTypes []string
var stateKeys []string
for _, tuple := range tuples {
if !eventTypeSet[tuple.EventType] {
eventTypeSet[tuple.EventType] = true
eventTypes = append(eventTypes, tuple.EventType)
}
if !stateKeySet[tuple.StateKey] {
stateKeySet[tuple.StateKey] = true
stateKeys = append(stateKeys, tuple.StateKey)
}
if tuple.StateKey == "*" {
hasWildcards = true
}
}
var rows *sql.Rows
var err error
if hasWildcards && allowWildcards {
rows, err = s.selectBulkStateContentWildStmt.QueryContext(ctx, pq.StringArray(roomIDs), pq.StringArray(eventTypes))
} else {
rows, err = s.selectBulkStateContentStmt.QueryContext(
ctx, pq.StringArray(roomIDs), pq.StringArray(eventTypes), pq.StringArray(stateKeys),
)
}
if err != nil {
return nil, err
}
strippedEvents := []tables.StrippedEvent{}
defer internal.CloseAndLogIfError(ctx, rows, "SelectBulkStateContent: rows.close() failed")
for rows.Next() {
var roomID string
var eventType string
var stateKey string
var contentVal string
if err = rows.Scan(&roomID, &eventType, &stateKey, &contentVal); err != nil {
return nil, err
}
strippedEvents = append(strippedEvents, tables.StrippedEvent{
RoomID: roomID,
ContentValue: contentVal,
EventType: eventType,
StateKey: stateKey,
})
}
return strippedEvents, rows.Err()
}
func (s *currentRoomStateStatements) SelectKnownUsers(ctx context.Context, userID, searchString string, limit int) ([]string, error) {
rows, err := s.selectKnownUsersStmt.QueryContext(ctx, userID, fmt.Sprintf("%%%s%%", searchString), limit)
if err != nil {
return nil, err
}
result := []string{}
defer internal.CloseAndLogIfError(ctx, rows, "SelectKnownUsers: rows.close() failed")
for rows.Next() {
var userID string
if err := rows.Scan(&userID); err != nil {
return nil, err
}
result = append(result, userID)
}
return result, rows.Err()
}
func (s *currentRoomStateStatements) SelectKnownRooms(ctx context.Context) ([]string, error) {
rows, err := s.selectKnownRoomsStmt.QueryContext(ctx)
if err != nil {
return nil, err
}
result := []string{}
defer internal.CloseAndLogIfError(ctx, rows, "SelectKnownRooms: rows.close() failed")
for rows.Next() {
var roomID string
if err := rows.Scan(&roomID); err != nil {
return nil, err
}
result = append(result, roomID)
}
return result, rows.Err()
}

View file

@ -1,39 +0,0 @@
package postgres
import (
"database/sql"
"github.com/matrix-org/dendrite/currentstateserver/storage/shared"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/sqlutil"
)
type Database struct {
shared.Database
db *sql.DB
writer sqlutil.Writer
sqlutil.PartitionOffsetStatements
}
// NewDatabase creates a new sync server database
func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
var d Database
var err error
if d.db, err = sqlutil.Open(dbProperties); err != nil {
return nil, err
}
d.writer = sqlutil.NewDummyWriter()
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "currentstate"); err != nil {
return nil, err
}
currRoomState, err := NewPostgresCurrentRoomStateTable(d.db)
if err != nil {
return nil, err
}
d.Database = shared.Database{
DB: d.db,
Writer: d.writer,
CurrentRoomState: currRoomState,
}
return &d, nil
}

View file

@ -1,100 +0,0 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package shared
import (
"context"
"database/sql"
"fmt"
"github.com/matrix-org/dendrite/currentstateserver/storage/tables"
"github.com/matrix-org/dendrite/internal/eventutil"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
)
type Database struct {
DB *sql.DB
Writer sqlutil.Writer
CurrentRoomState tables.CurrentRoomState
}
func (d *Database) GetStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error) {
return d.CurrentRoomState.SelectStateEvent(ctx, roomID, evType, stateKey)
}
func (d *Database) GetBulkStateContent(ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool) ([]tables.StrippedEvent, error) {
return d.CurrentRoomState.SelectBulkStateContent(ctx, roomIDs, tuples, allowWildcards)
}
func (d *Database) RedactEvent(ctx context.Context, redactedEventID string, redactedBecause gomatrixserverlib.HeaderedEvent) error {
events, err := d.CurrentRoomState.SelectEventsWithEventIDs(ctx, nil, []string{redactedEventID})
if err != nil {
return err
}
if len(events) != 1 {
// this will happen for all non-state events
return nil
}
redactionEvent := redactedBecause.Unwrap()
eventBeingRedacted := events[0].Unwrap()
redactedEvent, err := eventutil.RedactEvent(&redactionEvent, &eventBeingRedacted)
if err != nil {
return fmt.Errorf("RedactEvent failed: %w", err)
}
// replace the state event with a redacted version of itself
return d.StoreStateEvents(ctx, []gomatrixserverlib.HeaderedEvent{redactedEvent.Headered(redactedBecause.RoomVersion)}, []string{redactedEventID})
}
func (d *Database) StoreStateEvents(ctx context.Context, addStateEvents []gomatrixserverlib.HeaderedEvent,
removeStateEventIDs []string) error {
return d.Writer.Do(d.DB, nil, func(txn *sql.Tx) error {
// remove first, then add, as we do not ever delete state, but do replace state which is a remove followed by an add.
for _, eventID := range removeStateEventIDs {
if err := d.CurrentRoomState.DeleteRoomStateByEventID(ctx, txn, eventID); err != nil {
return err
}
}
for _, event := range addStateEvents {
if event.StateKey() == nil {
// ignore non state events
continue
}
contentVal := tables.ExtractContentValue(&event)
if err := d.CurrentRoomState.UpsertRoomState(ctx, txn, event, contentVal); err != nil {
return err
}
}
return nil
})
}
func (d *Database) GetRoomsByMembership(ctx context.Context, userID, membership string) ([]string, error) {
return d.CurrentRoomState.SelectRoomIDsWithMembership(ctx, nil, userID, membership)
}
func (d *Database) JoinedUsersSetInRooms(ctx context.Context, roomIDs []string) (map[string]int, error) {
return d.CurrentRoomState.SelectJoinedUsersSetForRooms(ctx, roomIDs)
}
func (d *Database) GetKnownUsers(ctx context.Context, userID, searchString string, limit int) ([]string, error) {
return d.CurrentRoomState.SelectKnownUsers(ctx, userID, searchString, limit)
}
func (d *Database) GetKnownRooms(ctx context.Context) ([]string, error) {
return d.CurrentRoomState.SelectKnownRooms(ctx)
}

View file

@ -1,365 +0,0 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqlite3
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"strings"
"github.com/matrix-org/dendrite/currentstateserver/storage/tables"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/gomatrixserverlib"
)
const currentRoomStateSchema = `
-- Stores the current room state for every room.
CREATE TABLE IF NOT EXISTS currentstate_current_room_state (
room_id TEXT NOT NULL,
event_id TEXT NOT NULL,
type TEXT NOT NULL,
sender TEXT NOT NULL,
state_key TEXT NOT NULL,
headered_event_json TEXT NOT NULL,
content_value TEXT NOT NULL DEFAULT '',
UNIQUE (room_id, type, state_key)
);
-- for event deletion
CREATE UNIQUE INDEX IF NOT EXISTS currentstate_event_id_idx ON currentstate_current_room_state(event_id, room_id, type, sender);
`
const upsertRoomStateSQL = "" +
"INSERT INTO currentstate_current_room_state (room_id, event_id, type, sender, state_key, headered_event_json, content_value)" +
" VALUES ($1, $2, $3, $4, $5, $6, $7)" +
" ON CONFLICT (event_id, room_id, type, sender)" +
" DO UPDATE SET event_id = $2, sender=$4, headered_event_json = $6, content_value = $7"
const deleteRoomStateByEventIDSQL = "" +
"DELETE FROM currentstate_current_room_state WHERE event_id = $1"
const selectRoomIDsWithMembershipSQL = "" +
"SELECT room_id FROM currentstate_current_room_state WHERE type = 'm.room.member' AND state_key = $1 AND content_value = $2"
const selectStateEventSQL = "" +
"SELECT headered_event_json FROM currentstate_current_room_state WHERE room_id = $1 AND type = $2 AND state_key = $3"
const selectEventsWithEventIDsSQL = "" +
"SELECT headered_event_json FROM currentstate_current_room_state WHERE event_id IN ($1)"
const selectBulkStateContentSQL = "" +
"SELECT room_id, type, state_key, content_value FROM currentstate_current_room_state WHERE room_id IN ($1) AND type IN ($2) AND state_key IN ($3)"
const selectBulkStateContentWildSQL = "" +
"SELECT room_id, type, state_key, content_value FROM currentstate_current_room_state WHERE room_id IN ($1) AND type IN ($2)"
const selectJoinedUsersSetForRoomsSQL = "" +
"SELECT state_key, COUNT(room_id) FROM currentstate_current_room_state WHERE room_id IN ($1) AND type = 'm.room.member' and content_value = 'join' GROUP BY state_key"
const selectKnownRoomsSQL = "" +
"SELECT DISTINCT room_id FROM currentstate_current_room_state"
// selectKnownUsersSQL uses a sub-select statement here to find rooms that the user is
// joined to. Since this information is used to populate the user directory, we will
// only return users that the user would ordinarily be able to see anyway.
const selectKnownUsersSQL = "" +
"SELECT DISTINCT state_key FROM currentstate_current_room_state WHERE room_id IN (" +
" SELECT DISTINCT room_id FROM currentstate_current_room_state WHERE state_key=$1 AND TYPE='m.room.member' AND content_value='join'" +
") AND TYPE='m.room.member' AND content_value='join' AND state_key LIKE $2 LIMIT $3"
type currentRoomStateStatements struct {
db *sql.DB
upsertRoomStateStmt *sql.Stmt
deleteRoomStateByEventIDStmt *sql.Stmt
selectRoomIDsWithMembershipStmt *sql.Stmt
selectStateEventStmt *sql.Stmt
selectJoinedUsersSetForRoomsStmt *sql.Stmt
selectKnownRoomsStmt *sql.Stmt
selectKnownUsersStmt *sql.Stmt
}
func NewSqliteCurrentRoomStateTable(db *sql.DB) (tables.CurrentRoomState, error) {
s := &currentRoomStateStatements{
db: db,
}
_, err := db.Exec(currentRoomStateSchema)
if err != nil {
return nil, err
}
if s.upsertRoomStateStmt, err = db.Prepare(upsertRoomStateSQL); err != nil {
return nil, err
}
if s.deleteRoomStateByEventIDStmt, err = db.Prepare(deleteRoomStateByEventIDSQL); err != nil {
return nil, err
}
if s.selectRoomIDsWithMembershipStmt, err = db.Prepare(selectRoomIDsWithMembershipSQL); err != nil {
return nil, err
}
if s.selectStateEventStmt, err = db.Prepare(selectStateEventSQL); err != nil {
return nil, err
}
if s.selectJoinedUsersSetForRoomsStmt, err = db.Prepare(selectJoinedUsersSetForRoomsSQL); err != nil {
return nil, err
}
if s.selectKnownRoomsStmt, err = db.Prepare(selectKnownRoomsSQL); err != nil {
return nil, err
}
if s.selectKnownUsersStmt, err = db.Prepare(selectKnownUsersSQL); err != nil {
return nil, err
}
return s, nil
}
func (s *currentRoomStateStatements) SelectJoinedUsersSetForRooms(ctx context.Context, roomIDs []string) (map[string]int, error) {
iRoomIDs := make([]interface{}, len(roomIDs))
for i, v := range roomIDs {
iRoomIDs[i] = v
}
query := strings.Replace(selectJoinedUsersSetForRoomsSQL, "($1)", sqlutil.QueryVariadic(len(iRoomIDs)), 1)
rows, err := s.db.QueryContext(ctx, query, iRoomIDs...)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectJoinedUsersSetForRooms: rows.close() failed")
result := make(map[string]int)
for rows.Next() {
var userID string
var count int
if err := rows.Scan(&userID, &count); err != nil {
return nil, err
}
result[userID] = count
}
return result, rows.Err()
}
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
func (s *currentRoomStateStatements) SelectRoomIDsWithMembership(
ctx context.Context,
txn *sql.Tx,
userID string,
membership string,
) ([]string, error) {
stmt := sqlutil.TxStmt(txn, s.selectRoomIDsWithMembershipStmt)
rows, err := stmt.QueryContext(ctx, userID, membership)
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectRoomIDsWithMembership: rows.close() failed")
var result []string
for rows.Next() {
var roomID string
if err := rows.Scan(&roomID); err != nil {
return nil, err
}
result = append(result, roomID)
}
return result, nil
}
func (s *currentRoomStateStatements) DeleteRoomStateByEventID(
ctx context.Context, txn *sql.Tx, eventID string,
) error {
stmt := sqlutil.TxStmt(txn, s.deleteRoomStateByEventIDStmt)
_, err := stmt.ExecContext(ctx, eventID)
return err
}
func (s *currentRoomStateStatements) UpsertRoomState(
ctx context.Context, txn *sql.Tx,
event gomatrixserverlib.HeaderedEvent, contentVal string,
) error {
headeredJSON, err := json.Marshal(event)
if err != nil {
return err
}
// upsert state event
stmt := sqlutil.TxStmt(txn, s.upsertRoomStateStmt)
_, err = stmt.ExecContext(
ctx,
event.RoomID(),
event.EventID(),
event.Type(),
event.Sender(),
*event.StateKey(),
headeredJSON,
contentVal,
)
return err
}
func (s *currentRoomStateStatements) SelectEventsWithEventIDs(
ctx context.Context, txn *sql.Tx, eventIDs []string,
) ([]gomatrixserverlib.HeaderedEvent, error) {
iEventIDs := make([]interface{}, len(eventIDs))
for k, v := range eventIDs {
iEventIDs[k] = v
}
query := strings.Replace(selectEventsWithEventIDsSQL, "($1)", sqlutil.QueryVariadic(len(iEventIDs)), 1)
var rows *sql.Rows
var err error
if txn != nil {
rows, err = txn.QueryContext(ctx, query, iEventIDs...)
} else {
rows, err = s.db.QueryContext(ctx, query, iEventIDs...)
}
if err != nil {
return nil, err
}
defer internal.CloseAndLogIfError(ctx, rows, "selectEventsWithEventIDs: rows.close() failed")
result := []gomatrixserverlib.HeaderedEvent{}
for rows.Next() {
var eventBytes []byte
if err := rows.Scan(&eventBytes); err != nil {
return nil, err
}
var ev gomatrixserverlib.HeaderedEvent
if err := json.Unmarshal(eventBytes, &ev); err != nil {
return nil, err
}
result = append(result, ev)
}
return result, nil
}
func (s *currentRoomStateStatements) SelectStateEvent(
ctx context.Context, roomID, evType, stateKey string,
) (*gomatrixserverlib.HeaderedEvent, error) {
stmt := s.selectStateEventStmt
var res []byte
err := stmt.QueryRowContext(ctx, roomID, evType, stateKey).Scan(&res)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, err
}
var ev gomatrixserverlib.HeaderedEvent
if err = json.Unmarshal(res, &ev); err != nil {
return nil, err
}
return &ev, err
}
func (s *currentRoomStateStatements) SelectBulkStateContent(
ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool,
) ([]tables.StrippedEvent, error) {
hasWildcards := false
eventTypeSet := make(map[string]bool)
stateKeySet := make(map[string]bool)
var eventTypes []string
var stateKeys []string
for _, tuple := range tuples {
if !eventTypeSet[tuple.EventType] {
eventTypeSet[tuple.EventType] = true
eventTypes = append(eventTypes, tuple.EventType)
}
if !stateKeySet[tuple.StateKey] {
stateKeySet[tuple.StateKey] = true
stateKeys = append(stateKeys, tuple.StateKey)
}
if tuple.StateKey == "*" {
hasWildcards = true
}
}
iRoomIDs := make([]interface{}, len(roomIDs))
for i, v := range roomIDs {
iRoomIDs[i] = v
}
iEventTypes := make([]interface{}, len(eventTypes))
for i, v := range eventTypes {
iEventTypes[i] = v
}
iStateKeys := make([]interface{}, len(stateKeys))
for i, v := range stateKeys {
iStateKeys[i] = v
}
var query string
var args []interface{}
if hasWildcards && allowWildcards {
query = strings.Replace(selectBulkStateContentWildSQL, "($1)", sqlutil.QueryVariadic(len(iRoomIDs)), 1)
query = strings.Replace(query, "($2)", sqlutil.QueryVariadicOffset(len(iEventTypes), len(iRoomIDs)), 1)
args = append(iRoomIDs, iEventTypes...)
} else {
query = strings.Replace(selectBulkStateContentSQL, "($1)", sqlutil.QueryVariadic(len(iRoomIDs)), 1)
query = strings.Replace(query, "($2)", sqlutil.QueryVariadicOffset(len(iEventTypes), len(iRoomIDs)), 1)
query = strings.Replace(query, "($3)", sqlutil.QueryVariadicOffset(len(iStateKeys), len(iEventTypes)+len(iRoomIDs)), 1)
args = append(iRoomIDs, iEventTypes...)
args = append(args, iStateKeys...)
}
rows, err := s.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
strippedEvents := []tables.StrippedEvent{}
defer internal.CloseAndLogIfError(ctx, rows, "SelectBulkStateContent: rows.close() failed")
for rows.Next() {
var roomID string
var eventType string
var stateKey string
var contentVal string
if err = rows.Scan(&roomID, &eventType, &stateKey, &contentVal); err != nil {
return nil, err
}
strippedEvents = append(strippedEvents, tables.StrippedEvent{
RoomID: roomID,
ContentValue: contentVal,
EventType: eventType,
StateKey: stateKey,
})
}
return strippedEvents, rows.Err()
}
func (s *currentRoomStateStatements) SelectKnownUsers(ctx context.Context, userID, searchString string, limit int) ([]string, error) {
rows, err := s.selectKnownUsersStmt.QueryContext(ctx, userID, fmt.Sprintf("%%%s%%", searchString), limit)
if err != nil {
return nil, err
}
result := []string{}
defer internal.CloseAndLogIfError(ctx, rows, "SelectKnownUsers: rows.close() failed")
for rows.Next() {
var userID string
if err := rows.Scan(&userID); err != nil {
return nil, err
}
result = append(result, userID)
}
return result, rows.Err()
}
func (s *currentRoomStateStatements) SelectKnownRooms(ctx context.Context) ([]string, error) {
rows, err := s.selectKnownRoomsStmt.QueryContext(ctx)
if err != nil {
return nil, err
}
result := []string{}
defer internal.CloseAndLogIfError(ctx, rows, "SelectKnownRooms: rows.close() failed")
for rows.Next() {
var roomID string
if err := rows.Scan(&roomID); err != nil {
return nil, err
}
result = append(result, roomID)
}
return result, rows.Err()
}

View file

@ -1,40 +0,0 @@
package sqlite3
import (
"database/sql"
"github.com/matrix-org/dendrite/currentstateserver/storage/shared"
"github.com/matrix-org/dendrite/internal/config"
"github.com/matrix-org/dendrite/internal/sqlutil"
)
type Database struct {
shared.Database
db *sql.DB
writer sqlutil.Writer
sqlutil.PartitionOffsetStatements
}
// NewDatabase creates a new sync server database
// nolint: gocyclo
func NewDatabase(dbProperties *config.DatabaseOptions) (*Database, error) {
var d Database
var err error
if d.db, err = sqlutil.Open(dbProperties); err != nil {
return nil, err
}
d.writer = sqlutil.NewExclusiveWriter()
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "currentstate"); err != nil {
return nil, err
}
currRoomState, err := NewSqliteCurrentRoomStateTable(d.db)
if err != nil {
return nil, err
}
d.Database = shared.Database{
DB: d.db,
Writer: d.writer,
CurrentRoomState: currRoomState,
}
return &d, nil
}

View file

@ -1,37 +0,0 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !wasm
package storage
import (
"fmt"
"github.com/matrix-org/dendrite/currentstateserver/storage/postgres"
"github.com/matrix-org/dendrite/currentstateserver/storage/sqlite3"
"github.com/matrix-org/dendrite/internal/config"
)
// NewDatabase opens a database connection.
func NewDatabase(dbProperties *config.DatabaseOptions) (Database, error) {
switch {
case dbProperties.ConnectionString.IsSQLite():
return sqlite3.NewDatabase(dbProperties)
case dbProperties.ConnectionString.IsPostgres():
return postgres.NewDatabase(dbProperties)
default:
return nil, fmt.Errorf("unexpected database type")
}
}

View file

@ -1,34 +0,0 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package storage
import (
"fmt"
"github.com/matrix-org/dendrite/currentstateserver/storage/sqlite3"
"github.com/matrix-org/dendrite/internal/config"
)
// NewDatabase opens a database connection.
func NewDatabase(dbProperties *config.DatabaseOptions) (Database, error) {
switch {
case dbProperties.ConnectionString.IsSQLite():
return sqlite3.NewDatabase(dbProperties)
case dbProperties.ConnectionString.IsPostgres():
return nil, fmt.Errorf("can't use Postgres implementation")
default:
return nil, fmt.Errorf("unexpected database type")
}
}

View file

@ -1,88 +0,0 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tables
import (
"context"
"database/sql"
"github.com/matrix-org/gomatrixserverlib"
"github.com/tidwall/gjson"
)
type CurrentRoomState interface {
SelectStateEvent(ctx context.Context, roomID, evType, stateKey string) (*gomatrixserverlib.HeaderedEvent, error)
// SelectEventsWithEventIDs returns the events for the given event IDs. If the event(s) are missing, they are not returned
// and no error is returned.
SelectEventsWithEventIDs(ctx context.Context, txn *sql.Tx, eventIDs []string) ([]gomatrixserverlib.HeaderedEvent, error)
// UpsertRoomState stores the given event in the database, along with an extracted piece of content.
// The piece of content will vary depending on the event type, and table implementations may use this information to optimise
// lookups e.g membership lookups. The mapped value of `contentVal` is outlined in ExtractContentValue. An empty `contentVal`
// means there is nothing to store for this field.
UpsertRoomState(ctx context.Context, txn *sql.Tx, event gomatrixserverlib.HeaderedEvent, contentVal string) error
DeleteRoomStateByEventID(ctx context.Context, txn *sql.Tx, eventID string) error
// SelectRoomIDsWithMembership returns the list of room IDs which have the given user in the given membership state.
SelectRoomIDsWithMembership(ctx context.Context, txn *sql.Tx, userID string, membership string) ([]string, error)
SelectBulkStateContent(ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool) ([]StrippedEvent, error)
// SelectJoinedUsersSetForRooms returns the set of all users in the rooms who are joined to any of these rooms, along with the
// counts of how many rooms they are joined.
SelectJoinedUsersSetForRooms(ctx context.Context, roomIDs []string) (map[string]int, error)
// SelectKnownUsers searches all users that userID knows about.
SelectKnownUsers(ctx context.Context, userID, searchString string, limit int) ([]string, error)
// SelectKnownRooms returns all rooms that we know about.
SelectKnownRooms(ctx context.Context) ([]string, error)
}
// StrippedEvent represents a stripped event for returning extracted content values.
type StrippedEvent struct {
RoomID string
EventType string
StateKey string
ContentValue string
}
// ExtractContentValue from the given state event. For example, given an m.room.name event with:
// content: { name: "Foo" }
// this returns "Foo".
func ExtractContentValue(ev *gomatrixserverlib.HeaderedEvent) string {
content := ev.Content()
key := ""
switch ev.Type() {
case gomatrixserverlib.MRoomCreate:
key = "creator"
case gomatrixserverlib.MRoomCanonicalAlias:
key = "alias"
case gomatrixserverlib.MRoomHistoryVisibility:
key = "history_visibility"
case gomatrixserverlib.MRoomJoinRules:
key = "join_rule"
case gomatrixserverlib.MRoomMember:
key = "membership"
case gomatrixserverlib.MRoomName:
key = "name"
case "m.room.avatar":
key = "url"
case "m.room.topic":
key = "topic"
case "m.room.guest_access":
key = "guest_access"
}
result := gjson.GetBytes(content, key)
if !result.Exists() {
return ""
}
// this returns the empty string if this is not a string type
return result.Str
}

View file

@ -141,17 +141,6 @@ client_api:
threshold: 5
cooloff_ms: 500
# Configuration for the Current State Server.
current_state_server:
internal_api:
listen: http://localhost:7782
connect: http://localhost:7782
database:
connection_string: file:currentstate.db
max_open_conns: 100
max_idle_conns: 2
conn_max_lifetime: -1
# Configuration for the EDU server.
edu_server:
internal_api:

View file

@ -109,7 +109,7 @@ Assuming that Postgres 9.5 (or later) is installed:
* Create the component databases:
```bash
for i in account device mediaapi syncapi roomserver serverkey federationsender currentstate appservice e2ekey naffka; do
for i in account device mediaapi syncapi roomserver serverkey federationsender appservice e2ekey naffka; do
sudo -u postgres createdb -O dendrite dendrite_$i
done
```
@ -239,16 +239,6 @@ This is what implements the room DAG. Clients do not talk to this.
./bin/dendrite-room-server --config=dendrite.yaml
```
#### Current state server
This tracks the current state of rooms which various components need to know. For example,
`/publicRooms` implemented by client API asks this server for the room names, joined member
counts, etc.
```bash
./bin/dendrite-current-state-server --config=dendrite.yaml
```
#### Federation sender
This sends events from our users to other servers. This is only required if

View file

@ -16,7 +16,6 @@ package federationapi
import (
"github.com/gorilla/mux"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/config"
@ -38,12 +37,11 @@ func AddPublicRoutes(
rsAPI roomserverAPI.RoomserverInternalAPI,
federationSenderAPI federationSenderAPI.FederationSenderInternalAPI,
eduAPI eduserverAPI.EDUServerInputAPI,
stateAPI currentstateAPI.CurrentStateInternalAPI,
keyAPI keyserverAPI.KeyInternalAPI,
) {
routing.Setup(
fedRouter, keyRouter, cfg, rsAPI,
eduAPI, federationSenderAPI, keyRing,
federation, userAPI, stateAPI, keyAPI,
federation, userAPI, keyAPI,
)
}

View file

@ -31,7 +31,7 @@ func TestRoomsV3URLEscapeDoNot404(t *testing.T) {
fsAPI := base.FederationSenderHTTPClient()
// TODO: This is pretty fragile, as if anything calls anything on these nils this test will break.
// Unfortunately, it makes little sense to instantiate these dependencies when we just want to test routing.
federationapi.AddPublicRoutes(base.PublicFederationAPIMux, base.PublicKeyAPIMux, &cfg.FederationAPI, nil, nil, keyRing, nil, fsAPI, nil, nil, nil)
federationapi.AddPublicRoutes(base.PublicFederationAPIMux, base.PublicKeyAPIMux, &cfg.FederationAPI, nil, nil, keyRing, nil, fsAPI, nil, nil)
baseURL, cancel := test.ListenAndServe(t, base.PublicFederationAPIMux, true)
defer cancel()
serverName := gomatrixserverlib.ServerName(strings.TrimPrefix(baseURL, "https://"))

View file

@ -7,7 +7,6 @@ import (
"github.com/matrix-org/dendrite/clientapi/httputil"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
@ -24,7 +23,7 @@ type filter struct {
}
// GetPostPublicRooms implements GET and POST /publicRooms
func GetPostPublicRooms(req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI) util.JSONResponse {
func GetPostPublicRooms(req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI) util.JSONResponse {
var request PublicRoomReq
if fillErr := fillPublicRoomsReq(req, &request); fillErr != nil {
return *fillErr
@ -32,7 +31,7 @@ func GetPostPublicRooms(req *http.Request, rsAPI roomserverAPI.RoomserverInterna
if request.Limit == 0 {
request.Limit = 50
}
response, err := publicRooms(req.Context(), request, rsAPI, stateAPI)
response, err := publicRooms(req.Context(), request, rsAPI)
if err != nil {
return jsonerror.InternalServerError()
}
@ -42,8 +41,9 @@ func GetPostPublicRooms(req *http.Request, rsAPI roomserverAPI.RoomserverInterna
}
}
func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI.RoomserverInternalAPI,
stateAPI currentstateAPI.CurrentStateInternalAPI) (*gomatrixserverlib.RespPublicRooms, error) {
func publicRooms(
ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI.RoomserverInternalAPI,
) (*gomatrixserverlib.RespPublicRooms, error) {
var response gomatrixserverlib.RespPublicRooms
var limit int16
@ -80,7 +80,7 @@ func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI
nextIndex = len(queryRes.RoomIDs)
}
roomIDs := queryRes.RoomIDs[offset:nextIndex]
response.Chunk, err = fillInRooms(ctx, roomIDs, stateAPI)
response.Chunk, err = fillInRooms(ctx, roomIDs, rsAPI)
return &response, err
}
@ -112,7 +112,7 @@ func fillPublicRoomsReq(httpReq *http.Request, request *PublicRoomReq) *util.JSO
// due to lots of switches
// nolint:gocyclo
func fillInRooms(ctx context.Context, roomIDs []string, stateAPI currentstateAPI.CurrentStateInternalAPI) ([]gomatrixserverlib.PublicRoom, error) {
func fillInRooms(ctx context.Context, roomIDs []string, rsAPI roomserverAPI.RoomserverInternalAPI) ([]gomatrixserverlib.PublicRoom, error) {
avatarTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.avatar", StateKey: ""}
nameTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.name", StateKey: ""}
canonicalTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomCanonicalAlias, StateKey: ""}
@ -121,8 +121,8 @@ func fillInRooms(ctx context.Context, roomIDs []string, stateAPI currentstateAPI
visibilityTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomHistoryVisibility, StateKey: ""}
joinRuleTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomJoinRules, StateKey: ""}
var stateRes currentstateAPI.QueryBulkStateContentResponse
err := stateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
var stateRes roomserverAPI.QueryBulkStateContentResponse
err := rsAPI.QueryBulkStateContent(ctx, &roomserverAPI.QueryBulkStateContentRequest{
RoomIDs: roomIDs,
AllowWildcards: true,
StateTuples: []gomatrixserverlib.StateKeyTuple{

View file

@ -19,7 +19,6 @@ import (
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
"github.com/matrix-org/dendrite/internal/config"
@ -48,7 +47,6 @@ func Setup(
keys gomatrixserverlib.JSONVerifier,
federation *gomatrixserverlib.FederationClient,
userAPI userapi.UserInternalAPI,
stateAPI currentstateAPI.CurrentStateInternalAPI,
keyAPI keyserverAPI.KeyInternalAPI,
) {
v2keysmux := keyMux.PathPrefix("/v2").Subrouter()
@ -390,7 +388,7 @@ func Setup(
v1fedmux.Handle("/publicRooms",
httputil.MakeExternalAPI("federation_public_rooms", func(req *http.Request) util.JSONResponse {
return GetPostPublicRooms(req, rsAPI, stateAPI)
return GetPostPublicRooms(req, rsAPI)
}),
).Methods(http.MethodGet)

View file

@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
eduserverAPI "github.com/matrix-org/dendrite/eduserver/api"
@ -43,7 +44,6 @@ func Send(
federation *gomatrixserverlib.FederationClient,
) util.JSONResponse {
t := txnReq{
context: httpReq.Context(),
rsAPI: rsAPI,
eduAPI: eduAPI,
keys: keys,
@ -82,7 +82,7 @@ func Send(
util.GetLogger(httpReq.Context()).Infof("Received transaction %q containing %d PDUs, %d EDUs", txnID, len(t.PDUs), len(t.EDUs))
resp, jsonErr := t.processTransaction()
resp, jsonErr := t.processTransaction(httpReq.Context())
if jsonErr != nil {
util.GetLogger(httpReq.Context()).WithField("jsonErr", jsonErr).Error("t.processTransaction failed")
return *jsonErr
@ -100,7 +100,6 @@ func Send(
type txnReq struct {
gomatrixserverlib.Transaction
context context.Context
rsAPI api.RoomserverInternalAPI
eduAPI eduserverAPI.EDUServerInputAPI
keyAPI keyapi.KeyInternalAPI
@ -124,7 +123,7 @@ type txnFederationClient interface {
roomVersion gomatrixserverlib.RoomVersion) (res gomatrixserverlib.RespMissingEvents, err error)
}
func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, *util.JSONResponse) {
func (t *txnReq) processTransaction(ctx context.Context) (*gomatrixserverlib.RespSend, *util.JSONResponse) {
results := make(map[string]gomatrixserverlib.PDUResult)
pdus := []gomatrixserverlib.HeaderedEvent{}
@ -133,15 +132,15 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, *util.JSONRe
RoomID string `json:"room_id"`
}
if err := json.Unmarshal(pdu, &header); err != nil {
util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to extract room ID from event")
util.GetLogger(ctx).WithError(err).Warn("Transaction: Failed to extract room ID from event")
// We don't know the event ID at this point so we can't return the
// failure in the PDU results
continue
}
verReq := api.QueryRoomVersionForRoomRequest{RoomID: header.RoomID}
verRes := api.QueryRoomVersionForRoomResponse{}
if err := t.rsAPI.QueryRoomVersionForRoom(t.context, &verReq, &verRes); err != nil {
util.GetLogger(t.context).WithError(err).Warn("Transaction: Failed to query room version for room", verReq.RoomID)
if err := t.rsAPI.QueryRoomVersionForRoom(ctx, &verReq, &verRes); err != nil {
util.GetLogger(ctx).WithError(err).Warn("Transaction: Failed to query room version for room", verReq.RoomID)
// We don't know the event ID at this point so we can't return the
// failure in the PDU results
continue
@ -161,17 +160,17 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, *util.JSONRe
JSON: jsonerror.BadJSON("PDU contains bad JSON"),
}
}
util.GetLogger(t.context).WithError(err).Warnf("Transaction: Failed to parse event JSON of event %s", string(pdu))
util.GetLogger(ctx).WithError(err).Warnf("Transaction: Failed to parse event JSON of event %s", string(pdu))
continue
}
if api.IsServerBannedFromRoom(t.context, t.rsAPI, event.RoomID(), t.Origin) {
if api.IsServerBannedFromRoom(ctx, t.rsAPI, event.RoomID(), t.Origin) {
results[event.EventID()] = gomatrixserverlib.PDUResult{
Error: "Forbidden by server ACLs",
}
continue
}
if err = gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil {
util.GetLogger(t.context).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID())
if err = gomatrixserverlib.VerifyAllEventSignatures(ctx, []gomatrixserverlib.Event{event}, t.keys); err != nil {
util.GetLogger(ctx).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID())
results[event.EventID()] = gomatrixserverlib.PDUResult{
Error: err.Error(),
}
@ -182,7 +181,7 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, *util.JSONRe
// Process the events.
for _, e := range pdus {
if err := t.processEvent(e.Unwrap(), true); err != nil {
if err := t.processEvent(ctx, e.Unwrap(), true); err != nil {
// If the error is due to the event itself being bad then we skip
// it and move onto the next event. We report an error so that the
// sender knows that we have skipped processing it.
@ -201,7 +200,7 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, *util.JSONRe
if isProcessingErrorFatal(err) {
// Any other error should be the result of a temporary error in
// our server so we should bail processing the transaction entirely.
util.GetLogger(t.context).Warnf("Processing %s failed fatally: %s", e.EventID(), err)
util.GetLogger(ctx).Warnf("Processing %s failed fatally: %s", e.EventID(), err)
jsonErr := util.ErrorResponse(err)
return nil, &jsonErr
} else {
@ -211,7 +210,7 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, *util.JSONRe
if rejected {
errMsg = ""
}
util.GetLogger(t.context).WithError(err).WithField("event_id", e.EventID()).WithField("rejected", rejected).Warn(
util.GetLogger(ctx).WithError(err).WithField("event_id", e.EventID()).WithField("rejected", rejected).Warn(
"Failed to process incoming federation event, skipping",
)
results[e.EventID()] = gomatrixserverlib.PDUResult{
@ -223,9 +222,9 @@ func (t *txnReq) processTransaction() (*gomatrixserverlib.RespSend, *util.JSONRe
}
}
t.processEDUs(t.EDUs)
t.processEDUs(ctx)
if c := len(results); c > 0 {
util.GetLogger(t.context).Infof("Processed %d PDUs from transaction %q", c, t.TransactionID)
util.GetLogger(ctx).Infof("Processed %d PDUs from transaction %q", c, t.TransactionID)
}
return &gomatrixserverlib.RespSend{PDUs: results}, nil
}
@ -284,8 +283,9 @@ func (t *txnReq) haveEventIDs() map[string]bool {
return result
}
func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) {
for _, e := range edus {
// nolint:gocyclo
func (t *txnReq) processEDUs(ctx context.Context) {
for _, e := range t.EDUs {
switch e.Type {
case gomatrixserverlib.MTyping:
// https://matrix.org/docs/spec/server_server/latest#typing-notifications
@ -295,24 +295,24 @@ func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) {
Typing bool `json:"typing"`
}
if err := json.Unmarshal(e.Content, &typingPayload); err != nil {
util.GetLogger(t.context).WithError(err).Error("Failed to unmarshal typing event")
util.GetLogger(ctx).WithError(err).Error("Failed to unmarshal typing event")
continue
}
if err := eduserverAPI.SendTyping(t.context, t.eduAPI, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil {
util.GetLogger(t.context).WithError(err).Error("Failed to send typing event to edu server")
if err := eduserverAPI.SendTyping(ctx, t.eduAPI, typingPayload.UserID, typingPayload.RoomID, typingPayload.Typing, 30*1000); err != nil {
util.GetLogger(ctx).WithError(err).Error("Failed to send typing event to edu server")
}
case gomatrixserverlib.MDirectToDevice:
// https://matrix.org/docs/spec/server_server/r0.1.3#m-direct-to-device-schema
var directPayload gomatrixserverlib.ToDeviceMessage
if err := json.Unmarshal(e.Content, &directPayload); err != nil {
util.GetLogger(t.context).WithError(err).Error("Failed to unmarshal send-to-device events")
util.GetLogger(ctx).WithError(err).Error("Failed to unmarshal send-to-device events")
continue
}
for userID, byUser := range directPayload.Messages {
for deviceID, message := range byUser {
// TODO: check that the user and the device actually exist here
if err := eduserverAPI.SendToDevice(t.context, t.eduAPI, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil {
util.GetLogger(t.context).WithError(err).WithFields(logrus.Fields{
if err := eduserverAPI.SendToDevice(ctx, t.eduAPI, directPayload.Sender, userID, deviceID, directPayload.Type, message); err != nil {
util.GetLogger(ctx).WithError(err).WithFields(logrus.Fields{
"sender": directPayload.Sender,
"user_id": userID,
"device_id": deviceID,
@ -321,17 +321,17 @@ func (t *txnReq) processEDUs(edus []gomatrixserverlib.EDU) {
}
}
case gomatrixserverlib.MDeviceListUpdate:
t.processDeviceListUpdate(e)
t.processDeviceListUpdate(ctx, e)
default:
util.GetLogger(t.context).WithField("type", e.Type).Debug("Unhandled EDU")
util.GetLogger(ctx).WithField("type", e.Type).Debug("Unhandled EDU")
}
}
}
func (t *txnReq) processDeviceListUpdate(e gomatrixserverlib.EDU) {
func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverlib.EDU) {
var payload gomatrixserverlib.DeviceListUpdateEvent
if err := json.Unmarshal(e.Content, &payload); err != nil {
util.GetLogger(t.context).WithError(err).Error("Failed to unmarshal device list update event")
util.GetLogger(ctx).WithError(err).Error("Failed to unmarshal device list update event")
return
}
var inputRes keyapi.InputDeviceListUpdateResponse
@ -339,11 +339,11 @@ func (t *txnReq) processDeviceListUpdate(e gomatrixserverlib.EDU) {
Event: payload,
}, &inputRes)
if inputRes.Error != nil {
util.GetLogger(t.context).WithError(inputRes.Error).WithField("user_id", payload.UserID).Error("failed to InputDeviceListUpdate")
util.GetLogger(ctx).WithError(inputRes.Error).WithField("user_id", payload.UserID).Error("failed to InputDeviceListUpdate")
}
}
func (t *txnReq) processEvent(e gomatrixserverlib.Event, isInboundTxn bool) error {
func (t *txnReq) processEvent(ctx context.Context, e gomatrixserverlib.Event, isInboundTxn bool) error {
prevEventIDs := e.PrevEventIDs()
// Fetch the state needed to authenticate the event.
@ -354,7 +354,7 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event, isInboundTxn bool) erro
StateToFetch: needed.Tuples(),
}
var stateResp api.QueryStateAfterEventsResponse
if err := t.rsAPI.QueryStateAfterEvents(t.context, &stateReq, &stateResp); err != nil {
if err := t.rsAPI.QueryStateAfterEvents(ctx, &stateReq, &stateResp); err != nil {
return err
}
@ -369,7 +369,7 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event, isInboundTxn bool) erro
}
if !stateResp.PrevEventsExist {
return t.processEventWithMissingState(e, stateResp.RoomVersion, isInboundTxn)
return t.processEventWithMissingState(ctx, e, stateResp.RoomVersion, isInboundTxn)
}
// Check that the event is allowed by the state at the event.
@ -379,7 +379,8 @@ func (t *txnReq) processEvent(e gomatrixserverlib.Event, isInboundTxn bool) erro
// pass the event to the roomserver
return api.SendEvents(
t.context, t.rsAPI,
context.Background(),
t.rsAPI,
[]gomatrixserverlib.HeaderedEvent{
e.Headered(stateResp.RoomVersion),
},
@ -399,7 +400,12 @@ func checkAllowedByState(e gomatrixserverlib.Event, stateEvents []gomatrixserver
return gomatrixserverlib.Allowed(e, &authUsingState)
}
func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, isInboundTxn bool) error {
func (t *txnReq) processEventWithMissingState(ctx context.Context, e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, isInboundTxn bool) error {
// Do this with a fresh context, so that we keep working even if the
// original request times out. With any luck, by the time the remote
// side retries, we'll have fetched the missing state.
gmectx, cancel := context.WithTimeout(context.Background(), time.Minute*5)
defer cancel()
// We are missing the previous events for this events.
// This means that there is a gap in our view of the history of the
// room. There two ways that we can handle such a gap:
@ -420,7 +426,7 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVer
// - fill in the gap completely then process event `e` returning no backwards extremity
// - fail to fill in the gap and tell us to terminate the transaction err=not nil
// - fail to fill in the gap and tell us to fetch state at the new backwards extremity, and to not terminate the transaction
backwardsExtremity, err := t.getMissingEvents(e, roomVersion, isInboundTxn)
backwardsExtremity, err := t.getMissingEvents(gmectx, e, roomVersion, isInboundTxn)
if err != nil {
return err
}
@ -437,16 +443,16 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVer
needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{*backwardsExtremity}).Tuples()
for _, prevEventID := range backwardsExtremity.PrevEventIDs() {
var prevState *gomatrixserverlib.RespState
prevState, err = t.lookupStateAfterEvent(roomVersion, backwardsExtremity.RoomID(), prevEventID, needed)
prevState, err = t.lookupStateAfterEvent(gmectx, roomVersion, backwardsExtremity.RoomID(), prevEventID, needed)
if err != nil {
util.GetLogger(t.context).WithError(err).Errorf("Failed to lookup state after prev_event: %s", prevEventID)
util.GetLogger(ctx).WithError(err).Errorf("Failed to lookup state after prev_event: %s", prevEventID)
return err
}
states = append(states, prevState)
}
resolvedState, err := t.resolveStatesAndCheck(roomVersion, states, backwardsExtremity)
resolvedState, err := t.resolveStatesAndCheck(gmectx, roomVersion, states, backwardsExtremity)
if err != nil {
util.GetLogger(t.context).WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID())
util.GetLogger(ctx).WithError(err).Errorf("Failed to resolve state conflicts for event %s", backwardsExtremity.EventID())
return err
}
@ -457,21 +463,26 @@ func (t *txnReq) processEventWithMissingState(e gomatrixserverlib.Event, roomVer
// lookupStateAfterEvent returns the room state after `eventID`, which is the state before eventID with the state of `eventID` (if it's a state event)
// added into the mix.
func (t *txnReq) lookupStateAfterEvent(roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string, needed []gomatrixserverlib.StateKeyTuple) (*gomatrixserverlib.RespState, error) {
func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string, needed []gomatrixserverlib.StateKeyTuple) (*gomatrixserverlib.RespState, error) {
// try doing all this locally before we resort to querying federation
respState := t.lookupStateAfterEventLocally(roomID, eventID, needed)
respState := t.lookupStateAfterEventLocally(ctx, roomID, eventID, needed)
if respState != nil {
return respState, nil
}
respState, err := t.lookupStateBeforeEvent(roomVersion, roomID, eventID)
respState, err := t.lookupStateBeforeEvent(ctx, roomVersion, roomID, eventID)
if err != nil {
return nil, err
}
// fetch the event we're missing and add it to the pile
h, err := t.lookupEvent(roomVersion, eventID, false)
if err != nil {
h, err := t.lookupEvent(ctx, roomVersion, eventID, false)
switch err.(type) {
case verifySigError:
return respState, nil
case nil:
// do nothing
default:
return nil, err
}
t.haveEvents[h.EventID()] = h
@ -493,15 +504,15 @@ func (t *txnReq) lookupStateAfterEvent(roomVersion gomatrixserverlib.RoomVersion
return respState, nil
}
func (t *txnReq) lookupStateAfterEventLocally(roomID, eventID string, needed []gomatrixserverlib.StateKeyTuple) *gomatrixserverlib.RespState {
func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, eventID string, needed []gomatrixserverlib.StateKeyTuple) *gomatrixserverlib.RespState {
var res api.QueryStateAfterEventsResponse
err := t.rsAPI.QueryStateAfterEvents(t.context, &api.QueryStateAfterEventsRequest{
err := t.rsAPI.QueryStateAfterEvents(ctx, &api.QueryStateAfterEventsRequest{
RoomID: roomID,
PrevEventIDs: []string{eventID},
StateToFetch: needed,
}, &res)
if err != nil || !res.PrevEventsExist {
util.GetLogger(t.context).WithError(err).Warnf("failed to query state after %s locally", eventID)
util.GetLogger(ctx).WithError(err).Warnf("failed to query state after %s locally", eventID)
return nil
}
for i, ev := range res.StateEvents {
@ -528,9 +539,9 @@ func (t *txnReq) lookupStateAfterEventLocally(roomID, eventID string, needed []g
queryReq := api.QueryEventsByIDRequest{
EventIDs: missingEventList,
}
util.GetLogger(t.context).Infof("Fetching missing auth events: %v", missingEventList)
util.GetLogger(ctx).Infof("Fetching missing auth events: %v", missingEventList)
var queryRes api.QueryEventsByIDResponse
if err = t.rsAPI.QueryEventsByID(t.context, &queryReq, &queryRes); err != nil {
if err = t.rsAPI.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil {
return nil
}
for i := range queryRes.Events {
@ -548,22 +559,22 @@ func (t *txnReq) lookupStateAfterEventLocally(roomID, eventID string, needed []g
// lookuptStateBeforeEvent returns the room state before the event e, which is just /state_ids and/or /state depending on what
// the server supports.
func (t *txnReq) lookupStateBeforeEvent(roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (
func (t *txnReq) lookupStateBeforeEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, eventID string) (
respState *gomatrixserverlib.RespState, err error) {
util.GetLogger(t.context).Infof("lookupStateBeforeEvent %s", eventID)
util.GetLogger(ctx).Infof("lookupStateBeforeEvent %s", eventID)
// Attempt to fetch the missing state using /state_ids and /events
respState, err = t.lookupMissingStateViaStateIDs(roomID, eventID, roomVersion)
respState, err = t.lookupMissingStateViaStateIDs(ctx, roomID, eventID, roomVersion)
if err != nil {
// Fallback to /state
util.GetLogger(t.context).WithError(err).Warn("lookupStateBeforeEvent failed to /state_ids, falling back to /state")
respState, err = t.lookupMissingStateViaState(roomID, eventID, roomVersion)
util.GetLogger(ctx).WithError(err).Warn("lookupStateBeforeEvent failed to /state_ids, falling back to /state")
respState, err = t.lookupMissingStateViaState(ctx, roomID, eventID, roomVersion)
}
return
}
func (t *txnReq) resolveStatesAndCheck(roomVersion gomatrixserverlib.RoomVersion, states []*gomatrixserverlib.RespState, backwardsExtremity *gomatrixserverlib.Event) (*gomatrixserverlib.RespState, error) {
func (t *txnReq) resolveStatesAndCheck(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, states []*gomatrixserverlib.RespState, backwardsExtremity *gomatrixserverlib.Event) (*gomatrixserverlib.RespState, error) {
var authEventList []gomatrixserverlib.Event
var stateEventList []gomatrixserverlib.Event
for _, state := range states {
@ -579,11 +590,19 @@ retryAllowedState:
if err = checkAllowedByState(*backwardsExtremity, resolvedStateEvents); err != nil {
switch missing := err.(type) {
case gomatrixserverlib.MissingAuthEventError:
h, err2 := t.lookupEvent(roomVersion, missing.AuthEventID, true)
if err2 != nil {
h, err2 := t.lookupEvent(ctx, roomVersion, missing.AuthEventID, true)
switch err2.(type) {
case verifySigError:
return &gomatrixserverlib.RespState{
AuthEvents: authEventList,
StateEvents: resolvedStateEvents,
}, nil
case nil:
// do nothing
default:
return nil, fmt.Errorf("missing auth event %s and failed to look it up: %w", missing.AuthEventID, err2)
}
util.GetLogger(t.context).Infof("fetched event %s", missing.AuthEventID)
util.GetLogger(ctx).Infof("fetched event %s", missing.AuthEventID)
resolvedStateEvents = append(resolvedStateEvents, h.Unwrap())
goto retryAllowedState
default:
@ -600,12 +619,12 @@ retryAllowedState:
// begin from. Returns an error only if we should terminate the transaction which initiated /get_missing_events
// This function recursively calls txnReq.processEvent with the missing events, which will be processed before this function returns.
// This means that we may recursively call this function, as we spider back up prev_events to the min depth.
func (t *txnReq) getMissingEvents(e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, isInboundTxn bool) (backwardsExtremity *gomatrixserverlib.Event, err error) {
func (t *txnReq) getMissingEvents(ctx context.Context, e gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion, isInboundTxn bool) (backwardsExtremity *gomatrixserverlib.Event, err error) {
if !isInboundTxn {
// we've recursed here, so just take a state snapshot please!
return &e, nil
}
logger := util.GetLogger(t.context).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
logger := util.GetLogger(ctx).WithField("event_id", e.EventID()).WithField("room_id", e.RoomID())
needed := gomatrixserverlib.StateNeededForAuth([]gomatrixserverlib.Event{e})
// query latest events (our trusted forward extremities)
req := api.QueryLatestEventsAndStateRequest{
@ -613,7 +632,7 @@ func (t *txnReq) getMissingEvents(e gomatrixserverlib.Event, roomVersion gomatri
StateToFetch: needed.Tuples(),
}
var res api.QueryLatestEventsAndStateResponse
if err = t.rsAPI.QueryLatestEventsAndState(t.context, &req, &res); err != nil {
if err = t.rsAPI.QueryLatestEventsAndState(ctx, &req, &res); err != nil {
logger.WithError(err).Warn("Failed to query latest events")
return &e, nil
}
@ -626,7 +645,7 @@ func (t *txnReq) getMissingEvents(e gomatrixserverlib.Event, roomVersion gomatri
if minDepth < 0 {
minDepth = 0
}
missingResp, err := t.federation.LookupMissingEvents(t.context, t.Origin, e.RoomID(), gomatrixserverlib.MissingEvents{
missingResp, err := t.federation.LookupMissingEvents(ctx, t.Origin, e.RoomID(), gomatrixserverlib.MissingEvents{
Limit: 20,
// synapse uses the min depth they've ever seen in that room
MinDepth: minDepth,
@ -685,7 +704,7 @@ Event:
}
// process the missing events then the event which started this whole thing
for _, ev := range append(newEvents, e) {
err := t.processEvent(ev, false)
err := t.processEvent(ctx, ev, false)
if err != nil {
return nil, err
}
@ -695,24 +714,24 @@ Event:
return nil, nil
}
func (t *txnReq) lookupMissingStateViaState(roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
func (t *txnReq) lookupMissingStateViaState(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
respState *gomatrixserverlib.RespState, err error) {
state, err := t.federation.LookupState(t.context, t.Origin, roomID, eventID, roomVersion)
state, err := t.federation.LookupState(ctx, t.Origin, roomID, eventID, roomVersion)
if err != nil {
return nil, err
}
// Check that the returned state is valid.
if err := state.Check(t.context, t.keys, nil); err != nil {
if err := state.Check(ctx, t.keys, nil); err != nil {
return nil, err
}
return &state, nil
}
func (t *txnReq) lookupMissingStateViaStateIDs(roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, eventID string, roomVersion gomatrixserverlib.RoomVersion) (
*gomatrixserverlib.RespState, error) {
util.GetLogger(t.context).Infof("lookupMissingStateViaStateIDs %s", eventID)
util.GetLogger(ctx).Infof("lookupMissingStateViaStateIDs %s", eventID)
// fetch the state event IDs at the time of the event
stateIDs, err := t.federation.LookupStateIDs(t.context, t.Origin, roomID, eventID)
stateIDs, err := t.federation.LookupStateIDs(ctx, t.Origin, roomID, eventID)
if err != nil {
return nil, err
}
@ -734,7 +753,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(roomID, eventID string, roomVersi
EventIDs: missingEventList,
}
var queryRes api.QueryEventsByIDResponse
if err = t.rsAPI.QueryEventsByID(t.context, &queryReq, &queryRes); err != nil {
if err = t.rsAPI.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil {
return nil, err
}
for i := range queryRes.Events {
@ -745,7 +764,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(roomID, eventID string, roomVersi
}
}
util.GetLogger(t.context).WithFields(logrus.Fields{
util.GetLogger(ctx).WithFields(logrus.Fields{
"missing": len(missing),
"event_id": eventID,
"room_id": roomID,
@ -755,8 +774,13 @@ func (t *txnReq) lookupMissingStateViaStateIDs(roomID, eventID string, roomVersi
for missingEventID := range missing {
var h *gomatrixserverlib.HeaderedEvent
h, err = t.lookupEvent(roomVersion, missingEventID, false)
if err != nil {
h, err = t.lookupEvent(ctx, roomVersion, missingEventID, false)
switch err.(type) {
case verifySigError:
continue
case nil:
// do nothing
default:
return nil, err
}
t.haveEvents[h.EventID()] = h
@ -793,33 +817,33 @@ func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStat
return &respState, nil
}
func (t *txnReq) lookupEvent(roomVersion gomatrixserverlib.RoomVersion, missingEventID string, localFirst bool) (*gomatrixserverlib.HeaderedEvent, error) {
func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, missingEventID string, localFirst bool) (*gomatrixserverlib.HeaderedEvent, error) {
if localFirst {
// fetch from the roomserver
queryReq := api.QueryEventsByIDRequest{
EventIDs: []string{missingEventID},
}
var queryRes api.QueryEventsByIDResponse
if err := t.rsAPI.QueryEventsByID(t.context, &queryReq, &queryRes); err != nil {
util.GetLogger(t.context).Warnf("Failed to query roomserver for missing event %s: %s - falling back to remote", missingEventID, err)
if err := t.rsAPI.QueryEventsByID(ctx, &queryReq, &queryRes); err != nil {
util.GetLogger(ctx).Warnf("Failed to query roomserver for missing event %s: %s - falling back to remote", missingEventID, err)
} else if len(queryRes.Events) == 1 {
return &queryRes.Events[0], nil
}
}
txn, err := t.federation.GetEvent(t.context, t.Origin, missingEventID)
txn, err := t.federation.GetEvent(ctx, t.Origin, missingEventID)
if err != nil || len(txn.PDUs) == 0 {
util.GetLogger(t.context).WithError(err).WithField("event_id", missingEventID).Warn("failed to get missing /event for event ID")
util.GetLogger(ctx).WithError(err).WithField("event_id", missingEventID).Warn("failed to get missing /event for event ID")
return nil, err
}
pdu := txn.PDUs[0]
var event gomatrixserverlib.Event
event, err = gomatrixserverlib.NewEventFromUntrustedJSON(pdu, roomVersion)
if err != nil {
util.GetLogger(t.context).WithError(err).Warnf("Transaction: Failed to parse event JSON of event %q", event.EventID())
util.GetLogger(ctx).WithError(err).Warnf("Transaction: Failed to parse event JSON of event %q", event.EventID())
return nil, unmarshalError{err}
}
if err = gomatrixserverlib.VerifyAllEventSignatures(t.context, []gomatrixserverlib.Event{event}, t.keys); err != nil {
util.GetLogger(t.context).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID())
if err = gomatrixserverlib.VerifyAllEventSignatures(ctx, []gomatrixserverlib.Event{event}, t.keys); err != nil {
util.GetLogger(ctx).WithError(err).Warnf("Transaction: Couldn't validate signature of event %q", event.EventID())
return nil, verifySigError{event.EventID(), err}
}
h := event.Headered(roomVersion)

View file

@ -372,7 +372,6 @@ func (c *txnFedClient) LookupMissingEvents(ctx context.Context, s gomatrixserver
func mustCreateTransaction(rsAPI api.RoomserverInternalAPI, fedClient txnFederationClient, pdus []json.RawMessage) *txnReq {
t := &txnReq{
context: context.Background(),
rsAPI: rsAPI,
eduAPI: &testEDUProducer{},
keys: &test.NopJSONVerifier{},
@ -388,7 +387,7 @@ func mustCreateTransaction(rsAPI api.RoomserverInternalAPI, fedClient txnFederat
}
func mustProcessTransaction(t *testing.T, txn *txnReq, pdusWithErrors []string) {
res, err := txn.processTransaction()
res, err := txn.processTransaction(context.Background())
if err != nil {
t.Errorf("txn.processTransaction returned an error: %v", err)
return

View file

@ -30,7 +30,7 @@ type FederationClientError struct {
}
func (e *FederationClientError) Error() string {
return fmt.Sprintf("%s - (retry_after=%d, blacklisted=%v)", e.Err, e.RetryAfter, e.Blacklisted)
return fmt.Sprintf("%s - (retry_after=%s, blacklisted=%v)", e.Err, e.RetryAfter.String(), e.Blacklisted)
}
// FederationSenderInternalAPI is used to query information from the federation sender.

View file

@ -70,7 +70,10 @@ func failBlacklistableError(err error, stats *statistics.ServerStatistics) (unti
if !ok {
return stats.Failure()
}
if mxerr.Code >= 500 && mxerr.Code < 600 {
if mxerr.Code == 401 { // invalid signature in X-Matrix header
return stats.Failure()
}
if mxerr.Code >= 500 && mxerr.Code < 600 { // internal server errors
return stats.Failure()
}
return

View file

@ -185,20 +185,21 @@ func (r *FederationSenderInternalAPI) performJoinUsingServer(
// Check that the send_join response was valid.
joinCtx := perform.JoinContext(r.federation, r.keyRing)
if err = joinCtx.CheckSendJoinResponse(
respState, err := joinCtx.CheckSendJoinResponse(
ctx, event, serverName, respMakeJoin, respSendJoin,
); err != nil {
)
if err != nil {
return fmt.Errorf("joinCtx.CheckSendJoinResponse: %w", err)
}
// If we successfully performed a send_join above then the other
// server now thinks we're a part of the room. Send the newly
// returned state to the roomserver to update our local view.
respState := respSendJoin.ToRespState()
if err = roomserverAPI.SendEventWithState(
ctx, r.rsAPI,
&respState,
event.Headered(respMakeJoin.RoomVersion), nil,
respState,
event.Headered(respMakeJoin.RoomVersion),
nil,
); err != nil {
return fmt.Errorf("r.producer.SendEventWithState: %w", err)
}

View file

@ -30,7 +30,7 @@ func (r joinContext) CheckSendJoinResponse(
server gomatrixserverlib.ServerName,
respMakeJoin gomatrixserverlib.RespMakeJoin,
respSendJoin gomatrixserverlib.RespSendJoin,
) error {
) (*gomatrixserverlib.RespState, error) {
// A list of events that we have retried, if they were not included in
// the auth events supplied in the send_join.
retries := map[string][]gomatrixserverlib.Event{}
@ -97,8 +97,9 @@ func (r joinContext) CheckSendJoinResponse(
// TODO: Can we expand Check here to return a list of missing auth
// events rather than failing one at a time?
if err := respSendJoin.Check(ctx, r.keyRing, event, missingAuth); err != nil {
return fmt.Errorf("respSendJoin: %w", err)
rs, err := respSendJoin.Check(ctx, r.keyRing, event, missingAuth)
if err != nil {
return nil, fmt.Errorf("respSendJoin: %w", err)
}
return nil
return rs, nil
}

2
go.mod
View file

@ -21,7 +21,7 @@ require (
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd
github.com/matrix-org/gomatrixserverlib v0.0.0-20200903230638-083d02f49d4d
github.com/matrix-org/gomatrixserverlib v0.0.0-20200907151926-38f437f2b2a6
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.2

4
go.sum
View file

@ -567,8 +567,8 @@ github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26 h1:Hr3zjRsq2bh
github.com/matrix-org/gomatrix v0.0.0-20190528120928-7df988a63f26/go.mod h1:3fxX6gUjWyI/2Bt7J1OLhpCzOfO/bB3AiX0cJtEKud0=
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd h1:xVrqJK3xHREMNjwjljkAUaadalWc0rRbmVuQatzmgwg=
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200903230638-083d02f49d4d h1:nA6S6qtLsUgwfJusQJLeNjvTLjZ7F9w+eWV9RzRZrzk=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200903230638-083d02f49d4d/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200907151926-38f437f2b2a6 h1:43gla6bLt4opWY1mQkAasF/LUCipZl7x2d44TY0wf40=
github.com/matrix-org/gomatrixserverlib v0.0.0-20200907151926-38f437f2b2a6/go.mod h1:JsAzE1Ll3+gDWS9JSUHPJiiyAksvOOnGWF2nXdg4ZzU=
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91 h1:HJ6U3S3ljJqNffYMcIeAncp5qT/i+ZMiJ2JC2F0aXP4=
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91/go.mod h1:sjyPyRxKM5uw1nD2cJ6O2OxI6GOqyVBfNXqKjBZTBZE=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7 h1:ntrLa/8xVzeSs8vHFHK25k0C+NV74sYMJnNSg5NoSRo=

View file

@ -51,19 +51,18 @@ type Dendrite struct {
// been a breaking change to the config file format.
Version int `yaml:"version"`
Global Global `yaml:"global"`
AppServiceAPI AppServiceAPI `yaml:"app_service_api"`
ClientAPI ClientAPI `yaml:"client_api"`
CurrentStateServer CurrentStateServer `yaml:"current_state_server"`
EDUServer EDUServer `yaml:"edu_server"`
FederationAPI FederationAPI `yaml:"federation_api"`
FederationSender FederationSender `yaml:"federation_sender"`
KeyServer KeyServer `yaml:"key_server"`
MediaAPI MediaAPI `yaml:"media_api"`
RoomServer RoomServer `yaml:"room_server"`
ServerKeyAPI ServerKeyAPI `yaml:"server_key_api"`
SyncAPI SyncAPI `yaml:"sync_api"`
UserAPI UserAPI `yaml:"user_api"`
Global Global `yaml:"global"`
AppServiceAPI AppServiceAPI `yaml:"app_service_api"`
ClientAPI ClientAPI `yaml:"client_api"`
EDUServer EDUServer `yaml:"edu_server"`
FederationAPI FederationAPI `yaml:"federation_api"`
FederationSender FederationSender `yaml:"federation_sender"`
KeyServer KeyServer `yaml:"key_server"`
MediaAPI MediaAPI `yaml:"media_api"`
RoomServer RoomServer `yaml:"room_server"`
ServerKeyAPI ServerKeyAPI `yaml:"server_key_api"`
SyncAPI SyncAPI `yaml:"sync_api"`
UserAPI UserAPI `yaml:"user_api"`
// The config for tracing the dendrite servers.
Tracing struct {
@ -289,7 +288,6 @@ func (c *Dendrite) Defaults() {
c.Global.Defaults()
c.ClientAPI.Defaults()
c.CurrentStateServer.Defaults()
c.EDUServer.Defaults()
c.FederationAPI.Defaults()
c.FederationSender.Defaults()
@ -309,7 +307,7 @@ func (c *Dendrite) Verify(configErrs *ConfigErrors, isMonolith bool) {
Verify(configErrs *ConfigErrors, isMonolith bool)
}
for _, c := range []verifiable{
&c.Global, &c.ClientAPI, &c.CurrentStateServer,
&c.Global, &c.ClientAPI,
&c.EDUServer, &c.FederationAPI, &c.FederationSender,
&c.KeyServer, &c.MediaAPI, &c.RoomServer,
&c.ServerKeyAPI, &c.SyncAPI, &c.UserAPI,
@ -321,7 +319,6 @@ func (c *Dendrite) Verify(configErrs *ConfigErrors, isMonolith bool) {
func (c *Dendrite) Wiring() {
c.ClientAPI.Matrix = &c.Global
c.CurrentStateServer.Matrix = &c.Global
c.EDUServer.Matrix = &c.Global
c.FederationAPI.Matrix = &c.Global
c.FederationSender.Matrix = &c.Global
@ -512,15 +509,6 @@ func (config *Dendrite) UserAPIURL() string {
return string(config.UserAPI.InternalAPI.Connect)
}
// CurrentStateAPIURL returns an HTTP URL for where the currentstateserver is listening.
func (config *Dendrite) CurrentStateAPIURL() string {
// Hard code the currentstateserver to talk HTTP for now.
// If we support HTTPS we need to think of a practical way to do certificate validation.
// People setting up servers shouldn't need to get a certificate valid for the public
// internet for an internal API.
return string(config.CurrentStateServer.InternalAPI.Connect)
}
// EDUServerURL returns an HTTP URL for where the EDU server is listening.
func (config *Dendrite) EDUServerURL() string {
// Hard code the EDU server to talk HTTP for now.

View file

@ -1,24 +0,0 @@
package config
type CurrentStateServer struct {
Matrix *Global `yaml:"-"`
InternalAPI InternalAPIOptions `yaml:"internal_api"`
// The CurrentState database stores the current state of all rooms.
// It is accessed by the CurrentStateServer.
Database DatabaseOptions `yaml:"database"`
}
func (c *CurrentStateServer) Defaults() {
c.InternalAPI.Listen = "http://localhost:7782"
c.InternalAPI.Connect = "http://localhost:7782"
c.Database.Defaults()
c.Database.ConnectionString = "file:currentstate.db"
}
func (c *CurrentStateServer) Verify(configErrs *ConfigErrors, isMonolith bool) {
checkURL(configErrs, "current_state_server.internal_api.listen", string(c.InternalAPI.Listen))
checkURL(configErrs, "current_state_server.internal_api.connect", string(c.InternalAPI.Connect))
checkNotEmpty(configErrs, "current_state_server.database.connection_string", string(c.Database.ConnectionString))
}

View file

@ -21,7 +21,6 @@ import (
"net/url"
"time"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/gomatrixserverlib"
@ -38,7 +37,6 @@ import (
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
asinthttp "github.com/matrix-org/dendrite/appservice/inthttp"
currentstateinthttp "github.com/matrix-org/dendrite/currentstateserver/inthttp"
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
eduinthttp "github.com/matrix-org/dendrite/eduserver/inthttp"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
@ -188,15 +186,6 @@ func (b *BaseDendrite) UserAPIClient() userapi.UserInternalAPI {
return userAPI
}
// CurrentStateAPIClient returns CurrentStateInternalAPI for hitting the currentstateserver over HTTP.
func (b *BaseDendrite) CurrentStateAPIClient() currentstateAPI.CurrentStateInternalAPI {
stateAPI, err := currentstateinthttp.NewCurrentStateAPIClient(b.Cfg.CurrentStateAPIURL(), b.httpClient)
if err != nil {
logrus.WithError(err).Panic("UserAPIClient failed", b.httpClient)
}
return stateAPI
}
// EDUServerClient returns EDUServerInputAPI for hitting the EDU server over HTTP
func (b *BaseDendrite) EDUServerClient() eduServerAPI.EDUServerInputAPI {
e, err := eduinthttp.NewEDUServerClient(b.Cfg.EDUServerURL(), b.httpClient)

View file

@ -20,7 +20,6 @@ import (
appserviceAPI "github.com/matrix-org/dendrite/appservice/api"
"github.com/matrix-org/dendrite/clientapi"
"github.com/matrix-org/dendrite/clientapi/api"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
eduServerAPI "github.com/matrix-org/dendrite/eduserver/api"
"github.com/matrix-org/dendrite/federationapi"
federationSenderAPI "github.com/matrix-org/dendrite/federationsender/api"
@ -53,7 +52,6 @@ type Monolith struct {
RoomserverAPI roomserverAPI.RoomserverInternalAPI
ServerKeyAPI serverKeyAPI.ServerKeyInternalAPI
UserAPI userapi.UserInternalAPI
StateAPI currentstateAPI.CurrentStateInternalAPI
KeyAPI keyAPI.KeyInternalAPI
// Optional
@ -65,17 +63,17 @@ func (m *Monolith) AddAllPublicRoutes(csMux, ssMux, keyMux, mediaMux *mux.Router
clientapi.AddPublicRoutes(
csMux, &m.Config.ClientAPI, m.KafkaProducer, m.AccountDB,
m.FedClient, m.RoomserverAPI,
m.EDUInternalAPI, m.AppserviceAPI, m.StateAPI, transactions.New(),
m.EDUInternalAPI, m.AppserviceAPI, transactions.New(),
m.FederationSenderAPI, m.UserAPI, m.KeyAPI, m.ExtPublicRoomsProvider,
)
federationapi.AddPublicRoutes(
ssMux, keyMux, &m.Config.FederationAPI, m.UserAPI, m.FedClient,
m.KeyRing, m.RoomserverAPI, m.FederationSenderAPI,
m.EDUInternalAPI, m.StateAPI, m.KeyAPI,
m.EDUInternalAPI, m.KeyAPI,
)
mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, m.UserAPI, m.Client)
syncapi.AddPublicRoutes(
csMux, m.KafkaConsumer, m.UserAPI, m.RoomserverAPI,
m.KeyAPI, m.StateAPI, m.FedClient, &m.Config.SyncAPI,
m.KeyAPI, m.FedClient, &m.Config.SyncAPI,
)
}

View file

@ -87,7 +87,6 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
// the table names are globally unique. But we might not want to
// rely on that in the future.
cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(database)
cfg.CurrentStateServer.Database.ConnectionString = config.DataSource(database)
cfg.FederationSender.Database.ConnectionString = config.DataSource(database)
cfg.KeyServer.Database.ConnectionString = config.DataSource(database)
cfg.MediaAPI.Database.ConnectionString = config.DataSource(database)
@ -98,7 +97,6 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
cfg.UserAPI.DeviceDatabase.ConnectionString = config.DataSource(database)
cfg.AppServiceAPI.InternalAPI.Listen = assignAddress()
cfg.CurrentStateServer.InternalAPI.Listen = assignAddress()
cfg.EDUServer.InternalAPI.Listen = assignAddress()
cfg.FederationAPI.InternalAPI.Listen = assignAddress()
cfg.FederationSender.InternalAPI.Listen = assignAddress()
@ -110,7 +108,6 @@ func MakeConfig(configDir, kafkaURI, database, host string, startPort int) (*con
cfg.UserAPI.InternalAPI.Listen = assignAddress()
cfg.AppServiceAPI.InternalAPI.Connect = cfg.AppServiceAPI.InternalAPI.Listen
cfg.CurrentStateServer.InternalAPI.Connect = cfg.CurrentStateServer.InternalAPI.Listen
cfg.EDUServer.InternalAPI.Connect = cfg.EDUServer.InternalAPI.Listen
cfg.FederationAPI.InternalAPI.Connect = cfg.FederationAPI.InternalAPI.Listen
cfg.FederationSender.InternalAPI.Connect = cfg.FederationSender.InternalAPI.Listen

View file

@ -303,6 +303,39 @@ type QueryServerBannedFromRoomResponse struct {
Banned bool `json:"banned"`
}
// MarshalJSON stringifies the room ID and StateKeyTuple keys so they can be sent over the wire in HTTP API mode.
func (r *QueryBulkStateContentResponse) MarshalJSON() ([]byte, error) {
se := make(map[string]string)
for roomID, tupleToEvent := range r.Rooms {
for tuple, event := range tupleToEvent {
// use 0x1F (unit separator) as the delimiter between room ID/type/state key,
se[fmt.Sprintf("%s\x1F%s\x1F%s", roomID, tuple.EventType, tuple.StateKey)] = event
}
}
return json.Marshal(se)
}
func (r *QueryBulkStateContentResponse) UnmarshalJSON(data []byte) error {
wireFormat := make(map[string]string)
err := json.Unmarshal(data, &wireFormat)
if err != nil {
return err
}
r.Rooms = make(map[string]map[gomatrixserverlib.StateKeyTuple]string)
for roomTuple, value := range wireFormat {
fields := strings.Split(roomTuple, "\x1F")
roomID := fields[0]
if r.Rooms[roomID] == nil {
r.Rooms[roomID] = make(map[gomatrixserverlib.StateKeyTuple]string)
}
r.Rooms[roomID][gomatrixserverlib.StateKeyTuple{
EventType: fields[1],
StateKey: fields[2],
}] = value
}
return nil
}
// MarshalJSON stringifies the StateKeyTuple keys so they can be sent over the wire in HTTP API mode.
func (r *QueryCurrentStateResponse) MarshalJSON() ([]byte, error) {
se := make(map[string]*gomatrixserverlib.HeaderedEvent, len(r.StateEvents))

View file

@ -140,6 +140,9 @@ func (r *Queryer) QueryMembershipForUser(
if err != nil {
return err
}
if info == nil {
return fmt.Errorf("QueryMembershipForUser: unknown room %s", request.RoomID)
}
membershipEventNID, stillInRoom, err := r.DB.GetMembership(ctx, info.RoomNID, request.UserID)
if err != nil {

View file

@ -17,9 +17,9 @@ package storage
import (
"context"
"github.com/matrix-org/dendrite/currentstateserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/roomserver/storage/shared"
"github.com/matrix-org/dendrite/roomserver/storage/tables"
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
)

View file

@ -7,7 +7,6 @@ import (
"fmt"
"sort"
csstables "github.com/matrix-org/dendrite/currentstateserver/storage/tables"
"github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/roomserver/api"
@ -724,6 +723,10 @@ func (d *Database) GetStateEvent(ctx context.Context, roomID, evType, stateKey s
return nil, err
}
eventTypeNID, err := d.EventTypesTable.SelectEventTypeNID(ctx, nil, evType)
if err == sql.ErrNoRows {
// No rooms have an event of this type, otherwise we'd have an event type NID
return nil, nil
}
if err != nil {
return nil, err
}
@ -754,7 +757,7 @@ func (d *Database) GetStateEvent(ctx context.Context, roomID, evType, stateKey s
}
}
return nil, fmt.Errorf("GetStateEvent: no event type '%s' with key '%s' exists in room %s", evType, stateKey, roomID)
return nil, nil
}
// GetRoomsByMembership returns a list of room IDs matching the provided membership and user ID (as state_key).
@ -795,8 +798,90 @@ func (d *Database) GetRoomsByMembership(ctx context.Context, userID, membership
// GetBulkStateContent returns all state events which match a given room ID and a given state key tuple. Both must be satisfied for a match.
// If a tuple has the StateKey of '*' and allowWildcards=true then all state events with the EventType should be returned.
func (d *Database) GetBulkStateContent(ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool) ([]csstables.StrippedEvent, error) {
return nil, fmt.Errorf("not implemented yet")
// nolint:gocyclo
func (d *Database) GetBulkStateContent(ctx context.Context, roomIDs []string, tuples []gomatrixserverlib.StateKeyTuple, allowWildcards bool) ([]tables.StrippedEvent, error) {
eventTypes := make([]string, 0, len(tuples))
for _, tuple := range tuples {
eventTypes = append(eventTypes, tuple.EventType)
}
// we don't bother failing the request if we get asked for event types we don't know about, as all that would result in is no matches which
// isn't a failure.
eventTypeNIDMap, err := d.EventTypesTable.BulkSelectEventTypeNID(ctx, eventTypes)
if err != nil {
return nil, fmt.Errorf("GetBulkStateContent: failed to map event type nids: %w", err)
}
typeNIDSet := make(map[types.EventTypeNID]bool)
for _, nid := range eventTypeNIDMap {
typeNIDSet[nid] = true
}
allowWildcard := make(map[types.EventTypeNID]bool)
eventStateKeys := make([]string, 0, len(tuples))
for _, tuple := range tuples {
if allowWildcards && tuple.StateKey == "*" {
allowWildcard[eventTypeNIDMap[tuple.EventType]] = true
continue
}
eventStateKeys = append(eventStateKeys, tuple.StateKey)
}
eventStateKeyNIDMap, err := d.EventStateKeysTable.BulkSelectEventStateKeyNID(ctx, eventStateKeys)
if err != nil {
return nil, fmt.Errorf("GetBulkStateContent: failed to map state key nids: %w", err)
}
stateKeyNIDSet := make(map[types.EventStateKeyNID]bool)
for _, nid := range eventStateKeyNIDMap {
stateKeyNIDSet[nid] = true
}
var eventNIDs []types.EventNID
eventNIDToVer := make(map[types.EventNID]gomatrixserverlib.RoomVersion)
// TODO: This feels like this is going to be really slow...
for _, roomID := range roomIDs {
roomInfo, err2 := d.RoomInfo(ctx, roomID)
if err2 != nil {
return nil, fmt.Errorf("GetBulkStateContent: failed to load room info for room %s : %w", roomID, err2)
}
// for unknown rooms or rooms which we don't have the current state, skip them.
if roomInfo == nil || roomInfo.IsStub {
continue
}
entries, err2 := d.loadStateAtSnapshot(ctx, roomInfo.StateSnapshotNID)
if err2 != nil {
return nil, fmt.Errorf("GetBulkStateContent: failed to load state for room %s : %w", roomID, err2)
}
for _, entry := range entries {
if typeNIDSet[entry.EventTypeNID] {
if allowWildcard[entry.EventTypeNID] || stateKeyNIDSet[entry.EventStateKeyNID] {
eventNIDs = append(eventNIDs, entry.EventNID)
eventNIDToVer[entry.EventNID] = roomInfo.RoomVersion
}
}
}
}
events, err := d.EventJSONTable.BulkSelectEventJSON(ctx, eventNIDs)
if err != nil {
return nil, fmt.Errorf("GetBulkStateContent: failed to load event JSON for event nids: %w", err)
}
result := make([]tables.StrippedEvent, len(events))
for i := range events {
roomVer := eventNIDToVer[events[i].EventNID]
ev, err := gomatrixserverlib.NewEventFromTrustedJSON(events[i].EventJSON, false, roomVer)
if err != nil {
return nil, fmt.Errorf("GetBulkStateContent: failed to load event JSON for event NID %v : %w", events[i].EventNID, err)
}
hev := ev.Headered(roomVer)
result[i] = tables.StrippedEvent{
EventType: ev.Type(),
RoomID: ev.RoomID(),
StateKey: *ev.StateKey(),
ContentValue: tables.ExtractContentValue(&hev),
}
}
return result, nil
}
// JoinedUsersSetInRooms returns all joined users in the rooms given, along with the count of how many times they appear.

View file

@ -6,6 +6,7 @@ import (
"github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/gomatrixserverlib"
"github.com/tidwall/gjson"
)
type EventJSONPair struct {
@ -155,3 +156,45 @@ type Redactions interface {
// successfully redacted the event JSON.
MarkRedactionValidated(ctx context.Context, txn *sql.Tx, redactionEventID string, validated bool) error
}
// StrippedEvent represents a stripped event for returning extracted content values.
type StrippedEvent struct {
RoomID string
EventType string
StateKey string
ContentValue string
}
// ExtractContentValue from the given state event. For example, given an m.room.name event with:
// content: { name: "Foo" }
// this returns "Foo".
func ExtractContentValue(ev *gomatrixserverlib.HeaderedEvent) string {
content := ev.Content()
key := ""
switch ev.Type() {
case gomatrixserverlib.MRoomCreate:
key = "creator"
case gomatrixserverlib.MRoomCanonicalAlias:
key = "alias"
case gomatrixserverlib.MRoomHistoryVisibility:
key = "history_visibility"
case gomatrixserverlib.MRoomJoinRules:
key = "join_rule"
case gomatrixserverlib.MRoomMember:
key = "membership"
case gomatrixserverlib.MRoomName:
key = "name"
case "m.room.avatar":
key = "url"
case "m.room.topic":
key = "topic"
case "m.room.guest_access":
key = "guest_access"
}
result := gjson.GetBytes(content, key)
if !result.Exists() {
return ""
}
// this returns the empty string if this is not a string type
return result.Str
}

View file

@ -20,7 +20,6 @@ import (
"sync"
"github.com/Shopify/sarama"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
@ -38,7 +37,6 @@ type OutputKeyChangeEventConsumer struct {
db storage.Database
serverName gomatrixserverlib.ServerName // our server name
rsAPI roomserverAPI.RoomserverInternalAPI
stateAPI currentstateAPI.CurrentStateInternalAPI
keyAPI api.KeyInternalAPI
partitionToOffset map[int32]int64
partitionToOffsetMu sync.Mutex
@ -54,7 +52,6 @@ func NewOutputKeyChangeEventConsumer(
n *syncapi.Notifier,
keyAPI api.KeyInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
stateAPI currentstateAPI.CurrentStateInternalAPI,
store storage.Database,
) *OutputKeyChangeEventConsumer {
@ -71,7 +68,6 @@ func NewOutputKeyChangeEventConsumer(
serverName: serverName,
keyAPI: keyAPI,
rsAPI: rsAPI,
stateAPI: stateAPI,
partitionToOffset: make(map[int32]int64),
partitionToOffsetMu: sync.Mutex{},
notifier: n,
@ -133,7 +129,7 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.HeaderedEvent) {
// work out who we are now sharing rooms with which we previously were not and notify them about the joining
// users keys:
changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, s.stateAPI, *ev.StateKey(), []string{ev.RoomID()}, nil)
changed, _, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, *ev.StateKey(), []string{ev.RoomID()}, nil)
if err != nil {
log.WithError(err).Error("OnJoinEvent: failed to work out changed users")
return
@ -146,7 +142,7 @@ func (s *OutputKeyChangeEventConsumer) OnJoinEvent(ev *gomatrixserverlib.Headere
func (s *OutputKeyChangeEventConsumer) OnLeaveEvent(ev *gomatrixserverlib.HeaderedEvent) {
// work out who we are no longer sharing any rooms with and notify them about the leaving user
_, left, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, s.stateAPI, *ev.StateKey(), nil, []string{ev.RoomID()})
_, left, err := syncinternal.TrackChangedUsers(context.Background(), s.rsAPI, *ev.StateKey(), nil, []string{ev.RoomID()})
if err != nil {
log.WithError(err).Error("OnLeaveEvent: failed to work out left users")
return

View file

@ -19,7 +19,6 @@ import (
"strings"
"github.com/Shopify/sarama"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/keyserver/api"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
@ -50,7 +49,6 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID,
// nolint:gocyclo
func DeviceListCatchup(
ctx context.Context, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
stateAPI currentstateAPI.CurrentStateInternalAPI,
userID string, res *types.Response, from, to types.StreamingToken,
) (hasNew bool, err error) {
@ -58,7 +56,7 @@ func DeviceListCatchup(
newlyJoinedRooms := joinedRooms(res, userID)
newlyLeftRooms := leftRooms(res)
if len(newlyJoinedRooms) > 0 || len(newlyLeftRooms) > 0 {
changed, left, err := TrackChangedUsers(ctx, rsAPI, stateAPI, userID, newlyJoinedRooms, newlyLeftRooms)
changed, left, err := TrackChangedUsers(ctx, rsAPI, userID, newlyJoinedRooms, newlyLeftRooms)
if err != nil {
return false, err
}
@ -144,7 +142,7 @@ func DeviceListCatchup(
// TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response.
// nolint:gocyclo
func TrackChangedUsers(
ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string,
ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string,
) (changed, left []string, err error) {
// process leaves first, then joins afterwards so if we join/leave/join/leave we err on the side of including users.
@ -161,8 +159,8 @@ func TrackChangedUsers(
if err != nil {
return nil, nil, err
}
var stateRes currentstateAPI.QueryBulkStateContentResponse
err = stateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
var stateRes roomserverAPI.QueryBulkStateContentResponse
err = rsAPI.QueryBulkStateContent(ctx, &roomserverAPI.QueryBulkStateContentRequest{
RoomIDs: newlyLeftRooms,
StateTuples: []gomatrixserverlib.StateKeyTuple{
{
@ -202,7 +200,7 @@ func TrackChangedUsers(
if err != nil {
return nil, left, err
}
err = stateAPI.QueryBulkStateContent(ctx, &currentstateAPI.QueryBulkStateContentRequest{
err = rsAPI.QueryBulkStateContent(ctx, &roomserverAPI.QueryBulkStateContentRequest{
RoomIDs: newlyJoinedRooms,
StateTuples: []gomatrixserverlib.StateKeyTuple{
{

View file

@ -7,7 +7,6 @@ import (
"testing"
"github.com/Shopify/sarama"
stateapi "github.com/matrix-org/dendrite/currentstateserver/api"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
@ -108,25 +107,6 @@ func (s *mockRoomserverAPI) QuerySharedUsers(ctx context.Context, req *api.Query
return nil
}
type mockStateAPI struct {
rsAPI *mockRoomserverAPI
}
// QueryBulkStateContent does a bulk query for state event content in the given rooms.
func (s *mockStateAPI) QueryBulkStateContent(ctx context.Context, req *stateapi.QueryBulkStateContentRequest, res *stateapi.QueryBulkStateContentResponse) error {
var res2 api.QueryBulkStateContentResponse
err := s.rsAPI.QueryBulkStateContent(ctx, &api.QueryBulkStateContentRequest{
RoomIDs: req.RoomIDs,
AllowWildcards: req.AllowWildcards,
StateTuples: req.StateTuples,
}, &res2)
if err != nil {
return err
}
res.Rooms = res2.Rooms
return nil
}
type wantCatchup struct {
hasNew bool
changed []string
@ -200,7 +180,7 @@ func TestKeyChangeCatchupOnJoinShareNewUser(t *testing.T) {
"!another:room": {syncingUser},
},
}
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@ -223,7 +203,7 @@ func TestKeyChangeCatchupOnLeaveShareLeftUser(t *testing.T) {
"!another:room": {syncingUser},
},
}
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@ -246,7 +226,7 @@ func TestKeyChangeCatchupOnJoinShareNoNewUsers(t *testing.T) {
"!another:room": {syncingUser, existingUser},
},
}
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@ -268,7 +248,7 @@ func TestKeyChangeCatchupOnLeaveShareNoUsers(t *testing.T) {
"!another:room": {syncingUser, existingUser},
},
}
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@ -327,7 +307,7 @@ func TestKeyChangeCatchupNoNewJoinsButMessages(t *testing.T) {
roomID: {syncingUser, existingUser},
},
}
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)
}
@ -355,7 +335,7 @@ func TestKeyChangeCatchupChangeAndLeft(t *testing.T) {
"!another:room": {syncingUser},
},
}
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken)
hasNew, err := DeviceListCatchup(context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken)
if err != nil {
t.Fatalf("Catchup returned an error: %s", err)
}
@ -441,7 +421,7 @@ func TestKeyChangeCatchupChangeAndLeftSameRoom(t *testing.T) {
},
}
hasNew, err := DeviceListCatchup(
context.Background(), &mockKeyAPI{}, rsAPI, &mockStateAPI{rsAPI}, syncingUser, syncResponse, emptyToken, newestToken,
context.Background(), &mockKeyAPI{}, rsAPI, syncingUser, syncResponse, emptyToken, newestToken,
)
if err != nil {
t.Fatalf("DeviceListCatchup returned an error: %s", err)

View file

@ -23,7 +23,6 @@ import (
"time"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/internal"
@ -42,15 +41,14 @@ type RequestPool struct {
notifier *Notifier
keyAPI keyapi.KeyInternalAPI
rsAPI roomserverAPI.RoomserverInternalAPI
stateAPI currentstateAPI.CurrentStateInternalAPI
}
// NewRequestPool makes a new RequestPool
func NewRequestPool(
db storage.Database, n *Notifier, userAPI userapi.UserInternalAPI, keyAPI keyapi.KeyInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI,
rsAPI roomserverAPI.RoomserverInternalAPI,
) *RequestPool {
return &RequestPool{db, userAPI, n, keyAPI, rsAPI, stateAPI}
return &RequestPool{db, userAPI, n, keyAPI, rsAPI}
}
// OnIncomingSyncRequest is called when a client makes a /sync request. This function MUST be
@ -267,7 +265,7 @@ func (rp *RequestPool) currentSyncForUser(req syncRequest, latestPos types.Strea
func (rp *RequestPool) appendDeviceLists(
data *types.Response, userID string, since, to types.StreamingToken,
) (*types.Response, error) {
_, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.rsAPI, rp.stateAPI, userID, data, since, to)
_, err := internal.DeviceListCatchup(context.Background(), rp.keyAPI, rp.rsAPI, userID, data, since, to)
if err != nil {
return nil, fmt.Errorf("internal.DeviceListCatchup: %w", err)
}

View file

@ -21,7 +21,6 @@ import (
"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
currentstateapi "github.com/matrix-org/dendrite/currentstateserver/api"
"github.com/matrix-org/dendrite/internal/config"
keyapi "github.com/matrix-org/dendrite/keyserver/api"
"github.com/matrix-org/dendrite/roomserver/api"
@ -42,7 +41,6 @@ func AddPublicRoutes(
userAPI userapi.UserInternalAPI,
rsAPI api.RoomserverInternalAPI,
keyAPI keyapi.KeyInternalAPI,
currentStateAPI currentstateapi.CurrentStateInternalAPI,
federation *gomatrixserverlib.FederationClient,
cfg *config.SyncAPI,
) {
@ -62,11 +60,11 @@ func AddPublicRoutes(
logrus.WithError(err).Panicf("failed to start notifier")
}
requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, rsAPI, currentStateAPI)
requestPool := sync.NewRequestPool(syncDB, notifier, userAPI, keyAPI, rsAPI)
keyChangeConsumer := consumers.NewOutputKeyChangeEventConsumer(
cfg.Matrix.ServerName, string(cfg.Matrix.Kafka.TopicFor(config.TopicOutputKeyChangeEvent)),
consumer, notifier, keyAPI, rsAPI, currentStateAPI, syncDB,
consumer, notifier, keyAPI, rsAPI, syncDB,
)
if err = keyChangeConsumer.Start(); err != nil {
logrus.WithError(err).Panicf("failed to start key change consumer")