Remove tight send loop. Fix events not being deleted

This commit is contained in:
Andrew Morgan 2018-06-12 15:38:45 +01:00
parent 3e93ac86fe
commit af841cdcc5
2 changed files with 24 additions and 26 deletions

View file

@ -125,15 +125,18 @@ func (s *eventsStatements) selectEventsByApplicationServiceID(
applicationServiceID) applicationServiceID)
} }
}() }()
events, _, maxID, err = retrieveEvents(eventRowsCurr) events, maxID, txnID, err = retrieveEvents(eventRowsCurr)
if err != nil { if err != nil {
return 0, 0, nil, err return 0, 0, nil, err
} }
return -1, maxID, events, err return
} }
func retrieveEvents(eventRows *sql.Rows) (events []gomatrixserverlib.ApplicationServiceEvent, txnID, maxID int, err error) { func retrieveEvents(eventRows *sql.Rows) (events []gomatrixserverlib.ApplicationServiceEvent, maxID, txnID int, err error) {
// Get current time for use in calculating event age
nowMilli := time.Now().UnixNano() / int64(time.Millisecond)
// Iterate through each row and store event contents // Iterate through each row and store event contents
// If txn_id changes dramatically, we've switched from collecting old events to // If txn_id changes dramatically, we've switched from collecting old events to
// new ones. Send back those events first. // new ones. Send back those events first.
@ -172,8 +175,8 @@ func retrieveEvents(eventRows *sql.Rows) (events []gomatrixserverlib.Application
} }
// Get age of the event from original timestamp and current time // Get age of the event from original timestamp and current time
ageMilli := time.Now().UnixNano() / int64(time.Millisecond) // TODO: Consider removing age as not many app services use it
event.Age = ageMilli - event.OriginServerTimestamp event.Age = nowMilli - event.OriginServerTimestamp
// TODO: Synapse does this. It's unnecessary to send Sender and UserID as the // TODO: Synapse does this. It's unnecessary to send Sender and UserID as the
// same value. Do app services really require this? :) // same value. Do app services really require this? :)
@ -216,7 +219,7 @@ func (s *eventsStatements) insertEvent(
event.Type(), event.Type(),
event.Sender(), event.Sender(),
event.Content(), event.Content(),
-1, -1, // No transaction ID yet
) )
return return
} }

View file

@ -104,7 +104,7 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
} }
// Batch events up into a transaction // Batch events up into a transaction
eventsCount, maxEventID, transactionID, transactionJSON, err := createTransaction(ctx, db, ws.AppService.ID) eventsCount, txnID, maxEventID, transactionJSON, err := createTransaction(ctx, db, ws.AppService.ID)
if err != nil { if err != nil {
log.WithError(err).Fatalf("appservice %s worker unable to create transaction", log.WithError(err).Fatalf("appservice %s worker unable to create transaction",
ws.AppService.ID) ws.AppService.ID)
@ -114,14 +114,11 @@ func worker(db *storage.Database, ws types.ApplicationServiceWorkerState) {
// Send the events off to the application service // Send the events off to the application service
// Backoff if the application service does not respond // Backoff if the application service does not respond
for { err = send(client, ws.AppService, txnID, transactionJSON)
err = send(client, ws.AppService, transactionID, transactionJSON) if err != nil {
if err != nil { // Backoff
// Backoff backoff(&ws, err)
backoff(&ws, err) continue
continue
}
break
} }
// We sent successfully, hooray! // We sent successfully, hooray!
@ -174,12 +171,10 @@ func createTransaction(
db *storage.Database, db *storage.Database,
appserviceID string, appserviceID string,
) ( ) (
eventsCount, maxID, txnID int, eventsCount, txnID, maxID int,
transactionJSON []byte, transactionJSON []byte,
err error, err error,
) { ) {
transactionID := currentTransactionID
// Retrieve the latest events from the DB (will return old events if they weren't successfully sent) // Retrieve the latest events from the DB (will return old events if they weren't successfully sent)
txnID, maxID, events, err := db.GetEventsWithAppServiceID(ctx, appserviceID, transactionBatchSize) txnID, maxID, events, err := db.GetEventsWithAppServiceID(ctx, appserviceID, transactionBatchSize)
if err != nil { if err != nil {
@ -189,12 +184,11 @@ func createTransaction(
return return
} }
// Check if these are old events we are resending. If so, reuse old transactionID // Check if these events already have a transaction ID
if txnID != -1 { if txnID == -1 {
transactionID = txnID txnID = currentTransactionID
} else {
// Mark new events with current transactionID // Mark new events with current transactionID
err := db.UpdateTxnIDForEvents(ctx, appserviceID, maxID, transactionID) err := db.UpdateTxnIDForEvents(ctx, appserviceID, maxID, currentTransactionID)
if err != nil { if err != nil {
return 0, 0, 0, nil, err return 0, 0, 0, nil, err
} }
@ -210,7 +204,8 @@ func createTransaction(
return return
} }
return len(events), maxID, transactionID, transactionJSON, nil eventsCount = len(events)
return
} }
// send sends events to an application service. Returns an error if an OK was not // send sends events to an application service. Returns an error if an OK was not
@ -218,11 +213,11 @@ func createTransaction(
func send( func send(
client *http.Client, client *http.Client,
appservice config.ApplicationService, appservice config.ApplicationService,
transactionID int, txnID int,
transaction []byte, transaction []byte,
) error { ) error {
// POST a transaction to our AS // POST a transaction to our AS
address := fmt.Sprintf("%s/transactions/%d", appservice.URL, transactionID) address := fmt.Sprintf("%s/transactions/%d", appservice.URL, txnID)
resp, err := client.Post(address, "application/json", bytes.NewBuffer(transaction)) resp, err := client.Post(address, "application/json", bytes.NewBuffer(transaction))
if err != nil { if err != nil {
return err return err