From 0b732d6f45dc96041a85c227812ea0b53b19af68 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Tue, 24 Mar 2020 15:46:17 +0000 Subject: [PATCH] Use HeaderedEvents in appservice component (#939) * App service HeaderedEvents * Fix database queries * Fix lint error --- appservice/consumers/roomserver.go | 22 +++++++++---------- appservice/storage/interface.go | 4 ++-- .../postgres/appservice_events_table.go | 14 ++++++------ appservice/storage/postgres/storage.go | 6 ++--- .../sqlite3/appservice_events_table.go | 14 ++++++------ appservice/storage/sqlite3/storage.go | 6 ++--- appservice/workers/transaction_scheduler.go | 7 +++++- 7 files changed, 38 insertions(+), 35 deletions(-) diff --git a/appservice/consumers/roomserver.go b/appservice/consumers/roomserver.go index 9180d9ef..6ae58e85 100644 --- a/appservice/consumers/roomserver.go +++ b/appservice/consumers/roomserver.go @@ -101,11 +101,11 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { "type": ev.Type(), }).Info("appservice received an event from roomserver") - missingEvents, err := s.lookupMissingStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev.Event) + missingEvents, err := s.lookupMissingStateEvents(output.NewRoomEvent.AddsStateEventIDs, ev) if err != nil { return err } - events := append(missingEvents, ev.Event) + events := append(missingEvents, ev) // Send event to any relevant application services return s.filterRoomserverEvents(context.TODO(), events) @@ -114,19 +114,19 @@ func (s *OutputRoomEventConsumer) onMessage(msg *sarama.ConsumerMessage) error { // lookupMissingStateEvents looks up the state events that are added by a new event, // and returns any not already present. func (s *OutputRoomEventConsumer) lookupMissingStateEvents( - addsStateEventIDs []string, event gomatrixserverlib.Event, -) ([]gomatrixserverlib.Event, error) { + addsStateEventIDs []string, event gomatrixserverlib.HeaderedEvent, +) ([]gomatrixserverlib.HeaderedEvent, error) { // Fast path if there aren't any new state events. if len(addsStateEventIDs) == 0 { - return []gomatrixserverlib.Event{}, nil + return []gomatrixserverlib.HeaderedEvent{}, nil } // Fast path if the only state event added is the event itself. if len(addsStateEventIDs) == 1 && addsStateEventIDs[0] == event.EventID() { - return []gomatrixserverlib.Event{}, nil + return []gomatrixserverlib.HeaderedEvent{}, nil } - result := []gomatrixserverlib.Event{} + result := []gomatrixserverlib.HeaderedEvent{} missing := []string{} for _, id := range addsStateEventIDs { if id != event.EventID() { @@ -143,9 +143,7 @@ func (s *OutputRoomEventConsumer) lookupMissingStateEvents( return nil, err } - for _, headeredEvent := range eventResp.Events { - result = append(result, headeredEvent.Event) - } + result = append(result, eventResp.Events...) return result, nil } @@ -157,7 +155,7 @@ func (s *OutputRoomEventConsumer) lookupMissingStateEvents( // application service. func (s *OutputRoomEventConsumer) filterRoomserverEvents( ctx context.Context, - events []gomatrixserverlib.Event, + events []gomatrixserverlib.HeaderedEvent, ) error { for _, ws := range s.workerStates { for _, event := range events { @@ -180,7 +178,7 @@ func (s *OutputRoomEventConsumer) filterRoomserverEvents( // appserviceIsInterestedInEvent returns a boolean depending on whether a given // event falls within one of a given application service's namespaces. -func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event gomatrixserverlib.Event, appservice config.ApplicationService) bool { +func (s *OutputRoomEventConsumer) appserviceIsInterestedInEvent(ctx context.Context, event gomatrixserverlib.HeaderedEvent, appservice config.ApplicationService) bool { // No reason to queue events if they'll never be sent to the application // service if appservice.URL == "" { diff --git a/appservice/storage/interface.go b/appservice/storage/interface.go index 4b75ff68..25d35af6 100644 --- a/appservice/storage/interface.go +++ b/appservice/storage/interface.go @@ -21,8 +21,8 @@ import ( ) type Database interface { - StoreEvent(ctx context.Context, appServiceID string, event *gomatrixserverlib.Event) error - GetEventsWithAppServiceID(ctx context.Context, appServiceID string, limit int) (int, int, []gomatrixserverlib.Event, bool, error) + StoreEvent(ctx context.Context, appServiceID string, event *gomatrixserverlib.HeaderedEvent) error + GetEventsWithAppServiceID(ctx context.Context, appServiceID string, limit int) (int, int, []gomatrixserverlib.HeaderedEvent, bool, error) CountEventsWithAppServiceID(ctx context.Context, appServiceID string) (int, error) UpdateTxnIDForEvents(ctx context.Context, appserviceID string, maxID, txnID int) error RemoveEventsBeforeAndIncludingID(ctx context.Context, appserviceID string, eventTableID int) error diff --git a/appservice/storage/postgres/appservice_events_table.go b/appservice/storage/postgres/appservice_events_table.go index d72faeea..d33a83b1 100644 --- a/appservice/storage/postgres/appservice_events_table.go +++ b/appservice/storage/postgres/appservice_events_table.go @@ -33,7 +33,7 @@ CREATE TABLE IF NOT EXISTS appservice_events ( -- The ID of the application service the event will be sent to as_id TEXT NOT NULL, -- JSON representation of the event - event_json TEXT NOT NULL, + headered_event_json TEXT NOT NULL, -- The ID of the transaction that this event is a part of txn_id BIGINT NOT NULL ); @@ -42,14 +42,14 @@ CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id); ` const selectEventsByApplicationServiceIDSQL = "" + - "SELECT id, event_json, txn_id " + + "SELECT id, headered_event_json, txn_id " + "FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC" const countEventsByApplicationServiceIDSQL = "" + "SELECT COUNT(id) FROM appservice_events WHERE as_id = $1" const insertEventSQL = "" + - "INSERT INTO appservice_events(as_id, event_json, txn_id) " + + "INSERT INTO appservice_events(as_id, headered_event_json, txn_id) " + "VALUES ($1, $2, $3)" const updateTxnIDForEventsSQL = "" + @@ -107,7 +107,7 @@ func (s *eventsStatements) selectEventsByApplicationServiceID( limit int, ) ( txnID, maxID int, - events []gomatrixserverlib.Event, + events []gomatrixserverlib.HeaderedEvent, eventsRemaining bool, err error, ) { @@ -132,7 +132,7 @@ func (s *eventsStatements) selectEventsByApplicationServiceID( return } -func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.Event, maxID, txnID int, eventsRemaining bool, err error) { +func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.HeaderedEvent, maxID, txnID int, eventsRemaining bool, err error) { // Get current time for use in calculating event age nowMilli := time.Now().UnixNano() / int64(time.Millisecond) @@ -141,7 +141,7 @@ func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib. // new ones. Send back those events first. lastTxnID := invalidTxnID for eventsProcessed := 0; eventRows.Next(); { - var event gomatrixserverlib.Event + var event gomatrixserverlib.HeaderedEvent var eventJSON []byte var id int err = eventRows.Scan( @@ -209,7 +209,7 @@ func (s *eventsStatements) countEventsByApplicationServiceID( func (s *eventsStatements) insertEvent( ctx context.Context, appServiceID string, - event *gomatrixserverlib.Event, + event *gomatrixserverlib.HeaderedEvent, ) (err error) { // Convert event to JSON before inserting eventJSON, err := json.Marshal(event) diff --git a/appservice/storage/postgres/storage.go b/appservice/storage/postgres/storage.go index c4756468..ef92db87 100644 --- a/appservice/storage/postgres/storage.go +++ b/appservice/storage/postgres/storage.go @@ -52,12 +52,12 @@ func (d *Database) prepare() error { return d.txnID.prepare(d.db) } -// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database +// StoreEvent takes in a gomatrixserverlib.HeaderedEvent and stores it in the database // for a transaction worker to pull and later send to an application service. func (d *Database) StoreEvent( ctx context.Context, appServiceID string, - event *gomatrixserverlib.Event, + event *gomatrixserverlib.HeaderedEvent, ) error { return d.events.insertEvent(ctx, appServiceID, event) } @@ -68,7 +68,7 @@ func (d *Database) GetEventsWithAppServiceID( ctx context.Context, appServiceID string, limit int, -) (int, int, []gomatrixserverlib.Event, bool, error) { +) (int, int, []gomatrixserverlib.HeaderedEvent, bool, error) { return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit) } diff --git a/appservice/storage/sqlite3/appservice_events_table.go b/appservice/storage/sqlite3/appservice_events_table.go index 846f09f7..479f2213 100644 --- a/appservice/storage/sqlite3/appservice_events_table.go +++ b/appservice/storage/sqlite3/appservice_events_table.go @@ -33,7 +33,7 @@ CREATE TABLE IF NOT EXISTS appservice_events ( -- The ID of the application service the event will be sent to as_id TEXT NOT NULL, -- JSON representation of the event - event_json TEXT NOT NULL, + headered_event_json TEXT NOT NULL, -- The ID of the transaction that this event is a part of txn_id INTEGER NOT NULL ); @@ -42,14 +42,14 @@ CREATE INDEX IF NOT EXISTS appservice_events_as_id ON appservice_events(as_id); ` const selectEventsByApplicationServiceIDSQL = "" + - "SELECT id, event_json, txn_id " + + "SELECT id, headered_event_json, txn_id " + "FROM appservice_events WHERE as_id = $1 ORDER BY txn_id DESC, id ASC" const countEventsByApplicationServiceIDSQL = "" + "SELECT COUNT(id) FROM appservice_events WHERE as_id = $1" const insertEventSQL = "" + - "INSERT INTO appservice_events(as_id, event_json, txn_id) " + + "INSERT INTO appservice_events(as_id, headered_event_json, txn_id) " + "VALUES ($1, $2, $3)" const updateTxnIDForEventsSQL = "" + @@ -107,7 +107,7 @@ func (s *eventsStatements) selectEventsByApplicationServiceID( limit int, ) ( txnID, maxID int, - events []gomatrixserverlib.Event, + events []gomatrixserverlib.HeaderedEvent, eventsRemaining bool, err error, ) { @@ -132,7 +132,7 @@ func (s *eventsStatements) selectEventsByApplicationServiceID( return } -func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.Event, maxID, txnID int, eventsRemaining bool, err error) { +func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib.HeaderedEvent, maxID, txnID int, eventsRemaining bool, err error) { // Get current time for use in calculating event age nowMilli := time.Now().UnixNano() / int64(time.Millisecond) @@ -141,7 +141,7 @@ func retrieveEvents(eventRows *sql.Rows, limit int) (events []gomatrixserverlib. // new ones. Send back those events first. lastTxnID := invalidTxnID for eventsProcessed := 0; eventRows.Next(); { - var event gomatrixserverlib.Event + var event gomatrixserverlib.HeaderedEvent var eventJSON []byte var id int err = eventRows.Scan( @@ -209,7 +209,7 @@ func (s *eventsStatements) countEventsByApplicationServiceID( func (s *eventsStatements) insertEvent( ctx context.Context, appServiceID string, - event *gomatrixserverlib.Event, + event *gomatrixserverlib.HeaderedEvent, ) (err error) { // Convert event to JSON before inserting eventJSON, err := json.Marshal(event) diff --git a/appservice/storage/sqlite3/storage.go b/appservice/storage/sqlite3/storage.go index 5040b61b..d0538e26 100644 --- a/appservice/storage/sqlite3/storage.go +++ b/appservice/storage/sqlite3/storage.go @@ -53,12 +53,12 @@ func (d *Database) prepare() error { return d.txnID.prepare(d.db) } -// StoreEvent takes in a gomatrixserverlib.Event and stores it in the database +// StoreEvent takes in a gomatrixserverlib.HeaderedEvent and stores it in the database // for a transaction worker to pull and later send to an application service. func (d *Database) StoreEvent( ctx context.Context, appServiceID string, - event *gomatrixserverlib.Event, + event *gomatrixserverlib.HeaderedEvent, ) error { return d.events.insertEvent(ctx, appServiceID, event) } @@ -69,7 +69,7 @@ func (d *Database) GetEventsWithAppServiceID( ctx context.Context, appServiceID string, limit int, -) (int, int, []gomatrixserverlib.Event, bool, error) { +) (int, int, []gomatrixserverlib.HeaderedEvent, bool, error) { return d.events.selectEventsByApplicationServiceID(ctx, appServiceID, limit) } diff --git a/appservice/workers/transaction_scheduler.go b/appservice/workers/transaction_scheduler.go index faa1e4a9..10c7ef91 100644 --- a/appservice/workers/transaction_scheduler.go +++ b/appservice/workers/transaction_scheduler.go @@ -181,9 +181,14 @@ func createTransaction( } } + var ev []gomatrixserverlib.Event + for _, e := range events { + ev = append(ev, e.Event) + } + // Create a transaction and store the events inside transaction := gomatrixserverlib.ApplicationServiceTransaction{ - Events: events, + Events: ev, } transactionJSON, err = json.Marshal(transaction)