mirror of
https://github.com/hoernschen/dendrite.git
synced 2024-12-27 23:48:27 +00:00
Only limit context for fetching missing auth/prev events (#2131)
This commit is contained in:
parent
4281976df9
commit
a271fde8f5
3 changed files with 12 additions and 11 deletions
|
@ -120,7 +120,7 @@ func (r *Inputer) Start() error {
|
||||||
nats.DeliverAll(),
|
nats.DeliverAll(),
|
||||||
// Ensure that NATS doesn't try to resend us something that wasn't done
|
// Ensure that NATS doesn't try to resend us something that wasn't done
|
||||||
// within the period of time that we might still be processing it.
|
// within the period of time that we might still be processing it.
|
||||||
nats.AckWait(MaximumProcessingTime+(time.Second*10)),
|
nats.AckWait((MaximumMissingProcessingTime*2)+(time.Second*10)),
|
||||||
)
|
)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,7 +41,7 @@ func init() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Does this value make sense?
|
// TODO: Does this value make sense?
|
||||||
const MaximumProcessingTime = time.Minute * 2
|
const MaximumMissingProcessingTime = time.Minute * 2
|
||||||
|
|
||||||
var processRoomEventDuration = prometheus.NewHistogramVec(
|
var processRoomEventDuration = prometheus.NewHistogramVec(
|
||||||
prometheus.HistogramOpts{
|
prometheus.HistogramOpts{
|
||||||
|
@ -66,11 +66,11 @@ var processRoomEventDuration = prometheus.NewHistogramVec(
|
||||||
// TODO: Break up function - we should probably do transaction ID checks before calling this.
|
// TODO: Break up function - we should probably do transaction ID checks before calling this.
|
||||||
// nolint:gocyclo
|
// nolint:gocyclo
|
||||||
func (r *Inputer) processRoomEvent(
|
func (r *Inputer) processRoomEvent(
|
||||||
inctx context.Context,
|
ctx context.Context,
|
||||||
input *api.InputRoomEvent,
|
input *api.InputRoomEvent,
|
||||||
) (err error) {
|
) (err error) {
|
||||||
select {
|
select {
|
||||||
case <-inctx.Done():
|
case <-ctx.Done():
|
||||||
// Before we do anything, make sure the context hasn't expired for this pending task.
|
// Before we do anything, make sure the context hasn't expired for this pending task.
|
||||||
// If it has then we'll give up straight away — it's probably a synchronous input
|
// If it has then we'll give up straight away — it's probably a synchronous input
|
||||||
// request and the caller has already given up, but the inbox task was still queued.
|
// request and the caller has already given up, but the inbox task was still queued.
|
||||||
|
@ -78,13 +78,6 @@ func (r *Inputer) processRoomEvent(
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wrap the context with a time limit. We'll allow no more than MaximumProcessingTime for
|
|
||||||
// everything that we need to do for this event, or it's possible that we could end up wedging
|
|
||||||
// the roomserver for a very long time.
|
|
||||||
var cancel context.CancelFunc
|
|
||||||
ctx, cancel := context.WithTimeout(inctx, MaximumProcessingTime)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
// Measure how long it takes to process this event.
|
// Measure how long it takes to process this event.
|
||||||
started := time.Now()
|
started := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
|
@ -344,6 +337,10 @@ func (r *Inputer) fetchAuthEvents(
|
||||||
known map[string]*types.Event,
|
known map[string]*types.Event,
|
||||||
servers []gomatrixserverlib.ServerName,
|
servers []gomatrixserverlib.ServerName,
|
||||||
) error {
|
) error {
|
||||||
|
var cancel context.CancelFunc
|
||||||
|
ctx, cancel = context.WithTimeout(ctx, MaximumMissingProcessingTime)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
unknown := map[string]struct{}{}
|
unknown := map[string]struct{}{}
|
||||||
authEventIDs := event.AuthEventIDs()
|
authEventIDs := event.AuthEventIDs()
|
||||||
if len(authEventIDs) == 0 {
|
if len(authEventIDs) == 0 {
|
||||||
|
|
|
@ -37,6 +37,10 @@ type missingStateReq struct {
|
||||||
func (t *missingStateReq) processEventWithMissingState(
|
func (t *missingStateReq) processEventWithMissingState(
|
||||||
ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion,
|
ctx context.Context, e *gomatrixserverlib.Event, roomVersion gomatrixserverlib.RoomVersion,
|
||||||
) error {
|
) error {
|
||||||
|
var cancel context.CancelFunc
|
||||||
|
ctx, cancel = context.WithTimeout(ctx, MaximumMissingProcessingTime)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
// We are missing the previous events for this events.
|
// We are missing the previous events for this events.
|
||||||
// This means that there is a gap in our view of the history of the
|
// This means that there is a gap in our view of the history of the
|
||||||
// room. There two ways that we can handle such a gap:
|
// room. There two ways that we can handle such a gap:
|
||||||
|
|
Loading…
Reference in a new issue