Merge branch 'master' into neilalexander/rsconcurrency

This commit is contained in:
Neil Alexander 2021-03-03 11:26:06 +00:00 committed by GitHub
commit 6cc1d1e518
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
37 changed files with 685 additions and 524 deletions

View file

@ -1,5 +1,43 @@
# Changelog # Changelog
## Dendrite 0.3.11 (2021-03-02)
### Fixes
- **SECURITY:** A bug in SQLite mode which could cause the registration flow to complete unexpectedly for existing accounts has been fixed (PostgreSQL deployments are not affected)
- A panic in the federation sender has been fixed when shutting down destination queues
- The `/keys/upload` endpoint now correctly returns the number of one-time keys in response to an empty upload request
## Dendrite 0.3.10 (2021-02-17)
### Features
* In-memory caches will now gradually evict old entries, reducing idle memory usage
* Federation sender queues will now be fully unloaded when idle, reducing idle memory usage
* The `power_level_content_override` option is now supported in `/createRoom`
* The `/send` endpoint will now attempt more servers in the room when trying to fetch missing events or state
### Fixes
* A panic in the membership updater has been fixed
* Events in the sync API that weren't excluded from sync can no longer be incorrectly excluded from sync by backfill
* Retrieving remote media now correcly respects the locally configured maximum file size, even when the `Content-Length` header is unavailable
* The `/send` endpoint will no longer hit the database more than once to find servers in the room
## Dendrite 0.3.9 (2021-02-04)
### Features
* Performance of initial/complete syncs has been improved dramatically
* State events that can't be authed are now dropped when joining a room rather than unexpectedly causing the room join to fail
* State events that already appear in the timeline will no longer be requested from the sync API database more than once, which may reduce memory usage in some cases
### Fixes
* A crash at startup due to a conflict in the sync API account data has been fixed
* A crash at startup due to mismatched event IDs in the federation sender has been fixed
* A redundant check which may cause the roomserver memberships table to get out of sync has been removed
## Dendrite 0.3.8 (2021-01-28) ## Dendrite 0.3.8 (2021-01-28)
### Fixes ### Fixes

View file

@ -23,13 +23,13 @@ RUN apt-get update && apt-get -y install python
WORKDIR /build WORKDIR /build
ADD https://github.com/matrix-org/go-http-js-libp2p/archive/master.tar.gz /build/libp2p.tar.gz ADD https://github.com/matrix-org/go-http-js-libp2p/archive/master.tar.gz /build/libp2p.tar.gz
RUN tar xvfz libp2p.tar.gz RUN tar xvfz libp2p.tar.gz
ADD https://github.com/vector-im/riot-web/archive/matthew/p2p.tar.gz /build/p2p.tar.gz ADD https://github.com/vector-im/element-web/archive/matthew/p2p.tar.gz /build/p2p.tar.gz
RUN tar xvfz p2p.tar.gz RUN tar xvfz p2p.tar.gz
# Install deps for riot-web, symlink in libp2p repo and build that too # Install deps for element-web, symlink in libp2p repo and build that too
WORKDIR /build/riot-web-matthew-p2p WORKDIR /build/element-web-matthew-p2p
RUN yarn install RUN yarn install
RUN ln -s /build/go-http-js-libp2p-master /build/riot-web-matthew-p2p/node_modules/go-http-js-libp2p RUN ln -s /build/go-http-js-libp2p-master /build/element-web-matthew-p2p/node_modules/go-http-js-libp2p
RUN (cd node_modules/go-http-js-libp2p && yarn install) RUN (cd node_modules/go-http-js-libp2p && yarn install)
COPY --from=gobuild /build/dendrite-master/main.wasm ./src/vector/dendrite.wasm COPY --from=gobuild /build/dendrite-master/main.wasm ./src/vector/dendrite.wasm
# build it all # build it all
@ -108,4 +108,4 @@ server { \n\
} \n\ } \n\
}' > /etc/nginx/conf.d/default.conf }' > /etc/nginx/conf.d/default.conf
RUN sed -i 's/}/ application\/wasm wasm;\n}/g' /etc/nginx/mime.types RUN sed -i 's/}/ application\/wasm wasm;\n}/g' /etc/nginx/mime.types
COPY --from=jsbuild /build/riot-web-matthew-p2p/webapp /usr/share/nginx/html COPY --from=jsbuild /build/element-web-matthew-p2p/webapp /usr/share/nginx/html

View file

