mirror of
https://github.com/hoernschen/dendrite.git
synced 2025-08-02 06:12:45 +00:00
Merge branch 'master' into neilalexander/onoldroomevent
This commit is contained in:
commit
81ad3ef5f6
151 changed files with 3603 additions and 948 deletions
|
@ -19,6 +19,7 @@ import (
|
|||
"encoding/json"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/getsentry/sentry-go"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/internal/eventutil"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
|
@ -78,6 +79,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
|
|||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||
// If the message was invalid, log it and move on to the next message in the stream
|
||||
log.WithError(err).Errorf("client API server output log: message parse failure")
|
||||
sentry.CaptureException(err)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -90,6 +92,7 @@ func (s *OutputClientDataConsumer) onMessage(msg *sarama.ConsumerMessage) error
|
|||
context.TODO(), string(msg.Key), output.RoomID, output.Type,
|
||||
)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
log.WithFields(log.Fields{
|
||||
"type": output.Type,
|
||||
"room_id": output.RoomID,
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"encoding/json"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/getsentry/sentry-go"
|
||||
"github.com/matrix-org/dendrite/eduserver/api"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
|
@ -78,6 +79,7 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro
|
|||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||
// If the message was invalid, log it and move on to the next message in the stream
|
||||
log.WithError(err).Errorf("EDU server output log: message parse failure")
|
||||
sentry.CaptureException(err)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -90,6 +92,7 @@ func (s *OutputReceiptEventConsumer) onMessage(msg *sarama.ConsumerMessage) erro
|
|||
output.Timestamp,
|
||||
)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"encoding/json"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/getsentry/sentry-go"
|
||||
"github.com/matrix-org/dendrite/eduserver/api"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
|
@ -82,11 +83,13 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage)
|
|||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||
// If the message was invalid, log it and move on to the next message in the stream
|
||||
log.WithError(err).Errorf("EDU server output log: message parse failure")
|
||||
sentry.CaptureException(err)
|
||||
return err
|
||||
}
|
||||
|
||||
_, domain, err := gomatrixserverlib.SplitID('@', output.UserID)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
return err
|
||||
}
|
||||
if domain != s.serverName {
|
||||
|
@ -104,6 +107,7 @@ func (s *OutputSendToDeviceEventConsumer) onMessage(msg *sarama.ConsumerMessage)
|
|||
context.TODO(), output.UserID, output.DeviceID, output.SendToDeviceEvent,
|
||||
)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
log.WithError(err).Errorf("failed to store send-to-device message")
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
"encoding/json"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/getsentry/sentry-go"
|
||||
"github.com/matrix-org/dendrite/eduserver/api"
|
||||
"github.com/matrix-org/dendrite/eduserver/cache"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
|
@ -83,6 +84,7 @@ func (s *OutputTypingEventConsumer) onMessage(msg *sarama.ConsumerMessage) error
|
|||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||
// If the message was invalid, log it and move on to the next message in the stream
|
||||
log.WithError(err).Errorf("EDU server output log: message parse failure")
|
||||
sentry.CaptureException(err)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/getsentry/sentry-go"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/keyserver/api"
|
||||
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
|
||||
|
@ -107,6 +108,7 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
|
|||
if err := json.Unmarshal(msg.Value, &output); err != nil {
|
||||
// If the message was invalid, log it and move on to the next message in the stream
|
||||
log.WithError(err).Error("syncapi: failed to unmarshal key change event from key server")
|
||||
sentry.CaptureException(err)
|
||||
return err
|
||||
}
|
||||
// work out who we need to notify about the new key
|
||||
|
@ -116,6 +118,7 @@ func (s *OutputKeyChangeEventConsumer) onMessage(msg *sarama.ConsumerMessage) er
|
|||
}, &queryRes)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("syncapi: failed to QuerySharedUsers for key change event from key server")
|
||||
sentry.CaptureException(err)
|
||||
return err
|
||||
}
|
||||
// make sure we get our own key updates too!
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"fmt"
|
||||
|
||||
"github.com/Shopify/sarama"
|
||||
"github.com/getsentry/sentry-go"
|
||||
"github.com/matrix-org/dendrite/internal"
|
||||
"github.com/matrix-org/dendrite/roomserver/api"
|
||||
"github.com/matrix-org/dendrite/setup/config"
|
||||
|
@ -148,18 +149,21 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
|
|||
|
||||
ev, err := s.updateStateEvent(ev)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
return err
|
||||
}
|
||||
|
||||
for i := range addsStateEvents {
|
||||
addsStateEvents[i], err = s.updateStateEvent(addsStateEvents[i])
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if msg.RewritesState {
|
||||
if err = s.db.PurgeRoomState(ctx, ev.RoomID()); err != nil {
|
||||
sentry.CaptureException(err)
|
||||
return fmt.Errorf("s.db.PurgeRoom: %w", err)
|
||||
}
|
||||
}
|
||||
|
@ -187,6 +191,7 @@ func (s *OutputRoomEventConsumer) onNewRoomEvent(
|
|||
|
||||
if pduPos, err = s.notifyJoinedPeeks(ctx, ev, pduPos); err != nil {
|
||||
log.WithError(err).Errorf("Failed to notifyJoinedPeeks for PDU pos %d", pduPos)
|
||||
sentry.CaptureException(err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -274,6 +279,7 @@ func (s *OutputRoomEventConsumer) onNewInviteEvent(
|
|||
}
|
||||
pduPos, err := s.db.AddInviteEvent(ctx, msg.Event)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
// panic rather than continue with an inconsistent database
|
||||
log.WithFields(log.Fields{
|
||||
"event_id": msg.Event.EventID(),
|
||||
|
@ -295,6 +301,7 @@ func (s *OutputRoomEventConsumer) onRetireInviteEvent(
|
|||
) error {
|
||||
pduPos, err := s.db.RetireInviteEvent(ctx, msg.EventID)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
// panic rather than continue with an inconsistent database
|
||||
log.WithFields(log.Fields{
|
||||
"event_id": msg.EventID,
|
||||
|
@ -315,6 +322,7 @@ func (s *OutputRoomEventConsumer) onNewPeek(
|
|||
) error {
|
||||
sp, err := s.db.AddPeek(ctx, msg.RoomID, msg.UserID, msg.DeviceID)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
// panic rather than continue with an inconsistent database
|
||||
log.WithFields(log.Fields{
|
||||
log.ErrorKey: err,
|
||||
|
|
|
@ -46,7 +46,6 @@ func DeviceOTKCounts(ctx context.Context, keyAPI keyapi.KeyInternalAPI, userID,
|
|||
// DeviceListCatchup fills in the given response for the given user ID to bring it up-to-date with device lists. hasNew=true if the response
|
||||
// was filled in, else false if there are no new device list changes because there is nothing to catch up on. The response MUST
|
||||
// be already filled in with join/leave information.
|
||||
// nolint:gocyclo
|
||||
func DeviceListCatchup(
|
||||
ctx context.Context, keyAPI keyapi.KeyInternalAPI, rsAPI roomserverAPI.RoomserverInternalAPI,
|
||||
userID string, res *types.Response, from, to types.LogPosition,
|
||||
|
@ -137,7 +136,6 @@ func DeviceListCatchup(
|
|||
}
|
||||
|
||||
// TrackChangedUsers calculates the values of device_lists.changed|left in the /sync response.
|
||||
// nolint:gocyclo
|
||||
func TrackChangedUsers(
|
||||
ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, userID string, newlyJoinedRooms, newlyLeftRooms []string,
|
||||
) (changed, left []string, err error) {
|
||||
|
|
|
@ -61,7 +61,6 @@ const defaultMessagesLimit = 10
|
|||
// OnIncomingMessagesRequest implements the /messages endpoint from the
|
||||
// client-server API.
|
||||
// See: https://matrix.org/docs/spec/client_server/latest.html#get-matrix-client-r0-rooms-roomid-messages
|
||||
// nolint:gocyclo
|
||||
func OnIncomingMessagesRequest(
|
||||
req *http.Request, db storage.Database, roomID string, device *userapi.Device,
|
||||
federation *gomatrixserverlib.FederationClient,
|
||||
|
@ -306,7 +305,6 @@ func (r *messagesReq) retrieveEvents() (
|
|||
return clientEvents, start, end, err
|
||||
}
|
||||
|
||||
// nolint:gocyclo
|
||||
func (r *messagesReq) filterHistoryVisible(events []*gomatrixserverlib.HeaderedEvent) []*gomatrixserverlib.HeaderedEvent {
|
||||
// TODO FIXME: We don't fully implement history visibility yet. To avoid leaking events which the
|
||||
// user shouldn't see, we check the recent events and remove any prior to the join event of the user
|
||||
|
|
|
@ -36,7 +36,6 @@ type SyncServerDatasource struct {
|
|||
}
|
||||
|
||||
// NewDatabase creates a new sync server database
|
||||
// nolint:gocyclo
|
||||
func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, error) {
|
||||
var d SyncServerDatasource
|
||||
var err error
|
||||
|
|
|
@ -661,7 +661,6 @@ func (d *Database) fetchMissingStateEvents(
|
|||
// exclusive of oldPos, inclusive of newPos, for the rooms in which
|
||||
// the user has new membership events.
|
||||
// A list of joined room IDs is also returned in case the caller needs it.
|
||||
// nolint:gocyclo
|
||||
func (d *Database) GetStateDeltas(
|
||||
ctx context.Context, device *userapi.Device,
|
||||
r types.Range, userID string,
|
||||
|
@ -773,7 +772,6 @@ func (d *Database) GetStateDeltas(
|
|||
// requests with full_state=true.
|
||||
// Fetches full state for all joined rooms and uses selectStateInRange to get
|
||||
// updates for other rooms.
|
||||
// nolint:gocyclo
|
||||
func (d *Database) GetStateDeltasForFullStateSync(
|
||||
ctx context.Context, device *userapi.Device,
|
||||
r types.Range, userID string,
|
||||
|
|
|
@ -23,7 +23,6 @@ const (
|
|||
// fields might come from either a StateFilter or an EventFilter,
|
||||
// and it's easier just to have the caller extract the relevant
|
||||
// parts.
|
||||
// nolint:gocyclo
|
||||
func prepareWithFilters(
|
||||
db *sql.DB, txn *sql.Tx, query string, params []interface{},
|
||||
senders, notsenders, types, nottypes []string, excludeEventIDs []string,
|
||||
|
|
|
@ -52,7 +52,6 @@ func NewDatabase(dbProperties *config.DatabaseOptions) (*SyncServerDatasource, e
|
|||
return &d, nil
|
||||
}
|
||||
|
||||
// nolint:gocyclo
|
||||
func (d *SyncServerDatasource) prepare(dbProperties *config.DatabaseOptions) (err error) {
|
||||
if err = d.PartitionOffsetStatements.Prepare(d.db, d.writer, "syncapi"); err != nil {
|
||||
return err
|
||||
|
|
|
@ -137,7 +137,6 @@ func (p *PDUStreamProvider) CompleteSync(
|
|||
return to
|
||||
}
|
||||
|
||||
// nolint:gocyclo
|
||||
func (p *PDUStreamProvider) IncrementalSync(
|
||||
ctx context.Context,
|
||||
req *types.SyncRequest,
|
||||
|
@ -254,7 +253,6 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
|
|||
return nil
|
||||
}
|
||||
|
||||
// nolint:gocyclo
|
||||
func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
|
||||
ctx context.Context,
|
||||
roomID string,
|
||||
|
|
|
@ -156,7 +156,7 @@ func (rp *RequestPool) OnIncomingSyncRequest(req *http.Request, device *userapi.
|
|||
|
||||
currentPos := rp.Notifier.CurrentPosition()
|
||||
|
||||
if !rp.shouldReturnImmediately(syncReq) {
|
||||
if !rp.shouldReturnImmediately(syncReq, currentPos) {
|
||||
timer := time.NewTimer(syncReq.Timeout) // case of timeout=0 is handled above
|
||||
defer timer.Stop()
|
||||
|
||||
|
@ -303,8 +303,8 @@ func (rp *RequestPool) OnIncomingKeyChangeRequest(req *http.Request, device *use
|
|||
// shouldReturnImmediately returns whether the /sync request is an initial sync,
|
||||
// or timeout=0, or full_state=true, in any of the cases the request should
|
||||
// return immediately.
|
||||
func (rp *RequestPool) shouldReturnImmediately(syncReq *types.SyncRequest) bool {
|
||||
if syncReq.Since.IsEmpty() || syncReq.Timeout == 0 || syncReq.WantFullState {
|
||||
func (rp *RequestPool) shouldReturnImmediately(syncReq *types.SyncRequest, currentPos types.StreamingToken) bool {
|
||||
if currentPos.IsAfter(syncReq.Since) || syncReq.Timeout == 0 || syncReq.WantFullState {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue