mirror of
https://github.com/hoernschen/dendrite.git
synced 2024-12-27 07:28:27 +00:00
Don't backoff on non-200s, tight send loop, 1 event query
This commit is contained in:
parent
d5865fa67d
commit
3e93ac86fe
2 changed files with 36 additions and 50 deletions
|
@ -50,13 +50,9 @@ CREATE TABLE IF NOT EXISTS appservice_events (
|
|||
CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id);
|
||||
`
|
||||
|
||||
const selectPastEventsByApplicationServiceIDSQL = "" +
|
||||
const selectEventsByApplicationServiceIDSQL = "" +
|
||||
"SELECT id, event_id, origin_server_ts, room_id, type, sender, event_content, txn_id " +
|
||||
"FROM appservice_events WHERE as_id = $1 AND txn_id > -1 LIMIT $2"
|
||||
|
||||
const selectCurrEventsByApplicationServiceIDSQL = "" +
|
||||
"SELECT id, event_id, origin_server_ts, room_id, type, sender, event_content, txn_id " +
|
||||
"FROM appservice_events WHERE as_id = $1 AND txn_id = -1 LIMIT $2"
|
||||
"FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC LIMIT $2"
|
||||
|
||||
const countEventsByApplicationServiceIDSQL = "" +
|
||||
"SELECT COUNT(event_id) FROM appservice_events WHERE as_id = $1"
|
||||
|
@ -72,12 +68,11 @@ const deleteEventsBeforeAndIncludingIDSQL = "" +
|
|||
"DELETE FROM appservice_events WHERE as_id = $1 AND id <= $2"
|
||||
|
||||
type eventsStatements struct {
|
||||
selectPastEventsByApplicationServiceIDStmt *sql.Stmt
|
||||
selectCurrEventsByApplicationServiceIDStmt *sql.Stmt
|
||||
countEventsByApplicationServiceIDStmt *sql.Stmt
|
||||
insertEventStmt *sql.Stmt
|
||||
updateTxnIDForEventsStmt *sql.Stmt
|
||||
deleteEventsBeforeAndIncludingIDStmt *sql.Stmt
|
||||
selectEventsByApplicationServiceIDStmt *sql.Stmt
|
||||
countEventsByApplicationServiceIDStmt *sql.Stmt
|
||||
insertEventStmt *sql.Stmt
|
||||
updateTxnIDForEventsStmt *sql.Stmt
|
||||
deleteEventsBeforeAndIncludingIDStmt *sql.Stmt
|
||||
}
|
||||
|
||||
func (s *eventsStatements) prepare(db *sql.DB) (err error) {
|
||||
|
@ -86,10 +81,7 @@ func (s *eventsStatements) prepare(db *sql.DB) (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
if s.selectPastEventsByApplicationServiceIDStmt, err = db.Prepare(selectPastEventsByApplicationServiceIDSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.selectCurrEventsByApplicationServiceIDStmt, err = db.Prepare(selectCurrEventsByApplicationServiceIDSQL); err != nil {
|
||||
if s.selectEventsByApplicationServiceIDStmt, err = db.Prepare(selectEventsByApplicationServiceIDSQL); err != nil {
|
||||
return
|
||||
}
|
||||
if s.countEventsByApplicationServiceIDStmt, err = db.Prepare(countEventsByApplicationServiceIDSQL); err != nil {
|
||||
|
@ -121,28 +113,8 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
|
|||
events []gomatrixserverlib.ApplicationServiceEvent,
|
||||
err error,
|
||||
) {
|
||||
// First check to see if there are any events part of an old transaction
|
||||
eventRowsPast, err := s.selectPastEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID, limit)
|
||||
if err != nil {
|
||||
return 0, 0, nil, err
|
||||
}
|
||||
defer func() {
|
||||
err = eventRowsPast.Close()
|
||||
if err != nil {
|
||||
log.WithError(err).Fatalf("Appservice %s unable to select past events to send",
|
||||
applicationServiceID)
|
||||
}
|
||||
}()
|
||||
events, txnID, maxID, err = retrieveEvents(eventRowsPast)
|
||||
if err != nil {
|
||||
return 0, 0, nil, err
|
||||
}
|
||||
if len(events) > 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Else, if there are old events with existing transaction IDs, grab a batch of new events
|
||||
eventRowsCurr, err := s.selectCurrEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID, limit)
|
||||
// Retrieve events from the database. Unsuccessfully sent events first
|
||||
eventRowsCurr, err := s.selectEventsByApplicationServiceIDStmt.QueryContext(ctx, applicationServiceID, limit)
|
||||
if err != nil {
|
||||
return 0, 0, nil, err
|
||||
}
|
||||
|
@ -163,6 +135,9 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
|
|||
|
||||
func retrieveEvents(eventRows *sql.Rows) (events []gomatrixserverlib.ApplicationServiceEvent, txnID, maxID int, err error) {
|
||||
// Iterate through each row and store event contents
|
||||
// If txn_id changes dramatically, we've switched from collecting old events to
|
||||
// new ones. Send back those events first.
|
||||
lastTxnID := -2 // Invalid transaction ID
|
||||
for eventRows.Next() {
|
||||
var event gomatrixserverlib.ApplicationServiceEvent
|
||||
var eventContent sql.NullString
|
||||
|
@ -181,6 +156,14 @@ func retrieveEvents(eventRows *sql.Rows) (events []gomatrixserverlib.Application
|
|||
fmt.Println("Failed:", err.Error())
|
||||
return nil, 0, 0, err
|
||||
}
|
||||
|
||||
// If txnID has changed on this event from the previous event, then we've
|
||||
// reached the end of a transaction's events. Return only those events.
|
||||
if lastTxnID > -2 && lastTxnID != txnID {
|
||||
return
|
||||
}
|
||||
lastTxnID = txnID
|
||||
|
||||
if eventContent.Valid {
|
||||
event.Content = gomatrixserverlib.RawJSON(eventContent.String)
|
||||
}
|
||||
|
@ -192,7 +175,8 @@ func retrieveEvents(eventRows *sql.Rows) (events []gomatrixserverlib.Application
|
|||
ageMilli := time.Now().UnixNano() / int64(time.Millisecond)
|
||||
event.Age = ageMilli - event.OriginServerTimestamp
|
||||
|
||||
// TODO: Synapse does this. Do app services really require this? :)
|
||||
// TODO: Synapse does this. It's unnecessary to send Sender and UserID as the
|
||||
// same value. Do app services really require this? :)
|
||||
event.Sender = event.UserID
|
||||
|
||||
events = append(events, event)
|
||||
|
|
|
@ -39,7 +39,7 @@ var (
|
|||
// TL;DR: Don't lower this number with any AS events still left in the database.
|
||||
transactionBatchSize = 50
|
||||
// Timeout for sending a single transaction to an application service.
|
||||
transactionTimeout = time.Second * 15
|
||||
transactionTimeout = time.Second * 60
|
||||
// The current transaction ID. Increments after every successful transaction.
|
||||
currentTransactionID = 0
|
||||
)
|
||||
|
@ -98,7 +98,6 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
|
|||
for {
|
||||
// Wait for more events if we've sent all the events in the database
|
||||
if *ws.EventsReady <= 0 {
|
||||
fmt.Println("Waiting")
|
||||
ws.Cond.L.Lock()
|
||||
ws.Cond.Wait()
|
||||
ws.Cond.L.Unlock()
|
||||
|
@ -114,11 +113,15 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
|
|||
}
|
||||
|
||||
// Send the events off to the application service
|
||||
err = send(client, ws.AppService, transactionID, transactionJSON)
|
||||
if err != nil {
|
||||
// Backoff
|
||||
backoff(err, &ws)
|
||||
continue
|
||||
// Backoff if the application service does not respond
|
||||
for {
|
||||
err = send(client, ws.AppService, transactionID, transactionJSON)
|
||||
if err != nil {
|
||||
// Backoff
|
||||
backoff(&ws, err)
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
// We sent successfully, hooray!
|
||||
|
@ -147,7 +150,7 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
|
|||
}
|
||||
|
||||
// backoff pauses the calling goroutine for a 2^some backoff exponent seconds
|
||||
func backoff(err error, ws *types.ApplicationServiceWorkerState) {
|
||||
func backoff(ws *types.ApplicationServiceWorkerState, err error) {
|
||||
// Calculate how long to backoff for
|
||||
backoffDuration := time.Duration(math.Pow(2, float64(ws.Backoff)))
|
||||
backoffSeconds := time.Second * backoffDuration
|
||||
|
@ -233,9 +236,8 @@ func send(
|
|||
|
||||
// Check the AS received the events correctly
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf(
|
||||
"Non-OK status code %d returned from AS", resp.StatusCode,
|
||||
)
|
||||
// TODO: Handle non-200 error codes from application services
|
||||
return fmt.Errorf("Non-OK status code %d returned from AS", resp.StatusCode)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
Loading…
Reference in a new issue