@ -14,7 +14,7 @@ RUN go build -trimpath -o bin/ ./cmd/generate-keys
FROM alpine:latest FROM alpine:latest
COPY --from=base /build/bin/* /usr/bin COPY --from=base /build/bin/* /usr/bin/
VOLUME /etc/dendrite VOLUME /etc/dendrite
WORKDIR /etc/dendrite WORKDIR /etc/dendrite

View file

@ -14,7 +14,7 @@ RUN go build -trimpath -o bin/ ./cmd/generate-keys
FROM alpine:latest FROM alpine:latest
COPY --from=base /build/bin/* /usr/bin COPY --from=base /build/bin/* /usr/bin/
VOLUME /etc/dendrite VOLUME /etc/dendrite
WORKDIR /etc/dendrite WORKDIR /etc/dendrite

View file

@ -309,12 +309,12 @@ user_api:
listen: http://0.0.0.0:7781 listen: http://0.0.0.0:7781
connect: http://user_api:7781 connect: http://user_api:7781
account_database: account_database:
connection_string: postgresql://dendrite:itsasecret@postgres/dendrite_account?sslmode=disable connection_string: postgresql://dendrite:itsasecret@postgres/dendrite_userapi_accounts?sslmode=disable
max_open_conns: 10 max_open_conns: 10
max_idle_conns: 2 max_idle_conns: 2
conn_max_lifetime: -1 conn_max_lifetime: -1
device_database: device_database:
connection_string: postgresql://dendrite:itsasecret@postgres/dendrite_device?sslmode=disable connection_string: postgresql://dendrite:itsasecret@postgres/dendrite_userapi_devices?sslmode=disable
max_open_conns: 10 max_open_conns: 10
max_idle_conns: 2 max_idle_conns: 2
conn_max_lifetime: -1 conn_max_lifetime: -1

View file

@ -12,6 +12,7 @@ services:
- 8448:8448 - 8448:8448
volumes: volumes:
- ./config:/etc/dendrite - ./config:/etc/dendrite
- ./media:/var/dendrite/media
networks: networks:
- internal - internal

View file

@ -15,6 +15,7 @@ services:
command: mediaapi command: mediaapi
volumes: volumes:
- ./config:/etc/dendrite - ./config:/etc/dendrite
- ./media:/var/dendrite/media
networks: networks:
- internal - internal

View file

@ -1,5 +1,5 @@
#!/bin/sh #!/bin/sh
for db in account device mediaapi syncapi roomserver signingkeyserver keyserver federationsender appservice naffka; do for db in userapi_accounts userapi_devices mediaapi syncapi roomserver signingkeyserver keyserver federationsender appservice naffka; do
createdb -U dendrite -O dendrite dendrite_$db createdb -U dendrite -O dendrite dendrite_$db
done done

View file

@ -48,6 +48,7 @@ type createRoomRequest struct {
RoomAliasName string `json:"room_alias_name"` RoomAliasName string `json:"room_alias_name"`
GuestCanJoin bool `json:"guest_can_join"` GuestCanJoin bool `json:"guest_can_join"`
RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"` RoomVersion gomatrixserverlib.RoomVersion `json:"room_version"`
PowerLevelContentOverride json.RawMessage `json:"power_level_content_override"`
} }
const ( const (
@ -257,6 +258,18 @@ func createRoom(
var builtEvents []*gomatrixserverlib.HeaderedEvent var builtEvents []*gomatrixserverlib.HeaderedEvent
powerLevelContent := eventutil.InitialPowerLevelsContent(userID)
if r.PowerLevelContentOverride != nil {
// Merge powerLevelContentOverride fields by unmarshalling it atop the defaults
err = json.Unmarshal(r.PowerLevelContentOverride, &powerLevelContent)
if err != nil {
return util.JSONResponse{
Code: http.StatusBadRequest,
JSON: jsonerror.BadJSON("malformed power_level_content_override"),
}
}
}
// send events into the room in order of: // send events into the room in order of:
// 1- m.room.create // 1- m.room.create
// 2- room creator join member // 2- room creator join member
@ -278,7 +291,7 @@ func createRoom(
eventsToMake := []fledglingEvent{ eventsToMake := []fledglingEvent{
{"m.room.create", "", r.CreationContent}, {"m.room.create", "", r.CreationContent},
{"m.room.member", userID, membershipContent}, {"m.room.member", userID, membershipContent},
{"m.room.power_levels", "", eventutil.InitialPowerLevelsContent(userID)}, {"m.room.power_levels", "", powerLevelContent},
{"m.room.join_rules", "", gomatrixserverlib.JoinRuleContent{JoinRule: joinRules}}, {"m.room.join_rules", "", gomatrixserverlib.JoinRuleContent{JoinRule: joinRules}},
{"m.room.history_visibility", "", eventutil.HistoryVisibilityContent{HistoryVisibility: historyVisibility}}, {"m.room.history_visibility", "", eventutil.HistoryVisibilityContent{HistoryVisibility: historyVisibility}},
} }

View file

@ -38,7 +38,10 @@ func UploadKeys(req *http.Request, keyAPI api.KeyInternalAPI, device *userapi.De
return *resErr return *resErr
} }
uploadReq := &api.PerformUploadKeysRequest{} uploadReq := &api.PerformUploadKeysRequest{
DeviceID: device.ID,
UserID: device.UserID,
}
if r.DeviceKeys != nil { if r.DeviceKeys != nil {
uploadReq.DeviceKeys = []api.DeviceKeys{ uploadReq.DeviceKeys = []api.DeviceKeys{
{ {

View file

@ -8,7 +8,6 @@ import (
"strconv" "strconv"
"github.com/matrix-org/dendrite/internal/caching" "github.com/matrix-org/dendrite/internal/caching"
"github.com/matrix-org/dendrite/roomserver/state"
"github.com/matrix-org/dendrite/roomserver/storage" "github.com/matrix-org/dendrite/roomserver/storage"
"github.com/matrix-org/dendrite/roomserver/types" "github.com/matrix-org/dendrite/roomserver/types"
"github.com/matrix-org/dendrite/setup" "github.com/matrix-org/dendrite/setup"
@ -105,7 +104,7 @@ func main() {
} }
fmt.Println("Resolving state") fmt.Println("Resolving state")
resolved, err := state.ResolveConflictsAdhoc( resolved, err := gomatrixserverlib.ResolveConflicts(
gomatrixserverlib.RoomVersion(*roomVersion), gomatrixserverlib.RoomVersion(*roomVersion),
events, events,
authEvents, authEvents,

View file

@ -109,7 +109,7 @@ On macOS, omit `sudo -u postgres` from the below commands.
* If you want to run each Dendrite component with its own database: * If you want to run each Dendrite component with its own database:
```bash ```bash
for i in mediaapi syncapi roomserver signingkeyserver federationsender appservice keyserver userapi_account userapi_device naffka; do for i in mediaapi syncapi roomserver signingkeyserver federationsender appservice keyserver userapi_accounts userapi_devices naffka; do
sudo -u postgres createdb -O dendrite dendrite_$i sudo -u postgres createdb -O dendrite dendrite_$i
done done
``` ```

View file

@ -107,6 +107,8 @@ type txnReq struct {
keyAPI keyapi.KeyInternalAPI keyAPI keyapi.KeyInternalAPI
keys gomatrixserverlib.JSONVerifier keys gomatrixserverlib.JSONVerifier
federation txnFederationClient federation txnFederationClient
servers []gomatrixserverlib.ServerName
serversMutex sync.RWMutex
// local cache of events for auth checks, etc - this may include events // local cache of events for auth checks, etc - this may include events
// which the roomserver is unaware of. // which the roomserver is unaware of.
haveEvents map[string]*gomatrixserverlib.HeaderedEvent haveEvents map[string]*gomatrixserverlib.HeaderedEvent
@ -404,16 +406,21 @@ func (t *txnReq) processDeviceListUpdate(ctx context.Context, e gomatrixserverli
} }
func (t *txnReq) getServers(ctx context.Context, roomID string) []gomatrixserverlib.ServerName { func (t *txnReq) getServers(ctx context.Context, roomID string) []gomatrixserverlib.ServerName {
servers := []gomatrixserverlib.ServerName{t.Origin} t.serversMutex.Lock()
defer t.serversMutex.Unlock()
if t.servers != nil {
return t.servers
}
t.servers = []gomatrixserverlib.ServerName{t.Origin}
serverReq := &api.QueryServerJoinedToRoomRequest{ serverReq := &api.QueryServerJoinedToRoomRequest{
RoomID: roomID, RoomID: roomID,
} }
serverRes := &api.QueryServerJoinedToRoomResponse{} serverRes := &api.QueryServerJoinedToRoomResponse{}
if err := t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil { if err := t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil {
servers = append(servers, serverRes.ServerNames...) t.servers = append(t.servers, serverRes.ServerNames...)
util.GetLogger(ctx).Infof("Found %d server(s) to query for missing events in %q", len(servers), roomID) util.GetLogger(ctx).Infof("Found %d server(s) to query for missing events in %q", len(t.servers), roomID)
} }
return servers return t.servers
} }
func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) error { func (t *txnReq) processEvent(ctx context.Context, e *gomatrixserverlib.Event) error {
@ -482,14 +489,10 @@ func (t *txnReq) retrieveMissingAuthEvents(
missingAuthEvents[missingAuthEventID] = struct{}{} missingAuthEvents[missingAuthEventID] = struct{}{}
} }
servers := t.getServers(ctx, e.RoomID())
if len(servers) > 5 {
servers = servers[:5]
}
withNextEvent: withNextEvent:
for missingAuthEventID := range missingAuthEvents { for missingAuthEventID := range missingAuthEvents {
withNextServer: withNextServer:
for _, server := range servers { for _, server := range t.getServers(ctx, e.RoomID()) {
logger.Infof("Retrieving missing auth event %q from %q", missingAuthEventID, server) logger.Infof("Retrieving missing auth event %q from %q", missingAuthEventID, server)
tx, err := t.federation.GetEvent(ctx, server, missingAuthEventID) tx, err := t.federation.GetEvent(ctx, server, missingAuthEventID)
if err != nil { if err != nil {
@ -692,13 +695,8 @@ func (t *txnReq) lookupStateAfterEvent(ctx context.Context, roomVersion gomatrix
return nil, false, fmt.Errorf("t.lookupStateBeforeEvent: %w", err) return nil, false, fmt.Errorf("t.lookupStateBeforeEvent: %w", err)
} }
servers := t.getServers(ctx, roomID)
if len(servers) > 5 {
servers = servers[:5]
}
// fetch the event we're missing and add it to the pile // fetch the event we're missing and add it to the pile
h, err := t.lookupEvent(ctx, roomVersion, eventID, false, servers) h, err := t.lookupEvent(ctx, roomVersion, roomID, eventID, false)
switch err.(type) { switch err.(type) {
case verifySigError: case verifySigError:
return respState, false, nil return respState, false, nil
@ -740,11 +738,10 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
t.haveEvents[ev.EventID()] = res.StateEvents[i] t.haveEvents[ev.EventID()] = res.StateEvents[i]
} }
var authEvents []*gomatrixserverlib.Event var authEvents []*gomatrixserverlib.Event
missingAuthEvents := make(map[string]bool) missingAuthEvents := map[string]bool{}
for _, ev := range res.StateEvents { for _, ev := range res.StateEvents {
for _, ae := range ev.AuthEventIDs() { for _, ae := range ev.AuthEventIDs() {
aev, ok := t.haveEvents[ae] if aev, ok := t.haveEvents[ae]; ok {
if ok {
authEvents = append(authEvents, aev.Unwrap()) authEvents = append(authEvents, aev.Unwrap())
} else { } else {
missingAuthEvents[ae] = true missingAuthEvents[ae] = true
@ -753,6 +750,7 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
} }
// QueryStateAfterEvents does not return the auth events, so fetch them now. We know the roomserver has them else it wouldn't // QueryStateAfterEvents does not return the auth events, so fetch them now. We know the roomserver has them else it wouldn't
// have stored the event. // have stored the event.
if len(missingAuthEvents) > 0 {
var missingEventList []string var missingEventList []string
for evID := range missingAuthEvents { for evID := range missingAuthEvents {
missingEventList = append(missingEventList, evID) missingEventList = append(missingEventList, evID)
@ -770,10 +768,10 @@ func (t *txnReq) lookupStateAfterEventLocally(ctx context.Context, roomID, event
t.haveEvents[evID] = queryRes.Events[i] t.haveEvents[evID] = queryRes.Events[i]
authEvents = append(authEvents, queryRes.Events[i].Unwrap()) authEvents = append(authEvents, queryRes.Events[i].Unwrap())
} }
}
evs := gomatrixserverlib.UnwrapEventHeaders(res.StateEvents)
return &gomatrixserverlib.RespState{ return &gomatrixserverlib.RespState{
StateEvents: evs, StateEvents: gomatrixserverlib.UnwrapEventHeaders(res.StateEvents),
AuthEvents: authEvents, AuthEvents: authEvents,
} }
} }
@ -805,11 +803,7 @@ retryAllowedState:
if err = checkAllowedByState(backwardsExtremity, resolvedStateEvents); err != nil { if err = checkAllowedByState(backwardsExtremity, resolvedStateEvents); err != nil {
switch missing := err.(type) { switch missing := err.(type) {
case gomatrixserverlib.MissingAuthEventError: case gomatrixserverlib.MissingAuthEventError:
servers := t.getServers(ctx, backwardsExtremity.RoomID()) h, err2 := t.lookupEvent(ctx, roomVersion, backwardsExtremity.RoomID(), missing.AuthEventID, true)
if len(servers) > 5 {
servers = servers[:5]
}
h, err2 := t.lookupEvent(ctx, roomVersion, missing.AuthEventID, true, servers)
switch err2.(type) { switch err2.(type) {
case verifySigError: case verifySigError:
return &gomatrixserverlib.RespState{ return &gomatrixserverlib.RespState{
@ -857,17 +851,8 @@ func (t *txnReq) getMissingEvents(ctx context.Context, e *gomatrixserverlib.Even
latestEvents[i] = res.LatestEvents[i].EventID latestEvents[i] = res.LatestEvents[i].EventID
} }
servers := []gomatrixserverlib.ServerName{t.Origin}
serverReq := &api.QueryServerJoinedToRoomRequest{
RoomID: e.RoomID(),
}
serverRes := &api.QueryServerJoinedToRoomResponse{}
if err = t.rsAPI.QueryServerJoinedToRoom(ctx, serverReq, serverRes); err == nil {
servers = append(servers, serverRes.ServerNames...)
logger.Infof("Found %d server(s) to query for missing events", len(servers))
}
var missingResp *gomatrixserverlib.RespMissingEvents var missingResp *gomatrixserverlib.RespMissingEvents
servers := t.getServers(ctx, e.RoomID())
for _, server := range servers { for _, server := range servers {
var m gomatrixserverlib.RespMissingEvents var m gomatrixserverlib.RespMissingEvents
if m, err = t.federation.LookupMissingEvents(ctx, server, e.RoomID(), gomatrixserverlib.MissingEvents{ if m, err = t.federation.LookupMissingEvents(ctx, server, e.RoomID(), gomatrixserverlib.MissingEvents{
@ -1015,12 +1000,6 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
"concurrent_requests": concurrentRequests, "concurrent_requests": concurrentRequests,
}).Info("Fetching missing state at event") }).Info("Fetching missing state at event")
// Get a list of servers to fetch from.
servers := t.getServers(ctx, roomID)
if len(servers) > 5 {
servers = servers[:5]
}
// Create a queue containing all of the missing event IDs that we want // Create a queue containing all of the missing event IDs that we want
// to retrieve. // to retrieve.
pending := make(chan string, missingCount) pending := make(chan string, missingCount)
@ -1046,7 +1025,7 @@ func (t *txnReq) lookupMissingStateViaStateIDs(ctx context.Context, roomID, even
// Define what we'll do in order to fetch the missing event ID. // Define what we'll do in order to fetch the missing event ID.
fetch := func(missingEventID string) { fetch := func(missingEventID string) {
var h *gomatrixserverlib.HeaderedEvent var h *gomatrixserverlib.HeaderedEvent
h, err = t.lookupEvent(ctx, roomVersion, missingEventID, false, servers) h, err = t.lookupEvent(ctx, roomVersion, roomID, missingEventID, false)
switch err.(type) { switch err.(type) {
case verifySigError: case verifySigError:
return return
@ -1112,7 +1091,7 @@ func (t *txnReq) createRespStateFromStateIDs(stateIDs gomatrixserverlib.RespStat
return &respState, nil return &respState, nil
} }
func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, missingEventID string, localFirst bool, servers []gomatrixserverlib.ServerName) (*gomatrixserverlib.HeaderedEvent, error) { func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.RoomVersion, roomID, missingEventID string, localFirst bool) (*gomatrixserverlib.HeaderedEvent, error) {
if localFirst { if localFirst {
// fetch from the roomserver // fetch from the roomserver
queryReq := api.QueryEventsByIDRequest{ queryReq := api.QueryEventsByIDRequest{
@ -1127,6 +1106,7 @@ func (t *txnReq) lookupEvent(ctx context.Context, roomVersion gomatrixserverlib.
} }
var event *gomatrixserverlib.Event var event *gomatrixserverlib.Event
found := false found := false
servers := t.getServers(ctx, roomID)
for _, serverName := range servers { for _, serverName := range servers {
txn, err := t.federation.GetEvent(ctx, serverName, missingEventID) txn, err := t.federation.GetEvent(ctx, serverName, missingEventID)
if err != nil || len(txn.PDUs) == 0 { if err != nil || len(txn.PDUs) == 0 {

View file

@ -46,6 +46,7 @@ const (
// ensures that only one request is in flight to a given destination // ensures that only one request is in flight to a given destination
// at a time. // at a time.
type destinationQueue struct { type destinationQueue struct {
queues *OutgoingQueues
db storage.Database db storage.Database
process *process.ProcessContext process *process.ProcessContext
signing *SigningInfo signing *SigningInfo
@ -246,6 +247,7 @@ func (oq *destinationQueue) backgroundSend() {
} }
destinationQueueRunning.Inc() destinationQueueRunning.Inc()
defer destinationQueueRunning.Dec() defer destinationQueueRunning.Dec()
defer oq.queues.clearQueue(oq)
defer oq.running.Store(false) defer oq.running.Store(false)
// Mark the queue as overflowed, so we will consult the database // Mark the queue as overflowed, so we will consult the database

View file

@ -120,7 +120,7 @@ func NewOutgoingQueues(
log.WithError(err).Error("Failed to get EDU server names for destination queue hydration") log.WithError(err).Error("Failed to get EDU server names for destination queue hydration")
} }
for serverName := range serverNames { for serverName := range serverNames {
if queue := queues.getQueue(serverName); !queue.statistics.Blacklisted() { if queue := queues.getQueue(serverName); queue != nil {
queue.wakeQueueIfNeeded() queue.wakeQueueIfNeeded()
} }
} }
@ -148,12 +148,16 @@ type queuedEDU struct {
} }
func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *destinationQueue { func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *destinationQueue {
if oqs.statistics.ForServer(destination).Blacklisted() {
return nil
}
oqs.queuesMutex.Lock() oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock() defer oqs.queuesMutex.Unlock()
oq := oqs.queues[destination] oq, ok := oqs.queues[destination]
if oq == nil { if !ok || oq != nil {
destinationQueueTotal.Inc() destinationQueueTotal.Inc()
oq = &destinationQueue{ oq = &destinationQueue{
queues: oqs,
db: oqs.db, db: oqs.db,
process: oqs.process, process: oqs.process,
rsAPI: oqs.rsAPI, rsAPI: oqs.rsAPI,
@ -170,6 +174,14 @@ func (oqs *OutgoingQueues) getQueue(destination gomatrixserverlib.ServerName) *d
return oq return oq
} }
func (oqs *OutgoingQueues) clearQueue(oq *destinationQueue) {
oqs.queuesMutex.Lock()
defer oqs.queuesMutex.Unlock()
delete(oqs.queues, oq.destination)
destinationQueueTotal.Dec()
}
type ErrorFederationDisabled struct { type ErrorFederationDisabled struct {
Message string Message string
} }
@ -236,7 +248,9 @@ func (oqs *OutgoingQueues) SendEvent(
} }
for destination := range destmap { for destination := range destmap {
oqs.getQueue(destination).sendEvent(ev, nid) if queue := oqs.getQueue(destination); queue != nil {
queue.sendEvent(ev, nid)
}
} }
return nil return nil
@ -306,7 +320,9 @@ func (oqs *OutgoingQueues) SendEDU(
} }
for destination := range destmap { for destination := range destmap {
oqs.getQueue(destination).sendEDU(e, nid) if queue := oqs.getQueue(destination); queue != nil {
queue.sendEDU(e, nid)
}
} }
return nil return nil
@ -317,9 +333,7 @@ func (oqs *OutgoingQueues) RetryServer(srv gomatrixserverlib.ServerName) {
if oqs.disabled { if oqs.disabled {
return return
} }
q := oqs.getQueue(srv) if queue := oqs.getQueue(srv); queue != nil {
if q == nil { queue.wakeQueueIfNeeded()
return
} }
q.wakeQueueIfNeeded()
} }

52
go.mod
View file

@ -2,48 +2,46 @@ module github.com/matrix-org/dendrite
require ( require (
github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/Shopify/sarama v1.27.0 github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect github.com/Shopify/sarama v1.28.0
github.com/gologme/log v1.2.0 github.com/gologme/log v1.2.0
github.com/gorilla/mux v1.8.0 github.com/gorilla/mux v1.8.0
github.com/hashicorp/golang-lru v0.5.4 github.com/hashicorp/golang-lru v0.5.4
github.com/lib/pq v1.8.0 github.com/lib/pq v1.9.0
github.com/libp2p/go-libp2p v0.11.0 github.com/libp2p/go-libp2p v0.13.0
github.com/libp2p/go-libp2p-circuit v0.3.1 github.com/libp2p/go-libp2p-circuit v0.4.0
github.com/libp2p/go-libp2p-core v0.6.1 github.com/libp2p/go-libp2p-core v0.8.3
github.com/libp2p/go-libp2p-gostream v0.2.1 github.com/libp2p/go-libp2p-gostream v0.3.1
github.com/libp2p/go-libp2p-http v0.1.5 github.com/libp2p/go-libp2p-http v0.2.0
github.com/libp2p/go-libp2p-kad-dht v0.9.0 github.com/libp2p/go-libp2p-kad-dht v0.11.1
github.com/libp2p/go-libp2p-pubsub v0.3.5 github.com/libp2p/go-libp2p-pubsub v0.4.1
github.com/libp2p/go-libp2p-record v0.1.3 github.com/libp2p/go-libp2p-record v0.1.3
github.com/libp2p/go-yamux v1.3.9 // indirect
github.com/lucas-clemente/quic-go v0.17.3 github.com/lucas-clemente/quic-go v0.17.3
github.com/matrix-org/dugong v0.0.0-20180820122854-51a565b5666b github.com/matrix-org/dugong v0.0.0-20180820122854-51a565b5666b
github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4 github.com/matrix-org/go-http-js-libp2p v0.0.0-20200518170932-783164aeeda4
github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3 github.com/matrix-org/go-sqlite3-js v0.0.0-20200522092705-bc8506ccbcf3
github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd github.com/matrix-org/gomatrix v0.0.0-20200827122206-7dd5e2a05bcd
github.com/matrix-org/gomatrixserverlib v0.0.0-20210129163316-dd4d53729ead github.com/matrix-org/gomatrixserverlib v0.0.0-20210302161955-6142fe3f8c2c
github.com/matrix-org/naffka v0.0.0-20200901083833-bcdd62999a91 github.com/matrix-org/naffka v0.0.0-20201009174903-d26a3b9cb161
github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4 github.com/matrix-org/util v0.0.0-20200807132607-55161520e1d4
github.com/mattn/go-sqlite3 v1.14.2 github.com/mattn/go-sqlite3 v1.14.6
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646
github.com/ngrok/sqlmw v0.0.0-20200129213757-d5c93a81bec6 github.com/ngrok/sqlmw v0.0.0-20200129213757-d5c93a81bec6
github.com/opentracing/opentracing-go v1.2.0 github.com/opentracing/opentracing-go v1.2.0
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
github.com/pressly/goose v2.7.0-rc5+incompatible github.com/pressly/goose v2.7.0+incompatible
github.com/prometheus/client_golang v1.7.1 github.com/prometheus/client_golang v1.9.0
github.com/sirupsen/logrus v1.7.0 github.com/sirupsen/logrus v1.8.0
github.com/tidwall/gjson v1.6.7 github.com/tidwall/gjson v1.6.8
github.com/tidwall/sjson v1.1.4 github.com/tidwall/sjson v1.1.5
github.com/uber/jaeger-client-go v2.25.0+incompatible github.com/uber/jaeger-client-go v2.25.0+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible github.com/uber/jaeger-lib v2.4.0+incompatible
github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20201006093556-760d9a7fd5ee github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20210218094457-e77ca8019daa
go.uber.org/atomic v1.6.0 go.uber.org/atomic v1.7.0
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83
golang.org/x/net v0.0.0-20200528225125-3c3fba18258b golang.org/x/net v0.0.0-20210226172049-e18ecbb05110
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 // indirect gopkg.in/h2non/bimg.v1 v1.1.5
gopkg.in/h2non/bimg.v1 v1.1.4 gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v2 v2.3.0
) )
go 1.13 go 1.13

585
go.sum

File diff suppressed because it is too large Load diff

View file

@ -2,6 +2,7 @@ package caching
import ( import (
"fmt" "fmt"
"time"
lru "github.com/hashicorp/golang-lru" lru "github.com/hashicorp/golang-lru"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -72,6 +73,11 @@ func NewInMemoryLRUCache(enablePrometheus bool) (*Caches, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
go cacheCleaner(
roomVersions, serverKeys, roomServerStateKeyNIDs,
roomServerEventTypeNIDs, roomServerRoomIDs,
roomInfos, federationEvents,
)
return &Caches{ return &Caches{
RoomVersions: roomVersions, RoomVersions: roomVersions,
ServerKeys: serverKeys, ServerKeys: serverKeys,
@ -83,6 +89,20 @@ func NewInMemoryLRUCache(enablePrometheus bool) (*Caches, error) {
}, nil }, nil
} }
func cacheCleaner(caches ...*InMemoryLRUCachePartition) {
for {
time.Sleep(time.Minute)
for _, cache := range caches {
// Hold onto the last 10% of the cache entries, since
// otherwise a quiet period might cause us to evict all
// cache entries entirely.
if cache.lru.Len() > cache.maxEntries/10 {
cache.lru.RemoveOldest()
}
}
}
}
type InMemoryLRUCachePartition struct { type InMemoryLRUCachePartition struct {
name string name string
mutable bool mutable bool

View file

@ -17,7 +17,7 @@ var build string
const ( const (
VersionMajor = 0 VersionMajor = 0
VersionMinor = 3 VersionMinor = 3
VersionPatch = 8 VersionPatch = 11
VersionTag = "" // example: "rc1" VersionTag = "" // example: "rc1"
) )

View file

@ -108,6 +108,8 @@ type OneTimeKeysCount struct {
// PerformUploadKeysRequest is the request to PerformUploadKeys // PerformUploadKeysRequest is the request to PerformUploadKeys
type PerformUploadKeysRequest struct { type PerformUploadKeysRequest struct {
UserID string // Required - User performing the request
DeviceID string // Optional - Device performing the request, for fetching OTK count
DeviceKeys []DeviceKeys DeviceKeys []DeviceKeys
OneTimeKeys []OneTimeKeys OneTimeKeys []OneTimeKeys
// OnlyDisplayNameUpdates should be `true` if ALL the DeviceKeys are present to update // OnlyDisplayNameUpdates should be `true` if ALL the DeviceKeys are present to update

View file

@ -513,6 +513,23 @@ func (a *KeyInternalAPI) uploadLocalDeviceKeys(ctx context.Context, req *api.Per
} }
func (a *KeyInternalAPI) uploadOneTimeKeys(ctx context.Context, req *api.PerformUploadKeysRequest, res *api.PerformUploadKeysResponse) { func (a *KeyInternalAPI) uploadOneTimeKeys(ctx context.Context, req *api.PerformUploadKeysRequest, res *api.PerformUploadKeysResponse) {
if req.UserID == "" {
res.Error = &api.KeyError{
Err: "user ID missing",
}
}
if req.DeviceID != "" && len(req.OneTimeKeys) == 0 {
counts, err := a.DB.OneTimeKeysCount(ctx, req.UserID, req.DeviceID)
if err != nil {
res.Error = &api.KeyError{
Err: fmt.Sprintf("a.DB.OneTimeKeysCount: %s", err),
}
}
if counts != nil {
res.OneTimeKeyCounts = append(res.OneTimeKeyCounts, *counts)
}
return
}
for _, key := range req.OneTimeKeys { for _, key := range req.OneTimeKeys {
// grab existing keys based on (user/device/algorithm/key ID) // grab existing keys based on (user/device/algorithm/key ID)
keyIDsWithAlgorithms := make([]string, len(key.KeyJSON)) keyIDsWithAlgorithms := make([]string, len(key.KeyJSON))
@ -521,9 +538,9 @@ func (a *KeyInternalAPI) uploadOneTimeKeys(ctx context.Context, req *api.Perform
keyIDsWithAlgorithms[i] = keyIDWithAlgo keyIDsWithAlgorithms[i] = keyIDWithAlgo
i++ i++
} }
existingKeys, err := a.DB.ExistingOneTimeKeys(ctx, key.UserID, key.DeviceID, keyIDsWithAlgorithms) existingKeys, err := a.DB.ExistingOneTimeKeys(ctx, req.UserID, req.DeviceID, keyIDsWithAlgorithms)
if err != nil { if err != nil {
res.KeyError(key.UserID, key.DeviceID, &api.KeyError{ res.KeyError(req.UserID, req.DeviceID, &api.KeyError{
Err: "failed to query existing one-time keys: " + err.Error(), Err: "failed to query existing one-time keys: " + err.Error(),
}) })
continue continue
@ -531,8 +548,8 @@ func (a *KeyInternalAPI) uploadOneTimeKeys(ctx context.Context, req *api.Perform
for keyIDWithAlgo := range existingKeys { for keyIDWithAlgo := range existingKeys {
// if keys exist and the JSON doesn't match, error out as the key already exists // if keys exist and the JSON doesn't match, error out as the key already exists
if !bytes.Equal(existingKeys[keyIDWithAlgo], key.KeyJSON[keyIDWithAlgo]) { if !bytes.Equal(existingKeys[keyIDWithAlgo], key.KeyJSON[keyIDWithAlgo]) {
res.KeyError(key.UserID, key.DeviceID, &api.KeyError{ res.KeyError(req.UserID, req.DeviceID, &api.KeyError{
Err: fmt.Sprintf("%s device %s: algorithm / key ID %s one-time key already exists", key.UserID, key.DeviceID, keyIDWithAlgo), Err: fmt.Sprintf("%s device %s: algorithm / key ID %s one-time key already exists", req.UserID, req.DeviceID, keyIDWithAlgo),
}) })
continue continue
} }
@ -540,8 +557,8 @@ func (a *KeyInternalAPI) uploadOneTimeKeys(ctx context.Context, req *api.Perform
// store one-time keys // store one-time keys
counts, err := a.DB.StoreOneTimeKeys(ctx, key) counts, err := a.DB.StoreOneTimeKeys(ctx, key)
if err != nil { if err != nil {
res.KeyError(key.UserID, key.DeviceID, &api.KeyError{ res.KeyError(req.UserID, req.DeviceID, &api.KeyError{
Err: fmt.Sprintf("%s device %s : failed to store one-time keys: %s", key.UserID, key.DeviceID, err.Error()), Err: fmt.Sprintf("%s device %s : failed to store one-time keys: %s", req.UserID, req.DeviceID, err.Error()),
}) })
continue continue
} }

View file

@ -109,7 +109,7 @@ func RemoveDir(dir types.Path, logger *log.Entry) {
// WriteTempFile writes to a new temporary file. // WriteTempFile writes to a new temporary file.
// The file is deleted if there was an error while writing. // The file is deleted if there was an error while writing.
func WriteTempFile( func WriteTempFile(
ctx context.Context, reqReader io.Reader, maxFileSizeBytes config.FileSizeBytes, absBasePath config.Path, ctx context.Context, reqReader io.Reader, absBasePath config.Path,
) (hash types.Base64Hash, size types.FileSizeBytes, path types.Path, err error) { ) (hash types.Base64Hash, size types.FileSizeBytes, path types.Path, err error) {
size = -1 size = -1
logger := util.GetLogger(ctx) logger := util.GetLogger(ctx)
@ -124,18 +124,11 @@ func WriteTempFile(
} }
}() }()
// If the max_file_size_bytes configuration option is set to a positive
// number then limit the upload to that size. Otherwise, just read the
// whole file.
limitedReader := reqReader
if maxFileSizeBytes > 0 {
limitedReader = io.LimitReader(reqReader, int64(maxFileSizeBytes))
}
// Hash the file data. The hash will be returned. The hash is useful as a // Hash the file data. The hash will be returned. The hash is useful as a
// method of deduplicating files to save storage, as well as a way to conduct // method of deduplicating files to save storage, as well as a way to conduct
// integrity checks on the file data in the repository. // integrity checks on the file data in the repository.
hasher := sha256.New() hasher := sha256.New()
teeReader := io.TeeReader(limitedReader, hasher) teeReader := io.TeeReader(reqReader, hasher)
bytesWritten, err := io.Copy(tmpFileWriter, teeReader) bytesWritten, err := io.Copy(tmpFileWriter, teeReader)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
RemoveDir(tmpDir, logger) RemoveDir(tmpDir, logger)

View file

@ -19,6 +19,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"mime" "mime"
"net/http" "net/http"
"net/url" "net/url"
@ -214,7 +215,7 @@ func (r *downloadRequest) doDownload(
ctx, r.MediaMetadata.MediaID, r.MediaMetadata.Origin, ctx, r.MediaMetadata.MediaID, r.MediaMetadata.Origin,
) )
if err != nil { if err != nil {
return nil, errors.Wrap(err, "error querying the database") return nil, fmt.Errorf("db.GetMediaMetadata: %w", err)
} }
if mediaMetadata == nil { if mediaMetadata == nil {
if r.MediaMetadata.Origin == cfg.Matrix.ServerName { if r.MediaMetadata.Origin == cfg.Matrix.ServerName {
@ -253,16 +254,16 @@ func (r *downloadRequest) respondFromLocalFile(
) (*types.MediaMetadata, error) { ) (*types.MediaMetadata, error) {
filePath, err := fileutils.GetPathFromBase64Hash(r.MediaMetadata.Base64Hash, absBasePath) filePath, err := fileutils.GetPathFromBase64Hash(r.MediaMetadata.Base64Hash, absBasePath)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to get file path from metadata") return nil, fmt.Errorf("fileutils.GetPathFromBase64Hash: %w", err)
} }
file, err := os.Open(filePath) file, err := os.Open(filePath)
defer file.Close() // nolint: errcheck, staticcheck, megacheck defer file.Close() // nolint: errcheck, staticcheck, megacheck
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to open file") return nil, fmt.Errorf("os.Open: %w", err)
} }
stat, err := file.Stat() stat, err := file.Stat()
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to stat file") return nil, fmt.Errorf("file.Stat: %w", err)
} }
if r.MediaMetadata.FileSizeBytes > 0 && int64(r.MediaMetadata.FileSizeBytes) != stat.Size() { if r.MediaMetadata.FileSizeBytes > 0 && int64(r.MediaMetadata.FileSizeBytes) != stat.Size() {
@ -324,7 +325,7 @@ func (r *downloadRequest) respondFromLocalFile(
w.Header().Set("Content-Security-Policy", contentSecurityPolicy) w.Header().Set("Content-Security-Policy", contentSecurityPolicy)
if _, err := io.Copy(w, responseFile); err != nil { if _, err := io.Copy(w, responseFile); err != nil {
return nil, errors.Wrap(err, "failed to copy from cache") return nil, fmt.Errorf("io.Copy: %w", err)
} }
return responseMetadata, nil return responseMetadata, nil
} }
@ -421,7 +422,7 @@ func (r *downloadRequest) getThumbnailFile(
ctx, r.MediaMetadata.MediaID, r.MediaMetadata.Origin, ctx, r.MediaMetadata.MediaID, r.MediaMetadata.Origin,
) )
if err != nil { if err != nil {
return nil, nil, errors.Wrap(err, "error looking up thumbnails") return nil, nil, fmt.Errorf("db.GetThumbnails: %w", err)
} }
// If we get a thumbnailSize, a pre-generated thumbnail would be best but it is not yet generated. // If we get a thumbnailSize, a pre-generated thumbnail would be best but it is not yet generated.
@ -459,12 +460,12 @@ func (r *downloadRequest) getThumbnailFile(
thumbFile, err := os.Open(string(thumbPath)) thumbFile, err := os.Open(string(thumbPath))
if err != nil { if err != nil {
thumbFile.Close() // nolint: errcheck thumbFile.Close() // nolint: errcheck
return nil, nil, errors.Wrap(err, "failed to open file") return nil, nil, fmt.Errorf("os.Open: %w", err)
} }
thumbStat, err := thumbFile.Stat() thumbStat, err := thumbFile.Stat()
if err != nil { if err != nil {
thumbFile.Close() // nolint: errcheck thumbFile.Close() // nolint: errcheck
return nil, nil, errors.Wrap(err, "failed to stat file") return nil, nil, fmt.Errorf("thumbFile.Stat: %w", err)
} }
if types.FileSizeBytes(thumbStat.Size()) != thumbnail.MediaMetadata.FileSizeBytes { if types.FileSizeBytes(thumbStat.Size()) != thumbnail.MediaMetadata.FileSizeBytes {
thumbFile.Close() // nolint: errcheck thumbFile.Close() // nolint: errcheck
@ -491,7 +492,7 @@ func (r *downloadRequest) generateThumbnail(
activeThumbnailGeneration, maxThumbnailGenerators, db, r.Logger, activeThumbnailGeneration, maxThumbnailGenerators, db, r.Logger,
) )
if err != nil { if err != nil {
return nil, errors.Wrap(err, "error creating thumbnail") return nil, fmt.Errorf("thumbnailer.GenerateThumbnail: %w", err)
} }
if busy { if busy {
return nil, nil return nil, nil
@ -502,7 +503,7 @@ func (r *downloadRequest) generateThumbnail(
thumbnailSize.Width, thumbnailSize.Height, thumbnailSize.ResizeMethod, thumbnailSize.Width, thumbnailSize.Height, thumbnailSize.ResizeMethod,
) )
if err != nil { if err != nil {
return nil, errors.Wrap(err, "error looking up thumbnail") return nil, fmt.Errorf("db.GetThumbnail: %w", err)
} }
return thumbnail, nil return thumbnail, nil
} }
@ -543,7 +544,7 @@ func (r *downloadRequest) getRemoteFile(
ctx, r.MediaMetadata.MediaID, r.MediaMetadata.Origin, ctx, r.MediaMetadata.MediaID, r.MediaMetadata.Origin,
) )
if err != nil { if err != nil {
return errors.Wrap(err, "error querying the database.") return fmt.Errorf("db.GetMediaMetadata: %w", err)
} }
if mediaMetadata == nil { if mediaMetadata == nil {
@ -555,7 +556,7 @@ func (r *downloadRequest) getRemoteFile(
cfg.MaxThumbnailGenerators, cfg.MaxThumbnailGenerators,
) )
if err != nil { if err != nil {
return errors.Wrap(err, "error querying the database.") return fmt.Errorf("r.fetchRemoteFileAndStoreMetadata: %w", err)
} }
} else { } else {
// If we have a record, we can respond from the local file // If we have a record, we can respond from the local file
@ -673,6 +674,43 @@ func (r *downloadRequest) fetchRemoteFileAndStoreMetadata(
return nil return nil
} }
func (r *downloadRequest) GetContentLengthAndReader(contentLengthHeader string, body *io.ReadCloser, maxFileSizeBytes config.FileSizeBytes) (int64, io.Reader, error) {
reader := *body
var contentLength int64
if contentLengthHeader != "" {
// A Content-Length header is provided. Let's try to parse it.
parsedLength, parseErr := strconv.ParseInt(contentLengthHeader, 10, 64)
if parseErr != nil {
r.Logger.WithError(parseErr).Warn("Failed to parse content length")
return 0, nil, fmt.Errorf("strconv.ParseInt: %w", parseErr)
}
if parsedLength > int64(maxFileSizeBytes) {
return 0, nil, fmt.Errorf(
"remote file size (%d bytes) exceeds locally configured max media size (%d bytes)",
parsedLength, maxFileSizeBytes,
)
}
// We successfully parsed the Content-Length, so we'll return a limited
// reader that restricts us to reading only up to this size.
reader = ioutil.NopCloser(io.LimitReader(*body, parsedLength))
contentLength = parsedLength
} else {
// Content-Length header is missing. If we have a maximum file size
// configured then we'll just make sure that the reader is limited to
// that size. We'll return a zero content length, but that's OK, since
// ultimately it will get rewritten later when the temp file is written
// to disk.
if maxFileSizeBytes > 0 {
reader = ioutil.NopCloser(io.LimitReader(*body, int64(maxFileSizeBytes)))
}
contentLength = 0
}
return contentLength, reader, nil
}
func (r *downloadRequest) fetchRemoteFile( func (r *downloadRequest) fetchRemoteFile(
ctx context.Context, ctx context.Context,
client *gomatrixserverlib.Client, client *gomatrixserverlib.Client,
@ -692,16 +730,18 @@ func (r *downloadRequest) fetchRemoteFile(
} }
defer resp.Body.Close() // nolint: errcheck defer resp.Body.Close() // nolint: errcheck
// get metadata from request and set metadata on response // The reader returned here will be limited either by the Content-Length
contentLength, err := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64) // and/or the configured maximum media size.
if err != nil { contentLength, reader, parseErr := r.GetContentLengthAndReader(resp.Header.Get("Content-Length"), &resp.Body, maxFileSizeBytes)
r.Logger.WithError(err).Warn("Failed to parse content length") if parseErr != nil {
return "", false, errors.Wrap(err, "invalid response from remote server") return "", false, parseErr
} }
if contentLength > int64(maxFileSizeBytes) { if contentLength > int64(maxFileSizeBytes) {
// TODO: Bubble up this as a 413 // TODO: Bubble up this as a 413
return "", false, fmt.Errorf("remote file is too large (%v > %v bytes)", contentLength, maxFileSizeBytes) return "", false, fmt.Errorf("remote file is too large (%v > %v bytes)", contentLength, maxFileSizeBytes)
} }
r.MediaMetadata.FileSizeBytes = types.FileSizeBytes(contentLength) r.MediaMetadata.FileSizeBytes = types.FileSizeBytes(contentLength)
r.MediaMetadata.ContentType = types.ContentType(resp.Header.Get("Content-Type")) r.MediaMetadata.ContentType = types.ContentType(resp.Header.Get("Content-Type"))
@ -728,7 +768,7 @@ func (r *downloadRequest) fetchRemoteFile(
// method of deduplicating files to save storage, as well as a way to conduct // method of deduplicating files to save storage, as well as a way to conduct
// integrity checks on the file data in the repository. // integrity checks on the file data in the repository.
// Data is truncated to maxFileSizeBytes. Content-Length was reported as 0 < Content-Length <= maxFileSizeBytes so this is OK. // Data is truncated to maxFileSizeBytes. Content-Length was reported as 0 < Content-Length <= maxFileSizeBytes so this is OK.
hash, bytesWritten, tmpDir, err := fileutils.WriteTempFile(ctx, resp.Body, maxFileSizeBytes, absBasePath) hash, bytesWritten, tmpDir, err := fileutils.WriteTempFile(ctx, reader, absBasePath)
if err != nil { if err != nil {
r.Logger.WithError(err).WithFields(log.Fields{ r.Logger.WithError(err).WithFields(log.Fields{
"MaxFileSizeBytes": maxFileSizeBytes, "MaxFileSizeBytes": maxFileSizeBytes,
@ -747,7 +787,7 @@ func (r *downloadRequest) fetchRemoteFile(
// The database is the source of truth so we need to have moved the file first // The database is the source of truth so we need to have moved the file first
finalPath, duplicate, err := fileutils.MoveFileWithHashCheck(tmpDir, r.MediaMetadata, absBasePath, r.Logger) finalPath, duplicate, err := fileutils.MoveFileWithHashCheck(tmpDir, r.MediaMetadata, absBasePath, r.Logger)
if err != nil { if err != nil {
return "", false, errors.Wrap(err, "failed to move file") return "", false, fmt.Errorf("fileutils.MoveFileWithHashCheck: %w", err)
} }
if duplicate { if duplicate {
r.Logger.WithField("dst", finalPath).Info("File was stored previously - discarding duplicate") r.Logger.WithField("dst", finalPath).Info("File was stored previously - discarding duplicate")

View file

@ -147,7 +147,7 @@ func (r *uploadRequest) doUpload(
// r.storeFileAndMetadata(ctx, tmpDir, ...) // r.storeFileAndMetadata(ctx, tmpDir, ...)
// before you return from doUpload else we will leak a temp file. We could make this nicer with a `WithTransaction` style of // before you return from doUpload else we will leak a temp file. We could make this nicer with a `WithTransaction` style of
// nested function to guarantee either storage or cleanup. // nested function to guarantee either storage or cleanup.
hash, bytesWritten, tmpDir, err := fileutils.WriteTempFile(ctx, reqReader, *cfg.MaxFileSizeBytes, cfg.AbsBasePath) hash, bytesWritten, tmpDir, err := fileutils.WriteTempFile(ctx, reqReader, cfg.AbsBasePath)
if err != nil { if err != nil {
r.Logger.WithError(err).WithFields(log.Fields{ r.Logger.WithError(err).WithFields(log.Fields{
"MaxFileSizeBytes": *cfg.MaxFileSizeBytes, "MaxFileSizeBytes": *cfg.MaxFileSizeBytes,

View file

@ -107,6 +107,23 @@ func (r *Inputer) updateMembership(
return updates, nil return updates, nil
} }
// In an ideal world, we shouldn't ever have "add" be nil and "remove" be
// set, as this implies that we're deleting a state event without replacing
// it (a thing that ordinarily shouldn't happen in Matrix). However, state
// resets are sadly a thing occasionally and we have to account for that.
// Beforehand there used to be a check here which stopped dead if we hit
// this scenario, but that meant that the membership table got out of sync
// after a state reset, often thinking that the user was still joined to
// the room even though the room state said otherwise, and this would prevent
// the user from being able to attempt to rejoin the room without modifying
// the database. So instead what we'll do is we'll just update the membership
// table to say that the user is "leave" and we'll use the old event to
// avoid nil pointer exceptions on the code path that follows.
if add == nil {
add = remove
newMembership = gomatrixserverlib.Leave
}
mu, err := updater.MembershipUpdater(targetUserNID, r.isLocalTarget(add)) mu, err := updater.MembershipUpdater(targetUserNID, r.isLocalTarget(add))
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -112,7 +112,7 @@ func (r *Queryer) QueryStateAfterEvents(
return fmt.Errorf("getAuthChain: %w", err) return fmt.Errorf("getAuthChain: %w", err)
} }
stateEvents, err = state.ResolveConflictsAdhoc(info.RoomVersion, stateEvents, authEvents) stateEvents, err = gomatrixserverlib.ResolveConflicts(info.RoomVersion, stateEvents, authEvents)
if err != nil { if err != nil {
return fmt.Errorf("state.ResolveConflictsAdhoc: %w", err) return fmt.Errorf("state.ResolveConflictsAdhoc: %w", err)
} }
@ -469,7 +469,7 @@ func (r *Queryer) QueryStateAndAuthChain(
} }
if request.ResolveState { if request.ResolveState {
if stateEvents, err = state.ResolveConflictsAdhoc( if stateEvents, err = gomatrixserverlib.ResolveConflicts(
info.RoomVersion, stateEvents, authEvents, info.RoomVersion, stateEvents, authEvents,
); err != nil { ); err != nil {
return err return err

View file

@ -683,79 +683,6 @@ func (v *StateResolution) calculateStateAfterManyEvents(
return return
} }
// ResolveConflictsAdhoc is a helper function to assist the query API in
// performing state resolution when requested. This is a different code
// path to the rest of state.go because this assumes you already have
// gomatrixserverlib.Event objects and not just a bunch of NIDs like
// elsewhere in the state resolution.
// TODO: Some of this can possibly be deduplicated
func ResolveConflictsAdhoc(
version gomatrixserverlib.RoomVersion,
events []*gomatrixserverlib.Event,
authEvents []*gomatrixserverlib.Event,
) ([]*gomatrixserverlib.Event, error) {
type stateKeyTuple struct {
Type string
StateKey string
}
// Prepare our data structures.
eventMap := make(map[stateKeyTuple][]*gomatrixserverlib.Event)
var conflicted, notConflicted, resolved []*gomatrixserverlib.Event
// Run through all of the events that we were given and sort them
// into a map, sorted by (event_type, state_key) tuple. This means
// that we can easily spot events that are "conflicted", e.g.
// there are duplicate values for the same tuple key.
for _, event := range events {
if event.StateKey() == nil {
// Ignore events that are not state events.
continue
}
// Append the events if there is already a conflicted list for
// this tuple key, create it if not.
tuple := stateKeyTuple{event.Type(), *event.StateKey()}
eventMap[tuple] = append(eventMap[tuple], event)
}
// Split out the events in the map into conflicted and unconflicted
// buckets. The conflicted events will be ran through state res,
// whereas unconfliced events will always going to appear in the
// final resolved state.
for _, list := range eventMap {
if len(list) > 1 {
conflicted = append(conflicted, list...)
} else {
notConflicted = append(notConflicted, list...)
}
}
// Work out which state resolution algorithm we want to run for
// the room version.
stateResAlgo, err := version.StateResAlgorithm()
if err != nil {
return nil, err
}
switch stateResAlgo {
case gomatrixserverlib.StateResV1:
// Currently state res v1 doesn't handle unconflicted events
// for us, like state res v2 does, so we will need to add the
// unconflicted events into the state ourselves.
// TODO: Fix state res v1 so this is handled for the caller.
resolved = gomatrixserverlib.ResolveStateConflicts(conflicted, authEvents)
resolved = append(resolved, notConflicted...)
case gomatrixserverlib.StateResV2:
// TODO: auth difference here?
resolved = gomatrixserverlib.ResolveStateConflictsV2(conflicted, notConflicted, authEvents, authEvents)
default:
return nil, fmt.Errorf("unsupported state resolution algorithm %v", stateResAlgo)
}
// Return the final resolved state events, including both the
// resolved set of conflicted events, and the unconflicted events.
return resolved, nil
}
func (v *StateResolution) resolveConflicts( func (v *StateResolution) resolveConflicts(
ctx context.Context, version gomatrixserverlib.RoomVersion, ctx context.Context, version gomatrixserverlib.RoomVersion,
notConflicted, conflicted []types.StateEntry, notConflicted, conflicted []types.StateEntry,

View file

@ -46,7 +46,7 @@ const (
// Defaults sets the request defaults // Defaults sets the request defaults
func Defaults(r *gomatrixserverlib.MSC2946SpacesRequest) { func Defaults(r *gomatrixserverlib.MSC2946SpacesRequest) {
r.Limit = 100 r.Limit = 2000
r.MaxRoomsPerSpace = -1 r.MaxRoomsPerSpace = -1
} }
@ -70,11 +70,11 @@ func Enable(
} }
}) })
base.PublicClientAPIMux.Handle("/unstable/rooms/{roomID}/spaces", base.PublicClientAPIMux.Handle("/unstable/org.matrix.msc2946/rooms/{roomID}/spaces",
httputil.MakeAuthAPI("spaces", userAPI, spacesHandler(db, rsAPI, fsAPI, base.Cfg.Global.ServerName)), httputil.MakeAuthAPI("spaces", userAPI, spacesHandler(db, rsAPI, fsAPI, base.Cfg.Global.ServerName)),
).Methods(http.MethodPost, http.MethodOptions) ).Methods(http.MethodPost, http.MethodOptions)
base.PublicFederationAPIMux.Handle("/unstable/spaces/{roomID}", httputil.MakeExternalAPI( base.PublicFederationAPIMux.Handle("/unstable/org.matrix.msc2946/spaces/{roomID}", httputil.MakeExternalAPI(
"msc2946_fed_spaces", func(req *http.Request) util.JSONResponse { "msc2946_fed_spaces", func(req *http.Request) util.JSONResponse {
fedReq, errResp := gomatrixserverlib.VerifyHTTPRequest( fedReq, errResp := gomatrixserverlib.VerifyHTTPRequest(
req, time.Now(), base.Cfg.Global.ServerName, keyRing, req, time.Now(), base.Cfg.Global.ServerName, keyRing,
@ -108,9 +108,6 @@ func federatedSpacesHandler(
JSON: jsonerror.BadJSON("The request body could not be decoded into valid JSON. " + err.Error()), JSON: jsonerror.BadJSON("The request body could not be decoded into valid JSON. " + err.Error()),
} }
} }
if r.Limit > 100 {
r.Limit = 100
}
w := walker{ w := walker{
req: &r, req: &r,
rootRoomID: roomID, rootRoomID: roomID,
@ -147,9 +144,6 @@ func spacesHandler(
if resErr := chttputil.UnmarshalJSONRequest(req, &r); resErr != nil { if resErr := chttputil.UnmarshalJSONRequest(req, &r); resErr != nil {
return *resErr return *resErr
} }
if r.Limit > 100 {
r.Limit = 100
}
w := walker{ w := walker{
req: &r, req: &r,
rootRoomID: roomID, rootRoomID: roomID,

View file

@ -309,7 +309,7 @@ func postSpaces(t *testing.T, expectCode int, accessToken, roomID string, req *g
t.Fatalf("failed to marshal request: %s", err) t.Fatalf("failed to marshal request: %s", err)
} }
httpReq, err := http.NewRequest( httpReq, err := http.NewRequest(
"POST", "http://localhost:8010/_matrix/client/unstable/rooms/"+url.PathEscape(roomID)+"/spaces", "POST", "http://localhost:8010/_matrix/client/unstable/org.matrix.msc2946/rooms/"+url.PathEscape(roomID)+"/spaces",
bytes.NewBuffer(data), bytes.NewBuffer(data),
) )
httpReq.Header.Set("Authorization", "Bearer "+accessToken) httpReq.Header.Set("Authorization", "Bearer "+accessToken)

View file

@ -75,7 +75,7 @@ const insertEventSQL = "" +
"INSERT INTO syncapi_output_room_events (" + "INSERT INTO syncapi_output_room_events (" +
"room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" + "room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" +
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) " + ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) " +
"ON CONFLICT ON CONSTRAINT syncapi_event_id_idx DO UPDATE SET exclude_from_sync = $11 " + "ON CONFLICT ON CONSTRAINT syncapi_event_id_idx DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $11) " +
"RETURNING id" "RETURNING id"
const selectEventsSQL = "" + const selectEventsSQL = "" +

View file

@ -54,7 +54,7 @@ const insertEventSQL = "" +
"INSERT INTO syncapi_output_room_events (" + "INSERT INTO syncapi_output_room_events (" +
"id, room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" + "id, room_id, event_id, headered_event_json, type, sender, contains_url, add_state_ids, remove_state_ids, session_id, transaction_id, exclude_from_sync" +
") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " + ") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) " +
"ON CONFLICT (event_id) DO UPDATE SET exclude_from_sync = $13" "ON CONFLICT (event_id) DO UPDATE SET exclude_from_sync = (excluded.exclude_from_sync AND $13)"
const selectEventsSQL = "" + const selectEventsSQL = "" +
"SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = $1" "SELECT event_id, id, headered_event_json, session_id, exclude_from_sync, transaction_id FROM syncapi_output_room_events WHERE event_id = $1"

View file

@ -67,3 +67,6 @@ Forgotten room messages cannot be paginated
# Blacklisted due to flakiness # Blacklisted due to flakiness
Can re-join room if re-invited Can re-join room if re-invited
# Blacklisted due to flakiness after #1774
Local device key changes get to remote servers with correct prev_id

View file

@ -143,7 +143,6 @@ Local new device changes appear in v2 /sync
Local update device changes appear in v2 /sync Local update device changes appear in v2 /sync
Get left notifs for other users in sync and /keys/changes when user leaves Get left notifs for other users in sync and /keys/changes when user leaves
Local device key changes get to remote servers Local device key changes get to remote servers
Local device key changes get to remote servers with correct prev_id
Server correctly handles incoming m.device_list_update Server correctly handles incoming m.device_list_update
If remote user leaves room, changes device and rejoins we see update in sync If remote user leaves room, changes device and rejoins we see update in sync
If remote user leaves room, changes device and rejoins we see update in /keys/changes If remote user leaves room, changes device and rejoins we see update in /keys/changes

View file

@ -87,7 +87,7 @@ func (a *UserInternalAPI) PerformAccountCreation(ctx context.Context, req *api.P
ServerName: a.ServerName, ServerName: a.ServerName,
UserID: fmt.Sprintf("@%s:%s", req.Localpart, a.ServerName), UserID: fmt.Sprintf("@%s:%s", req.Localpart, a.ServerName),
} }
return nil return err
} }
if err = a.AccountDB.SetDisplayName(ctx, req.Localpart, req.Localpart); err != nil { if err = a.AccountDB.SetDisplayName(ctx, req.Localpart, req.Localpart); err != nil {
@ -161,6 +161,7 @@ func (a *UserInternalAPI) deviceListUpdate(userID string, deviceIDs []string) er
var uploadRes keyapi.PerformUploadKeysResponse var uploadRes keyapi.PerformUploadKeysResponse
a.KeyAPI.PerformUploadKeys(context.Background(), &keyapi.PerformUploadKeysRequest{ a.KeyAPI.PerformUploadKeys(context.Background(), &keyapi.PerformUploadKeysRequest{
UserID: userID,
DeviceKeys: deviceKeys, DeviceKeys: deviceKeys,
}, &uploadRes) }, &uploadRes)
if uploadRes.Error != nil { if uploadRes.Error != nil {
@ -217,6 +218,7 @@ func (a *UserInternalAPI) PerformDeviceUpdate(ctx context.Context, req *api.Perf
// display name has changed: update the device key // display name has changed: update the device key
var uploadRes keyapi.PerformUploadKeysResponse var uploadRes keyapi.PerformUploadKeysResponse
a.KeyAPI.PerformUploadKeys(context.Background(), &keyapi.PerformUploadKeysRequest{ a.KeyAPI.PerformUploadKeys(context.Background(), &keyapi.PerformUploadKeysRequest{
UserID: req.RequestingUserID,
DeviceKeys: []keyapi.DeviceKeys{ DeviceKeys: []keyapi.DeviceKeys{
{ {
DeviceID: dev.ID, DeviceID: dev.ID,

View file

@ -170,8 +170,8 @@ func (d *Database) CreateAccount(
func (d *Database) createAccount( func (d *Database) createAccount(
ctx context.Context, txn *sql.Tx, localpart, plaintextPassword, appserviceID string, ctx context.Context, txn *sql.Tx, localpart, plaintextPassword, appserviceID string,
) (*api.Account, error) { ) (*api.Account, error) {
var account *api.Account
var err error var err error
// Generate a password hash if this is not a password-less user // Generate a password hash if this is not a password-less user
hash := "" hash := ""
if plaintextPassword != "" { if plaintextPassword != "" {
@ -180,14 +180,16 @@ func (d *Database) createAccount(
return nil, err return nil, err
} }
} }
if err := d.profiles.insertProfile(ctx, txn, localpart); err != nil { if account, err = d.accounts.insertAccount(ctx, txn, localpart, hash, appserviceID); err != nil {
if sqlutil.IsUniqueConstraintViolationErr(err) { if sqlutil.IsUniqueConstraintViolationErr(err) {
return nil, sqlutil.ErrUserExists return nil, sqlutil.ErrUserExists
} }
return nil, err return nil, err
} }
if err = d.profiles.insertProfile(ctx, txn, localpart); err != nil {
if err := d.accountDatas.insertAccountData(ctx, txn, localpart, "", "m.push_rules", json.RawMessage(`{ return nil, err
}
if err = d.accountDatas.insertAccountData(ctx, txn, localpart, "", "m.push_rules", json.RawMessage(`{
"global": { "global": {
"content": [], "content": [],
"override": [], "override": [],
@ -198,7 +200,7 @@ func (d *Database) createAccount(
}`)); err != nil { }`)); err != nil {
return nil, err return nil, err
} }
return d.accounts.insertAccount(ctx, txn, localpart, hash, appserviceID) return account, nil
} }
// SaveAccountData saves new account data for a given user and a given room. // SaveAccountData saves new account data for a given user and a given room.

View file

@ -1,27 +0,0 @@
// Copyright 2020 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// +build !wasm
package sqlite3
import (
"errors"
"github.com/mattn/go-sqlite3"
)
func isConstraintError(err error) bool {
return errors.Is(err, sqlite3.ErrConstraint)
}

View file

@ -204,6 +204,7 @@ func (d *Database) createAccount(
ctx context.Context, txn *sql.Tx, localpart, plaintextPassword, appserviceID string, ctx context.Context, txn *sql.Tx, localpart, plaintextPassword, appserviceID string,
) (*api.Account, error) { ) (*api.Account, error) {
var err error var err error
var account *api.Account
// Generate a password hash if this is not a password-less user // Generate a password hash if this is not a password-less user
hash := "" hash := ""
if plaintextPassword != "" { if plaintextPassword != "" {
@ -212,14 +213,13 @@ func (d *Database) createAccount(
return nil, err return nil, err
} }
} }
if err := d.profiles.insertProfile(ctx, txn, localpart); err != nil { if account, err = d.accounts.insertAccount(ctx, txn, localpart, hash, appserviceID); err != nil {
if isConstraintError(err) {
return nil, sqlutil.ErrUserExists return nil, sqlutil.ErrUserExists
} }
if err = d.profiles.insertProfile(ctx, txn, localpart); err != nil {
return nil, err return nil, err
} }
if err = d.accountDatas.insertAccountData(ctx, txn, localpart, "", "m.push_rules", json.RawMessage(`{
if err := d.accountDatas.insertAccountData(ctx, txn, localpart, "", "m.push_rules", json.RawMessage(`{
"global": { "global": {
"content": [], "content": [],
"override": [], "override": [],
@ -230,7 +230,7 @@ func (d *Database) createAccount(
}`)); err != nil { }`)); err != nil {
return nil, err return nil, err
} }
return d.accounts.insertAccount(ctx, txn, localpart, hash, appserviceID) return account, nil
} }
// SaveAccountData saves new account data for a given user and a given room. // SaveAccountData saves new account data for a given user and a given room.