Merge branch 'nats' into add-nats-support

This commit is contained in:
Neil Alexander 2021-12-16 11:40:36 +00:00
commit 3f44499ad1
No known key found for this signature in database
GPG key ID: A02A2019A2BB0944
48 changed files with 371 additions and 235 deletions

1
.github/CODEOWNERS vendored Normal file
View file

@ -0,0 +1 @@
* @matrix-org/dendrite-core

3
.gitignore vendored
View file

@ -60,3 +60,6 @@ cmd/dendrite-demo-yggdrasil/embed/fs*.go
# Test dependencies
test/wasm/node_modules
media_store/

View file

@ -135,6 +135,7 @@ func generateAppServiceAccount(
AccessToken: as.ASToken,
DeviceID: &as.SenderLocalpart,
DeviceDisplayName: &as.SenderLocalpart,
NoDeviceListUpdate: true,
}, &devRes)
return err
}

View file

@ -3,7 +3,7 @@ services:
# PostgreSQL is needed for both polylith and monolith modes.
postgres:
hostname: postgres
image: postgres:11
image: postgres:14
restart: always
volumes:
- ./postgres/create_db.sh:/docker-entrypoint-initdb.d/20-create_db.sh

View file

@ -13,6 +13,7 @@ import (
"net"
"net/http"
"os"
"strings"
"sync"
"time"
@ -87,15 +88,15 @@ func (m *DendriteMonolith) SetMulticastEnabled(enabled bool) {
m.PineconeMulticast.Start()
} else {
m.PineconeMulticast.Stop()
m.DisconnectType(pineconeRouter.PeerTypeMulticast)
m.DisconnectType(int(pineconeRouter.PeerTypeMulticast))
}
}
func (m *DendriteMonolith) SetStaticPeer(uri string) {
m.staticPeerMutex.Lock()
m.staticPeerURI = uri
m.staticPeerURI = strings.TrimSpace(uri)
m.staticPeerMutex.Unlock()
m.DisconnectType(pineconeRouter.PeerTypeRemote)
m.DisconnectType(int(pineconeRouter.PeerTypeRemote))
if uri != "" {
go func() {
m.staticPeerAttempt <- struct{}{}
@ -105,7 +106,7 @@ func (m *DendriteMonolith) SetStaticPeer(uri string) {
func (m *DendriteMonolith) DisconnectType(peertype int) {
for _, p := range m.PineconeRouter.Peers() {
if peertype == p.PeerType {
if int(peertype) == p.PeerType {
m.PineconeRouter.Disconnect(types.SwitchPortID(p.Port), nil)
}
}
@ -133,7 +134,11 @@ func (m *DendriteMonolith) Conduit(zone string, peertype int) (*Conduit, error)
for i := 1; i <= 10; i++ {
logrus.Errorf("Attempting authenticated connect (attempt %d)", i)
var err error
conduit.port, err = m.PineconeRouter.AuthenticatedConnect(l, zone, peertype, true)
conduit.port, err = m.PineconeRouter.Connect(
l,
pineconeRouter.ConnectionZone(zone),
pineconeRouter.ConnectionPeerType(peertype),
)
switch err {
case io.ErrClosedPipe:
logrus.Errorf("Authenticated connect failed due to closed pipe (attempt %d)", i)
@ -195,19 +200,31 @@ func (m *DendriteMonolith) RegisterDevice(localpart, deviceID string) (string, e
}
func (m *DendriteMonolith) staticPeerConnect() {
connected := map[string]bool{} // URI -> connected?
attempt := func() {
if m.PineconeRouter.PeerCount(pineconeRouter.PeerTypeRemote) == 0 {
m.staticPeerMutex.RLock()
uri := m.staticPeerURI
m.staticPeerMutex.RUnlock()
if uri == "" {
return
}
if err := conn.ConnectToPeer(m.PineconeRouter, uri); err != nil {
for k := range connected {
delete(connected, k)
}
for _, uri := range strings.Split(uri, ",") {
connected[strings.TrimSpace(uri)] = false
}
for _, info := range m.PineconeRouter.Peers() {
connected[info.URI] = true
}
for k, online := range connected {
if !online {
if err := conn.ConnectToPeer(m.PineconeRouter, k); err != nil {
logrus.WithError(err).Error("Failed to connect to static peer")
}
}
}
}
for {
select {
case <-m.processContext.Context().Done():
@ -271,7 +288,7 @@ func (m *DendriteMonolith) Start() {
cfg.SyncAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/%s-syncapi.db", m.StorageDirectory, prefix))
cfg.RoomServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/%s-roomserver.db", m.StorageDirectory, prefix))
cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/%s-keyserver.db", m.StorageDirectory, prefix))
cfg.FederationAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/%s-federationapi.db", m.StorageDirectory, prefix))
cfg.FederationAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/%s-federationsender.db", m.StorageDirectory, prefix))
cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/%s-appservice.db", m.StorageDirectory, prefix))
cfg.MediaAPI.BasePath = config.Path(fmt.Sprintf("%s/media", m.CacheDirectory))
cfg.MediaAPI.AbsBasePath = config.Path(fmt.Sprintf("%s/media", m.CacheDirectory))
@ -292,7 +309,7 @@ func (m *DendriteMonolith) Start() {
rsAPI := roomserver.NewInternalAPI(base)
fsAPI := federationapi.NewInternalAPI(
base, federation, rsAPI, base.Caches, true,
base, federation, rsAPI, base.Caches, keyRing, true,
)
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI)
@ -307,8 +324,7 @@ func (m *DendriteMonolith) Start() {
// The underlying roomserver implementation needs to be able to call the fedsender.
// This is different to rsAPI which can be the http client which doesn't need this dependency
rsAPI.SetFederationAPI(fsAPI)
rsAPI.SetKeyring(keyRing)
rsAPI.SetFederationAPI(fsAPI, keyRing)
monolith := setup.Monolith{
Config: base.Cfg,

View file

@ -93,7 +93,7 @@ func (m *DendriteMonolith) Start() {
cfg.SyncAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-syncapi.db", m.StorageDirectory))
cfg.RoomServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-roomserver.db", m.StorageDirectory))
cfg.KeyServer.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-keyserver.db", m.StorageDirectory))
cfg.FederationAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-federationapi.db", m.StorageDirectory))
cfg.FederationAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-federationsender.db", m.StorageDirectory))
cfg.AppServiceAPI.Database.ConnectionString = config.DataSource(fmt.Sprintf("file:%s/dendrite-p2p-appservice.db", m.StorageDirectory))
cfg.MediaAPI.BasePath = config.Path(fmt.Sprintf("%s/tmp", m.StorageDirectory))
cfg.MediaAPI.AbsBasePath = config.Path(fmt.Sprintf("%s/tmp", m.StorageDirectory))
@ -113,7 +113,7 @@ func (m *DendriteMonolith) Start() {
rsAPI := roomserver.NewInternalAPI(base)
fsAPI := federationapi.NewInternalAPI(
base, federation, rsAPI, base.Caches, true,
base, federation, rsAPI, base.Caches, keyRing, true,
)
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, federation)
@ -129,8 +129,7 @@ func (m *DendriteMonolith) Start() {
// The underlying roomserver implementation needs to be able to call the fedsender.
// This is different to rsAPI which can be the http client which doesn't need this dependency
rsAPI.SetFederationAPI(fsAPI)
rsAPI.SetKeyring(keyRing)
rsAPI.SetFederationAPI(fsAPI, keyRing)
monolith := setup.Monolith{
Config: base.Cfg,

View file

@ -70,11 +70,11 @@ func VerifyUserFromRequest(
jsonErr := jsonerror.InternalServerError()
return nil, &jsonErr
}
if res.Err != nil {
if forbidden, ok := res.Err.(*api.ErrorForbidden); ok {
if res.Err != "" {
if strings.HasPrefix(strings.ToLower(res.Err), "forbidden:") { // TODO: use actual error and no string comparison
return nil, &util.JSONResponse{
Code: http.StatusForbidden,
JSON: jsonerror.Forbidden(forbidden.Message),
JSON: jsonerror.Forbidden(res.Err),
}
}
}

View file

@ -17,6 +17,7 @@ package auth
import (
"context"
"net/http"
"strings"
"github.com/matrix-org/dendrite/clientapi/jsonerror"
"github.com/matrix-org/dendrite/clientapi/userutil"
@ -48,7 +49,8 @@ func (t *LoginTypePassword) Request() interface{} {
func (t *LoginTypePassword) Login(ctx context.Context, req interface{}) (*Login, *util.JSONResponse) {
r := req.(*PasswordRequest)
username := r.Username()
// Squash username to all lowercase letters
username := strings.ToLower(r.Username())
if username == "" {
return nil, &util.JSONResponse{
Code: http.StatusUnauthorized,

View file

@ -62,12 +62,14 @@ func CreateKeyBackupVersion(req *http.Request, userAPI userapi.UserInternalAPI,
return *resErr
}
var performKeyBackupResp userapi.PerformKeyBackupResponse
userAPI.PerformKeyBackup(req.Context(), &userapi.PerformKeyBackupRequest{
if err := userAPI.PerformKeyBackup(req.Context(), &userapi.PerformKeyBackupRequest{
UserID: device.UserID,
Version: "",
AuthData: kb.AuthData,
Algorithm: kb.Algorithm,
}, &performKeyBackupResp)
}, &performKeyBackupResp); err != nil {
return jsonerror.InternalServerError()
}
if performKeyBackupResp.Error != "" {
if performKeyBackupResp.BadInput {
return util.JSONResponse{
@ -123,12 +125,14 @@ func ModifyKeyBackupVersionAuthData(req *http.Request, userAPI userapi.UserInter
return *resErr
}
var performKeyBackupResp userapi.PerformKeyBackupResponse
userAPI.PerformKeyBackup(req.Context(), &userapi.PerformKeyBackupRequest{
if err := userAPI.PerformKeyBackup(req.Context(), &userapi.PerformKeyBackupRequest{
UserID: device.UserID,
Version: version,
AuthData: kb.AuthData,
Algorithm: kb.Algorithm,
}, &performKeyBackupResp)
}, &performKeyBackupResp); err != nil {
return jsonerror.InternalServerError()
}
if performKeyBackupResp.Error != "" {
if performKeyBackupResp.BadInput {
return util.JSONResponse{
@ -157,11 +161,13 @@ func ModifyKeyBackupVersionAuthData(req *http.Request, userAPI userapi.UserInter
// Implements DELETE /_matrix/client/r0/room_keys/version/{version}
func DeleteKeyBackupVersion(req *http.Request, userAPI userapi.UserInternalAPI, device *userapi.Device, version string) util.JSONResponse {
var performKeyBackupResp userapi.PerformKeyBackupResponse
userAPI.PerformKeyBackup(req.Context(), &userapi.PerformKeyBackupRequest{
if err := userAPI.PerformKeyBackup(req.Context(), &userapi.PerformKeyBackupRequest{
UserID: device.UserID,
Version: version,
DeleteBackup: true,
}, &performKeyBackupResp)
}, &performKeyBackupResp); err != nil {
return jsonerror.InternalServerError()
}
if performKeyBackupResp.Error != "" {
if performKeyBackupResp.BadInput {
return util.JSONResponse{
@ -191,11 +197,13 @@ func UploadBackupKeys(
req *http.Request, userAPI userapi.UserInternalAPI, device *userapi.Device, version string, keys *keyBackupSessionRequest,
) util.JSONResponse {
var performKeyBackupResp userapi.PerformKeyBackupResponse
userAPI.PerformKeyBackup(req.Context(), &userapi.PerformKeyBackupRequest{
if err := userAPI.PerformKeyBackup(req.Context(), &userapi.PerformKeyBackupRequest{
UserID: device.UserID,
Version: version,
Keys: *keys,
}, &performKeyBackupResp)
}, &performKeyBackupResp); err != nil && performKeyBackupResp.Error == "" {
return jsonerror.InternalServerError()
}
if performKeyBackupResp.Error != "" {
if performKeyBackupResp.BadInput {
return util.JSONResponse{

View file

@ -61,7 +61,7 @@ func Setup(
extRoomsProvider api.ExtraPublicRoomsProvider,
mscCfg *config.MSCs,
) {
rateLimits := newRateLimits(&cfg.RateLimiting)
rateLimits := httputil.NewRateLimits(&cfg.RateLimiting)
userInteractiveAuth := auth.NewUserInteractive(accountDB.GetAccountByPassword, cfg)
unstableFeatures := map[string]bool{
@ -127,7 +127,7 @@ func Setup(
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/join/{roomIDOrAlias}",
httputil.MakeAuthAPI(gomatrixserverlib.Join, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
if r := rateLimits.Limit(req); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -143,7 +143,7 @@ func Setup(
if mscCfg.Enabled("msc2753") {
r0mux.Handle("/peek/{roomIDOrAlias}",
httputil.MakeAuthAPI(gomatrixserverlib.Peek, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
if r := rateLimits.Limit(req); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -163,7 +163,7 @@ func Setup(
).Methods(http.MethodGet, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/join",
httputil.MakeAuthAPI(gomatrixserverlib.Join, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
if r := rateLimits.Limit(req); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -177,7 +177,7 @@ func Setup(
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/leave",
httputil.MakeAuthAPI("membership", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
if r := rateLimits.Limit(req); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -211,7 +211,7 @@ func Setup(
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/rooms/{roomID}/invite",
httputil.MakeAuthAPI("membership", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
if r := rateLimits.Limit(req); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -329,14 +329,14 @@ func Setup(
).Methods(http.MethodPut, http.MethodOptions)
r0mux.Handle("/register", httputil.MakeExternalAPI("register", func(req *http.Request) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
if r := rateLimits.Limit(req); r != nil {
return *r
}
return Register(req, userAPI, accountDB, cfg)
})).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/register/available", httputil.MakeExternalAPI("registerAvailable", func(req *http.Request) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
if r := rateLimits.Limit(req); r != nil {
return *r
}
return RegisterAvailable(req, cfg, accountDB)
@ -410,7 +410,7 @@ func Setup(
r0mux.Handle("/rooms/{roomID}/typing/{userID}",
httputil.MakeAuthAPI("rooms_typing", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
if r := rateLimits.Limit(req); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -466,7 +466,7 @@ func Setup(
r0mux.Handle("/account/whoami",
httputil.MakeAuthAPI("whoami", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
if r := rateLimits.Limit(req); r != nil {
return *r
}
return Whoami(req, device)
@ -475,7 +475,7 @@ func Setup(
r0mux.Handle("/account/password",
httputil.MakeAuthAPI("password", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
if r := rateLimits.Limit(req); r != nil {
return *r
}
return Password(req, userAPI, accountDB, device, cfg)
@ -484,7 +484,7 @@ func Setup(
r0mux.Handle("/account/deactivate",
httputil.MakeAuthAPI("deactivate", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
if r := rateLimits.Limit(req); r != nil {
return *r
}
return Deactivate(req, userInteractiveAuth, userAPI, device)
@ -495,7 +495,7 @@ func Setup(
r0mux.Handle("/login",
httputil.MakeExternalAPI("login", func(req *http.Request) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
if r := rateLimits.Limit(req); r != nil {
return *r
}
return Login(req, accountDB, userAPI, cfg)
@ -552,7 +552,7 @@ func Setup(
r0mux.Handle("/profile/{userID}/avatar_url",
httputil.MakeAuthAPI("profile_avatar_url", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
if r := rateLimits.Limit(req); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -577,7 +577,7 @@ func Setup(
r0mux.Handle("/profile/{userID}/displayname",
httputil.MakeAuthAPI("profile_displayname", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
if r := rateLimits.Limit(req); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -617,7 +617,7 @@ func Setup(
// Element logs get flooded unless this is handled
r0mux.Handle("/presence/{userID}/status",
httputil.MakeExternalAPI("presence", func(req *http.Request) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
if r := rateLimits.Limit(req); r != nil {
return *r
}
// TODO: Set presence (probably the responsibility of a presence server not clientapi)
@ -630,7 +630,7 @@ func Setup(
r0mux.Handle("/voip/turnServer",
httputil.MakeAuthAPI("turn_server", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
if r := rateLimits.Limit(req); r != nil {
return *r
}
return RequestTurnServer(req, device, cfg)
@ -709,7 +709,7 @@ func Setup(
r0mux.Handle("/user/{userID}/openid/request_token",
httputil.MakeAuthAPI("openid_request_token", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
if r := rateLimits.Limit(req); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -722,7 +722,7 @@ func Setup(
r0mux.Handle("/user_directory/search",
httputil.MakeAuthAPI("userdirectory_search", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
if r := rateLimits.Limit(req); r != nil {
return *r
}
postContent := struct {
@ -767,7 +767,7 @@ func Setup(
r0mux.Handle("/rooms/{roomID}/read_markers",
httputil.MakeAuthAPI("rooms_read_markers", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
if r := rateLimits.Limit(req); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -780,7 +780,7 @@ func Setup(
r0mux.Handle("/rooms/{roomID}/forget",
httputil.MakeAuthAPI("rooms_forget", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
if r := rateLimits.Limit(req); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))
@ -884,7 +884,7 @@ func Setup(
r0mux.Handle("/capabilities",
httputil.MakeAuthAPI("capabilities", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
if r := rateLimits.Limit(req); r != nil {
return *r
}
return GetCapabilities(req, rsAPI)
@ -1100,7 +1100,7 @@ func Setup(
).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/rooms/{roomId}/receipt/{receiptType}/{eventId}",
httputil.MakeAuthAPI(gomatrixserverlib.Join, userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.rateLimit(req); r != nil {
if r := rateLimits.Limit(req); r != nil {
return *r
}
vars, err := httputil.URLDecodeMapValues(mux.Vars(req))

View file

@ -157,10 +157,10 @@ func main() {
asAPI := appservice.NewInternalAPI(&base.Base, userAPI, rsAPI)
rsAPI.SetAppserviceAPI(asAPI)
fsAPI := federationapi.NewInternalAPI(
&base.Base, federation, rsAPI, base.Base.Caches, true,
&base.Base, federation, rsAPI, base.Base.Caches, nil, true,
)
keyRing := fsAPI.KeyRing()
rsAPI.SetFederationAPI(fsAPI)
rsAPI.SetFederationAPI(fsAPI, keyRing)
provider := newPublicRoomsProvider(base.LibP2PPubsub, rsAPI)
err = provider.Start()
if err != nil {

View file

@ -34,7 +34,12 @@ func ConnectToPeer(pRouter *pineconeRouter.Router, peer string) error {
if parent == nil {
return fmt.Errorf("failed to wrap connection")
}
_, err := pRouter.AuthenticatedConnect(parent, "static", pineconeRouter.PeerTypeRemote, true)
_, err := pRouter.Connect(
parent,
pineconeRouter.ConnectionZone("static"),
pineconeRouter.ConnectionPeerType(pineconeRouter.PeerTypeRemote),
pineconeRouter.ConnectionURI(peer),
)
return err
}

View file

@ -26,6 +26,7 @@ import (
"net"
"net/http"
"os"
"strings"
"time"
"github.com/gorilla/mux"
@ -61,7 +62,7 @@ import (
var (
instanceName = flag.String("name", "dendrite-p2p-pinecone", "the name of this P2P demo instance")
instancePort = flag.Int("port", 8008, "the port that the client API will listen on")
instancePeer = flag.String("peer", "", "the static Pinecone peer to connect to")
instancePeer = flag.String("peer", "", "the static Pinecone peers to connect to, comma separated-list")
instanceListen = flag.String("listen", ":0", "the port Pinecone peers can connect to")
)
@ -109,9 +110,12 @@ func main() {
continue
}
port, err := pRouter.AuthenticatedConnect(conn, "", pineconeRouter.PeerTypeRemote, true)
port, err := pRouter.Connect(
conn,
pineconeRouter.ConnectionPeerType(pineconeRouter.PeerTypeRemote),
)
if err != nil {
logrus.WithError(err).Error("pSwitch.AuthenticatedConnect failed")
logrus.WithError(err).Error("pSwitch.Connect failed")
continue
}
@ -124,17 +128,25 @@ func main() {
pMulticast.Start()
connectToStaticPeer := func() {
attempt := func() {
if pRouter.PeerCount(pineconeRouter.PeerTypeRemote) == 0 {
uri := *instancePeer
if uri == "" {
return
connected := map[string]bool{} // URI -> connected?
for _, uri := range strings.Split(*instancePeer, ",") {
connected[strings.TrimSpace(uri)] = false
}
if err := conn.ConnectToPeer(pRouter, uri); err != nil {
attempt := func() {
for k := range connected {
connected[k] = false
}
for _, info := range pRouter.Peers() {
connected[info.URI] = true
}
for k, online := range connected {
if !online {
if err := conn.ConnectToPeer(pRouter, k); err != nil {
logrus.WithError(err).Error("Failed to connect to static peer")
}
}
}
}
for {
attempt()
time.Sleep(time.Second * 5)
@ -172,7 +184,7 @@ func main() {
rsComponent := roomserver.NewInternalAPI(base)
rsAPI := rsComponent
fsAPI := federationapi.NewInternalAPI(
base, federation, rsAPI, base.Caches, true,
base, federation, rsAPI, base.Caches, keyRing, true,
)
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI)
@ -185,8 +197,7 @@ func main() {
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
rsComponent.SetFederationAPI(fsAPI)
rsComponent.SetKeyring(keyRing)
rsComponent.SetFederationAPI(fsAPI, keyRing)
monolith := setup.Monolith{
Config: base.Cfg,
@ -229,7 +240,11 @@ func main() {
return
}
conn := conn.WrapWebSocketConn(c)
if _, err = pRouter.AuthenticatedConnect(conn, "websocket", pineconeRouter.PeerTypeRemote, true); err != nil {
if _, err = pRouter.Connect(
conn,
pineconeRouter.ConnectionZone("websocket"),
pineconeRouter.ConnectionPeerType(pineconeRouter.PeerTypeRemote),
); err != nil {
logrus.WithError(err).Error("Failed to connect WebSocket peer to Pinecone switch")
}
})

View file

@ -117,11 +117,10 @@ func main() {
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
rsAPI.SetAppserviceAPI(asAPI)
fsAPI := federationapi.NewInternalAPI(
base, federation, rsAPI, base.Caches, true,
base, federation, rsAPI, base.Caches, keyRing, true,
)
rsComponent.SetFederationAPI(fsAPI)
rsComponent.SetKeyring(keyRing)
rsComponent.SetFederationAPI(fsAPI, keyRing)
monolith := setup.Monolith{
Config: base.Cfg,

View file

@ -67,6 +67,7 @@ func main() {
cfg.MediaAPI.InternalAPI.Connect = httpAPIAddr
cfg.RoomServer.InternalAPI.Connect = httpAPIAddr
cfg.SyncAPI.InternalAPI.Connect = httpAPIAddr
cfg.UserAPI.InternalAPI.Connect = httpAPIAddr
options = append(options, basepkg.UseHTTPAPIs)
}
@ -90,7 +91,7 @@ func main() {
}
fsAPI := federationapi.NewInternalAPI(
base, federation, rsAPI, base.Caches, false,
base, federation, rsAPI, base.Caches, nil, false,
)
if base.UseHTTPAPIs {
federationapi.AddInternalRoutes(base.InternalAPIMux, fsAPI)
@ -100,22 +101,43 @@ func main() {
// The underlying roomserver implementation needs to be able to call the fedsender.
// This is different to rsAPI which can be the http client which doesn't need this dependency
rsImpl.SetFederationAPI(fsAPI)
rsImpl.SetFederationAPI(fsAPI, keyRing)
keyAPI := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI)
userAPI := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI)
keyAPI.SetUserAPI(userAPI)
keyImpl := keyserver.NewInternalAPI(base, &base.Cfg.KeyServer, fsAPI)
keyAPI := keyImpl
if base.UseHTTPAPIs {
keyserver.AddInternalRoutes(base.InternalAPIMux, keyAPI)
keyAPI = base.KeyServerHTTPClient()
}
userImpl := userapi.NewInternalAPI(accountDB, &cfg.UserAPI, cfg.Derived.ApplicationServices, keyAPI)
userAPI := userImpl
if base.UseHTTPAPIs {
userapi.AddInternalRoutes(base.InternalAPIMux, userAPI)
userAPI = base.UserAPIClient()
}
if traceInternal {
userAPI = &uapi.UserInternalAPITrace{
Impl: userAPI,
}
}
// needs to be after the SetUserAPI call above
// TODO: This should use userAPI, not userImpl, but the appservice setup races with
// the listeners and panics at startup if it tries to create appservice accounts
// before the listeners are up.
asAPI := appservice.NewInternalAPI(base, userImpl, rsAPI)
if base.UseHTTPAPIs {
keyserver.AddInternalRoutes(base.InternalAPIMux, keyAPI)
keyAPI = base.KeyServerHTTPClient()
appservice.AddInternalRoutes(base.InternalAPIMux, asAPI)
asAPI = base.AppserviceHTTPClient()
}
// The underlying roomserver implementation needs to be able to call the fedsender.
// This is different to rsAPI which can be the http client which doesn't need this
// dependency. Other components also need updating after their dependencies are up.
rsImpl.SetFederationAPI(fsAPI, keyRing)
rsImpl.SetAppserviceAPI(asAPI)
keyImpl.SetUserAPI(userAPI)
eduInputAPI := eduserver.NewInternalAPI(
base, cache.New(), userAPI,
)
@ -124,13 +146,6 @@ func main() {
eduInputAPI = base.EDUServerClient()
}
asAPI := appservice.NewInternalAPI(base, userAPI, rsAPI)
if base.UseHTTPAPIs {
appservice.AddInternalRoutes(base.InternalAPIMux, asAPI)
asAPI = base.AppserviceHTTPClient()
}
rsAPI.SetAppserviceAPI(asAPI)
monolith := setup.Monolith{
Config: base.Cfg,
AccountDB: accountDB,

View file

@ -35,6 +35,9 @@ func FederationAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) {
&base.Cfg.MSCs, nil,
)
intAPI := federationapi.NewInternalAPI(base, federation, rsAPI, base.Caches, nil, true)
federationapi.AddInternalRoutes(base.InternalAPIMux, intAPI)
base.SetupAndServeHTTP(
base.Cfg.FederationAPI.InternalAPI.Listen,
base.Cfg.FederationAPI.ExternalAPI.Listen,

View file

@ -24,7 +24,7 @@ func MediaAPI(base *basepkg.BaseDendrite, cfg *config.Dendrite) {
userAPI := base.UserAPIClient()
client := base.CreateClient()
mediaapi.AddPublicRoutes(base.PublicMediaAPIMux, &base.Cfg.MediaAPI, userAPI, client)
mediaapi.AddPublicRoutes(base.PublicMediaAPIMux, &base.Cfg.MediaAPI, &base.Cfg.ClientAPI.RateLimiting, userAPI, client)
base.SetupAndServeHTTP(
base.Cfg.MediaAPI.InternalAPI.Listen,

View file

@ -24,7 +24,7 @@ func RoomServer(base *basepkg.BaseDendrite, cfg *config.Dendrite) {
asAPI := base.AppserviceHTTPClient()
fsAPI := base.FederationAPIHTTPClient()
rsAPI := roomserver.NewInternalAPI(base)
rsAPI.SetFederationAPI(fsAPI)
rsAPI.SetFederationAPI(fsAPI, fsAPI.KeyRing())
rsAPI.SetAppserviceAPI(asAPI)
roomserver.AddInternalRoutes(base.InternalAPIMux, rsAPI)

View file

@ -197,9 +197,8 @@ func startup() {
base, userAPI, rsAPI,
)
rsAPI.SetAppserviceAPI(asQuery)
fedSenderAPI := federationapi.NewInternalAPI(base, federation, rsAPI, base.Caches, true)
rsAPI.SetFederationAPI(fedSenderAPI)
rsAPI.SetKeyring(keyRing)
fedSenderAPI := federationapi.NewInternalAPI(base, federation, rsAPI, base.Caches, keyRing, true)
rsAPI.SetFederationAPI(fedSenderAPI, keyRing)
monolith := setup.Monolith{
Config: base.Cfg,

View file

@ -209,9 +209,8 @@ func main() {
base, userAPI, rsAPI,
)
rsAPI.SetAppserviceAPI(asQuery)
fedSenderAPI := federationapi.NewInternalAPI(base, federation, rsAPI, base.Caches, true)
rsAPI.SetFederationAPI(fedSenderAPI)
rsAPI.SetKeyring(keyRing)
fedSenderAPI := federationapi.NewInternalAPI(base, federation, rsAPI, base.Caches, keyRing, true)
rsAPI.SetFederationAPI(fedSenderAPI, keyRing)
p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation)
monolith := setup.Monolith{

View file

@ -73,6 +73,7 @@ func NewInternalAPI(
federation *gomatrixserverlib.FederationClient,
rsAPI roomserverAPI.RoomserverInternalAPI,
caches *caching.Caches,
keyRing *gomatrixserverlib.KeyRing,
resetBlacklist bool,
) api.FederationInternalAPI {
cfg := &base.Cfg.FederationAPI
@ -125,5 +126,5 @@ func NewInternalAPI(
logrus.WithError(err).Panic("failed to start key server consumer")
}
return internal.NewFederationInternalAPI(federationDB, cfg, rsAPI, federation, stats, caches, queues)
return internal.NewFederationInternalAPI(federationDB, cfg, rsAPI, federation, stats, caches, queues, keyRing)
}

View file

@ -94,7 +94,7 @@ func TestMain(m *testing.M) {
// Finally, build the server key APIs.
sbase := base.NewBaseDendrite(cfg, "Monolith", base.NoCacheMetrics)
s.api = NewInternalAPI(sbase, s.fedclient, nil, s.cache, true)
s.api = NewInternalAPI(sbase, s.fedclient, nil, s.cache, nil, true)
}
// Now that we have built our server key APIs, start the

View file

@ -39,13 +39,15 @@ func NewFederationInternalAPI(
statistics *statistics.Statistics,
caches *caching.Caches,
queues *queue.OutgoingQueues,
keyRing *gomatrixserverlib.KeyRing,
) *FederationInternalAPI {
serverKeyDB, err := cache.NewKeyDatabase(db, caches)
if err != nil {
logrus.WithError(err).Panicf("failed to set up caching wrapper for server key database")
}
keyRing := &gomatrixserverlib.KeyRing{
if keyRing == nil {
keyRing = &gomatrixserverlib.KeyRing{
KeyFetchers: []gomatrixserverlib.KeyFetcher{},
KeyDatabase: serverKeyDB,
}
@ -92,6 +94,7 @@ func NewFederationInternalAPI(
"num_public_keys": len(ps.Keys),
}).Info("Enabled perspective key fetcher")
}
}
return &FederationInternalAPI{
db: db,

2
go.mod
View file

@ -40,7 +40,7 @@ require (
github.com/matrix-org/go-sqlite3-js v0.0.0-20210709140738-b0d1ba599a6d
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16
github.com/matrix-org/gomatrixserverlib v0.0.0-20211115192839-15a64d244aa2
github.com/matrix-org/pinecone v0.0.0-20211116111603-febf3501584d
github.com/matrix-org/pinecone v0.0.0-20211216094739-095c5ea64d02
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.8
github.com/morikuni/aec v1.0.0 // indirect

4
go.sum
View file

@ -990,8 +990,8 @@ github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16 h1:ZtO5uywdd5d
github.com/matrix-org/gomatrix v0.0.0-20210324163249-be2af5ef2e16/go.mod h1:/gBX06Kw0exX1HrwmoBibFA98yBk/jxKpGVeyQbff+s=
github.com/matrix-org/gomatrixserverlib v0.0.0-20211115192839-15a64d244aa2 h1:RFsBN3509Ql6NJ7TDVkcKoN3bb/tmqUqzur5c0AwIHQ=
github.com/matrix-org/gomatrixserverlib v0.0.0-20211115192839-15a64d244aa2/go.mod h1:rB8tBUUUo1rzUqpzklRDSooxZ6YMhoaEPx4SO5fGeUc=
github.com/matrix-org/pinecone v0.0.0-20211116111603-febf3501584d h1:V1b6GZVvL95qTkjYSEWH9Pja6c0WcJKBt2MlAILlw+Q=
github.com/matrix-org/pinecone v0.0.0-20211116111603-febf3501584d/go.mod h1:r6dsL+ylE0yXe/7zh8y/Bdh6aBYI1r+u4yZni9A4iyk=
github.com/matrix-org/pinecone v0.0.0-20211216094739-095c5ea64d02 h1:tLn95Nqq3KPOZAjogGZTKMEkn4mMIzKu09biRTz/Ack=
github.com/matrix-org/pinecone v0.0.0-20211216094739-095c5ea64d02/go.mod h1:r6dsL+ylE0yXe/7zh8y/Bdh6aBYI1r+u4yZni9A4iyk=
github.com/matrix-org/util v0.0.0-20190711121626-527ce5ddefc7/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 h1:eCEHXWDv9Rm335MSuB49mFUK44bwZPFSDde3ORE3syk=
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4/go.mod h1:vVQlW/emklohkZnOPwD3LrZUBqdfsbiyO3p1lNV8F6U=

View file

@ -23,6 +23,7 @@ import (
"net/url"
"strings"
"github.com/matrix-org/dendrite/userapi/api"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
)
@ -72,6 +73,9 @@ func PostJSON(
var errorBody struct {
Message string `json:"message"`
}
if _, ok := response.(*api.PerformKeyBackupResponse); ok { // TODO: remove this, once cross-boundary errors are a thing
return nil
}
if msgerr := json.NewDecoder(res.Body).Decode(&errorBody); msgerr == nil {
return fmt.Errorf("internal API: %d from %s: %s", res.StatusCode, apiURL, errorBody.Message)
}

View file

@ -1,4 +1,4 @@
package routing
package httputil
import (
"net/http"
@ -10,7 +10,7 @@ import (
"github.com/matrix-org/util"
)
type rateLimits struct {
type RateLimits struct {
limits map[string]chan struct{}
limitsMutex sync.RWMutex
cleanMutex sync.RWMutex
@ -19,8 +19,8 @@ type rateLimits struct {
cooloffDuration time.Duration
}
func newRateLimits(cfg *config.RateLimiting) *rateLimits {
l := &rateLimits{
func NewRateLimits(cfg *config.RateLimiting) *RateLimits {
l := &RateLimits{
limits: make(map[string]chan struct{}),
enabled: cfg.Enabled,
requestThreshold: cfg.Threshold,
@ -32,7 +32,7 @@ func newRateLimits(cfg *config.RateLimiting) *rateLimits {
return l
}
func (l *rateLimits) clean() {
func (l *RateLimits) clean() {
for {
// On a 30 second interval, we'll take an exclusive write
// lock of the entire map and see if any of the channels are
@ -52,7 +52,7 @@ func (l *rateLimits) clean() {
}
}
func (l *rateLimits) rateLimit(req *http.Request) *util.JSONResponse {
func (l *RateLimits) Limit(req *http.Request) *util.JSONResponse {
// If rate limiting is disabled then do nothing.
if !l.enabled {
return nil

View file

@ -26,7 +26,9 @@ import (
// AddPublicRoutes sets up and registers HTTP handlers for the MediaAPI component.
func AddPublicRoutes(
router *mux.Router, cfg *config.MediaAPI,
router *mux.Router,
cfg *config.MediaAPI,
rateLimit *config.RateLimiting,
userAPI userapi.UserInternalAPI,
client *gomatrixserverlib.Client,
) {
@ -36,6 +38,6 @@ func AddPublicRoutes(
}
routing.Setup(
router, cfg, mediaDB, userAPI, client,
router, cfg, rateLimit, mediaDB, userAPI, client,
)
}

View file

@ -15,16 +15,16 @@
package routing
import (
"encoding/json"
"net/http"
"strings"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/gorilla/mux"
"github.com/matrix-org/dendrite/internal/httputil"
"github.com/matrix-org/dendrite/mediaapi/storage"
"github.com/matrix-org/dendrite/mediaapi/types"
"github.com/matrix-org/dendrite/setup/config"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/matrix-org/util"
"github.com/prometheus/client_golang/prometheus"
@ -32,6 +32,12 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// configResponse is the response to GET /_matrix/media/r0/config
// https://matrix.org/docs/spec/client_server/latest#get-matrix-media-r0-config
type configResponse struct {
UploadSize config.FileSizeBytes `json:"m.upload.size"`
}
// Setup registers the media API HTTP handlers
//
// Due to Setup being used to call many other functions, a gocyclo nolint is
@ -40,10 +46,13 @@ import (
func Setup(
publicAPIMux *mux.Router,
cfg *config.MediaAPI,
rateLimit *config.RateLimiting,
db storage.Database,
userAPI userapi.UserInternalAPI,
client *gomatrixserverlib.Client,
) {
rateLimits := httputil.NewRateLimits(rateLimit)
r0mux := publicAPIMux.PathPrefix("/r0").Subrouter()
v1mux := publicAPIMux.PathPrefix("/v1").Subrouter()
@ -54,31 +63,46 @@ func Setup(
uploadHandler := httputil.MakeAuthAPI(
"upload", userAPI,
func(req *http.Request, dev *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
return *r
}
return Upload(req, cfg, dev, db, activeThumbnailGeneration)
},
)
configHandler := httputil.MakeAuthAPI("config", userAPI, func(req *http.Request, device *userapi.Device) util.JSONResponse {
if r := rateLimits.Limit(req); r != nil {
return *r
}
return util.JSONResponse{
Code: http.StatusOK,
JSON: configResponse{UploadSize: *cfg.MaxFileSizeBytes},
}
})
r0mux.Handle("/upload", uploadHandler).Methods(http.MethodPost, http.MethodOptions)
r0mux.Handle("/config", configHandler).Methods(http.MethodGet, http.MethodOptions)
v1mux.Handle("/upload", uploadHandler).Methods(http.MethodPost, http.MethodOptions)
activeRemoteRequests := &types.ActiveRemoteRequests{
MXCToResult: map[string]*types.RemoteRequestResult{},
}
downloadHandler := makeDownloadAPI("download", cfg, db, client, activeRemoteRequests, activeThumbnailGeneration)
downloadHandler := makeDownloadAPI("download", cfg, rateLimits, db, client, activeRemoteRequests, activeThumbnailGeneration)
r0mux.Handle("/download/{serverName}/{mediaId}", downloadHandler).Methods(http.MethodGet, http.MethodOptions)
r0mux.Handle("/download/{serverName}/{mediaId}/{downloadName}", downloadHandler).Methods(http.MethodGet, http.MethodOptions)
v1mux.Handle("/download/{serverName}/{mediaId}", downloadHandler).Methods(http.MethodGet, http.MethodOptions) // TODO: remove when synapse is fixed
v1mux.Handle("/download/{serverName}/{mediaId}/{downloadName}", downloadHandler).Methods(http.MethodGet, http.MethodOptions) // TODO: remove when synapse is fixed
r0mux.Handle("/thumbnail/{serverName}/{mediaId}",
makeDownloadAPI("thumbnail", cfg, db, client, activeRemoteRequests, activeThumbnailGeneration),
makeDownloadAPI("thumbnail", cfg, rateLimits, db, client, activeRemoteRequests, activeThumbnailGeneration),
).Methods(http.MethodGet, http.MethodOptions)
}
func makeDownloadAPI(
name string,
cfg *config.MediaAPI,
rateLimits *httputil.RateLimits,
db storage.Database,
client *gomatrixserverlib.Client,
activeRemoteRequests *types.ActiveRemoteRequests,
@ -99,6 +123,16 @@ func makeDownloadAPI(
// Content-Type will be overridden in case of returning file data, else we respond with JSON-formatted errors
w.Header().Set("Content-Type", "application/json")
// Ratelimit requests
if r := rateLimits.Limit(req); r != nil {
if err := json.NewEncoder(w).Encode(r); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusTooManyRequests)
return
}
vars, _ := httputil.URLDecodeMapValues(mux.Vars(req))
serverName := gomatrixserverlib.ServerName(vars["serverName"])

View file

@ -12,9 +12,8 @@ import (
type RoomserverInternalAPI interface {
// needed to avoid chicken and egg scenario when setting up the
// interdependencies between the roomserver and other input APIs
SetFederationAPI(fsAPI fsAPI.FederationInternalAPI)
SetFederationAPI(fsAPI fsAPI.FederationInternalAPI, keyRing *gomatrixserverlib.KeyRing)
SetAppserviceAPI(asAPI asAPI.AppServiceQueryAPI)
SetKeyring(keyRing *gomatrixserverlib.KeyRing)
InputRoomEvents(
ctx context.Context,

View file

@ -17,12 +17,8 @@ type RoomserverInternalAPITrace struct {
Impl RoomserverInternalAPI
}
func (t *RoomserverInternalAPITrace) SetKeyring(keyRing *gomatrixserverlib.KeyRing) {
t.Impl.SetKeyring(keyRing)
}
func (t *RoomserverInternalAPITrace) SetFederationAPI(fsAPI fsAPI.FederationInternalAPI) {
t.Impl.SetFederationAPI(fsAPI)
func (t *RoomserverInternalAPITrace) SetFederationAPI(fsAPI fsAPI.FederationInternalAPI, keyRing *gomatrixserverlib.KeyRing) {
t.Impl.SetFederationAPI(fsAPI, keyRing)
}
func (t *RoomserverInternalAPITrace) SetAppserviceAPI(asAPI asAPI.AppServiceQueryAPI) {

View file

@ -78,18 +78,12 @@ func NewRoomserverAPI(
return a
}
// SetKeyring sets the keyring to a given keyring. This is only useful for the P2P
// demos and must be called after SetFederationSenderInputAPI.
func (r *RoomserverInternalAPI) SetKeyring(keyRing *gomatrixserverlib.KeyRing) {
r.KeyRing = keyRing
}
// SetFederationInputAPI passes in a federation input API reference so that we can
// avoid the chicken-and-egg problem of both the roomserver input API and the
// federation input API being interdependent.
func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalAPI) {
func (r *RoomserverInternalAPI) SetFederationAPI(fsAPI fsAPI.FederationInternalAPI, keyRing *gomatrixserverlib.KeyRing) {
r.fsAPI = fsAPI
r.SetKeyring(fsAPI.KeyRing())
r.KeyRing = keyRing
r.Inviter = &perform.Inviter{
DB: r.DB,

View file

@ -122,7 +122,7 @@ func (r *Inputer) processRoomEvent(
}
// Store the event.
_, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected)
_, _, stateAtEvent, redactionEvent, redactedEventID, err := r.DB.StoreEvent(ctx, event, authEventNIDs, isRejected)
if err != nil {
return "", fmt.Errorf("r.DB.StoreEvent: %w", err)
}

View file

@ -546,6 +546,7 @@ func joinEventsFromHistoryVisibility(
func persistEvents(ctx context.Context, db storage.Database, events []*gomatrixserverlib.HeaderedEvent) (types.RoomNID, map[string]types.Event) {
var roomNID types.RoomNID
var eventNID types.EventNID
backfilledEventMap := make(map[string]types.Event)
for j, ev := range events {
nidMap, err := db.EventNIDs(ctx, ev.AuthEventIDs())
@ -559,10 +560,9 @@ func persistEvents(ctx context.Context, db storage.Database, events []*gomatrixs
authNids[i] = nid
i++
}
var stateAtEvent types.StateAtEvent
var redactedEventID string
var redactionEvent *gomatrixserverlib.Event
roomNID, stateAtEvent, redactionEvent, redactedEventID, err = db.StoreEvent(ctx, ev.Unwrap(), authNids, false)
eventNID, roomNID, _, redactionEvent, redactedEventID, err = db.StoreEvent(ctx, ev.Unwrap(), authNids, false)
if err != nil {
logrus.WithError(err).WithField("event_id", ev.EventID()).Error("Failed to persist event")
continue
@ -581,7 +581,7 @@ func persistEvents(ctx context.Context, db storage.Database, events []*gomatrixs
events[j] = ev
}
backfilledEventMap[ev.EventID()] = types.Event{
EventNID: stateAtEvent.StateEntry.EventNID,
EventNID: eventNID,
Event: ev.Unwrap(),
}
}

View file

@ -83,12 +83,8 @@ func NewRoomserverClient(
}, nil
}
// SetKeyring no-ops in HTTP client mode as there is no chicken/egg scenario
func (h *httpRoomserverInternalAPI) SetKeyring(keyRing *gomatrixserverlib.KeyRing) {
}
// SetFederationInputAPI no-ops in HTTP client mode as there is no chicken/egg scenario
func (h *httpRoomserverInternalAPI) SetFederationAPI(fsAPI fsInputAPI.FederationInternalAPI) {
func (h *httpRoomserverInternalAPI) SetFederationAPI(fsAPI fsInputAPI.FederationInternalAPI, keyRing *gomatrixserverlib.KeyRing) {
}
// SetAppserviceAPI no-ops in HTTP client mode as there is no chicken/egg scenario

View file

@ -70,7 +70,7 @@ type Database interface {
StoreEvent(
ctx context.Context, event *gomatrixserverlib.Event, authEventNIDs []types.EventNID,
isRejected bool,
) (types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error)
) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error)
// Look up the state entries for a list of string event IDs
// Returns an error if the there is an error talking to the database
// Returns a types.MissingEventError if the event IDs aren't in the database.

View file

@ -461,7 +461,7 @@ func (d *Database) GetLatestEventsForUpdate(
func (d *Database) StoreEvent(
ctx context.Context, event *gomatrixserverlib.Event,
authEventNIDs []types.EventNID, isRejected bool,
) (types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) {
) (types.EventNID, types.RoomNID, types.StateAtEvent, *gomatrixserverlib.Event, string, error) {
var (
roomNID types.RoomNID
eventTypeNID types.EventTypeNID
@ -538,7 +538,7 @@ func (d *Database) StoreEvent(
return nil
})
if err != nil {
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.Writer.Do: %w", err)
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.Writer.Do: %w", err)
}
// We should attempt to update the previous events table with any
@ -551,10 +551,10 @@ func (d *Database) StoreEvent(
if prevEvents := event.PrevEvents(); len(prevEvents) > 0 {
roomInfo, err = d.RoomInfo(ctx, event.RoomID())
if err != nil {
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.RoomInfo: %w", err)
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("d.RoomInfo: %w", err)
}
if roomInfo == nil && len(prevEvents) > 0 {
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("expected room %q to exist", event.RoomID())
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("expected room %q to exist", event.RoomID())
}
// Create an updater - NB: on sqlite this WILL create a txn as we are directly calling the shared DB form of
// GetLatestEventsForUpdate - not via the SQLiteDatabase form which has `nil` txns. This
@ -563,7 +563,7 @@ func (d *Database) StoreEvent(
// to do writes however then this will need to go inside `Writer.Do`.
updater, err = d.GetLatestEventsForUpdate(ctx, *roomInfo)
if err != nil {
return 0, types.StateAtEvent{}, nil, "", fmt.Errorf("NewLatestEventsUpdater: %w", err)
return 0, 0, types.StateAtEvent{}, nil, "", fmt.Errorf("NewLatestEventsUpdater: %w", err)
}
// Ensure that we atomically store prev events AND commit them. If we don't wrap StorePreviousEvents
// and EndTransaction in a writer then it's possible for a new write txn to be made between the two
@ -580,11 +580,11 @@ func (d *Database) StoreEvent(
return err
})
if err != nil {
return 0, types.StateAtEvent{}, nil, "", err
return 0, 0, types.StateAtEvent{}, nil, "", err
}
}
return roomNID, types.StateAtEvent{
return eventNID, roomNID, types.StateAtEvent{
BeforeStateSnapshotNID: stateNID,
StateEntry: types.StateEntry{
StateKeyTuple: types.StateKeyTuple{

View file

@ -49,7 +49,8 @@ const eventsSchema = `
const insertEventSQL = `
INSERT INTO roomserver_events (room_nid, event_type_nid, event_state_key_nid, event_id, reference_sha256, auth_event_nids, depth, is_rejected)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT DO NOTHING;
ON CONFLICT DO NOTHING
RETURNING event_nid, state_snapshot_nid;
`
const selectEventSQL = "" +
@ -161,20 +162,13 @@ func (s *eventStatements) InsertEvent(
) (types.EventNID, types.StateSnapshotNID, error) {
// attempt to insert: the last_row_id is the event NID
var eventNID int64
var stateNID int64
insertStmt := sqlutil.TxStmt(txn, s.insertEventStmt)
result, err := insertStmt.ExecContext(
err := insertStmt.QueryRowContext(
ctx, int64(roomNID), int64(eventTypeNID), int64(eventStateKeyNID),
eventID, referenceSHA256, eventNIDsAsArray(authEventNIDs), depth, isRejected,
)
if err != nil {
return 0, 0, err
}
modified, err := result.RowsAffected()
if modified == 0 && err == nil {
return 0, 0, sql.ErrNoRows
}
eventNID, err = result.LastInsertId()
return types.EventNID(eventNID), 0, err
).Scan(&eventNID, &stateNID)
return types.EventNID(eventNID), types.StateSnapshotNID(stateNID), err
}
func (s *eventStatements) SelectEvent(

View file

@ -21,7 +21,6 @@ import (
"io"
"net"
"net/http"
"net/url"
"os"
"os/signal"
"syscall"
@ -79,7 +78,6 @@ type BaseDendrite struct {
SynapseAdminMux *mux.Router
UseHTTPAPIs bool
apiHttpClient *http.Client
httpClient *http.Client
Cfg *config.Dendrite
Caches *caching.Caches
DNSCache *gomatrixserverlib.DNSCache
@ -181,13 +179,6 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...Base
},
},
}
client := http.Client{Timeout: HTTPClientTimeout}
if cfg.FederationAPI.Proxy.Enabled {
client.Transport = &http.Transport{Proxy: http.ProxyURL(&url.URL{
Scheme: cfg.FederationAPI.Proxy.Protocol,
Host: fmt.Sprintf("%s:%d", cfg.FederationAPI.Proxy.Host, cfg.FederationAPI.Proxy.Port),
})}
}
// Ideally we would only use SkipClean on routes which we know can allow '/' but due to
// https://github.com/gorilla/mux/issues/460 we have to attach this at the top router.
@ -217,7 +208,6 @@ func NewBaseDendrite(cfg *config.Dendrite, componentName string, options ...Base
InternalAPIMux: mux.NewRouter().SkipClean(true).PathPrefix(httputil.InternalPathPrefix).Subrouter().UseEncodedPath(),
SynapseAdminMux: mux.NewRouter().SkipClean(true).PathPrefix("/_synapse/").Subrouter().UseEncodedPath(),
apiHttpClient: &apiClient,
httpClient: &client,
}
}

View file

@ -68,7 +68,7 @@ func (m *Monolith) AddAllPublicRoutes(process *process.ProcessContext, csMux, ss
m.KeyRing, m.RoomserverAPI, m.FederationAPI,
m.EDUInternalAPI, m.KeyAPI, &m.Config.MSCs, nil,
)
mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, m.UserAPI, m.Client)
mediaapi.AddPublicRoutes(mediaMux, &m.Config.MediaAPI, &m.Config.ClientAPI.RateLimiting, m.UserAPI, m.Client)
syncapi.AddPublicRoutes(
process, csMux, m.UserAPI, m.RoomserverAPI,
m.KeyAPI, m.FedClient, &m.Config.SyncAPI,

View file

@ -7,7 +7,6 @@ import (
"crypto/sha256"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"sort"
@ -504,7 +503,7 @@ type testUserAPI struct {
func (u *testUserAPI) QueryAccessToken(ctx context.Context, req *userapi.QueryAccessTokenRequest, res *userapi.QueryAccessTokenResponse) error {
dev, ok := u.accessTokens[req.AccessToken]
if !ok {
res.Err = fmt.Errorf("unknown token")
res.Err = "unknown token"
return nil
}
res.Device = &dev

View file

@ -19,7 +19,6 @@ import (
"context"
"crypto/ed25519"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
@ -347,7 +346,7 @@ type testUserAPI struct {
func (u *testUserAPI) QueryAccessToken(ctx context.Context, req *userapi.QueryAccessTokenRequest, res *userapi.QueryAccessTokenResponse) error {
dev, ok := u.accessTokens[req.AccessToken]
if !ok {
res.Err = fmt.Errorf("unknown token")
res.Err = "unknown token"
return nil
}
res.Device = &dev

View file

@ -556,6 +556,7 @@ can fetch self-signing keys over federation
Changing master key notifies local users
Changing user-signing key notifies local users
Inbound federation correctly handles soft failed events as extremities
Can read configuration endpoint
User can create and send/receive messages in a room with version 7
local user can join room with version 7
User can invite local user to room with version 7

View file

@ -33,7 +33,7 @@ type UserInternalAPI interface {
PerformDeviceUpdate(ctx context.Context, req *PerformDeviceUpdateRequest, res *PerformDeviceUpdateResponse) error
PerformAccountDeactivation(ctx context.Context, req *PerformAccountDeactivationRequest, res *PerformAccountDeactivationResponse) error
PerformOpenIDTokenCreation(ctx context.Context, req *PerformOpenIDTokenCreationRequest, res *PerformOpenIDTokenCreationResponse) error
PerformKeyBackup(ctx context.Context, req *PerformKeyBackupRequest, res *PerformKeyBackupResponse)
PerformKeyBackup(ctx context.Context, req *PerformKeyBackupRequest, res *PerformKeyBackupResponse) error
QueryKeyBackup(ctx context.Context, req *QueryKeyBackupRequest, res *QueryKeyBackupResponse)
QueryProfile(ctx context.Context, req *QueryProfileRequest, res *QueryProfileResponse) error
QueryAccessToken(ctx context.Context, req *QueryAccessTokenRequest, res *QueryAccessTokenResponse) error
@ -181,7 +181,7 @@ type QueryAccessTokenRequest struct {
// QueryAccessTokenResponse is the response for QueryAccessToken
type QueryAccessTokenResponse struct {
Device *Device
Err error // e.g ErrorForbidden
Err string // e.g ErrorForbidden
}
// QueryAccountDataRequest is the request for QueryAccountData
@ -290,6 +290,10 @@ type PerformDeviceCreationRequest struct {
IPAddr string
// Useragent for this device
UserAgent string
// NoDeviceListUpdate determines whether we should avoid sending a device list
// update for this account. Generally the only reason to do this is if the account
// is an appservice account.
NoDeviceListUpdate bool
}
// PerformDeviceCreationResponse is the response for PerformDeviceCreation

View file

@ -74,11 +74,14 @@ func (t *UserInternalAPITrace) PerformOpenIDTokenCreation(ctx context.Context, r
util.GetLogger(ctx).Infof("PerformOpenIDTokenCreation req=%+v res=%+v", js(req), js(res))
return err
}
func (t *UserInternalAPITrace) PerformKeyBackup(ctx context.Context, req *PerformKeyBackupRequest, res *PerformKeyBackupResponse) {
t.Impl.PerformKeyBackup(ctx, req, res)
func (t *UserInternalAPITrace) PerformKeyBackup(ctx context.Context, req *PerformKeyBackupRequest, res *PerformKeyBackupResponse) error {
err := t.Impl.PerformKeyBackup(ctx, req, res)
util.GetLogger(ctx).Infof("PerformKeyBackup req=%+v res=%+v", js(req), js(res))
return err
}
func (t *UserInternalAPITrace) QueryKeyBackup(ctx context.Context, req *QueryKeyBackupRequest, res *QueryKeyBackupResponse) {
t.Impl.QueryKeyBackup(ctx, req, res)
util.GetLogger(ctx).Infof("QueryKeyBackup req=%+v res=%+v", js(req), js(res))
}
func (t *UserInternalAPITrace) QueryProfile(ctx context.Context, req *QueryProfileRequest, res *QueryProfileResponse) error {
err := t.Impl.QueryProfile(ctx, req, res)

View file

@ -119,6 +119,9 @@ func (a *UserInternalAPI) PerformDeviceCreation(ctx context.Context, req *api.Pe
}
res.DeviceCreated = true
res.Device = dev
if req.NoDeviceListUpdate {
return nil
}
// create empty device keys and upload them to trigger device list changes
return a.deviceListUpdate(dev.UserID, []string{dev.ID})
}
@ -358,8 +361,11 @@ func (a *UserInternalAPI) QueryAccountData(ctx context.Context, req *api.QueryAc
func (a *UserInternalAPI) QueryAccessToken(ctx context.Context, req *api.QueryAccessTokenRequest, res *api.QueryAccessTokenResponse) error {
if req.AppServiceUserID != "" {
appServiceDevice, err := a.queryAppServiceToken(ctx, req.AccessToken, req.AppServiceUserID)
if err != nil {
res.Err = err.Error()
}
res.Device = appServiceDevice
res.Err = err
return nil
}
device, err := a.DeviceDB.GetDeviceByAccessToken(ctx, req.AccessToken)
@ -455,13 +461,16 @@ func (a *UserInternalAPI) QueryOpenIDToken(ctx context.Context, req *api.QueryOp
return nil
}
func (a *UserInternalAPI) PerformKeyBackup(ctx context.Context, req *api.PerformKeyBackupRequest, res *api.PerformKeyBackupResponse) {
func (a *UserInternalAPI) PerformKeyBackup(ctx context.Context, req *api.PerformKeyBackupRequest, res *api.PerformKeyBackupResponse) error {
// Delete metadata
if req.DeleteBackup {
if req.Version == "" {
res.BadInput = true
res.Error = "must specify a version to delete"
return
if res.Error != "" {
return fmt.Errorf(res.Error)
}
return nil
}
exists, err := a.AccountDB.DeleteKeyBackup(ctx, req.UserID, req.Version)
if err != nil {
@ -469,7 +478,10 @@ func (a *UserInternalAPI) PerformKeyBackup(ctx context.Context, req *api.Perform
}
res.Exists = exists
res.Version = req.Version
return
if res.Error != "" {
return fmt.Errorf(res.Error)
}
return nil
}
// Create metadata
if req.Version == "" {
@ -479,7 +491,10 @@ func (a *UserInternalAPI) PerformKeyBackup(ctx context.Context, req *api.Perform
}
res.Exists = err == nil
res.Version = version
return
if res.Error != "" {
return fmt.Errorf(res.Error)
}
return nil
}
// Update metadata
if len(req.Keys.Rooms) == 0 {
@ -489,10 +504,17 @@ func (a *UserInternalAPI) PerformKeyBackup(ctx context.Context, req *api.Perform
}
res.Exists = err == nil
res.Version = req.Version
return
if res.Error != "" {
return fmt.Errorf(res.Error)
}
return nil
}
// Upload Keys for a specific version metadata
a.uploadBackupKeys(ctx, req, res)
if res.Error != "" {
return fmt.Errorf(res.Error)
}
return nil
}
func (a *UserInternalAPI) uploadBackupKeys(ctx context.Context, req *api.PerformKeyBackupRequest, res *api.PerformKeyBackupResponse) {

View file

@ -228,7 +228,7 @@ func (h *httpUserInternalAPI) QueryOpenIDToken(ctx context.Context, req *api.Que
return httputil.PostJSON(ctx, span, h.httpClient, apiURL, req, res)
}
func (h *httpUserInternalAPI) PerformKeyBackup(ctx context.Context, req *api.PerformKeyBackupRequest, res *api.PerformKeyBackupResponse) {
func (h *httpUserInternalAPI) PerformKeyBackup(ctx context.Context, req *api.PerformKeyBackupRequest, res *api.PerformKeyBackupResponse) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "PerformKeyBackup")
defer span.Finish()
@ -237,6 +237,7 @@ func (h *httpUserInternalAPI) PerformKeyBackup(ctx context.Context, req *api.Per
if err != nil {
res.Error = err.Error()
}
return nil
}
func (h *httpUserInternalAPI) QueryKeyBackup(ctx context.Context, req *api.QueryKeyBackupRequest, res *api.QueryKeyBackupResponse) {
span, ctx := opentracing.StartSpanFromContext(ctx, "QueryKeyBackup")

View file

@ -16,6 +16,7 @@ package inthttp
import (
"encoding/json"
"fmt"
"net/http"
"github.com/gorilla/mux"
@ -234,4 +235,32 @@ func AddRoutes(internalAPIMux *mux.Router, s api.UserInternalAPI) {
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(QueryKeyBackupPath,
httputil.MakeInternalAPI("queryKeyBackup", func(req *http.Request) util.JSONResponse {
request := api.QueryKeyBackupRequest{}
response := api.QueryKeyBackupResponse{}
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
s.QueryKeyBackup(req.Context(), &request, &response)
if response.Error != "" {
return util.ErrorResponse(fmt.Errorf("QueryKeyBackup: %s", response.Error))
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
internalAPIMux.Handle(PerformKeyBackupPath,
httputil.MakeInternalAPI("performKeyBackup", func(req *http.Request) util.JSONResponse {
request := api.PerformKeyBackupRequest{}
response := api.PerformKeyBackupResponse{}
if err := json.NewDecoder(req.Body).Decode(&request); err != nil {
return util.MessageResponse(http.StatusBadRequest, err.Error())
}
err := s.PerformKeyBackup(req.Context(), &request, &response)
if err != nil {
return util.JSONResponse{Code: http.StatusBadRequest, JSON: &response}
}
return util.JSONResponse{Code: http.StatusOK, JSON: &response}
}),
)
}